Skip to content

Commit

Permalink
Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
mrajashree committed Feb 10, 2023
1 parent df32e8c commit a923dbf
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435))
- mongo db - fix db statement capturing
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
- Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer
([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656))

## Version 1.15.0/0.36b0 (2022-12-10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,17 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
def committed(self, partitions, timeout=-1):
return self._consumer.committed(partitions, timeout)

def commit(self, message=None, offsets=None, asynchronous=True):
if message is not None and offsets is not None:
raise ValueError(
"message and offsets are mutually exclusive for confluent_kafka.Consumer.commit"
)
if message is not None:
return self._consumer.commit(message=message, asynchronous=asynchronous)
if offsets is not None:
return self._consumer.commit(offsets=offsets, asynchronous=asynchronous)
return self._consumer.commit(asynchronous=asynchronous)

def consume(
self, num_messages=1, *args, **kwargs
): # pylint: disable=keyword-arg-before-vararg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from unittest import TestCase

from confluent_kafka import Consumer, Producer
from confluent_kafka import Consumer, Producer, TopicPartition

from opentelemetry.instrumentation.confluent_kafka import (
ConfluentKafkaInstrumentor,
Expand Down Expand Up @@ -58,3 +58,28 @@ def test_instrument_api(self) -> None:

consumer = instrumentation.uninstrument_consumer(consumer)
self.assertEqual(consumer.__class__, Consumer)

def test_consumer_commit_method(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()

consumer = Consumer(
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
}
)

consumer = instrumentation.instrument_consumer(consumer)
self.assertEqual(consumer.__class__, ProxiedConsumer)
self.assertTrue(hasattr(consumer, "commit"))
# test asserts that calling consumer.commit, with values passed for both message and offsets will raise an error.
valueError = "message and offsets are mutually exclusive for confluent_kafka.Consumer.commit"
self.assertRaisesRegex(
ValueError,
valueError,
consumer.commit,
message={"topic": "test"},
offsets=[TopicPartition(topic="test")],
asynchronous=True,
)

0 comments on commit a923dbf

Please sign in to comment.