Fixes bug 901977 - Store raw crash data into elasticsearch. r=lars
adngdb committed Nov 22, 2013
1 parent affff2c commit 085fd7b
Showing 13 changed files with 539 additions and 307 deletions.
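The heart of the change is visible in the backfill app below: instead of indexing a bare processed crash at the document root, Socorro now indexes a document with explicit processed_crash and raw_crash branches, keyed by crash_id. A minimal sketch of the new document shape; the field layout comes from the code below, while the values are made up:

    crash_document = {
        'crash_id': '936ce666-ff3b-4c7a-9674-367fe2120408',  # hypothetical
        'processed_crash': {
            # everything the processor produced: uuid, signature,
            # date_processed, ...
        },
        'raw_crash': {
            # the metadata submitted with the crash: ProductName,
            # Version, ...
        },
    }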
scripts/elasticsearch_backfill_app.py (210 changes: 133 additions & 77 deletions)
@@ -13,17 +13,11 @@
"""

import datetime
import json
import pyelasticsearch

from configman import Namespace, converters
from isodate.isoerror import ISO8601Error

from socorro.app import generic_app
from socorro.external.elasticsearch.crashstorage import (
ElasticSearchCrashStorage
)
from socorro.external.hbase.crashstorage import HBaseCrashStorage
from socorro.external.hbase.hbase_client import HBaseConnectionForCrashReports
from socorro.lib import datetimeutil


@@ -33,15 +27,28 @@ class ElasticsearchBackfillApp(generic_app.App):
app_description = __doc__

required_config = Namespace()
required_config.add_option(
'elasticsearch_storage_class',
default=ElasticSearchCrashStorage,

required_config.namespace('elasticsearch')
required_config.elasticsearch.add_option(
'storage_class',
default='socorro.external.elasticsearch.crashstorage.'
'ElasticSearchCrashStorage',
from_string_converter=converters.class_converter,
doc='The class to use to store crash reports in elasticsearch.'
)
required_config.add_option(
'hbase_storage_class',
default=HBaseCrashStorage,
doc='The class to use to pull crash reports from HBase.'
required_config.elasticsearch.add_option(
'elasticsearch_index_alias',
default='socorro%Y%W_%Y%m%d',
doc='Index to use when reindexing data. Will be aliased to the '
'regular index.'
)

required_config.namespace('secondary_storage')
required_config.secondary_storage.add_option(
'storage_class',
default='socorro.external.hb.crashstorage.HBaseCrashStorage',
from_string_converter=converters.class_converter,
doc='The class to use to pull raw crash reports.'
)

required_config.add_option(
@@ -52,97 +59,146 @@ class ElasticsearchBackfillApp(generic_app.App):
)
required_config.add_option(
'duration',
default=7,
doc='Number of days to backfill. '
default=1,
doc='Number of weeks to backfill. '
)
required_config.add_option(
'index_doc_number',
default=50,
default=100,
doc='Number of crashes to index at a time. '
)

def main(self):
self.es_storage = self.config.elasticsearch_storage_class(self.config)
hb_client = HBaseConnectionForCrashReports(
self.config.hbase_host,
self.config.hbase_port,
self.config.hbase_timeout,
self.es_storage = self.config.elasticsearch.storage_class(
self.config.elasticsearch
)
self.secondary_storage = self.config.secondary_storage.storage_class(
self.config.secondary_storage
)

current_date = self.config.end_date

one_day = datetime.timedelta(days=1)
one_week = datetime.timedelta(weeks=1)
# Iterate over our indices.
for i in range(self.config.duration):
es_index = self.get_index_for_date(current_date)
day = current_date.strftime('%y%m%d')

self.config.logger.info('backfilling crashes for %s', day)

# First create the index if it doesn't already exist
self.es_storage.create_index(es_index)

reports = hb_client.get_list_of_processed_json_for_date(
day,
number_of_retries=5
es_current_index = self.get_index_for_date(
current_date,
self.config.elasticsearch.elasticsearch_index
)
es_new_index = self.get_index_for_date(
current_date,
self.config.elasticsearch.elasticsearch_index_alias
)

crashes_to_index = []
self.config.logger.info(
'backfilling crashes for %s',
es_current_index
)

for report in reports:
processed_crash = self.format_dates_in_crash(
json.loads(report)
try:
reports = self.get_reports(es_current_index, es_fields=[])
total_num_of_crashes_in_index = reports['total']
except pyelasticsearch.exceptions.ElasticHttpNotFoundError:
# This index does not exist; there is nothing to reindex.
self.config.logger.info(
'index %s does not exist, moving on',
es_current_index
)
continue

# First create the new index.
self.es_storage.create_index(es_new_index)

# Get all the reports in elasticsearch, but only a few at a time.
for es_from in range(
0,
total_num_of_crashes_in_index,
self.config.index_doc_number
):
crashes_to_index = []
reports = self.get_reports(
es_current_index,
es_from=es_from,
es_size=self.config.index_doc_number,
)

# print 'storing %s' % processed_crash['uuid']

if len(crashes_to_index) > self.config.index_doc_number:
# print 'now indexing crashes! '
self.index_crashes(es_index, crashes_to_index)
crashes_to_index = []

crashes_to_index.append(processed_crash)

if len(crashes_to_index) > 0:
self.index_crashes(es_index, crashes_to_index)

current_date -= one_day
for report in reports['hits']:
crash_report = report['_source']
if 'uuid' in crash_report:
# This is a legacy crash report, containing only the
# processed crash at the root level.
crash_id = crash_report['uuid']
processed_crash = crash_report
raw_crash = self.secondary_storage.get_raw_crash(
crash_id
)

crash_document = {
'crash_id': crash_id,
'processed_crash': processed_crash,
'raw_crash': raw_crash,
}
elif 'processed_crash' in crash_report:
# This is a new style crash report, with branches for
# the processed crash and the raw crash.
crash_document = crash_report
else:
raise KeyError(
    'Unable to understand what type of document was '
    'retrieved from elasticsearch'
)

crashes_to_index.append(crash_document)

self.index_crashes(es_new_index, crashes_to_index)

# Now that reindexing is done, delete the old index and
# create an alias to the new one.
self.es_storage.es.delete_index(es_current_index)
self.es_storage.es.update_aliases({
'actions': [
{
'add': {
'index': es_new_index,
'alias': es_current_index,
}
}
]
})

current_date -= one_week

return 0
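        # (Illustration, not part of the commit: with the default
        # index_doc_number of 100 and, say, 250 crashes in an index, the
        # range() above yields es_from = 0, 100, 200, so the last
        # get_reports() call simply returns a short page of 50 hits.)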

    # Removed:

    def get_index_for_date(self, day):
        """return the elasticsearch index for a day"""
        index = self.config.elasticsearch_index

        if not index:
            return None
        if '%' in index:
            index = day.strftime(index)

        return index

    def format_dates_in_crash(self, processed_crash):
        # HBase returns dates in a format that elasticsearch does not
        # understand. To keep our elasticsearch mapping simple, we
        # transform all dates to a recognized format.
        for attr in processed_crash:
            try:
                processed_crash[attr] = datetimeutil.date_to_string(
                    datetimeutil.string_to_datetime(
                        processed_crash[attr]
                    )
                )
            except (ValueError, TypeError, ISO8601Error):
                # the attribute is not a date
                pass

        return processed_crash

    # Added:

    def get_reports(self, index, es_fields=None, es_size=0, es_from=0):
        """Return some reports from an elasticsearch index. """
        es_query = {
            'query': {
                'match_all': {}
            },
            'fields': es_fields,
            'size': es_size,
            'from': es_from
        }
        return self.es_storage.es.search(
            es_query,
            index=index,
        )['hits']

    def get_index_for_date(self, date, index_format):
        """return the elasticsearch index for a date"""
        if not index_format:
            return None
        if '%' in index_format:
            return date.strftime(index_format)

        return index_format
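    # (Illustration, not part of the commit: assuming the regular index
    # format is 'socorro%Y%W', get_index_for_date(datetime.date(2013, 11, 22),
    # 'socorro%Y%W') returns 'socorro201346', and with the alias format
    # 'socorro%Y%W_%Y%m%d' defined above it returns 'socorro201346_20131122'.)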

def index_crashes(self, es_index, crashes_to_index):
self.es_storage.es.bulk_index(
es_index,
self.config.elasticsearch_doctype,
self.config.elasticsearch.elasticsearch_doctype,
crashes_to_index,
id_field='uuid'
id_field='crash_id'
)
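Worth pulling out of the diff above: the backfill performs an in-place reindex by writing into a dated scratch index, then swapping names. A standalone sketch of that dance, assuming a local elasticsearch node and the pyelasticsearch client the app already uses; the index names are made up:

    from pyelasticsearch import ElasticSearch

    es = ElasticSearch('http://localhost:9200')

    old_index = 'socorro201346'            # the name clients query
    new_index = 'socorro201346_20131122'   # built from the alias format

    # ... bulk_index() all reindexed documents into new_index first ...

    # Drop the old index, then point its name at the new one so that
    # existing queries keep working unchanged.
    es.delete_index(old_index)
    es.update_aliases({
        'actions': [
            {'add': {'index': new_index, 'alias': old_index}}
        ]
    })

There is a brief window between the delete and the alias update in which searches against the old name fail; for a one-off backfill that trade-off is presumably acceptable.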


socorro/external/crashstorage_base.py (6 changes: 3 additions & 3 deletions)
@@ -148,7 +148,7 @@ def save_processed(self, processed_crash):
quietly do nothing.
parameters:
processed_crash - a mapping contianing the processed crash"""
processed_crash - a mapping containing the processed crash"""
pass

#--------------------------------------------------------------------------
@@ -158,11 +158,11 @@ def save_raw_and_processed(self, raw_crash, dumps, processed_crash,
this unified method combines saving both raw and processed crashes.
parameters:
raw_crash - a mapping containing the raw crash meta data. It is
raw_crash - a mapping containing the raw crash meta data. It is
often saved as a json file, but here it is in the form
of a dict.
dumps - a dict of dump name keys and binary blob values
processed_crash - a mapping contianing the processed crash
processed_crash - a mapping containing the processed crash
crash_id - the crash key to use for this crash"""
self.save_raw_crash(raw_crash, dumps, crash_id)
self.save_processed(processed_crash)
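For context, this unified method is what lets callers hand over both halves of a crash in one call. A minimal sketch with made-up data, where storage stands for any configured CrashStorage subclass:

    raw_crash = {'ProductName': 'Firefox', 'Version': '25.0'}
    dumps = {'upload_file_minidump': 'MDMP...'}  # dump name -> binary blob
    processed_crash = {'signature': 'hypothetical::signature'}
    crash_id = '936ce666-ff3b-4c7a-9674-367fe2120408'  # hypothetical

    storage.save_raw_and_processed(raw_crash, dumps, processed_crash, crash_id)
    # ...which the base class implements as:
    storage.save_raw_crash(raw_crash, dumps, crash_id)
    storage.save_processed(processed_crash)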
socorro/external/elasticsearch/base.py (31 changes: 18 additions & 13 deletions)
@@ -222,13 +222,13 @@ def build_query_from_params(params, config):
)

filters["and"].append({
"range": {
"date_processed": {
"from": params["from_date"],
"to": params["to_date"]
}
"range": {
"processed_crash.date_processed": {
"from": params["from_date"],
"to": params["to_date"]
}
})
}
})

if params["report_process"] == "browser":
filters["and"].append({"missing": {"field": "process_type"}})
@@ -340,9 +340,11 @@ def build_terms_query(fields, terms):

if isinstance(fields, list):
for field in fields:
query[query_type][field] = terms
prefixed_field = "processed_crash.%s" % field
query[query_type][prefixed_field] = terms
else:
query[query_type][fields] = terms
prefixed_field = "processed_crash.%s" % fields
query[query_type][prefixed_field] = terms

return query

@@ -360,14 +362,17 @@ def build_wildcard_query(fields, terms):
}

if isinstance(fields, list):
for i in fields:
if i == "signature":
i = "signature.full"
wildcard_query["wildcard"][i] = terms
for field in fields:
if field == "signature":
field = "signature.full"

prefixed_field = "processed_crash.%s" % field
wildcard_query["wildcard"][prefixed_field] = terms
else:
if fields == "signature":
fields = "signature.full"
wildcard_query["wildcard"][fields] = terms
prefixed_field = "processed_crash.%s" % fields
wildcard_query["wildcard"][prefixed_field] = terms

return wildcard_query
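The common thread in this file: because documents now nest everything under processed_crash and raw_crash branches, every field name used in a search has to be prefixed. A sketch of the resulting query fragments, with made-up parameter values:

    # A terms query as build_terms_query now writes it (the choice of
    # query_type is elided from the hunk above):
    terms_query = {
        'terms': {
            'processed_crash.product': ['Firefox', 'FennecAndroid'],
        }
    }

    # The date range filter from build_query_from_params, likewise:
    date_filter = {
        'range': {
            'processed_crash.date_processed': {
                'from': '2013-11-15T00:00:00',
                'to': '2013-11-22T00:00:00',
            }
        }
    }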
