CBES-8: es/xdcr topology testsuite
Change-Id: I8c19a791ffd03b0e73026d1d6a4013a045d7699b
Reviewed-on: http://review.couchbase.org/23878
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Tommie McAfee <tommie@couchbase.com>
tahmmee committed Jan 22, 2013
1 parent 49dcb06 commit 28a01c3
Showing 6 changed files with 337 additions and 54 deletions.
62 changes: 57 additions & 5 deletions conf/py-xdcr-elasticsearch.conf
@@ -1,7 +1,9 @@
xdcr.esXDCR.ESKVTests:
xdcr.esXDCR.ESTests:

test_plugin_connect

############ KV TESTS ###############

#Load with ops
load_with_async_ops,items=10000,rdirection=unidirection

@@ -19,11 +21,61 @@ xdcr.esXDCR.ESKVTests:
load_with_async_ops,items=10000,rdirection=unidirection,doc-ops=create-update-delete-read,expires=10

# multi-bucket
load_with_async_ops,items=10000,rdirection=unidirection,standard_buckets=1,dgm_run=True

load_with_async_ops,items=10000,rdirection=unidirection,standard_buckets=1
# multi-bucket mixed workload
load_with_async_ops,items=10000,rdirection=unidirection,doc-ops=create-update-delete-read,expires=10,standard_buckets=1,dgm_run=True
load_with_async_ops,items=10000,rdirection=unidirection,doc-ops=create-update-delete-read,expires=10,standard_buckets=1

# multi-bucket mixed doc types
test_multi_bucket_doctypes_with_async_ops,items=10000,rdirection=unidirection,standard_buckets=1,dgm_run=True
test_multi_bucket_doctypes_with_async_ops,items=10000,rdirection=unidirection,standard_buckets=1


############ ES TOPOLOGY TESTS ###############

# rebalance in/out/swap simple kv
test_topology,items=10000,rdirection=unidirection,es_in=True
test_topology,items=10000,rdirection=unidirection,es_out=True
test_topology,items=10000,rdirection=unidirection,es_swap=True

# rebalance in/out/swap mixed doc types
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,es_in=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,es_out=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,es_swap=True

# rebalance in/out/swap multi-bucket mixed doc types
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,es_in=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,es_out=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,es_swap=True


############ CB TOPOLOGY TESTS ###############

# rebalance in/out/swap simple kv
test_topology,items=10000,rdirection=unidirection,cb_in=True
test_topology,items=10000,rdirection=unidirection,cb_out=True
test_topology,items=10000,rdirection=unidirection,cb_swap=True

# rebalance in/out/swap mixed doc types
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,cb_in=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,cb_out=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,cb_swap=True
# failover
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,cb_swap=True,cb_failover=True

# rebalance in/out/swap multi-bucket mixed doc types
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_in=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_out=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_swap=True
# failover
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_swap=True,cb_failover=True


############ ES+CB TOPOLOGY TESTS ###############
# rebalance in/out/swap multi-bucket mixed doc types
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_in=True,es_in=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_in=True,es_out=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_out=True,es_in=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_out=True,es_out=True
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_swap=True,es_swap=True
# failover
test_topology,items=10000,rdirection=unidirection,doc-ops=read-create-delete-update,expires=10,standard_buckets=1,cb_out=True,es_out=True,cb_failover=True
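
For reference, each non-comment entry above is a test method on xdcr.esXDCR.ESTests followed by comma-separated key=value parameters that testrunner hands to the test. A minimal, illustrative sketch of how one topology entry breaks down (the line value below is taken from the ES topology block; the real parsing is done by testrunner itself):

    # Illustrative only: split a conf entry into its test name and parameters.
    line = "test_topology,items=10000,rdirection=unidirection,es_swap=True"

    test_name, _, param_str = line.partition(",")
    params = dict(kv.split("=", 1) for kv in param_str.split(",") if kv)

    print(test_name)  # test_topology
    print(params)     # {'items': '10000', 'rdirection': 'unidirection', 'es_swap': 'True'}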

39 changes: 29 additions & 10 deletions lib/membase/api/esrest_client.py
@@ -27,8 +27,9 @@ def __init__(self, serverInfo, proto = "http"):

self.baseUrl = "http://{0}:{1}/".format(self.ip, self.port)
self.capiBaseUrl = self.baseUrl
http_port = str(int(self.port) + 109)
self.conn = pyes.ES((proto,self.ip,http_port))
self.http_port = str(int(self.port) + 109)
self.proto = proto
self.conn = pyes.ES((self.proto,self.ip,self.http_port))
self.test_params = TestInputSingleton.input

def get_index_stats(self):
@@ -74,6 +75,10 @@ def delete_index(self, name):
return self.conn.indices.exists_index(name)

def create_index(self, name):

if self.conn.indices.exists_index(name):
self.delete_index(name)

self.conn.indices.create_index(name)
return self.conn.indices.exists_index(name)

@@ -155,10 +160,22 @@ def get_node_params(self, info):
# use params from master node
return master_node

def all_docs(self, keys_only = False):
def search_term(self, key, indices=["default"]):
result = None
params = {"term":{"_id":key}}
query = pyes.Search(params)
row = self.conn.search(query, indices = indices)
if row.total > 0:
result = row[0]
return result

def term_exists(self, key, indices=["default"]):
return self.search_term(key, indices = indices) is not None

def all_docs(self, keys_only = False, indices=["default"],size=10000):

query = pyes.Search({'match_all' : {}})
rows = self.conn.search(query)
rows = self.conn.search(query, indices=indices, size=size)
docs = []

for row in rows:
@@ -212,7 +229,7 @@ def start_es_node(self, node):

# wait for new node
tries = 0
while tries < 5:
while tries < 10:
for cluster_node in self.get_nodes():
if cluster_node.ip == node.ip and cluster_node.port == int(node.port):
return
@@ -343,16 +360,18 @@ def parse_addr(addr):
class ESNode(Node):
def __init__(self, info):
super(ESNode, self).__init__()
self.hostname = info['hostname']
self.key = str(info['key'])
self.ip, self.port = parse_addr(info["couchbase_address"])
self.tr_ip, self.tr_port = parse_addr(info["transport_address"])
self.ht_ip, self.ht_port = parse_addr(info["http_address"])
name = str(info['name']).replace(' ','')

if 'http_address' in info:
self.ht_ip, self.ht_port = parse_addr(info["http_address"])

# truncate after space, or comma
name = str(info['name'][:info['name'].find(' ')])
name = name[:name.find(',')]
self.id = "es_%s@%s" % (name, self.ip)

self.ssh_username = info['ssh_username']
self.ssh_password = info['ssh_password']
self.ssh_key = ''
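
A hedged usage sketch of the helpers added above, showing how a test might check that a key replicated from Couchbase is visible on the ES side. The class name EsRestConnection, the import path, and the dest_master server object are assumptions inferred from the surrounding testrunner code; none of them are shown in this diff.

    # Assumed names: EsRestConnection (the class modified in esrest_client.py)
    # and dest_master (the ES-side master node's server info).
    from membase.api.esrest_client import EsRestConnection

    es_rest = EsRestConnection(dest_master)

    # look up a single replicated key in the ES-side "default" index
    doc = es_rest.search_term("es_doc_0", indices=["default"])
    if doc is not None:
        print("replicated doc found:", doc)

    # boolean convenience wrapper added in the same hunk
    if es_rest.term_exists("es_doc_0", indices=["default"]):
        print("key es_doc_0 replicated to ES")

    # bulk verification can now scope the index and cap the result size
    keys = es_rest.all_docs(keys_only=True, indices=["default"], size=10000)
    print("docs on ES side:", len(keys))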


127 changes: 118 additions & 9 deletions pytests/xdcr/esXDCR.py
@@ -8,15 +8,15 @@
import time

# Assumes at least 2 nodes on every cluster
class ESKVTests(XDCRReplicationBaseTest, ESReplicationBaseTest):
class ESTests(XDCRReplicationBaseTest, ESReplicationBaseTest):
def setUp(self):
super(ESKVTests, self).setUp()
self.setup_xd_ref(self)
super(ESTests, self).setUp()
self.setup_es_params(self)
self.verify_dest_added()
self.setup_doc_gens()

def tearDown(self):
super(ESKVTests, self).tearDown()
super(ESTests, self).tearDown()

def setup_doc_gens(self):
# create json doc generators
@@ -59,14 +59,23 @@ def _async_modify_data(self):
tasks.extend(self._async_load_all_buckets(self.src_master, self.gen_delete, "delete", 0))
if "read" in self._doc_ops:
tasks.extend(self._async_load_all_buckets(self.src_master, self.gen_create, "read", 0))
return tasks

def modify_data(self):
tasks = self._async_modify_data()
for task in tasks:
task.result()


#overriding xdcr verify results method for specific es verification
def verify_results(self, verify_src=False):
self.verify_es_results(verify_src)

"""Testing Unidirectional load( Loading only at source) Verifying whether ES/XDCR replication is successful on
subsequent destination clusters.Create/Update/Delete operations are performed based on doc-ops specified by the user. """
def load_with_async_ops(self):
self._load_all_buckets(self.src_master, self.gen_create, "create", 0)
self._async_modify_data()
self.modify_data()
self.verify_results()

def test_plugin_connect(self):
@@ -89,9 +98,109 @@ def test_multi_bucket_doctypes_with_async_ops(self):
bucket_idx = bucket_idx + 1

for task in tasks:
task.result()
task.result()

"""Test coverage for elasticsearch and couchbase topology changes during data loading"""
def test_topology(self):

availableESNodes = self.dest_nodes[1:]
availableCBNodes = self.src_nodes[1:]

if self._es_in:
task = self._async_rebalance(self.dest_nodes, [], availableESNodes)
availableESNodes = []

if self._cb_in:
tasks = self._async_rebalance(self.src_nodes, [], availableCBNodes)
[task.result() for task in tasks]
availableCBNodes = []

# load data
tasks = \
self._async_load_all_buckets(self.src_master, self.gen_create, "create", 0)


# perform initial rebalances
if self._es_out or self._es_swap:
availableESNodes = self._first_level_rebalance_out(self.dest_nodes,
availableESNodes,
monitor = False)
elif self._es_in:
availableESNodes = self._first_level_rebalance_in(self.dest_nodes,
monitor = False)

if self._cb_out or self._cb_swap:
availableCBNodes = self._first_level_rebalance_out(self.src_nodes,
availableCBNodes,
self._cb_failover)
elif self._cb_in:
availableCBNodes = self._first_level_rebalance_in(self.src_nodes)

# wait for initial data loading and load some more data
[task.result() for task in tasks]
tasks = self._async_modify_data()

# add/remove remaining nodes
if self._es_out or self._es_swap:
self._second_level_rebalance_out(self.dest_nodes,
availableESNodes,
self._es_swap,
monitor = False)
elif self._es_in:
self._second_level_rebalance_in(self.dest_nodes, monitor = False)

if self._cb_out or self._cb_swap:
self._second_level_rebalance_out(self.src_nodes,
availableCBNodes,
self._cb_swap)
elif self._cb_in:
self._second_level_rebalance_in(self.src_nodes)

# wait for secondary data loading tasks and verify results
[task.result() for task in tasks]
self.verify_results()

#overriding xdcr verify results method for specific es verification
def verify_results(self, verify_src=False):
self.verify_es_results(verify_src)
def _first_level_rebalance_out(self, param_nodes,
available_nodes,
do_failover = False,
monitor = True):

nodes_out = available_nodes[:1]
if do_failover:
self._cluster_helper.failover(param_nodes, nodes_out)

tasks = self._async_rebalance(param_nodes, [], nodes_out)
if monitor:
[task.result() for task in tasks]
return available_nodes[1:]

def _first_level_rebalance_in(self, param_nodes,
monitor = True):
nodes_in = []
if len(param_nodes) > 1:
nodes_in = [param_nodes[1]]
tasks = self._async_rebalance(param_nodes, nodes_in, [])
if monitor:
[task.result() for task in tasks]

return nodes_in

def _second_level_rebalance_out(self, param_nodes,
available_nodes,
do_swap = False,
monitor = True):
if len(available_nodes) > 1:
nodes_in = []
if do_swap:
nodes_in = [param_nodes[1]]
tasks = self._async_rebalance(param_nodes, nodes_in, available_nodes)
if monitor:
[task.result() for task in tasks]

def _second_level_rebalance_in(self, param_nodes, monitor = True):
if len(param_nodes) > 2:
nodes_in = param_nodes[2:]
tasks = self._async_rebalance(param_nodes, nodes_in, [])
if monitor:
[task.result() for task in tasks]
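
To make the swap path easier to follow, a standalone sketch of the node bookkeeping that _first_level_rebalance_out and _second_level_rebalance_out perform when a *_swap flag is set. The node names are placeholders; only the list arithmetic mirrors the code above.

    # Placeholder nodes: param_nodes is the full cluster (master first) and
    # available_nodes starts as param_nodes[1:], as in test_topology above.
    param_nodes = ["master", "node1", "node2", "node3"]
    available_nodes = param_nodes[1:]

    # first-level rebalance out: remove exactly one spare node while data loads
    first_out = available_nodes[:1]          # ['node1']
    available_nodes = available_nodes[1:]    # ['node2', 'node3']

    # second-level rebalance (swap): add the first node back in while the
    # remaining spares are rebalanced out, so membership keeps changing
    # during the second round of mutations
    second_in = [param_nodes[1]]             # ['node1']
    second_out = available_nodes             # ['node2', 'node3']

    print("phase 1 out:", first_out)
    print("phase 2 swap in:", second_in, "out:", second_out)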
