Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
fix: move metric increments to lowest callbacks
Browse files Browse the repository at this point in the history
The metric increments were being called for registration API
calls due to an error callback. They weren't called for success
cases as well. Moving them to the lower callbacks with a new
flag should help ensure they're incremented correctly.

Also fixed message_data calls to use increment instead of a gauge
and removed sending of base_tags in websocket.py to avoid too many
metric values.

Closes #958
  • Loading branch information
bbangert committed Jul 19, 2017
1 parent 261b95e commit 637e246
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 89 deletions.
15 changes: 8 additions & 7 deletions autopush/router/apnsrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,15 @@ def _route(self, notification, router_data):
application=rel_channel))

self.metrics.increment(
"updates.client.bridge.apns.%s.sent" %
router_data["rel_channel"],
self._base_tags
"updates.client.bridge.apns.{}.sent".format(
router_data["rel_channel"]
),
tags=self._base_tags
)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
return RouterResponse(status_code=201, response_body="",
headers={"TTL": notification.ttl,
"Location": location},
Expand Down
10 changes: 5 additions & 5 deletions autopush/router/fcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ def _process_reply(self, reply, notification, router_data, ttl):
router_data={},
)
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
tags=self._base_tags)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
10 changes: 5 additions & 5 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ def _process_reply(self, reply, uaid_data, ttl, notification):
log_exception=False)

self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
tags=self._base_tags)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
12 changes: 6 additions & 6 deletions autopush/router/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ def amend_endpoint_response(self, response, router_data):
"""Stubbed out for this router"""

def stored_response(self, notification):
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(destination='Stored'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(destination='Stored'))
return RouterResponse(202, "Notification Stored")

def delivered_response(self, notification):
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(destination='Direct'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(destination='Direct'))
return RouterResponse(200, "Delivered")

@inlineCallbacks
Expand Down
12 changes: 6 additions & 6 deletions autopush/router/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ class WebPushRouter(SimpleRouter):
"""SimpleRouter subclass to store individual messages appropriately"""

