Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

MB-7341: Check vbucket state before querying kv table

In the DGM case, the sqlite db file is fragmented, and even a single
query to retrieve kv items is time consuming.
Because all items in one kv table share the same vbucket state value,
we can decide whether to skip the whole kv table or query it, based
solely on the vbucket id and vbucket state.

Change-Id: If6ec8adf238591fd9f1187e186da78da353a4947
Reviewed-on: http://review.couchbase.org/23205
Reviewed-by: Chisheng Hong <chisheng@couchbase.com>
Tested-by: Bin Cui <bin.cui@gmail.com>
  • Loading branch information...
commit 5ed9a73a16e1201e137935bc080198c8a47873f9 1 parent 4c0ac1a
@bcui6611 bcui6611 authored
Showing with 34 additions and 16 deletions.
  1. +34 −16 pump_mbf.py
View
50 pump_mbf.py
@@ -51,12 +51,8 @@ def __init__(self, opts, spec, source_bucket, source_node,
self.cursor_todo = None
self.cursor_done = False
- self.s = """SELECT vbid, k, flags, exptime, cas, v
- FROM `%%s`.`%%s` as kv,
- `%s`.vbucket_states as vb
- WHERE kv.vbucket = vb.vbid
- AND kv.vb_version = vb.vb_version
- AND vb.state like '%s'"""
+ self.s = """SELECT vbucket, k, flags, exptime, cas, v, vb_version
+ FROM `%s`.`%s`"""
@staticmethod
def can_handle(opts, spec):
@@ -164,7 +160,7 @@ def provide_batch(self):
try:
if self.cursor_todo is None:
- rv, db, attached_dbs, table_dbs = self.connect_db()
+ rv, db, attached_dbs, table_dbs, vbucket_states = self.connect_db()
if rv != 0:
return rv, None
@@ -175,7 +171,6 @@ def provide_batch(self):
db.close()
return "error: no unique vbucket_states table", None
- sql = self.s % (state_db, source_vbucket_state)
kv_names = []
for kv_name, db_name in table_dbs.iteritems():
if (self.opts.id is None and
@@ -192,9 +187,9 @@ def provide_batch(self):
for db_name in sorted(table_dbs[kv_name]):
db_kv_names.append((db_name, kv_name))
- self.cursor_todo = (db, sql, db_kv_names, None)
+ self.cursor_todo = (db, db_kv_names, None, vbucket_states)
- db, sql, db_kv_names, cursor = self.cursor_todo
+ db, db_kv_names, cursor, vbucket_states = self.cursor_todo
if not db:
self.cursor_done = True
self.cursor_todo = None
@@ -211,13 +206,15 @@ def provide_batch(self):
break
db_name, kv_name = db_kv_names.pop()
+ vbucket_id = int(kv_name.split('_')[-1])
+ if not vbucket_states[source_vbucket_state].has_key(vbucket_id):
+ break
+
logging.debug(" MBFSource db/kv table: %s/%s" %
(db_name, kv_name))
-
cursor = db.cursor()
- cursor.execute(sql % (db_name, kv_name))
-
- self.cursor_todo = (db, sql, db_kv_names, cursor)
+ cursor.execute(self.s % (db_name, kv_name))
+ self.cursor_todo = (db, db_kv_names, cursor, vbucket_states)
row = cursor.fetchone()
if row:
@@ -227,17 +224,21 @@ def provide_batch(self):
exp = row[3]
cas = row[4]
val = row[5]
+ version = int(row[6])
if self.skip(key, vbucket_id):
continue
+ if version != vbucket_states[source_vbucket_state][vbucket_id]:
+ continue
+
meta = ''
flg = socket.ntohl(ctypes.c_uint32(flg).value)
batch.append((memcacheConstants.CMD_TAP_MUTATION,
vbucket_id, key, flg, exp, cas, meta, val), len(val))
else:
cursor.close()
- self.cursor_todo = (db, sql, db_kv_names, None)
+ self.cursor_todo = (db, db_kv_names, None, vbucket_states)
break # Close the batch; next pass hits new db_name/kv_name.
except Exception, e:
@@ -275,6 +276,23 @@ def total_msgs(opts, source_bucket, source_node, source_map):
return 0, total
def connect_db(self):
+ #Build vbucket state hash table
+ vbucket_states = defaultdict(dict)
+ sql = """SELECT vbid, vb_version, state FROM vbucket_states"""
+ try:
+ db = sqlite3.connect(self.spec)
+ cur = db.cursor()
+ for row in cur.execute(sql):
+ vbucket_id = int(row[0])
+ vb_version = int(row[1])
+ state = str(row[2])
+ vbucket_states[state][vbucket_id] = vb_version
+ cur.close()
+ db.close()
+ except sqlite3.DatabaseError, e:
+ return "error: no vbucket_states table was found;" + \
+ " check if db files are correct", None, None, None
+
db = sqlite3.connect(':memory:')
logging.debug(" MBFSource connect_db: %s" % self.spec)
@@ -301,4 +319,4 @@ def connect_db(self):
" check if db files are correct", None, None, None
logging.debug(" MBFSource total # tables: %s" % len(table_dbs))
- return 0, db, attached_dbs, table_dbs
+ return 0, db, attached_dbs, table_dbs, vbucket_states
Please sign in to comment.
Something went wrong with that request. Please try again.