Skip to content

Commit

Permalink
feat: Poll producer on a regular cadence to avoid delayed callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
timmc-edx committed Sep 1, 2022
1 parent 26d9c4f commit 19d3992
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import json
import logging
import threading
import time
import weakref
from functools import lru_cache
from typing import Any, List, Optional

Expand Down Expand Up @@ -167,6 +170,13 @@ class EventProducerKafka():
def __init__(self, producer):
self.producer = producer

threading.Thread(
target=poll_indefinitely,
name="kafka-producer-poll",
args=(weakref.ref(self),), # allow GC but also thread auto-stop (important for tests!)
daemon=True, # don't block shutdown
)

def send(
self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
) -> None:
Expand Down Expand Up @@ -210,6 +220,37 @@ def prepare_for_shutdown(self):
self.producer.flush(-1)


def poll_indefinitely(api_weakref: EventProducerKafka):
"""
Poll the producer indefinitely to ensure delivery/stats/etc. callbacks are triggered.
The thread stops automatically once the producer is garbage-collected.
This ensures that callbacks are triggered in a timely fashion, rather than waiting
for the poll() call that we make before or after each produce() call. This may be
important if events are produced infrequently, and it allows the last event the
server emits before shutdown to have its callback run (if it happens soon enough.)
"""
# The reason we hold a weakref to the whole EventProducerKafka and
# not directly to the Producer itself is that you just can't make
# a weakref to the latter (perhaps because it's a C object.)
while True:
time.sleep(1.0)

# Temporarily hold a strong ref to the producer API singleton
api_object = api_weakref()
if api_object is None:
return

try:
api_object.producer.poll(0)
except BaseException:
pass
finally:
# Get rid of that strong ref again
api_object = None


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
Expand Down

0 comments on commit 19d3992

Please sign in to comment.