diff --git a/event_listener.py b/event_listener.py index 7edef3149..93e23edf4 100644 --- a/event_listener.py +++ b/event_listener.py @@ -40,7 +40,7 @@ def main(mode: Mode, dry_run: bool): LOG.i("Starting with HttpEventSink") sink = HttpEventSink() - runner = Runner(source=source, sink=sink, force_execution=mode == Mode.DEAD_LETTER) + runner = Runner(source=source, sink=sink) runner.run() diff --git a/events/event_source.py b/events/event_source.py index 4d10278a0..f4f893729 100644 --- a/events/event_source.py +++ b/events/event_source.py @@ -56,7 +56,12 @@ def __listen(self, on_event: Callable[[SyncEvent], NoReturn]): webhook_id = int(notify.payload) event = SyncEvent.get_by(id=webhook_id) if event is not None: - on_event(event) + if event.mark_as_taken(): + on_event(event) + else: + LOG.info( + f"Event {event.id} was handled by another runner" + ) else: LOG.info(f"Could not find event with id={notify.payload}") except Exception as e: diff --git a/events/runner.py b/events/runner.py index f970adafd..d6f9c2e02 100644 --- a/events/runner.py +++ b/events/runner.py @@ -8,12 +8,9 @@ class Runner: - def __init__( - self, source: EventSource, sink: EventSink, force_execution: bool = False - ): + def __init__(self, source: EventSource, sink: EventSink): self.__source = source self.__sink = sink - self.__force_execution = force_execution def run(self): self.__source.run(self.__on_event) @@ -21,31 +18,25 @@ def run(self): @newrelic.agent.background_task() def __on_event(self, event: SyncEvent): try: - can_process = event.mark_as_taken() - if can_process or self.__force_execution: - event_created_at = event.created_at - start_time = arrow.now() - success = self.__sink.process(event) - if success: - event_id = event.id - SyncEvent.delete(event.id, commit=True) - LOG.info(f"Marked {event_id} as done") + event_created_at = event.created_at + start_time = arrow.now() + success = self.__sink.process(event) + if success: + event_id = event.id + SyncEvent.delete(event.id, commit=True) + LOG.info(f"Marked {event_id} as done") - end_time = arrow.now() - start_time - time_between_taken_and_created = start_time - event_created_at + end_time = arrow.now() - start_time + time_between_taken_and_created = start_time - event_created_at - newrelic.agent.record_custom_metric( - "Custom/sync_event_processed", 1 - ) - newrelic.agent.record_custom_metric( - "Custom/sync_event_process_time", end_time.total_seconds() - ) - newrelic.agent.record_custom_metric( - "Custom/sync_event_elapsed_time", - time_between_taken_and_created.total_seconds(), - ) - else: - LOG.info(f"{event.id} was handled by another runner") + newrelic.agent.record_custom_metric("Custom/sync_event_processed", 1) + newrelic.agent.record_custom_metric( + "Custom/sync_event_process_time", end_time.total_seconds() + ) + newrelic.agent.record_custom_metric( + "Custom/sync_event_elapsed_time", + time_between_taken_and_created.total_seconds(), + ) except Exception as e: LOG.warn(f"Exception processing event [id={event.id}]: {e}") newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)