feat: Put in error handling for consumer #76

Merged
merged 17 commits on Nov 4, 2022
8 changes: 8 additions & 0 deletions CHANGELOG.rst
@@ -14,6 +14,14 @@ Change Log
Unreleased
**********

[1.6.0] - 2022-11-04
********************

Changed
=======
* Enhanced error logging in consumer, including telemetry for exceptions
* Consumer loop will no longer exit when an error is encountered

[1.5.0] - 2022-11-01
********************

48 changes: 48 additions & 0 deletions docs/decisions/0009-baseline-error-handling-for-consumer.rst
@@ -0,0 +1,48 @@
9. Baseline error handling for the Kafka consumer
#################################################

Status
******

**Provisional**

Context
*******

There are a number of possible types of errors that the consumer may encounter in each iteration of its loop:

- Failure to get an event (network errors, broker or consumer configuration issues)
- Inability to parse a received event (can't talk to schema registry, schema mismatch, bug in serialization on producer)
- Event objects that are unusable in some other way (bad or missing type headers, unexpected signal type)
- Errors returned by signal receivers

Some of these are temporary errors that will resolve on retry, such as network errors or signal receivers that encounter a network error making an outbound call. Some are permanent with respect to a given combination of event and consumer code/config, such as an incompatible serialization or schema. And some are permanent just with respect to the consumer itself, and would happen for all events, such as a bug in the consumer loop. For some errors we'll be able to tell which group they fall into (based on exception class or `Confluent Kafka error codes`_) but we cannot do this reliably for *all* errors. We also do not yet have much experience with Kafka and its failure modes on which to base decisions.

.. _Confluent Kafka error codes: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafkaerror
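
As an illustration of the classification problem (a hypothetical sketch, not code in this repository), a consumer that wanted to distinguish these groups might start from the error metadata that confluent-kafka already exposes:

.. code-block:: python

    from confluent_kafka import KafkaError

    def is_likely_transient(err: KafkaError) -> bool:
        """Best-effort guess at whether a Kafka error is worth retrying."""
        # _PARTITION_EOF is informational: the consumer simply caught up
        # with the end of the log.
        if err.code() == KafkaError._PARTITION_EOF:
            return True
        # retriable() reflects the client library's own hint; it cannot
        # classify every failure, hence "likely" in the name.
        return err.retriable()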

Additionally, the consumer has a limited number of options when it encounters an error. If it cannot retrieve an event, the best it can do is keep trying. If it cannot parse or identify a received event, it can either halt (wait for manual intervention) or start dropping events. And if a signal receiver is in error, the consumer can at best note this somehow.

Halting and waiting for intervention would be appropriate when the consumer is *completely broken*, but would be inappropriate when only a small subset of events cannot be processed. We may not be able to write code that can tell the difference between these situations. Depending on the deployment infrastructure, allowing an exception to propagate out of the consumer loop might result in the consumer being restarted (no overall effect), or it might shut down the consumer entirely until manually restarted (creating a halt).

Decision
********

We will start by catching *every* raised exception before it can escape the consumer loop, logging it with as much information as we think will be necessary for debugging or for restarting the consumer. There is currently no machinery for restarting a consumer group at a particular offset, but such machinery would allow operators to recover from various types of failures (transient, data-specific, and buggy consumer) if needed.

Consumers of the Event Bus are generally expected to tolerate duplicate events (at-least-once delivery) and we know that some of the planned applications are predicated on low latency. Since topics can always be replayed to consumers, failing fast and moving on to the next event is preferable.

As we gain experience, we can add special handling for certain known cases that would be better served by a backoff, retry, or halt.
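
For example, a future refinement might wrap the handling of known-transient errors in a retry helper along these lines (a hypothetical sketch; nothing like this exists in the consumer yet):

.. code-block:: python

    import time

    def call_with_backoff(fn, attempts=3, base_delay=1.0):
        """Retry fn with exponential backoff, re-raising on the last attempt."""
        for attempt in range(attempts):
            try:
                return fn()
            except Exception:
                if attempt == attempts - 1:
                    raise
                time.sleep(base_delay * 2 ** attempt)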

Receiver exceptions are not *raised*, per se, but are returned by ``send_robust``. We will log these but not treat them as consumer errors, as they are much more likely to be related to IDA business logic than to the Event Bus itself. There may also be multiple receivers, and a failure in one does not indicate that the others should stop receiving the event stream.
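
Django's ``send_robust`` returns a list of ``(receiver, response)`` pairs, where the response is the exception instance if that receiver failed. Handling receiver failures might look roughly like this sketch (the signal and logger names are illustrative):

.. code-block:: python

    responses = signal.send_robust(sender=None, **event_data)
    for receiver, response in responses:
        if isinstance(response, Exception):
            # Log and move on: one failing receiver should not stop
            # delivery to the others, nor halt the consumer loop.
            logger.error(f"Receiver {receiver!r} failed: {response!r}")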

All raised errors will also be sent to telemetry (New Relic) for monitoring.
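
One wrinkle, visible in this PR's consumer changes: ``record_exception`` (from ``edx_django_utils.monitoring``) takes no arguments and only reads the exception from the active stack context, so the caught error has to be re-raised inside a nested ``try`` before it can be recorded:

.. code-block:: python

    try:
        # record_exception() reads the exception from the current stack
        # context, so re-raise the error we already caught.
        raise Exception(error)
    except BaseException:
        record_exception()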

Consequences
************

The consumer will err on the side of low latency between IDAs, creating higher (or at least sooner) inter-IDA consistency at the possible expense of inter-object consistency within a topic (if some events are dropped and others processed). We will have to ensure that we capture all failures sufficiently durably that we can replay them, and also that receivers are capable of "healing" their data when a topic is replayed.

Deferred/Rejected Alternatives
******************************

Another common approach in eventing systems is the use of a Dead Letter Queue (DLQ). Events that cannot be processed are sent to a DLQ topic for later reprocessing. (Alternatively, they may be sent to a retry queue in the hopes that the error is transient, and only sent to the DLQ if the retry fails.) These approaches are still worth looking into, but have their own complexity (especially around message ordering) and the decision of whether to use these has been deferred. For now, the logged errors will serve as a crude DLQ.
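
If a DLQ were adopted, the handoff itself could be small. The sketch below assumes the raw message bytes are still available (with this package's ``DeserializingConsumer``, the value would first need re-serialization) and uses a hypothetical topic name:

.. code-block:: python

    from confluent_kafka import Producer

    def send_to_dlq(producer: Producer, msg, dlq_topic="some-topic-dlq"):
        """Forward an unprocessable message, preserving key, value, and headers."""
        producer.produce(
            dlq_topic,
            key=msg.key(),
            value=msg.value(),
            headers=msg.headers(),
        )
        producer.flush()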
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
@@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, get_producer

__version__ = '1.5.0'
__version__ = '1.6.0'
134 changes: 104 additions & 30 deletions edx_event_bus_kafka/internal/consumer.py
@@ -3,9 +3,11 @@
"""

import logging
import time

from django.conf import settings
from django.core.management.base import BaseCommand
from edx_django_utils.monitoring import record_exception
from edx_toggles.toggles import SettingToggle
from openedx_events.event_bus.avro.deserializer import AvroSignalDeserializer
from openedx_events.tooling import OpenEdxPublicSignal
@@ -17,7 +19,7 @@
# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
import confluent_kafka
from confluent_kafka import DeserializingConsumer, KafkaError
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry.avro import AvroDeserializer
except ImportError: # pragma: no cover
confluent_kafka = None
@@ -33,11 +35,29 @@

CONSUMER_POLL_TIMEOUT = getattr(settings, 'EVENT_BUS_KAFKA_CONSUMER_POLL_TIMEOUT', 1.0)

# .. setting_name: EVENT_BUS_KAFKA_CONSUMER_POLL_FAILURE_SLEEP
# .. setting_default: 1.0
# .. setting_description: When the consumer fails to retrieve an event from the broker,
# it will sleep for this many seconds before trying again. This is to prevent fast error-loops
# if the broker is down or the consumer is misconfigured. It *may* also sleep for errors that
# involve receiving an unreadable event, but this could change in the future to be more
# specific to "no event received from broker".
POLL_FAILURE_SLEEP = getattr(settings, 'EVENT_BUS_KAFKA_CONSUMER_POLL_FAILURE_SLEEP', 1.0)
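# For example, a deployment that wants a slower error-loop could override this
# in Django settings (the value here is illustrative, not a recommendation):
#
#     EVENT_BUS_KAFKA_CONSUMER_POLL_FAILURE_SLEEP = 5.0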

# CloudEvent standard name for the event type header, see
# https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#325-example
EVENT_TYPE_HEADER = "ce_type"
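# For example, headers() on an incoming message might include a pair like the
# following (the event type value is illustrative):
#
#     [("ce_type", b"org.openedx.learning.auth.session.login.completed.v1")]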


class UnusableMessageError(Exception):
"""
Indicates that a message was successfully received but could not be processed.

This could be invalid headers, an unknown signal, or other issue specific to
the contents of the message.
"""


class KafkaEventConsumer:
"""
Construct consumer for the given topic, group, and signal. The consumer can then
@@ -54,6 +74,7 @@ def __init__(self, topic, group_id, signal):
self.group_id = group_id
self.signal = signal
self.consumer = self._create_consumer()
self._shut_down_loop = False

# return type (Optional[DeserializingConsumer]) removed from signature to avoid error on import
def _create_consumer(self):
@@ -85,77 +106,130 @@ def inner_from_dict(event_data_dict, ctx=None): # pylint: disable=unused-argument

return DeserializingConsumer(consumer_config)

def _shut_down(self):
"""
Test utility for shutting down the consumer loop.
"""
self._shut_down_loop = True

def consume_indefinitely(self):
"""
Consume events from a topic in an infinite loop.
"""
# This is already checked at the Command level, but it's possible this loop
# could get called some other way, so check it here too.
if not KAFKA_CONSUMERS_ENABLED.is_enabled():
logger.error("Kafka consumers not enabled")
logger.error("Kafka consumers not enabled, exiting.")
return

try:
full_topic = get_full_topic(self.topic)
run_context = {
'full_topic': full_topic,
'consumer_group': self.group_id,
'expected_signal': self.signal,
}
self.consumer.subscribe([full_topic])
logger.info(f"Running consumer for {run_context!r}")

# TODO: Make sure exceptions won't kill the loop. https://github.com/openedx/event-bus-kafka/issues/62
while True:
msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
if msg is not None:
self.process_single_message(msg)
# Allow unit tests to break out of loop
if self._shut_down_loop:
break

msg = None
try:
msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
if msg is not None:
self.emit_signals_from_message(msg)
except BaseException as e:
self.record_event_consuming_error(run_context, e, msg)
# Prevent fast error-looping when no event received from broker. Because
# DeserializingConsumer raises rather than returning a Message when it has an
# error() value, this may be triggered even when a Message *was* returned,
# slowing down the queue. This is probably close enough, though.
if msg is None:
time.sleep(POLL_FAILURE_SLEEP)
finally:
# Close down consumer to commit final offsets.
self.consumer.close()
logger.info("Committing final offsets")

def process_single_message(self, msg):
"""
Emit signal with message data
"""
if msg.error():
# TODO: Iterate on error handling with retry and dead-letter queue topics.
# https://github.com/edx/edx-arch-experiments/issues/55 has broad overview of questions about errors.
if msg.error().code() == KafkaError._PARTITION_EOF: # pylint: disable=protected-access
# End of partition event
logger.info(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset}")
else:
logger.exception(msg.error())
else:
self.emit_signals_from_message(msg)

def emit_signals_from_message(self, msg):
"""
Determine the correct signal and send the event from the message.

Arguments:
msg (Message): Consumed message.
"""
# DeserializingConsumer.poll() always returns either a valid message
# or None, and raises an exception in all other cases. This means
# we don't need to check msg.error() ourselves. But... check it here
# anyway for robustness against code changes.
if msg.error() is not None:
raise UnusableMessageError(
f"Polled message had error object (shouldn't happen): {msg.error()!r}"
)

headers = msg.headers() or [] # treat None as []

event_types = [value for key, value in headers if key == EVENT_TYPE_HEADER]
if len(event_types) == 0:
logger.error(f"Missing {EVENT_TYPE_HEADER} header on message, cannot determine signal")
return
raise UnusableMessageError(
f"Missing {EVENT_TYPE_HEADER} header on message, cannot determine signal"
)
if len(event_types) > 1:
logger.error(f"Multiple {EVENT_TYPE_HEADER} headers found on message, cannot determine signal")
return
raise UnusableMessageError(
f"Multiple {EVENT_TYPE_HEADER} headers found on message, cannot determine signal"
)

event_type = event_types[0]

# CloudEvents specifies using UTF-8 for header values, so let's be explicit.
event_type_str = event_type.decode("utf-8")

# TODO: Maybe raise error here? Or at least set a metric or custom attribute.
if event_type_str != self.signal.event_type:
logger.error(
f"Signal types do not match. Expected {self.signal.event_type}."
raise UnusableMessageError(
f"Signal types do not match. Expected {self.signal.event_type}. "
f"Received message of type {event_type_str}."
)
return

self.signal.send_event(**msg.value())

def record_event_consuming_error(self, run_context, error, maybe_event):
"""
Record an error caught while consuming an event, both to the logs and to telemetry.

Arguments:
run_context: Dictionary of contextual information: full_topic, consumer_group,
and expected_signal.
error: An exception instance
maybe_event: None if event could not be fetched or decoded, or a Message if one
was successfully deserialized but could not be processed for some reason
"""
context_msg = ", ".join(f"{k}={v!r}" for k, v in run_context.items())
if maybe_event is None:
event_msg = "no event available"
else:
event_details = {
'partition': maybe_event.partition(),
'offset': maybe_event.offset(),
'headers': maybe_event.headers(),
'key': maybe_event.key(),
'value': maybe_event.value(),
}
event_msg = f"event details: {event_details!r}"

try:
# This is gross, but our record_exception wrapper doesn't take args at the moment,
# and will only read the exception from stack context.
raise Exception(error)
except BaseException:
record_exception()
logger.exception(
f"Error consuming event from Kafka: {error!r} in context {context_msg} -- {event_msg}"
)


class ConsumeEventsCommand(BaseCommand):
"""
@@ -193,7 +267,7 @@ def add_arguments(self, parser):

def handle(self, *args, **options):
if not KAFKA_CONSUMERS_ENABLED.is_enabled():
logger.error("Kafka consumers not enabled")
logger.error("Kafka consumers not enabled, exiting.")
return
try:
signal = OpenEdxPublicSignal.get_signal_by_type(options['signal'][0])