
Adding test for checkpoint with failover

Change-Id: Ia02960513314a5bc03da8c44e385956b43dab41c
Reviewed-on: http://review.couchbase.org/16074
Reviewed-by: Michael Wiederhold <mike@couchbase.com>
Tested-by: Ketaki Gangal <ketakigangal@gmail.com>
1 parent 0a07284 · commit d06cc8848b385c5f8abd0993c313cb692be75118 · committed by ketakigangal (@ketakigangal)
Showing with 124 additions and 10 deletions.
  1. +2 −1 conf/py-checkpoint.conf
  2. +122 −9 pytests/checkpoint.py
3 conf/py-checkpoint.conf
@@ -4,4 +4,5 @@ checkpoint.CheckpointTests:
checkpoint_create_time,servers=2,replicas=1
checkpoint_create_time,servers=4,replicas=3
checkpoint_deduplication,servers=2,replicas=1
- checkpoint_collapse,servers=4,replicas=3
+ checkpoint_collapse,servers=4,replicas=3
+ checkpoint_failover,servers=4,replicas=2,items=30000,chk_size=5000
131 pytests/checkpoint.py
@@ -12,6 +12,7 @@
from membase.helper.bucket_helper import BucketOperationHelper
from membase.helper.cluster_helper import ClusterOperationHelper
from memcached.helper.data_helper import MemcachedClientHelper
+from membase.helper.failover_helper import FailoverHelper
log = logger.Logger.get_logger()
@@ -28,6 +29,8 @@ def setUp(self):
self.input = TestInputSingleton.input
self.servers = self.input.servers
self.num_servers = self.input.param("servers", 1)
+ self.items = self.input.param("items", 12000)
+ self.chk_size = self.input.param("chk_size", 5000)
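+ # Both can be overridden from the conf, e.g. items=30000, chk_size=5000 for checkpoint_failover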
master = self.servers[0]
num_replicas = self.input.param("replicas", 1)
@@ -66,7 +69,7 @@ def checkpoint_create_items(self):
self._set_checkpoint_size(self.servers[:self.num_servers], self.bucket, '5000')
chk_stats = StatsCommon.get_stats(self.servers[:self.num_servers], self.bucket,
param, stat_key)
- load_thread = self.generate_load(master, self.bucket, num_items)
+ load_thread = self.generate_load(master, self.bucket, num_items, "key1")
load_thread.join()
stats = []
for server, value in chk_stats.items():
@@ -90,7 +93,7 @@ def checkpoint_create_time(self):
stats = []
for server, value in chk_stats.items():
StatsCommon.build_stat_check(server, param, stat_key, '>', value, stats)
- load_thread = self.generate_load(master, self.bucket, 1)
+ load_thread = self.generate_load(master, self.bucket, 1, "key1")
load_thread.join()
log.info("Sleeping for {0} seconds)".format(timeout))
time.sleep(timeout)
@@ -114,7 +117,7 @@ def checkpoint_collapse(self):
self._set_checkpoint_size(self.servers[:self.num_servers], self.bucket, str(chk_size))
m_stats = StatsCommon.get_stats([master], self.bucket, param, stat_key)
self._stop_replication(slave2, self.bucket)
- load_thread = self.generate_load(master, self.bucket, num_items)
+ load_thread = self.generate_load(master, self.bucket, num_items, "key1")
load_thread.join()
stats = []
@@ -148,9 +151,9 @@ def checkpoint_deduplication(self):
slave1 = self._get_server_by_state(self.servers[:self.num_servers], self.bucket, REPLICA1)
self._set_checkpoint_size(self.servers[:self.num_servers], self.bucket, '5000')
self._stop_replication(slave1, self.bucket)
- load_thread = self.generate_load(master, self.bucket, 4500)
+ load_thread = self.generate_load(master, self.bucket, 4500, "key1")
load_thread.join()
- load_thread = self.generate_load(master, self.bucket, 1000)
+ load_thread = self.generate_load(master, self.bucket, 1000, "key1")
load_thread.join()
self._start_replication(slave1, self.bucket)
@@ -163,6 +166,84 @@ def checkpoint_deduplication(self):
except TimeoutError:
self.fail("Items weren't deduplicated")
+ def checkpoint_failover(self):
+ """Testing checkpoints in a setup- failover slave1 and stop replication on slave2
+ Expect backfill to trigger in, when more data is loaded on the master and slave2
+
+ Failure Condition:
+ Error: Checkpoint stats from either nodes dont match.
+
+ TODO :
+ Eliminate/Merge _get_stats_checkpoint with existing build_stats_check function
+ """
+ param = 'checkpoint'
+ stat_key = 'vb_0:last_closed_checkpoint_id'
+ master = self._get_server_by_state(self.servers[:self.num_servers], self.bucket, ACTIVE)
+ slave1 = self._get_server_by_state(self.servers[:self.num_servers], self.bucket, REPLICA1)
+ slave2 = self._get_server_by_state(self.servers[:self.num_servers], self.bucket, REPLICA2)
+ self._set_checkpoint_size(self.servers[:self.num_servers], self.bucket, str(self.chk_size))
+
+ delta = self._calc_checkpoint_delta(self.items)
+ chk_pnt_master = delta
+ chk_pnt_slave1 = delta
+ chk_pnt_slave2 = delta
+
+ log.info("Loaded %s items on master[%s], slave1[%s], slave2[%s]"
+ % (self.items, master, slave1, slave2))
+ # load items items
+ load_thread = self.generate_load(master, self.bucket, self.items, "key1")
+ load_thread.join()
+
+ self._get_checkpoint_stats(master, chk_pnt_master)
+ self._get_checkpoint_stats(slave1, chk_pnt_slave1)
+ self._get_checkpoint_stats(slave2, chk_pnt_slave2)
+
+ # Throttle slave2
+ log.info("Throttle slave2[node %s] and load %s items" % (slave2, self.items))
+ self._stop_replication(slave2, self.bucket)
+ # load items
+ load_thread = self.generate_load(master, self.bucket, self.items, "key2")
+ load_thread.join()
+
+ delta = self._calc_checkpoint_delta(self.items)
+ chk_pnt_master = chk_pnt_master + delta
+ chk_pnt_slave1 = chk_pnt_slave1 + delta
+
+ self._get_checkpoint_stats(master, chk_pnt_master)
+ self._get_checkpoint_stats(slave1, chk_pnt_slave1)
+ self._get_checkpoint_stats(slave2, chk_pnt_slave2)
+
+ #Failover slave1
+ if self._failover(master, slave1) is True:
+ log.info("Failed over slave1[node %s] and loading %s items" % (slave1, self.items))
+ else:
+ self.fail("Error! Unable to failover node %s" % slave1)
+
+ # load items
+ load_thread = self.generate_load(master, self.bucket, self.items, "key3")
+ load_thread.join()
+
+ delta = self._calc_checkpoint_delta(self.items)
+ chk_pnt_master = chk_pnt_master + delta
+ # Observed on execution: slave1's checkpoint equals the master's checkpoint
+
+ self._get_checkpoint_stats(master, chk_pnt_master)
+ self._get_checkpoint_stats(slave1, chk_pnt_slave1)
+ self._get_checkpoint_stats(slave2, chk_pnt_slave2)
+
+ # Enable Replication on slave2
+ log.info("Enable replication on slave2[node %s]" % slave2)
+ self._start_replication(slave2, self.bucket)
+
+ time.sleep(2)
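+ # Once replication resumes, backfill should bring slave2's closed checkpoint up to the master's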
+ chk_pnt_slave2 = chk_pnt_master
+ self._get_checkpoint_stats(master, chk_pnt_master)
+ self._get_checkpoint_stats(slave1, chk_pnt_slave1)
+ self._get_checkpoint_stats(slave2, chk_pnt_slave2)
+
+ # TODO: Add kv-store integrity verification here.
+
+
def _set_checkpoint_size(self, servers, bucket, size):
for server in servers:
client = MemcachedClientHelper.direct_client(server, bucket)
@@ -213,24 +294,56 @@ def _get_server_by_state(self, servers, bucket, vb_state):
return server
return None
- def generate_load(self, server, bucket, num_items):
+ def generate_load(self, server, bucket, num_items, prefix):
class LoadGen(Thread):
- def __init__(self, server, bucket, num_items):
+ def __init__(self, server, bucket, num_items, prefix):
Thread.__init__(self)
self.server = server
self.bucket = bucket
self.num_items = num_items
+ self.prefix = prefix
def run(self):
client = MemcachedClientHelper.direct_client(server, bucket)
for i in range(num_items):
- key = "key-{0}".format(i)
+ key = "key-{0}-{1}".format(prefix, i)
value = "value-{0}".format(str(uuid.uuid4())[:7])
client.set(key, 0, 0, value, 0)
log.info("Loaded {0} key".format(num_items))
- load_thread = LoadGen(server, bucket, num_items)
+ load_thread = LoadGen(server, bucket, num_items, prefix)
load_thread.start()
return load_thread
+ def _failover(self, master, failover_node):
+ rest = RestConnection(master)
+ return rest.fail_over(failover_node)
+
+ def _get_checkpoint_stats(self, server, checkpoint):
+ start = time.time()
+ while time.time() - start <= 60:
+ mc_conn = MemcachedClientHelper.direct_client(server, self.bucket)
+ stats = mc_conn.stats("checkpoint")
+ last_closed_chkpoint = stats["vb_0:last_closed_checkpoint_id"]
+
+ log.info(
+ "Bucket {0} node{1}:{2} \n open_checkpoint : {3}, closed checkpoint : {4} last_closed {5} expected {6}"
+ .format(self.bucket, server.ip, server.port, stats["vb_0:open_checkpoint_id"],
+ stats["vb_0:last_closed_checkpoint_id"], last_closed_chkpoint, checkpoint))
+ if int(last_closed_chkpoint) == checkpoint:
+ log.info(
+ "Success! Checkpoint_id: {0} matches expected : {1} for node {2}:{3}".format(
+ last_closed_chkpoint, checkpoint, server.ip,
+ server.port))
+ break
+ else:
+ log.info(
+ "Checkpoint_id: {0} does not match expected : {1} for node {0}:{1} do not match yet. Try again .."
+ .format(last_closed_chkpoint, checkpoint, server.ip, server.port))
+ time.sleep(2)
+ else:
+ self.fail("Unable to get checkpoint stats from the node{0}:{1}".format(server.ip, server.port))
+
+ def _calc_checkpoint_delta(self, new_items):
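+ # e.g. with the conf values items=30000 and chk_size=5000, each load closes 6 checkpoints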
+ return int(new_items / self.chk_size)
