Skip to content

Commit

Permalink
Merge pull request #2648 from twobraids/supplemental-processor
Browse files (browse the repository at this point in the history)
fixes Bug 1138488 - added supplemental processing to ProcessorApp
  • Loading branch information
rhelmer committed Mar 5, 2015
2 parents 8f6dcc1 + 990f921 commit 06fdd2a
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 33 deletions.
6 changes: 3 additions & 3 deletions socorro/processor/hybrid_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def _temp_raw_crash_json_file(self, raw_crash, crash_id):
os.unlink(file_pathname)

#--------------------------------------------------------------------------
def convert_raw_crash_to_processed_crash(self, raw_crash, raw_dumps):
def process_crash(self, raw_crash, raw_dumps, ignored_processed_crash):
""" This function is run only by a worker thread.
Given a job, fetch a thread local database connection and the json
document. Use these to create the record in the 'reports' table,
Expand Down Expand Up @@ -1419,8 +1419,8 @@ def _temp_file_context(self, raw_dump_path):
)

#--------------------------------------------------------------------------
def __call__(self, raw_crash, raw_dumps):
self.convert_raw_crash_to_processed_crash(raw_crash, raw_dumps)
def __call__(self, raw_crash, raw_dumps, processed_crash):
self.process_crash(raw_crash, raw_dumps, processed_crash)

#--------------------------------------------------------------------------
def _log_job_start(self, crash_id):
Expand Down
6 changes: 3 additions & 3 deletions socorro/processor/legacy_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def reject_raw_crash(self, crash_id, reason):
self._log_job_end(utc_now(), False, crash_id)

#--------------------------------------------------------------------------
def convert_raw_crash_to_processed_crash(self, raw_crash, raw_dumps):
def process_crash(self, raw_crash, raw_dumps, ignored_processed_crash):
""" This function is run only by a worker thread.
Given a job, fetch a thread local database connection and the json
document. Use these to create the record in the 'reports' table,
Expand Down Expand Up @@ -1212,8 +1212,8 @@ def _temp_file_context(self, raw_dump_path):
)

#--------------------------------------------------------------------------
def __call__(self, raw_crash, raw_dumps):
self.convert_raw_crash_to_processed_crash(raw_crash, raw_dumps)
def __call__(self, raw_crash, raw_dumps, processed_crash):
self.process_crash(raw_crash, raw_dumps, processed_crash)

#--------------------------------------------------------------------------
def _log_job_start(self, crash_id):
Expand Down
18 changes: 15 additions & 3 deletions socorro/processor/processor_2015.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def __init__(self, config, quit_check_callback=None):
)

#--------------------------------------------------------------------------
def convert_raw_crash_to_processed_crash(self, raw_crash, raw_dumps):
def process_crash(self, raw_crash, raw_dumps, processed_crash):
"""Take a raw_crash and its associated raw_dumps and return a
processed_crash.
"""
Expand All @@ -193,8 +193,19 @@ def convert_raw_crash_to_processed_crash(self, raw_crash, raw_dumps):
processor_meta_data.processor = self
processor_meta_data.config = self.config

# create the empty processed crash
processed_crash = DotDict()
if "processor_notes" in processed_crash:
original_processor_notes = [
x.strip() for x in processed_crash.processor_notes.split(";")
]
processor_meta_data.processor_notes.append(
"earlier processing: %s" % processed_crash.get(
"started_datetime",
'Unknown Date'
)
)
else:
original_processor_notes = []

processed_crash.success = False
processed_crash.started_datetime = utc_now()
# for backwards compatibility:
Expand Down Expand Up @@ -242,6 +253,7 @@ def convert_raw_crash_to_processed_crash(self, raw_crash, raw_dumps):

# the processor notes are in the form of a list. Join them all
# together to make a single string
processor_meta_data.processor_notes.extend(original_processor_notes)
processed_crash.processor_notes = '; '.join(
processor_meta_data.processor_notes
)
Expand Down
15 changes: 12 additions & 3 deletions socorro/processor/processor_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
PolyCrashStorage,
CrashIDNotFound,
)
from socorro.lib.util import DotDict


