Skip to content

Commit

Permalink
Merge b9b0ca0 into 7129c8a
Browse files Browse the repository at this point in the history
  • Loading branch information
hfjn authored Sep 19, 2019
2 parents 7129c8a + b9b0ca0 commit 5e550c3
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 164 deletions.
4 changes: 2 additions & 2 deletions esque/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def ping(state, times, wait):
deltas = []
try:
try:
topic_controller.create_topics([topic_controller.get_cluster_topic(PING_TOPIC)])
topic_controller.create_topics([Topic(PING_TOPIC)])
except TopicAlreadyExistsException:
click.echo("Topic already exists.")

Expand All @@ -383,7 +383,7 @@ def ping(state, times, wait):

for i in range(times):
producer.produce(PING_TOPIC)
_, delta = consumer.consume()
_, delta = consumer.consume(1)
deltas.append(delta)
click.echo(f"m_seq={i} time={delta:.2f}ms")
sleep(wait)
Expand Down
15 changes: 11 additions & 4 deletions esque/clients/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import confluent_kafka
import pendulum
from confluent_kafka.cimpl import Message
from confluent_kafka.cimpl import Message, TopicPartition

from esque.clients.schemaregistry import SchemaRegistryClient
from esque.config import Config
Expand Down Expand Up @@ -35,7 +35,7 @@ def __init__(self, group_id: str, topic_name: str, last: bool):
}
)
self._consumer = confluent_kafka.Consumer(self._config)
self._subscribe(topic_name)
self._topic_name = topic_name

def _subscribe(self, topic: str) -> None:
self._consumer.subscribe([topic])
Expand All @@ -51,13 +51,19 @@ def _consume_single_message(self, timeout=30) -> Optional[Message]:


class PingConsumer(AbstractConsumer):
def consume(self, amount: int) -> Optional[Tuple[str, int]]:
message = self._consume_single_message()
def __init__(self, group_id: str, topic_name: str, last: bool):
super().__init__(group_id, topic_name, last)
self._assign_exact_partitions(topic_name)

def consume(self, amount: int) -> Optional[Tuple[str, int]]:
message = self._consume_single_message(timeout=1)
msg_sent_at = pendulum.from_timestamp(float(message.value()))
delta_sent = pendulum.now() - msg_sent_at
return message.key(), delta_sent.microseconds / 1000

def _assign_exact_partitions(self, topic: str) -> None:
self._consumer.assign([TopicPartition(topic=topic, partition=0, offset=0)])


class FileConsumer(AbstractConsumer):
def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool):
Expand Down Expand Up @@ -101,6 +107,7 @@ class AvroFileConsumer(FileConsumer):
def __init__(self, group_id: str, topic_name: str, working_dir: pathlib.Path, last: bool):
super().__init__(group_id, topic_name, working_dir, last)
self.schema_registry_client = SchemaRegistryClient(Config().schema_registry)
self._subscribe(topic_name)

def get_file_writer(self, partition: int) -> FileWriter:
return AvroFileWriter((self.working_dir / f"partition_{partition}"), self.schema_registry_client)
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import confluent_kafka
import pytest
from click.testing import CliRunner
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.avro import AvroProducer
from confluent_kafka.cimpl import Producer, TopicPartition
Expand Down Expand Up @@ -38,6 +39,11 @@ def pytest_collection_modifyitems(config, items):
item.add_marker(integration)


@pytest.fixture()
def cli_runner():
yield CliRunner()


@pytest.fixture()
def test_config_path(mocker, tmpdir_factory):
fn: Path = tmpdir_factory.mktemp("config").join("dummy.cfg")
Expand Down
Empty file.
119 changes: 119 additions & 0 deletions tests/integration/commands/test_apply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from typing import Any, Dict

import pytest
import yaml
from click.testing import CliRunner

from esque.cli.commands import apply
from esque.errors import KafkaException
from esque.topic import Topic
from esque.topic_controller import TopicController


@pytest.mark.integration
def test_apply(cli_runner, topic_controller: TopicController, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name + "_1",
"replication_factor": 1,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
topic_2 = {
"name": topic_name + "_2",
"replication_factor": 1,
"num_partitions": 5,
"config": {"cleanup.policy": "delete", "delete.retention.ms": 50000},
}
apply_conf = {"topics": [topic_1]}

# 1: topic creation
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 2: change cleanup policy to delete
topic_1["config"]["cleanup.policy"] = "delete"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 3: add another topic and change the first one again
apply_conf["topics"].append(topic_2)
topic_1["config"]["cleanup.policy"] = "compact"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 4: no changes
result = cli_runner.invoke(apply, ["-f", path])
assert (
result.exit_code == 0 and "No changes detected, aborting" in result.output
), f"Calling apply failed, error: {result.output}"

# 5: change partitions - this attempt should be cancelled
topic_1["num_partitions"] = 3
topic_1["config"]["cleanup.policy"] = "delete"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "to `replication_factor` and `num_partitions`" in result.output
), f"Calling apply failed, error: {result.output}"
# reset config to the old settings again
topic_1["num_partitions"] = 50
topic_1["config"]["cleanup.policy"] = "compact"

