Skip to content

Commit

Permalink
Implemented a more robust filtering mechanism
Browse files Browse the repository at this point in the history
You can now add custom filtering logic to determine if the error/message/payload
should be sent to Rollbar.

e.g. Only send messages that match a regular expression:

```py
import re

import rollbar
from rollbar import events

def ignore_unimportant_messages(message, **kw):
    if re.search(r'everthing is broken', message):
        return message
    return False

events.add_message_handler(ignore_unimportant_messages)

"""Don't report to Rollbar"""
rollbar.report_message('everything is fine...')

"""Report to Rollbar"""
rollbar.report_message('everything is broken')
```

@brianr cc @chrisfole
  • Loading branch information
coryvirok committed Nov 3, 2015
1 parent ac8264c commit 94d15f3
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 79 deletions.
119 changes: 73 additions & 46 deletions rollbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import six

from rollbar.lib import dict_merge, map, parse_qs, text, urljoin, urlparse, iteritems
from rollbar.lib import events, filters, dict_merge, map, parse_qs, text, urljoin, urlparse, iteritems


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -355,6 +355,9 @@ def init(access_token, environment='production', **kw):
**SETTINGS['locals']['sizes'])
_transforms.append(shortener)

events.reset()
filters.add_builtin_filters(SETTINGS)

if _initialized:
# NOTE: Temp solution to not being able to re-init.
# New versions of pyrollbar will support re-initialization
Expand Down Expand Up @@ -422,7 +425,7 @@ def report_message(message, level='error', request=None, extra_data=None, payloa

def send_payload(payload, access_token):
"""
Sends a payload object, (the result of calling _build_payload()).
Sends a payload object, (the result of calling _build_payload() + _serialize_payload()).
Uses the configured handler from SETTINGS['handler']
Available handlers:
Expand All @@ -431,29 +434,35 @@ def send_payload(payload, access_token):
- 'agent': writes to a log file to be processed by rollbar-agent
- 'tornado': calls _send_payload_tornado() (which makes an async HTTP request using tornado's AsyncHTTPClient)
"""
payload = events.on_payload(payload)
if payload is False:
return

payload_str = _serialize_payload(payload)

handler = SETTINGS.get('handler')
if handler == 'blocking':
_send_payload(payload, access_token)
_send_payload(payload_str, access_token)
elif handler == 'agent':
agent_log.error(payload, access_token)
agent_log.error(payload_str, access_token)
elif handler == 'tornado':
if TornadoAsyncHTTPClient is None:
log.error('Unable to find tornado')
return
_send_payload_tornado(payload, access_token)
_send_payload_tornado(payload_str, access_token)
elif handler == 'gae':
if AppEngineFetch is None:
log.error('Unable to find AppEngine URLFetch module')
return
_send_payload_appengine(payload, access_token)
_send_payload_appengine(payload_str, access_token)
elif handler == 'twisted':
if TwistedHTTPClient is None:
log.error('Unable to find twisted')
return
_send_payload_twisted(payload, access_token)
_send_payload_twisted(payload_str, access_token)
else:
# default to 'thread'
thread = threading.Thread(target=_send_payload, args=(payload, access_token))
thread = threading.Thread(target=_send_payload, args=(payload_str, access_token))
thread.start()


Expand Down Expand Up @@ -600,22 +609,27 @@ def _report_exc_info(exc_info, request, extra_data, payload_data, level=None):
"""
Called by report_exc_info() wrapper
"""
# check if exception is marked ignored
cls, exc, trace = exc_info
if getattr(exc, '_rollbar_ignore', False) or _is_ignored(exc):
return

if not _check_config():
return

data = _build_base_data(request)
filtered_level = _filtered_level(exc_info[1])
if level is None:
level = filtered_level

filtered_level = _filtered_level(exc)
if filtered_level:
data['level'] = filtered_level
filtered_exc_info = events.on_exception_info(exc_info,
request=request,
extra_data=extra_data,
payload_data=payload_data,
level=level)

if filtered_exc_info is False:
return

# explicitly override the level with provided level
if level:
cls, exc, trace = filtered_exc_info

data = _build_base_data(request)
if level is not None:
data['level'] = level

# exception info
Expand Down Expand Up @@ -661,12 +675,21 @@ def _report_message(message, level, request, extra_data, payload_data):
if not _check_config():
return

filtered_message = events.on_message(message,
request=request,
extra_data=extra_data,
payload_data=payload_data,
level=level)

if filtered_message is False:
return

data = _build_base_data(request, level=level)

# message
data['body'] = {
'message': {
'body': message
'body': filtered_message
}
}

Expand Down Expand Up @@ -1148,24 +1171,28 @@ def _build_payload(data):
'data': data
}

return payload


def _serialize_payload(payload):
return json.dumps(payload)


