Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Add a RabbitMQ crash source and the finished_func feature necessary to allow ACKs to be sent after crashes are processed. #1220

Closed
wants to merge 7 commits into from

2 participants

Erik Rose K Lars Lohn
Erik Rose
Owner

Fix bug 866970, 866989.

Erik Rose erikrose commented on the diff
socorro/external/rabbitmq/rmq_new_crash_source.py
((1 lines not shown))
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from configman import Namespace, RequiredConfig
+from configman.converters import class_converter
+
+from functools import partial
+
+
+#==============================================================================
+class RMQNewCrashSource(RequiredConfig):
+ """An iterable of crashes from RabbitMQ"""
+
+ required_config = Namespace()
+ required_config.source.add_option(
Erik Rose Owner
erikrose added a note

Need to remove ".source". I'll take care of it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Erik Rose
Owner

Merged into #1223.

Erik Rose erikrose closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 1, 2013
  1. K Lars Lohn
Commits on May 2, 2013
  1. added 'finished_func' to transform method

    lars authored
  2. K Lars Lohn
  3. Erik Rose

    Handle unspecified finished_funcs a different way.

    erikrose authored
    This way, we save a bit of code, and the number of args finished_func takes is self-documenting.
Commits on May 3, 2013
  1. Erik Rose
  2. Erik Rose
  3. Erik Rose
This page is out of date. Refresh to see the latest.
52 socorro/external/rabbitmq/rmq_new_crash_source.py
View
@@ -0,0 +1,52 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from configman import Namespace, RequiredConfig
+from configman.converters import class_converter
+
+from functools import partial
+
+
+#==============================================================================
+class RMQNewCrashSource(RequiredConfig):
+ """An iterable of crashes from RabbitMQ"""
+
+ required_config = Namespace()
+ required_config.source.add_option(
Erik Rose Owner
erikrose added a note

Need to remove ".source". I'll take care of it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ 'crashstorage_class',
+ doc='the source storage class',
+ default='socorro.external.rabbitmq.crashstorage.RabbitMQCrashStorage',
+ from_string_converter=class_converter
+ )
+
+ #--------------------------------------------------------------------------
+ def __init__(self, config, processor_name, quit_check_callback=None):
+ self.crash_store = config.crashstorage_class(config)
+
+ #--------------------------------------------------------------------------
+ def close(self):
+ pass
+
+ #--------------------------------------------------------------------------
+ def __iter__(self):
+ """Return an iterator over crashes from RabbitMQ.
+
+ Each crash is a tuple of the ``(args, kwargs)`` variety. The lone arg
+ is a crash ID, and the kwargs contain only a callback function which
+ the FTS app will call to send an ack to Rabbit after processing is
+ complete.
+
+ """
+ for a_crash_id in self.crash_store.new_crashes():
+ yield (
+ (a_crash_id,),
+ {'finished_func': partial(
+ self.crash_store.ack_crash,
+ a_crash_id
+ )}
+ )
+
+ #--------------------------------------------------------------------------
+ def __call__(self):
+ return self.__iter__()
8 socorro/processor/processor_app.py
View
@@ -103,12 +103,13 @@ def quit_check(self):
self.task_manager.quit_check()
#--------------------------------------------------------------------------
- def transform(self, crash_id):
+ def transform(self, crash_id, finished_func=lambda: None):
"""this implementation is the framework on how a raw crash is
converted into a processed crash. The 'crash_id' passed in is used as
a key to fetch the raw crash from the 'source', the conversion funtion
- implemented by the 'processor_class' is applied, and then the
- processed crash is saved to the 'destination'."""
+ implemented by the 'processor_class' is applied, the
+ processed crash is saved to the 'destination', and then 'finished_func'
+ is called."""
try:
raw_crash = self.source.get_raw_crash(crash_id)
dumps = self.source.get_raw_dumps_as_files(crash_id)
@@ -143,6 +144,7 @@ def transform(self, crash_id):
consuming crash_ids the same way that the processor consumes them.
"""
self.destination.save_raw_and_processed(raw_crash, None, processed_crash, crash_id)
+ finished_func()
#--------------------------------------------------------------------------
def _setup_source_and_destination(self):
8 socorro/unittest/processor/test_processor_app.py
View
@@ -99,8 +99,9 @@ def test_transform_success(self):
pa.processor.convert_raw_crash_to_processed_crash = \
mocked_convert_raw_crash_to_processed_crash
pa.destination.save_processed = mock.Mock()
+ finished_func = mock.Mock()
# the call being tested
- pa.transform(17)
+ pa.transform(17, finished_func)
# test results
pa.source.get_raw_crash.assert_called_with(17)
pa.processor.convert_raw_crash_to_processed_crash.assert_called_with(
@@ -108,6 +109,7 @@ def test_transform_success(self):
fake_dump
)
pa.destination.save_raw_and_processed.assert_called_with(fake_raw_crash, None, 7, 17)
+ finished_func.assert_called_once()
def test_transform_crash_id_missing(self):
config = self.get_standard_config()
@@ -116,12 +118,15 @@ def test_transform_crash_id_missing(self):
mocked_get_raw_crash = mock.Mock(side_effect=CrashIDNotFound(17))
pa.source.get_raw_crash = mocked_get_raw_crash
+ finished_func = mock.Mock()
pa.transform(17)
pa.source.get_raw_crash.assert_called_with(17)
pa.processor.reject_raw_crash.assert_called_with(
17,
'this crash cannot be found in raw crash storage'
)
+ finished_func.assert_called_once()
+ self.assertEqual(finished_func.call_count, 0)
def test_transform_unexpected_exception(self):
config = self.get_standard_config()
@@ -136,3 +141,4 @@ def test_transform_unexpected_exception(self):
17,
'error in loading: bummer'
)
+
Something went wrong with that request. Please try again.