
CBQE-2332: Add Compaction + Rebalance/Failover Tests

Change-Id: I240f1e0dd9413c392b4b3076edfe5049f245c069
Reviewed-on: http://review.couchbase.org/39624
Reviewed-by: Parag Agarwal <agarwal.parag@gmail.com>
Tested-by: Parag Agarwal <agarwal.parag@gmail.com>
commit 551f829f13c1088c8d74e2344a37ad591c32567d (parent 85f92ca)
paragagarwal authored, karma2ns committed
conf/py-newfailover.conf (4 lines changed)
@@ -3,6 +3,7 @@ failover.failovertests.FailoverTests:
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,failoverMaster=True,GROUP=P0
test_failover_firewall,replicas=2,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,failoverMaster=True,GROUP=P1
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,doc_ops=update:create:delete,withQueries=True,numViews=5,withViewsOps=True,GROUP=P0
+ test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,doc_ops=update:create:delete,compact=True,withQueries=True,numViews=5,withViewsOps=True,GROUP=P1
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,GROUP=P0
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=40000,sasl_buckets=1,GROUP=P1
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,load_ratio=10,bidirectional=True,GROUP=P0
@@ -31,9 +32,11 @@ failover.failovertests.FailoverTests:
test_failover_stop_server,replicas=2,graceful=False,num_failed_nodes=2,items=20000,GROUP=P0
test_failover_stop_server,replicas=2,graceful=False,num_failed_nodes=2,load_ratio=10,GROUP=P2
test_failover_stop_server,replicas=3,graceful=False,num_failed_nodes=3,load_ratio=1,GROUP=P1
+ test_failover_stop_server,replicas=3,graceful=False,num_failed_nodes=3,items=100000,compact=True,load_ratio=1,GROUP=P1
test_failover_stop_server,replicas=3,graceful=False,num_failed_nodes=3,load_ratio=1,failoverMaster=True,GROUP=P1
# Graceful Failover and/or Delta Recovery
+ test_failover_normal,replicas=1,num_failed_nodes=1,items=100000,compact=True,dgm_run=True,withMutationOps=True,doc_ops=create:update:delete,failoverMaster=True,graceful=True,GROUP=P0;GRACEFUL
test_failover_normal,replicas=1,num_failed_nodes=1,items=100000,dgm_run=True,withMutationOps=True,doc_ops=create:update:delete,failoverMaster=True,graceful=True,GROUP=P0;GRACEFUL
test_failover_normal,replicas=2,num_failed_nodes=1,items=100000,dgm_run=True,withMutationOps=True,doc_ops=create:update:delete,failoverMaster=True,graceful=True,GROUP=P1;GRACEFUL
test_failover_normal,replicas=1,num_failed_nodes=1,items=100000,dgm_run=True,failoverMaster=True,graceful=True,GROUP=P0;GRACEFUL
@@ -52,6 +55,7 @@ failover.failovertests.FailoverTests:
test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,withMutationOps=True,doc_ops=create:update:delete,upr_check=False,recoveryType=delta,graceful=True,GROUP=P0;GRACEFUL
test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,sasl_buckets=1,upr_check=False,recoveryType=full,graceful=True,GROUP=P0;GRACEFUL
test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,sasl_buckets=1,upr_check=False,recoveryType=delta,graceful=True,GROUP=P0;GRACEFUL
+ test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,numViews=5,compact=True,withViewsOps=True,createIndexesDuringFailover=True,sasl_buckets=1,upr_check=False,recoveryType=delta,graceful=True,GROUP=P1;GRACEFUL
test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,sasl_buckets=1,upr_check=False,recoveryType=full,graceful=True,GROUP=P0;GRACEFUL
test_failover_normal,replicas=1,graceful=True,check_verify_failover_type=True,num_failed_nodes=1,items=100,dgm_run=True,failoverMaster=True,runRebalanceAfterFailover=False,GROUP=P1;GRACEFUL
test_failover_normal,replicas=2,graceful=True,check_verify_failover_type=True,num_failed_nodes=3,items=100,dgm_run=True,failoverMaster=True,runRebalanceAfterFailover=False,GROUP=P1;GRACEFUL
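Each conf line above follows testrunner's format: the first comma-separated token names the test method, and every remaining token is a key=value parameter handed to that test (GROUP tags select subsets of a conf file at run time). A minimal sketch of how one of the new lines decomposes; the parsing here is illustrative only, not the repo's actual conf loader:

    # Illustrative only: split a conf line into the test method and its params.
    line = "test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1," \
           "items=100000,dgm_run=True,doc_ops=update:create:delete,compact=True," \
           "withQueries=True,numViews=5,withViewsOps=True,GROUP=P1"
    parts = line.split(",")
    test, params = parts[0], dict(p.split("=", 1) for p in parts[1:])
    print test               # test_failover_firewall
    print params["compact"]  # 'True' (a string; the test coerces it)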
conf/rebalance/py-rebalancein.conf (1 line changed)
@@ -1,5 +1,6 @@
rebalance.rebalancein.RebalanceInTests:
#default values:replicas=1,items=10000,nodes_in=1,value_size=256,doc_ops=None,GROUP=IN;P0
+ rebalance_in_with_compaction_and_ops,nodes_init=3,replicas=1,items=500000,doc_ops=create:update:delete,GROUP=IN;P1
rebalance_in_after_ops,nodes_in=1,nodes_init=3,replicas=1,items=100000,GROUP=IN;P0
rebalance_in_with_ops,nodes_in=3,items=0,GROUP=IN;P1
rebalance_in_with_ops,nodes_in=2,replicas=2,GROUP=IN;P2
conf/rebalance/py-rebalanceinout.conf (1 line changed)
@@ -6,6 +6,7 @@ rebalance.rebalanceinout.RebalanceInOutTests:
#incremental_rebalance_in_out_with_mutation_and_expiration,items=500000,value_size=512,max_verify=100000,GROUP=IN_OUT;P0
incremental_rebalance_out_in_with_mutation,replicas=2,value_size=2048,GROUP=IN_OUT;P1
incremental_rebalance_out_in_with_mutation,init_num_nodes=3,items=400000,GROUP=IN_OUT;P0
+ incremental_rebalance_out_in_with_mutation_and_compaction,init_num_nodes=3,items=400000,GROUP=IN_OUT;P0
incremental_rebalance_out_in_with_mutation,init_num_nodes=3,items=400000,standard_buckets=2,sasl_buckets=2,standard_bucket_priority=low:high,sasl_bucket_priority=low:high,GROUP=IN_OUT;P0
incremental_rebalance_out_in_with_mutation,replicas=3,init_num_nodes=3,GROUP=IN_OUT;P2
start_stop_rebalance_in_out,nodes_init=1,nodes_in=2,nodes_out=0,extra_nodes_in=1,extra_nodes_out=0,items=100000,max_verify=10000,value_size=1024,GROUP=IN_OUT;P0
conf/rebalance/py-rebalanceout.conf (1 line changed)
@@ -4,6 +4,7 @@ rebalance.rebalanceout.RebalanceOutTests:
rebalance_out_with_ops,nodes_out=3,replicas=3,items=0,GROUP=OUT;P2
rebalance_out_with_ops,nodes_out=5,items=0,GROUP=OUT;P1
rebalance_out_with_ops,nodes_out=5,replicas=2,items=0,GROUP=OUT;P1
+ rebalance_out_with_compaction_and_ops,nodes_out=5,replicas=2,doc_ops=create:delete:update,items=500000,GROUP=OUT;P1
rebalance_out_with_ops,nodes_out=5,GROUP=OUT;P2
rebalance_out_with_ops,nodes_out=5,replicas=3,GROUP=OUT;P2
rebalance_out_with_ops,nodes_out=1,doc_ops=create,GROUP=OUT;P2
pytests/failover/failoverbasetests.py (1 line changed)
@@ -18,6 +18,7 @@ def setUp(self):
self.default_view = View(self.default_view_name, self.defaul_map_func, None)
self.failoverMaster = self.input.param("failoverMaster", False)
self.total_vbuckets = self.input.param("total_vbuckets", 1024)
+ self.compact = self.input.param("compact", False)
self.std_vbucket_dist = self.input.param("std_vbucket_dist", None)
self.withMutationOps = self.input.param("withMutationOps", False)
self.withViewsOps = self.input.param("withViewsOps", False)
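The new compact flag is read with the same self.input.param(name, default) pattern as the surrounding lines: the value comes from the conf line (or the command line) and falls back to the default, with the conf file's string coerced to the default's type. A minimal sketch of that behavior, using a hypothetical stand-in rather than testrunner's real TestInput class:

    # Hypothetical stand-in illustrating param lookup with type coercion.
    class FakeInput(object):
        def __init__(self, params):
            self.test_params = params

        def param(self, name, default):
            if name not in self.test_params:
                return default
            value = self.test_params[name]
            if isinstance(default, bool):
                # conf values arrive as strings, so booleans need coercion
                return str(value).lower() == "true"
            return type(default)(value)

    test_input = FakeInput({"compact": "True", "items": "100000"})
    assert test_input.param("compact", False) is True
    assert test_input.param("items", 10000) == 100000
    assert test_input.param("graceful", False) is False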
pytests/failover/failovertests.py (15 lines changed)
@@ -92,7 +92,7 @@ def common_test_body(self, failover_reason):
prev_failover_stats = self.get_failovers_logs(self.servers, self.buckets)
# Perform Operations related to failover
- if self.withMutationOps or self.withViewsOps:
+ if self.withMutationOps or self.withViewsOps or self.compact:
self.run_failover_operations_with_ops(self.chosen, failover_reason)
else:
self.run_failover_operations(self.chosen, failover_reason)
@@ -126,7 +126,10 @@ def run_rebalance_after_failover_and_verify(self, chosen, prev_vbucket_stats, re
elif self.during_ops == "change_port":
self.change_port(new_port=self.input.param("new_port", "9090"))
self.rest = RestConnection(self.referenceNode)
-
+ # Perform Compaction
+ if self.compact:
+ for bucket in self.buckets:
+ self.cluster.compact_bucket(self.referenceNode, bucket)
# Perform View Validation if Supported
if self.withViewsOps:
self.query_and_monitor_view_tasks(self.servers)
@@ -189,6 +192,10 @@ def run_add_back_operation_and_verify(self, chosen, prev_vbucket_stats, record_s
self.sleep(20, "After failover before invoking rebalance...")
self.rest.rebalance(otpNodes=[node.id for node in self.nodes],ejectedNodes=[],deltaRecoveryBuckets = self.deltaRecoveryBuckets)
+ # Perform Compaction
+ if self.compact:
+ for bucket in self.buckets:
+ self.cluster.compact_bucket(self.referenceNode, bucket)
# Perform View Validation if Supported
if self.withViewsOps:
self.query_and_monitor_view_tasks(self.servers)
@@ -372,6 +379,10 @@ def run_failover_operations_with_ops(self, chosen, failover_reason):
self.fail("node status is not unhealthy even after waiting for 5 minutes")
nodes = self.filter_servers(self.servers,chosen)
failed_over = self.cluster.async_failover([self.referenceNode], failover_nodes = chosen, graceful=self.graceful)
+ # Perform Compaction
+ if self.compact:
+ for bucket in self.buckets:
+ self.cluster.compact_bucket(self.referenceNode, bucket)
# Run View Operations
if self.withViewsOps:
self.query_and_monitor_view_tasks(nodes)
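The same guarded per-bucket compaction loop now appears at three points in this file (after the post-failover rebalance, after add-back, and while failover is in flight). A hypothetical helper that would factor out the repetition, assuming only the compact_bucket(server, bucket) call the diff already uses:

    # Hypothetical refactor, not part of the diff: one method on
    # FailoverTests replacing the three copies of the compaction loop.
    def compact_all_buckets(self):
        if not self.compact:
            return
        for bucket in self.buckets:
            self.cluster.compact_bucket(self.referenceNode, bucket)

Each call site would then collapse to self.compact_all_buckets().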
pytests/rebalance/rebalancein.py (29 lines changed)
@@ -82,6 +82,35 @@ def rebalance_in_with_ops(self):
self.verify_cluster_stats(self.servers[:self.nodes_in + self.nodes_init])
self.verify_unacked_bytes_all_buckets()
+ """Rebalances nodes into a cluster while doing docs ops:create, delete, update.
+
+ This test begins by loading a given number of items into the cluster.
+ We later run compaction on all buckets and do ops as well
+ """
+ def rebalance_in_with_compaction_and_ops(self):
+ self.withOps = True
+ servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
+ tasks = [self.cluster.async_rebalance(self.servers[:self.nodes_init], servs_in, [])]
+ for bucket in self.buckets:
+ tasks += self.cluster.async_compact_bucket(self.master, bucket)
+ # define which doc ops run during the rebalance; as in
+ # rebalance_out_with_compaction_and_ops, each requested op runs
+ if(self.doc_ops is not None):
+ if("update" in self.doc_ops):
+ # update the first half of the loaded items in place
+ tasks += self._async_load_all_buckets(self.master, self.gen_update, "update", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
+ if("create" in self.doc_ops):
+ # add another half of the initial items as new keys
+ gen_create = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items + 1, end=self.num_items * 3 / 2)
+ tasks += self._async_load_all_buckets(self.master, gen_create, "create", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
+ if("delete" in self.doc_ops):
+ # remove the second half of the initial items
+ gen_delete = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items / 2, end=self.num_items)
+ tasks += self._async_load_all_buckets(self.master, gen_delete, "delete", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
+ for task in tasks:
+ task.result()
+ self.verify_cluster_stats(self.servers[:self.nodes_in + self.nodes_init])
+ self.verify_unacked_bytes_all_buckets()
+
def rebalance_in_with_ops_batch(self):
gen_delete = BlobGenerator('mike', 'mike-', self.value_size, start=(self.num_items / 2 - 1), end=self.num_items)
gen_create = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items + 1, end=self.num_items * 3 / 2)
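For the conf line driving this test (items=500000), the generator ranges used above work out as follows; note that self.gen_update is built in the base class and is assumed here to cover the first half of the keyspace:

    # Worked example of the key ranges for items=500000
    # (Python 2 integer division, as in the test code).
    num_items = 500000
    update_range = (0, num_items / 2)                  # mutate first half in place
    delete_range = (num_items / 2, num_items)          # remove second half
    create_range = (num_items + 1, num_items * 3 / 2)  # append new keys
    print update_range, delete_range, create_range
    # (0, 250000) (250000, 500000) (500001, 750000)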
pytests/rebalance/rebalanceinout.py (32 lines changed)
@@ -163,6 +163,38 @@ def incremental_rebalance_in_out_with_mutation(self):
self.verify_cluster_stats(self.servers[:self.num_servers])
self.verify_unacked_bytes_all_buckets()
+ """Rebalances nodes out and in of the cluster while doing mutations and compaction.
+
+ This test begins by loading a given number of items into the cluster. It then
+ removes one node, rebalances that node out the cluster, and then rebalances it back
+ in. During the rebalancing we update all of the items in the cluster. Once the
+ node has been removed and added back we wait for the disk queues to drain, and
+ then verify that there has been no data loss, sum(curr_items) match the curr_items_total.
+ We then remove and add back two nodes at a time and so on until we have reached the point
+ where we are adding back and removing at least half of the nodes."""
+ def incremental_rebalance_in_out_with_mutation_and_compaction(self):
+ self.cluster.rebalance(self.servers[:self.num_servers],
+ self.servers[1:self.num_servers], [])
+ gen = BlobGenerator('mike', 'mike-', self.value_size, end=self.num_items)
+ self._load_all_buckets(self.master, gen, "create", 0)
+ batch_size = 50
+ for i in reversed(range(self.num_servers)[self.num_servers / 2:]):
+ tasks = self._async_load_all_buckets(self.master, gen, "update", 0, batch_size=batch_size, timeout_secs=60)
+ for bucket in self.buckets:
+ self.cluster.compact_bucket(self.master, bucket)
+ self.cluster.rebalance(self.servers[:i], [], self.servers[i:self.num_servers])
+ self.sleep(10)
+
+ for task in tasks:
+ task.result(self.wait_timeout * 20)
+ tasks = self._async_load_all_buckets(self.master, gen, "update", 0, batch_size=batch_size, timeout_secs=60)
+ self.cluster.rebalance(self.servers[:self.num_servers],
+ self.servers[i:self.num_servers], [])
+ for task in tasks:
+ task.result(self.wait_timeout * 20)
+ self.verify_cluster_stats(self.servers[:self.num_servers])
+ self.verify_unacked_bytes_all_buckets()
+
"""Start-stop rebalance in/out with adding/removing aditional after stopping rebalance.
This test begins by loading a given number of items into the cluster. It then
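The loop header in incremental_rebalance_out_in_with_mutation_and_compaction is easy to misread: range(self.num_servers)[self.num_servers / 2:] keeps the upper half of the indices, and reversed() walks them downward, so each pass ejects a growing tail of the server list and then rebalances it back in. A quick sketch of that arithmetic for a hypothetical six-node run:

    # Example: num_servers=6 (the real value comes from the ini file).
    num_servers = 6
    for i in reversed(range(num_servers)[num_servers / 2:]):
        print i, "-> eject servers[%d:%d], then rebalance them back in" % (i, num_servers)
    # 5 -> eject servers[5:6], then rebalance them back in
    # 4 -> eject servers[4:6], then rebalance them back in
    # 3 -> eject servers[3:6], then rebalance them back in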
pytests/rebalance/rebalanceout.py (30 lines changed)
@@ -89,6 +89,36 @@ def rebalance_out_with_ops(self):
self.verify_cluster_stats(self.servers[:self.num_servers - self.nodes_out])
self.verify_unacked_bytes_all_buckets()
+ """Rebalances nodes out of a cluster while doing docs ops:create, delete, update along with compaction.
+
+ This test begins with all servers clustered together and loads a user defined
+ number of items into the cluster. It then remove nodes_out from the cluster at a time
+ and rebalances. During the rebalance we perform docs ops(add/remove/update/read)
+ in the cluster( operate with a half of items that were loaded before).
+ Once the cluster has been rebalanced we wait for the disk queues to drain,
+ and then verify that there has been no data loss, sum(curr_items) match the curr_items_total.
+ Once all nodes have been rebalanced the test is finished."""
+ def rebalance_out_with_compaction_and_ops(self):
+ gen_delete = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items / 2, end=self.num_items)
+ gen_create = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items + 1, end=self.num_items * 3 / 2)
+ servs_out = [self.servers[self.num_servers - i - 1] for i in range(self.nodes_out)]
+ tasks = [self.cluster.async_rebalance(self.servers[:1], [], servs_out)]
+ for bucket in self.buckets:
+ self.cluster.compact_bucket(self.master, bucket)
+ # define which doc's ops will be performed during rebalancing
+ # allows multiple of them but one by one
+ if(self.doc_ops is not None):
+ if("update" in self.doc_ops):
+ tasks += self._async_load_all_buckets(self.master, self.gen_update, "update", 0)
+ if("create" in self.doc_ops):
+ tasks += self._async_load_all_buckets(self.master, gen_create, "create", 0)
+ if("delete" in self.doc_ops):
+ tasks += self._async_load_all_buckets(self.master, gen_delete, "delete", 0)
+ for task in tasks:
+ task.result()
+ self.verify_cluster_stats(self.servers[:self.num_servers - self.nodes_out])
+ self.verify_unacked_bytes_all_buckets()
+
"""Rebalances nodes from a cluster during getting random keys.
This test begins with all servers clustered together and loads a user defined
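The servs_out comprehension in rebalance_out_with_compaction_and_ops selects the last nodes_out entries of self.servers, highest index first. With the conf line's nodes_out=5 and a hypothetical seven-node ini file:

    # Which indices get ejected for nodes_out=5 on a 7-node cluster.
    num_servers = 7
    nodes_out = 5
    servs_out = [num_servers - i - 1 for i in range(nodes_out)]
    print servs_out  # [6, 5, 4, 3, 2]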