/
breakpad_resource.py
520 lines (409 loc) · 18.9 KB
/
breakpad_resource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import cgi
from collections import deque
import hashlib
import io
import logging
import json
import time
import zlib
from everett.component import ConfigOptions, RequiredConfigMixin
from everett.manager import parse_class
import falcon
from falcon.request_helpers import BoundedStream
from gevent.pool import Pool
import markus
from antenna.heartbeat import register_for_life, register_for_heartbeat
from antenna.throttler import (
REJECT,
FAKEACCEPT,
RESULT_TO_TEXT,
Throttler,
)
from antenna.util import (
create_crash_id,
sanitize_dump_name,
utc_now,
validate_crash_id,
)
logger = logging.getLogger(__name__)

# Module-level metrics client; every metric emitted here is namespaced
# under 'breakpad_resource'
mymetrics = markus.get_metrics('breakpad_resource')

#: Maximum number of attempts to save a crash before we give up
MAX_ATTEMPTS = 20

#: SAVE and PUBLISH states of the crash mover
STATE_SAVE = 'save'
STATE_PUBLISH = 'publish'
class MalformedCrashReport(Exception):
    """Raised when an incoming crash report payload cannot be parsed.

    The exception message is an alpha-numeric error code containing no
    spaces; it is used directly in metrics tags and in the HTTP response
    body returned to the client.
    """
class CrashReport:
    """A single crash report moving through the save/publish pipeline.

    Tracks the raw crash data, its dumps, its crash id, the pipeline
    state it is currently in (save or publish), and how many errors have
    occurred while processing it in the current state.
    """

    def __init__(self, raw_crash, dumps, crash_id, errors=0):
        # Annotated key/value data for the crash
        self.raw_crash = raw_crash
        # Map of sanitized dump name -> dump bytes
        self.dumps = dumps
        # The crash id identifying this report
        self.crash_id = crash_id
        # Error count for the current state
        self.errors = errors
        # Pipeline state; None until set_state() is called
        self.state = None

    def set_state(self, state):
        """Transition to ``state`` and zero the error counter."""
        self.state = state
        self.errors = 0
def positive_int(val):
    """Everett parser that converts ``val`` to an int and enforces val >= 1.

    :arg val: the value to parse

    :returns: the parsed int

    :raises ValueError: if the parsed value is less than 1

    """
    val = int(val)
    if val < 1:
        # FIX: the check enforces >= 1 (0 is rejected, 1 is accepted), but the
        # old message claimed the value "must be greater than 1" -- corrected
        # so the error text matches the actual constraint.
        raise ValueError('val must be greater than or equal to 1: %s' % val)
    return val
class BreakpadSubmitterResource(RequiredConfigMixin):
    """Handles incoming breakpad crash reports and saves to crashstorage.

    This handles incoming HTTP POST requests containing breakpad-style crash
    reports in multipart/form-data format.

    It can handle compressed or uncompressed POST payloads.

    It parses the payload from the HTTP POST request, runs it through the
    throttler with the specified rules, generates a crash_id, returns the
    crash_id to the HTTP client and then saves the crash using the configured
    crashstorage class.

    .. Note::

       From when a crash comes in to when it's saved by the crashstorage class,
       the crash is entirely in memory. Keep that in mind when figuring out
       how to scale your Antenna nodes.

    The most important configuration bit here is choosing the crashstorage
    class.

    For example::

        CRASHSTORAGE_CLASS=antenna.ext.s3.crashstorage.S3CrashStorage

    """

    required_config = ConfigOptions()
    required_config.add_option(
        'dump_field', default='upload_file_minidump',
        doc='The name of the field in the POST data for dumps.'
    )
    required_config.add_option(
        'dump_id_prefix', default='bp-',
        doc='The crash type prefix.'
    )
    required_config.add_option(
        'concurrent_crashmovers',
        default='2',
        parser=positive_int,
        doc=(
            'The number of crashes concurrently being saved and published. '
            'Each process gets this many concurrent crashmovers, so if you\'re '
            'running 5 processes on the node, then it\'s '
            '(5 * concurrent_crashmovers) sharing upload bandwidth.'
        )
    )

    # crashstorage things
    required_config.add_option(
        'crashstorage_class',
        default='antenna.ext.crashstorage_base.NoOpCrashStorage',
        parser=parse_class,
        doc='The class in charge of storing crashes.'
    )

    # crashpublish things
    required_config.add_option(
        'crashpublish_class',
        default='antenna.ext.crashpublish_base.NoOpCrashPublish',
        parser=parse_class,
        doc='The class in charge of publishing crashes.'
    )
    def __init__(self, config):
        """Initialize the resource from ``config``.

        Builds the crashstorage and crashpublish backends from the configured
        classes, sets up the throttler and the crashmover worker pool, and
        registers this resource's functions with the heartbeat manager.

        :arg config: an everett config manager instance
        """
        self.config = config.with_options(self)
        # Instantiate the configured storage/publish backends, each with its
        # own config namespace
        self.crashstorage = self.config('crashstorage_class')(config.with_namespace('crashstorage'))
        self.crashpublish = self.config('crashpublish_class')(config.with_namespace('crashpublish'))
        self.throttler = Throttler(config)

        # Gevent pool for crashmover workers
        self.crashmover_pool = Pool(size=self.config('concurrent_crashmovers'))

        # Queue for crashmover work
        self.crashmover_queue = deque()

        # Register hb functions with heartbeat manager
        register_for_heartbeat(self.hb_report_health_stats)
        register_for_heartbeat(self.hb_run_crashmover)

        # Register life function with heartbeat manager
        register_for_life(self.has_work_to_do)
