Skip to content

Commit

Permalink
Implemented a more robust filtering mechanism
Browse files Browse the repository at this point in the history
You can now add custom filtering logic to determine if the error/message/payload
should be sent to Rollbar.

e.g. Only send messages that match a regular expression:

```py
import re

import rollbar
from rollbar import events

def ignore_unimportant_messages(message, **kw):
    if re.search(r'everthing is broken', message):
        return message
    return False

events.add_message_handler(ignore_unimportant_messages)

"""Don't report to Rollbar"""
rollbar.report_message('everything is fine...')

"""Report to Rollbar"""
rollbar.report_message('everything is broken')
```

@brianr cc @chrisfole
  • Loading branch information
coryvirok authored and rokob committed Feb 16, 2018
1 parent 6f2ae1b commit 91052a0
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 79 deletions.
118 changes: 72 additions & 46 deletions rollbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import requests
import six

from rollbar.lib import dict_merge, parse_qs, text, transport, urljoin, iteritems
from rollbar.lib import events, filters, dict_merge, parse_qs, text, transport, urljoin, iteritems

__version__ = '0.13.17'
__log_name__ = 'rollbar'
Expand Down Expand Up @@ -342,8 +342,9 @@ def init(access_token, environment='production', **kw):
keys=shortener_keys,
**SETTINGS['locals']['sizes'])
_transforms.append(shortener)

_threads = queue.Queue()
events.reset()
filters.add_builtin_filters(SETTINGS)

_initialized = True

Expand Down Expand Up @@ -413,7 +414,7 @@ def report_message(message, level='error', request=None, extra_data=None, payloa

def send_payload(payload, access_token):
"""
Sends a payload object, (the result of calling _build_payload()).
Sends a payload object, (the result of calling _build_payload() + _serialize_payload()).
Uses the configured handler from SETTINGS['handler']
Available handlers:
Expand All @@ -424,29 +425,35 @@ def send_payload(payload, access_token):
- 'gae': calls _send_payload_appengine() (which makes a blocking call to Google App Engine)
- 'twisted': calls _send_payload_twisted() (which makes an async HTTP reqeust using Twisted and Treq)
"""
payload = events.on_payload(payload)
if payload is False:
return

payload_str = _serialize_payload(payload)

handler = SETTINGS.get('handler')
if handler == 'blocking':
_send_payload(payload, access_token)
_send_payload(payload_str, access_token)
elif handler == 'agent':
agent_log.error(payload)
agent_log.error(payload_str)
elif handler == 'tornado':
if TornadoAsyncHTTPClient is None:
log.error('Unable to find tornado')
return
_send_payload_tornado(payload, access_token)
_send_payload_tornado(payload_str, access_token)
elif handler == 'gae':
if AppEngineFetch is None:
log.error('Unable to find AppEngine URLFetch module')
return
_send_payload_appengine(payload, access_token)
_send_payload_appengine(payload_str, access_token)
elif handler == 'twisted':
if treq is None:
log.error('Unable to find Treq')
return
_send_payload_twisted(payload, access_token)
_send_payload_twisted(payload_str, access_token)
else:
# default to 'thread'
thread = threading.Thread(target=_send_payload, args=(payload, access_token))
thread = threading.Thread(target=_send_payload, args=(payload_str, access_token))
_threads.put(thread)
thread.start()

Expand Down Expand Up @@ -601,22 +608,27 @@ def _report_exc_info(exc_info, request, extra_data, payload_data, level=None):
"""
Called by report_exc_info() wrapper
"""
# check if exception is marked ignored
cls, exc, trace = exc_info
if getattr(exc, '_rollbar_ignore', False) or _is_ignored(exc):
return

if not _check_config():
return

data = _build_base_data(request)
filtered_level = _filtered_level(exc_info[1])
if level is None:
level = filtered_level

filtered_exc_info = events.on_exception_info(exc_info,
request=request,
extra_data=extra_data,
payload_data=payload_data,
level=level)

if filtered_exc_info is False:
return

filtered_level = _filtered_level(exc)
if filtered_level:
data['level'] = filtered_level
cls, exc, trace = filtered_exc_info

# explicitly override the level with provided level
if level:
data = _build_base_data(request)
if level is not None:
data['level'] = level

# walk the trace chain to collect cause and context exceptions
Expand Down Expand Up @@ -697,12 +709,21 @@ def _report_message(message, level, request, extra_data, payload_data):
if not _check_config():
return

filtered_message = events.on_message(message,
request=request,
extra_data=extra_data,
payload_data=payload_data,
level=level)

if filtered_message is False:
return

data = _build_base_data(request, level=level)

# message
data['body'] = {
'message': {
'body': message
'body': filtered_message
}
}

Expand Down Expand Up @@ -1201,12 +1222,16 @@ def _build_payload(data):
'data': data
}

return payload


def _serialize_payload(payload):
return json.dumps(payload)


