/
test_produce.py
85 lines (70 loc) · 2.89 KB
/
test_produce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import json
import time
from typing import Callable, List
import confluent_kafka
import pytest
from _pytest.tmpdir import TempPathFactory
from click.testing import CliRunner
from confluent_kafka.cimpl import Consumer, TopicPartition
from pytest_cases import fixture
from esque.cli.commands import esque
from esque.config import Config
from esque.errors import NoConfirmationPossibleException
@fixture
def consumer_factory(unittest_config: Config) -> Callable[[str], Consumer]:
    """Yield a factory building consumers assigned to all partitions of a topic.

    Every consumer handed out by the factory is tracked and closed again
    during fixture teardown, so tests don't leak consumer connections.
    """
    created_consumers: List[Consumer] = []

    def _make_consumer(topic: str) -> Consumer:
        new_consumer = Consumer(
            {
                "group.id": "asdf",
                "enable.auto.commit": False,
                "enable.partition.eof": False,
                **unittest_config.create_confluent_config(),
            }
        )
        # Assign every partition of the topic explicitly, starting at offset 0,
        # so the consumer reads the topic from the very beginning.
        topic_metadata = new_consumer.list_topics(topic=topic).topics[topic]
        assignment = [
            TopicPartition(topic=topic, partition=partition_id, offset=0)
            for partition_id in topic_metadata.partitions
        ]
        new_consumer.assign(assignment)
        created_consumers.append(new_consumer)
        return new_consumer

    yield _make_consumer

    # Teardown: close everything the factory created.
    for created in created_consumers:
        created.close()
@pytest.mark.integration
def test_produce_can_create_topic(
    consumer_factory: Callable[[str], Consumer],
    non_interactive_cli_runner: CliRunner,
    topic_id: str,
    tmpdir_factory: TempPathFactory,
):
    """`esque produce --no-verify` to a new topic creates it and writes the message.

    Produces one JSON key/value pair via stdin, then consumes the topic from
    offset 0 and asserts the exact message round-trips.
    """
    data = json.dumps(dict(key="key1", value="value1")) + "\n"
    result = non_interactive_cli_runner.invoke(
        esque, args=["produce", "--no-verify", "--stdin", topic_id], input=data, catch_exceptions=False
    )
    assert result.exit_code == 0

    consumer = consumer_factory(topic_id)
    expected_messages = [(b"key1", b"value1")]
    actual_messages = []
    start = time.monotonic()
    while len(actual_messages) < 1:
        msg = consumer.poll(timeout=2)
        if msg is not None:
            # poll() may return an event that carries an error instead of data
            # (e.g. broker-side errors). Fail fast with the broker's error
            # rather than asserting on an error message's key()/value().
            if msg.error() is not None:
                raise RuntimeError(f"Error while consuming from {topic_id}: {msg.error()}")
            actual_messages.append((msg.key(), msg.value()))
        elif time.monotonic() - start >= 20:
            # Only count elapsed time against the deadline on empty polls.
            raise TimeoutError("Timeout reading data from topic")
    assert expected_messages == actual_messages
@pytest.mark.integration
def test_binary_and_avro_fails(non_interactive_cli_runner: CliRunner):
    """Passing both --binary and --avro to `esque produce` must raise a ValueError."""
    conflicting_args = ["produce", "--binary", "--avro", "thetopic"]
    with pytest.raises(ValueError):
        non_interactive_cli_runner.invoke(esque, args=conflicting_args, catch_exceptions=False)
@pytest.mark.integration
def test_produce_to_non_existent_topic_fails(
    confluent_admin_client: confluent_kafka.admin.AdminClient, non_interactive_cli_runner: CliRunner, topic_id: str
):
    """Without --no-verify, producing to a missing topic aborts and creates nothing."""
    target_topic_id = topic_id
    data = json.dumps(dict(key='"key1"', value='"value1"')) + "\n"
    result = non_interactive_cli_runner.invoke(esque, args=["produce", "--stdin", target_topic_id], input=data)
    # The CLI cannot ask for confirmation in non-interactive mode, so it must abort.
    assert isinstance(result.exception, NoConfirmationPossibleException)
    # The aborted produce must not have created the topic as a side effect.
    existing_topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
    assert target_topic_id not in existing_topics