some more test refactoring
Swen committed Oct 13, 2021
1 parent dbc1f02 commit 60eba74
Showing 8 changed files with 199 additions and 241 deletions.
30 changes: 0 additions & 30 deletions tests/conftest.py
@@ -29,7 +29,6 @@
 from esque.helpers import log_error
 from esque.resources.broker import Broker
 from esque.resources.topic import Topic
-from tests.integration.commands.conftest import KafkaTestMessage
 
 # constants that indicate which config version to load
 # see function get_path_for_config_version
@@ -225,35 +224,6 @@ def topic_controller(cluster: Cluster):
     yield cluster.topic_controller
 
 
-@fixture()
-def messages_ordered_same_partition() -> Iterable[KafkaTestMessage]:
-    ordered_messages = [
-        KafkaTestMessage(key="j", value="v01", partition=0, timestamp=10_000),
-        KafkaTestMessage(key="i", value="v02", partition=0, timestamp=20_000),
-        KafkaTestMessage(key="h", value="v03", partition=0, timestamp=30_000),
-        KafkaTestMessage(key="g", value="v04", partition=0, timestamp=40_000),
-        KafkaTestMessage(key="f", value="v05", partition=0, timestamp=50_000),
-        KafkaTestMessage(key="e", value="v06", partition=0, timestamp=60_000),
-        KafkaTestMessage(key="d", value="v07", partition=0, timestamp=70_000),
-        KafkaTestMessage(key="c", value="v08", partition=0, timestamp=80_000),
-        KafkaTestMessage(key="b", value="v09", partition=0, timestamp=90_000),
-        KafkaTestMessage(key="a", value="v10", partition=0, timestamp=100_000),
-    ]
-    yield ordered_messages
-
-
-@fixture()
-def produced_messages_same_partition(messages_ordered_same_partition: Iterable[KafkaTestMessage]):
-    def _produce(topic_name: str, producer: ConfluentProducer, sleep_time: float = 0.5):
-        for message in messages_ordered_same_partition:
-            message.topic = topic_name
-            producer.produce(**message.producer_args())
-            time.sleep(sleep_time)
-        producer.flush()
-
-    return _produce
-
-
 @fixture()
 def confluent_admin_client(unittest_config) -> AdminClient:
     admin = AdminClient({"topic.metadata.refresh.interval.ms": "250", **unittest_config.create_confluent_config()})
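Throughout this commit, the two deleted fixtures are replaced by the shared `produce_text_test_messages` helper, now imported from `tests.utils`. A minimal sketch of the new call pattern, assuming the suite's `producer` and `topic` pytest fixtures and a single-partition topic:

```python
from confluent_kafka.cimpl import Producer as ConfluentProducer

from tests.utils import produce_text_test_messages


def test_example(producer: ConfluentProducer, topic: str):
    # topic is passed as a (name, num_partitions) tuple; the deleted
    # fixtures always wrote to a single partition, hence (topic, 1).
    messages = produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
    # Unlike the old fixtures, the helper returns the produced messages,
    # including broker-assigned partition/offset/timestamp metadata.
    assert len(messages) == 10
```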
186 changes: 0 additions & 186 deletions tests/integration/commands/conftest.py
@@ -1,16 +1,6 @@
-import dataclasses
-import json
-import random
-import string
-from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, TypeVar
 from unittest import mock
 
-import avro
 from click.testing import CliRunner
-from confluent_kafka import cimpl
-from confluent_kafka.avro import AvroProducer
-from confluent_kafka.avro import loads as load_schema
-from confluent_kafka.cimpl import Producer as ConfluentProducer
 from pytest_cases import fixture
 
 from esque.config import Config
@@ -28,179 +18,3 @@ def non_interactive_cli_runner(mocker: mock, unittest_config: Config) -> CliRunn
     mocker.patch("esque.cli.helpers._isatty", return_value=False)
     mocker.patch("esque.cli.environment.ESQUE_VERBOSE", new_callable=mock.PropertyMock, return_value="1")
     return CliRunner()
-
-
-T = TypeVar("T")
-
-
-@dataclasses.dataclass
-class KafkaTestMessage:
-    key: Any = ""
-    value: Any = ""
-    binary_key: bytes = b""
-    binary_value: bytes = b""
-    partition: int = -1
-    offset: int = -1
-    timestamp: int = 0
-    topic: str = ""
-    headers: List[Tuple[str, bytes]] = dataclasses.field(default_factory=list)
-
-    def update_callback(self, err: Optional[cimpl.KafkaError], msg: cimpl.Message):
-        assert err is None, f"Received KafkaError {err}."
-        self.binary_value = msg.value()
-        self.binary_key = msg.key()
-        self.partition = msg.partition()
-        self.offset = msg.offset()
-        self.timestamp = msg.timestamp()[1]
-
-    def producer_args(self) -> Dict:
-        args = {"key": self.key, "value": self.value, "topic": self.topic, "on_delivery": self.update_callback}
-        if self.partition >= 0:
-            args["partition"] = self.partition
-        if self.timestamp > 0:
-            args["timestamp"] = self.timestamp
-        if self.headers:
-            args["headers"] = self.headers
-        return args
-
-    @classmethod
-    def random_values(
-        cls: Type[T], topic_name: str, num_partitions: int, n: int = 10, generate_headers: bool = False
-    ) -> List[T]:
-        msgs: List[T] = []
-        for _ in range(n):
-            key = cls.random_key()
-            value = cls.random_value()
-            partition = random.randrange(0, num_partitions)
-            if generate_headers:
-                headers = cls.random_headers()
-            else:
-                headers = []
-            msgs.append(cls(key=key, value=value, topic=topic_name, partition=partition, headers=headers))
-        return msgs
-
-    @staticmethod
-    def random_key() -> Any:
-        return random_str()
-
-    @staticmethod
-    def random_value() -> Any:
-        return random_str()
-
-    @staticmethod
-    def random_headers() -> List[Tuple[str, Optional[bytes]]]:
-        header_count = random.randrange(0, 5)
-        headers: List[Tuple[str, Optional[bytes]]] = []
-        for _ in range(header_count):
-            header_key = random_str()
-            if random.random() < 0.1:  # 10% chance to be None
-                header_value = None
-            else:
-                header_value = random_str().encode("utf-8")
-            headers.append((header_key, header_value))
-        return headers
-
-
-def random_str(length: int = 16) -> str:
-    return "".join(random.choices(string.ascii_lowercase, k=length))
-
-
-def produce_text_test_messages(
-    producer: ConfluentProducer, topic: Tuple[str, int], amount: int = 10
-) -> List["KafkaTestMessage"]:
-    topic_name, num_partitions = topic
-    messages = KafkaTestMessage.random_values(
-        topic_name=topic_name, num_partitions=num_partitions, n=amount, generate_headers=False
-    )
-    for msg in messages:
-        producer.produce(**msg.producer_args())
-    producer.flush()
-    return messages
-
-
-def produce_text_test_messages_with_headers(
-    producer: ConfluentProducer, topic: Tuple[str, int], amount: int = 10
-) -> List["KafkaTestMessage"]:
-    topic_name, num_partitions = topic
-    messages = KafkaTestMessage.random_values(
-        topic_name=topic_name, num_partitions=num_partitions, n=amount, generate_headers=True
-    )
-    for msg in messages:
-        producer.produce(**msg.producer_args())
-    producer.flush()
-    return messages
-
-
-def mk_avro_schema(field_name: str, field_type: str) -> avro.schema.Schema:
-    return load_schema(
-        json.dumps(
-            {
-                "type": "record",
-                "namespace": "com.example",
-                "name": f"MySchema_{field_name}",
-                "fields": [{"name": field_name, "type": field_type}],
-            }
-        )
-    )
-
-
-@dataclasses.dataclass
-class AvroKafkaTestMessage(KafkaTestMessage):
-    KEY_SCHEMA: ClassVar[avro.schema.Schema] = mk_avro_schema("key", "string")
-    VALUE_SCHEMA: ClassVar[avro.schema.Schema] = mk_avro_schema("value", "string")
-
-    def producer_args(self) -> Dict:
-        args = super().producer_args()
-        args["key_schema"] = self.KEY_SCHEMA
-        args["value_schema"] = self.VALUE_SCHEMA
-        return args
-
-    @staticmethod
-    def random_key() -> Any:
-        return {"key": random_str()}
-
-    @staticmethod
-    def random_value() -> Any:
-        return {"value": random_str()}
-
-
-def produce_avro_test_messages(
-    avro_producer: AvroProducer, topic: Tuple[str, int], amount: int = 10
-) -> List[AvroKafkaTestMessage]:
-
-    topic_name, num_partitions = topic
-    messages: List[AvroKafkaTestMessage] = AvroKafkaTestMessage.random_values(topic_name, num_partitions, n=amount)
-
-    for msg in messages:
-        avro_producer.produce(**msg.producer_args())
-
-    avro_producer.flush()
-    return messages
-
-
-@dataclasses.dataclass
-class BinaryKafkaTestMessage(KafkaTestMessage):
-    @staticmethod
-    def random_key() -> Any:
-        return random_bytes()
-
-    @staticmethod
-    def random_value() -> Any:
-        return random_bytes()
-
-
-def random_bytes(length: int = 16) -> bytes:
-    return random.getrandbits(length * 8).to_bytes(length, "big")
-
-
-def produce_binary_test_messages(
-    producer: ConfluentProducer, topic: Tuple[str, int], amount: int = 10
-) -> List[BinaryKafkaTestMessage]:
-    topic_name, num_partitions = topic
-    messages: List[BinaryKafkaTestMessage] = BinaryKafkaTestMessage.random_values(
-        topic_name=topic_name, num_partitions=num_partitions, n=amount, generate_headers=False
-    )
-    for msg in messages:
-        producer.produce(**msg.producer_args())
-    producer.flush()
-    return messages
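The `KafkaTestMessage` machinery deleted here (and, judging by the updated imports below, relocated to `tests.utils`) is built around confluent-kafka delivery callbacks: `producer_args()` wires `update_callback` into `produce(on_delivery=...)`, so after `flush()` each message object carries the broker-assigned partition, offset, and timestamp. A standalone sketch of that round trip; the broker address and topic name are assumptions:

```python
from confluent_kafka.cimpl import Producer as ConfluentProducer

from tests.utils import KafkaTestMessage  # assumed new home of the class

# Assumed broker address; adjust for your environment.
producer = ConfluentProducer({"bootstrap.servers": "localhost:9092"})

msg = KafkaTestMessage(key="k", value="v", topic="test-topic")
# producer_args() includes on_delivery=msg.update_callback, so the
# delivery report fills in the broker-assigned metadata on success.
producer.produce(**msg.producer_args())
producer.flush()

print(msg.partition, msg.offset, msg.timestamp)
```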
2 changes: 1 addition & 1 deletion tests/integration/commands/test_consume.py
@@ -12,7 +12,7 @@
 from esque.cli.commands import esque
 from esque.controller.consumergroup_controller import ConsumerGroupController
 from esque.errors import ConsumerGroupDoesNotExistException
-from tests.integration.commands.conftest import produce_avro_test_messages, produce_binary_test_messages
+from tests.utils import produce_avro_test_messages, produce_binary_test_messages
 
 
 @pytest.mark.integration
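test_consume.py now takes the Avro and binary helpers from `tests.utils`. For reference, a hedged sketch of the Avro path as defined in the deleted conftest above: `AvroKafkaTestMessage.producer_args()` attaches the class-level key/value schemas, so an `AvroProducer` serializes the dict-shaped keys and values directly (the broker and schema-registry addresses are assumptions):

```python
from confluent_kafka.avro import AvroProducer

from tests.utils import produce_avro_test_messages  # assumed new home

# Assumed addresses; adjust for your environment.
avro_producer = AvroProducer(
    {"bootstrap.servers": "localhost:9092", "schema.registry.url": "http://localhost:8081"}
)

# random_values() generates {"key": ...} / {"value": ...} dicts matching
# the class-level Avro schemas that producer_args() attaches per message.
messages = produce_avro_test_messages(avro_producer, topic=("test-topic", 1), amount=5)
print([m.offset for m in messages])
```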
9 changes: 3 additions & 6 deletions tests/integration/commands/test_describe.py
@@ -9,6 +9,7 @@
 from esque.errors import ConsumerGroupDoesNotExistException
 from esque.resources.topic import Topic
 from tests.conftest import parameterized_output_formats
+from tests.utils import produce_text_test_messages
 
 VARIOUS_IMPORTANT_BROKER_OPTIONS = [
     "advertised.host.name",
@@ -33,13 +34,9 @@ def test_describe_topic_no_flag(non_interactive_cli_runner: CliRunner, topic: st
 
 @pytest.mark.integration
 def test_describe_topic_last_timestamp_does_not_commit(
-    non_interactive_cli_runner: CliRunner,
-    topic: str,
-    consumergroup_controller: ConsumerGroupController,
-    produced_messages_same_partition,
-    producer,
+    non_interactive_cli_runner: CliRunner, topic: str, consumergroup_controller: ConsumerGroupController, producer
 ):
-    produced_messages_same_partition(topic_name=topic, producer=producer)
+    produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
     result = non_interactive_cli_runner.invoke(
         esque, args=["describe", "topic", topic, "--last-timestamp"], catch_exceptions=False
     )
4 changes: 2 additions & 2 deletions tests/integration/commands/test_edit.py
@@ -13,6 +13,7 @@
 from esque.controller.consumergroup_controller import ConsumerGroupController
 from esque.controller.topic_controller import TopicController
 from esque.errors import EditCanceled
+from tests.utils import produce_text_test_messages
 
 
 @pytest.mark.integration
@@ -101,12 +102,11 @@ def test_edit_offsets(
     monkeypatch: MonkeyPatch,
     interactive_cli_runner,
     topic: str,
-    produced_messages_same_partition,
     producer: ConfluenceProducer,
     consumer_group: str,
     consumergroup_controller: ConsumerGroupController,
 ):
-    produced_messages_same_partition(topic, producer)
+    produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
 
     consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])
 
22 changes: 7 additions & 15 deletions tests/integration/commands/test_set.py
@@ -1,25 +1,22 @@
-from typing import List
-
 import pendulum
 import pytest
 from confluent_kafka.cimpl import Producer as ConfluenceProducer
 from confluent_kafka.cimpl import TopicPartition
 
 from esque.cli.commands import esque
 from esque.controller.consumergroup_controller import ConsumerGroupController
-from tests.integration.commands.conftest import KafkaTestMessage
+from tests.utils import produce_text_test_messages
 
 
 @pytest.mark.integration
 def test_set_offsets_offset_to_absolute_value(
     topic: str,
-    produced_messages_same_partition,
     interactive_cli_runner,
     producer: ConfluenceProducer,
     consumer_group: str,
     consumergroup_controller: ConsumerGroupController,
 ):
-    produced_messages_same_partition(topic, producer)
+    produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
 
     consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])
 
@@ -44,13 +41,12 @@ def test_set_offsets_offset_to_absolute_value
 @pytest.mark.integration
 def test_set_offsets_offset_to_delta(
     topic: str,
-    produced_messages_same_partition,
     interactive_cli_runner,
     producer: ConfluenceProducer,
     consumer_group: str,
     consumergroup_controller: ConsumerGroupController,
 ):
-    produced_messages_same_partition(topic, producer)
+    produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
 
     consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])
 
@@ -75,13 +71,12 @@ def test_set_offsets_offset_to_delta
 @pytest.mark.integration
 def test_set_offsets_offset_to_delta_all_topics(
     topic: str,
-    produced_messages_same_partition,
     interactive_cli_runner,
     producer: ConfluenceProducer,
     consumer_group: str,
     consumergroup_controller: ConsumerGroupController,
 ):
-    produced_messages_same_partition(topic, producer)
+    produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
 
     consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])
 
@@ -103,14 +98,13 @@ def test_set_offsets_offset_to_delta_all_topics
 @pytest.mark.integration
 def test_set_offsets_offset_from_group(
     topic: str,
-    produced_messages_same_partition,
     interactive_cli_runner,
     producer: ConfluenceProducer,
     consumer_group: str,
     target_consumer_group: str,
     consumergroup_controller: ConsumerGroupController,
 ):
-    produced_messages_same_partition(topic, producer)
+    produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
 
     consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])
 
@@ -148,22 +142,20 @@ def test_set_offsets_offset_from_group
 @pytest.mark.integration
 def test_set_offsets_offset_to_timestamp_value(
     topic: str,
-    produced_messages_same_partition,
     interactive_cli_runner,
     producer: ConfluenceProducer,
     consumer_group: str,
     consumergroup_controller: ConsumerGroupController,
-    messages_ordered_same_partition: List[KafkaTestMessage],
 ):
-    produced_messages_same_partition(topic, producer, 1.5)
+    messages = produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
 
     consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])
 
     consumergroup_desc_before = consumergroup_controller.get_consumer_group(consumer_id=consumer_group).describe(
         partitions=True
    )
 
-    fifth_message = messages_ordered_same_partition[4]
+    fifth_message = messages[4]
     timestamp = fifth_message.timestamp
     dt = pendulum.from_timestamp(round(timestamp / 1000) - 1)
 
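In the reworked timestamp test above, the returned messages stand in for the old `messages_ordered_same_partition` fixture: the fifth message's broker timestamp (milliseconds since the epoch) is converted to whole seconds and shifted one second back, presumably to be fed to the command under test. A small sketch of just that conversion, using an assumed example timestamp:

```python
import pendulum

# Kafka message timestamps are milliseconds since the epoch
# (example value below is an assumption).
timestamp_ms = 1_634_083_200_000

# Same arithmetic as the test: whole seconds, one second earlier.
dt = pendulum.from_timestamp(round(timestamp_ms / 1000) - 1)
print(dt.isoformat())  # pendulum datetimes subclass datetime
```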
