Skip to content

Commit

Permalink
Add metrics reporting with statsd
Browse files Browse the repository at this point in the history
  • Loading branch information
priteau committed Mar 25, 2013
1 parent af1857a commit 792b83e
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 17 deletions.
51 changes: 49 additions & 2 deletions epu/dashiproc/dtrs.py
@@ -1,9 +1,15 @@
import functools
import logging
import time

from dashi import bootstrap, DashiError
from dashi.exceptions import NotFoundError as DashiNotFoundError
from dashi.exceptions import WriteConflictError as DashiWriteConflictError

try:
    from statsd import StatsClient
except ImportError:
    StatsClient = None

from epu.dtrs.core import DTRSCore
from epu.dtrs.store import get_dtrs_store
from epu.exceptions import DeployableTypeLookupError, DeployableTypeValidationError, NotFoundError, WriteConflictError
Expand Down Expand Up @@ -121,73 +127,114 @@ def update_credentials(self, caller, site_name, site_credentials):
def lookup(self, caller, dt_name, dtrs_request_node, vars):
    """Delegate a deployable-type lookup for *caller* to the DTRS core."""
    result = self.core.lookup(caller, dt_name, dtrs_request_node, vars)
    return result

def statsd(func):
    """Decorator for DTRSClient methods that reports call metrics to statsd.

    Wraps *func*, measures its wall-clock duration, and — when the client
    instance has a configured ``statsd_client`` — submits a timing metric
    '<client_name>.<method>.timing' (milliseconds) and a counter
    '<client_name>.<method>.count'.  Metric submission is best-effort:
    failures are logged and never propagate to the caller.

    @param func DTRSClient method taking the client instance first
    @return wrapped method with the original's metadata preserved
    """
    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped method
    def call(dtrs_client, *args, **kwargs):
        before = time.time()
        ret = func(dtrs_client, *args, **kwargs)
        after = time.time()
        if dtrs_client.statsd_client is not None:
            try:
                client_name = dtrs_client.client_name or "dtrs_client"
                dtrs_client.statsd_client.timing(
                    '%s.%s.timing' % (client_name, func.__name__),
                    (after - before) * 1000)
                dtrs_client.statsd_client.incr(
                    '%s.%s.count' % (client_name, func.__name__))
            except Exception:
                # Narrowed from a bare except: still swallow any metrics
                # error, but let SystemExit/KeyboardInterrupt pass through.
                log.exception("Failed to submit metrics")
        return ret
    return call


class DTRSClient(object):

def __init__(self, dashi, topic=None, statsd_cfg=None, client_name=None):
    """Create a DTRS RPC client.

    @param dashi dashi connection used to issue calls
    @param topic service topic to address; defaults to 'dtrs'
    @param statsd_cfg optional dict with "host" and "port" keys enabling
        statsd metrics reporting; None disables metrics
    @param client_name metric-name prefix for this client (falls back to
        "dtrs_client" at submission time when None)
    """
    self.dashi = dashi
    self.topic = topic or 'dtrs'

    self.client_name = client_name
    self.statsd_client = None
    if statsd_cfg is not None:
        if StatsClient is None:
            # statsd is an optional dependency (import is guarded at the
            # top of the module); metrics were requested but unavailable.
            log.warning("statsd metrics configured but the statsd package is not installed")
        else:
            try:
                host = statsd_cfg["host"]
                port = statsd_cfg["port"]
                # Lazy %-args instead of eager string formatting.
                log.info("Setting up statsd client with host %s and port %d", host, port)
                self.statsd_client = StatsClient(host, port)
            except Exception:
                # Metrics setup is best-effort; the client works without it.
                log.exception("Failed to set up statsd client")

@statsd
def add_dt(self, caller, dt_name, dt_definition):
    """RPC wrapper: invoke the 'add_dt' operation on the DTRS service."""
    payload = {
        'caller': caller,
        'dt_name': dt_name,
        'dt_definition': dt_definition,
    }
    return self.dashi.call(self.topic, 'add_dt', **payload)

@statsd
def describe_dt(self, caller, dt_name):
    """RPC wrapper: invoke the 'describe_dt' operation on the DTRS service."""
    operation = 'describe_dt'
    return self.dashi.call(self.topic, operation, caller=caller, dt_name=dt_name)

@statsd
def list_dts(self, caller):
    """RPC wrapper: invoke the 'list_dts' operation on the DTRS service."""
    target = self.topic
    return self.dashi.call(target, 'list_dts', caller=caller)

@statsd
def remove_dt(self, caller, dt_name):
    """RPC wrapper: invoke the 'remove_dt' operation on the DTRS service."""
    operation = 'remove_dt'
    return self.dashi.call(self.topic, operation, caller=caller, dt_name=dt_name)

