Commit

Merge 7b58c64 into 7b10ecb
hfjn authored Sep 18, 2019
2 parents 7b10ecb + 7b58c64 commit de41915
Showing 11 changed files with 265 additions and 167 deletions.
4 changes: 2 additions & 2 deletions esque/cli/commands.py
@@ -371,7 +371,7 @@ def ping(state, times, wait):
deltas = []
try:
try:
topic_controller.create_topics([topic_controller.get_cluster_topic(PING_TOPIC)])
topic_controller.create_topics([Topic(PING_TOPIC)])
except TopicAlreadyExistsException:
click.echo("Topic already exists.")

@@ -382,7 +382,7 @@ def ping(state, times, wait):

for i in range(times):
producer.produce(PING_TOPIC)
_, delta = consumer.consume()
_, delta = consumer.consume(1)
deltas.append(delta)
click.echo(f"m_seq={i} time={delta:.2f}ms")
sleep(wait)
33 changes: 29 additions & 4 deletions esque/clients.py
@@ -1,5 +1,6 @@
import json
import pathlib
from abc import ABC, abstractmethod
from contextlib import ExitStack
from glob import glob
from typing import Optional, Tuple
@@ -9,14 +10,14 @@
import pendulum
from confluent_kafka import Message
from confluent_kafka.avro import AvroProducer
from confluent_kafka.cimpl import TopicPartition

from esque.avromessage import AvroFileReader, AvroFileWriter
from esque.config import Config
from esque.errors import raise_for_kafka_error, raise_for_message, MessageEmptyException
from esque.errors import MessageEmptyException, raise_for_kafka_error, raise_for_message
from esque.helpers import delivery_callback, delta_t
from esque.message import KafkaMessage, PlainTextFileReader, PlainTextFileWriter, FileReader, FileWriter
from esque.message import FileReader, FileWriter, KafkaMessage, PlainTextFileReader, PlainTextFileWriter
from esque.schemaregistry import SchemaRegistryClient
from abc import ABC, abstractmethod


class AbstractConsumer(ABC):
@@ -56,8 +57,32 @@ def _consume_single_message(self, timeout=30) -> Optional[Message]:


class PingConsumer(AbstractConsumer):
def __init__(self, group_id: str, topic_name: str, last: bool):
offset_reset = "earliest"
if last:
offset_reset = "latest"
self._config = Config().create_confluent_config()
self._config.update(
{
"group.id": group_id,
"error_cb": raise_for_kafka_error,
# Auto-commit is fine for the ping consumer: it only reads transient
# timing messages, so offsets never need to be committed manually
"enable.auto.commit": True,
"enable.partition.eof": False,
# Determines where consumption starts when there is no committed
# offset for this group yet
"default.topic.config": {"auto.offset.reset": offset_reset},
}
)
self._consumer = confluent_kafka.Consumer(self._config)
self._assign_exact_partitions(topic_name)

def _assign_exact_partitions(self, topic: str) -> None:
self._consumer.assign([TopicPartition(topic=topic, partition=0, offset=0)])

def consume(self, amount: int) -> Optional[Tuple[str, int]]:
message = self._consume_single_message()
message = self._consume_single_message(timeout=1)

msg_sent_at = pendulum.from_timestamp(float(message.value()))
delta_sent = pendulum.now() - msg_sent_at
10 changes: 8 additions & 2 deletions tests/conftest.py
@@ -2,13 +2,14 @@
from concurrent.futures import Future
from pathlib import Path
from string import ascii_letters
from typing import Iterable, Tuple, Callable
from typing import Callable, Iterable, Tuple

import confluent_kafka
import pytest
from click.testing import CliRunner
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.avro import AvroProducer
from confluent_kafka.cimpl import TopicPartition, Producer
from confluent_kafka.cimpl import Producer, TopicPartition
from pykafka.exceptions import NoBrokersAvailableError

from esque.cluster import Cluster
@@ -38,6 +39,11 @@ def pytest_collection_modifyitems(config, items):
item.add_marker(integration)


@pytest.fixture()
def cli_runner():
yield CliRunner()


@pytest.fixture()
def test_config_path(mocker, tmpdir_factory):
fn: Path = tmpdir_factory.mktemp("config").join("dummy.cfg")
Empty file.
119 changes: 119 additions & 0 deletions tests/integration/commands/test_apply.py
@@ -0,0 +1,119 @@
from typing import Any, Dict

import pytest
import yaml
from click.testing import CliRunner

from esque.cli.commands import apply
from esque.errors import KafkaException
from esque.topic import Topic
from esque.topic_controller import TopicController


@pytest.mark.integration
def test_apply(cli_runner, topic_controller: TopicController, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name + "_1",
"replication_factor": 1,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
topic_2 = {
"name": topic_name + "_2",
"replication_factor": 1,
"num_partitions": 5,
"config": {"cleanup.policy": "delete", "delete.retention.ms": 50000},
}
apply_conf = {"topics": [topic_1]}

# 1: topic creation
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 2: change cleanup policy to delete
topic_1["config"]["cleanup.policy"] = "delete"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 3: add another topic and change the first one again
apply_conf["topics"].append(topic_2)
topic_1["config"]["cleanup.policy"] = "compact"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "Successfully applied changes" in result.output
), f"Calling apply failed, error: {result.output}"

