diff --git a/CHANGELOG.md b/CHANGELOG.md index 4331bb599c..5680e5fe3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435)) - mongo db - fix db statement capturing ([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512)) +- Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer + ([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656)) ## Version 1.15.0/0.36b0 (2022-12-10) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 5d77e1c8e6..e4912313fb 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -173,6 +173,9 @@ def __init__(self, consumer: Consumer, tracer: Tracer): def committed(self, partitions, timeout=-1): return self._consumer.committed(partitions, timeout) + def commit(self, *args, **kwargs): + return self._consumer.commit(*args, **kwargs) + def consume( self, num_messages=1, *args, **kwargs ): # pylint: disable=keyword-arg-before-vararg diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index e9462d7898..d78d128760 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -58,3 +58,18 @@ def test_instrument_api(self) -> None: consumer = instrumentation.uninstrument_consumer(consumer) self.assertEqual(consumer.__class__, Consumer) + + def test_consumer_commit_method_exists(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + + consumer = Consumer( + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + } + ) + + consumer = instrumentation.instrument_consumer(consumer) + self.assertEqual(consumer.__class__, ProxiedConsumer) + self.assertTrue(hasattr(consumer, "commit"))