diff --git a/conf/py-xdcr-elasticsearch.conf b/conf/py-xdcr-elasticsearch.conf
index 0326c7651..55239c4a5 100644
--- a/conf/py-xdcr-elasticsearch.conf
+++ b/conf/py-xdcr-elasticsearch.conf
@@ -1,7 +1,9 @@
-xdcr.esXDCR.ESKVTests:
+xdcr.esXDCR.ESTests:
     test_plugin_connect
+
+    ############ KV TESTS ###############
 
     #Load with ops
     load_with_async_ops,items=10000,rdirection=unidirection
 
@@ -19,11 +21,61 @@ xdcr.esXDCR.ESKVTests:
     load_with_async_ops,items=10000,rdirection=unidirection,doc-ops=create-update-delete-read,expires=10
 
     # multi-bucket
-    load_with_async_ops,items=10000,rdirection=unidirection,standard_buckets=1,dgm_run=True
-
+    load_with_async_ops,items=10000,rdirection=unidirection,standard_buckets=1
 
     # multi-bucket mixed workload
-    load_with_async_ops,items=10000,rdirection=unidirection,doc-ops=create-update-delete-read,expires=10,standard_buckets=1,dgm_run=True
+    load_with_async_ops,items=10000,rdirection=unidirection,doc-ops=create-update-delete-read,expires=10,standard_buckets=1
 
     # multi-bucket mixed doc types
-    test_multi_bucket_doctypes_with_async_ops,items=10000,rdirection=unidirection,standard_buckets=1,dgm_run=True
+    test_multi_bucket_doctypes_with_async_ops,items=10000,rdirection=unidirection,standard_buckets=1
+
+
+    ############ ES TOPOLOGY TESTS ###############
+
+    # rebalance in/out swap simple kv
+    test_topology,items=10000,rdirection=unidirection,es_in=True
+    test_topology,items=10000,rdirection=unidirection,es_out=True
+    test_topology,items=10000,rdirection=unidirection,es_swap=True
+
+    # rebalance in/out/swap mixed doc types
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,es_in=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,es_out=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,es_swap=True
+
+    # rebalance in/out/swap multi-bucket mixed doc types
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,es_in=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,es_out=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,es_swap=True
+
+
+    ############ CB TOPOLOGY TESTS ###############
+
+    # rebalance in/out swap simple kv
+    test_topology,items=10000,rdirection=unidirection,cb_in=True
+    test_topology,items=10000,rdirection=unidirection,cb_out=True
+    test_topology,items=10000,rdirection=unidirection,cb_swap=True
+
+    # rebalance in/out/swap mixed doc types
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,cb_in=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,cb_out=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,cb_swap=True
+    # failover
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,cb_swap=True,cb_failover=True
+
+    # rebalance in/out/swap multi-bucket mixed doc types
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_in=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_out=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_swap=True
+    # failover
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_swap=True,cb_failover=True
+
+
+    ############ ES+CB TOPOLOGY TESTS ###############
+    # rebalance in/out/swap multi-bucket mixed doc types
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_in=True,es_in=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_in=True,es_out=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_out=True,es_in=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_out=True,es_out=True
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_swap=True,es_swap=True
+    # failover
+    test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_out=True,es_out=True,cb_failover=True
diff --git a/lib/membase/api/esrest_client.py b/lib/membase/api/esrest_client.py
index fce541fec..408d898af 100644
--- a/lib/membase/api/esrest_client.py
+++ b/lib/membase/api/esrest_client.py
@@ -27,8 +27,9 @@ def __init__(self, serverInfo, proto = "http"):
         self.baseUrl = "http://{0}:{1}/".format(self.ip, self.port)
         self.capiBaseUrl = self.baseUrl
-        http_port = str(int(self.port) + 109)
-        self.conn = pyes.ES((proto,self.ip,http_port))
+        self.http_port = str(int(self.port) + 109)
+        self.proto = proto
+        self.conn = pyes.ES((self.proto,self.ip,self.http_port))
         self.test_params = TestInputSingleton.input
 
     def get_index_stats(self):
@@ -74,6 +75,10 @@ def delete_index(self, name):
         return self.conn.indices.exists_index(name)
 
     def create_index(self, name):
+
+        if self.conn.indices.exists_index(name):
+            self.delete_index(name)
+
         self.conn.indices.create_index(name)
         return self.conn.indices.exists_index(name)
 
@@ -155,10 +160,22 @@ def get_node_params(self, info):
         # use params from master node
         return master_node
 
-    def all_docs(self, keys_only = False):
+    def search_term(self, key, indices=["default"]):
+        result = None
+        params = {"term":{"_id":key}}
+        query = pyes.Search(params)
+        row = self.conn.search(query, indices = indices)
+        if row.total > 0:
+            result = row[0]
+        return result
+
+    def term_exists(self, key, indices=["default"]):
+        return self.search_term(key, indices = indices) is not None
+
+    def all_docs(self, keys_only = False, indices=["default"], size=10000):
         query = pyes.Search({'match_all' : {}})
-        rows = self.conn.search(query)
+        rows = self.conn.search(query, indices=indices, size=size)
         docs = []
 
         for row in rows:
@@ -212,7 +229,7 @@ def start_es_node(self, node):
 
         # wait for new node
         tries = 0
-        while tries < 5:
+        while tries < 10:
             for cluster_node in self.get_nodes():
                 if cluster_node.ip == node.ip and cluster_node.port == int(node.port):
                     return
@@ -343,16 +360,18 @@ def parse_addr(addr):
 class ESNode(Node):
     def __init__(self, info):
         super(ESNode, self).__init__()
-        self.hostname = info['hostname']
         self.key = str(info['key'])
         self.ip, self.port = parse_addr(info["couchbase_address"])
         self.tr_ip, self.tr_port = parse_addr(info["transport_address"])
-        self.ht_ip, self.ht_port = parse_addr(info["http_address"])
-        name = str(info['name']).replace(' ','')
+
+        if 'http_address' in info:
+            self.ht_ip, self.ht_port = parse_addr(info["http_address"])
+
+        # truncate after space, or comma
+        name = str(info['name'][:info['name'].find(' ')])
+        name = name[:name.find(',')]
         self.id = "es_%s@%s" % (name, self.ip)
         self.ssh_username = info['ssh_username']
         self.ssh_password = info['ssh_password']
         self.ssh_key = ''
-
-
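The helpers added to esrest_client.py above (search_term, term_exists, and the indices/size arguments on all_docs) are what the verification code later in this change leans on. A minimal usage sketch, assuming dest_server is the ES node's server object obtained the same way esbasetests.py does it; the index and key names here are purely illustrative:

    from membase.api.rest_client import RestConnection

    es_rest = RestConnection(dest_server)   # dest_server: ES node entry, placeholder in this sketch
    # dump every doc id in the "default" index, capped at the expected item count
    es_keys = es_rest.all_docs(keys_only=True, indices=["default"], size=10000)
    if "es_test_key_1000" not in es_keys:
        # the all_docs dump can lag behind indexing, so fall back to a direct term
        # query before treating the key as missing (same pattern as verify_es_num_docs)
        assert es_rest.term_exists("es_test_key_1000", indices=["default"])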
diff --git a/pytests/xdcr/esXDCR.py b/pytests/xdcr/esXDCR.py
index 33af39bea..4ae027d44 100644
--- a/pytests/xdcr/esXDCR.py
+++ b/pytests/xdcr/esXDCR.py
@@ -8,15 +8,15 @@
 import time
 
 #Assumption that at least 2 nodes on every cluster
-class ESKVTests(XDCRReplicationBaseTest, ESReplicationBaseTest):
+class ESTests(XDCRReplicationBaseTest, ESReplicationBaseTest):
     def setUp(self):
-        super(ESKVTests, self).setUp()
-        self.setup_xd_ref(self)
+        super(ESTests, self).setUp()
+        self.setup_es_params(self)
         self.verify_dest_added()
         self.setup_doc_gens()
 
     def tearDown(self):
-        super(ESKVTests, self).tearDown()
+        super(ESTests, self).tearDown()
 
     def setup_doc_gens(self):
         # create json doc generators
@@ -59,14 +59,23 @@ def _async_modify_data(self):
             tasks.extend(self._async_load_all_buckets(self.src_master, self.gen_delete, "delete", 0))
         if "read" in self._doc_ops:
             tasks.extend(self._async_load_all_buckets(self.src_master, self.gen_create, "read", 0))
+        return tasks
+
+    def modify_data(self):
+        tasks = self._async_modify_data()
         for task in tasks:
             task.result()
+
+    #overriding xdcr verify results method for specific es verification
+    def verify_results(self, verify_src=False):
+        self.verify_es_results(verify_src)
+
     """Testing Unidirectional load( Loading only at source)
     Verifying whether ES/XDCR replication is successful on
     subsequent destination clusters.Create/Update/Delete operations are performed based on doc-ops specified by the user.
     """
     def load_with_async_ops(self):
         self._load_all_buckets(self.src_master, self.gen_create, "create", 0)
-        self._async_modify_data()
+        self.modify_data()
         self.verify_results()
 
     def test_plugin_connect(self):
@@ -89,9 +98,109 @@ def test_multi_bucket_doctypes_with_async_ops(self):
             bucket_idx = bucket_idx + 1
 
         for task in tasks:
-            task.result()
+            task.result()
+
+    """Test coverage for elasticsearch and couchbase topology changes during data loading"""
+    def test_topology(self):
+
+        availableESNodes = self.dest_nodes[1:]
+        availableCBNodes = self.src_nodes[1:]
+
+        if self._es_in:
+            task = self._async_rebalance(self.dest_nodes, [], availableESNodes)
+            availableESNodes = []
+
+        if self._cb_in:
+            tasks = self._async_rebalance(self.src_nodes, [], availableCBNodes)
+            [task.result() for task in tasks]
+            availableCBNodes = []
+
+        # load data
+        tasks = \
+            self._async_load_all_buckets(self.src_master, self.gen_create, "create", 0)
+
+        # perform initial rebalances
+        if self._es_out or self._es_swap:
+            availableESNodes = self._first_level_rebalance_out(self.dest_nodes,
+                                                               availableESNodes,
+                                                               monitor = False)
+        elif self._es_in:
+            availableESNodes = self._first_level_rebalance_in(self.dest_nodes,
+                                                              monitor = False)
+
+        if self._cb_out or self._cb_swap:
+            availableCBNodes = self._first_level_rebalance_out(self.src_nodes,
+                                                               availableCBNodes,
+                                                               self._cb_failover)
+        elif self._cb_in:
+            availableCBNodes = self._first_level_rebalance_in(self.src_nodes)
+
+        # wait for initial data loading and load some more data
+        [task.result() for task in tasks]
+        tasks = self._async_modify_data()
+
+        # add/remove remaining nodes
+        if self._es_out or self._es_swap:
+            self._second_level_rebalance_out(self.dest_nodes,
+                                             availableESNodes,
+                                             self._es_swap,
+                                             monitor = False)
+        elif self._es_in:
+            self._second_level_rebalance_in(self.dest_nodes, monitor = False)
+
+        if self._cb_out or self._cb_swap:
+            self._second_level_rebalance_out(self.src_nodes,
+                                             availableCBNodes,
+                                             self._cb_swap)
+        elif self._cb_in:
+            self._second_level_rebalance_in(self.src_nodes)
+
+        # wait for secondary data loading tasks and verify results
+        [task.result() for task in tasks]
+        self.verify_results()
 
-    #overriding xdcr verify results method for specific es verification
-    def verify_results(self, verify_src=False):
-        self.verify_es_results(verify_src)
+    def _first_level_rebalance_out(self, param_nodes,
+                                   available_nodes,
+                                   do_failover = False,
+                                   monitor = True):
+
+        nodes_out = available_nodes[:1]
+        if do_failover:
+            self._cluster_helper.failover(param_nodes, nodes_out)
+
+        tasks = self._async_rebalance(param_nodes, [], nodes_out)
+        if monitor:
+            [task.result() for task in tasks]
+        return available_nodes[1:]
+
+    def _first_level_rebalance_in(self, param_nodes,
+                                  monitor = True):
+        nodes_in = []
+        if len(param_nodes) > 1:
+            nodes_in = [param_nodes[1]]
+            tasks = self._async_rebalance(param_nodes, nodes_in, [])
+            if monitor:
+                [task.result() for task in tasks]
+
+        return nodes_in
+
+    def _second_level_rebalance_out(self, param_nodes,
+                                    available_nodes,
+                                    do_swap = False,
+                                    monitor = True):
+        if len(available_nodes) > 1:
+            nodes_in = []
+            if do_swap:
+                nodes_in = [param_nodes[1]]
+            tasks = self._async_rebalance(param_nodes, nodes_in, available_nodes)
+            if monitor:
+                [task.result() for task in tasks]
+
+    def _second_level_rebalance_in(self, param_nodes, monitor = True):
+        if len(param_nodes) > 2:
+            nodes_in = param_nodes[2:]
+            tasks = self._async_rebalance(param_nodes, nodes_in, [])
+            if monitor:
+                [task.result() for task in tasks]
diff --git a/pytests/xdcr/esbasetests.py b/pytests/xdcr/esbasetests.py
index 3fab310c2..52c147090 100644
--- a/pytests/xdcr/esbasetests.py
+++ b/pytests/xdcr/esbasetests.py
@@ -1,5 +1,5 @@
 from membase.api.rest_client import RestConnection
-from memcached.helper.data_helper import MemcachedClientHelper
+from memcached.helper.data_helper import VBucketAwareMemcached
 from mc_bin_client import MemcachedError
 import time
 import json
@@ -12,12 +12,22 @@ class ESReplicationBaseTest(object):
 
     xd_ref = None
 
-    def setup_xd_ref(self, xd_ref):
-        self.xd_ref = self
+    def setup_es_params(self, xd_ref):
+        self.xd_ref = xd_ref
+        self._es_in = xd_ref._input.param("es_in", False)
+        self._cb_in = xd_ref._input.param("cb_in", False)
+        self._es_out = xd_ref._input.param("es_out", False)
+        self._cb_out = xd_ref._input.param("cb_out", False)
+        self._es_swap = xd_ref._input.param("es_swap", False)
+        self._cb_swap = xd_ref._input.param("cb_swap", False)
+        self._cb_failover = xd_ref._input.param("cb_failover", False)
 
-    def verify_es_results(self, verify_src = False, verification_count = 1000):
+    def verify_es_results(self, verify_src = False, verification_count = 10000):
         xd_ref = self.xd_ref
+        rest = RestConnection(self.src_nodes[0])
 
         # Checking replication at destination clusters
         dest_key_index = 1
@@ -27,22 +37,25 @@ def verify_es_results(self, verify_src = False, verification_count = 1000):
             dest_key = xd_ref.ord_keys[dest_key_index]
             dest_nodes = xd_ref._clusters_dic[dest_key]
 
-            self.verify_es_stats(xd_ref.src_nodes,
+            src_nodes = rest.get_nodes()
+            self.verify_es_stats(src_nodes,
                                  dest_nodes,
                                  verify_src,
                                  verification_count)
 
             dest_key_index += 1
 
-    def verify_es_stats(self, src_nodes, dest_nodes, verify_src = False, verification_count = 1000):
+    def verify_es_stats(self, src_nodes, dest_nodes, verify_src = False, verification_count = 10000):
         xd_ref = self.xd_ref
+
         # prepare for verification
         xd_ref._wait_for_stats_all_buckets(src_nodes)
         xd_ref._expiry_pager(src_nodes[0])
 
         if verify_src:
             xd_ref._verify_stats_all_buckets(src_nodes)
 
+        self.xd_ref._log.info("Verifying couchbase to elasticsearch replication")
         self.verify_es_num_docs(src_nodes[0], dest_nodes[0], verification_count = verification_count)
 
         if xd_ref._doc_ops is not None:
@@ -57,53 +70,93 @@ def verify_es_stats(self, src_nodes, dest_nodes, verify_src = False, verificatio
 
 
-    def verify_es_num_docs(self, src_server, dest_server, kv_store = 1, retry = 5, verification_count = 1000):
+    def verify_es_num_docs(self, src_server, dest_server, kv_store = 1, retry = 10, verification_count = 10000):
         cb_rest = RestConnection(src_server)
         es_rest = RestConnection(dest_server)
         buckets = self.xd_ref._get_cluster_buckets(src_server)
+        wait = 20
         for bucket in buckets:
-            cb_valid, cb_deleted = bucket.kvs[kv_store].key_set()
+            all_cb_docs = cb_rest.all_docs(bucket.name)
+            cb_valid = [str(row['id']) for row in all_cb_docs['rows']]
             cb_num_items = cb_rest.get_bucket_stats(bucket.name)['curr_items']
             es_num_items = es_rest.get_bucket(bucket.name).stats.itemCount
-            while retry > 0 and cb_num_items != es_num_items:
-                self.xd_ref._log.info("elasticsearch items %s, expected: %s....retry" %\
-                                      (es_num_items, cb_num_items))
-                time.sleep(20)
+
+            _retry = retry
+            while _retry > 0 and cb_num_items != es_num_items:
+                self.xd_ref._log.info("elasticsearch items %s, expected: %s....retry after %s seconds" %\
+                                      (es_num_items, cb_num_items, wait))
+                time.sleep(wait)
+                last_es_items = es_num_items
                 es_num_items = es_rest.get_bucket(bucket.name).stats.itemCount
+
+                # if the index doesn't change, reduce the retry count
+                if es_num_items == last_es_items:
+                    _retry = _retry - 1
+                elif es_num_items < last_es_items:
+                    self.xd_ref._log.info("%s items removed from index" % (last_es_items - es_num_items))
+                    _retry = retry
+                elif es_num_items > last_es_items:
+                    self.xd_ref._log.info("%s items added to index" % (es_num_items - last_es_items))
+                    _retry = retry
+
             if es_num_items != cb_num_items:
                 self.xd_ref.fail("Error: Couchbase has %s docs, ElasticSearch has %s docs " %\
                                  (cb_num_items, es_num_items))
 
             # query for all es keys
-            es_valid = es_rest.all_docs(keys_only=True)
-            for _id in es_valid[:verification_count]:  # match at most 1k keys
-                if _id not in cb_valid:
-                    self.xd_ref.fail("Document %s Missing from ES Index" % _id)
+            es_valid = es_rest.all_docs(keys_only=True, indices=[bucket.name], size = cb_num_items)
+            if len(es_valid) != cb_num_items:
+                self.xd_ref._log.info("WARNING: Couchbase has %s docs, ElasticSearch all_docs returned %s docs " %\
+                                      (cb_num_items, len(es_valid)))
+
+            for _id in cb_valid[:verification_count]:  # match at most 10k keys
+                if _id not in es_valid:
+                    # document missing from all_docs query, do manual term search
+                    if es_rest.term_exists(_id, indices=[bucket.name]) == False:
+                        self.xd_ref.fail("Document %s Missing from ES Index (%s)" % (_id, bucket.name))
 
+            self.xd_ref._log.info("Verified couchbase bucket (%s) replicated (%s) docs to elasticSearch with matching keys" %\
+                                  (bucket.name, cb_num_items))
+
+
-    def _verify_es_revIds(self, src_server, dest_server, kv_store = 1, verification_count = 1000):
+    def _verify_es_revIds(self, src_server, dest_server, kv_store = 1, verification_count = 10000):
         cb_rest = RestConnection(src_server)
         es_rest = RestConnection(dest_server)
         buckets = self.xd_ref._get_cluster_buckets(src_server)
         for bucket in buckets:
-            all_cb_docs = cb_rest.all_docs(bucket.name)
-            es_valid = es_rest.all_docs()
-            # represent doc lists from couchbase and elastic search in following format
+            # retrieve all docs from couchbase and es
+            # then represent doc lists from couchbase
+            # and elastic search in following format
             # [ (id, rev), (id, rev) ... ]
+            all_cb_docs = cb_rest.all_docs(bucket.name)
             cb_id_rev_list = self.get_cb_id_rev_list(all_cb_docs)
+
+            es_valid = es_rest.all_docs(indices=[bucket.name], size = len(cb_id_rev_list))
             es_id_rev_list = self.get_es_id_rev_list(es_valid)
 
             # verify each (id, rev) pair returned from couchbase exists in elastic search
             for cb_id_rev_pair in cb_id_rev_list:
+
                 try:
-                    es_id_rev_pair = es_id_rev_list[es_id_rev_list.index(cb_id_rev_pair)]
+                    # lookup es document with matching _id and revid
+                    # if no exception thrown then doc was properly indexed
+                    es_list_pos = es_id_rev_list.index(cb_id_rev_pair)
+                except ValueError:
+
+                    # attempt manual lookup by search term
+                    es_doc = es_rest.search_term(cb_id_rev_pair[0], indices = [bucket.name])
+                    if es_doc is None:
+                        self.xd_ref.fail("Error during verification: %s does not exist in ES index %s" % (cb_id_rev_pair, bucket.name))
+
+                    # compare
+                    es_id_rev_pair = (es_doc['meta']['id'], es_doc['meta']['rev'])
                     if cb_id_rev_pair != es_id_rev_pair:
                         self.xd_ref.fail("ES document %s has invalid revid (%s). Couchbase revid (%s). bucket (%s)" %\
-                            (es_id_rev_pair, cb_id_rev_pair, bucket.name))
-                except ValueError:
-                    self.xd_ref.fail("Error during verification: %s does not exist in ES index %s" % (cb_id_rev_pair, bucket.name))
+                                         (es_id_rev_pair, cb_id_rev_pair, bucket.name))
+
+            self.xd_ref._log.info("Verified doc rev-ids in couchbase bucket (%s) match meta rev-ids in elastic search" %\
+                                  (bucket.name))
 
     def get_cb_id_rev_list(self, docs):
         return [(row['id'],row['value']['rev']) for row in docs['rows']]
@@ -112,16 +165,15 @@ def get_es_id_rev_list(self, docs):
         return [(row['doc']['_id'],row['meta']['rev']) for row in docs]
 
-    def _verify_es_values(self, src_server, dest_server, kv_store = 1, verification_count = 1000):
+    def _verify_es_values(self, src_server, dest_server, kv_store = 1, verification_count = 10000):
         cb_rest = RestConnection(src_server)
         es_rest = RestConnection(dest_server)
         buckets = self.xd_ref._get_cluster_buckets(src_server)
         for bucket in buckets:
-            mc = MemcachedClientHelper.direct_client(src_server, bucket)
-            cb_valid, cb_deleted = bucket.kvs[kv_store].key_set()
-            es_valid = es_rest.all_docs()
+            mc = VBucketAwareMemcached(cb_rest, bucket)
+            es_valid = es_rest.all_docs(indices=[bucket.name], size=verification_count)
 
-            # compare values of documents
+            # compare values of es documents to documents in couchbase
             for row in es_valid[:verification_count]:
                 key = str(row['meta']['id'])
@@ -135,6 +187,9 @@ def _verify_es_values(self, src_server, dest_server, kv_store = 1, verification_
             except MemcachedError as e:
                 self.xd_ref.fail("Error during verification.  Index contains invalid key: %s" % key)
 
+            self.xd_ref._log.info("Verified doc values in couchbase bucket (%s) match values in elastic search" %\
+                                  (bucket.name))
+
     def verify_dest_added(self):
         src_master = self.xd_ref.src_master
         dest_master = self.xd_ref.dest_master
diff --git a/pytests/xdcr/xdcrbasetests.py b/pytests/xdcr/xdcrbasetests.py
index 8e70cf31e..a712aa485 100644
--- a/pytests/xdcr/xdcrbasetests.py
+++ b/pytests/xdcr/xdcrbasetests.py
@@ -635,6 +635,7 @@ def verify_xdcr_stats(self, src_nodes, dest_nodes, verify_src=False):
             self.fail("Mismatches on Meta Information on xdcr-replicated items!")
 
     def verify_results(self, verify_src=False):
+
         # Checking replication at destination clusters
         dest_key_index = 1
         for key in self.ord_keys[1:]:
@@ -642,10 +643,10 @@ def verify_results(self, verify_src=False):
                 break
             dest_key = self.ord_keys[dest_key_index]
             self.dest_nodes = self._clusters_dic[dest_key]
-
             self.verify_xdcr_stats(self.src_nodes, self.dest_nodes, verify_src)
             dest_key_index += 1
+
 
     def wait_warmup_completed(self, warmupnodes, bucket_names=["default"]):
         if isinstance(bucket_names, str):
             bucket_names = [bucket_names]
diff --git a/resources/perf/es-3-node.ini b/resources/perf/es-3-node.ini
new file mode 100644
index 000000000..c77d3cc74
--- /dev/null
+++ b/resources/perf/es-3-node.ini
@@ -0,0 +1,47 @@
+[global]
+username:root
+password:couchbase
+
+[membase]
+rest_username:Administrator
+rest_password:password
+
+[cluster2]
+1:cb1
+2:cb2
+3:cb3
+4:cb4
+
+[cluster1]
+5:es1
+6:es2
+7:es3
+
+[cb1]
+ip:10.1.3.78
+port:8091
+
+[cb2]
+ip:10.1.3.79
+port:8091
+
+[cb3]
+ip:10.1.3.76
+port:8091
+
+[cb4]
+ip:10.1.3.250
+port:8091
+
+[es1]
+ip:10.1.3.76
+port:9091
+
+# additional node ports must be in descending order if using dev cluster
+[es2]
+ip:10.1.3.76
+port:9093
+
+[es3]
+ip:10.1.3.76
+port:9092
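The es* entries above point at the Couchbase-facing port of each ElasticSearch node, and the ES REST client change earlier in this diff derives the HTTP port from it as http_port = str(int(self.port) + 109). A quick, purely illustrative sanity check of that mapping against the ports in this ini:

    # Plugin port -> HTTP port mapping assumed by the ES REST client constructor
    # above (http_port = plugin_port + 109); ports taken from es-3-node.ini.
    es_nodes = {"es1": 9091, "es2": 9093, "es3": 9092}

    for name in sorted(es_nodes):
        plugin_port = es_nodes[name]
        print("%s: plugin port %d -> HTTP port %d" % (name, plugin_port, plugin_port + 109))
    # es1: plugin port 9091 -> HTTP port 9200
    # es2: plugin port 9093 -> HTTP port 9202
    # es3: plugin port 9092 -> HTTP port 9201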