From 1e898548325422fa93f3ebe7ae12b9df7342ccbd Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Wed, 15 Feb 2023 14:17:28 +0200 Subject: [PATCH] Add confluent kafka docs (#1668) * add elasticsearch to docs * add confluent kafka to docs * tox generate fix * tox docs fix --------- Co-authored-by: Srikanth Chekuri --- docs-requirements.txt | 1 + docs/conf.py | 27 ++++-------- .../confluent_kafka/confluent_kafka.rst | 7 +++ docs/nitpick-exceptions.ini | 23 +++++++++- .../confluent_kafka/__init__.py | 43 ++++++++++--------- 5 files changed, 59 insertions(+), 42 deletions(-) create mode 100644 docs/instrumentation/confluent_kafka/confluent_kafka.rst diff --git a/docs-requirements.txt b/docs-requirements.txt index 7fb89a9bda..2d09dc0441 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -25,6 +25,7 @@ asyncpg>=0.12.0 boto~=2.0 botocore~=1.0 celery>=4.0 +confluent-kafka>= 1.8.2,< 2.0.0 elasticsearch>=2.0,<9.0 flask~=2.0 falcon~=2.0 diff --git a/docs/conf.py b/docs/conf.py index 93d5b7cdca..7c628b7cd8 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -126,25 +126,14 @@ def getlistcfg(strval): ] -if "class_references" in mcfg: - class_references = getlistcfg(mcfg["class_references"]) - for class_reference in class_references: - nitpick_ignore.append( - ( - "py:class", - class_reference, - ) - ) - -if "anys" in mcfg: - anys = getlistcfg(mcfg["anys"]) - for _any in anys: - nitpick_ignore.append( - ( - "any", - _any, - ) - ) +ignore_categories = ["py-class", "py-func", "py-exc", "any"] + +for category in ignore_categories: + if category in mcfg: + items = getlistcfg(mcfg[category]) + for item in items: + nitpick_ignore.append((category.replace("-", ":"), item)) + # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] diff --git a/docs/instrumentation/confluent_kafka/confluent_kafka.rst b/docs/instrumentation/confluent_kafka/confluent_kafka.rst new file mode 100644 index 0000000000..8f8794bb17 --- /dev/null +++ b/docs/instrumentation/confluent_kafka/confluent_kafka.rst @@ -0,0 +1,7 @@ +.. include:: ../../../instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst + +.. automodule:: opentelemetry.instrumentation.confluent_kafka + :members: + :undoc-members: + :show-inheritance: + :noindex: diff --git a/docs/nitpick-exceptions.ini b/docs/nitpick-exceptions.ini index fa8de32519..2b0910ad30 100644 --- a/docs/nitpick-exceptions.ini +++ b/docs/nitpick-exceptions.ini @@ -1,5 +1,5 @@ [default] -class_references= +py-class= ; TODO: Understand why sphinx is not able to find this local class opentelemetry.propagators.textmap.CarrierT opentelemetry.propagators.textmap.Setter @@ -11,6 +11,8 @@ class_references= opentelemetry.propagators.textmap.Getter ; - AWSXRayPropagator opentelemetry.sdk.trace.id_generator.IdGenerator + opentelemetry.instrumentation.confluent_kafka.ProxiedProducer + opentelemetry.instrumentation.confluent_kafka.ProxiedConsumer ; - AwsXRayIdGenerator TextMapPropagator CarrierT @@ -26,8 +28,16 @@ class_references= httpx.AsyncByteStream httpx.Response yarl.URL + cimpl.Producer + cimpl.Consumer + func + Message + TopicPartition + callable + Consumer + confluent_kafka.Message -anys= +any= ; API opentelemetry.propagators.textmap.TextMapPropagator.fields ; - AWSXRayPropagator @@ -44,3 +54,12 @@ anys= ; - instrumentation.* Setter httpx +; +py-func= + poll + flush + Message.error + +py-exc= + KafkaException + KafkaError diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index e4912313fb..12cb363219 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -13,12 +13,12 @@ # limitations under the License. """ -Instrument `confluent-kafka-python` to report instrumentation-confluent-kafka produced and consumed messages +Instrument confluent-kafka-python to report instrumentation-confluent-kafka produced and consumed messages Usage ----- -..code:: python +.. code-block:: python from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor from confluent_kafka import Producer, Consumer @@ -30,12 +30,10 @@ conf1 = {'bootstrap.servers': "localhost:9092"} producer = Producer(conf1) producer.produce('my-topic',b'raw_bytes') - - conf2 = {'bootstrap.servers': "localhost:9092", - 'group.id': "foo", - 'auto.offset.reset': 'smallest'} + conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'} # report a span of type consumer with the default settings consumer = Consumer(conf2) + def basic_consume_loop(consumer, topics): try: consumer.subscribe(topics) @@ -43,11 +41,10 @@ def basic_consume_loop(consumer, topics): while running: msg = consumer.poll(timeout=1.0) if msg is None: continue - if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event - sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}}\n") + sys.stderr.write(f"{msg.topic() [{msg.partition()}] reached end at offset {msg.offset()}}") elif msg.error(): raise KafkaException(msg.error()) else: @@ -57,19 +54,26 @@ def basic_consume_loop(consumer, topics): consumer.close() basic_consume_loop(consumer, "my-topic") + --- + +The _instrument method accepts the following keyword args: + tracer_provider (TracerProvider) - an optional tracer provider + + instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message + this function signature is: + + def instrument_producer(producer: Producer, tracer_provider=None) + instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message + this function signature is: + + def instrument_consumer(consumer: Consumer, tracer_provider=None) + for example: + +.. code:: python -The `_instrument` method accepts the following keyword args: -tracer_provider (TracerProvider) - an optional tracer provider -instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message - this function signature is: - def instrument_producer(producer: Producer, tracer_provider=None) -instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message - this function signature is: - def instrument_consumer(consumer: Consumer, tracer_provider=None) -for example: -.. code: python from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor + from confluent_kafka import Producer, Consumer inst = ConfluentKafkaInstrumentor() @@ -85,15 +89,12 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) p = inst.instrument_producer(p, tracer_provider) c = inst.instrument_consumer(c, tracer_provider=tracer_provider) - # Using kafka as normal now will automatically generate spans, # including user custom attributes added from the hooks conf = {'bootstrap.servers': "localhost:9092"} p.produce('my-topic',b'raw_bytes') msg = c.poll() - -API ___ """ from typing import Collection