# 4: no changes
result = cli_runner.invoke(apply, ["-f", path])
assert (
result.exit_code == 0 and "No changes detected, aborting" in result.output
), f"Calling apply failed, error: {result.output}"

# 5: change partitions - this attempt should be cancelled
topic_1["num_partitions"] = 3
topic_1["config"]["cleanup.policy"] = "delete"
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert (
result.exit_code == 0 and "to `replication_factor` and `num_partitions`" in result.output
), f"Calling apply failed, error: {result.output}"
# reset config to the old settings again
topic_1["num_partitions"] = 50
topic_1["config"]["cleanup.policy"] = "compact"

# final: check results in the cluster to make sure they match
for topic_conf in apply_conf["topics"]:
topic_from_conf = Topic.from_dict(topic_conf)
assert not topic_controller.diff_with_cluster(
topic_from_conf
).has_changes, f"Topic configs don't match, diff is {topic_controller.diff_with_cluster(topic_from_conf)}"


@pytest.mark.integration
def test_apply_duplicate_names(cli_runner: CliRunner, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name,
"replication_factor": 1,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
apply_conf = {"topics": [topic_1, topic_1]}

# having the same topic name twice in apply should raise a ValueError
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert result.exit_code != 0 and isinstance(result.exception, ValueError), "Calling apply should have failed"


@pytest.mark.integration
def test_apply_invalid_replicas(cli_runner: CliRunner, topic_id: str):
topic_name = f"apply_{topic_id}"
topic_1 = {
"name": topic_name,
"replication_factor": 100,
"num_partitions": 50,
"config": {"cleanup.policy": "compact"},
}
apply_conf = {"topics": [topic_1]}

# a replication factor larger than the number of available brokers should raise a KafkaException
path = save_yaml(topic_id, apply_conf)
result = cli_runner.invoke(apply, ["-f", path], input="Y\n")
assert result.exit_code != 0 and isinstance(result.exception, KafkaException), "Calling apply should have failed"


def save_yaml(fname: str, data: Dict[str, Any]) -> str:
# this path name is in the gitignore so the temp files are not committed
path = f"tests/test_samples/{fname}_apply.yaml"
with open(path, "w") as outfile:
yaml.dump(data, outfile, default_flow_style=False)
return path
12 changes: 12 additions & 0 deletions tests/integration/commands/test_creation.py
@@ -0,0 +1,12 @@
import confluent_kafka
from click.testing import CliRunner

from esque.cli.commands import create_topic


def test_create(cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str):

# No confirmation input is given, so the topic should not actually be created
cli_runner.invoke(create_topic, [topic_id])

topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic_id not in topics
52 changes: 52 additions & 0 deletions tests/integration/commands/test_deletion.py
@@ -0,0 +1,52 @@
import confluent_kafka
import pytest
from click.testing import CliRunner

from esque.cli.commands import delete_topic


@pytest.fixture()
def basic_topic(num_partitions, topic_factory):
yield from topic_factory(num_partitions, "basic-topic")


@pytest.fixture()
def duplicate_topic(num_partitions, topic_factory):
yield from topic_factory(num_partitions, "basic.topic")


@pytest.mark.integration
def test_topic_deletion_works(
cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic: str
):
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic in topics

result = cli_runner.invoke(delete_topic, [topic], input="y\n")
assert result.exit_code == 0

# Invalidate cache
confluent_admin_client.poll(timeout=1)
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert topic not in topics


@pytest.mark.integration
def test_keep_kafka_duplicated(
cli_runner: CliRunner,
confluent_admin_client: confluent_kafka.admin.AdminClient,
basic_topic: str,
duplicate_topic: str,
):
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert basic_topic[0] in topics
assert duplicate_topic[0] in topics

result = cli_runner.invoke(delete_topic, [duplicate_topic[0]], input="y\n")
assert result.exit_code == 0

# Invalidate cache
confluent_admin_client.poll(timeout=1)
topics = confluent_admin_client.list_topics(timeout=5).topics.keys()
assert duplicate_topic[0] not in topics
assert basic_topic[0] in topics
9 changes: 9 additions & 0 deletions tests/integration/commands/test_describe.py
@@ -0,0 +1,9 @@
from click.testing import CliRunner

from esque.cli.commands import describe_topic


def test_smoke_test_describe_topic(cli_runner: CliRunner, topic: str):
result = cli_runner.invoke(describe_topic, [topic])

assert result.exit_code == 0
9 changes: 9 additions & 0 deletions tests/integration/commands/test_get.py
@@ -0,0 +1,9 @@
from click.testing import CliRunner

from esque.cli.commands import get_topics


def test_smoke_test_get_topics(cli_runner: CliRunner):
result = cli_runner.invoke(get_topics)

assert result.exit_code == 0
25 changes: 25 additions & 0 deletions tests/integration/commands/test_ping.py
@@ -0,0 +1,25 @@
import confluent_kafka
from click.testing import CliRunner

from esque import config
from esque.cli.commands import ping
from esque.topic_controller import TopicController


def test_smoke_test_ping(cli_runner: CliRunner):
result = cli_runner.invoke(ping)

assert result.exit_code == 0


def test_correct_amount_of_messages(
mocker, cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str
):
config.RANDOM = "test"

topic_controller_delete_topic = mocker.patch.object(TopicController, "delete_topic", mocker.Mock())

result = cli_runner.invoke(ping)

assert result.exit_code == 0
assert topic_controller_delete_topic.call_count == 1