Skip to content

Commit

Permalink
remove num_partitions from KafkaTestMessage.random_values
Browse files Browse the repository at this point in the history
  • Loading branch information
Swen committed Oct 13, 2021
1 parent b5f9501 commit dd0101c
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 41 deletions.
6 changes: 3 additions & 3 deletions tests/integration/commands/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def test_avro_consume_to_stdout(
avro_producer: AvroProducer, source_topic: Tuple[str, int], non_interactive_cli_runner: CliRunner
):
source_topic_id, _ = source_topic
expected_messages = produce_avro_test_messages(avro_producer, source_topic, amount=10)
expected_messages = produce_avro_test_messages(avro_producer, topic_name=source_topic_id, amount=10)

message_text = non_interactive_cli_runner.invoke(
esque,
Expand All @@ -43,7 +43,7 @@ def test_offset_not_committed(
consumergroup_controller: ConsumerGroupController,
):
source_topic_id, _ = source_topic
produce_avro_test_messages(avro_producer, source_topic)
produce_avro_test_messages(avro_producer, topic_name=source_topic_id)

non_interactive_cli_runner.invoke(
esque, args=["consume", "--stdout", "--numbers", "10", "--avro", source_topic_id], catch_exceptions=False
Expand All @@ -63,7 +63,7 @@ def test_binary_consume_to_stdout(
producer: ConfluentProducer, source_topic: Tuple[str, int], non_interactive_cli_runner: CliRunner
):
source_topic_id, _ = source_topic
expected_messages = produce_binary_test_messages(producer, source_topic)
expected_messages = produce_binary_test_messages(producer, topic_name=source_topic_id)

message_text = non_interactive_cli_runner.invoke(
esque,
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/commands/test_describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_describe_topic_no_flag(non_interactive_cli_runner: CliRunner, topic: st
def test_describe_topic_last_timestamp_does_not_commit(
non_interactive_cli_runner: CliRunner, topic: str, consumergroup_controller: ConsumerGroupController, producer
):
produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
produce_text_test_messages(producer=producer, topic_name=topic, amount=10)
result = non_interactive_cli_runner.invoke(
esque, args=["describe", "topic", topic, "--last-timestamp"], catch_exceptions=False
)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/commands/test_edit.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def test_edit_offsets(
consumer_group: str,
consumergroup_controller: ConsumerGroupController,
):
produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
produce_text_test_messages(producer=producer, topic_name=topic, amount=10)

consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])

Expand Down
10 changes: 5 additions & 5 deletions tests/integration/commands/test_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_set_offsets_offset_to_absolute_value(
consumer_group: str,
consumergroup_controller: ConsumerGroupController,
):
produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
produce_text_test_messages(producer=producer, topic_name=topic, amount=10)

consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])

Expand Down Expand Up @@ -46,7 +46,7 @@ def test_set_offsets_offset_to_delta(
consumer_group: str,
consumergroup_controller: ConsumerGroupController,
):
produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
produce_text_test_messages(producer=producer, topic_name=topic, amount=10)

consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])

Expand Down Expand Up @@ -76,7 +76,7 @@ def test_set_offsets_offset_to_delta_all_topics(
consumer_group: str,
consumergroup_controller: ConsumerGroupController,
):
produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
produce_text_test_messages(producer=producer, topic_name=topic, amount=10)

consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])

Expand Down Expand Up @@ -104,7 +104,7 @@ def test_set_offsets_offset_from_group(
target_consumer_group: str,
consumergroup_controller: ConsumerGroupController,
):
produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
produce_text_test_messages(producer=producer, topic_name=topic, amount=10)

consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])

Expand Down Expand Up @@ -147,7 +147,7 @@ def test_set_offsets_offset_to_timestamp_value(
consumer_group: str,
consumergroup_controller: ConsumerGroupController,
):
messages = produce_text_test_messages(producer=producer, topic=(topic, 1), amount=10)
messages = produce_text_test_messages(producer=producer, topic_name=topic, amount=10)

consumergroup_controller.commit_offsets(consumer_group, [TopicPartition(topic=topic, partition=0, offset=10)])

Expand Down
20 changes: 10 additions & 10 deletions tests/integration/commands/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def test_transfer_plain_text_message_using_cli_pipe(
target_topic: Tuple[str, int],
non_interactive_cli_runner: CliRunner,
):
expected_messages = produce_text_test_messages(topic=source_topic, producer=producer)
expected_messages = produce_text_test_messages(topic_name=source_topic[0], producer=producer)

