From 2511c7d1a593ed3b4c60ae2a0844625f5e7e8855 Mon Sep 17 00:00:00 2001 From: sandipnd Date: Thu, 17 Nov 2016 18:32:25 -0800 Subject: [PATCH] more fix for testrunner-ng branch Change-Id: Ied60dfeb7c0136dd13d6b9ec087368bb1e049e7a Reviewed-on: http://review.couchbase.org/70066 Reviewed-by: Tommie McAfee Reviewed-by: sandip nandi Tested-by: sandip nandi --- lib/memcached/helper/kvstore.py | 1 + lib/tasks/task.py | 30 ++++++++++++++---------------- pytests/view/createdeleteview.py | 2 +- pytests/xdcr/upgradeXDCR.py | 8 ++++---- pytests/xdcr/xdcrbasetests.py | 4 ++-- pytests/xdcr/xdcrnewbasetests.py | 4 ++-- 6 files changed, 24 insertions(+), 25 deletions(-) diff --git a/lib/memcached/helper/kvstore.py b/lib/memcached/helper/kvstore.py index 05ae254ee..67e9b18d9 100644 --- a/lib/memcached/helper/kvstore.py +++ b/lib/memcached/helper/kvstore.py @@ -51,6 +51,7 @@ def key_set(self): partition = self.cache[itr]["partition"] valid_keys.extend(partition.valid_key_set()) deleted_keys.extend(partition.deleted_key_set()) + self.cache[itr]["lock"].release() return valid_keys, deleted_keys def __len__(self): diff --git a/lib/tasks/task.py b/lib/tasks/task.py index 24ef5f070..93c97aed4 100644 --- a/lib/tasks/task.py +++ b/lib/tasks/task.py @@ -730,7 +730,8 @@ def __init__(self, server, bucket, kv_store, batch_size=1, pause_secs=1, timeout self.timeout = timeout_secs self.server = server self.bucket = bucket - self.client = VBucketAwareMemcached(RestConnection(server), bucket) + self.rest = RestConnection(self.server) + self.client = VBucketAwareMemcached(self.rest, self.bucket) def execute(self, task_manager): self.start() @@ -776,11 +777,14 @@ def _unlocked_create(self, distributed_work, parallelmethod = None): ''' localgenerator = self.generator localgenerator.setrange(distributed_work) - self.client = VBucketAwareMemcached(RestConnection(self.server), self.bucket) + if getattr(VBucketAwareMemcached, 'is_mc_bin_client', None): + client = self.client + else: + client = VBucketAwareMemcached(RestConnection(self.server), self.bucket) while self.has_next(): try: key, value = localgenerator.next() - self.client.set(key, self.exp, self.flag, value) + client.set(key, self.exp, self.flag, value) except Exception as e: raise e @@ -857,10 +861,13 @@ def _unlocked_append(self, key, partition, value=None): def _batch_create(self, distributed_work, parallel_method=None): localgenerator = self.generator localgenerator._doc_gen.setrange(distributed_work) - self.client = VBucketAwareMemcached(RestConnection(self.server), self.bucket) + if getattr(VBucketAwareMemcached, 'is_mc_bin_client', None): + client = self.client + else: + client = VBucketAwareMemcached(RestConnection(self.server), self.bucket) while self.has_next(): key_val = localgenerator.next_batch() - self.client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=True) + client.setMulti(self.exp, self.flag, key_val, self.pause, self.timeout, parallel=True) def _batch_update(self, partition_keys_dic, key_val): self._process_values_for_update(partition_keys_dic, key_val) @@ -884,16 +891,6 @@ def _read_batch(self, partition_keys_dic, key_val): def _process_values_for_create(self, key_val): for key, value in key_val.items(): key_val[key] = value - ''' - try: - json_ - value = json.loads(value) - except ValueError: - index = random.choice(range(len(value))) - value = value[0:index] + random.choice(string.ascii_uppercase) + value[index + 1:] - finally: - key_val[key] = value - ''' def _process_values_for_update(self, partition_keys_dic, key_val): for part, keys in partition_keys_dic.items(): @@ -951,7 +948,7 @@ def next(self): function += self.op_type try: if self.op_type == "create": - if not self.kv_store: + if self.kv_store is None: self.load_kvstore_parallel = None getattr(self, function)(self.generator, self.load_kvstore_parallel) else: @@ -1283,6 +1280,7 @@ def _delete_random_key(self): self._unlocked_delete(partition, key) + class ValidateDataTask(GenericLoadingTask): def __init__(self, server, bucket, kv_store, max_verify=None, only_store_hash=True, replica_to_read=None): GenericLoadingTask.__init__(self, server, bucket, kv_store) diff --git a/pytests/view/createdeleteview.py b/pytests/view/createdeleteview.py index 873bc88b6..69a03cc35 100644 --- a/pytests/view/createdeleteview.py +++ b/pytests/view/createdeleteview.py @@ -268,7 +268,7 @@ def _verify_ddoc_data_all_buckets(self): rest = RestConnection(self.master) query = {"stale" : "false", "full_set" : "true", "connection_timeout" : 60000} for bucket, self.ddoc_view_map in self.bucket_ddoc_map.items(): - num_items = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + num_items = sum([len(kv_store) if kv_store else 0 for kv_store in bucket.kvs.values()]) self.log.info("DDoc Data Validation Started on bucket {0}. Expected Data Items {1}".format(bucket, num_items)) for ddoc_name, view_list in self.ddoc_view_map.items(): for view in view_list: diff --git a/pytests/xdcr/upgradeXDCR.py b/pytests/xdcr/upgradeXDCR.py index 3ab57eee3..6355ea9fd 100644 --- a/pytests/xdcr/upgradeXDCR.py +++ b/pytests/xdcr/upgradeXDCR.py @@ -228,13 +228,13 @@ def offline_cluster_upgrade(self): if self.ddocs_src: for bucket_name in self.buckets_on_src: bucket = self.src_cluster.get_bucket_by_name(bucket_name) - expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None]) self._verify_ddocs(expected_rows, [bucket_name], self.ddocs_src, self.src_master) if self.ddocs_dest: for bucket_name in self.buckets_on_dest: bucket = self.dest_cluster.get_bucket_by_name(bucket_name) - expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None]) self._verify_ddocs(expected_rows, [bucket_name], self.ddocs_dest, self.dest_master) if float(self.upgrade_versions[0][:3]) == 4.6: @@ -388,13 +388,13 @@ def online_cluster_upgrade(self): if self.ddocs_src: for bucket_name in self.buckets_on_src: bucket = self.src_cluster.get_bucket_by_name(bucket_name) - expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None]) self._verify_ddocs(expected_rows, [bucket_name], self.ddocs_src, self.src_master) if self.ddocs_dest: for bucket_name in self.buckets_on_dest: bucket = self.dest_cluster.get_bucket_by_name(bucket_name) - expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + expected_rows = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None]) self._verify_ddocs(expected_rows, [bucket_name], self.ddocs_dest, self.dest_master) if float(self.upgrade_versions[0][:3]) == 4.6: diff --git a/pytests/xdcr/xdcrbasetests.py b/pytests/xdcr/xdcrbasetests.py index 9be851d85..9561e3b34 100644 --- a/pytests/xdcr/xdcrbasetests.py +++ b/pytests/xdcr/xdcrbasetests.py @@ -635,7 +635,7 @@ def adding_back_a_node(self, master, server): def _get_active_replica_count_from_cluster(self, master): buckets = self._get_cluster_buckets(master) for bucket in buckets: - keys_loaded = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + keys_loaded = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None]) self.log.info("Keys loaded into bucket {0}:{1}".format(bucket.name, keys_loaded)) self.log.info("Stat: vb_active_curr_items = {0}". @@ -1345,7 +1345,7 @@ def _verify_item_count(self, master, servers, timeout=120): buckets = self._get_cluster_buckets(master) self.assertTrue(buckets, "No buckets received from the server {0} for verification".format(master.ip)) for bucket in buckets: - items = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + items = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None]) for stat in ['curr_items', 'vb_active_curr_items']: stats_tasks.append(self.cluster.async_wait_for_stats(servers, bucket, '', stat, '==', items)) diff --git a/pytests/xdcr/xdcrnewbasetests.py b/pytests/xdcr/xdcrnewbasetests.py index 9e8cdd75d..bec5df08b 100644 --- a/pytests/xdcr/xdcrnewbasetests.py +++ b/pytests/xdcr/xdcrnewbasetests.py @@ -2130,7 +2130,7 @@ def verify_items_count(self, timeout=300): buckets = copy.copy(self.get_buckets()) for bucket in buckets: - items = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + items = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None]) while True: try: active_keys = int(rest.get_active_key_count(bucket.name)) @@ -2160,7 +2160,7 @@ def verify_items_count(self, timeout=300): for bucket in buckets: if len(self.__nodes) > 1: - items = sum([len(kv_store) for kv_store in bucket.kvs.values()]) + items = sum([len(kv_store) for kv_store in bucket.kvs.values() if kv_store is not None]) items = items * bucket.numReplicas else: items = 0