Skip to content

Commit

Permalink
Kafka Expanded Version Testing (#629)
Browse files Browse the repository at this point in the history
* Expand kafka testing range to older versions

* Remove confluent-kafka 1.5

* Remove confluent-kafka 1.5

* Fix flakey confluent-kafka tests

* Fixup: fix flakey tests

* Fixup: fix kafka-python flakey tests

* Fixup: fix kafka-python flakey tests

* Remove confluent-kafka 1.8 tests

The following is an unresolved issue occurring in the setup of confluent-kafka 1.8.2:
asweigart/PyGetWindow#9

Co-authored-by: Hannah Stepanek <hstepanek@newrelic.com>
  • Loading branch information
2 people authored and lrafeei committed Sep 27, 2022
1 parent 8beb0cc commit 684ca89
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 46 deletions.
6 changes: 4 additions & 2 deletions tests/messagebroker_confluentkafka/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def skip_if_not_serializing(client_type):


@pytest.fixture(scope="function")
def producer(client_type, json_serializer):
def producer(topic, client_type, json_serializer):
from confluent_kafka import Producer, SerializingProducer

if client_type == "cimpl":
Expand All @@ -86,7 +86,9 @@ def producer(client_type, json_serializer):
)

yield producer
producer.purge()

if hasattr(producer, "purge"):
producer.purge()


@pytest.fixture(scope="function")
Expand Down
10 changes: 9 additions & 1 deletion tests/messagebroker_confluentkafka/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,15 @@ def _consume():
@cache_kafka_consumer_headers()
def _test():
# Start the transaction but don't exit it.
consumer.poll(0.5)
# Keep polling until we get the record or the timeout is exceeded.
timeout = 10
attempts = 0
record = None
while not record and attempts < timeout:
record = consumer.poll(0.5)
if not record:
attempts += 1
continue

_test()

