Skip to content

Commit

Permalink
More fixes for testrunner-ng branch
Browse files Browse the repository at this point in the history
Change-Id: Ied60dfeb7c0136dd13d6b9ec087368bb1e049e7a
Reviewed-on: http://review.couchbase.org/70066
Reviewed-by: Tommie McAfee <tommie@couchbase.com>
Reviewed-by: sandip nandi <sandip.nandi@couchbase.com>
Tested-by: sandip nandi <sandip.nandi@couchbase.com>
  • Loading branch information
sandipnd committed Nov 18, 2016
1 parent 7d46ad0 commit 2511c7d
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 25 deletions.
1 change: 1 addition & 0 deletions lib/memcached/helper/kvstore.py
Expand Up @@ -51,6 +51,7 @@ def key_set(self):
partition = self.cache[itr]["partition"]
valid_keys.extend(partition.valid_key_set())
deleted_keys.extend(partition.deleted_key_set())
self.cache[itr]["lock"].release()
return valid_keys, deleted_keys

def __len__(self):
Expand Down
30 changes: 14 additions & 16 deletions lib/tasks/task.py
Expand Up @@ -730,7 +730,8 @@ def __init__(self, server, bucket, kv_store, batch_size=1, pause_secs=1, timeout
self.timeout = timeout_secs
self.server = server
self.bucket = bucket
self.client = VBucketAwareMemcached(RestConnection(server), bucket)
self.rest = RestConnection(self.server)
self.client = VBucketAwareMemcached(self.rest, self.bucket)

def execute(self, task_manager):
self.start()
Expand Down Expand Up @@ -776,11 +777,14 @@ def _unlocked_create(self, distributed_work, parallelmethod = None):
'''
localgenerator = self.generator
localgenerator.setrange(distributed_work)
self.client = VBucketAwareMemcached(RestConnection(self.server), self.bucket)
if getattr(VBucketAwareMemcached, 'is_mc_bin_client', None):
client = self.client
else:
client = VBucketAwareMemcached(RestConnection(self.server), self.bucket)
while self.has_next():
try:
key, value = localgenerator.next()
self.client.set(key, self.exp, self.flag, value)
client.set(key, self.exp, self.flag, value)
except Exception as e:
raise e

Expand Down Expand Up @@ -857,10 +861,13 @@ def _unlocked_append(self, key, partition, value=None):
def _batch_create(self, distributed_work, parallel_method=None):
    """Load documents in batches over the assigned key range.

    Restricts the shared generator to this worker's slice of the
    distributed work, then pushes batches with setMulti until the
    generator is exhausted.

    :param distributed_work: the key range assigned to this worker,
        applied to the underlying doc generator via setrange.
    :param parallel_method: unused here; kept for signature parity with
        the other *_create loaders.
    """
    localgenerator = self.generator
    localgenerator._doc_gen.setrange(distributed_work)
    # Reuse the already-built client when the memcached binary client is
    # available; otherwise build a fresh vbucket-aware client for this
    # worker so parallel loaders do not share one connection.
    if getattr(VBucketAwareMemcached, 'is_mc_bin_client', None):
        client = self.client
    else:
        client = VBucketAwareMemcached(RestConnection(self.server), self.bucket)
    while self.has_next():
        key_val = localgenerator.next_batch()
        client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=True)

def _batch_update(self, partition_keys_dic, key_val):
self._process_values_for_update(partition_keys_dic, key_val)
Expand All @@ -884,16 +891,6 @@ def _read_batch(self, partition_keys_dic, key_val):
def _process_values_for_create(self, key_val):
for key, value in key_val.items():
key_val[key] = value
'''
try:
json_
value = json.loads(value)
except ValueError:
index = random.choice(range(len(value)))
value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:]
finally:
key_val[key] = value
'''

def _process_values_for_update(self, partition_keys_dic, key_val):
for part, keys in partition_keys_dic.items():
Expand Down Expand Up @@ -951,7 +948,7 @@ def next(self):
function += self.op_type
try:
if self.op_type == "create":
if not self.kv_store:
if self.kv_store is None:
self.load_kvstore_parallel = None
getattr(self, function)(self.generator, self.load_kvstore_parallel)
else:
Expand Down Expand Up @@ -1283,6 +1280,7 @@ def _delete_random_key(self):

self._unlocked_delete(partition, key)


class ValidateDataTask(GenericLoadingTask):
def __init__(self, server, bucket, kv_store, max_verify=None, only_store_hash=True, replica_to_read=None):
GenericLoadingTask.__init__(self, server, bucket, kv_store)
Expand Down
2 changes: 1 addition & 1 deletion pytests/view/createdeleteview.py
Expand Up @@ -268,7 +268,7 @@ def _verify_ddoc_data_all_buckets(self):
rest = RestConnection(self.master)
query = {"stale" : "false", "full_set" : "true", "connection_timeout" : 60000}
for bucket, self.ddoc_view_map in self.bucket_ddoc_map.items():
num_items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
num_items = sum([len(kv_store) if kv_store else 0 for kv_store in bucket.kvs.values()])
self.log.info("DDoc Data Validation Started on bucket {0}. Expected Data Items {1}".format(bucket, num_items))
for ddoc_name, view_list in self.ddoc_view_map.items():
for view in view_list:
Expand Down
8 changes: 4 additions & 4 deletions pytests/xdcr/upgradeXDCR.py
Expand Up @@ -228,13 +228,13 @@ def offline_cluster_upgrade(self):
if self.ddocs_src:
for bucket_name in self.buckets_on_src:
bucket = self.src_cluster.get_bucket_by_name(bucket_name)
expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values()])
expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None])
self._verify_ddocs(expected_rows, [bucket_name], self.ddocs_src, self.src_master)

if self.ddocs_dest:
for bucket_name in self.buckets_on_dest:
bucket = self.dest_cluster.get_bucket_by_name(bucket_name)
expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values()])
expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None])
self._verify_ddocs(expected_rows, [bucket_name], self.ddocs_dest, self.dest_master)

if float(self.upgrade_versions[0][:3]) == 4.6:
Expand Down Expand Up @@ -388,13 +388,13 @@ def online_cluster_upgrade(self):
if self.ddocs_src:
for bucket_name in self.buckets_on_src:
bucket = self.src_cluster.get_bucket_by_name(bucket_name)
expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values()])
expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None])
self._verify_ddocs(expected_rows, [bucket_name], self.ddocs_src, self.src_master)

if self.ddocs_dest:
for bucket_name in self.buckets_on_dest:
bucket = self.dest_cluster.get_bucket_by_name(bucket_name)
expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values()])
expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None])
self._verify_ddocs(expected_rows, [bucket_name], self.ddocs_dest, self.dest_master)

if float(self.upgrade_versions[0][:3]) == 4.6:
Expand Down
4 changes: 2 additions & 2 deletions pytests/xdcr/xdcrbasetests.py
Expand Up @@ -635,7 +635,7 @@ def adding_back_a_node(self, master, server):
def _get_active_replica_count_from_cluster(self, master):
buckets = self._get_cluster_buckets(master)
for bucket in buckets:
keys_loaded = sum([len(kv_store) for kv_store in bucket.kvs.values()])
keys_loaded = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None])
self.log.info("Keys loaded into bucket {0}:{1}".format(bucket.name,
keys_loaded))
self.log.info("Stat: vb_active_curr_items = {0}".
Expand Down Expand Up @@ -1345,7 +1345,7 @@ def _verify_item_count(self, master, servers, timeout=120):
buckets = self._get_cluster_buckets(master)
self.assertTrue(buckets, "No buckets received from the server {0} for verification".format(master.ip))
for bucket in buckets:
items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
items = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None])
for stat in ['curr_items', 'vb_active_curr_items']:
stats_tasks.append(self.cluster.async_wait_for_stats(servers, bucket, '',
stat, '==', items))
Expand Down
4 changes: 2 additions & 2 deletions pytests/xdcr/xdcrnewbasetests.py
Expand Up @@ -2130,7 +2130,7 @@ def verify_items_count(self, timeout=300):
buckets = copy.copy(self.get_buckets())

for bucket in buckets:
items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
items = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None])
while True:
try:
active_keys = int(rest.get_active_key_count(bucket.name))
Expand Down Expand Up @@ -2160,7 +2160,7 @@ def verify_items_count(self, timeout=300):

for bucket in buckets:
if len(self.__nodes) > 1:
items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
items = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None])
items = items * bucket.numReplicas
else:
items = 0
Expand Down

0 comments on commit 2511c7d

Please sign in to comment.