Permalink
Browse files

Add cbrecovery implementation

  • Loading branch information...
1 parent dda06c1 commit 1252754a01fbfa76563a0ab6fd5aaf11a6b141ee @bcui6611 bcui6611 committed Feb 22, 2013
Showing with 232 additions and 23 deletions.
  1. +130 −0 cbrecovery
  2. +27 −21 pump.py
  3. +69 −1 pump_tap.py
  4. +6 −1 pump_transfer.py
View
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+# -*-python-*-
+
+"""
+Recover missing vbuckets from remote cluster due to failed rebalancing operatio
+"""
+
+
+import pump_transfer
+import pump
+import pump_tab
+import sys
+import optparse
+import platform
+import os
+import urllib
+
+class Recovery(pump_transfer.Transfer):
+ """Entry point for 2.0 cbrecovery."""
+
+ def __init__(self):
+ self.name = "cbrecovery"
+ self.source_alias = "cluster to recover from"
+ self.sink_alias = "cluster to recover to"
+ self.vbucket_list = None
+ self.sink_bucket = None
+ self.source_bucket = None
+
+ self.usage = \
+ "%prog [options] source destination\n\n" \
+ "Recover missing vbuckets from remote cluster .\n\n" \
+ "Examples:\n" \
+ " %prog [options] http://SOURCE:8091 http://DEST:8091"
+
+ def main(self, argv):
+
+ err, opts, source, sink = self.opt_parse(argv)
+ if err:
+ return err
+ self.sink_bucket = getattr(opts, "bucket_destination", None)
+ if not self.sink_bucket:
+ return "Should specify destionation bucket for restore"
+ self.source_bucket = getattr(opts, "bucket_source", None)
+ if not self.sink_bucket:
+ return "Should specify source bucket for restore"
+
+ #communicate with destination before
+ err = self.pre_transfer(opts, source, sink)
+ if err:
+ return err
+
+ vbucket_list=[]
+ if self.vbucket_list:
+ for vlist in self.vbucket_list.itervalues():
+ for v in vlist:
+ vbucket_list.append(v)
+ if vbucket_list:
+ args.append("--vbucket-list")
+ args.append(str(vbucket_list))
+
+ #only care about data recovery
+ args.append("--extra")
+ args.append("data_only=1")
+
+ err = pump_transfer.Transfer.main(self, args)
+ if err:
+ return err
+
+ #communicate after
+ return self.post_transfer(opts, source, sink)
+
+ def pre_transfer(self, opts, source, sink):
+
+ host, port, user, pwd, path = \
+ pump.parse_spec(opts, args[1], 8091)
+
+ #retrieve a list of missing vbucket
+ err, conn, response = \
+ pump.rest_request_json(host, int(port), user, pwd,
+ "/pools/default/buckets/%s/beginRecovery" % self.sink_bucket,
+ method='GET', reason='begin_recovery')
+ if err:
+ return err
+ self.vbucket_list = response
+
+ if not self.vblist:
+ return "No missing vbucket retrieved"
+
+ return 0
+
+ def post_transfer(self, opts, source, sink):
+ if not self.sink_bucket:
+ return "Should specify destionation bucket for restore"
+
+ host, port, user, pwd, path = \
+ pump.parse_spec(opts, sink, 8091)
+
+ err, conn = \
+ pump.rest_request(host, int(port), user, pwd,
+ '/pools/default/buckets/%s/endRecovery' % self.sink_bucket,
+ method='POST', reason='set state to active')
+ return err
+
+ def add_parser_options(self, p):
+ p.add_option("-b", "--bucket-source",
+ action="store", type="string", default=None,
+ metavar="default",
+ help="""source bucket to recover from """)
+ p.add_option("-B", "--bucket-destination",
+ action="store", type="string", default=None,
+ metavar="default",
+ help="""destination bucket to recover to """)
+ p.add_option("-U", "--username-destination",
+ action="store", type="string", default=None,
+ help="REST username for destination cluster or server node")
+ p.add_option("-P", "--password-destination",
+ action="store", type="string", default=None,
+ help="REST password for destination cluster or server node")
+
+ Transfer.opt_parser_options_common(self, p)
+
+ def find_handlers(self, opts, source, sink):
+ return pump_tap.TAPDumpSource, pump_tap.TapSink
+
+if __name__ == '__main__':
+ if platform.system() == "Windows":
+ python_lib = os.path.join(os.path.dirname(sys.argv[0]), '..')
+ sys.path.append(python_lib)
+
+ pump_transfer.exit_handler(WorkloadGen().main(sys.argv))
View
48 pump.py
@@ -15,7 +15,7 @@
import zlib
import platform
import subprocess
-import memcacheConstants
+import couchbaseConstants
from cbcollections import defaultdict
from cbqueue import PumpQueue
@@ -120,13 +120,19 @@ def run(self):
key=lambda b: b['name']):
logging.info("bucket: " + source_bucket['name'])
- rv = self.transfer_bucket_msgs(source_bucket, source_map, sink_map)
- if rv != 0:
- return rv
+ if not self.opts.extra.get("design_doc_only", 0):
+ rv = self.transfer_bucket_msgs(source_bucket, source_map, sink_map)
+ if rv != 0:
+ return rv
+ else:
+ sys.stderr.write("transfer design doc only. bucket msgs will be skipped.\n")
- rv = self.transfer_bucket_design(source_bucket, source_map, sink_map)
- if rv != 0:
- return rv
+ if not self.opts.extra.get("data_only", 0):
+ rv = self.transfer_bucket_design(source_bucket, source_map, sink_map)
+ if rv != 0:
+ return rv
+ else:
+ sys.stderr.write("transfer data only. bucket design docs will be skipped.\n")
# TODO: (5) PumpingStation - validate bucket transfers.
@@ -654,7 +660,7 @@ def provide_batch(self):
elif parts[0] == 'set' or parts[0] == 'add':
if len(parts) != 5:
return "error: length of set/add line: " + line, None
- cmd = memcacheConstants.CMD_TAP_MUTATION
+ cmd = couchbaseConstants.CMD_TAP_MUTATION
key = parts[1]
flg = int(parts[2])
exp = int(parts[3])
@@ -675,7 +681,7 @@ def provide_batch(self):
elif parts[0] == 'delete':
if len(parts) != 2:
return "error: length of delete line: " + line, None
- cmd = memcacheConstants.CMD_TAP_DELETE
+ cmd = couchbaseConstants.CMD_TAP_DELETE
key = parts[1]
if not self.skip(key, vbucket_id):
msg = (cmd, vbucket_id, key, 0, 0, 0, '', '')
@@ -747,7 +753,7 @@ def consume_batch_async(self, batch):
continue
try:
- if cmd == memcacheConstants.CMD_TAP_MUTATION:
+ if cmd == couchbaseConstants.CMD_TAP_MUTATION:
if op_mutate:
# <op> <key> <flags> <exptime> <bytes> [noreply]\r\n
stdout.write("%s %s %s %s %s\r\n" %
@@ -756,10 +762,10 @@ def consume_batch_async(self, batch):
stdout.write("\r\n")
elif op == 'get':
stdout.write("get %s\r\n" % (key))
- elif cmd == memcacheConstants.CMD_TAP_DELETE:
+ elif cmd == couchbaseConstants.CMD_TAP_DELETE:
if op_mutate:
stdout.write("delete %s\r\n" % (key))
- elif cmd == memcacheConstants.CMD_GET:
+ elif cmd == couchbaseConstants.CMD_GET:
stdout.write("get %s\r\n" % (key))
else:
return "error: StdOutSink - unknown cmd: " + str(cmd), None
@@ -774,15 +780,15 @@ def consume_batch_async(self, batch):
# --------------------------------------------------
CMD_STR = {
- memcacheConstants.CMD_TAP_CONNECT: "TAP_CONNECT",
- memcacheConstants.CMD_TAP_MUTATION: "TAP_MUTATION",
- memcacheConstants.CMD_TAP_DELETE: "TAP_DELETE",
- memcacheConstants.CMD_TAP_FLUSH: "TAP_FLUSH",
- memcacheConstants.CMD_TAP_OPAQUE: "TAP_OPAQUE",
- memcacheConstants.CMD_TAP_VBUCKET_SET: "TAP_VBUCKET_SET",
- memcacheConstants.CMD_TAP_CHECKPOINT_START: "TAP_CHECKPOINT_START",
- memcacheConstants.CMD_TAP_CHECKPOINT_END: "TAP_CHECKPOINT_END",
- memcacheConstants.CMD_NOOP: "NOOP"
+ couchbaseConstants.CMD_TAP_CONNECT: "TAP_CONNECT",
+ couchbaseConstants.CMD_TAP_MUTATION: "TAP_MUTATION",
+ couchbaseConstants.CMD_TAP_DELETE: "TAP_DELETE",
+ couchbaseConstants.CMD_TAP_FLUSH: "TAP_FLUSH",
+ couchbaseConstants.CMD_TAP_OPAQUE: "TAP_OPAQUE",
+ couchbaseConstants.CMD_TAP_VBUCKET_SET: "TAP_VBUCKET_SET",
+ couchbaseConstants.CMD_TAP_CHECKPOINT_START: "TAP_CHECKPOINT_START",
+ couchbaseConstants.CMD_TAP_CHECKPOINT_END: "TAP_CHECKPOINT_END",
+ couchbaseConstants.CMD_NOOP: "NOOP"
}
def parse_spec(opts, spec, port):
View
@@ -12,6 +12,8 @@
import memcacheConstants
import pump
+import pump_mc
+import pump_cb
# TODO: (1) TAPDumpSource - handle TAP_FLAG_NETWORK_BYTE_ORDER.
@@ -30,6 +32,7 @@ def __init__(self, opts, spec, source_bucket, source_node,
self.num_msg = 0
self.recv_min_bytes = int(getattr(opts, "recv_min_bytes", 4096))
+ self.vbucket_list = getattr(opts, "vbucket_list", None)
@staticmethod
def can_handle(opts, spec):
@@ -245,6 +248,9 @@ def get_tap_conn(self):
if self.tap_conn.tap_fix_flag_byteorder:
tap_opts[memcacheConstants.TAP_FLAG_TAP_FIX_FLAG_BYTEORDER] = ''
+ if self.vbucket_list:
+ tap_opts[coucbaseConstants.TAP_FLAG_LIST_VBUCKETS] = ''
+
ext, val = TAPDumpSource.encode_tap_connect_opts(tap_opts)
self.tap_conn._sendCmd(memcacheConstants.CMD_TAP_CONNECT,
@@ -328,7 +334,7 @@ def recv(self, skt, nbytes, buf):
return joined[:nbytes], joined[nbytes:]
@staticmethod
- def encode_tap_connect_opts(opts, backfill=False):
+ def encode_tap_connect_opts(opts, backfill=False, vblist=None):
header = 0
val = []
@@ -341,6 +347,11 @@ def encode_tap_connect_opts(opts, backfill=False):
if opts[op][2] >= 0:
val.append(struct.pack(">HHQ",
opts[op][0], opts[op][1], opts[op][2]))
+ elif vblist and op == memcacheConstants.TAP_FLAG_LIST_VBUCKETS:
+ val.apend(struct.pack(">H", len(vblist))
+ vblist = vblist[1:-1].split(",")
+ for v in vblist:
+ val.apend(struct.pack(">H", int(v)))
else:
val.append(opts[op])
@@ -371,3 +382,60 @@ def total_msgs(opts, source_bucket, source_node, source_map):
return 0, None
return 0, curr_items[-1]
+
+class TapSink(pump_mc.CBSink):
+ """Smart client sink using tap protocal to couchbase cluster."""
+ def __init__(self, opts, spec, source_bucket, source_node,
+ source_map, sink_map, ctl, cur):
+ super(TapSink, self).__init__(opts, spec, source_bucket, source_node,
+ source_map, sink_map, ctl, cur)
+
+ self.tap_name = "".join(random.sample(string.letters, 16))
+
+ @staticmethod
+ def check_base(opts, spec):
+ #allow destination vbucket state to be anything
+
+ op = getattr(opts, "destination_operation", None)
+ if not op in [None, 'set', 'add', 'get']:
+ return ("error: --destination-operation unsupported value: %s" +
+ "; use set, add, get") % (op)
+
+ return pump.EndPoint.check_base(opts, spec)
+
+ def find_conn(self, mconns, vbucket_id):
+
+ rc, conn = super(TapSink, self).find_conn(mconns, vbucket_id)
+ if rc != 0:
+ return rc, None
+
+ tap_opts = {memcacheConstants.TAP_FLAG_SUPPORT_ACK: ''}
+
+ conn.tap_fix_flag_byteorder = version.split(".") >= ["2", "0", "0"]
+ if self.tap_conn.tap_fix_flag_byteorder:
+ tap_opts[memcacheConstants.TAP_FLAG_TAP_FIX_FLAG_BYTEORDER] = ''
+
+ ext, val = TapSink.encode_tap_connect_opts(tap_opts)
+
+ conn._sendCmd(memcacheConstants.CMD_TAP_CONNECT,
+ self.tap_name, val, 0, ext)
+ return rv, conn
+
+ def send_msgs(self, conn, msgs, operation, vbucket_id=None):
+ rv = super(TapSink, self).sendMsg(conn, msgs, operation, vbucket_id)
+ if rv != 0:
+ return rv
+
+ #send vbucket recovery commit msg
+ host, port, user, pwd, path = \
+ pump.parse_spec(self.opts, self.sink, 8091)
+
+ params={"vbucket", vbucket_id}
+ err, conn = \
+ pump.rest_request(host, int(port), user, pwd,
+ '/pools/default/buckets/%s/commitVbucketRecovery' % self.sink_bucket,
+ method='POST', body=params, reason='notify vbucket recovery done')
+ if err:
+ logging.error("error: fail to notify that vbucket msg transferring is done")
+
+ return rv
View
@@ -143,6 +143,9 @@ def opt_parser_options_common(self, p):
p.add_option("-k", "--key",
action="store", type="string", default=None,
help="""allow only items with keys that match a regexp""")
+ p.add_option("", "--vbucket-list",
+ action="store", type="string", default=None,
+ help="""transfer items from specified vbuckets only""")
p.add_option("-n", "--dry-run",
action="store_true", default=False,
help="""no actual work; just validate parameters, files,
@@ -175,7 +178,9 @@ def opt_extra_defaults(self):
"recv_min_bytes": (4096, "amount of bytes for every recv() call"),
"try_xwm": (1, "1 to first try XXX-WITH-META commands"),
"nmv_retry": (1, "1 to retry after NOT_MY_VBUCKET replies"),
- "rehash": (0, "1 to rehash vbucket id when transfering")
+ "rehash": (0, "1 to rehash vbucket id when transfering"),
+ "data_only": (0, "1 to transfer data only and without design docs"),
+ "design_doc_only": (0, "1 to transfer design docs only and without data")
}
def find_handlers(self, opts, source, sink):

0 comments on commit 1252754

Please sign in to comment.