From 0dd05a9dbe84f338c282611505152a5d7bdc50ae Mon Sep 17 00:00:00 2001 From: Kevin K Date: Mon, 25 Nov 2019 20:55:35 +0100 Subject: [PATCH] Use default timeout for consume (#126) --- esque/clients/consumer.py | 2 +- esque/controller/topic_controller.py | 14 +++++++++++--- esque/resources/topic.py | 5 ++--- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/esque/clients/consumer.py b/esque/clients/consumer.py index 15860451..6badd61c 100644 --- a/esque/clients/consumer.py +++ b/esque/clients/consumer.py @@ -121,7 +121,7 @@ def create_internal_consumer(self): def consume(self, offset: int = 0, partition: int = 0) -> Message: self.assign_specific_partitions(self._topic_name, partitions=[partition], offset=offset) - return self.consume_single_message(timeout=1) + return self.consume_single_message(30) class PingConsumer(AbstractConsumer): diff --git a/esque/controller/topic_controller.py b/esque/controller/topic_controller.py index 0bc1a553..2d8444d4 100644 --- a/esque/controller/topic_controller.py +++ b/esque/controller/topic_controller.py @@ -1,9 +1,11 @@ import datetime +import logging import re import time from datetime import timezone from enum import Enum from itertools import islice +from logging import Logger from typing import TYPE_CHECKING, Dict, List, Union import pendulum @@ -19,6 +21,8 @@ from esque.helpers import ensure_kafka_future_done, invalidate_cache_after from esque.resources.topic import Partition, PartitionInfo, Topic, TopicDiff +logger: Logger = logging.getLogger(__name__) + if TYPE_CHECKING: from esque.cluster import Cluster @@ -180,10 +184,14 @@ def _get_partition_data( for partition_id, meta in confluent_topic.partitions.items(): low = int(low_watermarks[partition_id].offset[0]) high = int(high_watermarks[partition_id].offset[0]) + latest_timestamp = None if high > low: - latest_timestamp = float(consumer.consume(high - 1, partition_id).timestamp()[1]) / 1000 - else: - latest_timestamp = None + try: + latest_timestamp = float(consumer.consume(high - 1, partition_id).timestamp()[1]) / 1000 + except MessageEmptyException: + logger.warning( + f"Due to timeout latest timestamp for topic `{topic.name}` and partition `{partition_id}` is missing." + ) partition = Partition(partition_id, low, high, meta.isrs, meta.leader, meta.replicas, latest_timestamp) partitions.append(partition) diff --git a/esque/resources/topic.py b/esque/resources/topic.py index d119e872..6cc23d62 100644 --- a/esque/resources/topic.py +++ b/esque/resources/topic.py @@ -30,9 +30,8 @@ def __init__( self.partition_isrs = partition_isrs self.partition_leader = partition_leader self.partition_replicas = partition_replicas - if latest_message_timestamp is None: - self.latest_message_timestamp = None - else: + self.latest_message_timestamp = None + if latest_message_timestamp is not None: self.latest_message_timestamp = pendulum.from_timestamp(latest_message_timestamp).to_datetime_string() def as_dict(self):