Skip to content

Commit

Permalink
Makes retrieval of timestamp optional
Browse files Browse the repository at this point in the history
  • Loading branch information
hfjn committed Jun 22, 2020
1 parent 399ae58 commit bca2a8f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
12 changes: 10 additions & 2 deletions esque/cli/commands.py
Expand Up @@ -486,15 +486,23 @@ def apply(state: State, file: str):
help="Will output the consumer groups reading from this topic."
f" {red_bold('Beware! This can be a really expensive operation.')}",
)
@click.option(
"--last-timestamp",
required=False,
is_flag=True,
default=False,
help="Will output the last message's timestamp for each partition"
f" {red_bold('Beware! This can be a really expensive operation.')}",
)
@output_format_option
@default_options
def describe_topic(state: State, topic_name: str, consumers: bool, output_format: str):
def describe_topic(state: State, topic_name: str, consumers: bool, last_timestamp: bool, output_format: str):
"""Describe a topic.
Returns information on a given topic and its partitions, with the option of including
all consumer groups that read from the topic.
"""
topic = state.cluster.topic_controller.get_cluster_topic(topic_name)
topic = state.cluster.topic_controller.get_cluster_topic(topic_name, retrieve_last_timestamp=last_timestamp)

output_dict = {
"topic": topic_name,
Expand Down
13 changes: 8 additions & 5 deletions esque/controller/topic_controller.py
Expand Up @@ -117,9 +117,9 @@ def delete_topic(self, topic: Topic):
future = self.cluster.confluent_client.delete_topics([topic.name])[topic.name]
ensure_kafka_future_done(future)

def get_cluster_topic(self, topic_name: str, *, retrieve_last_timestamp: bool = False) -> Topic:
    """Fetch an existing topic by name, populated with state from the cluster.

    :param topic_name: Name of the topic to look up on the cluster.
    :param retrieve_last_timestamp: When True, also fetch each partition's last
        message timestamp (potentially expensive — requires consuming messages).
    :return: A :class:`Topic` whose attributes have been updated from the cluster.
    """
    local_topic = Topic(topic_name)
    return self.update_from_cluster(local_topic, retrieve_last_timestamp=retrieve_last_timestamp)

def get_local_topic(self, topic_name: str) -> Topic:
    """Return a local-only Topic object for *topic_name* without contacting the cluster."""
    return Topic(topic_name)
Expand All @@ -140,15 +140,17 @@ def get_offsets_closest_to_timestamp(
topic_partition.partition: topic_partition.offset for topic_partition in topic_partitions_with_new_offsets
}

def update_from_cluster(self, topic: Topic) -> Topic:
def update_from_cluster(self, topic: Topic, *, retrieve_last_timestamp: bool = False) -> Topic:
"""Takes a topic and, based on its name, updates all attributes from the cluster"""

confluent_topic: ConfluentTopic = self._get_client_topic(topic.name, ClientTypes.Confluent)
pykafka_topic: PyKafkaTopic = self._get_client_topic(topic.name, ClientTypes.PyKafka)
low_watermarks = pykafka_topic.earliest_available_offsets()
high_watermarks = pykafka_topic.latest_available_offsets()

topic.partition_data = self._get_partition_data(confluent_topic, low_watermarks, high_watermarks, topic)
topic.partition_data = self._get_partition_data(
confluent_topic, low_watermarks, high_watermarks, topic, retrieve_last_timestamp
)
topic.config = self.cluster.retrieve_config(ConfigResource.Type.TOPIC, topic.name)

topic.is_only_local = False
Expand All @@ -161,6 +163,7 @@ def _get_partition_data(
low_watermarks: PartitionInfo,
high_watermarks: PartitionInfo,
topic: Topic,
retrieve_last_timestamp: bool,
) -> List[Partition]:

consumer = MessageConsumer(PING_GROUP_ID, topic.name, True)
Expand All @@ -170,7 +173,7 @@ def _get_partition_data(
low = int(low_watermarks[partition_id].offset[0])
high = int(high_watermarks[partition_id].offset[0])
latest_timestamp = None
if high > low:
if high > low and retrieve_last_timestamp:
try:
latest_timestamp = float(consumer.consume(high - 1, partition_id).timestamp()[1]) / 1000
except MessageEmptyException:
Expand Down

0 comments on commit bca2a8f

Please sign in to comment.