Skip to content

Commit

Permalink
#6549 additional debug to prove correct behaviour of offset
Browse files Browse the repository at this point in the history
Signed-off-by: Nigel Jones <nigel.l.jones+git@gmail.com>
  • Loading branch information
planetf1 committed Jul 11, 2022
1 parent f669a4d commit b6efea7
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
long partitionCount = partitions.size();

if (partitionCount != 1) {
log.info("Received PartitionsAssigned event with " + partitionCount + " partitions. This is not supported.");
log.info("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
} else {
// there is only one partition, so we can just grab the first one - and we'll try this once only
initialPartitionAssignment = false;
Expand All @@ -536,13 +536,16 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
TopicPartition partition = partitions.iterator().next();

// query offset by timestamp (when we started connector) - NULL if there are no messages later than this offset
long reqStartTime=KafkaOpenMetadataEventConsumer.this.startTime;
log.info("Querying for offset by timestamp: {}",reqStartTime);
OffsetAndTimestamp otByStartTime = consumer.offsetsForTimes(Collections.singletonMap(partition,
KafkaOpenMetadataEventConsumer.this.startTime)).get(partition);
reqStartTime)).get(partition);

// If null, then we don't have any earlier messages - ie there is no offset found
if (otByStartTime != null) {
// where we want to scoll to - the messages sent since we thought we started
maxOffsetWanted = otByStartTime.offset();
log.info("Earliest offset found for {} is {}",reqStartTime,otByStartTime.timestamp());

// get the current offset
long currentOffset = consumer.position(partition);
Expand Down

0 comments on commit b6efea7

Please sign in to comment.