def _send_payload(payload, access_token):
def _send_payload(payload_str, access_token):
try:
_post_api('item/', payload, access_token=access_token)
_post_api('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _send_payload_appengine(payload, access_token):
def _send_payload_appengine(payload_str, access_token):
try:
_post_api_appengine('item/', payload, access_token=access_token)
_post_api_appengine('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _post_api_appengine(path, payload, access_token=None):
def _post_api_appengine(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
Expand All @@ -1174,29 +1201,29 @@ def _post_api_appengine(path, payload, access_token=None):
url = urljoin(SETTINGS['endpoint'], path)
resp = AppEngineFetch(url,
method="POST",
payload=payload,
payload=payload_str,
headers=headers,
allow_truncated=False,
deadline=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
validate_certificate=SETTINGS.get('verify_https', True))

return _parse_response(path, SETTINGS['access_token'], payload, resp)
return _parse_response(path, SETTINGS['access_token'], payload_str, resp)


def _post_api(path, payload, access_token=None):
def _post_api(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
headers['X-Rollbar-Access-Token'] = access_token

url = urljoin(SETTINGS['endpoint'], path)
resp = requests.post(url,
data=payload,
data=payload_str,
headers=headers,
timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
verify=SETTINGS.get('verify_https', True))

return _parse_response(path, SETTINGS['access_token'], payload, resp)
return _parse_response(path, SETTINGS['access_token'], payload_str, resp)


def _get_api(path, access_token=None, endpoint=None, **params):
Expand All @@ -1207,44 +1234,45 @@ def _get_api(path, access_token=None, endpoint=None, **params):
return _parse_response(path, access_token, params, resp, endpoint=endpoint)


def _send_payload_tornado(payload, access_token):
def _send_payload_tornado(payload_str, access_token):
try:
_post_api_tornado('item/', payload, access_token=access_token)
_post_api_tornado('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


@tornado_coroutine
def _post_api_tornado(path, payload, access_token=None):
def _post_api_tornado(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
headers['X-Rollbar-Access-Token'] = access_token

url = urljoin(SETTINGS['endpoint'], path)

resp = yield TornadoAsyncHTTPClient().fetch(
url, body=payload, method='POST', connect_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
request_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT)
)
resp = yield TornadoAsyncHTTPClient().fetch(url,
body=payload_str,
method='POST',
connect_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
request_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT))

r = requests.Response()
r._content = resp.body
r.status_code = resp.code
r.headers.update(resp.headers)

_parse_response(path, SETTINGS['access_token'], payload, r)
_parse_response(path, SETTINGS['access_token'], payload_str, r)


def _send_payload_twisted(payload, access_token):
def _send_payload_twisted(payload_str, access_token):
try:
_post_api_twisted('item/', payload, access_token=access_token)
_post_api_twisted('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


@inlineCallbacks
def _post_api_twisted(path, payload, access_token=None):
def _post_api_twisted(path, payload_str, access_token=None):
headers = {'Content-Type': ['application/json']}

if access_token is not None:
Expand All @@ -1253,11 +1281,10 @@ def _post_api_twisted(path, payload, access_token=None):
url = urljoin(SETTINGS['endpoint'], path)

agent = TwistedHTTPClient(reactor, connectTimeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT))
resp = yield agent.request(
'POST',
url,
TwistedHeaders(headers),
StringProducer(payload))
resp = yield agent.request('POST',
url,
TwistedHeaders(headers),
StringProducer(payload_str))

r = requests.Response()
r.status_code = resp.code
Expand All @@ -1266,7 +1293,7 @@ def _post_api_twisted(path, payload, access_token=None):
resp.deliverBody(ResponseAccumulator(resp.length, bodyDeferred))
body = yield bodyDeferred
r._content = body
_parse_response(path, SETTINGS['access_token'], payload, r)
_parse_response(path, SETTINGS['access_token'], payload_str, r)
yield returnValue(None)


Expand Down
99 changes: 99 additions & 0 deletions rollbar/lib/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
EXCEPTION_INFO = 'exception_info'
MESSAGE = 'message'
PAYLOAD = 'payload'

_event_handlers = {
EXCEPTION_INFO: [],
MESSAGE: [],
PAYLOAD: []
}


def _check_type(type):
if type not in _event_handlers:
raise ValueError('Unknown type: %s. Must be one of %s' % (type, _event_handlers.keys()))


def _add_handler(type, handler_fn, pos):
_check_type(type)

pos = pos if pos is not None else -1
handlers = _event_handlers[type]

try:
handlers.index(handler_fn)
except ValueError:
handlers.insert(pos, handler_fn)


def _remove_handler(type, handler_fn):
_check_type(type)

handlers = _event_handlers[type]

try:
index = handlers.index(handler_fn)
handlers.pop(index)
except ValueError:
pass


def _on_event(type, target, **kw):
_check_type(type)

ref = target
for handler in _event_handlers[type]:
result = handler(ref, **kw)
if result is False:
return False

ref = result

return ref


# Add/remove event handlers

def add_exception_info_handler(handler_fn, pos=None):
_add_handler(EXCEPTION_INFO, handler_fn, pos)


def remove_exception_info_handler(handler_fn):
_remove_handler(EXCEPTION_INFO, handler_fn)


def add_message_handler(handler_fn, pos=None):
_add_handler(MESSAGE, handler_fn, pos)


def remove_message_handler(handler_fn):
_remove_handler(MESSAGE, handler_fn)


def add_payload_handler(handler_fn, pos=None):
_add_handler(PAYLOAD, handler_fn, pos)


def remove_payload_handler(handler_fn):
_remove_handler(PAYLOAD, handler_fn)


# Event handler processing

def on_exception_info(exc_info, **kw):
return _on_event(EXCEPTION_INFO, exc_info, **kw)


def on_message(message, **kw):
return _on_event(MESSAGE, message, **kw)


def on_payload(payload, **kw):
return _on_event(PAYLOAD, payload, **kw)


# Misc

def reset():
for handlers in _event_handlers.values():
del handlers[:]
11 changes: 11 additions & 0 deletions rollbar/lib/filters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from rollbar.lib import events
from rollbar.lib.filters.basic import filter_rollbar_ignored_exceptions, filter_by_level


def add_builtin_filters(settings):
# exc_info filters
events.add_exception_info_handler(filter_rollbar_ignored_exceptions)
events.add_exception_info_handler(filter_by_level)

# message filters
events.add_message_handler(filter_by_level)
Loading

0 comments on commit 94d15f3

Please sign in to comment.