From 97579579c6834ccfae3d869372dba71ef13eb562 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Wed, 18 Sep 2019 13:19:23 +0200 Subject: [PATCH 01/10] Moved cli integration tests out of topic controller --- tests/conftest.py | 10 +- tests/integration/commands/__init__.py | 0 tests/integration/commands/test_apply.py | 119 +++++++++++++++++++ tests/integration/test_topic_controller.py | 129 --------------------- 4 files changed, 127 insertions(+), 131 deletions(-) create mode 100644 tests/integration/commands/__init__.py create mode 100644 tests/integration/commands/test_apply.py diff --git a/tests/conftest.py b/tests/conftest.py index 2e7802b9..cfc0c98c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,13 +2,14 @@ from concurrent.futures import Future from pathlib import Path from string import ascii_letters -from typing import Iterable, Tuple, Callable +from typing import Callable, Iterable, Tuple import confluent_kafka import pytest +from click.testing import CliRunner from confluent_kafka.admin import AdminClient, NewTopic from confluent_kafka.avro import AvroProducer -from confluent_kafka.cimpl import TopicPartition, Producer +from confluent_kafka.cimpl import Producer, TopicPartition from pykafka.exceptions import NoBrokersAvailableError from esque.cluster import Cluster @@ -38,6 +39,11 @@ def pytest_collection_modifyitems(config, items): item.add_marker(integration) +@pytest.fixture() +def cli_runner(): + yield CliRunner() + + @pytest.fixture() def test_config_path(mocker, tmpdir_factory): fn: Path = tmpdir_factory.mktemp("config").join("dummy.cfg") diff --git a/tests/integration/commands/__init__.py b/tests/integration/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/commands/test_apply.py b/tests/integration/commands/test_apply.py new file mode 100644 index 00000000..61e40839 --- /dev/null +++ b/tests/integration/commands/test_apply.py @@ -0,0 +1,119 @@ +from typing import Any, Dict + +import pytest +import yaml +from click.testing import CliRunner + +from esque.cli.commands import apply +from esque.errors import KafkaException +from esque.topic import Topic +from esque.topic_controller import TopicController + + +@pytest.mark.integration +def test_apply(cli_runner, topic_controller: TopicController, topic_id: str): + topic_name = f"apply_{topic_id}" + topic_1 = { + "name": topic_name + "_1", + "replication_factor": 1, + "num_partitions": 50, + "config": {"cleanup.policy": "compact"}, + } + topic_2 = { + "name": topic_name + "_2", + "replication_factor": 1, + "num_partitions": 5, + "config": {"cleanup.policy": "delete", "delete.retention.ms": 50000}, + } + apply_conf = {"topics": [topic_1]} + + # 1: topic creation + path = save_yaml(topic_id, apply_conf) + result = cli_runner.invoke(apply, ["-f", path], input="Y\n") + assert ( + result.exit_code == 0 and "Successfully applied changes" in result.output + ), f"Calling apply failed, error: {result.output}" + + # 2: change cleanup policy to delete + topic_1["config"]["cleanup.policy"] = "delete" + path = save_yaml(topic_id, apply_conf) + result = cli_runner.invoke(apply, ["-f", path], input="Y\n") + assert ( + result.exit_code == 0 and "Successfully applied changes" in result.output + ), f"Calling apply failed, error: {result.output}" + + # 3: add another topic and change the first one again + apply_conf["topics"].append(topic_2) + topic_1["config"]["cleanup.policy"] = "compact" + path = save_yaml(topic_id, apply_conf) + result = cli_runner.invoke(apply, ["-f", path], input="Y\n") + assert ( + result.exit_code == 0 and "Successfully applied changes" in result.output + ), f"Calling apply failed, error: {result.output}" + + # 4: no changes + result = cli_runner.invoke(apply, ["-f", path]) + assert ( + result.exit_code == 0 and "No changes detected, aborting" in result.output + ), f"Calling apply failed, error: {result.output}" + + # 5: change partitions - this attempt should be cancelled + topic_1["num_partitions"] = 3 + topic_1["config"]["cleanup.policy"] = "delete" + path = save_yaml(topic_id, apply_conf) + result = cli_runner.invoke(apply, ["-f", path], input="Y\n") + assert ( + result.exit_code == 0 and "to `replication_factor` and `num_partitions`" in result.output + ), f"Calling apply failed, error: {result.output}" + # reset config to the old settings again + topic_1["num_partitions"] = 50 + topic_1["config"]["cleanup.policy"] = "compact" + + # final: check results in the cluster to make sure they match + for topic_conf in apply_conf["topics"]: + topic_from_conf = Topic.from_dict(topic_conf) + assert not topic_controller.diff_with_cluster( + topic_from_conf + ).has_changes, f"Topic configs don't match, diff is {topic_controller.diff_with_cluster(topic_from_conf)}" + + +@pytest.mark.integration +def test_apply_duplicate_names(cli_runner: CliRunner, topic_id: str): + topic_name = f"apply_{topic_id}" + topic_1 = { + "name": topic_name, + "replication_factor": 1, + "num_partitions": 50, + "config": {"cleanup.policy": "compact"}, + } + apply_conf = {"topics": [topic_1, topic_1]} + + # having the same topic name twice in apply should raise an ValueError + path = save_yaml(topic_id, apply_conf) + result = cli_runner.invoke(apply, ["-f", path], input="Y\n") + assert result.exit_code != 0 and isinstance(result.exception, ValueError), f"Calling apply should have failed" + + +@pytest.mark.integration +def test_apply_invalid_replicas(cli_runner: CliRunner, topic_id: str): + topic_name = f"apply_{topic_id}" + topic_1 = { + "name": topic_name, + "replication_factor": 100, + "num_partitions": 50, + "config": {"cleanup.policy": "compact"}, + } + apply_conf = {"topics": [topic_1]} + + # having the same topic name twice in apply should raise an ValueError + path = save_yaml(topic_id, apply_conf) + result = cli_runner.invoke(apply, ["-f", path], input="Y\n") + assert result.exit_code != 0 and isinstance(result.exception, KafkaException), f"Calling apply should have failed" + + +def save_yaml(fname: str, data: Dict[str, Any]) -> str: + # this path name is in the gitignore so the temp files are not committed + path = f"tests/test_samples/{fname}_apply.yaml" + with open(path, "w") as outfile: + yaml.dump(data, outfile, default_flow_style=False) + return path diff --git a/tests/integration/test_topic_controller.py b/tests/integration/test_topic_controller.py index 45e24cbd..5625ae93 100644 --- a/tests/integration/test_topic_controller.py +++ b/tests/integration/test_topic_controller.py @@ -1,12 +1,8 @@ import json -from typing import Dict, Any import confluent_kafka import pytest -import yaml -from click.testing import CliRunner -from esque.cli.commands import apply from esque.errors import KafkaException from esque.topic import Topic, TopicDiff from esque.topic_controller import TopicController @@ -59,19 +55,6 @@ def test_alter_topic_config_works(topic_controller: TopicController, topic_id: s assert final_config.get("cleanup.policy") == "compact" -@pytest.mark.integration -def test_topic_deletion_works( - topic_controller: TopicController, confluent_admin_client: confluent_kafka.admin.AdminClient, topic: str -): - topics = confluent_admin_client.list_topics(timeout=5).topics.keys() - assert topic in topics - topic_controller.delete_topic(Topic(topic)) - # Invalidate cache - confluent_admin_client.poll(timeout=1) - topics = confluent_admin_client.list_topics(timeout=5).topics.keys() - assert topic not in topics - - @pytest.mark.integration def test_topic_listing_works(topic_controller: TopicController, topic: str): topics = topic_controller.list_topics() @@ -133,115 +116,3 @@ def test_topic_diff(topic_controller: TopicController, topic_id: str): topic = Topic.from_dict(conf) diff = TopicDiff().set_diff("replication_factor", 1, 3) assert get_diff(topic) == diff, "Should have a diff on replication_factor" - - -@pytest.mark.integration -def test_apply(topic_controller: TopicController, topic_id: str): - runner = CliRunner() - topic_name = f"apply_{topic_id}" - topic_1 = { - "name": topic_name + "_1", - "replication_factor": 1, - "num_partitions": 50, - "config": {"cleanup.policy": "compact"}, - } - topic_2 = { - "name": topic_name + "_2", - "replication_factor": 1, - "num_partitions": 5, - "config": {"cleanup.policy": "delete", "delete.retention.ms": 50000}, - } - apply_conf = {"topics": [topic_1]} - - # 1: topic creation - path = save_yaml(topic_id, apply_conf) - result = runner.invoke(apply, ["-f", path], input="Y\n") - assert ( - result.exit_code == 0 and "Successfully applied changes" in result.output - ), f"Calling apply failed, error: {result.output}" - - # 2: change cleanup policy to delete - topic_1["config"]["cleanup.policy"] = "delete" - path = save_yaml(topic_id, apply_conf) - result = runner.invoke(apply, ["-f", path], input="Y\n") - assert ( - result.exit_code == 0 and "Successfully applied changes" in result.output - ), f"Calling apply failed, error: {result.output}" - - # 3: add another topic and change the first one again - apply_conf["topics"].append(topic_2) - topic_1["config"]["cleanup.policy"] = "compact" - path = save_yaml(topic_id, apply_conf) - result = runner.invoke(apply, ["-f", path], input="Y\n") - assert ( - result.exit_code == 0 and "Successfully applied changes" in result.output - ), f"Calling apply failed, error: {result.output}" - - # 4: no changes - result = runner.invoke(apply, ["-f", path]) - assert ( - result.exit_code == 0 and "No changes detected, aborting" in result.output - ), f"Calling apply failed, error: {result.output}" - - # 5: change partitions - this attempt should be cancelled - topic_1["num_partitions"] = 3 - topic_1["config"]["cleanup.policy"] = "delete" - path = save_yaml(topic_id, apply_conf) - result = runner.invoke(apply, ["-f", path], input="Y\n") - assert ( - result.exit_code == 0 and "to `replication_factor` and `num_partitions`" in result.output - ), f"Calling apply failed, error: {result.output}" - # reset config to the old settings again - topic_1["num_partitions"] = 50 - topic_1["config"]["cleanup.policy"] = "compact" - - # final: check results in the cluster to make sure they match - for topic_conf in apply_conf["topics"]: - topic_from_conf = Topic.from_dict(topic_conf) - assert not topic_controller.diff_with_cluster( - topic_from_conf - ).has_changes, f"Topic configs don't match, diff is {topic_controller.diff_with_cluster(topic_from_conf)}" - - -@pytest.mark.integration -def test_apply_duplicate_names(topic_controller: TopicController, topic_id: str): - runner = CliRunner() - topic_name = f"apply_{topic_id}" - topic_1 = { - "name": topic_name, - "replication_factor": 1, - "num_partitions": 50, - "config": {"cleanup.policy": "compact"}, - } - apply_conf = {"topics": [topic_1, topic_1]} - - # having the same topic name twice in apply should raise an ValueError - path = save_yaml(topic_id, apply_conf) - result = runner.invoke(apply, ["-f", path], input="Y\n") - assert result.exit_code != 0 and isinstance(result.exception, ValueError), f"Calling apply should have failed" - - -@pytest.mark.integration -def test_apply_invalid_replicas(topic_controller: TopicController, topic_id: str): - runner = CliRunner() - topic_name = f"apply_{topic_id}" - topic_1 = { - "name": topic_name, - "replication_factor": 100, - "num_partitions": 50, - "config": {"cleanup.policy": "compact"}, - } - apply_conf = {"topics": [topic_1]} - - # having the same topic name twice in apply should raise an ValueError - path = save_yaml(topic_id, apply_conf) - result = runner.invoke(apply, ["-f", path], input="Y\n") - assert result.exit_code != 0 and isinstance(result.exception, KafkaException), f"Calling apply should have failed" - - -def save_yaml(fname: str, data: Dict[str, Any]) -> str: - # this path name is in the gitignore so the temp files are not committed - path = f"tests/test_samples/{fname}_apply.yaml" - with open(path, "w") as outfile: - yaml.dump(data, outfile, default_flow_style=False) - return path From ea455b22c00f36e0bbef12d136d9735599a4612b Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Wed, 18 Sep 2019 13:19:40 +0200 Subject: [PATCH 02/10] Added integration tests for topic deletion --- tests/integration/commands/test_deletion.py | 52 +++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 tests/integration/commands/test_deletion.py diff --git a/tests/integration/commands/test_deletion.py b/tests/integration/commands/test_deletion.py new file mode 100644 index 00000000..3998491a --- /dev/null +++ b/tests/integration/commands/test_deletion.py @@ -0,0 +1,52 @@ +import confluent_kafka +import pytest +from click.testing import CliRunner + +from esque.cli.commands import delete_topic + + +@pytest.fixture() +def basic_topic(num_partitions, topic_factory): + yield from topic_factory(num_partitions, "basic-topic") + + +@pytest.fixture() +def duplicate_topic(num_partitions, topic_factory): + yield from topic_factory(num_partitions, "basic.topic") + + +@pytest.mark.integration +def test_topic_deletion_works( + cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic: str +): + topics = confluent_admin_client.list_topics(timeout=5).topics.keys() + assert topic in topics + + result = cli_runner.invoke(delete_topic, [topic], input="y\n") + assert result.exit_code == 0 + + # Invalidate cache + confluent_admin_client.poll(timeout=1) + topics = confluent_admin_client.list_topics(timeout=5).topics.keys() + assert topic not in topics + + +@pytest.mark.integration +def test_keep_kafka_duplicated( + cli_runner: CliRunner, + confluent_admin_client: confluent_kafka.admin.AdminClient, + basic_topic: str, + duplicate_topic: str, +): + topics = confluent_admin_client.list_topics(timeout=5).topics.keys() + assert basic_topic[0] in topics + assert duplicate_topic[0] in topics + + result = cli_runner.invoke(delete_topic, [duplicate_topic[0]], input="y\n") + assert result.exit_code == 0 + + # Invalidate cache + confluent_admin_client.poll(timeout=1) + topics = confluent_admin_client.list_topics(timeout=5).topics.keys() + assert duplicate_topic[0] not in topics + assert basic_topic[0] in topics From 145af23a87e038b2e216656dc2b4064831855855 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Wed, 18 Sep 2019 13:19:50 +0200 Subject: [PATCH 03/10] Added integration tests for topic creation --- tests/integration/commands/test_creation.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 tests/integration/commands/test_creation.py diff --git a/tests/integration/commands/test_creation.py b/tests/integration/commands/test_creation.py new file mode 100644 index 00000000..67157c2b --- /dev/null +++ b/tests/integration/commands/test_creation.py @@ -0,0 +1,12 @@ +import confluent_kafka +from click.testing import CliRunner + +from esque.cli.commands import create_topic + + +def test_create(cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str): + + cli_runner.invoke(create_topic, [topic_id]) + + topics = confluent_admin_client.list_topics(timeout=5).topics.keys() + assert topic_id not in topics From 6101675c42f85be431210c64fb8726a27861924a Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Wed, 18 Sep 2019 13:51:33 +0200 Subject: [PATCH 04/10] Fixed ping command --- esque/cli/commands.py | 4 +-- esque/clients.py | 33 ++++++++++++++++++++++--- tests/integration/commands/test_ping.py | 25 +++++++++++++++++++ 3 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 tests/integration/commands/test_ping.py diff --git a/esque/cli/commands.py b/esque/cli/commands.py index 195b20f4..58c96390 100644 --- a/esque/cli/commands.py +++ b/esque/cli/commands.py @@ -363,7 +363,7 @@ def ping(state, times, wait): deltas = [] try: try: - topic_controller.create_topics([topic_controller.get_cluster_topic(PING_TOPIC)]) + topic_controller.create_topics([Topic(PING_TOPIC)]) except TopicAlreadyExistsException: click.echo("Topic already exists.") @@ -374,7 +374,7 @@ def ping(state, times, wait): for i in range(times): producer.produce(PING_TOPIC) - _, delta = consumer.consume() + _, delta = consumer.consume(1) deltas.append(delta) click.echo(f"m_seq={i} time={delta:.2f}ms") sleep(wait) diff --git a/esque/clients.py b/esque/clients.py index 3d2285f9..beccda48 100644 --- a/esque/clients.py +++ b/esque/clients.py @@ -1,5 +1,6 @@ import json import pathlib +from abc import ABC, abstractmethod from contextlib import ExitStack from glob import glob from typing import Optional, Tuple @@ -9,14 +10,14 @@ import pendulum from confluent_kafka import Message from confluent_kafka.avro import AvroProducer +from confluent_kafka.cimpl import TopicPartition from esque.avromessage import AvroFileReader, AvroFileWriter from esque.config import Config -from esque.errors import raise_for_kafka_error, raise_for_message, MessageEmptyException +from esque.errors import MessageEmptyException, raise_for_kafka_error, raise_for_message from esque.helpers import delivery_callback, delta_t -from esque.message import KafkaMessage, PlainTextFileReader, PlainTextFileWriter, FileReader, FileWriter +from esque.message import FileReader, FileWriter, KafkaMessage, PlainTextFileReader, PlainTextFileWriter from esque.schemaregistry import SchemaRegistryClient -from abc import ABC, abstractmethod class AbstractConsumer(ABC): @@ -56,8 +57,32 @@ def _consume_single_message(self, timeout=30) -> Optional[Message]: class PingConsumer(AbstractConsumer): + def __init__(self, group_id: str, topic_name: str, last: bool): + offset_reset = "earliest" + if last: + offset_reset = "latest" + self._config = Config().create_confluent_config() + self._config.update( + { + "group.id": group_id, + "error_cb": raise_for_kafka_error, + # We need to commit offsets manually once we"re sure it got saved + # to the sink + "enable.auto.commit": True, + "enable.partition.eof": False, + # We need this to start at the last committed offset instead of the + # latest when subscribing for the first time + "default.topic.config": {"auto.offset.reset": offset_reset}, + } + ) + self._consumer = confluent_kafka.Consumer(self._config) + self._assign_exact_partitions(topic_name) + + def _assign_exact_partitions(self, topic: str) -> None: + self._consumer.assign([TopicPartition(topic=topic, partition=0, offset=0)]) + def consume(self, amount: int) -> Optional[Tuple[str, int]]: - message = self._consume_single_message() + message = self._consume_single_message(timeout=1) msg_sent_at = pendulum.from_timestamp(float(message.value())) delta_sent = pendulum.now() - msg_sent_at diff --git a/tests/integration/commands/test_ping.py b/tests/integration/commands/test_ping.py new file mode 100644 index 00000000..e0377c35 --- /dev/null +++ b/tests/integration/commands/test_ping.py @@ -0,0 +1,25 @@ +import confluent_kafka +from click.testing import CliRunner + +from esque import config +from esque.cli.commands import ping +from esque.topic_controller import TopicController + + +def test_smoke_test_ping(cli_runner: CliRunner): + result = cli_runner.invoke(ping) + + assert result.exit_code == 0 + + +def test_correct_amount_of_messages( + mocker, cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str +): + config.RANDOM = "test" + + topic_controller_delete_topic = mocker.patch.object(TopicController, "delete_topic", mocker.Mock()) + + result = cli_runner.invoke(ping) + + assert result.exit_code == 0 + assert topic_controller_delete_topic.call_count == 1 From ad179f612d605e046588d2c46b37dbb0ab589255 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Wed, 18 Sep 2019 13:57:49 +0200 Subject: [PATCH 05/10] Add some smoke tests --- tests/integration/commands/test_describe.py | 9 +++++++++ tests/integration/commands/test_get.py | 9 +++++++++ 2 files changed, 18 insertions(+) create mode 100644 tests/integration/commands/test_describe.py create mode 100644 tests/integration/commands/test_get.py diff --git a/tests/integration/commands/test_describe.py b/tests/integration/commands/test_describe.py new file mode 100644 index 00000000..826f56c3 --- /dev/null +++ b/tests/integration/commands/test_describe.py @@ -0,0 +1,9 @@ +from click.testing import CliRunner + +from esque.cli.commands import describe_topic + + +def test_smoke_test_describe_topic(cli_runner: CliRunner, topic: str): + result = cli_runner.invoke(describe_topic, [topic]) + + assert result.exit_code == 0 diff --git a/tests/integration/commands/test_get.py b/tests/integration/commands/test_get.py new file mode 100644 index 00000000..505be3c3 --- /dev/null +++ b/tests/integration/commands/test_get.py @@ -0,0 +1,9 @@ +from click.testing import CliRunner + +from esque.cli.commands import get_topics + + +def test_smoke_test_get_topics(cli_runner: CliRunner): + result = cli_runner.invoke(get_topics) + + assert result.exit_code == 0 From 29826733024648c9fb541cce9723de321655f28d Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Thu, 19 Sep 2019 16:01:17 +0200 Subject: [PATCH 06/10] Removed necessity to define amount --- esque/clients/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/esque/clients/consumer.py b/esque/clients/consumer.py index dc0a54a5..890586ad 100644 --- a/esque/clients/consumer.py +++ b/esque/clients/consumer.py @@ -41,7 +41,7 @@ def _subscribe(self, topic: str) -> None: self._consumer.subscribe([topic]) @abstractmethod - def consume(self, amount: int) -> int: + def consume(self, **kwargs) -> int: pass def _consume_single_message(self, timeout=30) -> Optional[Message]: @@ -55,7 +55,7 @@ def __init__(self, group_id: str, topic_name: str, last: bool): super().__init__(group_id, topic_name, last) self._assign_exact_partitions(topic_name) - def consume(self, amount: int) -> Optional[Tuple[str, int]]: + def consume(self) -> Optional[Tuple[str, int]]: message = self._consume_single_message(timeout=1) msg_sent_at = pendulum.from_timestamp(float(message.value())) delta_sent = pendulum.now() - msg_sent_at From a28f41b4e87c9c2f24e725f2650996b244a3af06 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Thu, 19 Sep 2019 16:07:46 +0200 Subject: [PATCH 07/10] Fix some of Garrett's comments --- esque/cli/commands.py | 2 +- esque/clients/consumer.py | 2 +- tests/integration/commands/test_apply.py | 4 +-- tests/integration/commands/test_creation.py | 33 +++++++++++++++++++++ tests/integration/commands/test_deletion.py | 2 +- tests/integration/commands/test_describe.py | 2 ++ tests/integration/commands/test_get.py | 2 ++ tests/integration/commands/test_ping.py | 5 +++- 8 files changed, 46 insertions(+), 6 deletions(-) diff --git a/esque/cli/commands.py b/esque/cli/commands.py index 4fdea314..817f9a8b 100644 --- a/esque/cli/commands.py +++ b/esque/cli/commands.py @@ -383,7 +383,7 @@ def ping(state, times, wait): for i in range(times): producer.produce(PING_TOPIC) - _, delta = consumer.consume(1) + _, delta = consumer.consume() deltas.append(delta) click.echo(f"m_seq={i} time={delta:.2f}ms") sleep(wait) diff --git a/esque/clients/consumer.py b/esque/clients/consumer.py index 890586ad..09895f9a 100644 --- a/esque/clients/consumer.py +++ b/esque/clients/consumer.py @@ -44,7 +44,7 @@ def _subscribe(self, topic: str) -> None: def consume(self, **kwargs) -> int: pass - def _consume_single_message(self, timeout=30) -> Optional[Message]: + def _consume_single_message(self, timeout=30) -> Message: message = self._consumer.poll(timeout=timeout) raise_for_message(message) return message diff --git a/tests/integration/commands/test_apply.py b/tests/integration/commands/test_apply.py index 61e40839..fef66648 100644 --- a/tests/integration/commands/test_apply.py +++ b/tests/integration/commands/test_apply.py @@ -3,11 +3,11 @@ import pytest import yaml from click.testing import CliRunner +from esque.topic import Topic +from esque.topic_controller import TopicController from esque.cli.commands import apply from esque.errors import KafkaException -from esque.topic import Topic -from esque.topic_controller import TopicController @pytest.mark.integration diff --git a/tests/integration/commands/test_creation.py b/tests/integration/commands/test_creation.py index 67157c2b..9855c136 100644 --- a/tests/integration/commands/test_creation.py +++ b/tests/integration/commands/test_creation.py @@ -1,12 +1,45 @@ import confluent_kafka +import pytest from click.testing import CliRunner from esque.cli.commands import create_topic +from esque.cli.options import State +from esque.resources.topic import Topic +@pytest.mark.integration def test_create(cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str): cli_runner.invoke(create_topic, [topic_id]) topics = confluent_admin_client.list_topics(timeout=5).topics.keys() assert topic_id not in topics + + +@pytest.mark.integration +def test_topic_creation_with_template_works( + state: State, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str +): + topic_1 = topic_id + "_1" + topic_2 = topic_id + "_2" + topics = confluent_admin_client.list_topics(timeout=5).topics.keys() + assert topic_1 not in topics + replication_factor = 1 + num_partitions = 1 + config = { + "cleanup.policy": "delete", + "delete.retention.ms": "123456", + "file.delete.delay.ms": "789101112", + "flush.messages": "12345678910111213", + "flush.ms": "123456789", + } + state.cluster.topic_controller.create_topics( + [Topic(topic_1, replication_factor=replication_factor, num_partitions=num_partitions, config=config)] + ) + runner = CliRunner() + runner.invoke(create_topic, ["--no-verify", "-l", topic_1, topic_2]) + config_from_template = state.cluster.topic_controller.get_cluster_topic(topic_2) + assert config_from_template.replication_factor == replication_factor + assert config_from_template.num_partitions == num_partitions + for config_key, value in config.items(): + assert config_from_template.config[config_key] == value diff --git a/tests/integration/commands/test_deletion.py b/tests/integration/commands/test_deletion.py index 3998491a..36476c95 100644 --- a/tests/integration/commands/test_deletion.py +++ b/tests/integration/commands/test_deletion.py @@ -32,7 +32,7 @@ def test_topic_deletion_works( @pytest.mark.integration -def test_keep_kafka_duplicated( +def test_keep_minus_delete_period( cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, basic_topic: str, diff --git a/tests/integration/commands/test_describe.py b/tests/integration/commands/test_describe.py index 826f56c3..c75f48c9 100644 --- a/tests/integration/commands/test_describe.py +++ b/tests/integration/commands/test_describe.py @@ -1,8 +1,10 @@ +import pytest from click.testing import CliRunner from esque.cli.commands import describe_topic +@pytest.mark.integration def test_smoke_test_describe_topic(cli_runner: CliRunner, topic: str): result = cli_runner.invoke(describe_topic, [topic]) diff --git a/tests/integration/commands/test_get.py b/tests/integration/commands/test_get.py index 505be3c3..2ac5afd6 100644 --- a/tests/integration/commands/test_get.py +++ b/tests/integration/commands/test_get.py @@ -1,8 +1,10 @@ +import pytest from click.testing import CliRunner from esque.cli.commands import get_topics +@pytest.mark.integration def test_smoke_test_get_topics(cli_runner: CliRunner): result = cli_runner.invoke(get_topics) diff --git a/tests/integration/commands/test_ping.py b/tests/integration/commands/test_ping.py index e0377c35..0596d577 100644 --- a/tests/integration/commands/test_ping.py +++ b/tests/integration/commands/test_ping.py @@ -1,17 +1,20 @@ import confluent_kafka +import pytest from click.testing import CliRunner from esque import config from esque.cli.commands import ping -from esque.topic_controller import TopicController +from esque.controller.topic_controller import TopicController +@pytest.mark.integration def test_smoke_test_ping(cli_runner: CliRunner): result = cli_runner.invoke(ping) assert result.exit_code == 0 +@pytest.mark.integration def test_correct_amount_of_messages( mocker, cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str ): From 17fa3bba1725b5bc1228edf9d710955e168ffea4 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Thu, 19 Sep 2019 16:16:31 +0200 Subject: [PATCH 08/10] Fixed ping test --- tests/integration/commands/test_ping.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/integration/commands/test_ping.py b/tests/integration/commands/test_ping.py index 0596d577..a730554d 100644 --- a/tests/integration/commands/test_ping.py +++ b/tests/integration/commands/test_ping.py @@ -1,4 +1,3 @@ -import confluent_kafka import pytest from click.testing import CliRunner @@ -15,14 +14,13 @@ def test_smoke_test_ping(cli_runner: CliRunner): @pytest.mark.integration -def test_correct_amount_of_messages( - mocker, cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str -): - config.RANDOM = "test" - +def test_correct_amount_of_messages(mocker, cli_runner: CliRunner, topic_controller: TopicController): topic_controller_delete_topic = mocker.patch.object(TopicController, "delete_topic", mocker.Mock()) result = cli_runner.invoke(ping) assert result.exit_code == 0 assert topic_controller_delete_topic.call_count == 1 + + ping_topic = topic_controller.get_cluster_topic(config.PING_TOPIC) + assert ping_topic.offsets[0].high == 10 From 6ac8a71c4673bb1888799b4085a1f65361ce7610 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Thu, 19 Sep 2019 16:31:44 +0200 Subject: [PATCH 09/10] Fix --- tests/integration/commands/test_apply.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/commands/test_apply.py b/tests/integration/commands/test_apply.py index fef66648..48fa8865 100644 --- a/tests/integration/commands/test_apply.py +++ b/tests/integration/commands/test_apply.py @@ -3,8 +3,8 @@ import pytest import yaml from click.testing import CliRunner -from esque.topic import Topic -from esque.topic_controller import TopicController +from esque.resources.topic import Topic +from esque.controller.topic_controller import TopicController from esque.cli.commands import apply from esque.errors import KafkaException From 15e7d648dd1b55926e22325eb833b49f9fac0976 Mon Sep 17 00:00:00 2001 From: Jannik Hoffjann Date: Thu, 19 Sep 2019 16:46:03 +0200 Subject: [PATCH 10/10] Adds state to integration tests --- tests/conftest.py | 6 ++++++ tests/integration/test_topic_controller.py | 6 ------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index a8e00235..94718954 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,6 +12,7 @@ from confluent_kafka.cimpl import Producer, TopicPartition from pykafka.exceptions import NoBrokersAvailableError +from esque.cli.options import State from esque.cluster import Cluster from esque.config import sample_config_path, Config from esque.errors import raise_for_kafka_error @@ -211,3 +212,8 @@ def cluster(test_config): raise ex yield cluster + + +@pytest.fixture() +def state(test_config): + yield State() diff --git a/tests/integration/test_topic_controller.py b/tests/integration/test_topic_controller.py index 7f63b1f0..4c02b909 100644 --- a/tests/integration/test_topic_controller.py +++ b/tests/integration/test_topic_controller.py @@ -3,17 +3,11 @@ import confluent_kafka import pytest -from esque.cli.options import State from esque.controller.topic_controller import TopicController from esque.errors import KafkaException from esque.resources.topic import Topic, TopicDiff -@pytest.fixture() -def state(test_config): - yield State() - - @pytest.fixture() def topic_controller(cluster): yield cluster.topic_controller