result1 = non_interactive_cli_runner.invoke(
esque, args=["consume", "--stdout", "--number", "10", source_topic[0]], catch_exceptions=False
Expand All @@ -82,7 +82,7 @@ def test_transfer_plain_text_message_with_headers_using_cli_pipe(
target_topic: Tuple[str, int],
non_interactive_cli_runner: CliRunner,
):
expected_messages = produce_text_test_messages_with_headers(topic=source_topic, producer=producer)
expected_messages = produce_text_test_messages_with_headers(topic_name=source_topic[0], producer=producer)

result1 = non_interactive_cli_runner.invoke(
esque, args=["consume", "--stdout", "--number", "10", source_topic[0]], catch_exceptions=False
Expand All @@ -107,7 +107,7 @@ def test_transfer_binary_message_using_cli_pipe(
target_topic: Tuple[str, int],
non_interactive_cli_runner,
):
expected_messages = produce_binary_test_messages(topic=source_topic, producer=producer)
expected_messages = produce_binary_test_messages(topic_name=source_topic[0], producer=producer)

result1 = non_interactive_cli_runner.invoke(
esque, args=["consume", "--stdout", "--binary", "--number", "10", source_topic[0]], catch_exceptions=False
Expand All @@ -133,7 +133,7 @@ def test_transfer_plain_text_message_using_file(
tmpdir_factory,
):
output_directory = tmpdir_factory.mktemp("output_directory")
expected_messages = produce_text_test_messages(topic=source_topic, producer=producer)
expected_messages = produce_text_test_messages(topic_name=source_topic[0], producer=producer)

non_interactive_cli_runner.invoke(
esque, args=["consume", "-d", str(output_directory), "--number", "10", source_topic[0]], catch_exceptions=False
Expand All @@ -160,7 +160,7 @@ def test_transfer_plain_text_message_with_headers_using_file(
tmpdir_factory,
):
output_directory = tmpdir_factory.mktemp("output_directory")
expected_messages = produce_text_test_messages_with_headers(topic=source_topic, producer=producer)
expected_messages = produce_text_test_messages_with_headers(topic_name=source_topic[0], producer=producer)

non_interactive_cli_runner.invoke(
esque, args=["consume", "-d", str(output_directory), "--number", "10", source_topic[0]], catch_exceptions=False
Expand All @@ -187,7 +187,7 @@ def test_transfer_binary_message_using_file(
tmpdir_factory,
):
output_directory = tmpdir_factory.mktemp("output_directory")
expected_messages = produce_binary_test_messages(topic=source_topic, producer=producer)
expected_messages = produce_binary_test_messages(topic_name=source_topic[0], producer=producer)

non_interactive_cli_runner.invoke(
esque,
Expand Down Expand Up @@ -215,7 +215,7 @@ def test_transfer_avro_message_using_file(
tmpdir_factory,
):
output_directory = tmpdir_factory.mktemp("output_directory")
expected_messages = produce_avro_test_messages(topic=source_topic, avro_producer=avro_producer)
expected_messages = produce_avro_test_messages(topic_name=source_topic[0], avro_producer=avro_producer)

non_interactive_cli_runner.invoke(
esque,
Expand Down Expand Up @@ -247,7 +247,7 @@ def test_transfer_avro_with_single_command(
target_topic: Tuple[str, int],
non_interactive_cli_runner: CliRunner,
):
expected_messages = produce_avro_test_messages(topic=source_topic, avro_producer=avro_producer)
expected_messages = produce_avro_test_messages(topic_name=source_topic[0], avro_producer=avro_producer)
non_interactive_cli_runner.invoke(
esque,
args=[
Expand Down Expand Up @@ -285,7 +285,7 @@ def test_transfer_binary_with_single_command(
target_topic: Tuple[str, int],
non_interactive_cli_runner: CliRunner,
):
expected_messages = produce_binary_test_messages(topic=source_topic, producer=producer)
expected_messages = produce_binary_test_messages(topic_name=source_topic[0], producer=producer)

non_interactive_cli_runner.invoke(
esque,
Expand Down Expand Up @@ -318,7 +318,7 @@ def test_transfer_plain_with_single_command(
target_topic: Tuple[str, int],
non_interactive_cli_runner: CliRunner,
):
expected_messages = produce_text_test_messages_with_headers(topic=source_topic, producer=producer)
expected_messages = produce_text_test_messages_with_headers(topic_name=source_topic[0], producer=producer)

non_interactive_cli_runner.invoke(
esque,
Expand Down
33 changes: 12 additions & 21 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,49 +137,40 @@ def random_bytes(length: int = 16) -> bytes:


def produce_text_test_messages(
producer: ConfluentProducer, topic: Tuple[str, int], amount: int = 10
producer: ConfluentProducer, topic_name: str, amount: int = 10
) -> List["KafkaTestMessage"]:
topic_name, _ = topic
messages = KafkaTestMessage.random_values(topic_name=topic_name, n=amount, generate_headers=False)
produce_all(producer, messages)
return messages


def produce_all(producer: ConfluentProducer, messages: List[KafkaTestMessage]) -> None:
for msg in messages:
producer.produce(**msg.producer_args())
producer.flush()
return messages


def produce_text_test_messages_with_headers(
producer: ConfluentProducer, topic: Tuple[str, int], amount: int = 10
producer: ConfluentProducer, topic_name: str, amount: int = 10
) -> List["KafkaTestMessage"]:
topic_name, _ = topic
messages = KafkaTestMessage.random_values(topic_name=topic_name, n=amount, generate_headers=True)
for msg in messages:
producer.produce(**msg.producer_args())
producer.flush()
produce_all(producer, messages)
return messages


def produce_avro_test_messages(
avro_producer: AvroProducer, topic: Tuple[str, int], amount: int = 10
avro_producer: AvroProducer, topic_name: str, amount: int = 10
) -> List[AvroKafkaTestMessage]:

topic_name, _ = topic
messages: List[AvroKafkaTestMessage] = AvroKafkaTestMessage.random_values(topic_name, n=amount)

for msg in messages:
avro_producer.produce(**msg.producer_args())

avro_producer.flush()
produce_all(avro_producer, messages)
return messages


def produce_binary_test_messages(
producer: ConfluentProducer, topic: Tuple[str, int], amount: int = 10
producer: ConfluentProducer, topic_name: str, amount: int = 10
) -> List[BinaryKafkaTestMessage]:
topic_name, _ = topic
messages: List[BinaryKafkaTestMessage] = BinaryKafkaTestMessage.random_values(
topic_name=topic_name, n=amount, generate_headers=False
)
for msg in messages:
producer.produce(**msg.producer_args())
producer.flush()
produce_all(producer, messages)
return messages

0 comments on commit dd0101c

Please sign in to comment.