Permalink
Browse files

CBQE-0 : DB and View Compaction tests

Change-Id: I5a4be7a0727533e9abf1b6d39f27f75b92fc3ff1
Reviewed-on: http://review.couchbase.org/38005
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Iryna Mironava <irynamironava@yandex.ru>
Reviewed-by: Meenakshi Goel <meenakshi.goel@globallogic.com>
Tested-by: Meenakshi Goel <meenakshi.goel@globallogic.com>
  • Loading branch information...
1 parent d874faa commit db81a782544986bec1f49f35a0406e28a6c9fe89 @meenakshi25 meenakshi25 committed with meenakshi25 Jun 9, 2014
@@ -741,17 +741,33 @@ def bucket_flush(self, server, bucket='default', timeout=None):
_task = self.async_bucket_flush(server, bucket)
return _task.result(timeout)
- def async_monitor_db_fragmentation(self, server, fragmentation, bucket):
+ def async_monitor_db_fragmentation(self, server, fragmentation, bucket, get_view_frag=False):
"""Asyncronously monitor db fragmentation
Parameters:
servers - server to check(TestInputServers)
bucket - bucket to check
fragmentation - fragmentation to reach
+ get_view_frag - Monitor view fragmentation. In case enabled When <fragmentation_value> is reached this method will return (boolean)
Returns:
MonitorDBFragmentationTask - A task future that is a handle to the scheduled task"""
- _task = MonitorDBFragmentationTask(server, fragmentation, bucket)
+ _task = MonitorDBFragmentationTask(server, fragmentation, bucket, get_view_frag)
+ self.task_manager.schedule(_task)
+ return _task
+
+ def async_monitor_disk_size_fragmentation(self, server, fragmentation, bucket, get_view_frag=False):
+ """Asyncronously monitor disk size fragmentation
+
+ Parameters:
+ servers - server to check(TestInputServers)
+ bucket - bucket to check
+ fragmentation - fragmentation to reach
+ get_view_frag - Monitor view fragmentation. In case enabled When <fragmentation_value> is reached this method will return (boolean)
+
+ Returns:
+ MonitorDiskSizeFragmentationTask - A task future that is a handle to the scheduled task"""
+ _task = MonitorDiskSizeFragmentationTask(server, fragmentation, bucket, get_view_frag)
self.task_manager.schedule(_task)
return _task
@@ -826,3 +842,43 @@ def compact_bucket(self, server, bucket = "default"):
_task = self.async_compact_bucket(server, bucket)
status = _task.result()
return status
+
+ def async_monitor_compact_view(self, server, design_doc_name, bucket="default", with_rebalance=False, frag_value=0):
+ """Asynchronously montior view compaction.
+
+ Parameters:
+ server - The server to handle fragmentation config task. (TestInputServer)
+ design_doc_name - design doc with views represented in index file. (String)
+ bucket - The name of the bucket design_doc belongs to. (String)
+ with_rebalance - there are two cases that process this parameter:
+ "Error occured reading set_view _info" will be ignored if True
+ (This applies to rebalance in case),
+ and with concurrent updates(for instance, with rebalance)
+ it's possible that compaction value has not changed significantly
+ frag_value - ViewFragmentationThresholdPercentage set to be compared with fragmentaion value after compaction
+
+ Returns:
+ MonitorViewCompactionTask - A task future that is a handle to the scheduled task."""
+
+
+ _task = MonitorViewCompactionTask(server, design_doc_name, bucket, with_rebalance, frag_value)
+ self.task_manager.schedule(_task)
+ return _task
+
+ def monitor_compact_view(self, server, design_doc_name, bucket="default", timeout=None, with_rebalance=False, frag_value=0):
+ """Synchronously monitor view compaction.
+
+ Parameters:
+ server - The server to handle fragmentation config task. (TestInputServer)
+ design_doc_name - design doc with views represented in index file. (String)
+ bucket - The name of the bucket design_doc belongs to. (String)
+ with_rebalance - "Error occured reading set_view _info" will be ignored if True
+ and with concurrent updates(for instance, with rebalance)
+ it's possible that compaction value has not changed significantly
+ frag_value - ViewFragmentationThresholdPercentage set to be compared with fragmentaion value after compaction
+
+ Returns:
+ boolean - True file size reduced after compaction, False if successful but no work done """
+
+ _task = self.async_monitor_compact_view(server, design_doc_name, bucket, with_rebalance, frag_value)
+ return _task.result(timeout)
@@ -1877,8 +1877,8 @@ def set_mc_threads(self, mc_threads=4):
def set_auto_compaction(self, parallelDBAndVC="false",
dbFragmentThreshold=None,
viewFragmntThreshold=None,
- dbFragmentThresholdPercentage=100,
- viewFragmntThresholdPercentage=100,
+ dbFragmentThresholdPercentage=None,
+ viewFragmntThresholdPercentage=None,
allowedTimePeriodFromHour=None,
allowedTimePeriodFromMin=None,
allowedTimePeriodToHour=None,
@@ -1911,7 +1911,7 @@ def set_auto_compaction(self, parallelDBAndVC="false",
if dbFragmentThreshold is not None:
params["databaseFragmentationThreshold[size]"] = dbFragmentThreshold
if viewFragmntThreshold is not None:
- params["viewFragmentationThreshold[percentage]"] = viewFragmntThreshold
+ params["viewFragmentationThreshold[size]"] = viewFragmntThreshold
if dbFragmentThresholdPercentage is not None:
params["databaseFragmentationThreshold[percentage]"] = dbFragmentThresholdPercentage
if viewFragmntThresholdPercentage is not None:
View
@@ -1891,7 +1891,7 @@ def execute(self, task_manager):
for task in tasks:
if task["type"] == self.type and ((
self.target_key == "design_documents" and task[self.target_key][0] == self.target_value) or (
- self.target_key == "original_target" and task[self.target_key] == self.target_value) or (
+ self.target_key == "original_target" and task[self.target_key]["type"] == self.target_value) or (
self.target_key == "initial_build" and str(task[self.target_key]) == self.target_value)):
self.current_progress = task["progress"]
self.task_pid = task["pid"]
@@ -2779,13 +2779,13 @@ class MonitorDBFragmentationTask(Task):
it is best user to use lower value as this can lead to infinite monitoring.
"""
- def __init__(self, server, fragmentation_value=10, bucket="default"):
+ def __init__(self, server, fragmentation_value=10, bucket="default", get_view_frag=False):
Task.__init__(self, "monitor_frag_db_task")
self.server = server
self.bucket = bucket
self.fragmentation_value = fragmentation_value
-
+ self.get_view_frag = get_view_frag
def execute(self, task_manager):
@@ -2803,9 +2803,12 @@ def check(self, task_manager):
try:
rest = RestConnection(self.server)
stats = rest.fetch_bucket_stats(bucket=self.bucket)
- new_frag_value = stats["op"]["samples"]["couch_docs_fragmentation"][-1]
-
- self.log.info("current amount of fragmentation = %d" % new_frag_value)
+ if self.get_view_frag:
+ new_frag_value = stats["op"]["samples"]["couch_views_fragmentation"][-1]
+ self.log.info("Current amount of views fragmentation = %d" % new_frag_value)
+ else:
+ new_frag_value = stats["op"]["samples"]["couch_docs_fragmentation"][-1]
+ self.log.info("current amount of docs fragmentation = %d" % new_frag_value)
if new_frag_value >= self.fragmentation_value:
self.state = FINISHED
self.set_result(True)
@@ -2971,10 +2974,7 @@ def execute(self, task_manager):
task_manager.schedule(self)
-
def check(self, task_manager):
-
-
# check bucket compaction status across all nodes
nodes = self.rest.get_nodes()
@@ -2989,16 +2989,13 @@ def check(self, task_manager):
if running:
self.statuses[node.id] = (progress == 100)
-
done = all(self.statuses.values())
-
if done:
# task was completed sucessfully
self.set_result(True)
self.state = FINISHED
else:
-
if self.retries > 0:
# retry
self.retries = self.retries - 1
@@ -3007,3 +3004,171 @@ def check(self, task_manager):
# never detected a compaction task running
self.set_result(False)
self.state = FINISHED
+
+ def _get_disk_size(self):
+ stats = self.rest.fetch_bucket_stats(bucket=self.bucket)
+ total_disk_size = stats["op"]["samples"]["couch_total_disk_size"][-1]
+ self.log.info("Disk size is = %d" % total_disk_size)
+ return total_disk_size
+
+class MonitorViewCompactionTask(ViewCompactionTask):
+
+ def __init__(self, server, design_doc_name, bucket="default", with_rebalance=False, frag_value=0):
+ ViewCompactionTask.__init__(self, server, design_doc_name, bucket, with_rebalance)
+ self.ddoc_id = "_design%2f" + design_doc_name
+ self.compaction_revision = 0
+ self.precompacted_fragmentation = 0
+ self.fragmentation_value = frag_value
+ self.rest = RestConnection(self.server)
+
+ def execute(self, task_manager):
+ try:
+ self.compaction_revision, self.precompacted_fragmentation = self._get_compaction_details()
+ self.log.info("{0}: stats compaction before triggering it: ({1},{2})".
+ format(self.design_doc_name, self.compaction_revision, self.precompacted_fragmentation))
+ self.disk_size = self._get_disk_size()
+ self.log.info("Disk Size Before Compaction {0}".format(self.disk_size))
+ if self.precompacted_fragmentation == 0:
+ self.log.warn("%s: There is nothing to compact, fragmentation is 0" %self.design_doc_name)
+ self.set_result(False)
+ self.state = FINISHED
+ elif self.precompacted_fragmentation < self.fragmentation_value:
+ self.log.info("{0}: Compaction is already done and there is nothing to compact, current fragmentation is lesser {1} {2}".
+ format(self.design_doc_name, self.precompacted_fragmentation, self.fragmentation_value))
+ self.compaction_revision, self.precompacted_fragmentation = self._get_compaction_details()
+ self.log.info("{0}: stats compaction before triggering it: ({1},{2})".
+ format(self.design_doc_name, self.compaction_revision, self.precompacted_fragmentation))
+ self.set_result(True)
+ self.state = FINISHED
+ return
+ self.state = CHECKING
+ task_manager.schedule(self, 2)
+ except (CompactViewFailed, SetViewInfoNotFound) as ex:
+ self.state = FINISHED
+ self.set_exception(ex)
+ # catch and set all unexpected exceptions
+ except Exception as e:
+ self.state = FINISHED
+ self.log.error("Unexpected Exception Caught")
+ self.set_exception(e)
+
+ # verify compaction history incremented and some defraging occurred
+ def check(self, task_manager):
+ try:
+ _compaction_running = self._is_compacting()
+ new_compaction_revision, fragmentation = self._get_compaction_details()
+ self.log.info("{0}: stats compaction:revision and fragmentation: ({1},{2})".
+ format(self.design_doc_name, new_compaction_revision, fragmentation))
+ curr_disk_size = self._get_disk_size()
+ self.log.info("Current Disk Size {0}".format(curr_disk_size))
+ if new_compaction_revision == self.compaction_revision and _compaction_running:
+ # compaction ran successfully but compaction was not changed, perhaps we are still compacting
+ self.log.info("design doc {0} is compacting".format(self.design_doc_name))
+ task_manager.schedule(self, 3)
+ elif self.precompacted_fragmentation > fragmentation:
+ self.log.info("%s: Pre Compacted fragmentation is more, before Compaction %d and after Compaction %d" % \
+ (self.design_doc_name, self.precompacted_fragmentation, fragmentation))
+ frag_val_diff = fragmentation - self.precompacted_fragmentation
+ if new_compaction_revision == self.compaction_revision or new_compaction_revision > self.compaction_revision:
+ self.log.info("{1}: compactor was run, compaction revision was changed on {0}".
+ format(new_compaction_revision, self.design_doc_name))
+ self.log.info("%s: fragmentation went from %d to %d" % (self.design_doc_name, self.precompacted_fragmentation, fragmentation))
+ if frag_val_diff > 0:
+ if self._is_compacting():
+ task_manager.schedule(self, 5)
+ self.log.info("compaction was completed, but fragmentation value {0} is more than before compaction {1}".
+ format(fragmentation, self.precompacted_fragmentation))
+ self.log.info("Load is still in progress, Need to be checked")
+ self.set_result(self.with_rebalance)
+ else:
+ self.set_result(True)
+ self.state = FINISHED
+ else:
+ for i in xrange(10):
+ time.sleep(3)
+ if self._is_compacting():
+ task_manager.schedule(self, 2)
+ return
+ else:
+ new_compaction_revision, fragmentation = self._get_compaction_details()
+ self.log.info("{2}: stats compaction: ({0},{1})".format(new_compaction_revision, fragmentation,
+ self.design_doc_name))
+ curr_disk_size = self._get_disk_size()
+ self.log.info("Disk Size went from {0} {1}".format(self.disk_size, curr_disk_size))
+ if new_compaction_revision > self.compaction_revision and self.precompacted_fragmentation > fragmentation:
+ self.log.warn("the compaction revision was increase and fragmentation value went from {0} {1}".
+ format(self.precompacted_fragmentation, fragmentation))
+ self.set_result(True)
+ self.state = FINISHED
+ return
+ elif new_compaction_revision > self.compaction_revision and self.with_rebalance:
+ self.log.warn("the compaction revision was increased, but the actual fragmentation value has not changed significantly")
+ self.set_result(True)
+ self.state = FINISHED
+ return
+ else:
+ continue
+ self.log.info("design doc {0} is compacting:{1}".format(self.design_doc_name, self._is_compacting()))
+ new_compaction_revision, fragmentation = self._get_compaction_details()
+ self.log.error("stats compaction still: ({0},{1})".
+ format(new_compaction_revision, fragmentation))
+ status, content = self.rest.set_view_info(self.bucket, self.design_doc_name)
+ stats = content["stats"]
+ self.log.warn("general compaction stats:{0}".format(stats))
+ self.state = FINISHED
+ self.set_result(False)
+ self.set_exception(Exception("Check system logs, looks like compaction failed to start"))
+ except (SetViewInfoNotFound) as ex:
+ self.state = FINISHED
+ self.set_exception(ex)
+ # catch and set all unexpected exceptions
+ except Exception as e:
+ self.state = FINISHED
+ self.log.error("Unexpected Exception Caught")
+ self.set_exception(e)
+
+ def _get_disk_size(self):
+ nodes_ddoc_info = MonitorViewFragmentationTask.aggregate_ddoc_info(self.rest, self.design_doc_name,
+ self.bucket, self.with_rebalance)
+ disk_size = sum([content['disk_size'] for content in nodes_ddoc_info])
+ return disk_size
+
+class MonitorDiskSizeFragmentationTask(Task):
+ def __init__(self, server, fragmentation_value=10, bucket="default", get_view_frag=False):
+ Task.__init__(self, "monitor_frag_db_task")
+ self.server = server
+ self.bucket = bucket
+ self.fragmentation_value = fragmentation_value
+ self.get_view_frag = get_view_frag
+ self.rest = RestConnection(self.server)
+ self.curr_disk_size = 0
+
+ def execute(self, task_manager):
+ if self.fragmentation_value < 0:
+ err_msg = \
+ "Invalid value for fragmentation %d" % self.fragmentation_value
+ self.state = FINISHED
+ self.set_exception(Exception(err_msg))
+ self.state = CHECKING
+ task_manager.schedule(self, 5)
+
+ def check(self, task_manager):
+ try:
+ rest = RestConnection(self.server)
+ stats = rest.fetch_bucket_stats(bucket=self.bucket)
+ if self.get_view_frag:
+ new_disk_size = stats["op"]["samples"]["couch_views_actual_disk_size"][-1]
+ else:
+ new_disk_size = stats["op"]["samples"]["couch_total_disk_size"][-1]
+ if self.curr_disk_size > new_disk_size:
+ self.state = FINISHED
+ self.set_result(True)
+ else:
+ # try again
+ task_manager.schedule(self, 5)
+ self.log.info("New and Current Disk size is {0} {1}".format(new_disk_size, self.curr_disk_size))
+ self.curr_disk_size = new_disk_size
+ except Exception, ex:
+ self.state = FINISHED
+ self.set_result(False)
+ self.set_exception(ex)
@@ -26,6 +26,9 @@
VERSION_FILE = "VERSION.txt"
MIN_COMPACTION_THRESHOLD = 2
MAX_COMPACTION_THRESHOLD = 100
+MIN_TIME_VALUE = 0
+MAX_TIME_MINUTE = 59
+MAX_TIME_HOUR = 23
NUM_ERLANG_THREADS = 16
LINUX_COUCHBASE_BIN_PATH = "/opt/couchbase/bin/"
WIN_COUCHBASE_BIN_PATH = '/cygdrive/c/Program\ Files/Couchbase/Server/bin/'
Oops, something went wrong. Retry.

0 comments on commit db81a78

Please sign in to comment.