def delivered_response(self, notification):
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(destination='Stored'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(destination='Stored'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
Expand All @@ -36,9 +36,9 @@ def delivered_response(self, notification):
logged_status=200)

def stored_response(self, notification):
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(destination='Direct'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(destination='Direct'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
Expand Down
9 changes: 5 additions & 4 deletions autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,8 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog):

ok_(self.logs.logged_ci(lambda ci: 'message_size' in ci),
"message_size not logged")
eq_(self.conn.db.metrics._client.gauge.call_args[1]['tags'],
['source:Stored'])
inc_call = self.conn.db.metrics._client.increment.call_args_list[5]
eq_(inc_call[1]['tags'], ['source:Stored'])
yield self.shut_down(client)

@inlineCallbacks
Expand Down Expand Up @@ -1266,8 +1266,9 @@ def test_message_with_topic(self):
client = yield self.quick_register(use_webpush=True)
yield client.send_notification(data=data, topic="topicname")
self.conn.db.metrics.increment.assert_has_calls([
call('updates.notification.topic',
tags=['host:localhost', 'use_webpush:True'])
call('ua.command.hello'),
call('ua.command.register'),
call('ua.notification.topic')
])
yield self.shut_down(client)

Expand Down
4 changes: 2 additions & 2 deletions autopush/tests/test_web_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ def test_boto_err(self):
def test_router_response(self):
from autopush.router.interface import RouterResponse
response = RouterResponse(headers=dict(Location="http://a.com/"))
self.base._router_response(response)
self.base._router_response(response, None, None)
self.status_mock.assert_called_with(200, reason=None)

def test_router_response_client_error(self):
from autopush.router.interface import RouterResponse
response = RouterResponse(headers=dict(Location="http://a.com/"),
status_code=400)
self.base._router_response(response)
self.base._router_response(response, None, None)
self.status_mock.assert_called_with(400, reason=None)

def test_router_fail_err(self):
Expand Down
2 changes: 1 addition & 1 deletion autopush/tests/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@ def test_notif_finished_with_too_many_messages(self):
d = Deferred()

def check(*args, **kwargs):
eq_(self.metrics.gauge.call_args[1]['tags'], ["source:Direct"])
eq_(self.metrics.increment.call_args[1]['tags'], ["source:Direct"])
ok_(self.proto.force_retry.called)
ok_(self.send_mock.called)
d.callback(True)
Expand Down
48 changes: 29 additions & 19 deletions autopush/web/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def initialize(self):
self._base_tags = {}
self._start_time = time.time()
self._timings = {}
self._handling_message = False

@property
def routers(self):
Expand Down Expand Up @@ -186,7 +187,9 @@ def head(self, *args, **kwargs):
#############################################################
def _write_response(self, status_code, errno, message=None, error=None,
headers=None,
url=DEFAULT_ERR_URL):
url=DEFAULT_ERR_URL,
router_type=None,
vapid=None):
"""Writes out a full JSON error and sets the appropriate status"""
self.set_status(status_code, reason=error)
error_data = dict(
Expand All @@ -207,6 +210,13 @@ def _write_response(self, status_code, errno, message=None, error=None,
if status_code == 410:
self.set_header("Cache-Control", "max-age=86400")

if self._handling_message and status_code >= 300:
self.metrics.increment('notification.message.error',
tags=[
"code:{}".format(status_code),
"router:{}".format(router_type),
"vapid:{}".format(vapid is not None)
])
self._track_timing()
self.finish()

Expand Down Expand Up @@ -255,27 +265,41 @@ def _boto_err(self, fail):
self._write_response(503, errno=202,
message="Communication error, please retry")

def _router_response(self, response):
def _router_response(self, response, router_type, vapid):
for name, val in response.headers.items():
if val is not None:
self.set_header(name, val)

if 200 <= response.status_code < 300:
self.set_status(response.status_code, reason=None)
self.write(response.response_body)

dest = 'Direct'
if response.status_code == 202 or response.logged_status == 202:
dest = 'Stored'

if self._handling_message:
self.metrics.increment('notification.message.success',
tags=[
'destination:{}'.format(dest),
'router:{}'.format(router_type),
'vapid:{}'.format(vapid is not None)
])
self._track_timing(status_code=response.logged_status)
self.finish()
else:
self._write_response(
response.status_code,
errno=response.errno or 999,
message=response.response_body)
message=response.response_body,
router_type=router_type,
vapid=vapid
)

def _router_fail_err(self, fail, router_type=None, vapid=False):
"""errBack for router failures"""
fail.trap(RouterException)
exc = fail.value
success = False
if exc.log_exception:
if exc.status_code >= 500:
fmt = fail.value.message or 'Exception'
Expand All @@ -288,27 +312,13 @@ def _router_fail_err(self, fail, router_type=None, vapid=False):
self.log.debug(format="Success", status_code=exc.status_code,
logged_status=exc.logged_status or 0,
client_info=self._client_info)
success = True
self.metrics.increment('notification.message.success',
tags=[
'destination:Direct',
'router:{}'.format(router_type),
'vapid:{}'.format(vapid is not None)
])
elif 400 <= exc.status_code < 500:
self.log.debug(format="Client error",
status_code=exc.status_code,
logged_status=exc.logged_status or 0,
errno=exc.errno or 0,
client_info=self._client_info)
if not success:
self.metrics.increment('notification.message.error',
tags=[
"code:{}".format(exc.status_code),
"router:{}".format(router_type),
"vapid:{}".format(vapid is not None)
])
self._router_response(exc)
self._router_response(exc, router_type, vapid)

def _write_validation_err(self, errors):
"""Writes a set of validation errors out with details about what
Expand Down
15 changes: 6 additions & 9 deletions autopush/web/simplepush.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
)

from autopush.db import hasher
from autopush.metrics import make_tags
from autopush.web.base import (
threaded_validate,
Notification,
Expand Down Expand Up @@ -104,6 +103,11 @@ def extract_fields(self, d):
class SimplePushHandler(BaseWebHandler):
cors_methods = "PUT"

def initialize(self):
"""Must run on initialization to set ahead of validation"""
super(SimplePushHandler, self).initialize()
self._handling_message = True

@threaded_validate(SimplePushRequestSchema)
def put(self, subscription, version, data):
# type: (Dict[str, Any], str, str) -> Deferred
Expand All @@ -129,21 +133,14 @@ def put(self, subscription, version, data):

def _router_completed(self, response, uaid_data, warning=""):
"""Called after router has completed successfully"""
dest = 'Direct'
if response.status_code == 200 or response.logged_status == 200:
self.log.debug(format="Successful delivery",
client_info=self._client_info)
elif response.status_code == 202 or response.logged_status == 202:
self.log.debug(format="Router miss, message stored.",
client_info=self._client_info)
dest = 'Stored'
time_diff = time.time() - self._start_time
self.metrics.timing("notification.request_time", duration=time_diff)
self.metrics.increment('notification.message.success',
tags=make_tags(
destination=dest,
router='simplepush')
)
response.response_body = (
response.response_body + " " + warning).strip()
self._router_response(response)
self._router_response(response, router_type="simplepush", vapid=None)
20 changes: 9 additions & 11 deletions autopush/web/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ class WebPushHandler(BaseWebHandler):
"authorization")
cors_response_headers = ("location", "www-authenticate")

def initialize(self):
"""Must run on initialization to set ahead of validation"""
super(WebPushHandler, self).initialize()
self._handling_message = True

@threaded_validate(WebPushRequestSchema)
def post(self,
subscription, # type: Dict[str, Any]
Expand Down Expand Up @@ -502,7 +507,9 @@ def _router_completed(self, response, uaid_data, warning="",
uaid_record=dump_uaid(uaid_data),
client_info=self._client_info)
d = deferToThread(self.db.router.drop_user, uaid_data["uaid"])
d.addCallback(lambda x: self._router_response(response))
d.addCallback(lambda x: self._router_response(response,
router_type,
vapid))
return d
# The router data needs to be updated to include any changes
# requested by the bridge system
Expand All @@ -520,24 +527,15 @@ def _router_completed(self, response, uaid_data, warning="",
return d
else:
# No changes are requested by the bridge system, proceed as normal
dest = 'Direct'
if response.status_code == 200 or response.logged_status == 200:
self.log.debug(format="Successful delivery",
client_info=self._client_info)
elif response.status_code == 202 or response.logged_status == 202:
self.log.debug(
format="Router miss, message stored.",
client_info=self._client_info)
dest = 'Stored'
self.metrics.timing("notification.request_time",
duration=time_diff)
self.metrics.increment('notification.message.success',
tags=make_tags(
destination=dest,
router=router_type,
vapid=(vapid is not None))
)

response.response_body = (
response.response_body + " " + warning).strip()
self._router_response(response)
self._router_response(response, router_type, vapid)
Loading

0 comments on commit 637e246

Please sign in to comment.