Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented a more robust filtering mechanism #81

Merged
merged 4 commits into from
Feb 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 72 additions & 46 deletions rollbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import requests
import six

from rollbar.lib import dict_merge, parse_qs, text, transport, urljoin, iteritems
from rollbar.lib import events, filters, dict_merge, parse_qs, text, transport, urljoin, iteritems

__version__ = '0.13.17'
__log_name__ = 'rollbar'
Expand Down Expand Up @@ -342,8 +342,9 @@ def init(access_token, environment='production', **kw):
keys=shortener_keys,
**SETTINGS['locals']['sizes'])
_transforms.append(shortener)

_threads = queue.Queue()
events.reset()
filters.add_builtin_filters(SETTINGS)

_initialized = True

Expand Down Expand Up @@ -413,7 +414,7 @@ def report_message(message, level='error', request=None, extra_data=None, payloa

def send_payload(payload, access_token):
"""
Sends a payload object, (the result of calling _build_payload()).
Sends a payload object, (the result of calling _build_payload() + _serialize_payload()).
Uses the configured handler from SETTINGS['handler']

Available handlers:
Expand All @@ -424,29 +425,35 @@ def send_payload(payload, access_token):
- 'gae': calls _send_payload_appengine() (which makes a blocking call to Google App Engine)
- 'twisted': calls _send_payload_twisted() (which makes an async HTTP reqeust using Twisted and Treq)
"""
payload = events.on_payload(payload)
if payload is False:
return

payload_str = _serialize_payload(payload)

handler = SETTINGS.get('handler')
if handler == 'blocking':
_send_payload(payload, access_token)
_send_payload(payload_str, access_token)
elif handler == 'agent':
agent_log.error(payload)
agent_log.error(payload_str)
elif handler == 'tornado':
if TornadoAsyncHTTPClient is None:
log.error('Unable to find tornado')
return
_send_payload_tornado(payload, access_token)
_send_payload_tornado(payload_str, access_token)
elif handler == 'gae':
if AppEngineFetch is None:
log.error('Unable to find AppEngine URLFetch module')
return
_send_payload_appengine(payload, access_token)
_send_payload_appengine(payload_str, access_token)
elif handler == 'twisted':
if treq is None:
log.error('Unable to find Treq')
return
_send_payload_twisted(payload, access_token)
_send_payload_twisted(payload_str, access_token)
else:
# default to 'thread'
thread = threading.Thread(target=_send_payload, args=(payload, access_token))
thread = threading.Thread(target=_send_payload, args=(payload_str, access_token))
_threads.put(thread)
thread.start()

Expand Down Expand Up @@ -601,22 +608,27 @@ def _report_exc_info(exc_info, request, extra_data, payload_data, level=None):
"""
Called by report_exc_info() wrapper
"""
# check if exception is marked ignored
cls, exc, trace = exc_info
if getattr(exc, '_rollbar_ignore', False) or _is_ignored(exc):
return

if not _check_config():
return

data = _build_base_data(request)
filtered_level = _filtered_level(exc_info[1])
if level is None:
level = filtered_level

filtered_exc_info = events.on_exception_info(exc_info,
request=request,
extra_data=extra_data,
payload_data=payload_data,
level=level)

if filtered_exc_info is False:
return

filtered_level = _filtered_level(exc)
if filtered_level:
data['level'] = filtered_level
cls, exc, trace = filtered_exc_info

# explicitly override the level with provided level
if level:
data = _build_base_data(request)
if level is not None:
data['level'] = level

# walk the trace chain to collect cause and context exceptions
Expand Down Expand Up @@ -697,12 +709,21 @@ def _report_message(message, level, request, extra_data, payload_data):
if not _check_config():
return

filtered_message = events.on_message(message,
request=request,
extra_data=extra_data,
payload_data=payload_data,
level=level)

if filtered_message is False:
return

data = _build_base_data(request, level=level)

# message
data['body'] = {
'message': {
'body': message
'body': filtered_message
}
}

Expand Down Expand Up @@ -1201,12 +1222,16 @@ def _build_payload(data):
'data': data
}

return payload


def _serialize_payload(payload):
return json.dumps(payload)


