diff --git a/esque/cli/commands.py b/esque/cli/commands.py
index d58f6a5b..817f9a8b 100644
--- a/esque/cli/commands.py
+++ b/esque/cli/commands.py
@@ -372,7 +372,7 @@ def ping(state, times, wait):
     deltas = []
     try:
         try:
-            topic_controller.create_topics([topic_controller.get_cluster_topic(PING_TOPIC)])
+            topic_controller.create_topics([Topic(PING_TOPIC)])
         except TopicAlreadyExistsException:
             click.echo("Topic already exists.")
diff --git a/esque/clients/consumer.py b/esque/clients/consumer.py
index 54cafe67..09895f9a 100644
--- a/esque/clients/consumer.py
+++ b/esque/clients/consumer.py
@@ -5,7 +5,7 @@
 
 import confluent_kafka
 import pendulum
-from confluent_kafka.cimpl import Message
+from confluent_kafka.cimpl import Message, TopicPartition
 
 from esque.clients.schemaregistry import SchemaRegistryClient
 from esque.config import Config
@@ -35,29 +35,35 @@ def __init__(self, group_id: str, topic_name: str, last: bool):
             }
         )
         self._consumer = confluent_kafka.Consumer(self._config)
-        self._subscribe(topic_name)
+        self._topic_name = topic_name
 
     def _subscribe(self, topic: str) -> None:
         self._consumer.subscribe([topic])
 
     @abstractmethod
-    def consume(self, amount: int) -> int:
+    def consume(self, **kwargs) -> int:
         pass
 
-    def _consume_single_message(self, timeout=30) -> Optional[Message]:
+    def _consume_single_message(self, timeout=30) -> Message:
         message = self._consumer.poll(timeout=timeout)
         raise_for_message(message)
         return message
 
 
 class PingConsumer(AbstractConsumer):
-    def consume(self, amount: int) -> Optional[Tuple[str, int]]:
-        message = self._consume_single_message()
+    def __init__(self, group_id: str, topic_name: str, last: bool):
+        super().__init__(group_id, topic_name, last)
+        self._assign_exact_partitions(topic_name)
 
+    def consume(self) -> Optional[Tuple[str, int]]:
+        message = self._consume_single_message(timeout=1)
         msg_sent_at = pendulum.from_timestamp(float(message.value()))
         delta_sent = pendulum.now() - msg_sent_at
         return message.key(), delta_sent.microseconds / 1000
 
+    def _assign_exact_partitions(self, topic: str) -> None:
+        self._consumer.assign([TopicPartition(topic=topic, partition=0, offset=0)])
+
 
 class FileConsumer(AbstractConsumer):
     def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool):
@@ -101,6 +107,7 @@ class AvroFileConsumer(FileConsumer):
     def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool):
         super().__init__(group_id, topic_name, working_dir, last)
         self.schema_registry_client = SchemaRegistryClient(Config().schema_registry)
+        self._subscribe(topic_name)
 
     def get_file_writer(self, partition: int) -> FileWriter:
         return AvroFileWriter((self.working_dir / f"partition_{partition}"), self.schema_registry_client)
diff --git a/tests/conftest.py b/tests/conftest.py
index bf680232..94718954 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -6,11 +6,13 @@
 
 import confluent_kafka
 import pytest
+from click.testing import CliRunner
 from confluent_kafka.admin import AdminClient, NewTopic
 from confluent_kafka.avro import AvroProducer
 from confluent_kafka.cimpl import Producer, TopicPartition
 from pykafka.exceptions import NoBrokersAvailableError
 
+from esque.cli.options import State
 from esque.cluster import Cluster
 from esque.config import sample_config_path, Config
 from esque.errors import raise_for_kafka_error
@@ -38,6 +40,11 @@ def pytest_collection_modifyitems(config, items):
             item.add_marker(integration)
 
 
+@pytest.fixture()
+def cli_runner():
+    yield CliRunner()
+
+
 @pytest.fixture()
 def test_config_path(mocker, tmpdir_factory):
     fn: Path = tmpdir_factory.mktemp("config").join("dummy.cfg")
@@ -205,3 +212,8 @@ def cluster(test_config):
         raise ex
 
     yield cluster
