Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

CBQE-2336:: add additional tests for views+graceful failover, MB-11706

Change-Id: I5009315e4f23a49530ff770c3df50680e4db2f88
Reviewed-on: http://review.couchbase.org/39588
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Parag Agarwal <agarwal.parag@gmail.com>
Tested-by: Parag Agarwal <agarwal.parag@gmail.com>
  • Loading branch information...
commit 85f92ca3fe1c343fc24608a7b94e91439c7f3b6f 1 parent 5159dd3
@paragagarwal paragagarwal authored karma2ns committed
View
19 conf/py-newfailover.conf
@@ -1,6 +1,8 @@
failover.failovertests.FailoverTests:
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,failoverMaster=True,GROUP=P0
- test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,doc_ops=update:create:delete,withQueries=True,numViews=5,runViews=True,GROUP=P0
+ test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,failoverMaster=True,GROUP=P0
+ test_failover_firewall,replicas=2,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,failoverMaster=True,GROUP=P1
+ test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,doc_ops=update:create:delete,withQueries=True,numViews=5,withViewsOps=True,GROUP=P0
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,GROUP=P0
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,items=40000,sasl_buckets=1,GROUP=P1
test_failover_firewall,replicas=1,graceful=False,num_failed_nodes=1,load_ratio=10,bidirectional=True,GROUP=P0
@@ -10,7 +12,7 @@ failover.failovertests.FailoverTests:
test_failover_normal,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,failoverMaster=True,GROUP=P0
test_failover_normal,replicas=1,graceful=False,num_failed_nodes=1,items=100000,standard_buckets=2,sasl_buckets=2,standard_bucket_priority=low:high,sasl_bucket_priority=low:high,dgm_run=True,failoverMaster=True,GROUP=P0
test_failover_normal,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,GROUP=P0
- test_failover_normal,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,withQueries=True,numViews=5,runViews=True,GROUP=P0
+ test_failover_normal,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,withQueries=True,numViews=5,withViewsOps=True,GROUP=P0
test_failover_normal,replicas=1,graceful=False,num_failed_nodes=1,items=40000,standard_buckets=1,GROUP=P0;
test_failover_normal,replicas=1,graceful=False,num_failed_nodes=1,load_ratio=10,GROUP=P1
test_failover_normal,replicas=2,graceful=False,num_failed_nodes=2,items=20000,GROUP=P1
@@ -19,9 +21,11 @@ failover.failovertests.FailoverTests:
test_failover_normal,items=100000,graceful=False,during_ops=change_password,GROUP=P1;WINDOWS
test_failover_normal,items=100000,graceful=False,during_ops=change_port,failoverMaster=True,GROUP=P1;WINDOWS
#
+ test_failover_stop_server,replicas=1,graceful=False,num_failed_nodes=1,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,items=100000,dgm_run=True,failoverMaster=True,GROUP=P0
+ test_failover_stop_server,replicas=2,graceful=False,num_failed_nodes=1,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,items=100000,dgm_run=True,failoverMaster=True,GROUP=P1
test_failover_stop_server,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,failoverMaster=True,GROUP=P0
test_failover_stop_server,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,GROUP=P0
- test_failover_stop_server,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,withQueries=True,numViews=5,runViews=True,GROUP=P0
+ test_failover_stop_server,replicas=1,graceful=False,num_failed_nodes=1,items=100000,dgm_run=True,withQueries=True,numViews=5,withViewsOps=True,GROUP=P0
test_failover_stop_server,replicas=1,graceful=False,num_failed_nodes=1,load_ratio=10,GROUP=P1
test_failover_stop_server,replicas=1,graceful=False,num_failed_nodes=1,load_ratio=1,GROUP=P2
test_failover_stop_server,replicas=2,graceful=False,num_failed_nodes=2,items=20000,GROUP=P0
@@ -30,7 +34,9 @@ failover.failovertests.FailoverTests:
test_failover_stop_server,replicas=3,graceful=False,num_failed_nodes=3,load_ratio=1,failoverMaster=True,GROUP=P1
# Graceful Failover and or Delta Recovery
- test_failover_normal,replicas=1,num_failed_nodes=1,items=100000,dgm_run=True,,failoverMaster=True,graceful=True,GROUP=P0;GRACEFUL
+ test_failover_normal,replicas=1,num_failed_nodes=1,items=100000,dgm_run=True,withMutationOps=True,doc_ops=create:update:delete,failoverMaster=True,graceful=True,GROUP=P0;GRACEFUL
+ test_failover_normal,replicas=2,num_failed_nodes=1,items=100000,dgm_run=True,withMutationOps=True,doc_ops=create:update:delete,failoverMaster=True,graceful=True,GROUP=P1;GRACEFUL
+ test_failover_normal,replicas=1,num_failed_nodes=1,items=100000,dgm_run=True,failoverMaster=True,graceful=True,GROUP=P0;GRACEFUL
test_failover_normal,replicas=1,num_failed_nodes=1,items=100000,dgm_run=True,standard_buckets=2,sasl_buckets=2,standard_bucket_priority=low:high,sasl_bucket_priority=low:high,failoverMaster=True,graceful=True,GROUP=P0;GRACEFUL
test_failover_normal,replicas=1,num_failed_nodes=1,items=200000,vbuckets=1024,stopGracefulFailover=True,dgm_run=True,,failoverMaster=True,graceful=True,GROUP=P0;GRACEFUL
test_failover_normal,replicas=1,num_failed_nodes=1,items=100000,dgm_run=True,graceful=True,GROUP=P0;GRACEFUL
@@ -42,8 +48,11 @@ failover.failovertests.FailoverTests:
test_failover_then_add_back,replicas=2,num_failed_nodes=2,items=100000,standard_buckets=1,recoveryType=delta:delta,deltaRecoveryBuckets=default,graceful=False,GROUP=P1;GRACEFUL
test_failover_then_add_back,replicas=2,num_failed_nodes=1,items=100000,standard_buckets=1,recoveryType=full,deltaRecoveryBuckets=default,graceful=True,GROUP=P1;GRACEFUL
test_failover_then_add_back,replicas=2,num_failed_nodes=1,items=100000,standard_buckets=1,recoveryType=delta,deltaRecoveryBuckets=default:standard_buckets0,graceful=True,GROUP=P1;GRACEFUL
+ test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,withMutationOps=True,doc_ops=create:update:delete,upr_check=False,recoveryType=full,graceful=True,GROUP=P0;GRACEFUL
+ test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,withMutationOps=True,doc_ops=create:update:delete,upr_check=False,recoveryType=delta,graceful=True,GROUP=P0;GRACEFUL
+ test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,sasl_buckets=1,upr_check=False,recoveryType=full,graceful=True,GROUP=P0;GRACEFUL
+ test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,numViews=5,withViewsOps=True,createIndexesDuringFailover=True,sasl_buckets=1,upr_check=False,recoveryType=delta,graceful=True,GROUP=P0;GRACEFUL
test_failover_then_add_back,replicas=1,num_failed_nodes=1,items=100000,sasl_buckets=1,upr_check=False,recoveryType=full,graceful=True,GROUP=P0;GRACEFUL
- test_failover_then_add_back,replicas=2,num_failed_nodes=1,items=100000,recoveryType=delta,standard_buckets=1,upr_check=False,withQueries=True,numViews=5,runViews=True,graceful=True,GROUP=P0;GRACEFUL
test_failover_normal,replicas=1,graceful=True,check_verify_failover_type=True,num_failed_nodes=1,items=100,dgm_run=True,,failoverMaster=True,graceful=True,runRebalanceAfterFailover=False,GROUP=P1;GRACEFUL
test_failover_normal,replicas=2,graceful=True,check_verify_failover_type=True,num_failed_nodes=3,items=100,dgm_run=True,,failoverMaster=True,graceful=True,runRebalanceAfterFailover=False,GROUP=P1;GRACEFUL
test_failover_normal,replicas=3,graceful=True,check_verify_failover_type=True,num_failed_nodes=4,items=100,dgm_run=True,,failoverMaster=True,graceful=True,runRebalanceAfterFailover=False,GROUP=P1;GRACEFUL
View
44 lib/couchbase/cluster.py
@@ -52,6 +52,20 @@ def async_create_sasl_bucket(self, server, name, password, size, replicas, enabl
self.task_manager.schedule(_task)
return _task
+ def async_failover(self, servers = [], failover_nodes =[], graceful = True):
+ """Asynchronously failover a set of nodes
+
+ Parameters:
+ servers - servers used for connection. (TestInputServer)
+ failover_nodes - The set of servers that will under go failover .(TestInputServer)
+ graceful = True/False. True - graceful, False - hard. (Boolean)
+
+ Returns:
+ FailOverTask - A task future that is a handle to the scheduled task."""
+ _task = FailoverTask(servers, to_failover = failover_nodes, graceful = graceful)
+ self.task_manager.schedule(_task)
+ return _task
+
def async_create_standard_bucket(self, server, name, port, size, replicas, enable_replica_index=1,
eviction_policy='valueOnly', bucket_priority=None):
"""Asynchronously creates a standard bucket
@@ -691,30 +705,18 @@ def compact_view(self, server, design_doc_name, bucket="default", timeout=None,
_task = self.async_compact_view(server, design_doc_name, bucket, with_rebalance)
return _task.result(timeout)
- def async_failover(self, servers, to_failover):
- """Asyncronously fails over nodes
-
- Parameters:
- servers - All servers participating in the failover ([TestInputServers])
- to_failover - All servers being failed over ([TestInputServers])
-
- Returns:
- FailoverTask - A task future that is a handle to the scheduled task"""
- _task = FailoverTask(servers, to_failover)
- self.task_manager.schedule(_task)
- return _task
-
- def failover(self, servers, to_failover, timeout=None):
- """Syncronously fails over nodes
+ def failover(self, servers = [], failover_nodes =[], graceful = True):
+ """Synchronously flushes a bucket
Parameters:
- servers - All servers participating in the failover ([TestInputServers])
- to_failover - All servers being failed over ([TestInputServers])
+ servers - node used for connection (TestInputServer)
+ failover_nodes - servers to be failover. (TestInputServer)
+ bucket - The name of the bucket to be flushed. (String)
Returns:
- boolean - Whether or not the failover was successful"""
- _task = self.async_failover(servers, to_failover)
- return _task.result(timeout)
+ boolean - Whether or not the bucket was flushed."""
+ _task = self.async_failover(servers, failover_nodes, graceful)
+ return _task.result()
def async_bucket_flush(self, server, bucket='default'):
"""Asynchronously flushes a bucket
@@ -741,6 +743,8 @@ def bucket_flush(self, server, bucket='default', timeout=None):
_task = self.async_bucket_flush(server, bucket)
return _task.result(timeout)
+
+
def async_monitor_db_fragmentation(self, server, fragmentation, bucket, get_view_frag=False):
"""Asyncronously monitor db fragmentation
View
5 lib/tasks/task.py
@@ -2219,10 +2219,11 @@ def _is_compacting(self):
'''task class for failover. This task will only failover nodes but doesn't
rebalance as there is already a task to do that'''
class FailoverTask(Task):
- def __init__(self, servers, to_failover=[], wait_for_pending=20):
+ def __init__(self, servers, to_failover=[], wait_for_pending=0,graceful = True):
Task.__init__(self, "failover_task")
self.servers = servers
self.to_failover = to_failover
+ self.graceful = graceful
self.wait_for_pending = wait_for_pending
def execute(self, task_manager):
@@ -2249,7 +2250,7 @@ def _failover_nodes(self, task_manager):
for node in rest.node_statuses():
if server.ip == node.ip and int(server.port) == int(node.port):
self.log.info("Failing over {0}:{1}".format(node.ip, node.port))
- rest.fail_over(node.id)
+ rest.fail_over(node.id,self.graceful)
class GenerateExpectedViewResultsTask(Task):
View
6 pytests/basetestcase.py
@@ -627,7 +627,7 @@ def _load_doc_data_all_buckets(self, data_op="create", batch_size=1000, gen_load
return gen_load
def verify_cluster_stats(self, servers=None, master=None, max_verify=None, timeout=None, check_items=True,
- only_store_hash=True, replica_to_read=None, batch_size=1000):
+ only_store_hash=True, replica_to_read=None, batch_size=1000, check_bucket_stats = True):
if servers is None:
servers = self.servers
if master is None:
@@ -645,7 +645,8 @@ def verify_cluster_stats(self, servers=None, master=None, max_verify=None, timeo
# get/verify stats if 'ValueError: Not able to get values for following keys' was gotten
self._verify_stats_all_buckets(servers, timeout=(timeout or 120))
raise e
- self._verify_stats_all_buckets(servers, timeout=(timeout or 120))
+ if check_bucket_stats:
+ self._verify_stats_all_buckets(servers, timeout=(timeout or 120))
# verify that curr_items_tot corresponds to sum of curr_items from all nodes
verified = True
for bucket in self.buckets:
@@ -654,6 +655,7 @@ def verify_cluster_stats(self, servers=None, master=None, max_verify=None, timeo
else:
self.log.warn("verification of items was omitted")
+
def _stats_befor_warmup(self, bucket_name):
self.pre_warmup_stats[bucket_name] = {}
self.stats_monitor = self.input.param("stats_monitor", "")
View
9 pytests/failover/failoverbasetests.py
@@ -19,8 +19,9 @@ def setUp(self):
self.failoverMaster = self.input.param("failoverMaster", False)
self.total_vbuckets = self.input.param("total_vbuckets", 1024)
self.std_vbucket_dist = self.input.param("std_vbucket_dist", None)
- self.withOps = self.input.param("withOps", False)
- self.runViews = self.input.param("runViews", False)
+ self.withMutationOps = self.input.param("withMutationOps", False)
+ self.withViewsOps = self.input.param("withViewsOps", False)
+ self.createIndexesDuringFailover = self.input.param("createIndexesDuringFailover", False)
self.upr_check = self.input.param("upr_check", True)
self.withQueries = self.input.param("withQueries", False)
self.numberViews = self.input.param("numberViews", False)
@@ -34,7 +35,6 @@ def setUp(self):
self._value_size = self.input.param("value_size", 256)
self.doc_ops = self.input.param("doc_ops", [])
self.deltaRecoveryBuckets = self.input.param("deltaRecoveryBuckets", None)
- self.runViewsDuringFailover = self.input.param("runViewsDuringFailover", False)
if self.doc_ops:
self.doc_ops = self.doc_ops.split(":")
self.num_failed_nodes = self.input.param("num_failed_nodes", 0)
@@ -52,6 +52,9 @@ def setUp(self):
self.gen_create = BlobGenerator('failover', 'failover', self.value_size, start=self.num_items + 1 , end=self.num_items * 1.5)
self.gen_update = BlobGenerator('failover', 'failover', self.value_size, start=self.num_items / 2, end=self.num_items)
self.gen_delete = BlobGenerator('failover', 'failover', self.value_size, start=self.num_items / 4, end=self.num_items / 2 - 1)
+ self.afterfailover_gen_create = BlobGenerator('failover', 'failover', self.value_size, start=self.num_items * 1.6 , end=self.num_items * 2)
+ self.afterfailover_gen_update = BlobGenerator('failover', 'failover', self.value_size, start=1 , end=self.num_items/4)
+ self.afterfailover_gen_delete = BlobGenerator('failover', 'failover', self.value_size, start=self.num_items * .5 , end=self.num_items* 0.75)
self.log.info("============== FailoverBaseTest setup was finished for test #{0} {1} =============="\
.format(self.case_number, self._testMethodName))
View
179 pytests/failover/failovertests.py
@@ -69,16 +69,17 @@ def common_test_body(self, failover_reason):
self.chosen = RebalanceHelper.pick_nodes(self.referenceNode, howmany=self.num_failed_nodes)
# Perform operations - Create/Update/Delete
- # self.withOps = True => Run Operations in parallel to failover
- # self.withOps = False => Run Operations Before failover
- self.ops_tasks = self.run_operation_tasks()
+ # self.withMutationOps = True => Run Operations in parallel to failover
+ # self.withMutationOps = False => Run Operations Before failover
+ self.load_initial_data()
+ if not self.withMutationOps:
+ self.run_mutation_operations()
# Perform View Creation Tasks and check for completion if required before failover
- if self.runViews:
+ if self.withViewsOps:
self.run_view_creation_operations(self.servers)
- if not self.runViewsDuringFailover:
- self.run_view_creation_operations(self.servers)
- self.monitor_view_tasks(self.servers)
+ if not self.createIndexesDuringFailover:
+ self.query_and_monitor_view_tasks(self.servers)
# Take snap-shot of data set used for validaiton
record_static_data_set = self.get_data_set_all(self.servers, self.buckets, path = None)
@@ -91,7 +92,10 @@ def common_test_body(self, failover_reason):
prev_failover_stats = self.get_failovers_logs(self.servers, self.buckets)
# Perform Operations relalted to failover
- self.run_failover_operations(self.chosen, failover_reason)
+ if self.withMutationOps or self.withViewsOps:
+ self.run_failover_operations_with_ops(self.chosen, failover_reason)
+ else:
+ self.run_failover_operations(self.chosen, failover_reason)
# Perform Add Back Operation with Rebalance Or only Rebalance with Verificaitons
if not self.gracefulFailoverFail and self.runRebalanceAfterFailover:
@@ -123,12 +127,19 @@ def run_rebalance_after_failover_and_verify(self, chosen, prev_vbucket_stats, re
self.change_port(new_port=self.input.param("new_port", "9090"))
self.rest = RestConnection(self.referenceNode)
+ # Peform View Validation if Supported
+ if self.withViewsOps:
+ self.query_and_monitor_view_tasks(self.servers)
+
# Run operations if required during rebalance after failover
- if self.withOps:
- for task in self.ops_tasks:
- task.result()
+ if self.withMutationOps:
+ self.run_mutation_operations_after_failover()
+
+ # Rebalance Monitoring
msg = "rebalance failed while removing failover nodes {0}".format([node.id for node in chosen])
self.assertTrue(self.rest.monitorRebalance(stop_if_loop=True), msg=msg)
+
+ # Reset password or port
if self.during_ops:
if self.during_ops == "change_password":
self.change_password(new_password=old_pass)
@@ -136,33 +147,27 @@ def run_rebalance_after_failover_and_verify(self, chosen, prev_vbucket_stats, re
self.change_port(new_port='8091',
current_port=self.input.param("new_port", "9090"))
return
+
# Drain Queue and make sure intra-cluster replication is complete
- self._verify_stats_all_buckets(_servers_,timeout = 120)
self._wait_for_stats_all_buckets(_servers_)
self.log.info("Begin VERIFICATION for Rebalance after Failover Only")
+
# Verify all data set with meta data if failover happens after failover
- if not self.withOps:
+ if not self.withMutationOps:
self.data_analysis_all(record_static_data_set, _servers_, self.buckets, path = None, addedItems = None)
# Check Cluster Stats and Data as well if max_verify > 0
- self.verify_cluster_stats(_servers_, self.referenceNode)
- # If views were created they can be verified
- if self.runViews:
- if self.runViewsDuringFailover:
- self.monitor_view_tasks(_servers_)
- self.verify_query_task()
+ self.verify_cluster_stats(_servers_, self.referenceNode, check_bucket_stats = True)
# Check Failover logs :: Not sure about this logic, currently not checking, will update code once confirmed
# Currently, only for checking case where we have graceful failover
if self.version_greater_than_2_5 and self.graceful and self.upr_check:
new_failover_stats = self.compare_failovers_logs(prev_failover_stats, _servers_, self.buckets)
new_vbucket_stats = self.compare_vbucket_seqnos(prev_vbucket_stats, _servers_, self.buckets)
self.compare_vbucketseq_failoverlogs(new_vbucket_stats, new_failover_stats)
-
# Verify Active and Replica Bucket Count
if self.num_replicas > 0:
nodes = self.get_nodes_in_cluster(self.referenceNode)
self.vb_distribution_analysis(servers = nodes, buckets = self.buckets, std = 1.0 , total_vbuckets = self.total_vbuckets)
-
self.log.info("End VERIFICATION for Rebalance after Failover Only")
def run_add_back_operation_and_verify(self, chosen, prev_vbucket_stats, record_static_data_set, prev_failover_stats):
@@ -183,15 +188,20 @@ def run_add_back_operation_and_verify(self, chosen, prev_vbucket_stats, record_s
index += 1
self.sleep(20, "After failover before invoking rebalance...")
self.rest.rebalance(otpNodes=[node.id for node in self.nodes],ejectedNodes=[],deltaRecoveryBuckets = self.deltaRecoveryBuckets)
- msg = "rebalance failed while removing failover nodes {0}".format(chosen)
+
+ # Peform View Validation if Supported
+ if self.withViewsOps:
+ self.query_and_monitor_view_tasks(self.servers)
+
# Run operations if required during rebalance after failover
- if self.withOps:
- for task in self.ops_tasks:
- task.result()
+ if self.withMutationOps:
+ self.run_mutation_operations_after_failover()
+
+ # Monitor Rebalance
+ msg = "rebalance failed while removing failover nodes {0}".format(chosen)
self.assertTrue(self.rest.monitorRebalance(stop_if_loop=True), msg=msg)
# Drain ep_queue and make sure that intra-cluster replication is complete
- self._verify_stats_all_buckets(self.servers,timeout = 120)
self._wait_for_stats_all_buckets(self.servers)
self.log.info("Begin VERIFICATION for Add-back and rebalance")
@@ -200,11 +210,11 @@ def run_add_back_operation_and_verify(self, chosen, prev_vbucket_stats, record_s
self.verify_for_recovery_type(chosen, serverMap, self.buckets,recoveryTypeMap, fileMapsForVerification, self.deltaRecoveryBuckets)
# Comparison of all data if required
- if not self.withOps:
+ if not self.withMutationOps:
self.data_analysis_all(record_static_data_set,self.servers, self.buckets, path = None, addedItems = None)
# Verify Stats of cluster and Data is max_verify > 0
- self.verify_cluster_stats(self.servers, self.referenceNode)
+ self.verify_cluster_stats(self.servers, self.referenceNode, check_bucket_stats = False)
# Verify if vbucket sequence numbers and failover logs are as expected
# We will check only for version > 2.5.* and if the failover is graceful
@@ -213,12 +223,6 @@ def run_add_back_operation_and_verify(self, chosen, prev_vbucket_stats, record_s
new_failover_stats = self.compare_failovers_logs(prev_failover_stats,self.servers,self.buckets)
self.compare_vbucketseq_failoverlogs(new_vbucket_stats, new_failover_stats)
- # Peform View Validation if Supported
- if self.runViews:
- if self.runViewsDuringFailover:
- self.monitor_view_tasks(self.servers)
- self.verify_query_task()
-
# Verify Active and Replica Bucket Count
if self.num_replicas > 0:
nodes = self.get_nodes_in_cluster(self.referenceNode)
@@ -326,31 +330,96 @@ def run_failover_operations(self, chosen, failover_reason):
nodes = self.filter_servers(self.servers,chosen)
self.vb_distribution_analysis(servers = nodes, buckets = self.buckets, std = 1.0 , total_vbuckets = self.total_vbuckets, type = "failover")
+ def run_failover_operations_with_ops(self, chosen, failover_reason):
+ """ Method to run fail over operations used in the test scenario based on failover reason """
+ # Perform Operations relalted to failover
+ failed_over = True
+ for node in chosen:
+ unreachable = False
+ if failover_reason == 'stop_server':
+ unreachable=True
+ self.stop_server(node)
+ self.log.info("10 seconds delay to wait for membase-server to shutdown")
+ # wait for 5 minutes until node is down
+ self.assertTrue(RestHelper(self.rest).wait_for_node_status(node, "unhealthy", 300),
+ msg="node status is not unhealthy even after waiting for 5 minutes")
+ elif failover_reason == "firewall":
+ unreachable=True
+ self.filter_list.append (node.ip)
+ server = [srv for srv in self.servers if node.ip == srv.ip][0]
+ RemoteUtilHelper.enable_firewall(server, bidirectional=self.bidirectional)
+ status = RestHelper(self.rest).wait_for_node_status(node, "unhealthy", 300)
+ if status:
+ self.log.info("node {0}:{1} is 'unhealthy' as expected".format(node.ip, node.port))
+ else:
+ # verify iptables on the node if something wrong
+ for server in self.servers:
+ if server.ip == node.ip:
+ shell = RemoteMachineShellConnection(server)
+ info = shell.extract_remote_info()
+ if info.type.lower() == "windows":
+ o, r = shell.execute_command("netsh advfirewall show allprofiles")
+ shell.log_command_output(o, r)
+ else:
+ o, r = shell.execute_command("/sbin/iptables --list")
+ shell.log_command_output(o, r)
+ shell.disconnect()
+ self.rest.print_UI_logs()
+ api = self.rest.baseUrl + 'nodeStatuses'
+ status, content, header = self.rest._http_request(api)
+ json_parsed = json.loads(content)
+ self.log.info("nodeStatuses: {0}".format(json_parsed))
+ self.fail("node status is not unhealthy even after waiting for 5 minutes")
+ nodes = self.filter_servers(self.servers,chosen)
+ failed_over = self.cluster.async_failover([self.referenceNode], failover_nodes = chosen, graceful=self.graceful)
+ # Run View Operations
+ if self.withViewsOps:
+ self.query_and_monitor_view_tasks(nodes)
+ # Run mutation operations
+ if self.withMutationOps:
+ self.run_mutation_operations()
+ failed_over.result()
+ msg = "rebalance failed while removing failover nodes {0}".format(node.id)
+ self.assertTrue(self.rest.monitorRebalance(stop_if_loop=True), msg=msg)
+ self.verify_unacked_bytes_all_buckets()
- def run_operation_tasks(self):
+ def load_initial_data(self):
""" Method to run operations Update/Delete/Create """
# Load All Buckets if num_items > 0
- tasks = []
+ tasks = []
tasks += self._async_load_all_buckets(self.referenceNode, self.gen_initial_create, "create", 0)
for task in tasks:
task.result()
- self._verify_stats_all_buckets(self.servers,timeout = 120)
self._wait_for_stats_all_buckets(self.servers)
- # Update or Delete buckets if items > 0 and options are passed in tests
- # These can run in parallel (withOps = True), or before (withOps = True)
- ops_tasks = []
+ self._verify_stats_all_buckets(self.servers,timeout = 120)
+
+ def run_mutation_operations(self):
+ mutation_ops_tasks = []
+ if("create" in self.doc_ops):
+ mutation_ops_tasks += self._async_load_all_buckets(self.referenceNode, self.gen_create, "create", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
+ if("update" in self.doc_ops):
+ mutation_ops_tasks += self._async_load_all_buckets(self.referenceNode, self.gen_update, "update", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
+ if("delete" in self.doc_ops):
+ mutation_ops_tasks += self._async_load_all_buckets(self.referenceNode, self.gen_delete, "delete", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
+ try:
+ for task in mutation_ops_tasks:
+ task.result()
+ except Exception, ex:
+ self.log.info(ex)
+
+ def run_mutation_operations_after_failover(self):
+ mutation_ops_tasks = []
if("create" in self.doc_ops):
- ops_tasks += self._async_load_all_buckets(self.referenceNode, self.gen_update, "create", 0)
+ mutation_ops_tasks += self._async_load_all_buckets(self.referenceNode, self.afterfailover_gen_create, "create", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
if("update" in self.doc_ops):
- ops_tasks += self._async_load_all_buckets(self.referenceNode, self.gen_update, "update", 0)
+ mutation_ops_tasks += self._async_load_all_buckets(self.referenceNode, self.afterfailover_gen_update, "update", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
if("delete" in self.doc_ops):
- ops_tasks += self._async_load_all_buckets(self.referenceNode, self.gen_delete, "delete", 0)
- if not self.withOps:
- for task in ops_tasks:
+ mutation_ops_tasks += self._async_load_all_buckets(self.referenceNode, self.afterfailover_gen_delete, "delete", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
+ try:
+ for task in mutation_ops_tasks:
task.result()
- self._wait_for_stats_all_buckets(self.servers)
- self._verify_stats_all_buckets(self.servers,timeout = 120)
- return ops_tasks
+ except Exception, ex:
+ self.log.info(ex)
def define_maps_during_failover(self, recoveryType = []):
""" Method to define nope ip, recovery type map """
@@ -424,23 +493,13 @@ def run_view_creation_operations(self, servers):
for task in tasks:
task.result(self.wait_timeout * 20)
- for bucket in self.buckets:
- for view in views:
- # run queries to create indexes
- self.cluster.query_view(self.master, prefix + ddoc_name, view.name, query)
- self.verify_query_task()
- active_tasks = self.cluster.async_monitor_active_task(servers, "indexer", "_design/" + prefix + ddoc_name, wait_task=False)
- for active_task in active_tasks:
- result = active_task.result()
- self.assertTrue(result)
-
- def monitor_view_tasks(self, servers):
+ def query_and_monitor_view_tasks(self, servers):
""" Monitor Query Tasks for their completion """
num_views = self.input.param("num_views", 5)
is_dev_ddoc = self.input.param("is_dev_ddoc", True)
ddoc_name = "ddoc1"
prefix = ("", "dev_")[is_dev_ddoc]
-
+ self.verify_query_task()
active_tasks = self.cluster.async_monitor_active_task(servers, "indexer", "_design/" + prefix + ddoc_name, wait_task=False)
for active_task in active_tasks:
result = active_task.result()
Please sign in to comment.
Something went wrong with that request. Please try again.