@statsd
def update_dt(self, caller, dt_name, dt_definition):
    """RPC wrapper: invoke the 'update_dt' operation on the DTRS service."""
    payload = {
        'caller': caller,
        'dt_name': dt_name,
        'dt_definition': dt_definition,
    }
    return self.dashi.call(self.topic, 'update_dt', **payload)

@statsd
def add_site(self, site_name, site_definition):
    """RPC wrapper: invoke the 'add_site' operation on the DTRS service."""
    payload = {
        'site_name': site_name,
        'site_definition': site_definition,
    }
    return self.dashi.call(self.topic, 'add_site', **payload)

@statsd
def describe_site(self, site_name):
    """RPC wrapper: invoke the 'describe_site' operation on the DTRS service."""
    operation = 'describe_site'
    return self.dashi.call(self.topic, operation, site_name=site_name)

@statsd
def list_sites(self):
    """RPC wrapper: invoke the 'list_sites' operation on the DTRS service."""
    target = self.topic
    return self.dashi.call(target, 'list_sites')

@statsd
def remove_site(self, site_name):
    """RPC wrapper: invoke the 'remove_site' operation on the DTRS service."""
    operation = 'remove_site'
    return self.dashi.call(self.topic, operation, site_name=site_name)

@statsd
def update_site(self, site_name, site_definition):
    """RPC wrapper: invoke the 'update_site' operation on the DTRS service."""
    payload = {
        'site_name': site_name,
        'site_definition': site_definition,
    }
    return self.dashi.call(self.topic, 'update_site', **payload)

@statsd
def add_credentials(self, caller, site_name, site_credentials):
    """RPC wrapper: invoke the 'add_credentials' operation on the DTRS service."""
    payload = {
        'caller': caller,
        'site_name': site_name,
        'site_credentials': site_credentials,
    }
    return self.dashi.call(self.topic, 'add_credentials', **payload)

@statsd
def describe_credentials(self, caller, site_name):
    """RPC wrapper: invoke the 'describe_credentials' operation on the DTRS service."""
    operation = 'describe_credentials'
    return self.dashi.call(self.topic, operation, caller=caller,
                           site_name=site_name)

@statsd
def list_credentials(self, caller):
    """RPC wrapper: invoke the 'list_credentials' operation on the DTRS service."""
    target = self.topic
    return self.dashi.call(target, 'list_credentials', caller=caller)

@statsd
def remove_credentials(self, caller, site_name):
    """RPC wrapper: invoke the 'remove_credentials' operation on the DTRS service."""
    operation = 'remove_credentials'
    return self.dashi.call(self.topic, operation, caller=caller,
                           site_name=site_name)

@statsd
def update_credentials(self, caller, site_name, site_credentials):
    """RPC wrapper: invoke the 'update_credentials' operation on the DTRS service."""
    payload = {
        'caller': caller,
        'site_name': site_name,
        'site_credentials': site_credentials,
    }
    return self.dashi.call(self.topic, 'update_credentials', **payload)

# Old DTRS methods - keeping the API unmodified for now

@statsd
def lookup(self, caller, dt_name, dtrs_request_node, vars=None):
try:
ret = self.dashi.call(self.topic, 'lookup', caller=caller,
Expand Down
8 changes: 5 additions & 3 deletions epu/dashiproc/epumanagement.py
Expand Up @@ -48,10 +48,12 @@ def __init__(self):
proc_name=self.proc_name)
self.store.initialize()

dtrs_client = DTRSClient(self.dashi)
statsd_cfg = self.CFG.get('statsd')

self.epumanagement = EPUManagement(self.CFG.epumanagement, SubscriberNotifier(self.dashi),
prov_client, ou_client, dtrs_client, store=self.store)
dtrs_client = DTRSClient(self.dashi, statsd_cfg=statsd_cfg, client_name=self.CFG.epumanagement.service_name)

self.epumanagement = EPUManagement(self.CFG.epumanagement, SubscriberNotifier(self.dashi), prov_client,
ou_client, dtrs_client, store=self.store, statsd_cfg=statsd_cfg)

# hack to inject epum reference for mock prov client
if isinstance(prov_client, MockProvisionerClient):
Expand Down
11 changes: 7 additions & 4 deletions epu/dashiproc/provisioner.py
Expand Up @@ -44,9 +44,12 @@ def __init__(self, *args, **kwargs):

self.dashi = bootstrap.dashi_connect(self.topic, self.CFG, self.amqp_uri)

statsd_cfg = kwargs.get('statsd')
statsd_cfg = statsd_cfg or self.CFG.get('statsd')

