Skip to content

Commit

Permalink
Merge pull request #411 from openedx/cag/erb-settings
Browse files Browse the repository at this point in the history
feat!: use one configuration for all backends
  • Loading branch information
Ian2012 committed Apr 2, 2024
2 parents a8952cf + cc7c22a commit 8721ce8
Show file tree
Hide file tree
Showing 17 changed files with 90 additions and 121 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
~~~~~~~~~~

[9.0.0]
~~~~~~~

* **BREAKING CHANGE**: Use a single entry point for all event routing backends.
which allows to easily switch bettwen celery and the event bus for the backends.

[8.3.0]

* Allow to use any configured engine to replay tracking logs
Expand Down
2 changes: 1 addition & 1 deletion event_routing_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Various backends for receiving edX LMS events..
"""

__version__ = '8.3.1'
__version__ = '9.0.0'
3 changes: 2 additions & 1 deletion event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ def queue_event(self, redis, event):
Queue the event to be sent to configured routers.
"""
event["timestamp"] = event["timestamp"].isoformat()
if isinstance(event["timestamp"], datetime):
event["timestamp"] = event["timestamp"].isoformat()
queue_size = redis.lpush(self.queue_name, json.dumps(event, cls=DateTimeJSONEncoder))
logger.info(f'Event {event["name"]} has been queued for batching. Queue size: {queue_size}')