def _send_payload(payload, access_token):
def _send_payload(payload_str, access_token):
try:
_post_api('item/', payload, access_token=access_token)
_post_api('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)
try:
Expand All @@ -1216,14 +1241,14 @@ def _send_payload(payload, access_token):
pass


def _send_payload_appengine(payload, access_token):
def _send_payload_appengine(payload_str, access_token):
try:
_post_api_appengine('item/', payload, access_token=access_token)
_post_api_appengine('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _post_api_appengine(path, payload, access_token=None):
def _post_api_appengine(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
Expand All @@ -1232,29 +1257,29 @@ def _post_api_appengine(path, payload, access_token=None):
url = urljoin(SETTINGS['endpoint'], path)
resp = AppEngineFetch(url,
method="POST",
payload=payload,
payload=payload_str,
headers=headers,
allow_truncated=False,
deadline=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
validate_certificate=SETTINGS.get('verify_https', True))

return _parse_response(path, SETTINGS['access_token'], payload, resp)
return _parse_response(path, SETTINGS['access_token'], payload_str, resp)


def _post_api(path, payload, access_token=None):
def _post_api(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
headers['X-Rollbar-Access-Token'] = access_token

url = urljoin(SETTINGS['endpoint'], path)
resp = transport.post(url,
data=payload,
headers=headers,
timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
verify=SETTINGS.get('verify_https', True))
data=payload_str,
headers=headers,
timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
verify=SETTINGS.get('verify_https', True))

return _parse_response(path, SETTINGS['access_token'], payload, resp)
return _parse_response(path, SETTINGS['access_token'], payload_str, resp)


def _get_api(path, access_token=None, endpoint=None, **params):
Expand All @@ -1265,46 +1290,47 @@ def _get_api(path, access_token=None, endpoint=None, **params):
return _parse_response(path, access_token, params, resp, endpoint=endpoint)


def _send_payload_tornado(payload, access_token):
def _send_payload_tornado(payload_str, access_token):
try:
_post_api_tornado('item/', payload, access_token=access_token)
_post_api_tornado('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


@tornado_coroutine
def _post_api_tornado(path, payload, access_token=None):
def _post_api_tornado(path, payload_str, access_token=None):
headers = {'Content-Type': 'application/json'}

if access_token is not None:
headers['X-Rollbar-Access-Token'] = access_token

url = urljoin(SETTINGS['endpoint'], path)

resp = yield TornadoAsyncHTTPClient().fetch(
url, body=payload, method='POST', connect_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
request_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT)
)
resp = yield TornadoAsyncHTTPClient().fetch(url,
body=payload_str,
method='POST',
connect_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT),
request_timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT))

r = requests.Response()
r._content = resp.body
r.status_code = resp.code
r.headers.update(resp.headers)

_parse_response(path, SETTINGS['access_token'], payload, r)
_parse_response(path, SETTINGS['access_token'], payload_str, r)


def _send_payload_twisted(payload, access_token):
def _send_payload_twisted(payload_str, access_token):
try:
_post_api_twisted('item/', payload, access_token=access_token)
_post_api_twisted('item/', payload_str, access_token=access_token)
except Exception as e:
log.exception('Exception while posting item %r', e)


def _post_api_twisted(path, payload, access_token=None):
def _post_api_twisted(path, payload_str, access_token=None):
def post_data_cb(data, resp):
resp._content = data
_parse_response(path, SETTINGS['access_token'], payload, resp)
_parse_response(path, SETTINGS['access_token'], payload_str, resp)

def post_cb(resp):
r = requests.Response()
Expand All @@ -1317,7 +1343,7 @@ def post_cb(resp):
headers['X-Rollbar-Access-Token'] = [access_token]

url = urljoin(SETTINGS['endpoint'], path)
d = treq.post(url, payload, headers=headers,
d = treq.post(url, payload_str, headers=headers,
timeout=SETTINGS.get('timeout', DEFAULT_TIMEOUT))
d.addCallback(post_cb)

Expand Down
99 changes: 99 additions & 0 deletions rollbar/lib/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
EXCEPTION_INFO = 'exception_info'
MESSAGE = 'message'
PAYLOAD = 'payload'

_event_handlers = {
EXCEPTION_INFO: [],
MESSAGE: [],
PAYLOAD: []
}


def _check_type(typ):
if typ not in _event_handlers:
raise ValueError('Unknown type: %s. Must be one of %s' % (typ, _event_handlers.keys()))


def _add_handler(typ, handler_fn, pos):
_check_type(typ)

pos = pos if pos is not None else -1
handlers = _event_handlers[typ]

try:
handlers.index(handler_fn)
except ValueError:
handlers.insert(pos, handler_fn)


def _remove_handler(typ, handler_fn):
_check_type(typ)

handlers = _event_handlers[typ]

try:
index = handlers.index(handler_fn)
handlers.pop(index)
except ValueError:
pass


def _on_event(typ, target, **kw):
_check_type(typ)

ref = target
for handler in _event_handlers[typ]:
result = handler(ref, **kw)
if result is False:
return False

ref = result

return ref


# Add/remove event handlers

def add_exception_info_handler(handler_fn, pos=None):
_add_handler(EXCEPTION_INFO, handler_fn, pos)


def remove_exception_info_handler(handler_fn):
_remove_handler(EXCEPTION_INFO, handler_fn)


def add_message_handler(handler_fn, pos=None):
_add_handler(MESSAGE, handler_fn, pos)


def remove_message_handler(handler_fn):
_remove_handler(MESSAGE, handler_fn)


def add_payload_handler(handler_fn, pos=None):
_add_handler(PAYLOAD, handler_fn, pos)


def remove_payload_handler(handler_fn):
_remove_handler(PAYLOAD, handler_fn)


# Event handler processing

def on_exception_info(exc_info, **kw):
return _on_event(EXCEPTION_INFO, exc_info, **kw)


def on_message(message, **kw):
return _on_event(MESSAGE, message, **kw)


def on_payload(payload, **kw):
return _on_event(PAYLOAD, payload, **kw)


# Misc

def reset():
for handlers in _event_handlers.values():
del handlers[:]
11 changes: 11 additions & 0 deletions rollbar/lib/filters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from rollbar.lib import events
from rollbar.lib.filters.basic import filter_rollbar_ignored_exceptions, filter_by_level


def add_builtin_filters(settings):
# exc_info filters
events.add_exception_info_handler(filter_rollbar_ignored_exceptions)
events.add_exception_info_handler(filter_by_level)

# message filters
events.add_message_handler(filter_by_level)
Loading