Skip to content

Commit

Permalink
Merge pull request #3251 from peterbe/bug-1249359-store-correlations-…
Browse files Browse the repository at this point in the history
…in-es

Bug 1249359 store correlations in es
  • Loading branch information
Peter Bengtsson committed Mar 30, 2016
2 parents 82fefd4 + e9fe8e6 commit fba686b
Show file tree
Hide file tree
Showing 8 changed files with 751 additions and 75 deletions.
190 changes: 140 additions & 50 deletions socorro/analysis/correlations/correlations_app.py
Expand Up @@ -2,37 +2,24 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import json
import re
import datetime
import os
import tempfile

from itertools import (
product as iter_product,
ifilter,
)
from functools import partial

from collections import defaultdict, Sequence, MutableMapping
from contextlib import contextmanager
import ujson as json

from configman import Namespace, RequiredConfig, class_converter
from configman.dotdict import DotDict as ConfigmanDotDict
from configman.converters import list_converter, to_str

from socorro.analysis.correlations import macdebugids
from socorro.analysis.correlations import addonids
from configman import Namespace, class_converter
from configman.dotdict import DotDict

from socorrolib.app.fetch_transform_save_app import (
FetchTransformSaveWithSeparateNewCrashSourceApp
)

from socorrolib.lib.transform_rules import Rule
from socorrolib.lib.util import DotDict as SocorroDotDict
from socorrolib.lib.converters import change_default
from socorrolib.lib.datetimeutil import UTC
from socorro.external.crashstorage_base import CrashIDNotFound
from socorro.external.postgresql.products import ProductVersions
from socorro.processor.processor_2015 import rule_sets_from_string
from socorro.processor.processor_app import ProcessorApp
from socorro.external.crashstorage_base import (
CrashIDNotFound
)
from socorro.external.boto.crashstorage import BotoS3CrashStorage

#------------------------------------------------------------------------------
correlation_rule_sets = [
Expand All @@ -42,20 +29,27 @@
"socorrolib.lib.transform_rules.TransformRuleSystem",
"apply_all_rules",
"socorro.analysis.correlations.core_count_rule"
".CorrelationCoreCountRule, "
".CorrelationCoreCountRule, "
"socorro.analysis.correlations.interesting_rule"
".CorrelationInterestingModulesRule,"
".CorrelationInterestingModulesRule,"
"socorro.analysis.correlations.interesting_rule"
".CorrelationInterestingModulesVersionsRule,"
".CorrelationInterestingModulesVersionsRule,"
"socorro.analysis.correlations.interesting_rule"
".CorrelationInterestingAddonsRule,"
".CorrelationInterestingAddonsRule,"
"socorro.analysis.correlations.interesting_rule"
".CorrelationInterestingAddonsVersionsRule,"
".CorrelationInterestingAddonsVersionsRule,"
],
]
correlation_rule_sets_as_string = json.dumps(correlation_rule_sets)


def date_with_default_yesterday(value):
if not value:
return datetime.datetime.utcnow().date() - datetime.timedelta(days=1)
y, m, d = [int(x) for x in value.split('-')]
return datetime.date(y, m, d)


#==============================================================================
class CorrelationsApp(FetchTransformSaveWithSeparateNewCrashSourceApp):
""""""
Expand All @@ -73,20 +67,81 @@ class CorrelationsApp(FetchTransformSaveWithSeparateNewCrashSourceApp):
likely_to_be_changed=True,
)

required_config.add_option(
'transaction_executor_class',
default="socorro.database.transaction_executor."
"TransactionExecutorWithInfiniteBackoff",
doc='a class that will manage transactions',
from_string_converter=class_converter,
reference_value_from='resource.postgresql',
)
required_config.add_option(
'database_class',
default=(
'socorro.external.postgresql.connection_context'
'.ConnectionContext'
),
doc='the class responsible for connecting to Postgres',
from_string_converter=class_converter,
reference_value_from='resource.postgresql',
)

required_config.add_option(
name='date',
doc='Specific date to run this for',
default='',
from_string_converter=date_with_default_yesterday,
)

required_config.add_option(
name='product',
doc='Product name',
default='Firefox',
)

