Skip to content

Commit

Permalink
Use default timeout for consume (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bibob7 authored and swenzel committed Nov 25, 2019
1 parent 9b6260b commit 0dd05a9
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
2 changes: 1 addition & 1 deletion esque/clients/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def create_internal_consumer(self):

def consume(self, offset: int = 0, partition: int = 0) -> Message:
self.assign_specific_partitions(self._topic_name, partitions=[partition], offset=offset)
return self.consume_single_message(timeout=1)
return self.consume_single_message(30)


class PingConsumer(AbstractConsumer):
Expand Down
14 changes: 11 additions & 3 deletions esque/controller/topic_controller.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import datetime
import logging
import re
import time
from datetime import timezone
from enum import Enum
from itertools import islice
from logging import Logger
from typing import TYPE_CHECKING, Dict, List, Union

import pendulum
Expand All @@ -19,6 +21,8 @@
from esque.helpers import ensure_kafka_future_done, invalidate_cache_after
from esque.resources.topic import Partition, PartitionInfo, Topic, TopicDiff

logger: Logger = logging.getLogger(__name__)

if TYPE_CHECKING:
from esque.cluster import Cluster

Expand Down Expand Up @@ -180,10 +184,14 @@ def _get_partition_data(
for partition_id, meta in confluent_topic.partitions.items():
low = int(low_watermarks[partition_id].offset[0])
high = int(high_watermarks[partition_id].offset[0])
latest_timestamp = None
if high > low:
latest_timestamp = float(consumer.consume(high - 1, partition_id).timestamp()[1]) / 1000
else:
latest_timestamp = None
try:
latest_timestamp = float(consumer.consume(high - 1, partition_id).timestamp()[1]) / 1000
except MessageEmptyException:
logger.warning(
f"Due to timeout latest timestamp for topic `{topic.name}` and partition `{partition_id}` is missing."
)
partition = Partition(partition_id, low, high, meta.isrs, meta.leader, meta.replicas, latest_timestamp)
partitions.append(partition)

Expand Down
5 changes: 2 additions & 3 deletions esque/resources/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ def __init__(
self.partition_isrs = partition_isrs
self.partition_leader = partition_leader
self.partition_replicas = partition_replicas
if latest_message_timestamp is None:
self.latest_message_timestamp = None
else:
self.latest_message_timestamp = None
if latest_message_timestamp is not None:
self.latest_message_timestamp = pendulum.from_timestamp(latest_message_timestamp).to_datetime_string()

def as_dict(self):
Expand Down

0 comments on commit 0dd05a9

Please sign in to comment.