def get_runtime_config(self, namespace=None):
"""Return generator of runtime configuration."""
for item in super().get_runtime_config():
yield item
for item in self.throttler.get_runtime_config():
yield item
for item in self.crashstorage.get_runtime_config(['crashstorage']):
yield item
for item in self.crashpublish.get_runtime_config(['crashpublish']):
yield item
def check_health(self, state):
"""Return health state."""
if hasattr(self.crashstorage, 'check_health'):
self.crashstorage.check_health(state)
if hasattr(self.crashpublish, 'check_health'):
self.crashpublish.check_health(state)
def hb_report_health_stats(self):
"""Heartbeat function to report health stats."""
# The number of crash reports sitting in the work queue; this is a
# direct measure of the health of this process--a number that's going
# up means impending doom
mymetrics.gauge('work_queue_size', value=len(self.crashmover_queue))
def has_work_to_do(self):
"""Return whether this still has work to do."""
work_to_do = (
len(self.crashmover_pool) +
len(self.crashmover_queue)
)
logger.info('work left to do: %s' % work_to_do)
# Indicates whether or not we're sitting on crashes to save--this helps
# keep Antenna alive until we're done saving crashes
return bool(work_to_do)
    def extract_payload(self, req):
        """Parse HTTP POST payload.

        Decompresses the payload if necessary and then walks through the
        FieldStorage converting from multipart/form-data to Python datatypes.

        NOTE(willkg): The FieldStorage is poorly documented (in my opinion). It
        has a list attribute that is a list of FieldStorage items--one for each
        key/val in the form. For attached files, the FieldStorage will have a
        name, value and filename and the type should be
        ``application/octet-stream``. Thus we parse it looking for things of type
        ``text/plain``, ``application/json``, and application/octet-stream.

        :arg falcon.request.Request req: a Falcon Request instance

        :returns: (raw_crash dict, dumps dict)

        :raises MalformedCrashReport:

        """
        # If we don't have a content type, raise MalformedCrashReport
        if not req.content_type:
            raise MalformedCrashReport('no_content_type')

        # If it's the wrong content type or there's no boundary section, raise
        # MalformedCrashReport
        content_type = [part.strip() for part in req.content_type.split(';', 1)]
        if ((len(content_type) != 2 or
             content_type[0] != 'multipart/form-data' or
             not content_type[1].startswith('boundary='))):
            if content_type[0] != 'multipart/form-data':
                raise MalformedCrashReport('wrong_content_type')
            else:
                raise MalformedCrashReport('no_boundary')

        content_length = req.content_length or 0

        # If there's no content, raise MalformedCrashReport
        if content_length == 0:
            raise MalformedCrashReport('no_content_length')

        # Decompress payload if it's compressed
        if req.env.get('HTTP_CONTENT_ENCODING') == 'gzip':
            mymetrics.incr('gzipped_crash')

            # If the content is gzipped, we pull it out and decompress it. We
            # have to do that here because nginx doesn't have a good way to do
            # that in nginx-land.
            # wbits of 16 + MAX_WBITS tells zlib to expect a gzip header
            gzip_header = 16 + zlib.MAX_WBITS
            try:
                data = zlib.decompress(req.stream.read(content_length), gzip_header)
            except zlib.error:
                # This indicates this isn't a valid compressed stream. Given
                # that the HTTP request insists it is, we're just going to
                # assume it's junk and not try to process any further.
                raise MalformedCrashReport('bad_gzip')

            # Stomp on the content length to correct it because we've changed
            # the payload size by decompressing it. We save the original value
            # in case we need to debug something later on.
            req.env['ORIG_CONTENT_LENGTH'] = content_length
            content_length = len(data)
            req.env['CONTENT_LENGTH'] = str(content_length)

            data = io.BytesIO(data)
            mymetrics.histogram('crash_size', value=content_length, tags=['payload:compressed'])
        else:
            # NOTE(willkg): At this point, req.stream is either a
            # falcon.request_helper.BoundedStream (in tests) or a
            # gunicorn.http.body.Body (in production).
            #
            # FieldStorage doesn't work with BoundedStream so we pluck out the
            # internal stream from that which works fine.
            #
            # FIXME(willkg): why don't tests work with BoundedStream?
            if isinstance(req.stream, BoundedStream):
                data = req.stream.stream
            else:
                data = req.stream

            mymetrics.histogram('crash_size', value=content_length, tags=['payload:uncompressed'])

        # Stomp on querystring so we don't pull it in
        request_env = dict(req.env)
        request_env['QUERY_STRING'] = ''

        # NOTE(review): the stdlib cgi module is deprecated (PEP 594, removed
        # in Python 3.13) -- confirm a migration plan before upgrading.
        fs = cgi.FieldStorage(fp=data, environ=request_env, keep_blank_values=1)

        raw_crash = {}
        dumps = {}

        # Track whether the payload delivered the crash as a JSON blob or as
        # individual key/value form fields; mixing both is malformed
        has_json = False
        has_kvpairs = False

        for fs_item in fs.list:
            # If the field has no name, then it's probably junk, so let's drop it.
            if not fs_item.name:
                continue

            if fs_item.name == 'dump_checksums':
                # We don't want to pick up the dump_checksums from a raw
                # crash that was re-submitted.
                continue

            elif fs_item.type and fs_item.type.startswith('application/json'):
                # This is a JSON blob, so load it and override raw_crash with
                # it.
                has_json = True
                raw_crash = json.loads(fs_item.value)

            elif fs_item.type and (fs_item.type.startswith('application/octet-stream') or isinstance(fs_item.value, bytes)):
                # This is a dump, so add it to dumps using a sanitized dump
                # name.
                dump_name = sanitize_dump_name(fs_item.name)
                dumps[dump_name] = fs_item.value

            else:
                # This isn't a dump, so it's a key/val pair, so we add that.
                has_kvpairs = True
                raw_crash[fs_item.name] = fs_item.value

        if has_json and has_kvpairs:
            # If the crash payload has both kvpairs and a JSON blob, then it's
            # malformed and we should dump it.
            raise MalformedCrashReport('has_json_and_kv')

        return raw_crash, dumps
