Commit

Use offsets_for_times method
Bibob7 committed Dec 13, 2019
1 parent 0dd05a9 commit 6445ea6
Showing 3 changed files with 22 additions and 47 deletions.
12 changes: 12 additions & 0 deletions esque/clients/consumer.py
@@ -13,6 +13,7 @@
 from esque.helpers import log_error
 from esque.messages.avromessage import AvroFileWriter, StdOutAvroWriter
 from esque.messages.message import FileWriter, GenericWriter, PlainTextFileWriter, StdOutWriter, decode_message
+from esque.resources.topic import Topic
 from esque.ruleparser.ruleengine import RuleTree


@@ -372,6 +373,17 @@ def consume_to_file_ordered(
     return total_number_of_messages


+def offsets_for_timestamp(group_id: str, topic: Topic, timestamp_seconds: int) -> Dict[int, int]:
+    config = Config.get_instance().create_confluent_config()
+    config.update({"group.id": group_id})
+    consumer = confluent_kafka.Consumer(config)
+    topic_partitions_with_timestamp = [
+        TopicPartition(topic.name, partition.partition_id, timestamp_seconds * 1000) for partition in topic.partitions
+    ]
+    topic_partitions_with_new_offsets = consumer.offsets_for_times(topic_partitions_with_timestamp)
+    return {topic_partition.partition: topic_partition.offset for topic_partition in topic_partitions_with_new_offsets}
+
+
 def consume_to_files(
     output_directory: pathlib.Path,
     topic: str,
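For reference, a minimal sketch of the confluent_kafka offsets_for_times contract that the new helper relies on. The broker address, group id, topic name, and timestamp below are illustrative assumptions, not part of this commit:

    # Hypothetical lookup, assuming a broker at localhost:9092.
    # offsets_for_times takes the target timestamp in the `offset` field of
    # each TopicPartition, in milliseconds, and returns partitions whose
    # `offset` is the earliest offset with a message timestamp >= that value
    # (or -1 if the partition has no such message).
    import confluent_kafka
    from confluent_kafka import TopicPartition

    consumer = confluent_kafka.Consumer({"bootstrap.servers": "localhost:9092", "group.id": "offset-lookup-demo"})
    query = [TopicPartition("demo-topic", 0, 1576238400 * 1000)]  # 2019-12-13T12:00:00Z, in ms
    result = consumer.offsets_for_times(query, timeout=10)
    for tp in result:
        print(f"partition {tp.partition}: first offset at/after timestamp = {tp.offset}")
    consumer.close()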
6 changes: 3 additions & 3 deletions esque/controller/consumergroup_controller.py
@@ -112,11 +112,11 @@ def create_consumer_group_offset_change_plan(
                 self._logger.error(message)
         elif offset_to_timestamp:
             timestamp_limit = pendulum.parse(offset_to_timestamp)
-            current_offset_dict = TopicController(self.cluster, None).get_offsets_closest_to_timestamp(
-                topic_name=topic_name, timestamp_limit=timestamp_limit
+            proposed_offset_dict = TopicController(self.cluster, None).get_offsets_closest_to_timestamp(
+                group_id=consumer_id, topic_name=topic_name, timestamp_limit=timestamp_limit
             )
             for plan_element in offset_plans.values():
-                plan_element.current_offset = current_offset_dict.get(plan_element.partition_id, 0)
+                plan_element.proposed_offset = proposed_offset_dict.get(plan_element.partition_id, 0)
         elif offset_from_group:
             _, mirror_consumer_group = self._read_current_consumergroup_offsets(
                 consumer_id=offset_from_group, topic_name_expression=topic_name
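The timestamp flag parsed here becomes a pendulum instance; the updated get_offsets_closest_to_timestamp (below) hands its int_timestamp, i.e. whole seconds since the epoch, to offsets_for_timestamp, which scales it to milliseconds. A small sketch of that conversion, with an assumed example string:

    import pendulum

    # pendulum.parse accepts ISO-8601 strings; int_timestamp truncates to
    # whole seconds since the Unix epoch.
    limit = pendulum.parse("2019-12-13T12:00:00+00:00")
    print(limit.int_timestamp)  # 1576238400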
51 changes: 7 additions & 44 deletions esque/controller/topic_controller.py
@@ -1,8 +1,5 @@
-import datetime
 import logging
 import re
-import time
-from datetime import timezone
 from enum import Enum
 from itertools import islice
 from logging import Logger
@@ -15,9 +12,9 @@
 from confluent_kafka.cimpl import NewTopic
 from pykafka.topic import Topic as PyKafkaTopic

-from esque.clients.consumer import ConsumerFactory, MessageConsumer
+from esque.clients.consumer import MessageConsumer, offsets_for_timestamp
 from esque.config import PING_GROUP_ID, Config
-from esque.errors import EndOfPartitionReachedException, MessageEmptyException, raise_for_kafka_error
+from esque.errors import MessageEmptyException, raise_for_kafka_error
 from esque.helpers import ensure_kafka_future_done, invalidate_cache_after
 from esque.resources.topic import Partition, PartitionInfo, Topic, TopicDiff

@@ -114,46 +111,12 @@ def get_cluster_topic(self, topic_name: str) -> Topic:
     def get_local_topic(self, topic_name: str) -> Topic:
         return Topic(topic_name)

-    def get_offsets_closest_to_timestamp(self, topic_name: str, timestamp_limit: pendulum) -> Dict[int, int]:
+    def get_offsets_closest_to_timestamp(
+        self, group_id: str, topic_name: str, timestamp_limit: pendulum
+    ) -> Dict[int, int]:
         topic = self.get_cluster_topic(topic_name=topic_name)
-        partition_offsets = {partition.partition_id: 0 for partition in topic.partitions}
-        consumers = []
-        factory = ConsumerFactory()
-        partitions = partition_offsets.keys()
-        group_id = "group_for_" + topic_name + "_" + str(int(round(time.time() * 1000)))
-        for partition in partitions:
-            consumer = factory.create_consumer(
-                group_id=group_id,
-                topic_name=None,
-                output_directory=None,
-                avro=False,
-                match=None,
-                last=False,
-                initialize_default_output_directory=False,
-            )
-            consumer.assign_specific_partitions(topic_name, [partition])
-            consumers.append(consumer)
-
-        for partition_counter in range(0, len(consumers)):
-            max_retry_count = 5
-            keep_polling_current_partition = True
-            while keep_polling_current_partition:
-                try:
-                    message = consumers[partition_counter].consume_single_message(timeout=10)
-                except MessageEmptyException:
-                    # a possible timeout due to a network issue, retry (but not more than max_retry_count attempts)
-                    max_retry_count -= 1
-                    if max_retry_count <= 0:
-                        keep_polling_current_partition = False
-                except EndOfPartitionReachedException:
-                    keep_polling_current_partition = False
-                else:
-                    if (
-                        datetime.datetime.fromtimestamp(int(message.timestamp()[1] / 1000.0), timezone.utc)
-                        < timestamp_limit
-                    ):
-                        partition_offsets[message.partition()] = message.offset() + 1
-        return partition_offsets
+        offset_for_timestamp = offsets_for_timestamp(group_id, topic, timestamp_limit.int_timestamp)
+        return offset_for_timestamp

     def update_from_cluster(self, topic: Topic):
         """Takes a topic and, based on its name, updates all attributes from the cluster"""
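A hypothetical call site for the slimmed-down method, assuming an already-configured esque Cluster object (construction elided); the wrapper function and its names are illustrative only:

    import pendulum
    from esque.controller.topic_controller import TopicController

    def first_offsets_at_or_after(cluster, group_id: str, topic_name: str, iso_ts: str):
        # Returns {partition_id: offset}, matching the Dict[int, int] contract above.
        controller = TopicController(cluster, None)
        return controller.get_offsets_closest_to_timestamp(
            group_id=group_id, topic_name=topic_name, timestamp_limit=pendulum.parse(iso_ts)
        )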
