From 088a41b3225a0ed913946ce87b91a6ea1dcb1f62 Mon Sep 17 00:00:00 2001 From: "Ernest W. Durbin III" Date: Fri, 21 Jul 2017 18:49:52 -0400 Subject: [PATCH] datadogify pypi --- config.py | 10 ++++++++++ dogadapter.py | 11 +++++++++++ fncache.py | 8 ++++++++ requirements.in | 1 + requirements.txt | 3 +++ rpc.py | 32 ++++++++++++++++++++++++++++++++ store.py | 7 +++++++ webui.py | 14 ++++++++++++++ 8 files changed, 86 insertions(+) create mode 100644 dogadapter.py diff --git a/config.py b/config.py index 300e22a7..ac049e83 100755 --- a/config.py +++ b/config.py @@ -174,6 +174,16 @@ def __init__(self, configfile): self.google_consumer_id = c.get("google", "client_id") self.google_consumer_secret = c.get("google", "client_secret") + if c.has_option('datadog', 'dogstatsd_port'): + self.datadog_dogstatsd_port = c.getint('datadog', 'dogstatsd_port') + else: + self.datadog_dogstatsd_port = 8125 + + if c.has_option('datadog', 'tags'): + self.datadog_tags = c.get('datadog', 'tags').split(',') + else: + self.datadog_tags = [] + def make_https(self): if self.url.startswith("http:"): diff --git a/dogadapter.py b/dogadapter.py new file mode 100644 index 00000000..9b733774 --- /dev/null +++ b/dogadapter.py @@ -0,0 +1,11 @@ +import os + +from datadog import initialize +from datadog.dogstatsd import DogStatsd + +import config + +root = os.path.dirname(os.path.abspath(__file__)) +conf = config.Config(os.path.join(root, "config.ini")) + +dogstatsd = DogStatsd(host='localhost', port=conf.datadog_dogstatsd_port, constant_tags=conf.datadog_tags) diff --git a/fncache.py b/fncache.py index 65da0f9c..b4f1dcbf 100644 --- a/fncache.py +++ b/fncache.py @@ -19,6 +19,8 @@ from perfmetrics import statsd_client from perfmetrics import set_statsd_client +from dogadapter import dogstatsd + import config root = os.path.dirname(os.path.abspath(__file__)) @@ -53,6 +55,7 @@ def __init__(self, conn, expires=86400, capacity=5000, prefix="lru", tag=None, a self.kwarg_name = kwarg_name self.slice = slice_obj self.statsd = statsd_client() + self.dogstatsd = dogstatsd def format_key(self, func_name, tag): if tag is not None: @@ -61,6 +64,7 @@ def format_key(self, func_name, tag): def eject(self, func_name): self.statsd.incr('rpc-lru.eject') + self.dogstatsd.increment('xmlrpc.lru.eject') count = min((self.capacity / 10) or 1, 1000) cache_keys = self.format_key(func_name, '*') if self.conn.zcard(cache_keys) >= self.capacity: @@ -74,13 +78,16 @@ def get(self, func_name, key, tag): value = self.conn.hget(self.format_key(func_name, tag), key) if value: self.statsd.incr('rpc-lru.hit') + self.dogstatsd.increment('xmlrpc.lru.hit') value = json.loads(value) else: self.statsd.incr('rpc-lru.miss') + self.dogstatsd.increment('xmlrpc.lru.miss') return value def add(self, func_name, key, value, tag): self.statsd.incr('rpc-lru.add') + self.dogstatsd.increment('xmlrpc.lru.add') self.eject(func_name) pipeline = self.conn.pipeline() pipeline.hset(self.format_key(func_name, tag), key, json.dumps(value)) @@ -90,6 +97,7 @@ def add(self, func_name, key, value, tag): def purge(self, tag): self.statsd.incr('rpc-lru.purge') + self.dogstatsd.incr('xmlrpc.lru.purge') keys = self.conn.scan_iter(":".join([self.prefix, tag, '*'])) pipeline = self.conn.pipeline() for key in keys: diff --git a/requirements.in b/requirements.in index 98cce32c..15b7d0b9 100644 --- a/requirements.in +++ b/requirements.in @@ -3,6 +3,7 @@ argon2-cffi bcrypt bleach boto +datadog defusedxml docutils fs diff --git a/requirements.txt b/requirements.txt index b7a067e5..1cc9292d 100755 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,8 @@ cffi==1.6.0 # via argon2-cffi, bcrypt, cryptography click==6.6 # via rq contextlib2==0.5.3 # via raven cryptography==1.3.2 # via pyopenssl +datadog==0.16.0 +decorator==4.1.1 # via datadog defusedxml==0.4.1 docutils==0.12 enum34==1.1.6 # via argon2-cffi, cryptography @@ -42,6 +44,7 @@ redis==2.10.5 requests[security]==2.10.0 rfc3986==0.3.1 rq==0.6.0 +simplejson==3.11.1 # via datadog six==1.10.0 times==0.7 transaction==1.5.0 # via zope.traversing diff --git a/rpc.py b/rpc.py index 83c94413..78f8d6c2 100755 --- a/rpc.py +++ b/rpc.py @@ -25,6 +25,8 @@ from store import dependency from fncache import RedisLru +from dogadapter import dogstatsd + root = os.path.dirname(os.path.abspath(__file__)) conf = config.Config(os.path.join(root, "config.ini")) @@ -56,6 +58,7 @@ def log_xmlrpc_request(remote_addr, user_agent, data): try: with open(conf.xmlrpc_request_log_file, 'a') as f: params, method = xmlrpclib.loads(data) + dogstatsd.increment('xmlrpc.request', tags=['method:{}'.format(method)]) record = json.dumps({ 'timestamp': datetime.datetime.utcnow().isoformat(), 'remote_addr': remote_addr, @@ -74,6 +77,8 @@ def log_xmlrpc_response(remote_addr, user_agent, data, response_size): try: with open(conf.xmlrpc_request_log_file, 'a') as f: params, method = xmlrpclib.loads(data) + dogstatsd.increment('xmlrpc.response', tags=['method:{}'.format(method)]) + dogstatsd.histogram('xmlrpc.response.size', response_size, tags=['method:{}'.format(method)]) record = json.dumps({ 'timestamp': datetime.datetime.utcnow().isoformat(), 'remote_addr': remote_addr, @@ -92,6 +97,7 @@ def log_xmlrpc_throttle(remote_addr, enforced): if conf.xmlrpc_request_log_file: try: with open(conf.xmlrpc_request_log_file, 'a') as f: + dogstatsd.increment('xmlrpc.throttled', tags=['remote_addr:{}'.format(remote_addr), 'enforced:{}'.format(enforced)]) record = json.dumps({ 'timestamp': datetime.datetime.utcnow().isoformat(), 'remote_addr': remote_addr, @@ -108,18 +114,22 @@ def throttle_concurrent(remote_addr): throttled = False try: if xmlrpc_redis: + dogstatsd.increment('xmlrpc.rate-limit.invoke') statsd_reporter.incr('rpc-rl.invoke') pipeline = xmlrpc_redis.pipeline() pipeline.incr(remote_addr) pipeline.expire(remote_addr, 60) current = pipeline.execute()[0] if current >= conf.xmlrpc_concurrent_requests: + dogstatsd.increment('xmlrpc.rate-limit.over') statsd_reporter.incr('rpc-rl.over') log_xmlrpc_throttle(remote_addr, conf.xmlrpc_enforce) if conf.xmlrpc_enforce: + dogstatsd.increment('xmlrpc.rate-limit.enforce') statsd_reporter.incr('rpc-rl.enforce') throttled = True except Exception: + dogstatsd.increment('xmlrpc.rate-limit.context.before.error') statsd_reporter.incr('rpc-rl.context.before.error') pass yield throttled @@ -127,6 +137,7 @@ def throttle_concurrent(remote_addr): if xmlrpc_redis: xmlrpc_redis.decr(remote_addr) except Exception: + dogstatsd.increment('xmlrpc.rate-limit.context.after.error') statsd_reporter.incr('rpc-rl.context.after.error') pass @@ -158,6 +169,7 @@ def __init__(self): self.register_introspection_functions() self.register_multicall_functions() + @dogstatsd.timed('xmlrpc.call') @metricmethod def __call__(self, webui_obj): webui_obj.handler.send_response(200, 'OK') @@ -196,6 +208,7 @@ def __call__(self, webui_obj): log_xmlrpc_response(webui_obj.remote_addr, user_agent, data, len(response)) webui_obj.handler.wfile.write(response) + @dogstatsd.timed('xmlrpc.dispatch') @metricmethod def _dispatch(self, method, params): if not method.startswith('system.'): @@ -203,23 +216,27 @@ def _dispatch(self, method, params): params = (self.store,)+tuple(params) return SimpleXMLRPCDispatcher._dispatch(self, method, params) + @dogstatsd.timed('xmlrpc.multicall') @metricmethod def system_multicall(self, call_list): if len(call_list) > 100: raise Fault, "multicall too large" return SimpleXMLRPCDispatcher.system_multicall(self, call_list) +@dogstatsd.timed('xmlrpc.function', tags=['function:package_hosting_mode']) @metric def package_hosting_mode(store, package_name): """Returns the hosting mode for a given package.""" return store.get_package_hosting_mode(package_name) +@dogstatsd.timed('xmlrpc.function', tags=['function:release_downloads']) @metric @cache_by_pkg def release_downloads(store, package_name, version): '''Return download count for given release.''' return store.get_release_downloads(package_name, version) +@dogstatsd.timed('xmlrpc.function', tags=['function:package_roles']) @metric @cache_by_pkg def package_roles(store, package_name): @@ -227,21 +244,25 @@ def package_roles(store, package_name): result = store.get_package_roles(package_name) return [tuple(fields.values())for fields in result] +@dogstatsd.timed('xmlrpc.function', tags=['function:user_packages']) @metric def user_packages(store, user): '''Return associated packages for user.''' result = store.get_user_packages(user) return [tuple(fields.values()) for fields in result] +@dogstatsd.timed('xmlrpc.function', tags=['function:list_packages']) @metric def list_packages(store): result = store.get_packages() return [row['name'] for row in result] +@dogstatsd.timed('xmlrpc.function', tags=['function:list_packages_with_serial']) @metric def list_packages_with_serial(store): return store.get_packages_with_serial() +@dogstatsd.timed('xmlrpc.function', tags=['function:package_releases']) @metric @cache_by_pkg def package_releases(store, package_name, show_hidden=False): @@ -252,6 +273,7 @@ def package_releases(store, package_name, show_hidden=False): result = store.get_package_releases(package_name, hidden=hidden) return [row['version'] for row in result] +@dogstatsd.timed('xmlrpc.function', tags=['function:release_urls']) @metric def release_urls(store, package_name, version): result = [] @@ -268,6 +290,7 @@ def release_urls(store, package_name, version): package_urls = release_urls # "deprecated" +@dogstatsd.timed('xmlrpc.function', tags=['function:release_data']) @metric @cache_by_pkg def release_data(store, package_name, version): @@ -295,11 +318,13 @@ def release_data(store, package_name, version): return info package_data = release_data # "deprecated" +@dogstatsd.timed('xmlrpc.function', tags=['function:search']) @metric def search(store, spec, operator='and'): spec['_pypi_hidden'] = 'FALSE' return [row.as_dict() for row in store.search_packages(spec, operator)] +@dogstatsd.timed('xmlrpc.function', tags=['function:browse']) @metric def browse(store, categories): if not isinstance(categories, list): @@ -313,17 +338,20 @@ def browse(store, categories): packages, tally = store.browse(ids) return [(name, version) for name, version, desc in packages] +@dogstatsd.timed('xmlrpc.function', tags=['function:updated_releases']) @metric def updated_releases(store, since): result = store.updated_releases(since) return [(row['name'], row['version']) for row in result] +@dogstatsd.timed('xmlrpc.function', tags=['function:changelog_last_serial']) @metric def changelog_last_serial(store): "return the last changelog event's serial" return store.changelog_last_serial() +@dogstatsd.timed('xmlrpc.function', tags=['function:changelog']) @metric def changelog(store, since, with_ids=False): result = [] @@ -339,6 +367,7 @@ def changelog(store, since, with_ids=False): result.append(t) return result +@dogstatsd.timed('xmlrpc.function', tags=['function:changelog_since_serial']) @metric def changelog_since_serial(store, since_serial): 'return the changes since the nominated event serial (id)' @@ -353,10 +382,12 @@ def changelog_since_serial(store, since_serial): row['action'], row['id'])) return result +@dogstatsd.timed('xmlrpc.function', tags=['function:changed_packages']) @metric def changed_packages(store, since): return store.changed_packages(since) +@dogstatsd.timed('xmlrpc.function', tags=['function:post_cheesecake_for_release']) @metric def post_cheesecake_for_release(store, name, version, score_data, password): if password != store.config.cheesecake_password: @@ -366,6 +397,7 @@ def post_cheesecake_for_release(store, name, version, score_data, password): store.commit() +@dogstatsd.timed('xmlrpc.function', tags=['function:top_packages']) @metric def top_packages(store, num=None): return store.top_packages(num=num) diff --git a/store.py b/store.py index 822acb6f..3bf23b3e 100755 --- a/store.py +++ b/store.py @@ -47,6 +47,8 @@ class LockedException(Exception): from perfmetrics import statsd_client_from_uri +from dogadapter import dogstatsd + def enumerate(sequence): return [(i, sequence[i]) for i in range(len(sequence))] @@ -300,6 +302,7 @@ def __init__(self, config, queue=None, redis=None, package_bucket=None): self.statsd_uri = "statsd://127.0.0.1:8125?prefix=%s" % (config.database_name) self.statsd_reporter = statsd_client_from_uri(self.statsd_uri) + self.dogstatsd = dogstatsd self.package_bucket = package_bucket @@ -843,9 +846,11 @@ def search_packages(self, spec, operator='and'): data = r.json() except requests.exceptions.Timeout: self.statsd_reporter.incr('store.search-packages.timeout') + self.dogstatsd.increment('store.search-packages.timeout') data = {} end_time = int(round(time.time() * 1000)) self.statsd_reporter.timing('store.search-packages', end_time - start_time) + self.dogstatsd.timing('store.search-packages', end_time - start_time) results = [] if 'hits' in data.keys(): results = [_format_es_fields(r) for r in data['hits']['hits'] if r['fields'].get('_pypi_hidden', [False])[0] == hidden] @@ -1878,9 +1883,11 @@ def browse(self, selected_classifiers): data = r.json() except requests.exceptions.Timeout: self.statsd_reporter.incr('store.browse.timeout') + self.dogstatsd.increment('store.browse.timeout') data = {} end_time = int(round(time.time() * 1000)) self.statsd_reporter.timing('store.browse', end_time - start_time) + self.dogstatsd.timing('store.browse', end_time - start_time) releases = [] tally = [] diff --git a/webui.py b/webui.py index aadf8e7d..3e585ca2 100644 --- a/webui.py +++ b/webui.py @@ -59,6 +59,8 @@ class OperationalError(Exception): from perfmetrics import statsd_client from perfmetrics import set_statsd_client +from dogadapter import dogstatsd + from constants import DOMAIN_BLACKLIST # Authomatic @@ -477,6 +479,7 @@ def run(self): package_bucket=self.package_bucket, ) self.statsd = statsd_client() + self.dogstatsd = dogstatsd try: try: self.store.get_cursor() # make sure we can connect @@ -923,6 +926,7 @@ def _handle_basic_auth(self, auth): self._check_credentials(username, password) self.statsd.incr('password_authentication.basic_auth') + self.dogstatsd.increment('password_authentication.basic_auth') def login_form(self): if self.env['REQUEST_METHOD'] == "POST": @@ -945,6 +949,7 @@ def login_form(self): raise BlockedIP self.statsd.incr('password_authentication.login_form') + self.dogstatsd.increment('password_authentication.login_form') self.usercookie = self.store.create_cookie(self.username) self.store.get_token(self.username) @@ -1484,6 +1489,7 @@ def openid_login(self): self.store.get_token(self.username) self.store.commit() self.statsd.incr('openid.client.login') + self.dogstatsd.increment('openid.client.login') self.home() else: return self.fail('OpenID: No associated user for {0}'.format(result_openid_id)) @@ -1522,6 +1528,7 @@ def openid_claim(self): self.store.associate_openid(self.username, result_openid_id) self.store.commit() self.statsd.incr('openid.client.claim') + self.dogstatsd.increment('openid.client.claim') self.home() self.handler.end_headers() @@ -3254,6 +3261,7 @@ def openid_discovery(self): ''' % (self.config.url+'?:action=openid_endpoint') self.statsd.incr('openid.provider.discovery') + self.dogstatsd.increment('openid.provider.discovery') self.handler.send_response(200) self.handler.send_header("Content-type", 'application/xrds+xml') self.handler.send_header("Content-length", str(len(payload))) @@ -3274,6 +3282,7 @@ def openid_user(self, user): ''' % (self.config.url+'?:action=openid_endpoint') self.statsd.incr('openid.provider.user') + self.dogstatsd.increment('openid.provider.user') self.handler.send_response(200) self.handler.send_header("Content-type", 'application/xrds+xml') self.handler.send_header("Content-length", str(len(payload))) @@ -3290,10 +3299,12 @@ def openid_endpoint(self): self.handler.send_header("Content-length", str(len(payload))) self.handler.end_headers() self.statsd.incr('openid.provider.proclamation') + self.dogstatsd.increment('openid.provider.proclamation') self.handler.wfile.write(payload) return if orequest.mode in ['checkid_immediate', 'checkid_setup']: self.statsd.incr('openid.provider.checkid') + self.dogstatsd.increment('openid.provider.checkid') if self.openid_is_authorized(orequest): answer = orequest.answer(True, identity=self.openid_user_url()) return self.openid_response(answer) @@ -3303,9 +3314,11 @@ def openid_endpoint(self): self.openid_decide_page(orequest) elif orequest.mode in ['associate', 'check_authentication']: self.statsd.incr('openid.provider.authenticate') + self.dogstatsd.increment('openid.provider.authenticate') self.openid_response(self.oid_server.handleRequest(orequest)) else: self.statsd.incr('openid.provider.error') + self.dogstatsd.increment('openid.provider.error') raise OpenIDError, "Unknown mode: %s" % orequest.mode def openid_decide_page(self, orequest): @@ -3417,6 +3430,7 @@ def google_login(self): self.store.get_token(self.username) self.store.commit() self.statsd.incr('google_authentication.login') + self.dogstatsd.increment('google_authentication.login') self.home() else: return self.fail("No PyPI user found associated with that Google Account, Associating new accounts has been deprecated.", code=400)