def get_throttle_result(self, raw_crash):
"""Run raw_crash through throttler for a throttling result.
:arg dict raw_crash: the raw crash to throttle
:returns tuple: ``(result, rule_name, percentage)``
"""
# At this stage, nothing has given us a throttle answer, so we
# throttle the crash.
result, rule_name, throttle_rate = self.throttler.throttle(raw_crash)
# Save the results in the raw_crash itself
raw_crash['legacy_processing'] = result
raw_crash['throttle_rate'] = throttle_rate
return result, rule_name, throttle_rate
    @mymetrics.timer_decorator('on_post.time')
    def on_post(self, req, resp):
        """Handle incoming HTTP POSTs.

        Note: This is executed by the WSGI app, so it and anything it does is
        covered by the Sentry middleware.

        :arg falcon.request.Request req: the incoming request
        :arg falcon.response.Response resp: the response to populate
        """
        resp.status = falcon.HTTP_200

        start_time = time.time()
        # NOTE(willkg): This has to return text/plain since that's what the
        # breakpad clients expect.
        resp.content_type = 'text/plain'

        try:
            raw_crash, dumps = self.extract_payload(req)
        except MalformedCrashReport as exc:
            # If this is malformed, then reject it with malformed error code.
            msg = str(exc)
            mymetrics.incr('malformed', tags=['reason:%s' % msg])
            resp.body = 'Discarded=%s' % msg
            return

        mymetrics.incr('incoming_crash')

        # Add timestamps
        current_timestamp = utc_now()
        raw_crash['submitted_timestamp'] = current_timestamp.isoformat()
        raw_crash['timestamp'] = start_time

        # Add checksums and MinidumpSha256Hash
        raw_crash['dump_checksums'] = {
            dump_name: hashlib.sha256(dump).hexdigest()
            for dump_name, dump in dumps.items()
        }
        raw_crash['MinidumpSha256Hash'] = raw_crash['dump_checksums'].get('upload_file_minidump', '')

        # First throttle the crash which gives us the information we need
        # to generate a crash id.
        throttle_result, rule_name, percentage = self.get_throttle_result(raw_crash)

        # Use a uuid if they gave us one and it's valid--otherwise create a new
        # one.
        if 'uuid' in raw_crash and validate_crash_id(raw_crash['uuid']):
            crash_id = raw_crash['uuid']
            logger.info('%s has existing crash_id', crash_id)
        else:
            crash_id = create_crash_id(
                timestamp=current_timestamp,
                throttle_result=throttle_result
            )
            raw_crash['uuid'] = crash_id

        raw_crash['type_tag'] = self.config('dump_id_prefix').strip('-')

        # Log the throttle result
        logger.info('%s: matched by %s; returned %s', crash_id, rule_name,
                    RESULT_TO_TEXT[throttle_result])
        mymetrics.incr('throttle_rule', tags=['rule:%s' % rule_name])
        mymetrics.incr('throttle', tags=['result:%s' % RESULT_TO_TEXT[throttle_result].lower()])

        if throttle_result is REJECT:
            # If the result is REJECT, then discard it
            resp.body = 'Discarded=rule_%s' % rule_name

        elif throttle_result is FAKEACCEPT:
            # If the result is a FAKEACCEPT, then we return a crash id, but throw
            # the crash away
            resp.body = 'CrashID=%s%s\n' % (self.config('dump_id_prefix'), crash_id)

        else:
            # If the result is not REJECT, then save it and return the CrashID to
            # the client; the crash goes on the crashmover queue in SAVE state
            # and is processed asynchronously
            crash_report = CrashReport(raw_crash, dumps, crash_id)
            crash_report.set_state(STATE_SAVE)
            self.crashmover_queue.append(crash_report)
            self.hb_run_crashmover()
            resp.body = 'CrashID=%s%s\n' % (self.config('dump_id_prefix'), crash_id)
