Skip to content

Commit

Permalink
Merge pull request #6712 from planetf1/issue6549a
Browse files Browse the repository at this point in the history
#6549 Fix kafka consumer initial seek position
  • Loading branch information
planetf1 authored Jul 11, 2022
2 parents fc52eb0 + f3128ce commit 64fd229
Showing 1 changed file with 67 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,21 @@
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.Thread.sleep;


/**
Expand All @@ -52,7 +43,10 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
private long nextMessageProcessingStatusCheckTime = System.currentTimeMillis();
private long maxNextPollTimestampToAvoidConsumerTimeout = 0;
private final long maxMsBetweenPolls;


// Keep track of when an initial rebalance is done
private boolean initialPartitionAssignment = true;


//If we get close enough to the consumer timeout timestamp, force a poll so that
//we do not exceed the timeout. This parameter controls how close we can get
Expand All @@ -68,6 +62,7 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
private final AtomicBoolean running = new AtomicBoolean(true);

private final boolean isAutoCommitEnabled;
private final long startTime = System.currentTimeMillis();

/**
* Constructor for the event consumer.
Expand Down Expand Up @@ -112,6 +107,7 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
this.messageProcessingStatusCheckIntervalMs = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.COMMIT_CHECK_INTERVAL_MS);
long messageTimeoutMins = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_EVENT_PROCESSING_TIMEOUT_MINS);
this.messageProcessingTimeoutMs = messageTimeoutMins < 0 ? messageTimeoutMins : TimeUnit.MILLISECONDS.convert(messageTimeoutMins, TimeUnit.MINUTES);

}


Expand All @@ -135,8 +131,7 @@ private void updateNextMaxPollTimestamp() {
public void run()
{
final String actionDescription = "run";
KafkaOpenMetadataTopicConnectorAuditCode auditCode;


while (isRunning())
{
try
Expand Down Expand Up @@ -452,7 +447,7 @@ private int getNumberOfUnprocessedMessages() {
private void awaitNextPollingTime() {
try
{
Thread.sleep(1000);
sleep(1000);
}
catch (InterruptedException e)
{
Expand All @@ -468,7 +463,7 @@ private void recoverAfterError()

try
{
Thread.sleep(recoverySleepTimeSec * 1000L);
sleep(recoverySleepTimeSec * 1000L);
}
catch (InterruptedException e1)
{
Expand Down Expand Up @@ -514,17 +509,68 @@ private void stopRunning()
}


private class HandleRebalance implements ConsumerRebalanceListener
{
private class HandleRebalance implements ConsumerRebalanceListener {
AuditLog auditLog = null;

/**
 * Creates the rebalance listener.
 *
 * @param auditLog audit log instance stored on this listener
 */
public HandleRebalance(AuditLog auditLog) {
this.auditLog = auditLog;
}

public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
/**
 * Called by the Kafka consumer when partitions are assigned.
 *
 * <p>On the first assignment that contains exactly one partition, checks whether the
 * consumer's current position is later than the offset of the first record whose timestamp
 * is at or after the enclosing consumer's {@code startTime}. If it is, the consumer is
 * rewound to that offset so that events published since start-up are not missed.
 * The check is attempted once only; assignments with any other number of partitions are
 * logged and leave the position untouched.
 *
 * <p>Any failure is logged and otherwise ignored, leaving the consumer at its default position.
 *
 * @param partitions the partitions now assigned to this consumer
 */
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

    // Only the initial assignment is a candidate for the rewind check.
    if (!initialPartitionAssignment) {
        return;
    }

    try {
        log.info("Received initial PartitionsAssigned event");

        int partitionCount = partitions.size();
        if (partitionCount != 1) {
            log.info("Received PartitionsAssigned event with {} partitions. This is not supported.", partitionCount);
            return;
        }

        // Exactly one partition: clear the flag first so the rewind is tried once only,
        // even if one of the broker calls below fails.
        initialPartitionAssignment = false;

        TopicPartition partition = partitions.iterator().next();

        // Query the offset by timestamp (when this connector started).
        // The result is null if there are no messages at or after that timestamp.
        long reqStartTime = KafkaOpenMetadataEventConsumer.this.startTime;
        log.info("Querying for offset by timestamp: {}", reqStartTime);
        OffsetAndTimestamp otByStartTime =
                consumer.offsetsForTimes(Collections.singletonMap(partition, reqStartTime)).get(partition);

        if (otByStartTime == null) {
            // Nothing has been published since start-up, so there is nothing to rewind to.
            log.info("No missed events found for partition {} and topic {}", partition.partition(), partition.topic());
            return;
        }

        // Where we want to scroll to: the first message sent since we started.
        long maxOffsetWanted = otByStartTime.offset();
        log.info("Earliest offset found for timestamp {} is {} (record timestamp {})",
                 reqStartTime, maxOffsetWanted, otByStartTime.timestamp());

        long currentOffset = consumer.position(partition);

        // If the current position is past the first post-start-up message, rewind to it.
        if (currentOffset > maxOffsetWanted) {
            log.info("Seeking to {} for partition {} and topic {} as current offset {} is too late", maxOffsetWanted, partition.partition(),
                     partition.topic(), currentOffset);
            consumer.seek(partition, maxOffsetWanted);
        } else {
            log.info("Not Seeking to {} for partition {} and topic {} as current offset {} is older", maxOffsetWanted, partition.partition(),
                     partition.topic(), currentOffset);
        }
    } catch (Exception e) {
        // Best effort: leave the offset as-is if anything goes wrong. Eventually other messages
        // will cause the effective state to be updated. Logged at WARN so the failure is visible.
        // NOTE(review): this also swallows any WakeupException raised by the consumer calls above;
        // confirm the polling loop does not rely on seeing it.
        log.warn("Error correcting seek position, continuing with defaults", e);
    }
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
final String methodName = "onPartitionsRevoked.commitSync";
Expand Down

0 comments on commit 64fd229

Please sign in to comment.