Skip to content

Commit

Permalink
Allow Kafka producer headers to be dict or list
Browse files Browse the repository at this point in the history
  • Loading branch information
mrajashree committed Feb 10, 2023
1 parent df32e8c commit 92001b1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435))
- mongo db - fix db statement capturing
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list
([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655))

## Version 1.15.0/0.36b0 (2022-12-10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:

if value:
value = value.encode()
carrier.append((key, value))

if isinstance(carrier, list):
carrier.append((key, value))

if isinstance(carrier, dict):
carrier[key] = value


_kafka_getter = KafkaContextGetter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
ProxiedProducer,
)

from opentelemetry.instrumentation.confluent_kafka.utils import KafkaContextSetter


class TestConfluentKafka(TestCase):
def test_instrument_api(self) -> None:
Expand Down Expand Up @@ -58,3 +60,14 @@ def test_instrument_api(self) -> None:

consumer = instrumentation.uninstrument_consumer(consumer)
self.assertEqual(consumer.__class__, Consumer)

def test_context_setter(self) -> None:
context_setter = KafkaContextSetter()

carrier_dict = {"key1": "val1"}
context_setter.set(carrier_dict, "key2", "val2")
self.assertGreaterEqual(carrier_dict.items(), {"key2": "val2".encode()}.items())

carrier_list = [("key1", "val1")]
context_setter.set(carrier_list, "key2", "val2")
self.assertTrue(("key2", "val2".encode()) in carrier_list)

0 comments on commit 92001b1

Please sign in to comment.