Skip to content

Commit

Permalink
Fixed ping command
Browse files Browse the repository at this point in the history
  • Loading branch information
hfjn committed Sep 18, 2019
1 parent 145af23 commit 6101675
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 6 deletions.
4 changes: 2 additions & 2 deletions esque/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def ping(state, times, wait):
deltas = []
try:
try:
topic_controller.create_topics([topic_controller.get_cluster_topic(PING_TOPIC)])
topic_controller.create_topics([Topic(PING_TOPIC)])
except TopicAlreadyExistsException:
click.echo("Topic already exists.")

Expand All @@ -374,7 +374,7 @@ def ping(state, times, wait):

for i in range(times):
producer.produce(PING_TOPIC)
_, delta = consumer.consume()
_, delta = consumer.consume(1)
deltas.append(delta)
click.echo(f"m_seq={i} time={delta:.2f}ms")
sleep(wait)
Expand Down
33 changes: 29 additions & 4 deletions esque/clients.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import pathlib
from abc import ABC, abstractmethod
from contextlib import ExitStack
from glob import glob
from typing import Optional, Tuple
Expand All @@ -9,14 +10,14 @@
import pendulum
from confluent_kafka import Message
from confluent_kafka.avro import AvroProducer
from confluent_kafka.cimpl import TopicPartition

from esque.avromessage import AvroFileReader, AvroFileWriter
from esque.config import Config
from esque.errors import raise_for_kafka_error, raise_for_message, MessageEmptyException
from esque.errors import MessageEmptyException, raise_for_kafka_error, raise_for_message
from esque.helpers import delivery_callback, delta_t
from esque.message import KafkaMessage, PlainTextFileReader, PlainTextFileWriter, FileReader, FileWriter
from esque.message import FileReader, FileWriter, KafkaMessage, PlainTextFileReader, PlainTextFileWriter
from esque.schemaregistry import SchemaRegistryClient
from abc import ABC, abstractmethod


class AbstractConsumer(ABC):
Expand Down Expand Up @@ -56,8 +57,32 @@ def _consume_single_message(self, timeout=30) -> Optional[Message]:


class PingConsumer(AbstractConsumer):
def __init__(self, group_id: str, topic_name: str, last: bool):
offset_reset = "earliest"
if last:
offset_reset = "latest"
self._config = Config().create_confluent_config()
self._config.update(
{
"group.id": group_id,
"error_cb": raise_for_kafka_error,
# We need to commit offsets manually once we"re sure it got saved
# to the sink
"enable.auto.commit": True,
"enable.partition.eof": False,
# We need this to start at the last committed offset instead of the
# latest when subscribing for the first time
"default.topic.config": {"auto.offset.reset": offset_reset},
}
)
self._consumer = confluent_kafka.Consumer(self._config)
self._assign_exact_partitions(topic_name)

def _assign_exact_partitions(self, topic: str) -> None:
self._consumer.assign([TopicPartition(topic=topic, partition=0, offset=0)])

def consume(self, amount: int) -> Optional[Tuple[str, int]]:
message = self._consume_single_message()
message = self._consume_single_message(timeout=1)

msg_sent_at = pendulum.from_timestamp(float(message.value()))
delta_sent = pendulum.now() - msg_sent_at
Expand Down
25 changes: 25 additions & 0 deletions tests/integration/commands/test_ping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import confluent_kafka
from click.testing import CliRunner

from esque import config
from esque.cli.commands import ping
from esque.topic_controller import TopicController


def test_smoke_test_ping(cli_runner: CliRunner):
result = cli_runner.invoke(ping)

assert result.exit_code == 0


def test_correct_amount_of_messages(
mocker, cli_runner: CliRunner, confluent_admin_client: confluent_kafka.admin.AdminClient, topic_id: str
):
config.RANDOM = "test"

topic_controller_delete_topic = mocker.patch.object(TopicController, "delete_topic", mocker.Mock())

result = cli_runner.invoke(ping)

assert result.exit_code == 0
assert topic_controller_delete_topic.call_count == 1

0 comments on commit 6101675

Please sign in to comment.