Skip to content

Commit

Permalink
Merge 91c43ee into d826ab6
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettthomaskth committed Nov 12, 2019
2 parents d826ab6 + 91c43ee commit 09cdfd3
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 11 deletions.
10 changes: 5 additions & 5 deletions esque/cli/commands.py
Expand Up @@ -321,17 +321,17 @@ def describe_topic(state: State, topic_name: str, consumers: bool, output_format
click.echo(format_output(output_dict, output_format))


@get.command("offsets")
@get.command("watermarks")
@click.option("-t", "--topic-name", required=False, type=click.STRING, autocompletion=list_topics)
@output_format_option
@default_options
def get_offsets(state: State, topic_name: str, output_format: str):
# TODO: Gathering of all offsets takes super long
def get_watermarks(state: State, topic_name: str, output_format: str):
# TODO: Gathering of all watermarks takes super long
topics = state.cluster.topic_controller.list_topics(search_string=topic_name)

offsets = {topic.name: max(v for v in topic.offsets.values()) for topic in topics}
watermarks = {topic.name: max(v for v in topic.watermarks.values()) for topic in topics}

click.echo(format_output(offsets, output_format))
click.echo(format_output(watermarks, output_format))


@describe.command("broker")
Expand Down
2 changes: 1 addition & 1 deletion esque/resources/consumergroup.py
Expand Up @@ -85,7 +85,7 @@ def _get_consumer_offsets(self, consumer_id, verbose: bool):
}
return consumer_offsets
for topic in consumer_offsets.keys():
topic_offsets = self.topic_controller.get_cluster_topic(topic).offsets
topic_offsets = self.topic_controller.get_cluster_topic(topic).watermarks
new_consumer_offsets = {
"consumer_offset": (float("inf"), float("-inf")),
"topic_low_watermark": (float("inf"), float("-inf")),
Expand Down
2 changes: 1 addition & 1 deletion esque/resources/topic.py
Expand Up @@ -155,7 +155,7 @@ def partitions(self) -> List[Partition]:
return self.partition_data

@property
def offsets(self) -> Dict[int, Watermark]:
def watermarks(self) -> Dict[int, Watermark]:
"""
Returns the low and high watermark for each partition in a topic
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/commands/test_ping.py
Expand Up @@ -21,4 +21,4 @@ def test_correct_amount_of_messages(mocker, non_interactive_cli_runner: CliRunne
assert topic_controller_delete_topic.call_count == 1

ping_topic = topic_controller.get_cluster_topic(config.PING_TOPIC)
assert ping_topic.offsets[0].high == 10
assert ping_topic.watermarks[0].high == 10
4 changes: 2 additions & 2 deletions tests/integration/test_topic.py
Expand Up @@ -5,9 +5,9 @@


@pytest.mark.integration
def test_offsets(filled_topic: Topic, topic_controller: TopicController):
def test_watermarks(filled_topic: Topic, topic_controller: TopicController):
topic_controller.update_from_cluster(filled_topic)
assert filled_topic.offsets == {0: Watermark(10, 0)}
assert filled_topic.watermarks == {0: Watermark(10, 0)}


@pytest.mark.integration
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_topic_controller.py
Expand Up @@ -65,7 +65,7 @@ def test_topic_listing_works(topic_controller: TopicController, topic: str):
def test_topic_object_works(topic_controller: TopicController, topic: str):
topic = topic_controller.get_cluster_topic(topic)
assert isinstance(topic, Topic)
assert len(topic.offsets) != 0
assert len(topic.watermarks) != 0


@pytest.mark.integration
Expand Down

0 comments on commit 09cdfd3

Please sign in to comment.