Skip to content

Commit

Permalink
try to make sure consumer knows about the topic whose offets_for_time…
Browse files Browse the repository at this point in the history
…s are being queried
  • Loading branch information
Swen committed Mar 17, 2021
1 parent 19b8443 commit 40f3117
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions esque/controller/topic_controller.py
Expand Up @@ -127,15 +127,13 @@ def get_local_topic(self, topic_name: str) -> Topic:
def get_offsets_closest_to_timestamp(
self, group_id: str, topic_name: str, timestamp_limit: pendulum
) -> Dict[int, int]:
topic = self.get_cluster_topic(topic_name=topic_name)
config = Config.get_instance().create_confluent_config()
config.update({"group.id": group_id})
consumer = confluent_kafka.Consumer(config)
topic_data = consumer.list_topics(topic=topic_name).topics[topic_name]
topic_partitions_with_timestamp = [
TopicPartition(
topic=topic.name, partition=partition.partition_id, offset=timestamp_limit.int_timestamp * 1000
)
for partition in topic.partitions
TopicPartition(topic=topic_name, partition=partition_id, offset=timestamp_limit.int_timestamp * 1000)
for partition_id in topic_data.partitions.keys()
]
topic_partitions_with_new_offsets = consumer.offsets_for_times(topic_partitions_with_timestamp)
return {
Expand Down

0 comments on commit 40f3117

Please sign in to comment.