
Commit

Merge 15e7d64 into 7129c8a
hfjn committed Sep 19, 2019
2 parents 7129c8a + 15e7d64 commit aacadc4
Showing 11 changed files with 290 additions and 171 deletions.
2 changes: 1 addition & 1 deletion esque/cli/commands.py
@@ -372,7 +372,7 @@ def ping(state, times, wait):
deltas = []
try:
try:
topic_controller.create_topics([topic_controller.get_cluster_topic(PING_TOPIC)])
topic_controller.create_topics([Topic(PING_TOPIC)])
except TopicAlreadyExistsException:
click.echo("Topic already exists.")

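For context, a minimal sketch (not part of the commit, and assuming the import paths used elsewhere in this diff) of what the one-line change in ping() amounts to: the old call resolved the ping topic from the cluster, which presupposes it already exists, while the new call builds a local Topic description and lets create_topics() materialise it.

# Hypothetical sketch; requires a reachable cluster configured for esque.
from esque.cli.options import State
from esque.config import PING_TOPIC  # assumption: PING_TOPIC is defined in esque.config
from esque.resources.topic import Topic

state = State()
topic_controller = state.cluster.topic_controller

# Old: get_cluster_topic(PING_TOPIC) fetched existing metadata first.
# New: a bare, locally constructed Topic is enough for create_topics().
topic_controller.create_topics([Topic(PING_TOPIC)])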
19 changes: 13 additions & 6 deletions esque/clients/consumer.py
@@ -5,7 +5,7 @@

import confluent_kafka
import pendulum
from confluent_kafka.cimpl import Message
from confluent_kafka.cimpl import Message, TopicPartition

from esque.clients.schemaregistry import SchemaRegistryClient
from esque.config import Config
@@ -35,29 +35,35 @@ def __init__(self, group_id: str, topic_name: str, last: bool):
}
)
self._consumer = confluent_kafka.Consumer(self._config)
self._subscribe(topic_name)
self._topic_name = topic_name

def _subscribe(self, topic: str) -> None:
self._consumer.subscribe([topic])

@abstractmethod
def consume(self, amount: int) -> int:
def consume(self, **kwargs) -> int:
pass

def _consume_single_message(self, timeout=30) -> Optional[Message]:
def _consume_single_message(self, timeout=30) -> Message:
message = self._consumer.poll(timeout=timeout)
raise_for_message(message)
return message


class PingConsumer(AbstractConsumer):
def consume(self, amount: int) -> Optional[Tuple[str, int]]:
message = self._consume_single_message()
def __init__(self, group_id: str, topic_name: str, last: bool):
super().__init__(group_id, topic_name, last)
self._assign_exact_partitions(topic_name)

def consume(self) -> Optional[Tuple[str, int]]:
message = self._consume_single_message(timeout=1)
msg_sent_at = pendulum.from_timestamp(float(message.value()))
delta_sent = pendulum.now() - msg_sent_at
return message.key(), delta_sent.microseconds / 1000

def _assign_exact_partitions(self, topic: str) -> None:
self._consumer.assign([TopicPartition(topic=topic, partition=0, offset=0)])


class FileConsumer(AbstractConsumer):
def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool):
@@ -101,6 +107,7 @@ class AvroFileConsumer(FileConsumer):
def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool):
super().__init__(group_id, topic_name, working_dir, last)
self.schema_registry_client = SchemaRegistryClient(Config().schema_registry)
self._subscribe(topic_name)

def get_file_writer(self, partition: int) -> FileWriter:
return AvroFileWriter((self.working_dir / f"partition_{partition}"), self.schema_registry_client)
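A standalone sketch of the flow the reworked PingConsumer implements: assign partition 0 of the ping topic explicitly rather than subscribing, poll with a one-second timeout, and treat the message value as the send timestamp. Broker address, group id and topic name below are placeholders, not values taken from this commit.

# Hypothetical, trimmed-down equivalent of the PingConsumer changes above.
import confluent_kafka
import pendulum
from confluent_kafka.cimpl import TopicPartition

consumer = confluent_kafka.Consumer(
    {"bootstrap.servers": "localhost:9092", "group.id": "ping-check"}  # placeholders
)
# assign() pins the consumer to partition 0 at offset 0, so no subscription or
# group rebalance has to complete before the first poll.
consumer.assign([TopicPartition(topic="esque-ping", partition=0, offset=0)])

message = consumer.poll(timeout=1)
if message is not None and message.error() is None:
    # The ping producer writes the send time as a UNIX timestamp into the value.
    msg_sent_at = pendulum.from_timestamp(float(message.value()))
    delta_sent = pendulum.now() - msg_sent_at
    # Sub-second round-trip latency in milliseconds, mirroring consume() above.
    print(message.key(), delta_sent.microseconds / 1000)
consumer.close()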
12 changes: 12 additions & 0 deletions tests/conftest.py
@@ -6,11 +6,13 @@

import confluent_kafka
import pytest
from click.testing import CliRunner
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.avro import AvroProducer
from confluent_kafka.cimpl import Producer, TopicPartition
from pykafka.exceptions import NoBrokersAvailableError

from esque.cli.options import State
from esque.cluster import Cluster
from esque.config import sample_config_path, Config
from esque.errors import raise_for_kafka_error
@@ -38,6 +40,11 @@ def pytest_collection_modifyitems(config, items):
item.add_marker(integration)


@pytest.fixture()
def cli_runner():
yield CliRunner()


@pytest.fixture()
def test_config_path(mocker, tmpdir_factory):
fn: Path = tmpdir_factory.mktemp("config").join("dummy.cfg")
@@ -205,3 +212,8 @@ def cluster(test_config):
raise ex

yield cluster


@pytest.fixture()
def state(test_config):
yield State()
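The two new fixtures are what the command tests below rely on: cli_runner yields a fresh click CliRunner per test, and state exposes the configured cluster through esque's State object. A hypothetical test (not part of the commit) combining both:

# Hypothetical usage example for the cli_runner and state fixtures.
import pytest

from esque.cli.commands import describe_topic
from esque.cli.options import State


@pytest.mark.integration
def test_describe_and_inspect(cli_runner, state: State, topic: str):
    result = cli_runner.invoke(describe_topic, [topic])
    # The click Result carries the exit code, captured output and any exception.
    assert result.exit_code == 0, f"describe failed: {result.output}"

    # state reaches the same cluster the CLI talked to.
    cluster_topic = state.cluster.topic_controller.get_cluster_topic(topic)
    assert cluster_topic.num_partitions >= 1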
Empty file.
119 changes: 119 additions & 0 deletions tests/integration/commands/test_apply.py
@@ -0,0 +1,119 @@
from typing import Any, Dict

import pytest
import yaml
from click.testing import CliRunner
from esque.resources.topic import Topic
from esque.controller.topic_controller import TopicController

from esque.cli.commands import apply
from esque.errors import KafkaException


@pytest.mark.integration
def test_apply(cli_runner, topic_controller: TopicController, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name + "_1",
"replication_factor": 1,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
topic_2 = {
"name": topic_name + "_2",
"replication_factor": 1,
"num_partitions": 5,
"config": {"cleanup.policy": "delete", "delete.retention.ms": 50000},
}
apply_conf = {"topics": [topic_1]}

# 1: topic creation
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 2: change cleanup policy to delete
topic_1["config"]["cleanup.policy"] = "delete"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 3: add another topic and change the first one again
apply_conf["topics"].append(topic_2)
topic_1["config"]["cleanup.policy"] = "compact"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 4: no changes
result = cli_runner.invoke(apply, ["-f", path])
assert (
result.exit_code == 0 and "No changes detected, aborting" in result.output
), f"Calling apply failed, error: {result.output}"

# 5: change partitions - this attempt should be cancelled
topic_1["num_partitions"] = 3
topic_1["config"]["cleanup.policy"] = "delete"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "to `replication_factor` and `num_partitions`" in result.output
), f"Calling apply failed, error: {result.output}"
# reset config to the old settings again
topic_1["num_partitions"] = 50
topic_1["config"]["cleanup.policy"] = "compact"

# final: check results in the cluster to make sure they match
for topic_conf in apply_conf["topics"]:
topic_from_conf = Topic.from_dict(topic_conf)
assert not topic_controller.diff_with_cluster(
topic_from_conf
).has_changes, f"Topic configs don't match, diff is {topic_controller.diff_with_cluster(topic_from_conf)}"


@pytest.mark.integration
def test_apply_duplicate_names(cli_runner: CliRunner, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name,
"replication_factor": 1,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
apply_conf = {"topics": [topic_1, topic_1]}

# having the same topic name twice in apply should raise a ValueError
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert result.exit_code != 0 and isinstance(result.exception, ValueError), f"Calling apply should have failed"


@pytest.mark.integration
def test_apply_invalid_replicas(cli_runner: CliRunner, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name,
"replication_factor": 100,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
apply_conf = {"topics": [topic_1]}

# a replication factor that exceeds the number of available brokers should raise a KafkaException
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert result.exit_code != 0 and isinstance(result.exception, KafkaException), f"Calling apply should have failed"


def save_yaml(fname: str, data: Dict[str, Any]) -> str:
# this path name is in the gitignore so the temp files are not committed
path = f"tests/test_samples/{fname}_apply.yaml"
with open(path, "w") as outfile:
yaml.dump(data, outfile, default_flow_style=False)
return path
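For illustration (an assumption about how the pieces fit together, not code from this commit), the file written by save_yaml() can be read back and turned into Topic objects the same way the final verification step does before diffing against the cluster:

# Hypothetical round trip for a file produced by save_yaml() above.
import yaml

from esque.resources.topic import Topic

path = "tests/test_samples/example_apply.yaml"  # placeholder file name
with open(path) as infile:
    apply_conf = yaml.safe_load(infile)

# Topic.from_dict() is the same helper the test feeds into
# topic_controller.diff_with_cluster() for the final comparison.
topics = [Topic.from_dict(topic_conf) for topic_conf in apply_conf["topics"]]
print(f"loaded {len(topics)} topic definition(s) from {path}")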
45 changes: 45 additions & 0 deletions tests/integration/commands/test_creation.py
@@ -0,0 +1,45 @@
import confluent_kafka
import pytest
from click.testing import CliRunner

from esque.cli.commands import create_topic
from esque.cli.options import State
from esque.resources.topic import Topic


@pytest.mark.integration
def test_create(cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str):

cli_runner.invoke(create_topic, [topic_id])

topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic_id not in topics


@pytest.mark.integration
def test_topic_creation_with_template_works(
state: State, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str
):
topic_1 = topic_id + "_1"
topic_2 = topic_id + "_2"
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic_1 not in topics
replication_factor = 1
num_partitions = 1
config = {
"cleanup.policy": "delete",
"delete.retention.ms": "123456",
"file.delete.delay.ms": "789101112",
"flush.messages": "12345678910111213",
"flush.ms": "123456789",
}
state.cluster.topic_controller.create_topics(
[Topic(topic_1, replication_factor=replication_factor, num_partitions=num_partitions, config=config)]
)
runner = CliRunner()
runner.invoke(create_topic, ["--no-verify", "-l", topic_1, topic_2])
config_from_template = state.cluster.topic_controller.get_cluster_topic(topic_2)
assert config_from_template.replication_factor == replication_factor
assert config_from_template.num_partitions == num_partitions
for config_key, value in config.items():
assert config_from_template.config[config_key] == value
52 changes: 52 additions & 0 deletions tests/integration/commands/test_deletion.py
@@ -0,0 +1,52 @@
import confluent_kafka
import pytest
from click.testing import CliRunner

from esque.cli.commands import delete_topic


@pytest.fixture()
def basic_topic(num_partitions, topic_factory):
yield from topic_factory(num_partitions, "basic-topic")


@pytest.fixture()
def duplicate_topic(num_partitions, topic_factory):
yield from topic_factory(num_partitions, "basic.topic")


@pytest.mark.integration
def test_topic_deletion_works(
cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic: str
):
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic in topics

result = cli_runner.invoke(delete_topic, [topic], input="y\n")
assert result.exit_code == 0

# Invalidate cache
confluent_admin_client.poll(timeout=1)
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic not in topics


@pytest.mark.integration
def test_keep_minus_delete_period(
cli_runner: CliRunner,
confluent_admin_client: confluent_kafka.admin.AdminClient,
basic_topic: str,
duplicate_topic: str,
):
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert basic_topic[0] in topics
assert duplicate_topic[0] in topics

result = cli_runner.invoke(delete_topic, [duplicate_topic[0]], input="y\n")
assert result.exit_code == 0

# Invalidate cache
confluent_admin_client.poll(timeout=1)
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert duplicate_topic[0] not in topics
assert basic_topic[0] in topics
11 changes: 11 additions & 0 deletions tests/integration/commands/test_describe.py
@@ -0,0 +1,11 @@
import pytest
from click.testing import CliRunner

from esque.cli.commands import describe_topic


@pytest.mark.integration
def test_smoke_test_describe_topic(cli_runner: CliRunner, topic: str):
result = cli_runner.invoke(describe_topic, [topic])

assert result.exit_code == 0
11 changes: 11 additions & 0 deletions tests/integration/commands/test_get.py
@@ -0,0 +1,11 @@
import pytest
from click.testing import CliRunner

from esque.cli.commands import get_topics


@pytest.mark.integration
def test_smoke_test_get_topics(cli_runner: CliRunner):
result = cli_runner.invoke(get_topics)

assert result.exit_code == 0
26 changes: 26 additions & 0 deletions tests/integration/commands/test_ping.py
@@ -0,0 +1,26 @@
import pytest
from click.testing import CliRunner

from esque import config
from esque.cli.commands import ping
from esque.controller.topic_controller import TopicController


@pytest.mark.integration
def test_smoke_test_ping(cli_runner: CliRunner):
result = cli_runner.invoke(ping)

assert result.exit_code == 0


@pytest.mark.integration
def test_correct_amount_of_messages(mocker, cli_runner: CliRunner, topic_controller: TopicController):
topic_controller_delete_topic = mocker.patch.object(TopicController, "delete_topic", mocker.Mock())

result = cli_runner.invoke(ping)

assert result.exit_code == 0
assert topic_controller_delete_topic.call_count == 1

ping_topic = topic_controller.get_cluster_topic(config.PING_TOPIC)
assert ping_topic.offsets[0].high == 10