#==============================================================================
Expand Down Expand Up @@ -64,7 +65,7 @@ class ProcessorApp(FetchTransformSaveApp):
required_config.companion_process.add_option(
'companion_class',
doc='a classname that runs a process in parallel with the processor',
default='',
default='',
#default='socorro.processor.symbol_cache_manager.SymbolLRUCacheManager',
from_string_converter=class_converter
)
Expand Down Expand Up @@ -141,13 +142,20 @@ def transform(
return

try:
processed_crash = self.source.get_unredacted_processed(
crash_id
)
except CrashIDNotFound:
processed_crash = DotDict()

try:
if 'uuid' not in raw_crash:
raw_crash.uuid = crash_id
processed_crash = (
self.processor.convert_raw_crash_to_processed_crash(
self.processor.process_crash(
raw_crash,
dumps
dumps,
processed_crash,
)
)
""" bug 866973 - save_raw_and_processed() instead of just
Expand Down Expand Up @@ -233,3 +241,4 @@ def _cleanup(self):

if __name__ == '__main__':
main(ProcessorApp)

14 changes: 8 additions & 6 deletions socorro/unittest/processor/test_hybrid_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def test_hybrid_processor_basics(self):
)
eq_(m_transform.call_count, 4)

def test_convert_raw_crash_to_processed_crash_basic(self):
def test_process_crash_basic(self):
config = setup_config_with_mocks()
mocked_transform_rules_str = \
'socorro.processor.hybrid_processor.TransformRuleSystem'
Expand Down Expand Up @@ -305,9 +305,10 @@ def test_convert_raw_crash_to_processed_crash_basic(self):

# Here's the call being tested
processed_crash = \
leg_proc.convert_raw_crash_to_processed_crash(
leg_proc.process_crash(
raw_crash,
raw_dump
raw_dump,
{}
)

# test the result
Expand Down Expand Up @@ -410,7 +411,7 @@ def test_convert_raw_crash_to_processed_crash_basic(self):
any_order=True
)

def test_convert_raw_crash_to_processed_crash_unexpected_error(self):
def test_process_crash_unexpected_error(self):
config = setup_config_with_mocks()
mocked_transform_rules_str = \
'socorro.processor.hybrid_processor.TransformRuleSystem'
Expand Down Expand Up @@ -458,9 +459,10 @@ def test_convert_raw_crash_to_processed_crash_unexpected_error(self):

# Here's the call being tested
processed_crash = \
leg_proc.convert_raw_crash_to_processed_crash(
leg_proc.process_crash(
raw_crash,
raw_dump
raw_dump,
{}
)

eq_(1, leg_proc._log_job_end.call_count)
Expand Down
14 changes: 8 additions & 6 deletions socorro/unittest/processor/test_legacy_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def test_legacy_processor_basics(self):
)
eq_(m_transform.call_count, 2)

def test_convert_raw_crash_to_processed_crash_basic(self):
def test_process_crash_basic(self):
config = setup_config_with_mocks()
mocked_transform_rules_str = \
'socorro.processor.legacy_processor.TransformRuleSystem'
Expand Down Expand Up @@ -303,9 +303,10 @@ def test_convert_raw_crash_to_processed_crash_basic(self):

# Here's the call being tested
processed_crash = \
leg_proc.convert_raw_crash_to_processed_crash(
leg_proc.process_crash(
raw_crash,
raw_dump
raw_dump,
{}
)

# test the result
Expand Down Expand Up @@ -408,7 +409,7 @@ def test_convert_raw_crash_to_processed_crash_basic(self):
any_order=True
)

def test_convert_raw_crash_to_processed_crash_unexpected_error(self):
def test_process_crash_unexpected_error(self):
config = setup_config_with_mocks()
mocked_transform_rules_str = \
'socorro.processor.legacy_processor.TransformRuleSystem'
Expand Down Expand Up @@ -456,9 +457,10 @@ def test_convert_raw_crash_to_processed_crash_unexpected_error(self):

# Here's the call being tested
processed_crash = \
leg_proc.convert_raw_crash_to_processed_crash(
leg_proc.process_crash(
raw_crash,
raw_dump
raw_dump,
{}
)

eq_(1, leg_proc._log_job_end.call_count)
Expand Down
44 changes: 40 additions & 4 deletions socorro/unittest/processor/test_processor_2015.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ def test_Processor2015_init(self):
ok_(isinstance(p.rule_system, DotDict))
eq_(len(p.rule_system), 2)
ok_('ruleset01' in p.rule_system)
print p.rule_system.ruleset01
ok_(isinstance(p.rule_system.ruleset01, TransformRuleSystem))
trs = p.rule_system.ruleset01
eq_(trs.act, trs.apply_all_rules)
Expand All @@ -131,7 +130,7 @@ def test_Processor2015_init(self):
ok_(isinstance(trs.rules[0], SetWindowPos))
ok_(isinstance(trs.rules[1], UpdateWindowAttributes))

def test_convert_raw_crash_to_processed_crash_no_rules(self):
def test_process_crash_no_rules(self):
cm = ConfigurationManager(
definition_source=Processor2015.get_required_config(),
values_source_list=[{'rule_sets': '[]'}],
Expand All @@ -145,9 +144,10 @@ def test_convert_raw_crash_to_processed_crash_no_rules(self):
raw_dumps = {}
with patch('socorro.processor.processor_2015.utc_now') as faked_utcnow:
faked_utcnow.return_value = '2015-01-01T00:00:00'
processed_crash = p.convert_raw_crash_to_processed_crash(
processed_crash = p.process_crash(
raw_crash,
raw_dumps
raw_dumps,
DotDict()
)

ok_(processed_crash.success)
Expand All @@ -156,3 +156,39 @@ def test_convert_raw_crash_to_processed_crash_no_rules(self):
eq_(processed_crash.completed_datetime, '2015-01-01T00:00:00')
eq_(processed_crash.completeddatetime, '2015-01-01T00:00:00')
eq_(processed_crash.processor_notes, 'dwight; Processor2015')

def test_process_crash_existing_processed_crash(self):
cm = ConfigurationManager(
definition_source=Processor2015.get_required_config(),
values_source_list=[{'rule_sets': '[]'}],
)
config = cm.get_config()
config.logger = Mock()
config.processor_name = 'dwight'

p = Processor2015(config)
raw_crash = DotDict()
raw_dumps = {}
processed_crash = DotDict()
processed_crash.processor_notes = "we've been here before; yep"
processed_crash.started_datetime = '2014-01-01T00:00:00'
with patch('socorro.processor.processor_2015.utc_now') as faked_utcnow:
faked_utcnow.return_value = '2015-01-01T00:00:00'
processed_crash = p.process_crash(
raw_crash,
raw_dumps,
processed_crash
)

ok_(processed_crash.success)
eq_(processed_crash.started_datetime, '2015-01-01T00:00:00')
eq_(processed_crash.startedDateTime, '2015-01-01T00:00:00')
eq_(processed_crash.completed_datetime, '2015-01-01T00:00:00')
eq_(processed_crash.completeddatetime, '2015-01-01T00:00:00')
eq_(
processed_crash.processor_notes,
"dwight; Processor2015; earlier processing: 2014-01-01T00:00:00; "
"we've been here before; yep"
)


17 changes: 12 additions & 5 deletions socorro/unittest/processor/test_processor_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,21 @@ def test_transform_success(self):
config = self.get_standard_config()
pa = ProcessorApp(config)
pa._setup_source_and_destination()

fake_raw_crash = DotDict()
mocked_get_raw_crash = mock.Mock(return_value=fake_raw_crash)
pa.source.get_raw_crash = mocked_get_raw_crash

fake_dump = {'upload_file_minidump': 'fake_dump_TEMPORARY.dump'}
mocked_get_raw_dumps_as_files = mock.Mock(return_value=fake_dump)
pa.source.get_raw_dumps_as_files = mocked_get_raw_dumps_as_files
mocked_convert_raw_crash_to_processed_crash = mock.Mock(return_value=7)
pa.processor.convert_raw_crash_to_processed_crash = \
mocked_convert_raw_crash_to_processed_crash

fake_processed_crash = DotDict()
mocked_get_unredacted_processed = mock.Mock(return_value=fake_processed_crash)
pa.source.get_unredacted_processed = mocked_get_unredacted_processed

mocked_process_crash = mock.Mock(return_value=7)
pa.processor.process_crash = mocked_process_crash
pa.destination.save_processed = mock.Mock()
finished_func = mock.Mock()
with mock.patch('socorro.processor.processor_app.os.unlink') as mocked_unlink:
Expand All @@ -102,9 +108,10 @@ def test_transform_success(self):
# test results
mocked_unlink.assert_called_with('fake_dump_TEMPORARY.dump')
pa.source.get_raw_crash.assert_called_with(17)
pa.processor.convert_raw_crash_to_processed_crash.assert_called_with(
pa.processor.process_crash.assert_called_with(
fake_raw_crash,
fake_dump
fake_dump,
fake_processed_crash
)
pa.destination.save_raw_and_processed.assert_called_with(fake_raw_crash, None, 7, 17)
eq_(finished_func.call_count, 1)
Expand Down

0 comments on commit 06fdd2a

Please sign in to comment.