Skip to content

Commit

Permalink
feat: Implement producer API for openedx-events
Browse files Browse the repository at this point in the history
Remove caching of `get_producer` since openedx-events will take care of
that for us.

This is part of openedx/openedx-events#87
  • Loading branch information
timmc-edx committed Nov 21, 2022
1 parent 2e15c99 commit 8b5a117
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
7 changes: 7 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ The documentation/ADRs may also be moved to more appropriate places as the proce

The repository works together with the openedx/openedx-events repository to make the fully functional event bus.

To use this implementation of the Event Bus with openedx-events, set the following Django settings::

EVENT_BUS_PRODUCER: edx_event_bus_kafka.get_producer
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS: ...
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL: ...
EVENT_BUS_TOPIC_PREFIX: ...

For manual testing, see `<docs/how_tos/manual_testing.rst>`__.

Documentation
Expand Down
13 changes: 4 additions & 9 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Produce Kafka events from signals.
Main function is ``get_producer()``.
Main function is ``get_producer()``, which should be referred to from ``EVENT_BUS_PRODUCER``.
"""

import json
Expand All @@ -17,6 +17,7 @@
from django.dispatch import receiver
from django.test.signals import setting_changed
from edx_django_utils.monitoring import record_exception
from openedx_events.event_bus import EventBusProducer
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

Expand Down Expand Up @@ -211,7 +212,7 @@ def on_event_deliver(self, err, evt):
f"partition={evt.partition()}")


class KafkaEventProducer():
class KafkaEventProducer(EventBusProducer):
"""
API singleton for event production to Kafka.
Expand Down Expand Up @@ -328,14 +329,9 @@ def poll_indefinitely(api_weakref: KafkaEventProducer):
api_object = None


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
# producer to be long-lived.
@lru_cache # will just be one cache entry, in practice
def get_producer() -> Optional[KafkaEventProducer]:
"""
Create or retrieve Producer API singleton.
Create or retrieve Producer API instance.
If confluent-kafka library or essential settings are missing, warn and return None.
"""
Expand All @@ -354,4 +350,3 @@ def get_producer() -> Optional[KafkaEventProducer]:
def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches when settings change during unit tests."""
get_serializers.cache_clear()
get_producer.cache_clear()
10 changes: 8 additions & 2 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest import TestCase
from unittest.mock import ANY, Mock, call, patch

import openedx_events.event_bus
import openedx_events.learning.signals
import pytest
from django.test import override_settings
Expand Down Expand Up @@ -85,8 +86,13 @@ def test_get_producer_unconfigured(self):
assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ")

def test_get_producer_configured(self):
"""Creation succeeds when all settings are present."""
"""
Creation succeeds when all settings are present.
Also tests basic compliance with the implementation-loader API in openedx-events.
"""
with override_settings(
EVENT_BUS_PRODUCER='edx_event_bus_kafka.get_producer',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET='some_secret',
Expand All @@ -95,7 +101,7 @@ def test_get_producer_configured(self):
EVENT_BUS_KAFKA_API_KEY='some_other_key',
EVENT_BUS_KAFKA_API_SECRET='some_other_secret',
):
assert isinstance(ep.get_producer(), ep.KafkaEventProducer)
assert isinstance(openedx_events.event_bus.get_producer(), ep.KafkaEventProducer)

@patch('edx_event_bus_kafka.internal.producer.logger')
def test_on_event_deliver(self, mock_logger):
Expand Down
1 change: 1 addition & 0 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
-c constraints.txt

Django # Web application framework
# TODO -- pin min version
openedx-events # Events API
edx_django_utils
edx_toggles

0 comments on commit 8b5a117

Please sign in to comment.