Skip to content
This repository has been archived by the owner on May 30, 2020. It is now read-only.

Commit

Permalink
datadogify pypi
Browse files Browse the repository at this point in the history
  • Loading branch information
ewdurbin committed Jul 22, 2017
1 parent 9310a79 commit 088a41b
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 0 deletions.
10 changes: 10 additions & 0 deletions config.py
Expand Up @@ -174,6 +174,16 @@ def __init__(self, configfile):
self.google_consumer_id = c.get("google", "client_id")
self.google_consumer_secret = c.get("google", "client_secret")

if c.has_option('datadog', 'dogstatsd_port'):
self.datadog_dogstatsd_port = c.getint('datadog', 'dogstatsd_port')
else:
self.datadog_dogstatsd_port = 8125

if c.has_option('datadog', 'tags'):
self.datadog_tags = c.get('datadog', 'tags').split(',')
else:
self.datadog_tags = []


def make_https(self):
if self.url.startswith("http:"):
Expand Down
11 changes: 11 additions & 0 deletions dogadapter.py
@@ -0,0 +1,11 @@
import os

from datadog import initialize
from datadog.dogstatsd import DogStatsd

import config

root = os.path.dirname(os.path.abspath(__file__))
conf = config.Config(os.path.join(root, "config.ini"))

dogstatsd = DogStatsd(host='localhost', port=conf.datadog_dogstatsd_port, constant_tags=conf.datadog_tags)
8 changes: 8 additions & 0 deletions fncache.py
Expand Up @@ -19,6 +19,8 @@
from perfmetrics import statsd_client
from perfmetrics import set_statsd_client

from dogadapter import dogstatsd

import config

