Skip to content

Commit

Permalink
minor restructuring of ping command code
Browse files Browse the repository at this point in the history
  • Loading branch information
Swen committed Oct 1, 2021
1 parent 14e3fcf commit 7eb5948
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions esque/cli/commands/ping.py
Expand Up @@ -2,7 +2,7 @@
import time
import uuid
from time import sleep
from typing import List
from typing import Callable, List

import click

Expand Down Expand Up @@ -32,12 +32,6 @@ def ping(state: State, times: int, wait: int):
s2c: server to client (time from kafka write to disk till client received it again)
c2c: client to client (complete round trip)
"""
input_handler = KafkaHandler(
KafkaHandlerConfig(scheme="kafka", host=state.config.current_context, path=PING_TOPIC)
)
output_handler = KafkaHandler(
KafkaHandlerConfig(scheme="kafka", host=state.config.current_context, path=PING_TOPIC)
)
topic_controller = state.cluster.topic_controller

if not topic_controller.topic_exists(PING_TOPIC):
Expand All @@ -54,24 +48,26 @@ def ping(state: State, times: int, wait: int):
click.echo(click.style("Aborted!", bg="red"))
return

deltas = []

ping_id = uuid.uuid4().bytes

def matches_ping_id(msg: BinaryMessage) -> bool:
return msg.key == ping_id
click.echo("Initializing producer.")
output_handler = KafkaHandler(
KafkaHandlerConfig(scheme="kafka", host=state.config.current_context, path=PING_TOPIC)
)
output_handler.write_message(create_tombstone_message(ping_id))

input_stream = filter(matches_ping_id, skip_stream_events(input_handler.message_stream()))
input_handler = KafkaHandler(
KafkaHandlerConfig(scheme="kafka", host=state.config.current_context, path=PING_TOPIC)
)
input_stream = filter(key_matches(ping_id), skip_stream_events(input_handler.message_stream()))
message_iterator = iter(input_stream)

click.echo("Initializing producer.")
output_handler.write_message(create_tombstone_message(ping_id)) # initialize producer

click.echo("Initializing consumer.")
input_handler.seek(KafkaHandler.OFFSET_AT_LAST_MESSAGE)
next(message_iterator)

click.echo(f"Pinging cluster with bootstrap servers {state.cluster.bootstrap_servers}.")
deltas = []
try:
for i in range(times):
output_handler.write_message(create_ping_message(ping_id))
Expand Down Expand Up @@ -105,6 +101,13 @@ def matches_ping_id(msg: BinaryMessage) -> bool:
click.echo(f"c2c {stats(c2c_times)}")


def key_matches(ping_id: bytes) -> Callable[[BinaryMessage], bool]:
def matcher(msg: BinaryMessage) -> bool:
return msg.key == ping_id

return matcher


def stats(deltas: List[int]) -> str:
return f"min/avg/max = {min(deltas):.2f}/{(sum(deltas) / len(deltas)):.2f}/{max(deltas):.2f} ms"

Expand Down

0 comments on commit 7eb5948

Please sign in to comment.