#--------------------------------------------------------------------------
def __init__(self, config, quit_check_callback=None):
super(CorrelationsApp, self).__init__(config)

#--------------------------------------------------------------------------
@staticmethod
def get_application_defaults():
return {
"number_of_submissions": 'all',
"source.crashstorage_class":
'socorro.external.boto.crashstorage.BotoS3CrashStorage',
"destination.crashstorage_class":
'socorro.external.crashstorage_base.NullCrashStorage',
"new_crash_source.new_crash_source_class":
'socorro.external.postgresql.new_crash_source'
'.PGPVNewCrashSource',
"source.crashstorage_class": (
'socorro.external.boto.crashstorage.BotoS3CrashStorage'
),
"destination.crashstorage_class": (
'socorro.external.crashstorage_base.NullCrashStorage'
),
"new_crash_source.new_crash_source_class": (
'socorro.external.es.new_crash_source.ESNewCrashSource'
),
}

#--------------------------------------------------------------------------
def _create_iter(self):
hits = ProductVersions(config=self.config).get(
active=True,
product=self.config.product
)['hits']
versions = [
x['version'] for x in hits if not x['version'].endswith('b')
]
assert versions, "No active versions"

# convert a datetime.date object to datetime.datetime
dt = datetime.datetime(
self.config.date.year,
self.config.date.month,
self.config.date.day,
).replace(tzinfo=UTC)
return self.new_crash_source.new_crashes(
dt,
product=self.config.product,
versions=versions,
)

#--------------------------------------------------------------------------
def _transform(self, crash_id):
"""Take a raw_crash and its associated raw_dumps and return a
Expand Down Expand Up @@ -117,20 +172,11 @@ def _transform(self, crash_id):
meta_data
)
self.quit_check()
try:
self.destination.save_processed(processed_crash)
self.config.logger.info('saved - %s', crash_id)
except Exception as x:
self.config.logger.error(
"writing raw: %s",
str(x),
exc_info=True
)

#--------------------------------------------------------------------------
def _setup_source_and_destination(self):
super(CorrelationsApp, self)._setup_source_and_destination()
self.rule_system = ConfigmanDotDict()
self.rule_system = DotDict()
for a_rule_set_name in self.config.rules.rule_sets.names:
self.config.logger.debug(
'setting up rule set: %s',
Expand All @@ -149,9 +195,53 @@ def close(self):
self.config.logger.debug('CorrelationsApp closes')
for a_rule_set_name, a_rule_set in self.rule_system.iteritems():
self.config.logger.debug('closing %s', a_rule_set_name)
try:
a_rule_set.close()
except AttributeError:
# guess we don't need to close that rule
pass
a_rule_set.close()
self.config.logger.debug('done closing rules')


#==============================================================================
class LocallyCachedBotoS3CrashStorage(BotoS3CrashStorage): # pragma: no cover
"""This class is dedicated only for local development, but checked in.
Never enabled by default and has no test coverage.
When you attempt to run correlations repeatedly on your laptop,
you have to download lots and lots of processed crashes from S3.
That's fine once but if you need to re-run it again, you'll have
to download the same stuff again. It's faster then to just use
this class instead of the default BotoS3CrashStorage class.
To enable this class instead, set:
--source.crashstorage_class=socorro.analysis.correlations.correlations_app\
.LocallyCachedBotoS3CrashStorage
in your call to `socorro correlations ...`
"""

def __init__(self, *args, **kwargs):
super(LocallyCachedBotoS3CrashStorage, self).__init__(
*args, **kwargs
)
self.temp_dir = os.path.join(
tempfile.gettempdir(),
'locally_downloaded_processed_crashes'
)
if not os.path.isdir(self.temp_dir):
os.mkdir(self.temp_dir)

def get_unredacted_processed(self, crash_id):
file_path = os.path.join(self.temp_dir, '{}.json'.format(crash_id))
try:
with open(file_path, 'r') as f:
return json.load(f)
except IOError:
crash = (
super(LocallyCachedBotoS3CrashStorage, self)
.get_unredacted_processed(crash_id)
)
with open(file_path, 'w') as f:
json.dump(crash, f)
self.config.logger.debug(
'Cache MISS downloading {}'.format(crash_id)
)
return crash

0 comments on commit fba686b

Please sign in to comment.