def _send_payload(payload, access_token):
def _send_payload(payload_str, access_token):
try:
_post_api('item/', payload, access_token=access_token)
_post_api('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)
try:
Expand All @@ -1216,14 +1241,14 @@ def _send_payload(payload, access_token):
pass


def _send_payload_appengine(payload, access_token):
def _send_payload_appengine(payload_str, access_token):
try:
_post_api_appengine('item/', payload, access_token=access_token)
_post_api_appengine('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _post_api_appengine(path, payload, access_token=None):
def _post_api_appengine(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
Expand All @@ -1232,29 +1257,29 @@ def _post_api_appengine(path, payload, access_token=None):
url = urljoin(SETTINGS['endpoint'], path)
resp = AppEngineFetch(url,
method="POST",
payload=payload,
payload=payload_str,
headers=headers,
allow_truncated=False,
deadline=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
validate_certificate=SETTINGS.get('verify_https', True))

return _parse_response(path, SETTINGS['access_token'], payload, resp)
return _parse_response(path, SETTINGS['access_token'], payload_str, resp)


def _post_api(path, payload, access_token=None):
def _post_api(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
headers['X-Rollbar-Access-Token'] = access_token

url = urljoin(SETTINGS['endpoint'], path)
resp = transport.post(url,
data=payload,
headers=headers,
timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
verify=SETTINGS.get('verify_https', True))
data=payload_str,
headers=headers,
timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
verify=SETTINGS.get('verify_https', True))

return _parse_response(path, SETTINGS['access_token'], payload, resp)
return _parse_response(path, SETTINGS['access_token'], payload_str, resp)


def _get_api(path, access_token=None, endpoint=None, **params):
Expand All @@ -1265,46 +1290,47 @@ def _get_api(path, access_token=None, endpoint=None, **params):
return _parse_response(path, access_token, params, resp, endpoint=endpoint)


def _send_payload_tornado(payload, access_token):
def _send_payload_tornado(payload_str, access_token):
try:
_post_api_tornado('item/', payload, access_token=access_token)
_post_api_tornado('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


@tornado_coroutine
def _post_api_tornado(path, payload, access_token=None):
def _post_api_tornado(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
headers['X-Rollbar-Access-Token'] = access_token

url = urljoin(SETTINGS['endpoint'], path)

resp = yield TornadoAsyncHTTPClient().fetch(
url, body=payload, method='POST', connect_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
request_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT)
)
resp = yield TornadoAsyncHTTPClient().fetch(url,
body=payload_str,
method='POST',
connect_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
request_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT))

r = requests.Response()
r._content = resp.body
r.status_code = resp.code
r.headers.update(resp.headers)

_parse_response(path, SETTINGS['access_token'], payload, r)
_parse_response(path, SETTINGS['access_token'], payload_str, r)


def _send_payload_twisted(payload, access_token):
def _send_payload_twisted(payload_str, access_token):
try:
_post_api_twisted('item/', payload, access_token=access_token)
_post_api_twisted('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _post_api_twisted(path, payload, access_token=None):
def _post_api_twisted(path, payload_str, access_token=None):
def post_data_cb(data, resp):
resp._content = data
_parse_response(path, SETTINGS['access_token'], payload, resp)
_parse_response(path, SETTINGS['access_token'], payload_str, resp)

def post_cb(resp):
r = requests.Response()
Expand All @@ -1317,7 +1343,7 @@ def post_cb(resp):
headers['X-Rollbar-Access-Token'] = [access_token]

url = urljoin(SETTINGS['endpoint'], path)
d = treq.post(url, payload, headers=headers,
d = treq.post(url, payload_str, headers=headers,
timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT))
d.addCallback(post_cb)

Expand Down
99 changes: 99 additions & 0 deletions rollbar/lib/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
EXCEPTION_INFO = 'exception_info'
MESSAGE = 'message'
PAYLOAD = 'payload'

_event_handlers = {
EXCEPTION_INFO: [],
MESSAGE: [],
PAYLOAD: []
}


def _check_type(type):
if type not in _event_handlers:
raise ValueError('Unknown type: %s. Must be one of %s' % (type, _event_handlers.keys()))


def _add_handler(type, handler_fn, pos):
_check_type(type)

pos = pos if pos is not None else -1
handlers = _event_handlers[type]

try:
handlers.index(handler_fn)
except ValueError:
handlers.insert(pos, handler_fn)


def _remove_handler(type, handler_fn):
_check_type(type)

handlers = _event_handlers[type]

try:
index = handlers.index(handler_fn)
handlers.pop(index)
except ValueError:
pass


def _on_event(type, target, **kw):
_check_type(type)

ref = target
for handler in _event_handlers[type]:
result = handler(ref, **kw)
if result is False:
return False

ref = result

return ref


# Add/remove event handlers

def add_exception_info_handler(handler_fn, pos=None):
_add_handler(EXCEPTION_INFO, handler_fn, pos)


def remove_exception_info_handler(handler_fn):
_remove_handler(EXCEPTION_INFO, handler_fn)


def add_message_handler(handler_fn, pos=None):
_add_handler(MESSAGE, handler_fn, pos)


def remove_message_handler(handler_fn):
_remove_handler(MESSAGE, handler_fn)


def add_payload_handler(handler_fn, pos=None):
_add_handler(PAYLOAD, handler_fn, pos)


def remove_payload_handler(handler_fn):
_remove_handler(PAYLOAD, handler_fn)


# Event handler processing

def on_exception_info(exc_info, **kw):
return _on_event(EXCEPTION_INFO, exc_info, **kw)


def on_message(message, **kw):
return _on_event(MESSAGE, message, **kw)


def on_payload(payload, **kw):
return _on_event(PAYLOAD, payload, **kw)


# Misc

def reset():
for handlers in _event_handlers.values():
del handlers[:]
11 changes: 11 additions & 0 deletions rollbar/lib/filters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from rollbar.lib import events
from rollbar.lib.filters.basic import filter_rollbar_ignored_exceptions, filter_by_level


def add_builtin_filters(settings):
# exc_info filters
events.add_exception_info_handler(filter_rollbar_ignored_exceptions)
events.add_exception_info_handler(filter_by_level)

# message filters
events.add_message_handler(filter_by_level)
Loading

0 comments on commit 91052a0

Please sign in to comment.