Expand Down
18 changes: 14 additions & 4 deletions tests/messagebroker_confluentkafka/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ def test_deserialization_errors(skip_if_not_serializing, monkeypatch, topic, pro
@background_task()
def test():
with pytest.raises(error_cls):
record = consumer.poll(0.5)
assert record is not None, "No record consumed."
timeout = 10
attempts = 0
while attempts < timeout:
if not consumer.poll(0.5):
attempts += 1
continue

test()

Expand All @@ -128,14 +132,20 @@ def _test():
send_producer_message()

record_count = 0
while True:

timeout = 10
attempts = 0
record = None
while not record and attempts < timeout:
record = consumer.poll(0.5)
if not record:
break
attempts += 1
continue
assert not record.error()

assert record.value() == {"foo": 1}
record_count += 1
consumer.poll(0.5) # Exit the transaction.

assert record_count == 1, "Incorrect count of records consumed: %d. Expected 1." % record_count

Expand Down
17 changes: 9 additions & 8 deletions tests/messagebroker_kafkapython/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,6 @@ def consumer(topic, producer, client_type, json_deserializer, json_callable_dese
group_id="test",
)

# The first time the kafka consumer is created and polled, it returns a StopIterator
# exception. To by-pass this, loop over the consumer before using it.
# NOTE: This seems to only happen in Python2.7.
for record in consumer:
pass
yield consumer
consumer.close()

Expand Down Expand Up @@ -230,9 +225,15 @@ def _test():
send_producer_message()

record_count = 0
for record in consumer:
assert deserialize(record.value) == {"foo": 1}
record_count += 1

timeout = 10
attempts = 0
record = None
while not record and attempts < timeout:
for record in consumer:
assert deserialize(record.value) == {"foo": 1}
record_count += 1
attempts += 1

assert record_count == 1, "Incorrect count of records consumed: %d. Expected 1." % record_count

Expand Down
24 changes: 14 additions & 10 deletions tests/messagebroker_kafkapython/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,23 @@

import pytest
from conftest import cache_kafka_consumer_headers

from newrelic.common.object_names import callable_name

from testing_support.fixtures import (
reset_core_stats_engine,
validate_attributes,
validate_error_event_attributes_outside_transaction,
validate_transaction_errors,
validate_transaction_metrics,
reset_core_stats_engine,
)
from testing_support.validators.validate_transaction_count import (
validate_transaction_count,
)
from testing_support.validators.validate_distributed_trace_accepted import (
validate_distributed_trace_accepted,
)
from testing_support.validators.validate_transaction_count import (
validate_transaction_count,
)

from newrelic.api.background_task import background_task
from newrelic.api.transaction import end_of_transaction
from newrelic.common.object_names import callable_name
from newrelic.packages import six


Expand Down Expand Up @@ -117,8 +115,7 @@ def test_consumer_errors(get_consumer_record, consumer_next_raises):

@reset_core_stats_engine()
@validate_error_event_attributes_outside_transaction(
num_errors=1,
exact_attrs={"intrinsic": {"error.class": callable_name(exc_class)}, "agent": {}, "user": {}}
num_errors=1, exact_attrs={"intrinsic": {"error.class": callable_name(exc_class)}, "agent": {}, "user": {}}
)
def _test():
with pytest.raises(exc_class):
Expand Down Expand Up @@ -160,7 +157,14 @@ def _consume():
@cache_kafka_consumer_headers
def _test():
# Start the transaction but don't exit it.
next(consumer_iter)
timeout = 10
attempts = 0
record = None
while not record and attempts < timeout:
try:
record = next(consumer_iter)
except StopIteration:
attempts += 1

_test()

Expand Down
47 changes: 28 additions & 19 deletions tests/messagebroker_kafkapython/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json

import pytest
from testing_support.fixtures import (
reset_core_stats_engine,
validate_error_event_attributes_outside_transaction,
validate_transaction_errors,
validate_transaction_metrics,
validate_error_event_attributes_outside_transaction,
reset_core_stats_engine,
)

from newrelic.api.background_task import background_task
from newrelic.packages import six

from newrelic.common.object_names import callable_name
from newrelic.packages import six

import json

def test_serialization_metrics(skip_if_not_serializing, topic, send_producer_message):
txn_name = "test_serialization:test_serialization_metrics.<locals>.test" if six.PY3 else "test_serialization:test"
Expand All @@ -48,10 +48,13 @@ def test():
test()


@pytest.mark.parametrize("key,value", (
(object(), "A"),
("A", object()),
))
@pytest.mark.parametrize(
"key,value",
(
(object(), "A"),
("A", object()),
),
)
def test_serialization_errors(skip_if_not_serializing, topic, producer, key, value):
error_cls = TypeError

Expand All @@ -64,13 +67,16 @@ def test():
test()


@pytest.mark.parametrize("key,value", (
(b"%", b"{}"),
(b"{}", b"%"),
))
@pytest.mark.parametrize(
"key,value",
(
(b"%", b"{}"),
(b"{}", b"%"),
),
)
def test_deserialization_errors(skip_if_not_serializing, monkeypatch, topic, producer, consumer, key, value):
error_cls = json.decoder.JSONDecodeError if six.PY3 else ValueError

# Remove serializers to cause intentional issues
monkeypatch.setitem(producer.config, "value_serializer", None)
monkeypatch.setitem(producer.config, "key_serializer", None)
Expand All @@ -80,13 +86,16 @@ def test_deserialization_errors(skip_if_not_serializing, monkeypatch, topic, pro

@reset_core_stats_engine()
@validate_error_event_attributes_outside_transaction(
num_errors=1,
exact_attrs={"intrinsic": {"error.class": callable_name(error_cls)}, "agent": {}, "user": {}}
num_errors=1, exact_attrs={"intrinsic": {"error.class": callable_name(error_cls)}, "agent": {}, "user": {}}
)
def test():
with pytest.raises(error_cls):
for record in consumer:
pass
assert record is not None, "No record consumed."
timeout = 10
attempts = 0
record = None
while not record and attempts < timeout:
for record in consumer:
pass
attempts += 1

test()
11 changes: 9 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ envlist =
rabbitmq-messagebroker_pika-{py27,py37,py38,py39,pypy,pypy37}-pika0.13,
rabbitmq-messagebroker_pika-{py37,py38,py39,py310,pypy37}-pikalatest,
kafka-messagebroker_confluentkafka-{py27,py37,py38,py39,py310}-confluentkafkalatest,
kafka-messagebroker_confluentkafka-{py27,py39}-confluentkafka{0107,0106},
kafka-messagebroker_kafkapython-{pypy,py27,py37,py38,pypy37}-kafkapythonlatest,
kafka-messagebroker_kafkapython-{py27,py38}-kafkapython{020001,020000,0104},
python-template_mako-{py27,py37,py38,py39,py310}

[pytest]
Expand Down Expand Up @@ -355,8 +357,13 @@ deps =
messagebroker_pika-pikalatest: pika
messagebroker_pika: tornado<5
messagebroker_pika-{py27,pypy}: enum34
messagebroker_confluentkafka: confluent-kafka
messagebroker_kafkapython: kafka-python
messagebroker_confluentkafka-confluentkafkalatest: confluent-kafka
messagebroker_confluentkafka-confluentkafka0107: confluent-kafka<1.8
messagebroker_confluentkafka-confluentkafka0106: confluent-kafka<1.7
messagebroker_kafkapython-kafkapythonlatest: kafka-python
messagebroker_kafkapython-kafkapython020001: kafka-python<2.0.2
messagebroker_kafkapython-kafkapython020000: kafka-python<2.0.1
messagebroker_kafkapython-kafkapython0104: kafka-python<1.5
template_mako: mako<1.2

setenv =
Expand Down

0 comments on commit 684ca89

Please sign in to comment.