+
+
+@pytest.fixture()
+def state(test_config):
+    yield State()
diff --git a/tests/integration/commands/__init__.py b/tests/integration/commands/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/integration/commands/test_apply.py b/tests/integration/commands/test_apply.py
new file mode 100644
index 00000000..48fa8865
--- /dev/null
+++ b/tests/integration/commands/test_apply.py
@@ -0,0 +1,119 @@
+from typing import Any, Dict
+
+import pytest
+import yaml
+from click.testing import CliRunner
+from esque.resources.topic import Topic
+from esque.controller.topic_controller import TopicController
+
+from esque.cli.commands import apply
+from esque.errors import KafkaException
+
+
+@pytest.mark.integration
+def test_apply(cli_runner, topic_controller: TopicController, topic_id: str):
+    topic_name = f"apply_{topic_id}"
+    topic_1 = {
+        "name": topic_name + "_1",
+        "replication_factor": 1,
+        "num_partitions": 50,
+        "config": {"cleanup.policy": "compact"},
+    }
+    topic_2 = {
+        "name": topic_name + "_2",
+        "replication_factor": 1,
+        "num_partitions": 5,
+        "config": {"cleanup.policy": "delete", "delete.retention.ms": 50000},
+    }
+    apply_conf = {"topics": [topic_1]}
+
+    # 1: topic creation
+    path = save_yaml(topic_id, apply_conf)
+    result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
+    assert (
+        result.exit_code == 0 and "Successfully applied changes" in result.output
+    ), f"Calling apply failed, error: {result.output}"
+
+    # 2: change cleanup policy to delete
+    topic_1["config"]["cleanup.policy"] = "delete"
+    path = save_yaml(topic_id, apply_conf)
+    result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
+    assert (
+        result.exit_code == 0 and "Successfully applied changes" in result.output
+    ), f"Calling apply failed, error: {result.output}"
+
+    # 3: add another topic and change the first one again
+    apply_conf["topics"].append(topic_2)
+    topic_1["config"]["cleanup.policy"] = "compact"
+    path = save_yaml(topic_id, apply_conf)
+    result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
+    assert (
+        result.exit_code == 0 and "Successfully applied changes" in result.output
+    ), f"Calling apply failed, error: {result.output}"
+
+    # 4: no changes
+    result = cli_runner.invoke(apply, ["-f", path])
+    assert (
+        result.exit_code == 0 and "No changes detected, aborting" in result.output
+    ), f"Calling apply failed, error: {result.output}"
+
+    # 5: change partitions - this attempt should be cancelled
+    topic_1["num_partitions"] = 3
+    topic_1["config"]["cleanup.policy"] = "delete"
+    path = save_yaml(topic_id, apply_conf)
+    result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
+    assert (
+        result.exit_code == 0 and "to `replication_factor` and `num_partitions`" in result.output
+    ), f"Calling apply failed, error: {result.output}"
+    # reset config to the old settings again
+    topic_1["num_partitions"] = 50
+    topic_1["config"]["cleanup.policy"] = "compact"
+
+    # final: check results in the cluster to make sure they match
+    for topic_conf in apply_conf["topics"]:
+        topic_from_conf = Topic.from_dict(topic_conf)
+        assert not topic_controller.diff_with_cluster(
+            topic_from_conf
+        ).has_changes, f"Topic configs don't match, diff is {topic_controller.diff_with_cluster(topic_from_conf)}"
+
+
+@pytest.mark.integration
+def test_apply_duplicate_names(cli_runner: CliRunner, topic_id: str):
+    topic_name = f"apply_{topic_id}"
+    topic_1 = {
+        "name": topic_name,
+        "replication_factor": 1,
+        "num_partitions": 50,
+        "config": {"cleanup.policy": "compact"},
+    }
+    apply_conf = {"topics": [topic_1, topic_1]}
+
+    # having the same topic name twice in apply should raise a ValueError
+    path = save_yaml(topic_id, apply_conf)
+    result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
+    assert result.exit_code != 0 and isinstance(result.exception, ValueError), f"Calling apply should have failed"
+
+
+@pytest.mark.integration
+def test_apply_invalid_replicas(cli_runner: CliRunner, topic_id: str):
+    topic_name = f"apply_{topic_id}"
+    topic_1 = {
+        "name": topic_name,
+        "replication_factor": 100,
+        "num_partitions": 50,
+        "config": {"cleanup.policy": "compact"},
+    }
+    apply_conf = {"topics": [topic_1]}
+
+    # a replication factor larger than the number of brokers should raise a KafkaException
+    path = save_yaml(topic_id, apply_conf)
+    result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
+    assert result.exit_code != 0 and isinstance(result.exception, KafkaException), f"Calling apply should have failed"
+
+
+def save_yaml(fname: str, data: Dict[str, Any]) -> str:
+    # this path name is in the gitignore so the temp files are not committed
+    path = f"tests/test_samples/{fname}_apply.yaml"
+    with open(path, "w") as outfile:
+        yaml.dump(data, outfile, default_flow_style=False)
+    return path
diff --git a/tests/integration/commands/test_creation.py b/tests/integration/commands/test_creation.py
new file mode 100644
index 00000000..9855c136
--- /dev/null
+++ b/tests/integration/commands/test_creation.py
@@ -0,0 +1,45 @@
+import confluent_kafka
+import pytest
+from click.testing import CliRunner
+
+from esque.cli.commands import create_topic
+from esque.cli.options import State
+from esque.resources.topic import Topic
+
+
+@pytest.mark.integration
+def test_create(cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str):
+
+    cli_runner.invoke(create_topic, [topic_id])
+
+    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
+    assert topic_id not in topics
+
+
+@pytest.mark.integration
+def test_topic_creation_with_template_works(
+    state: State, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str
+):
+    topic_1 = topic_id + "_1"
+    topic_2 = topic_id + "_2"
+    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
+    assert topic_1 not in topics
+    replication_factor = 1
+    num_partitions = 1
+    config = {
+        "cleanup.policy": "delete",
+        "delete.retention.ms": "123456",
+        "file.delete.delay.ms": "789101112",
+        "flush.messages": "12345678910111213",
+        "flush.ms": "123456789",
+    }
+    state.cluster.topic_controller.create_topics(
+        [Topic(topic_1, replication_factor=replication_factor, num_partitions=num_partitions, config=config)]
+    )
+    runner = CliRunner()
+    runner.invoke(create_topic, ["--no-verify", "-l", topic_1, topic_2])
+    config_from_template = state.cluster.topic_controller.get_cluster_topic(topic_2)
+    assert config_from_template.replication_factor == replication_factor
+    assert config_from_template.num_partitions == num_partitions
+    for config_key, value in config.items():
+        assert config_from_template.config[config_key] == value
diff --git a/tests/integration/commands/test_deletion.py b/tests/integration/commands/test_deletion.py
new file mode 100644
index 00000000..36476c95
--- /dev/null
+++ b/tests/integration/commands/test_deletion.py
@@ -0,0 +1,52 @@
+import confluent_kafka
+import pytest
+from click.testing import CliRunner
+
+from esque.cli.commands import delete_topic
+
+
+@pytest.fixture()
+def basic_topic(num_partitions, topic_factory):
+    yield from topic_factory(num_partitions, "basic-topic")
+
+
+@pytest.fixture()
+def duplicate_topic(num_partitions, topic_factory):
+    yield from topic_factory(num_partitions, "basic.topic")
+
+
+@pytest.mark.integration
+def test_topic_deletion_works(
+    cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic: str
+):
+    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
+    assert topic in topics
+
+    result = cli_runner.invoke(delete_topic, [topic], input="y\n")
+    assert result.exit_code == 0
+
+    # Invalidate cache
+    confluent_admin_client.poll(timeout=1)
+    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
+    assert topic not in topics
+
+
+@pytest.mark.integration
+def test_keep_minus_delete_period(
+    cli_runner: CliRunner,
+    confluent_admin_client: confluent_kafka.admin.AdminClient,
+    basic_topic: str,
+    duplicate_topic: str,
+):
+    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
+    assert basic_topic[0] in topics
+    assert duplicate_topic[0] in topics
+
+    result = cli_runner.invoke(delete_topic, [duplicate_topic[0]], input="y\n")
+    assert result.exit_code == 0
+
+    # Invalidate cache
+    confluent_admin_client.poll(timeout=1)
+    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
+    assert duplicate_topic[0] not in topics
+    assert basic_topic[0] in topics
diff --git a/tests/integration/commands/test_describe.py b/tests/integration/commands/test_describe.py
new file mode 100644
index 00000000..c75f48c9
--- /dev/null
+++ b/tests/integration/commands/test_describe.py
@@ -0,0 +1,11 @@
+import pytest
+from click.testing import CliRunner
+
+from esque.cli.commands import describe_topic
+
+
+@pytest.mark.integration
+def test_smoke_test_describe_topic(cli_runner: CliRunner, topic: str):
+    result = cli_runner.invoke(describe_topic, [topic])
+
+    assert result.exit_code == 0
diff --git a/tests/integration/commands/test_get.py b/tests/integration/commands/test_get.py
new file mode 100644
index 00000000..2ac5afd6
--- /dev/null
+++ b/tests/integration/commands/test_get.py
@@ -0,0 +1,11 @@
+import pytest
+from click.testing import CliRunner
+
+from esque.cli.commands import get_topics
+
+
+@pytest.mark.integration
+def test_smoke_test_get_topics(cli_runner: CliRunner):
+    result = cli_runner.invoke(get_topics)
+
+    assert result.exit_code == 0
diff --git a/tests/integration/commands/test_ping.py b/tests/integration/commands/test_ping.py
new file mode 100644
index 00000000..a730554d
--- /dev/null
+++ b/tests/integration/commands/test_ping.py
@@ -0,0 +1,26 @@
+import pytest
+from click.testing import CliRunner
+
+from esque import config
+from esque.cli.commands import ping
+from esque.controller.topic_controller import TopicController
+
+
+@pytest.mark.integration
+def test_smoke_test_ping(cli_runner: CliRunner):
+    result = cli_runner.invoke(ping)
+
+    assert result.exit_code == 0
+
+
+@pytest.mark.integration
+def test_correct_amount_of_messages(mocker, cli_runner: CliRunner, topic_controller: TopicController):
+    topic_controller_delete_topic = mocker.patch.object(TopicController, "delete_topic", mocker.Mock())
+
+    result = cli_runner.invoke(ping)
+
+    assert result.exit_code == 0
+    assert topic_controller_delete_topic.call_count == 1
+
+    ping_topic = topic_controller.get_cluster_topic(config.PING_TOPIC)
+    assert ping_topic.offsets[0].high == 10
diff --git a/tests/integration/test_topic_controller.py b/tests/integration/test_topic_controller.py
index e1f58ae2..4c02b909 100644
--- a/tests/integration/test_topic_controller.py
+++ b/tests/integration/test_topic_controller.py
@@ -1,23 +1,13 @@
 import json