root = os.path.dirname(os.path.abspath(__file__))
Expand Down Expand Up @@ -53,6 +55,7 @@ def __init__(self, conn, expires=86400, capacity=5000, prefix="lru", tag=None, a
self.kwarg_name = kwarg_name
self.slice = slice_obj
self.statsd = statsd_client()
self.dogstatsd = dogstatsd

def format_key(self, func_name, tag):
if tag is not None:
Expand All @@ -61,6 +64,7 @@ def format_key(self, func_name, tag):

def eject(self, func_name):
self.statsd.incr('rpc-lru.eject')
self.dogstatsd.increment('xmlrpc.lru.eject')
count = min((self.capacity / 10) or 1, 1000)
cache_keys = self.format_key(func_name, '*')
if self.conn.zcard(cache_keys) >= self.capacity:
Expand All @@ -74,13 +78,16 @@ def get(self, func_name, key, tag):
value = self.conn.hget(self.format_key(func_name, tag), key)
if value:
self.statsd.incr('rpc-lru.hit')
self.dogstatsd.increment('xmlrpc.lru.hit')
value = json.loads(value)
else:
self.statsd.incr('rpc-lru.miss')
self.dogstatsd.increment('xmlrpc.lru.miss')
return value

def add(self, func_name, key, value, tag):
self.statsd.incr('rpc-lru.add')
self.dogstatsd.increment('xmlrpc.lru.add')
self.eject(func_name)
pipeline = self.conn.pipeline()
pipeline.hset(self.format_key(func_name, tag), key, json.dumps(value))
Expand All @@ -90,6 +97,7 @@ def add(self, func_name, key, value, tag):

def purge(self, tag):
self.statsd.incr('rpc-lru.purge')
self.dogstatsd.incr('xmlrpc.lru.purge')
keys = self.conn.scan_iter(":".join([self.prefix, tag, '*']))
pipeline = self.conn.pipeline()
for key in keys:
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Expand Up @@ -3,6 +3,7 @@ argon2-cffi
bcrypt
bleach
boto
datadog
defusedxml
docutils
fs
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Expand Up @@ -14,6 +14,8 @@ cffi==1.6.0 # via argon2-cffi, bcrypt, cryptography
click==6.6 # via rq
contextlib2==0.5.3 # via raven
cryptography==1.3.2 # via pyopenssl
datadog==0.16.0
decorator==4.1.1 # via datadog
defusedxml==0.4.1
docutils==0.12
enum34==1.1.6 # via argon2-cffi, cryptography
Expand Down Expand Up @@ -42,6 +44,7 @@ redis==2.10.5
requests[security]==2.10.0
rfc3986==0.3.1
rq==0.6.0
simplejson==3.11.1 # via datadog
six==1.10.0
times==0.7
transaction==1.5.0 # via zope.traversing
Expand Down
32 changes: 32 additions & 0 deletions rpc.py
Expand Up @@ -25,6 +25,8 @@
from store import dependency
from fncache import RedisLru

from dogadapter import dogstatsd

root = os.path.dirname(os.path.abspath(__file__))
conf = config.Config(os.path.join(root, "config.ini"))

Expand Down Expand Up @@ -56,6 +58,7 @@ def log_xmlrpc_request(remote_addr, user_agent, data):
try:
with open(conf.xmlrpc_request_log_file, 'a') as f:
params, method = xmlrpclib.loads(data)
dogstatsd.increment('xmlrpc.request', tags=['method:{}'.format(method)])
record = json.dumps({
'timestamp': datetime.datetime.utcnow().isoformat(),
'remote_addr': remote_addr,
Expand All @@ -74,6 +77,8 @@ def log_xmlrpc_response(remote_addr, user_agent, data, response_size):
try:
with open(conf.xmlrpc_request_log_file, 'a') as f:
params, method = xmlrpclib.loads(data)
dogstatsd.increment('xmlrpc.response', tags=['method:{}'.format(method)])
dogstatsd.histogram('xmlrpc.response.size', response_size, tags=['method:{}'.format(method)])
record = json.dumps({
'timestamp': datetime.datetime.utcnow().isoformat(),
'remote_addr': remote_addr,
Expand All @@ -92,6 +97,7 @@ def log_xmlrpc_throttle(remote_addr, enforced):
if conf.xmlrpc_request_log_file:
try:
with open(conf.xmlrpc_request_log_file, 'a') as f:
dogstatsd.increment('xmlrpc.throttled', tags=['remote_addr:{}'.format(remote_addr), 'enforced:{}'.format(enforced)])
record = json.dumps({
'timestamp': datetime.datetime.utcnow().isoformat(),
'remote_addr': remote_addr,
Expand All @@ -108,25 +114,30 @@ def throttle_concurrent(remote_addr):
throttled = False
try:
if xmlrpc_redis:
dogstatsd.increment('xmlrpc.rate-limit.invoke')
statsd_reporter.incr('rpc-rl.invoke')
pipeline = xmlrpc_redis.pipeline()
pipeline.incr(remote_addr)
pipeline.expire(remote_addr, 60)
current = pipeline.execute()[0]
if current >= conf.xmlrpc_concurrent_requests:
dogstatsd.increment('xmlrpc.rate-limit.over')
statsd_reporter.incr('rpc-rl.over')
log_xmlrpc_throttle(remote_addr, conf.xmlrpc_enforce)
if conf.xmlrpc_enforce:
dogstatsd.increment('xmlrpc.rate-limit.enforce')
statsd_reporter.incr('rpc-rl.enforce')
throttled = True
except Exception:
dogstatsd.increment('xmlrpc.rate-limit.context.before.error')
statsd_reporter.incr('rpc-rl.context.before.error')
pass
yield throttled
try:
if xmlrpc_redis:
xmlrpc_redis.decr(remote_addr)
except Exception:
dogstatsd.increment('xmlrpc.rate-limit.context.after.error')
statsd_reporter.incr('rpc-rl.context.after.error')
pass

Expand Down Expand Up @@ -158,6 +169,7 @@ def __init__(self):
self.register_introspection_functions()
self.register_multicall_functions()

@dogstatsd.timed('xmlrpc.call')
@metricmethod
def __call__(self, webui_obj):
webui_obj.handler.send_response(200, 'OK')
Expand Down Expand Up @@ -196,52 +208,61 @@ def __call__(self, webui_obj):
log_xmlrpc_response(webui_obj.remote_addr, user_agent, data, len(response))
webui_obj.handler.wfile.write(response)

@dogstatsd.timed('xmlrpc.dispatch')
@metricmethod
def _dispatch(self, method, params):
if not method.startswith('system.'):
# Add store to all of our own methods
params = (self.store,)+tuple(params)
return SimpleXMLRPCDispatcher._dispatch(self, method, params)

@dogstatsd.timed('xmlrpc.multicall')
@metricmethod
def system_multicall(self, call_list):
if len(call_list) > 100:
raise Fault, "multicall too large"
return SimpleXMLRPCDispatcher.system_multicall(self, call_list)

@dogstatsd.timed('xmlrpc.function', tags=['function:package_hosting_mode'])
@metric
def package_hosting_mode(store, package_name):
"""Returns the hosting mode for a given package."""
return store.get_package_hosting_mode(package_name)

@dogstatsd.timed('xmlrpc.function', tags=['function:release_downloads'])
@metric
@cache_by_pkg
def release_downloads(store, package_name, version):
'''Return download count for given release.'''
return store.get_release_downloads(package_name, version)

@dogstatsd.timed('xmlrpc.function', tags=['function:package_roles'])
@metric
@cache_by_pkg
def package_roles(store, package_name):
'''Return associated users and package roles.'''
result = store.get_package_roles(package_name)
return [tuple(fields.values())for fields in result]

@dogstatsd.timed('xmlrpc.function', tags=['function:user_packages'])
@metric
def user_packages(store, user):
'''Return associated packages for user.'''
result = store.get_user_packages(user)
return [tuple(fields.values()) for fields in result]

@dogstatsd.timed('xmlrpc.function', tags=['function:list_packages'])
@metric
def list_packages(store):
result = store.get_packages()
return [row['name'] for row in result]

@dogstatsd.timed('xmlrpc.function', tags=['function:list_packages_with_serial'])
@metric
def list_packages_with_serial(store):
return store.get_packages_with_serial()

@dogstatsd.timed('xmlrpc.function', tags=['function:package_releases'])
@metric
@cache_by_pkg
def package_releases(store, package_name, show_hidden=False):
Expand All @@ -252,6 +273,7 @@ def package_releases(store, package_name, show_hidden=False):
result = store.get_package_releases(package_name, hidden=hidden)
return [row['version'] for row in result]

@dogstatsd.timed('xmlrpc.function', tags=['function:release_urls'])
@metric
def release_urls(store, package_name, version):
result = []
Expand All @@ -268,6 +290,7 @@ def release_urls(store, package_name, version):
package_urls = release_urls # "deprecated"


@dogstatsd.timed('xmlrpc.function', tags=['function:release_data'])
@metric
@cache_by_pkg
def release_data(store, package_name, version):
Expand Down Expand Up @@ -295,11 +318,13 @@ def release_data(store, package_name, version):
return info
package_data = release_data # "deprecated"

@dogstatsd.timed('xmlrpc.function', tags=['function:search'])
@metric
def search(store, spec, operator='and'):
spec['_pypi_hidden'] = 'FALSE'
return [row.as_dict() for row in store.search_packages(spec, operator)]

@dogstatsd.timed('xmlrpc.function', tags=['function:browse'])
@metric
def browse(store, categories):
if not isinstance(categories, list):
Expand All @@ -313,17 +338,20 @@ def browse(store, categories):
packages, tally = store.browse(ids)
return [(name, version) for name, version, desc in packages]

@dogstatsd.timed('xmlrpc.function', tags=['function:updated_releases'])
@metric
def updated_releases(store, since):
result = store.updated_releases(since)
return [(row['name'], row['version']) for row in result]


@dogstatsd.timed('xmlrpc.function', tags=['function:changelog_last_serial'])
@metric
def changelog_last_serial(store):
"return the last changelog event's serial"
return store.changelog_last_serial()

@dogstatsd.timed('xmlrpc.function', tags=['function:changelog'])
@metric
def changelog(store, since, with_ids=False):
result = []
Expand All @@ -339,6 +367,7 @@ def changelog(store, since, with_ids=False):
result.append(t)
return result

@dogstatsd.timed('xmlrpc.function', tags=['function:changelog_since_serial'])
@metric
def changelog_since_serial(store, since_serial):
'return the changes since the nominated event serial (id)'
Expand All @@ -353,10 +382,12 @@ def changelog_since_serial(store, since_serial):
row['action'], row['id']))
return result

@dogstatsd.timed('xmlrpc.function', tags=['function:changed_packages'])
@metric
def changed_packages(store, since):
return store.changed_packages(since)

@dogstatsd.timed('xmlrpc.function', tags=['function:post_cheesecake_for_release'])
@metric
def post_cheesecake_for_release(store, name, version, score_data, password):
if password != store.config.cheesecake_password:
Expand All @@ -366,6 +397,7 @@ def post_cheesecake_for_release(store, name, version, score_data, password):
store.commit()


@dogstatsd.timed('xmlrpc.function', tags=['function:top_packages'])
@metric
def top_packages(store, num=None):
return store.top_packages(num=num)
Expand Down
7 changes: 7 additions & 0 deletions store.py
Expand Up @@ -47,6 +47,8 @@ class LockedException(Exception):

from perfmetrics import statsd_client_from_uri

from dogadapter import dogstatsd

def enumerate(sequence):
return [(i, sequence[i]) for i in range(len(sequence))]

Expand Down Expand Up @@ -300,6 +302,7 @@ def __init__(self, config, queue=None, redis=None, package_bucket=None):

self.statsd_uri = "statsd://127.0.0.1:8125?prefix=%s" % (config.database_name)
self.statsd_reporter = statsd_client_from_uri(self.statsd_uri)
self.dogstatsd = dogstatsd

self.package_bucket = package_bucket

Expand Down Expand Up @@ -843,9 +846,11 @@ def search_packages(self, spec, operator='and'):
data = r.json()
except requests.exceptions.Timeout:
self.statsd_reporter.incr('store.search-packages.timeout')
self.dogstatsd.increment('store.search-packages.timeout')
data = {}
end_time = int(round(time.time() * 1000))
self.statsd_reporter.timing('store.search-packages', end_time - start_time)
self.dogstatsd.timing('store.search-packages', end_time - start_time)
results = []
if 'hits' in data.keys():
results = [_format_es_fields(r) for r in data['hits']['hits'] if r['fields'].get('_pypi_hidden', [False])[0] == hidden]
Expand Down Expand Up @@ -1878,9 +1883,11 @@ def browse(self, selected_classifiers):
data = r.json()
except requests.exceptions.Timeout:
self.statsd_reporter.incr('store.browse.timeout')
self.dogstatsd.increment('store.browse.timeout')
data = {}
end_time = int(round(time.time() * 1000))
self.statsd_reporter.timing('store.browse', end_time - start_time)
self.dogstatsd.timing('store.browse', end_time - start_time)

releases = []
tally = []
Expand Down

0 comments on commit 088a41b

Please sign in to comment.