Skip to content

Commit

Permalink
Add confluent kafka producer poll and flush returns (#2527)
Browse files Browse the repository at this point in the history
  • Loading branch information
dferrochio committed May 30, 2024
1 parent 7bddbb5 commit 41792e7
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Breaking changes

- Add return statement to Confluent kafka Producer poll() and flush() calls when instrumented by ConfluentKafkaInstrumentor().instrument_producer() ([#2527](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2527))
- Rename `type` attribute to `asgi.event.type` in `opentelemetry-instrumentation-asgi`
([#2300](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2300))
- Rename AwsLambdaInstrumentor span attributes `faas.id` to `cloud.resource_id`, `faas.execution` to `faas.invocation_id`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ def __init__(self, producer: Producer, tracer: Tracer):
self._tracer = tracer

def flush(self, timeout=-1):
self._producer.flush(timeout)
return self._producer.flush(timeout)

def poll(self, timeout=-1):
self._producer.poll(timeout)
return self._producer.poll(timeout)

def produce(
self, topic, value=None, *args, **kwargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
)
from opentelemetry.test.test_base import TestBase

from .utils import MockConsumer, MockedMessage
from .utils import MockConsumer, MockedMessage, MockedProducer


class TestConfluentKafka(TestBase):
Expand Down Expand Up @@ -246,3 +246,35 @@ def _compare_spans(self, spans, expected_spans):
self.assertEqual(
expected_attribute_value, span.attributes[attribute_key]
)

def test_producer_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
message_queue = []

producer = MockedProducer(
message_queue,
{
"bootstrap.servers": "localhost:29092",
},
)

producer = instrumentation.instrument_producer(producer)
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.poll()
self.assertIsNotNone(msg)

def test_producer_flush(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
message_queue = []

producer = MockedProducer(
message_queue,
{
"bootstrap.servers": "localhost:29092",
},
)

producer = instrumentation.instrument_producer(producer)
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.flush()
self.assertIsNotNone(msg)
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from confluent_kafka import Consumer
from typing import Optional

from confluent_kafka import Consumer, Producer


class MockConsumer(Consumer):
Expand All @@ -20,11 +22,21 @@ def poll(self, timeout=None):


class MockedMessage:
def __init__(self, topic: str, partition: int, offset: int, headers):
def __init__(
self,
topic: str,
partition: int,
offset: int,
headers,
key: Optional[str] = None,
value: Optional[str] = None,
):
self._topic = topic
self._partition = partition
self._offset = offset
self._headers = headers
self._key = key
self._value = value

def topic(self):
return self._topic
Expand All @@ -37,3 +49,35 @@ def offset(self):

def headers(self):
return self._headers

def key(self):
return self._key

def value(self):
return self._value


class MockedProducer(Producer):
def __init__(self, queue, config):
self._queue = queue
super().__init__(config)

def produce(
self, *args, **kwargs
): # pylint: disable=keyword-arg-before-vararg
self._queue.append(
MockedMessage(
topic=kwargs.get("topic"),
partition=0,
offset=0,
headers=[],
key=kwargs.get("key"),
value=kwargs.get("value"),
)
)

def poll(self, *args, **kwargs):
return len(self._queue)

def flush(self, *args, **kwargs):
return len(self._queue)

0 comments on commit 41792e7

Please sign in to comment.