feat: remove confluent-kafka as a hard dependency #26

Merged — 17 commits, Aug 19, 2022
60 changes: 60 additions & 0 deletions docs/decisions/0005-optional-import-of-confluent-kafka.rst
@@ -0,0 +1,60 @@
4. Optional import of confluent-kafka
#####################################

Status
******

Accepted

Context
*******

* `confluent-kafka`_ is a library written and maintained by Confluent, our managed instance provider (see :doc:`0004-kafka-managed-hosting`). The library abstracts out the work for sending and receiving events to and from the Kafka cluster and converting them into message objects.
* confluent-kafka in turn is a wrapper around a C library called `librdkafka`_ (distributed as `librdkafka-dev`)
* librdkafka-dev does not currently have a compiled binary available for Linux/aarch64

As a result of the points above, if a package includes a dependency on confluent-kafka, developers will not be able to install the package in Linux/aarch64 environments.

.. _confluent-kafka: https://github.com/confluentinc/confluent-kafka-python
.. _librdkafka: https://github.com/edenhill/librdkafka

Decision
********

Instead of requiring confluent-kafka directly in base.in, we will wrap all imports of `confluent_kafka` in a `try...except` block. If the import fails, the library will log an informative message and any calls will fail gracefully.

For example::

    try:
        import confluent_kafka
        from confluent_kafka import DeserializingConsumer
    except ImportError:
        confluent_kafka = None

Then, later on, before any usage of `DeserializingConsumer`::

    if not confluent_kafka:
        warn("confluent_kafka not installed")
        return None
    # ...do things with DeserializingConsumer

Contributor: Shouldn't we warn during import, if at all? Having lots of repeated warnings would be annoying for those that have no intent of using the feature. Or, is this only run by code that is clearly meant to be using the feature?

Contributor: I do like the idea that this only warns on import, but the examples below use this pattern where the warning shows up when we try to use confluent_kafka. I'll leave things the way they are now, and then make a decision about this a bit later.

Contributor (author): This is only run by code that tries to use the feature. My worry is that, depending on usage, the import warning might be far away from the failure that will happen when someone tries to actually use the code.
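Put together, the pattern described in this decision can be sketched as a minimal self-contained module. The `create_consumer` helper and its config handling below are illustrative, not taken from the actual library:

```python
import warnings

# Optional dependency: fall back to a sentinel when confluent-kafka is absent.
try:
    import confluent_kafka
    from confluent_kafka import DeserializingConsumer
except ImportError:
    confluent_kafka = None


def create_consumer(config):
    """Return a DeserializingConsumer, or None when confluent-kafka is not installed."""
    if not confluent_kafka:
        warnings.warn("confluent_kafka not installed; cannot create consumer")
        return None
    return DeserializingConsumer(config)
```

Callers must then be prepared for a ``None`` return, mirroring the graceful-failure behaviour the decision describes.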

Consequences
************

This will make developers or other users of the edx-event-bus-kafka library responsible for installing confluent-kafka in their own environments.

For edx.org, we will install confluent-kafka as part of creating the docker containers that will run the services
that use this library.
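For example, the image build or provisioning step could add the dependency explicitly (the exact command and extras shown here are illustrative; the deployment pins whatever version it requires)::

    pip install 'confluent-kafka[avro,schema-registry]'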

Rejected Alternatives
*********************

* Make the entire `edx-event-bus-kafka` library an optional dependency in the services that use it (e.g. edx-platform, course-discovery)

This would require developers to install `edx-event-bus-kafka` separately when setting up their environment. This means it would not be able to be updated with `make upgrade` in the same way we manage versions of all of our other packages. Moreover, this would require separate commits to update the version of the package and update the code that uses it, meaning we would have to use an expand-contract release model for every breaking change. This goes against best practices, being highly error-prone.

We expect edx-event-bus-kafka to change more frequently than `confluent-kafka`, which is why we are more willing to adopt the optional dependency strategy for the latter.

* Keep both `confluent-kafka` and `edx-event-bus-kafka` as required dependencies

While not necessarily causing problems for edx.org, this would break many community-hosted Open edX instances as well as many development environments.
1 change: 1 addition & 0 deletions docs/how_tos/manual_testing.rst
Expand Up @@ -17,6 +17,7 @@ The producer can be tested manually against a Kafka running in devstack.

#. In devstack, run ``make devpi-up studio-up-without-deps-shell`` to bring up Studio with a shell.
#. In the Studio shell, run ``pip install -e /edx/src/event-bus-kafka``
#. In the Studio shell, run ``pip install confluent-kafka`` (necessary external dependency)
#. Test the producer:

- Run the example command listed in the ``edx_event_bus_kafka.management.commands.produce_event.Command`` docstring
Expand Down
14 changes: 12 additions & 2 deletions edx_event_bus_kafka/config.py
Expand Up @@ -5,14 +5,24 @@
import warnings
from typing import Optional

from confluent_kafka.schema_registry import SchemaRegistryClient
from django.conf import settings

try:
import confluent_kafka
from confluent_kafka.schema_registry import SchemaRegistryClient
except ImportError:
confluent_kafka = None

def create_schema_registry_client() -> Optional[SchemaRegistryClient]:

# return type (Optional[SchemaRegistryClient]) removed for better error messaging when confluent-kafka is not available
def create_schema_registry_client():
"""
Create a schema registry client from common settings.
"""
if not confluent_kafka:
warnings.warn('Library confluent-kafka not available. Cannot create schema registry client.')
return None

url = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL', None)
if url is None:
warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL")
Expand Down
25 changes: 18 additions & 7 deletions edx_event_bus_kafka/consumer/event_consumer.py
Expand Up @@ -4,8 +4,6 @@

import logging

from confluent_kafka import DeserializingConsumer, KafkaError
from confluent_kafka.schema_registry.avro import AvroDeserializer
from django.conf import settings
from django.core.management.base import BaseCommand
from django.dispatch import receiver
Expand All @@ -19,6 +17,13 @@

logger = logging.getLogger(__name__)

try:
import confluent_kafka
from confluent_kafka import DeserializingConsumer, KafkaError
from confluent_kafka.schema_registry.avro import AvroDeserializer
except ImportError:
confluent_kafka = None

# .. toggle_name: EVENT_BUS_KAFKA_CONSUMERS_ENABLED
# .. toggle_implementation: SettingToggle
# .. toggle_default: False
Expand All @@ -44,12 +49,17 @@ class KafkaEventConsumer:
"""

def __init__(self, topic, group_id, signal):
self.topic = topic
self.group_id = group_id
self.signal = signal
self.consumer = self._create_consumer()
if confluent_kafka:
self.topic = topic
self.group_id = group_id
self.signal = signal
self.consumer = self._create_consumer()
else:
raise Exception('Library confluent-kafka not available. Cannot create event consumer.')

def _create_consumer(self) -> DeserializingConsumer:
# return type (Optional[DeserializingConsumer]) removed for better error messaging when confluent-kafka is not
# available
def _create_consumer(self):
"""
Create a DeserializingConsumer for events of the given signal instance.
"""
Expand Down Expand Up @@ -165,6 +175,7 @@ class ConsumeEventsCommand(BaseCommand):

example:
python3 manage.py cms consume_events -t user-event-debug -g user-event-consumers
-s org.openedx.learning.auth.session.login.completed.v1

# TODO (EventBus): Add pointer to relevant future docs around topics and consumer groups, and potentially
update example topic and group names to follow any future naming conventions.
Expand Down
7 changes: 6 additions & 1 deletion edx_event_bus_kafka/consumer/tests/test_event_consumer.py
Expand Up @@ -6,7 +6,6 @@
from contextlib import contextmanager
from unittest.mock import Mock, patch

from confluent_kafka.serialization import StringSerializer
from django.core.management import call_command
from django.test import TestCase
from django.test.utils import override_settings
Expand All @@ -16,6 +15,12 @@
from edx_event_bus_kafka.consumer.event_consumer import KafkaEventConsumer
from edx_event_bus_kafka.management.commands.consume_events import Command

try:
import confluent_kafka
from confluent_kafka.serialization import StringSerializer
except ImportError:
confluent_kafka = None


class FakeMessage:
"""
Expand Down
3 changes: 1 addition & 2 deletions edx_event_bus_kafka/management/commands/produce_event.py
Expand Up @@ -19,8 +19,7 @@ class Command(BaseCommand):
Produce a single test event.
"""
help = """
This starts a Kafka event consumer that listens to the specified topic and logs all messages it receives. Topic
is required.
This starts a Kafka event producer that sends a test event with the given data to the specified topic.

example:
python3 manage.py cms produce_event --signal openedx_events.learning.signals.SESSION_LOGIN_COMPLETED \
Expand Down
19 changes: 15 additions & 4 deletions edx_event_bus_kafka/publishing/event_producer.py
Expand Up @@ -7,17 +7,22 @@
import json
import logging
from functools import lru_cache
from typing import Any, List, Optional
from typing import Any, List

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry.avro import AvroSerializer
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

from edx_event_bus_kafka.config import create_schema_registry_client, load_common_settings

logger = logging.getLogger(__name__)

try:
import confluent_kafka
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry.avro import AvroSerializer
except ImportError:
confluent_kafka = None

# CloudEvent standard name for the event type header, see
# https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#325-example
EVENT_TYPE_HEADER_KEY = "ce_type"
Expand Down Expand Up @@ -118,8 +123,10 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
# producers to be long-lived.

# return type (Optional[SerializingProducer]) removed for better error messaging when confluent-kafka is not available
@lru_cache
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]:
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str):
"""
Create the producer for a signal and a key field path.

Expand All @@ -135,6 +142,10 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -
remote-config (and in particular does not result in mixed cache/uncached configuration).
This complexity is being deferred until this becomes a performance issue.
"""
if not confluent_kafka:
logger.warning('Library confluent-kafka not available. Cannot create event producer.')
return None

schema_registry_client = create_schema_registry_client()
if schema_registry_client is None:
return None
Expand Down
6 changes: 5 additions & 1 deletion edx_event_bus_kafka/publishing/test_event_producer.py
Expand Up @@ -8,13 +8,17 @@

import openedx_events.learning.signals
import pytest
from confluent_kafka import SerializingProducer
from django.test import override_settings
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.learning.data import UserData, UserPersonalData

import edx_event_bus_kafka.publishing.event_producer as ep

try:
    import confluent_kafka
    from confluent_kafka import SerializingProducer
except ImportError:
    confluent_kafka = None


class TestEventProducer(TestCase):
"""Test producer."""
Expand Down
1 change: 0 additions & 1 deletion requirements/base.in
Expand Up @@ -2,7 +2,6 @@
-c constraints.txt

Django # Web application framework
confluent_kafka[avro,schema-registry] # Kafka client
openedx-events # Events API
edx_django_utils
edx_toggles
32 changes: 8 additions & 24 deletions requirements/base.txt
Expand Up @@ -6,25 +6,17 @@
#
asgiref==3.5.2
# via django
attrs==21.4.0
attrs==22.1.0
# via openedx-events
avro==1.10.0
# via confluent-kafka
certifi==2022.6.15
# via requests
cffi==1.15.1
# via pynacl
charset-normalizer==2.1.0
# via requests
click==8.1.3
# via
# code-annotations
# edx-django-utils
code-annotations==1.3.0
# via edx-toggles
confluent-kafka[avro,schema-registry]==1.9.0
# via -r requirements/base.in
django==3.2.14
django==3.2.15
# via
# -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt
# -r requirements/base.in
Expand All @@ -48,21 +40,17 @@ edx-opaque-keys[django]==2.3.0
# via openedx-events
edx-toggles==5.0.0
# via -r requirements/base.in
fastavro==1.5.3
# via
# confluent-kafka
# openedx-events
idna==3.3
# via requests
fastavro==1.6.0
# via openedx-events
jinja2==3.1.2
# via code-annotations
markupsafe==2.1.1
# via jinja2
newrelic==7.14.0.177
newrelic==8.0.0.179
# via edx-django-utils
openedx-events==0.10.0
openedx-events==0.12.0
# via -r requirements/base.in
pbr==5.9.0
pbr==5.10.0
# via stevedore
psutil==5.9.1
# via edx-django-utils
Expand All @@ -74,12 +62,10 @@ pynacl==1.5.0
# via edx-django-utils
python-slugify==6.1.2
# via code-annotations
pytz==2022.1
pytz==2022.2.1
# via django
pyyaml==6.0
# via code-annotations
requests==2.28.1
# via confluent-kafka
sqlparse==0.4.2
# via django
stevedore==4.0.0
Expand All @@ -89,5 +75,3 @@ stevedore==4.0.0
# edx-opaque-keys
text-unidecode==1.3
# via python-slugify
urllib3==1.26.10
# via requests
12 changes: 5 additions & 7 deletions requirements/ci.txt
Expand Up @@ -10,11 +10,11 @@ charset-normalizer==2.1.0
# via requests
codecov==2.1.12
# via -r requirements/ci.in
coverage==6.4.2
coverage==6.4.4
# via codecov
distlib==0.3.5
# via virtualenv
filelock==3.7.1
filelock==3.8.0
# via
# tox
# virtualenv
Expand All @@ -33,14 +33,12 @@ pyparsing==3.0.9
requests==2.28.1
# via codecov
six==1.16.0
# via
# tox
# virtualenv
# via tox
toml==0.10.2
# via tox
tox==3.25.1
# via -r requirements/ci.in
urllib3==1.26.10
urllib3==1.26.11
# via requests
virtualenv==20.15.1
virtualenv==20.16.3
# via tox