Skip to content

Commit

Permalink
CBQE-1148: add monitor db fragmentation task
Browse files Browse the repository at this point in the history
Change-Id: Ida3dad0fba3e8aebe65b7b5413d56172a744c107
Reviewed-on: http://review.couchbase.org/25467
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Deepkaran Salooja <deepkaran.salooja@globallogic.com>
  • Loading branch information
IrynaMironava committed Apr 11, 2013
1 parent 0670b11 commit c296178
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 10 deletions.
14 changes: 14 additions & 0 deletions lib/couchbase/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,17 @@ def bucket_flush(self, server, bucket='default', timeout=None):
boolean - Whether or not the bucket was flushed."""
_task = self.async_bucket_flush(server, bucket)
return _task.result(timeout)

def async_monitor_db_fragmentation(self, server, fragmentation, bucket):
    """Asynchronously monitor database fragmentation for a bucket.

    Parameters:
        server - server to check (TestInputServer)
        fragmentation - fragmentation percentage to wait for
        bucket - name of the bucket to check

    Returns:
        MonitorDBFragmentationTask - a task future that is a handle to the
        scheduled task"""
    monitor_task = MonitorDBFragmentationTask(server, fragmentation, bucket)
    self.task_manager.schedule(monitor_task)
    return monitor_task
49 changes: 49 additions & 0 deletions lib/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2219,3 +2219,52 @@ def check(self, task_manager):
self.state = FINISHED
self.log.error("Unexpected Exception Caught")
self.set_exception(e)

class MonitorDBFragmentationTask(Task):

    """
    Attempt to monitor fragmentation that is occurring for a given bucket.
    Note: If autocompaction is enabled and the user attempts to monitor for a
    fragmentation value higher than the level at which auto-compaction kicks in,
    it is best for the user to use a lower value, as this can lead to infinite
    monitoring.
    """

    def __init__(self, server, fragmentation_value=10, bucket="default"):
        """
        Parameters:
            server - node whose REST endpoint is polled for stats (TestInputServer)
            fragmentation_value - couch_docs_fragmentation percentage to wait
                for; must be in [0, 100]
            bucket - name of the bucket to monitor
        """
        Task.__init__(self, "monitor_frag_db_task")
        self.server = server
        self.bucket = bucket
        self.fragmentation_value = fragmentation_value

    def execute(self, task_manager):
        """Validate the target fragmentation value and start polling."""
        # sanity check of fragmentation value
        if self.fragmentation_value < 0 or self.fragmentation_value > 100:
            err_msg = \
                "Invalid value for fragmentation %d" % self.fragmentation_value
            self.state = FINISHED
            self.set_exception(Exception(err_msg))
            # BUGFIX: without this return the task fell through, overwrote the
            # FINISHED state with CHECKING and scheduled itself anyway
            return

        self.state = CHECKING
        task_manager.schedule(self, 5)

    def check(self, task_manager):
        """Poll bucket stats; finish once fragmentation reaches the target."""
        try:
            rest = RestConnection(self.server)
            stats = rest.fetch_bucket_stats(bucket=self.bucket)
            # the last sample is the most recent fragmentation reading
            new_frag_value = stats["op"]["samples"]["couch_docs_fragmentation"][-1]

            self.log.info("current amount of fragmentation = %d" % new_frag_value)
            if new_frag_value >= self.fragmentation_value:
                self.state = FINISHED
                self.set_result(True)
            else:
                # target not reached yet -- poll again in 2 seconds
                task_manager.schedule(self, 2)
        # "except Exception, ex" is Python-2-only; "as" is valid on 2.6+ and 3.x
        except Exception as ex:
            self.state = FINISHED
            self.set_result(False)
            self.set_exception(ex)
37 changes: 27 additions & 10 deletions pytests/autocompaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@
from testconstants import MIN_COMPACTION_THRESHOLD
from testconstants import MAX_COMPACTION_THRESHOLD
from TestInput import TestInputSingleton
from basetestcase import BaseTestCase
from membase.api.rest_client import RestConnection
from membase.helper.bucket_helper import BucketOperationHelper
from remote.remote_util import RemoteMachineShellConnection
from couchbase.documentgenerator import BlobGenerator
from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached


class AutoCompactionTests(unittest.TestCase):
class AutoCompactionTests(BaseTestCase):

servers = None
clients = None
log = None
input = None

def setUp(self):
    # NOTE(review): this span is diff residue -- it interleaves the removed
    # unittest-style setup (explicit logger/input/servers below) with the new
    # BaseTestCase-based setup; only one variant belongs in the real file.
    self.log = logger.Logger.get_logger()
    self.input = TestInputSingleton.input
    self.servers = self.input.servers
    self.autocompaction_value = TestInputSingleton.input.param("autocompaction_value", 0)
    # new-style setup: BaseTestCase.setUp populates self.input/self.servers/self.log
    super(AutoCompactionTests, self).setUp()
    # fragmentation threshold (percent) under test; defaults to 0 -- TODO confirm
    self.autocompaction_value = self.input.param("autocompaction_value", 0)
    # start every test from a clean cluster (no leftover buckets)
    BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)

@staticmethod
Expand All @@ -39,12 +39,23 @@ def insert_key(serverInfo, bucket_name, count, size):
value = {"value" : MemcachedClientHelper.create_value("*", size)}
smart.memcached(key).set(key, 0, 0, json.dumps(value))

def load(self, server, compaction_value, bucket_name, gen):
    """Update docs in bucket_name until fragmentation reaches compaction_value.

    Parameters:
        server - node to load against (TestInputServer)
        compaction_value - fragmentation percentage to wait for
        bucket_name - bucket to monitor
        gen - document generator used for the update load

    Fails the test if the fragmentation level is not reached within
    self.wait_timeout * 50 seconds."""
    monitor_fragm = self.cluster.async_monitor_db_fragmentation(server, compaction_value, bucket_name)
    end_time = time.time() + self.wait_timeout * 50
    # generate load until fragmentation is reached
    while monitor_fragm.state != "FINISHED":
        if end_time < time.time():
            # BUGFIX: parenthesize the multiplication -- "%" binds tighter than
            # "*", so the original repeated the formatted message 50 times
            # instead of reporting wait_timeout * 50 seconds
            self.fail("Fragmentation level is not reached in %s sec" % (self.wait_timeout * 50))
        # update docs to create fragmentation
        self._load_all_buckets(server, gen, "update", 0)
    monitor_fragm.result()

def test_database_fragmentation(self):
percent_threshold = self.autocompaction_value
bucket_name = "default"
MAX_RUN = 100
item_size = 1024
update_item_size = item_size * ((float(97 - percent_threshold)) / 100)
update_item_size = item_size * ((float(100 - percent_threshold)) / 100)
serverInfo = self.servers[0]
self.log.info(serverInfo)
rest = RestConnection(serverInfo)
Expand All @@ -67,25 +78,31 @@ def test_database_fragmentation(self):

available_ram = info.memoryQuota * (node_ram_ratio) / 2
items = (int(available_ram * 1000) / 2) / item_size
print "ITEMS =============%s" % items
rest.create_bucket(bucket=bucket_name, ramQuotaMB=int(available_ram), authType='sasl',
saslPassword='password', replicaNumber=1, proxyPort=11211)
BucketOperationHelper.wait_for_memcached(serverInfo, bucket_name)
BucketOperationHelper.wait_for_vbuckets_ready_state(serverInfo, bucket_name)

self.log.info("start to load {0}K keys with {1} bytes/key".format(items, item_size))
self.insert_key(serverInfo, bucket_name, items, item_size)
#self.insert_key(serverInfo, bucket_name, items, item_size)
generator = BlobGenerator('compact', 'compact-', int(item_size), start=0, end=(items * 1000))
self._load_all_buckets(self.master, generator, "create", 0, 1)
self.log.info("sleep 10 seconds before the next run")
time.sleep(10)

self.log.info("start to update {0}K keys with smaller value {1} bytes/key".format(items,
int(update_item_size)))

insert_thread = Thread(target=self.insert_key,
generator_update = BlobGenerator('compact', 'compact-', int(update_item_size), start=0, end=(items * 1000))
insert_thread = Thread(target=self.load,
name="insert",
args=(serverInfo, bucket_name, items, int(update_item_size)))
args=(self.master, self.autocompaction_value,
self.default_bucket_name, generator_update))
try:
insert_thread.start()
compact_run = remote_client.wait_till_compaction_end(rest, bucket_name, timeout_in_seconds=300)
compact_run = remote_client.wait_till_compaction_end(rest, bucket_name,
timeout_in_seconds=(self.wait_timeout * 50))
if not compact_run:
self.fail("auto compaction does not run")
elif compact_run:
Expand Down

0 comments on commit c296178

Please sign in to comment.