Expand Down
7 changes: 5 additions & 2 deletions event_routing_backends/backends/tests/test_events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,17 @@ def test_queue_event(self, mock_bulk_send, mock_logger, mock_get_redis_connectio
event1 = copy(self.transformed_event)
event1["timestamp"] = datetime.datetime.now()
event2 = copy(self.transformed_event)
event2["timestamp"] = datetime.datetime.now()
event2_emission = datetime.datetime.now()
event2["timestamp"] = event2_emission
events = [event1, event2]
formatted_events = []
for event in events:
formatted_event = copy(event)
formatted_event["timestamp"] = formatted_event["timestamp"].isoformat()
formatted_event["timestamp"] = event["timestamp"].isoformat()
formatted_events.append(json.dumps(formatted_event).encode('utf-8'))

event2["timestamp"] = event2_emission.isoformat()

redis_mock.rpop.return_value = formatted_events
redis_mock.lpush.return_value = 1
redis_mock.get.return_value.decode.return_value = datetime.datetime.now().isoformat()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def __init__(
self.batches_sent = 0

self.tracker = get_tracker()
self.engine = self.tracker.backends[self.transformer_type]
self.engine = self.tracker.backends["event_transformer"]
self.backend = self.engine.backends[self.transformer_type]

def is_known_event(self, event):
"""
Expand Down Expand Up @@ -98,8 +99,7 @@ def send(self):
"""
if self.destination == "LRS":
print(f"Sending {len(self.event_queue)} events to LRS...")
for backend in self.engine.backends.values():
backend.bulk_send(self.event_queue)
self.backend.bulk_send(self.event_queue)
else:
print("Skipping send, we're storing with libcloud instead of an LRS.")

Expand Down
54 changes: 21 additions & 33 deletions event_routing_backends/management/commands/recover_failed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from django.conf import settings
from django.core.management.base import BaseCommand
from eventtracking.backends.event_bus import EventBusRoutingBackend
from eventtracking.tracker import get_tracker

from event_routing_backends.processors.transformer_utils.exceptions import EventNotDispatched
Expand All @@ -26,7 +25,7 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"--transformer_type",
choices=["xapi", "caliper", "all"],
choices=["xapi", "caliper"],
required=True,
help="The type of transformed events to recover.",
)
Expand All @@ -46,15 +45,8 @@ def handle(self, *args, **options):
batch_size = options["batch_size"]
tracker = get_tracker()

engines = {
name: engine
for name, engine in tracker.backends.items()
if isinstance(engine, EventBusRoutingBackend)
}

if not engines:
logger.info("No compatible backend found.")
return
engine = tracker.backends["event_transformer"]
backend = engine.backends[transformer_type]

# In the recovery process we are disabling batching to prevent
# single event failures from blocking the recovery process.
Expand All @@ -63,28 +55,24 @@ def handle(self, *args, **options):
success = 0
malformed = 0
failed = 0
for name, engine in engines.items():
if transformer_type not in ("all", name):
logger.info("Skipping backend: {}".format(name))
continue
for backend_name, backend in engine.backends.items():
while failed_events := backend.get_failed_events(batch_size):
logger.info(
"Recovering {} failed events for backend {}".format(
len(failed_events), backend_name
)
)
for event in failed_events:
try:
backend.send(event)
success += 1
except EventNotDispatched:
logger.error("Malformed event: {}".format(event["name"]))
malformed += 1
except Exception as e: # pylint: disable=broad-except
# Backend can still be in a bad state, so we need to catch all exceptions
logger.error("Failed to send event: {}".format(e))
failed += 1

while failed_events := backend.get_failed_events(batch_size):
logger.info(
"Recovering {} failed events for backend {}".format(
len(failed_events), transformer_type
)
)
for event in failed_events:
try:
backend.send(event)
success += 1
except EventNotDispatched:
logger.error("Malformed event: {}".format(event["name"]))
malformed += 1
except Exception as e: # pylint: disable=broad-except
# Backend can still be in a bad state, so we need to catch all exceptions
logger.error("Failed to send event: {}".format(e))
failed += 1

logger.info("Recovery process completed.")
logger.info("Recovered events : {}".format(success))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def custom_get_failed_events(self, batch_size):
class TestRecoverFailedEvents(TestCase):
@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"event_transformer": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"backends": {"xapi": XAPI_PROCESSOR},
Expand All @@ -61,16 +61,16 @@ def test_send_tracking_log_to_backends(self, mock_get_tracker):
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_backend = Mock()
tracker.backends["event_bus"].backends["xapi"] = mock_backend
tracker.backends["event_transformer"].backends["xapi"] = mock_backend
mock_backend.get_failed_events = Mocker().custom_get_failed_events

call_command("recover_failed_events", transformer_type="all")
call_command("recover_failed_events", transformer_type="xapi")

mock_backend.send.assert_called_once_with({"name": "test"})

@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"event_transformer": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"backends": {"xapi": XAPI_PROCESSOR},
Expand All @@ -91,17 +91,17 @@ def test_send_tracking_log_to_backends_with_exception(
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_backend = Mock()
tracker.backends["event_bus"].backends["xapi"] = mock_backend
tracker.backends["event_transformer"].backends["xapi"] = mock_backend
mock_backend.get_failed_events = Mocker().custom_get_failed_events
mock_backend.send.side_effect = Exception("Error")

call_command("recover_failed_events", transformer_type="all")
call_command("recover_failed_events", transformer_type="xapi")

mock_logger.error.assert_called_once_with("Failed to send event: Error")

@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"event_transformer": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"backends": {"xapi": XAPI_PROCESSOR},
Expand All @@ -122,17 +122,17 @@ def test_send_tracking_log_to_backends_with_event_exception(
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_backend = Mock()
tracker.backends["event_bus"].backends["xapi"] = mock_backend
tracker.backends["event_transformer"].backends["xapi"] = mock_backend
mock_backend.get_failed_events = Mocker().custom_get_failed_events
mock_backend.send.side_effect = EventNotDispatched("Error")

call_command("recover_failed_events", transformer_type="all")
call_command("recover_failed_events", transformer_type="xapi")

mock_logger.error.assert_called_once_with("Malformed event: {}".format("test"))

@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"event_transformer": {
"ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend",
"OPTIONS": {
"backends": {"xapi": XAPI_PROCESSOR},
Expand All @@ -156,35 +156,9 @@ def test_send_tracking_log_to_backends_no_failed_events(self, mock_get_tracker):
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker
mock_backend = Mock()
tracker.backends["xapi"].backends["xapi"] = mock_backend
tracker.backends["event_transformer"].backends["xapi"] = mock_backend
mock_backend.get_failed_events.return_value = []

call_command("recover_failed_events", transformer_type="xapi")

mock_backend.send.assert_not_called()

@override_settings(
EVENT_TRACKING_BACKENDS={
"event_bus": {
"ENGINE": "eventtracking.backends.logger.LoggerBackend",
"OPTIONS": {},
},
}
)
@patch(
"event_routing_backends.management.commands.recover_failed_events.get_tracker"
)
@patch("event_routing_backends.management.commands.recover_failed_events.logger")
def test_send_tracking_log_to_backends_no_engines(
self, mock_logger, mock_get_tracker
):
"""
Test for send_tracking_log_to_backends
"""
tracker = DjangoTracker()
mock_get_tracker.return_value = tracker

call_command("recover_failed_events", transformer_type="all")

mock_logger.info.assert_any_call("Recovering failed events")
mock_logger.info.assert_any_call("No compatible backend found.")
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ def test_transform_command(command_opts, mock_common_calls, caplog, capsys):
mm2.registry.mapping = {"problem_check": 1}
# Fake a process response that can be serialized to json
mm2.return_value = {"foo": "bar"}
tracker.backends["xapi"].processors = [mm2]
for backend in tracker.backends["xapi"].backends.values():
tracker.backends["event_transformer"].processors = [mm2]
for backend in tracker.backends["event_transformer"].backends.values():
backend.bulk_send = MagicMock()

call_command(
Expand Down
49 changes: 25 additions & 24 deletions event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,23 @@ def plugin_settings(settings):
'edx.course.grade.now_failed'
]

settings.EVENT_BUS_TRACKING_LOGS = set(allowed_xapi_events + allowed_caliper_events)
allowed_events = set(allowed_xapi_events + allowed_caliper_events)

# Operators can configure the event bus allowed events via EVENT_BUS_TRACKING_LOGS and by default
# we are allowing the supported events by xAPI and Caliper so that operators don't need to configure
# the events manually.
settings.EVENT_BUS_TRACKING_LOGS = allowed_events

settings.EVENT_TRACKING_BACKENDS.update({
'xapi': {
'event_transformer': {
'ENGINE': 'eventtracking.backends.async_routing.AsyncRoutingBackend',
'OPTIONS': {
'backend_name': 'xapi',
'backend_name': 'events',
'processors': [
{
'ENGINE': 'eventtracking.processors.whitelist.NameWhitelistProcessor',
'OPTIONS': {
'whitelist': allowed_xapi_events
'whitelist': allowed_events
}
},
],
Expand All @@ -193,6 +198,12 @@ def plugin_settings(settings):
'ENGINE': 'event_routing_backends.backends.async_events_router.AsyncEventsRouter',
'OPTIONS': {
'processors': [
{
'ENGINE': 'eventtracking.processors.whitelist.NameWhitelistProcessor',
'OPTIONS': {
'whitelist': allowed_xapi_events
}
},
{
'ENGINE':
'event_routing_backends.processors.xapi.transformer_processor.XApiProcessor',
Expand All @@ -201,27 +212,17 @@ def plugin_settings(settings):
],
'backend_name': 'xapi',
}
}
},
},
},
"caliper": {
"ENGINE": "eventtracking.backends.async_routing.AsyncRoutingBackend",
"OPTIONS": {
"backend_name": "caliper",
"processors": [
{
"ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor",
"OPTIONS": {
"whitelist": allowed_caliper_events
}
}
],
"backends": {
},
"caliper": {
'ENGINE': 'event_routing_backends.backends.async_events_router.AsyncEventsRouter',
"OPTIONS": {
"processors": [
{
"ENGINE": "eventtracking.processors.whitelist.NameWhitelistProcessor",
"OPTIONS": {
"whitelist": allowed_caliper_events
}
},
{
"ENGINE":
"event_routing_backends.processors."
Expand All @@ -240,7 +241,7 @@ def plugin_settings(settings):
"backend_name": "caliper"
}
}
}
}
}
},
},
},
})
10 changes: 6 additions & 4 deletions event_routing_backends/tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ def test_common_settings(self):
Test common settings
"""
common_settings.plugin_settings(settings)
self.assertIn('xapi', settings.EVENT_TRACKING_BACKENDS)
self.assertIn('caliper', settings.EVENT_TRACKING_BACKENDS)
self.assertIn('event_transformer', settings.EVENT_TRACKING_BACKENDS)
self.assertIn('xapi', settings.EVENT_TRACKING_BACKENDS["event_transformer"]["OPTIONS"]["backends"])
self.assertIn('caliper', settings.EVENT_TRACKING_BACKENDS["event_transformer"]["OPTIONS"]["backends"])
self.assertIn('edx.course.enrollment.activated', settings.EVENT_TRACKING_BACKENDS_BUSINESS_CRITICAL_EVENTS)
self.assertFalse(settings.CALIPER_EVENTS_ENABLED)
self.assertFalse(settings.CALIPER_EVENT_LOGGING_ENABLED)
Expand All @@ -33,8 +34,9 @@ def test_devstack_settings(self):
Test devstack settings
"""
devstack_settings.plugin_settings(settings)
self.assertIn('xapi', settings.EVENT_TRACKING_BACKENDS)
self.assertIn('caliper', settings.EVENT_TRACKING_BACKENDS)
self.assertIn('event_transformer', settings.EVENT_TRACKING_BACKENDS)
self.assertIn('xapi', settings.EVENT_TRACKING_BACKENDS["event_transformer"]["OPTIONS"]["backends"])
self.assertIn('caliper', settings.EVENT_TRACKING_BACKENDS["event_transformer"]["OPTIONS"]["backends"])
self.assertIn('edx.course.enrollment.deactivated', settings.EVENT_TRACKING_BACKENDS_BUSINESS_CRITICAL_EVENTS)
self.assertFalse(settings.CALIPER_EVENTS_ENABLED)
self.assertFalse(settings.CALIPER_EVENT_LOGGING_ENABLED)
Expand Down
2 changes: 1 addition & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ edx-toggles==5.1.1
# via
# -r requirements/base.in
# event-tracking
event-tracking==2.3.1
event-tracking==2.3.2
# via -r requirements/base.in
fastavro==1.9.4
# via openedx-events
Expand Down
1 change: 1 addition & 0 deletions requirements/constraints.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ sphinx==4.2.0
# This pin can be removed once sphinx constraint is removed.
docutils<0.18
doc8<1.0.0
event-tracking>=2.3.2

0 comments on commit 8721ce8

Please sign in to comment.