dtrs = kwargs.get('dtrs')
dtrs_topic = self.CFG.provisioner.dtrs_service_name
self.dtrs = dtrs or self._get_dtrs(dtrs_topic)
self.dtrs = dtrs or self._get_dtrs(dtrs_topic, statsd_cfg=statsd_cfg, client_name=self.topic)

contextualization_disabled = kwargs.get('contextualization_disabled')
if contextualization_disabled is None:
Expand All @@ -70,7 +73,7 @@ def __init__(self, *args, **kwargs):
core = core or self._get_core()

self.core = core(self.store, self.notifier, self.dtrs, context_client,
iaas_timeout=iaas_timeout)
iaas_timeout=iaas_timeout, statsd_cfg=statsd_cfg)

leader = kwargs.get('leader')
self.leader = leader or ProvisionerLeader(self.store, self.core,
Expand Down Expand Up @@ -218,9 +221,9 @@ def _get_core(self):
core = get_class(core_name)
return core

def _get_dtrs(self, dtrs_topic):
def _get_dtrs(self, dtrs_topic, statsd_cfg=None, client_name=None):

dtrs = DTRSClient(self.dashi, topic=dtrs_topic)
dtrs = DTRSClient(self.dashi, topic=dtrs_topic, statsd_cfg=statsd_cfg, client_name=client_name)
return dtrs


Expand Down
11 changes: 6 additions & 5 deletions epu/epumanagement/__init__.py
Expand Up @@ -24,8 +24,8 @@ class EPUManagement(object):
in test/dev situations to bypass the messaging layer altogether.
"""

def __init__(self, initial_conf, notifier, provisioner_client,
ouagent_client, dtrs_client, epum_client=None, store=None):
def __init__(self, initial_conf, notifier, provisioner_client, ouagent_client, dtrs_client, epum_client=None,
store=None, statsd_cfg=None):
"""Given a configuration, instantiate all EPUM roles and objects
INITIAL_CONF dict:
Expand All @@ -46,6 +46,7 @@ def __init__(self, initial_conf, notifier, provisioner_client,
@param dtrs_client DTRSClient
@param epum_client EPUManagement client (See clients.py). If None, uses self (in-memory).
@param store EPUMStore implementation, or None
@param statsd_cfg statsd configuration dict, or None
"""

self.initialized = False
Expand Down Expand Up @@ -90,9 +91,9 @@ def __init__(self, initial_conf, notifier, provisioner_client,
# handles being available in the election.
decider_loop_interval = initial_conf.get(EPUM_CONF_DECIDER_LOOP_INTERVAL,
EPUM_DECIDER_DEFAULT_LOOP_INTERVAL)
self.decider = EPUMDecider(self.epum_store, self.domain_subscribers,
provisioner_client, epum_client, dtrs_client, disable_loop=self._external_decide_mode,
base_provisioner_vars=base_provisioner_vars, loop_interval=decider_loop_interval)
self.decider = EPUMDecider(self.epum_store, self.domain_subscribers, provisioner_client, epum_client,
dtrs_client, disable_loop=self._external_decide_mode, base_provisioner_vars=base_provisioner_vars,
loop_interval=decider_loop_interval, statsd_cfg=statsd_cfg)

# The instance of the EPUManagementService process that hosts a particular EPUMDoctor instance
# might not be the elected leader. When it is the elected leader, this EPUMDoctor handles that
Expand Down
23 changes: 22 additions & 1 deletion epu/epumanagement/decider.py
Expand Up @@ -5,6 +5,11 @@
from copy import deepcopy
from datetime import datetime, timedelta

try:
from statsd import StatsClient
except ImportError:
StatsClient = None

from epu import cei_events
from epu.epumanagement.conf import * # noqa
from epu.epumanagement.forengine import Control
Expand Down Expand Up @@ -44,7 +49,7 @@ class EPUMDecider(object):
"""

def __init__(self, epum_store, subscribers, provisioner_client, epum_client, dtrs_client,
disable_loop=False, base_provisioner_vars=None, loop_interval=5.0):
disable_loop=False, base_provisioner_vars=None, loop_interval=5.0, statsd_cfg=None):
"""
@param epum_store State abstraction for all domains
@type epum_store EPUMStore
Expand Down Expand Up @@ -79,6 +84,16 @@ def __init__(self, epum_store, subscribers, provisioner_client, epum_client, dtr
# The instances of Control (stateful) that are passed to each Engine to get info and execute cmds
self.controls = {}

self.statsd_client = None
if statsd_cfg is not None:
try:
host = statsd_cfg["host"]
port = statsd_cfg["port"]
log.info("Setting up statsd client with host %s and port %d" % (host, port))
self.statsd_client = StatsClient(host, port)
except:
log.exception("Failed to set up statsd client")

def recover(self):
"""Called whenever the whole EPUManagement instance is instantiated.
"""
Expand Down Expand Up @@ -173,6 +188,12 @@ def _loop_top(self):
log.error("Error creating engine '%s' for user '%s': %s",
domain.domain_id, domain.owner, str(e), exc_info=True)

if self.statsd_client is not None:
try:
self.statsd_client.gauge("active_domains", len(active_domains))
except:
log.exception("Failed to submit metrics")

for key in self.engines:
# Perhaps in the meantime, the leader connection failed, bail early
if not self.is_leader:
Expand Down
47 changes: 45 additions & 2 deletions epu/provisioner/core.py
Expand Up @@ -15,7 +15,10 @@
from libcloud.compute.base import Node as LibcloudNode
from libcloud.compute.base import NodeImage as LibcloudNodeImage
from libcloud.compute.base import NodeSize as LibcloudNodeSize

try:
from statsd import StatsClient
except ImportError:
StatsClient = None

from epu.provisioner.ctx import ContextClient, BrokerError, BrokerAuthError,\
ContextNotFoundError, NimbusClusterDocument, ValidationError
Expand Down Expand Up @@ -60,7 +63,7 @@ class ProvisionerCore(object):
# Maximum time that any IaaS query can take before throwing a timeout exception
_IAAS_DEFAULT_TIMEOUT = 60

def __init__(self, store, notifier, dtrs, context, logger=None, iaas_timeout=None):
def __init__(self, store, notifier, dtrs, context, logger=None, iaas_timeout=None, statsd_cfg=None):
"""
@type store: ProvisionerStore
Expand All @@ -81,6 +84,16 @@ def __init__(self, store, notifier, dtrs, context, logger=None, iaas_timeout=Non
else:
self.iaas_timeout = self._IAAS_DEFAULT_TIMEOUT

self.statsd_client = None
if statsd_cfg is not None:
try:
host = statsd_cfg["host"]
port = statsd_cfg["port"]
log.info("Setting up statsd client with host %s and port %d" % (host, port))
self.statsd_client = StatsClient(host, port)
except:
log.exception("Failed to set up statsd client")

if not context:
log.warn("No context client provided. Contextualization disabled.")

Expand Down Expand Up @@ -437,9 +450,17 @@ def _launch_one_group(self, spec, nodes, caller=None):
try:
driver = SiteDriver(site_description, credentials_description, timeout=self.iaas_timeout)
try:
before = time.time()
iaas_nodes = self._launch_node_spec(
spec, driver.driver,
ex_clienttoken=client_token)
after = time.time()
if self.statsd_client is not None:
try:
self.statsd_client.timing('provisioner.run_instances.timing', (after - before) * 1000)
self.statsd_client.incr('provisioner.run_instances.count')
except:
log.exception("Failed to submit metrics")
except timeout, t:
log.exception('Timeout when contacting IaaS to launch nodes: ' + str(t))

Expand Down Expand Up @@ -610,6 +631,20 @@ def query_nodes(self, concurrency=1):
if concurrency > 1:
pool.join()

if self.statsd_client is not None:
try:
nodes = self.store.get_nodes(max_state=states.TERMINATING)
self.statsd_client.gauge("instances", len(nodes))
pending_nodes = self.store.get_nodes(state=states.PENDING)
self.statsd_client.gauge("pending_instances", len(pending_nodes))
running_nodes = self.store.get_nodes(min_state=states.STARTED, max_state=states.RUNNING)
self.statsd_client.gauge("running_instances", len(running_nodes))
terminating_nodes = self.store.get_nodes(state=states.TERMINATING)
self.statsd_client.gauge("terminating_instances", len(terminating_nodes))
except:
log.exception("Failed to submit metrics")


def query_one_site(self, site, nodes, caller=None):
with EpuLoggerThreadSpecific(user=caller):
return self._query_one_site(site, nodes, caller=caller)
Expand All @@ -633,7 +668,15 @@ def _query_one_site(self, site, nodes, caller=None):
site_driver = SiteDriver(site_description, credentials_description, timeout=self.iaas_timeout)

try:
before = time.time()
libcloud_nodes = site_driver.driver.list_nodes()
after = time.time()
if self.statsd_client is not None:
try:
self.statsd_client.timing('provisioner.list_instances.timing', (after - before) * 1000)
self.statsd_client.incr('provisioner.list_instances.count')
except:
log.exception("Failed to submit metrics")
except timeout:
log.exception('Timeout when querying site "%s"', site)
raise
Expand Down

0 comments on commit 792b83e

Please sign in to comment.