Add a script for generating signatures from crash pings.
This script generates a CSV file of the format "document_id, crash_signature"
by pulling from the `telemetry.crash` table in BigQuery.

The script parallelizes operations internally to reduce total wall-clock time.
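
A hypothetical invocation, for illustration only (assumes Google Cloud credentials with query access to the `moz-fx-data-shared-prod` dataset are already configured; the date, document ID, and signature below are made up):

    python3 bigquery-etl.py 2020-08-23 > signatures.csv

Progress messages and per-document errors go to stderr; stdout receives one CSV line per crash ping, e.g. `7ab40b59-8714-4ec4-bc98-467eb579abe7,"OOM | small"`.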
wlach authored and staktrace committed Aug 24, 2020
1 parent be74351 commit 20b0846
Showing 2 changed files with 84 additions and 0 deletions.
83 changes: 83 additions & 0 deletions bigquery-etl.py
@@ -0,0 +1,83 @@
import math
import sys
from google.cloud import bigquery
from fx_crash_sig.crash_processor import CrashProcessor

QUERY_TEMPLATE = """
SELECT
  document_id, payload
FROM
  `moz-fx-data-shared-prod`.telemetry.crash
WHERE
  normalized_channel="nightly"
  AND DATE(submission_timestamp)="{date}"
  AND application.build_id > FORMAT_DATE("%Y%m%d", DATE_SUB(DATE "{date}", INTERVAL 1 WEEK))
  AND payload IS NOT NULL
  AND payload.stack_traces IS NOT NULL
  AND payload.stack_traces.crash_info IS NOT NULL
"""

if len(sys.argv) != 2:
    print("USAGE: %s <date in YYYY-MM-DD format>" % sys.argv[0])
    sys.exit(1)

proc = CrashProcessor(verbose=True, windows=True)

client = bigquery.Client()
query_job = client.query(QUERY_TEMPLATE.format(date=sys.argv[1]))
result = query_job.result()

CHUNK_SIZE = 10
chunk_count = math.ceil(result.total_rows / CHUNK_SIZE)
print(f"Rows: {result.total_rows}, Chunks: {chunk_count}", file=sys.stderr)

""" Generator that yields a tuple of two arrays. The first array holds document
IDs, the second array holds the corresponding payloads. The length of the
two arrays are always the same.
"""
def get_chunks(result):
doc_ids = []
payloads = []
# there's probably a more pythonic way to do this...
for (document_id, payload) in result:
doc_ids.append(document_id)
payloads.append(payload)
if len(doc_ids) < CHUNK_SIZE:
continue
yield (doc_ids, payloads)
doc_ids = []
payloads = []
if len(doc_ids) > 0:
yield (doc_ids, payloads)

def get_sigs(chunk):
    (doc_ids, payloads) = chunk
    sigs = proc.get_signatures_multi(doc_ids, payloads)
    return (doc_ids, sigs)

""" Helper wrapper around a generator whose length we know, to pass to pool.map.
If we don't use this pool.map will turn the generator into a list which
basically reads all the result data from bigtable in one go, which is heavy
on memory. """
class GeneratorLen(object):
def __init__(self, gen, length):
self.gen = gen
self.length = length

def __len__(self):
return self.length

def __iter__(self):
return self.gen

# Process the chunks on a pool of 10 worker threads (multiprocessing.dummy
# provides a thread-based Pool with the multiprocessing API).
from multiprocessing.dummy import Pool as ThreadPool

pool = ThreadPool(10)
chunks = GeneratorLen(get_chunks(result), chunk_count)
all_sigs = pool.map(get_sigs, chunks)

for (chunk_doc_ids, chunk_sigs) in all_sigs:
    for (doc_id, sig) in zip(chunk_doc_ids, chunk_sigs):
        if sig is None or len(sig.signature) == 0:
            print(f"Error computing signature for {doc_id}", file=sys.stderr)
            continue
        print(f'{doc_id},"{sig.signature}"')
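
For reference, a minimal sketch (not part of this commit) of consuming the generated CSV, assuming the script's stdout was redirected to a file named signatures.csv as in the example above:

    import csv
    from collections import Counter

    # Count how many crash pings map to each signature in the script's output.
    counts = Counter()
    with open("signatures.csv", newline="") as f:
        for doc_id, signature in csv.reader(f):
            counts[signature] += 1

    for signature, n in counts.most_common(10):
        print(f"{n}\t{signature}")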
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,3 +1,4 @@
requests==2.22.0
siggen<2
ujson==1.35
google-cloud-bigquery==1.27.2
