Skip to content
Browse files

CBQE-613: decouple node stats from workload ops

Collect stats whether or not a workload is running.
Creates a new NodeStats object for keeping
track of stats across nodes.

Change-Id: Ib2b62716bf2a7d5ec5d7c34026c6d39b703be3e4
Reviewed-on: http://review.couchbase.org/21087
Reviewed-by: Karan Kumar <karan@couchbase.com>
Tested-by: Tommie McAfee <tommie@couchbase.com>
  • Loading branch information...
1 parent e923570 commit b4d96a7f7f65d4950c69ec8e77c7b7b6b3de1d15 Tommie McAfeee committed Sep 25, 2012
Showing with 69 additions and 32 deletions.
  1. +48 −30 pysystests/app/stats.py
  2. +1 −1 pysystests/app/workload_manager.py
  3. +20 −1 pysystests/cache.py
View
78 pysystests/app/stats.py
@@ -4,7 +4,7 @@
import sys
from app.celery import celery
import testcfg as cfg
-from cache import WorkloadCacher
+from cache import NodeStatsCacher
sys.path=["../lib"] + sys.path
from membase.api.rest_client import RestConnection
@@ -16,38 +16,47 @@
@celery.task
def resource_monitor():
- cache = WorkloadCacher()
- for workload in cache.workloads:
+ rest = rest_connect(cfg.COUCHBASE_IP,
+ cfg.COUCHBASE_PORT,
+ cfg.COUCHBASE_USER,
+ cfg.COUCHBASE_PWD)
- # filter on active workload
- if workload.active:
- bucket = str(workload.bucket)
- stat_checker = StatChecker(cfg.COUCHBASE_IP +":"+cfg.COUCHBASE_PORT,
- bucket = bucket,
- username = cfg.COUCHBASE_USER,
- password = cfg.COUCHBASE_PWD)
+ nodes = rest.node_statuses()
+ # cache sample of latest stats on all nodes
+ for node in nodes:
- nodes = stat_checker.nodes()
- for node in nodes:
- # check if atop running (could be new node)
- if isinstance(node.ip, unicode):
- node.ip = str(node.ip)
- check_atop_proc(node.ip)
+ # check if atop running (could be new node)
+ if isinstance(node.ip, unicode):
+ node.ip = str(node.ip)
+ check_atop_proc(node.ip)
- # cache sample of latest stats on all nodes
- sample = get_atop_sample(node.ip)
- if node.ip not in workload.stats:
- workload.stats.update({node.ip : []})
- workload.stats[node.ip].append(sample)
+ # retrieve stats from cache
+ node_stats = NodeStatsCacher().nodestats(node.ip)
+ if node_stats is None:
+ node_stats = NodeStats(node.ip)
- #TODO: log to file, putting to stdout via error flag
- logger.error(workload.stats[node.ip][-1])
+ # get stats from node
+ sample = get_atop_sample(node.ip)
- WorkloadCacher().store(workload)
+ # update node state object
+ update_node_stats(node_stats, sample)
return True
+def update_node_stats(node_stats, sample):
+
+ cache = NodeStatsCacher()
+ for key in sample.keys():
+ if key != 'ip':
+
+ if key not in node_stats.samples:
+ node_stats.samples[key] = []
+
+ val = float(re.sub(r'[^\d.]+', '', sample[key]))
+ node_stats.samples[key].append(val)
+ cache.store(node_stats)
+
def check_atop_proc(ip):
proc_signature = "atop -a -w %s 3" % cfg.ATOP_LOG_FILE
res = exec_cmd(ip, "ps aux |grep '%s' | wc -l " % proc_signature)
@@ -144,12 +153,7 @@ def __init__(self, addr, bucket = "default", username = "Administrator", passwor
self.username = username
self.password = password
self.bucket = bucket
- serverInfo = { "ip" : self.ip,
- "port" : self.port,
- "rest_username" : self.username,
- "rest_password" : self.password }
- self.node = _dict_to_obj(serverInfo)
- self.rest = RestConnection(self.node)
+ self.rest = rest_connect(self.ip, self.port, self.username, self.password)
def check(self, condition, datatype = int):
@@ -193,6 +197,20 @@ def parse_condition(self, condition):
logger.error("Invalid condition syntax: %s" % condition)
raise AttributeError(condition)
+class NodeStats(object):
+ def __init__(self, ip):
+ self.id = ip
+ self.samples = {}
+
+def rest_connect(ip, port, username, password):
+ serverInfo = { "ip" : ip,
+ "port" : port,
+ "rest_username" : username,
+ "rest_password" : password }
+ node = _dict_to_obj(serverInfo)
+ rest = RestConnection(node)
+ return rest
+
def _dict_to_obj(dict_):
return type('OBJ', (object,), dict_)
View
2 pysystests/app/workload_manager.py
@@ -430,7 +430,6 @@ def __init__(self, params):
self.cc_queues = params["cc_queues"]
self.wait = params["wait"]
self.expires = params["expires"]
- self.stats = {}
# consume from cc_queue by default if not specified
if self.cc_queues != None:
@@ -483,3 +482,4 @@ def _set_mode(self, bucket, mode):
self.history[bucket]["mode"] = mode
+
View
21 pysystests/cache.py
@@ -7,6 +7,7 @@
WORKLOADCACHEKEY = "WORKLOADCACHEKEY"
TEMPLATECACHEKEY = "TEMPLATECACHEKEY"
BUCKETSTATUSCACHEKEY = "BUCKETSTATUSCACHEKEY"
+NODESTATSCACHEKEY = "NODESTATSCACHEKEY"
class Cache(object):
def __init__(self):
@@ -170,9 +171,27 @@ def bucketstatus(self, key):
def clear(self):
super(BucketStatusCacher, self).clear(BUCKETSTATUSCACHEKEY)
+class NodeStatsCacher(Cache):
+
+ @property
+ def allnodestats(self):
+ return self.fetchCollection(NODESTATSCACHEKEY)
+
+ def store(self, nodestats):
+ id_ = getContextKey(NODESTATSCACHEKEY, nodestats.id)
+ super(NodeStatsCacher, self).store(id_, nodestats, NODESTATSCACHEKEY)
+
+ def nodestats(self, key):
+ id_ = getContextKey(NODESTATSCACHEKEY, key)
+ return self.retrieve(id_)
+
+ def clear(self):
+ super(NodeStatsCacher, self).clear(NODESTATSCACHEKEY)
+
+
def cacheClean():
- for cacheKey in [WORKLOADCACHEKEY, BUCKETSTATUSCACHEKEY]:
+ for cacheKey in [WORKLOADCACHEKEY, BUCKETSTATUSCACHEKEY, NODESTATSCACHEKEY]:
Cache().clear(cacheKey)

0 comments on commit b4d96a7

Please sign in to comment.
Something went wrong with that request. Please try again.