def hb_run_crashmover(self):
"""Spawn a crashmover if there's work to do."""
# Spawn a new crashmover if there's stuff in the queue and we haven't
# hit the limit of how many we can run
if self.crashmover_queue and self.crashmover_pool.free_count() > 0:
self.crashmover_pool.spawn(self.crashmover_process_queue)
    def crashmover_process_queue(self):
        """Process crashmover work.

        NOTE(willkg): This has to be super careful not to lose crash reports.
        If there's any kind of problem, this must return the crash report to
        the relevant queue.
        """
        # Loop until the shared queue is drained; other crashmover coroutines
        # may be pulling from the same deque concurrently
        while self.crashmover_queue:
            crash_report = self.crashmover_queue.popleft()

            try:
                if crash_report.state == STATE_SAVE:
                    # Save crash and then toss crash_id in the publish queue
                    self.crashmover_save(crash_report)
                    crash_report.set_state(STATE_PUBLISH)
                    self.crashmover_queue.append(crash_report)

                elif crash_report.state == STATE_PUBLISH:
                    # Publish crash and we're done
                    self.crashmover_publish(crash_report)
                    self.crashmover_finish(crash_report)

            except Exception:
                mymetrics.incr('%s_crash_exception.count' % crash_report.state)
                crash_report.errors += 1
                logger.exception(
                    'Exception when processing queue (%s), state: %s; error %d/%d',
                    crash_report.crash_id,
                    crash_report.state,
                    crash_report.errors,
                    MAX_ATTEMPTS
                )

                # After MAX_ATTEMPTS, we give up on this crash and move on
                if crash_report.errors < MAX_ATTEMPTS:
                    # Requeue so another pass retries this state
                    self.crashmover_queue.append(crash_report)
                else:
                    logger.error(
                        '%s: too many errors trying to %s; dropped',
                        crash_report.crash_id,
                        crash_report.state
                    )
                    mymetrics.incr('%s_crash_dropped.count' % crash_report.state)
def crashmover_finish(self, crash_report):
"""Finish bookkeeping on crash report."""
# Capture the total time it took for this crash to be handled from
# being received from breakpad client to saving to s3.
#
# NOTE(willkg): time.time returns seconds, but .timing() wants
# milliseconds, so we multiply!
delta = (time.time() - crash_report.raw_crash['timestamp']) * 1000
mymetrics.timing('crash_handling.time', value=delta)
mymetrics.incr('save_crash.count')
@mymetrics.timer('crash_save.time')
def crashmover_save(self, crash_report):
"""Save crash report to storage."""
self.crashstorage.save_crash(crash_report)
logger.info('%s saved', crash_report.crash_id)
@mymetrics.timer('crash_publish.time')
def crashmover_publish(self, crash_report):
"""Publish crash_id in publish queue."""
self.crashpublish.publish_crash(crash_report)
logger.info('%s published', crash_report.crash_id)
def join_pool(self):
"""Join the pool.
NOTE(willkg): Only use this in tests!
This is helpful for forcing all the coroutines in the pool to complete
so that we can verify outcomes in the test suite for work that might
cross coroutines.
"""
self.crashmover_pool.join()