# final: check results in the cluster to make sure they match
for topic_conf in apply_conf["topics"]:
topic_from_conf = Topic.from_dict(topic_conf)
assert not topic_controller.diff_with_cluster(
topic_from_conf
).has_changes, f"Topic configs don't match, diff is {topic_controller.diff_with_cluster(topic_from_conf)}"


@pytest.mark.integration
def test_apply_duplicate_names(cli_runner: CliRunner, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name,
"replication_factor": 1,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
apply_conf = {"topics": [topic_1, topic_1]}

# having the same topic name twice in apply should raise an ValueError
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert result.exit_code != 0 and isinstance(result.exception, ValueError), f"Calling apply should have failed"


@pytest.mark.integration
def test_apply_invalid_replicas(cli_runner: CliRunner, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name,
"replication_factor": 100,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
apply_conf = {"topics": [topic_1]}

# having the same topic name twice in apply should raise an ValueError
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert result.exit_code != 0 and isinstance(result.exception, KafkaException), f"Calling apply should have failed"


def save_yaml(fname: str, data: Dict[str, Any]) -> str:
# this path name is in the gitignore so the temp files are not committed
path = f"tests/test_samples/{fname}_apply.yaml"
with open(path, "w") as outfile:
yaml.dump(data, outfile, default_flow_style=False)
return path
12 changes: 12 additions & 0 deletions tests/integration/commands/test_creation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import confluent_kafka
from click.testing import CliRunner

from esque.cli.commands import create_topic


def test_create(cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str):

cli_runner.invoke(create_topic, [topic_id])

topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic_id not in topics
52 changes: 52 additions & 0 deletions tests/integration/commands/test_deletion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import confluent_kafka
import pytest
from click.testing import CliRunner

from esque.cli.commands import delete_topic


@pytest.fixture()
def basic_topic(num_partitions, topic_factory):
yield from topic_factory(num_partitions, "basic-topic")


@pytest.fixture()
def duplicate_topic(num_partitions, topic_factory):
yield from topic_factory(num_partitions, "basic.topic")


@pytest.mark.integration
def test_topic_deletion_works(
cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic: str
):
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic in topics

result = cli_runner.invoke(delete_topic, [topic], input="y\n")
assert result.exit_code == 0

# Invalidate cache
confluent_admin_client.poll(timeout=1)
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic not in topics


@pytest.mark.integration
def test_keep_kafka_duplicated(
cli_runner: CliRunner,
confluent_admin_client: confluent_kafka.admin.AdminClient,
basic_topic: str,
duplicate_topic: str,
):
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert basic_topic[0] in topics
assert duplicate_topic[0] in topics

result = cli_runner.invoke(delete_topic, [duplicate_topic[0]], input="y\n")
assert result.exit_code == 0

# Invalidate cache
confluent_admin_client.poll(timeout=1)
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert duplicate_topic[0] not in topics
assert basic_topic[0] in topics
9 changes: 9 additions & 0 deletions tests/integration/commands/test_describe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from click.testing import CliRunner

from esque.cli.commands import describe_topic


def test_smoke_test_describe_topic(cli_runner: CliRunner, topic: str):
result = cli_runner.invoke(describe_topic, [topic])

assert result.exit_code == 0
9 changes: 9 additions & 0 deletions tests/integration/commands/test_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from click.testing import CliRunner

from esque.cli.commands import get_topics


def test_smoke_test_get_topics(cli_runner: CliRunner):
result = cli_runner.invoke(get_topics)

assert result.exit_code == 0
25 changes: 25 additions & 0 deletions tests/integration/commands/test_ping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import confluent_kafka
from click.testing import CliRunner

from esque import config
from esque.cli.commands import ping
from esque.topic_controller import TopicController


def test_smoke_test_ping(cli_runner: CliRunner):
result = cli_runner.invoke(ping)

assert result.exit_code == 0


def test_correct_amount_of_messages(
mocker, cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str
):
config.RANDOM = "test"

topic_controller_delete_topic = mocker.patch.object(TopicController, "delete_topic", mocker.Mock())

result = cli_runner.invoke(ping)

assert result.exit_code == 0
assert topic_controller_delete_topic.call_count == 1
Loading

0 comments on commit 5e550c3

Please sign in to comment.