Revert removal of dataloading
This reverts commit 0316c53.

Change-Id: I5f471820737aed7cc66fcd705e9cbd41cf22d5b1
Reviewed-on: http://review.couchbase.org/69843
Reviewed-by: Tommie McAfee <tommie@couchbase.com>
Tested-by: sandip nandi <sandip.nandi@couchbase.com>
tahmmee committed Nov 14, 2016
1 parent 9f2605f commit 7d46ad0
Showing 9 changed files with 63 additions and 59 deletions.
8 changes: 8 additions & 0 deletions lib/couchbase_helper/documentgenerator.py
@@ -13,6 +13,14 @@ def __init__(self, name, start, end):
self.end = end
self.current = start
self.itr = start

def setrange(self, args):
self.itr = args['start']
self.end = args['end']

def getrange(self):
return self.start, self.end

def has_next(self):
return self.itr < self.end

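
The restored setrange()/getrange() helpers let a caller re-window a generator before handing it to a loader worker. Below is a minimal, self-contained sketch of that idea; SimpleKVGenerator and split_ranges are illustrative stand-ins, not part of this patch or the testrunner API.

```python
# Illustrative sketch only: shows how setrange()/getrange() can carve one
# key range into disjoint slices, one per loader worker.
class SimpleKVGenerator(object):
    def __init__(self, name, start, end):
        self.name = name
        self.start = start
        self.end = end
        self.itr = start

    def setrange(self, args):
        # Re-point the iterator at a caller-chosen sub-range.
        self.itr = args['start']
        self.end = args['end']

    def getrange(self):
        return self.start, self.end

    def has_next(self):
        return self.itr < self.end


def split_ranges(generator, num_workers):
    # Divide [start, end) into num_workers roughly equal slices.
    start, end = generator.getrange()
    step = max(1, (end - start) // num_workers)
    slices = []
    for i in range(num_workers):
        lo = start + i * step
        hi = end if i == num_workers - 1 else lo + step
        slices.append({'start': lo, 'end': hi})
    return slices


gen = SimpleKVGenerator("doc", 0, 1000)
for sub in split_ranges(gen, 4):
    print(sub)   # each worker would call gen.setrange(sub) on its own copy
```
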
1 change: 1 addition & 0 deletions lib/memcached/helper/data_helper.py
@@ -728,6 +728,7 @@ def _time_to_stop(self):


class VBucketAwareMemcached(object):
is_mc_bin_client = True
def __init__(self, rest, bucket, info=None):
self.log = logger.Logger.get_logger()
self.info = info
14 changes: 6 additions & 8 deletions lib/memcached/helper/kvstore.py
@@ -23,16 +23,14 @@ def acquire_partition(self, key):
return partition

def acquire_partitions(self, keys):
self.acquire_lock.acquire()
part_obj_keys = {}
part_obj_keys = collections.defaultdict(list)
for key in keys:
partition = self.cache[self._hash(key)]
partition_obj = partition["partition"]
if partition_obj not in part_obj_keys:
partition["lock"].acquire()
part_obj_keys[partition_obj] = []
part_obj_keys[partition_obj].append(key)
self.acquire_lock.release()
'''
a dict is mutable and therefore cannot be hashed;
frozenset(partition.items()) turns it into an immutable, hashable key
'''
part_obj_keys[frozenset(partition.items())].append(key)
return part_obj_keys

def acquire_random_partition(self, has_valid=True):
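
The reverted acquire_partitions() builds a defaultdict keyed by frozenset(partition.items()), grouping keys by their target partition without taking the per-partition locks during the grouping. A self-contained sketch of that grouping trick; the cache layout and hash below are stand-ins for KVStore's internals.

```python
import collections

def group_keys_by_partition(cache, keys, num_partitions):
    # Mirrors the defaultdict/frozenset approach: a dict is mutable and so
    # unhashable, but frozenset(partition.items()) is a hashable grouping key.
    part_obj_keys = collections.defaultdict(list)
    for key in keys:
        partition = cache[hash(key) % num_partitions]
        part_obj_keys[frozenset(partition.items())].append(key)
    return part_obj_keys


cache = dict((i, {"id": i, "name": "partition-%d" % i}) for i in range(4))
grouped = group_keys_by_partition(cache, ["doc-1", "doc-2", "doc-3"], 4)
for partition_items, keys in grouped.items():
    print(dict(partition_items), keys)
```
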
6 changes: 3 additions & 3 deletions lib/tasks/task.py
@@ -86,6 +86,8 @@ def create_pool(generator, numworker):
log = logger.Logger.get_logger()
workers = []
num_workers = cpu_count().real
if getattr(VBucketAwareMemcached, 'is_mc_bin_client', None):
num_workers = 1
generator = args[1]
parallelmethod = args[2]
for value in create_pool(generator, num_workers):
@@ -949,10 +951,8 @@ def next(self):
function += self.op_type
try:
if self.op_type == "create":
print " the load_kvstore_parallel is before ", self.kv_store, self.load_kvstore_parallel
if self.kv_store == None:
if not self.kv_store:
self.load_kvstore_parallel = None
print " the load_kvstore_parallel is", self.kv_store, self.load_kvstore_parallel
getattr(self, function)(self.generator, self.load_kvstore_parallel)
else:
while self.has_next():
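
The is_mc_bin_client flag added to VBucketAwareMemcached above is what this hunk consults: when that flag is set, loading stays on a single worker instead of one per CPU. A rough sketch of the gating pattern, using a dummy class in place of the real client:

```python
from multiprocessing import cpu_count

class DummyClient(object):
    # Stand-in for VBucketAwareMemcached; only the class-level flag matters here.
    is_mc_bin_client = True


def choose_worker_count(client_cls):
    # Default to one worker per CPU, but drop to a single worker when the
    # client class advertises the is_mc_bin_client flag.
    num_workers = cpu_count()
    if getattr(client_cls, 'is_mc_bin_client', None):
        num_workers = 1
    return num_workers


print(choose_worker_count(DummyClient))   # -> 1
```
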
13 changes: 7 additions & 6 deletions pytests/backup/backup_base.py
@@ -117,9 +117,10 @@ def verify_results(self, server, kv_store=1):
for key in valid_keys:
matchObj = re.search(key_name, key, re.M | re.S) #use regex match to find out keys we need to verify
if matchObj is None:
partition = bucket.kvs[kv_store].acquire_partition(key)
partition.delete(key) #we delete keys whose prefix does not match the value assigned to -k in KVStore
bucket.kvs[kv_store].release_partition(key)
part = bucket.kvs[kv_store].acquire_partition(key)
with Synchronized(dict(part)) as partition:
partition.delete(key) #we delete keys whose prefix does not match the value assigned to -k in KVStore

if single_node_flag is False:
self._verify_all_buckets(server, kv_store, self.wait_timeout * 50, self.max_verify, True, 1)
else:
@@ -149,8 +150,8 @@ def verify_single_node(self, server, kv_store=1):
sub = which_server.find(":")
which_server_ip = which_server[:sub]
if which_server_ip != server.ip:
partition = bucket.kvs[kv_store].acquire_partition(key)
partition.delete(key)
bucket.kvs[kv_store].release_partition(key)
part = bucket.kvs[kv_store].acquire_partition(key)
with Synchronized(dict(part)) as partition:
partition.delete(key)

self._verify_all_buckets(server, kv_store, self.wait_timeout * 50, self.max_verify, True, 1)
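
Both hunks above replace the explicit acquire_partition()/release_partition() pairing with a Synchronized context manager imported from memcached.helper.kvstore. Its implementation is not part of this diff; the sketch below only assumes, from the call sites, that it is handed a dict containing a partition and its lock, holds the lock for the duration of the with block, and yields the partition.

```python
import threading

class Synchronized(object):
    # Hedged sketch; the real class lives in lib/memcached/helper/kvstore.py
    # and may differ. Assumed input: {"partition": <Partition>, "lock": <Lock>}.
    def __init__(self, part):
        self._partition = part["partition"]
        self._lock = part["lock"]

    def __enter__(self):
        self._lock.acquire()
        return self._partition

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._lock.release()
        return False   # never swallow exceptions from the with block


# Tiny demo with a stand-in partition dict:
demo_part = {"partition": {"doc-1": 1}, "lock": threading.Lock()}
with Synchronized(demo_part) as partition:
    partition.pop("doc-1", None)
print(demo_part["partition"])   # -> {}

# Usage mirroring the call sites above:
# part = bucket.kvs[kv_store].acquire_partition(key)
# with Synchronized(dict(part)) as partition:
#     partition.delete(key)
```
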
3 changes: 2 additions & 1 deletion pytests/basetestcase.py
@@ -137,6 +137,7 @@ def setUp(self):
self.gsi_type = self.input.param("gsi_type", 'forestdb')
self.bucket_size = self.input.param("bucket_size", None)
self.lww = self.input.param("lww", False) # only applies to LWW but is here because the bucket is created here
self.kv_store_required = self.input.param("kv_store_required", 1)
if self.skip_setup_cleanup:
self.buckets = RestConnection(self.master).get_buckets()
return
@@ -611,7 +612,7 @@ def _verify_stats_all_buckets(self, servers, master=None, timeout=60):
master = self.master
servers = self.get_kv_nodes(servers, master)
for bucket in self.buckets:
items = sum([len(kv_store) for kv_store in bucket.kvs.values()])
items = sum([len(kv_store) if kv_store else 0 for kv_store in bucket.kvs.values()])
if bucket.type == 'memcached':
items_actual = 0
for server in servers:
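
Because kv_store_required is now a test parameter, a bucket's kvs map may contain None placeholders, and the expected-item count has to tolerate them. A tiny illustration with dummy data:

```python
# Dummy kv stores: two populated stores plus a None placeholder, as can now
# happen when a test opts out of maintaining a kv store.
kvs = {1: {"k1": 1, "k2": 2}, 2: None, 3: {"k3": 3}}

items = sum([len(kv_store) if kv_store else 0 for kv_store in kvs.values()])
print(items)   # -> 3; a bare len() over the None slot would raise TypeError
```
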
18 changes: 7 additions & 11 deletions pytests/fts/fts_base.py
@@ -3216,22 +3216,18 @@ def generate_random_queries(self, index, num_queries=1, query_type=["match"],
"""
from random_query_generator.rand_query_gen import FTSESQueryGenerator
query_gen = FTSESQueryGenerator(num_queries, query_type=query_type,
seed=seed, dataset=self.dataset,
fields=index.smart_query_fields)
seed=seed, dataset=self.dataset,
fields=index.smart_query_fields)
'''
for fts_query in query_gen.fts_queries:
index.fts_queries.append(
json.loads(json.dumps(fts_query, ensure_ascii=False)))
'''

if self.compare_es:
for es_query in query_gen.es_queries:
# unlike fts, es queries are not nested before sending to fts
# so enclose in query dict here
es_query = {'query': es_query}
self.es.es_queries.append(
json.loads(json.dumps(es_query, ensure_ascii=False)))
return index.fts_queries, self.es.es_queries

return index.fts_queries
return query_gen.fts_queries, query_gen.es_queries

return query_gen.fts_queries

def create_index(self, bucket, index_name, index_params=None,
plan_params=None):
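
With this change generate_random_queries() appears to return the freshly generated query lists (a tuple when ES comparison is enabled, a single list otherwise) rather than only appending them to the index and ES wrappers. A hypothetical caller illustrating that return contract; the test object and attribute names below are assumptions, not code from this patch.

```python
# Hypothetical caller showing the new tuple-vs-list return shape.
def build_queries(test, index, num_queries=10):
    if test.compare_es:
        fts_queries, es_queries = test.generate_random_queries(
            index, num_queries=num_queries, query_type=["match"])
    else:
        fts_queries = test.generate_random_queries(
            index, num_queries=num_queries, query_type=["match"])
        es_queries = []
    return fts_queries, es_queries
```
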
18 changes: 9 additions & 9 deletions pytests/xdcr/xdcrbasetests.py
@@ -21,6 +21,7 @@
from couchbase_helper.stats_tools import StatsCommon
from scripts.collect_server_info import cbcollectRunner
from tasks.future import TimeoutError
from memcached.helper.kvstore import Synchronized

from couchbase_helper.documentgenerator import BlobGenerator
from membase.api.exception import ServerUnavailableException, XDCRException
@@ -1215,19 +1216,18 @@ def __merge_keys(self, kv_store_first, kv_store_second, kvs_num=1):
for key in valid_keys_second:
# replace/add the values for each key in first kvs
if key not in valid_keys_first:
partition1 = kv_store_first[kvs_num].acquire_partition(key)
partition2 = kv_store_second[kvs_num].acquire_partition(key)
key_add = partition2.get_key(key)
partition1.set(key, key_add["value"], key_add["expires"], key_add["flag"])
kv_store_first[kvs_num].release_partition(key)
kv_store_second[kvs_num].release_partition(key)
part1 = kv_store_first[kvs_num].acquire_partition(key)
part2 = kv_store_second[kvs_num].acquire_partition(key)
with Synchronized(dict(part1)) as partition1, Synchronized(dict(part2)) as partition2:
key_add = partition2.get_key(key)
partition1.set(key, key_add["value"], key_add["expires"], key_add["flag"])

for key in deleted_keys_second:
# add deleted keys to first kvs if they were deleted only in second kvs
if key not in deleted_keys_first:
partition1 = kv_store_first[kvs_num].acquire_partition(key)
partition1.delete(key)
kv_store_first[kvs_num].release_partition(key)
part1 = kv_store_first[kvs_num].acquire_partition(key)
with Synchronized(dict(part1)) as partition1:
partition1.delete(key)

def __do_merge_buckets(self, src_master, dest_master, bidirection):
src_buckets = self._get_cluster_buckets(src_master)
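
Both loops above now copy or delete keys while holding the relevant partition locks through the Synchronized wrapper, so the get_key()/set() pair stays atomic with respect to other workers. A compact, self-contained sketch of the copy step, reusing the assumed Synchronized shape from the earlier sketch and dict-backed stand-in partitions:

```python
import threading

class FakePartition(object):
    # Stand-in exposing the subset of the KVStore partition API used here.
    def __init__(self):
        self._docs = {}

    def set(self, key, value, expires=0, flag=0):
        self._docs[key] = {"value": value, "expires": expires, "flag": flag}

    def get_key(self, key):
        return self._docs.get(key)


class Synchronized(object):
    # Same assumed wrapper as sketched earlier: hold the lock, yield the partition.
    def __init__(self, part):
        self._partition, self._lock = part["partition"], part["lock"]

    def __enter__(self):
        self._lock.acquire()
        return self._partition

    def __exit__(self, *exc_info):
        self._lock.release()
        return False


part1 = {"partition": FakePartition(), "lock": threading.Lock()}
part2 = {"partition": FakePartition(), "lock": threading.Lock()}
part1["partition"].set("doc-1", "hello")

# Copy one key from the first store to the second under both locks,
# mirroring the __merge_keys loop above.
with Synchronized(part1) as partition1, Synchronized(part2) as partition2:
    key_add = partition1.get_key("doc-1")
    partition2.set("doc-1", key_add["value"], key_add["expires"], key_add["flag"])

print(part2["partition"].get_key("doc-1"))
```
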
41 changes: 20 additions & 21 deletions pytests/xdcr/xdcrnewbasetests.py
@@ -20,6 +20,7 @@
from scripts.collect_server_info import cbcollectRunner
from scripts import collect_data_files
from tasks.future import TimeoutError
from memcached.helper.kvstore import Synchronized

from couchbase_helper.documentgenerator import BlobGenerator
from lib.membase.api.exception import XDCRException
@@ -3061,34 +3062,32 @@ def __merge_keys(
for key in valid_keys_src:
# replace/add the values for each key in src kvs
if key not in deleted_keys_dest:
partition1 = kv_src_bucket[kvs_num].acquire_partition(key)
partition2 = kv_dest_bucket[kvs_num].acquire_partition(key)
part1 = kv_src_bucket[kvs_num].acquire_partition(key)
part2 = kv_dest_bucket[kvs_num].acquire_partition(key)
# In case of lww, if source's key timestamp is lower than the
# destination's, then there is no need to set.
if self.__lww and partition1.get_timestamp(
key) < partition2.get_timestamp(key):
continue
key_add = partition1.get_key(key)
partition2.set(
key,
key_add["value"],
key_add["expires"],
key_add["flag"])
kv_src_bucket[kvs_num].release_partition(key)
kv_dest_bucket[kvs_num].release_partition(key)
with Synchronized(dict(part1)) as partition1, Synchronized(dict(part2)) as partition2:
if self.__lww and partition1.get_timestamp(
key) < partition2.get_timestamp(key):
continue
key_add = partition1.get_key(key)
partition2.set(
key,
key_add["value"],
key_add["expires"],
key_add["flag"])

for key in deleted_keys_src:
if key not in deleted_keys_dest:
partition1 = kv_src_bucket[kvs_num].acquire_partition(key)
partition2 = kv_dest_bucket[kvs_num].acquire_partition(key)
part1 = kv_src_bucket[kvs_num].acquire_partition(key)
part2 = kv_dest_bucket[kvs_num].acquire_partition(key)
# In case of lww, if source's key timestamp is lower than the
# destination's, then there is no need to delete.
if self.__lww and partition1.get_timestamp(
key) < partition2.get_timestamp(key):
continue
partition2.delete(key)
kv_src_bucket[kvs_num].release_partition(key)
kv_dest_bucket[kvs_num].release_partition(key)
with Synchronized(dict(part1)) as partition1, Synchronized(dict(part2)) as partition2:
if self.__lww and partition1.get_timestamp(
key) < partition2.get_timestamp(key):
continue
partition2.delete(key)

valid_keys_dest, deleted_keys_dest = kv_dest_bucket[
kvs_num].key_set()
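
For LWW buckets, both loops skip propagating a source mutation whose timestamp is older than the destination's, covering sets and deletes alike. The guard itself is small enough to show in isolation (the timestamps below are made up):

```python
def should_propagate(src_timestamp, dest_timestamp, lww_enabled):
    # Mirror of the __merge_keys guard: under LWW, an older source write loses.
    if lww_enabled and src_timestamp < dest_timestamp:
        return False   # destination already holds a newer write; skip set/delete
    return True


print(should_propagate(100, 200, lww_enabled=True))    # False -> skip
print(should_propagate(300, 200, lww_enabled=True))    # True  -> copy or delete
print(should_propagate(100, 200, lww_enabled=False))   # True  -> non-LWW always merges
```
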