-from typing import Any, Dict
 
 import confluent_kafka
 import pytest
-import yaml
-from click.testing import CliRunner
 
-from esque.cli.commands import apply, create_topic
-from esque.cli.options import State
 from esque.controller.topic_controller import TopicController
 from esque.errors import KafkaException
 from esque.resources.topic import Topic, TopicDiff
 
 
-@pytest.fixture()
-def state(test_config):
-    yield State()
-
-
 @pytest.fixture()
 def topic_controller(cluster):
     yield cluster.topic_controller
@@ -65,19 +55,6 @@ def test_alter_topic_config_works(topic_controller: TopicController, topic_id: s
     assert final_config.get("cleanup.policy") == "compact"
 
 
-@pytest.mark.integration
-def test_topic_deletion_works(
-    topic_controller: TopicController, confluent_admin_client: confluent_kafka.admin.AdminClient, topic: str
-):
-    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
-    assert topic in topics
-    topic_controller.delete_topic(Topic(topic))
-    # Invalidate cache
-    confluent_admin_client.poll(timeout=1)
-    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
-    assert topic not in topics
-
-
 @pytest.mark.integration
 def test_topic_listing_works(topic_controller: TopicController, topic: str):
     topics = topic_controller.list_topics()
@@ -139,144 +116,3 @@ def test_topic_diff(topic_controller: TopicController, topic_id: str):
     topic = Topic.from_dict(conf)
     diff = TopicDiff().set_diff("replication_factor", 1, 3)
     assert get_diff(topic) == diff, "Should have a diff on replication_factor"
-
-
-@pytest.mark.integration
-def test_apply(topic_controller: TopicController, topic_id: str):
-    runner = CliRunner()
-    topic_name = f"apply_{topic_id}"
-    topic_1 = {
-        "name": topic_name + "_1",
-        "replication_factor": 1,
-        "num_partitions": 50,
-        "config": {"cleanup.policy": "compact"},
-    }
-    topic_2 = {
-        "name": topic_name + "_2",
-        "replication_factor": 1,
-        "num_partitions": 5,
-        "config": {"cleanup.policy": "delete", "delete.retention.ms": 50000},
-    }
-    apply_conf = {"topics": [topic_1]}
-
-    # 1: topic creation
-    path = save_yaml(topic_id, apply_conf)
-    result = runner.invoke(apply, ["-f", path], input="Y\n")
-    assert (
-        result.exit_code == 0 and "Successfully applied changes" in result.output
-    ), f"Calling apply failed, error: {result.output}"
-
-    # 2: change cleanup policy to delete
-    topic_1["config"]["cleanup.policy"] = "delete"
-    path = save_yaml(topic_id, apply_conf)
-    result = runner.invoke(apply, ["-f", path], input="Y\n")
-    assert (
-        result.exit_code == 0 and "Successfully applied changes" in result.output
-    ), f"Calling apply failed, error: {result.output}"
-
-    # 3: add another topic and change the first one again
-    apply_conf["topics"].append(topic_2)
-    topic_1["config"]["cleanup.policy"] = "compact"
-    path = save_yaml(topic_id, apply_conf)
-    result = runner.invoke(apply, ["-f", path], input="Y\n")
-    assert (
-        result.exit_code == 0 and "Successfully applied changes" in result.output
-    ), f"Calling apply failed, error: {result.output}"
-
-    # 4: no changes
-    result = runner.invoke(apply, ["-f", path])
-    assert (
-        result.exit_code == 0 and "No changes detected, aborting" in result.output
-    ), f"Calling apply failed, error: {result.output}"
-
-    # 5: change partitions - this attempt should be cancelled
-    topic_1["num_partitions"] = 3
-    topic_1["config"]["cleanup.policy"] = "delete"
-    path = save_yaml(topic_id, apply_conf)
-    result = runner.invoke(apply, ["-f", path], input="Y\n")
-    assert (
-        result.exit_code == 0 and "to `replication_factor` and `num_partitions`" in result.output
-    ), f"Calling apply failed, error: {result.output}"
-    # reset config to the old settings again
-    topic_1["num_partitions"] = 50
-    topic_1["config"]["cleanup.policy"] = "compact"
-
-    # final: check results in the cluster to make sure they match
-    for topic_conf in apply_conf["topics"]:
-        topic_from_conf = Topic.from_dict(topic_conf)
-        assert not topic_controller.diff_with_cluster(
-            topic_from_conf
-        ).has_changes, f"Topic configs don't match, diff is {topic_controller.diff_with_cluster(topic_from_conf)}"
-
-
-@pytest.mark.integration
-def test_apply_duplicate_names(topic_controller: TopicController, topic_id: str):
-    runner = CliRunner()
-    topic_name = f"apply_{topic_id}"
-    topic_1 = {
-        "name": topic_name,
-        "replication_factor": 1,
-        "num_partitions": 50,
-        "config": {"cleanup.policy": "compact"},
-    }
-    apply_conf = {"topics": [topic_1, topic_1]}
-
-    # having the same topic name twice in apply should raise an ValueError
-    path = save_yaml(topic_id, apply_conf)
-    result = runner.invoke(apply, ["-f", path], input="Y\n")
-    assert result.exit_code != 0 and isinstance(result.exception, ValueError), f"Calling apply should have failed"
-
-
-@pytest.mark.integration
-def test_apply_invalid_replicas(topic_controller: TopicController, topic_id: str):
-    runner = CliRunner()
-    topic_name = f"apply_{topic_id}"
-    topic_1 = {
-        "name": topic_name,
-        "replication_factor": 100,
-        "num_partitions": 50,
-        "config": {"cleanup.policy": "compact"},
-    }
-    apply_conf = {"topics": [topic_1]}
-
-    # having the same topic name twice in apply should raise an ValueError
-    path = save_yaml(topic_id, apply_conf)
-    result = runner.invoke(apply, ["-f", path], input="Y\n")
-    assert result.exit_code != 0 and isinstance(result.exception, KafkaException), f"Calling apply should have failed"
-
-
-@pytest.mark.integration
-def test_topic_creation_with_template_works(
-    state: State, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str
-):
-    topic_1 = topic_id + "_1"
-    topic_2 = topic_id + "_2"
-    topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
-    assert topic_1 not in topics
-    replication_factor = 1
-    num_partitions = 1
-    config = {
-        "cleanup.policy": "delete",
-        "delete.retention.ms": "123456",
-        "file.delete.delay.ms": "789101112",
-        "flush.messages": "12345678910111213",
-        "flush.ms": "123456789",
-    }
-    state.cluster.topic_controller.create_topics(
-        [Topic(topic_1, replication_factor=replication_factor, num_partitions=num_partitions, config=config)]
-    )
-    runner = CliRunner()
-    runner.invoke(create_topic, ["--no-verify", "-l", topic_1, topic_2])
-    config_from_template = state.cluster.topic_controller.get_cluster_topic(topic_2)
-    assert config_from_template.replication_factor == replication_factor
-    assert config_from_template.num_partitions == num_partitions
-    for config_key, value in config.items():
-        assert config_from_template.config[config_key] == value
-
-
-def save_yaml(fname: str, data: Dict[str, Any]) -> str:
-    # this path name is in the gitignore so the temp files are not committed
-    path = f"tests/test_samples/{fname}_apply.yaml"
-    with open(path, "w") as outfile:
-        yaml.dump(data, outfile, default_flow_style=False)
-    return path