Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#6549 Fix kafka consumer initial seek position #6712

Merged
merged 5 commits into from
Jul 11, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,21 @@
package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.Thread.sleep;


/**
Expand All @@ -52,7 +43,10 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
private long nextMessageProcessingStatusCheckTime = System.currentTimeMillis();
private long maxNextPollTimestampToAvoidConsumerTimeout = 0;
private final long maxMsBetweenPolls;


// Keep track of when an initial rebalance is done
private static boolean initialPartitionAssignment = true;
planetf1 marked this conversation as resolved.
Show resolved Hide resolved


//If we get close enough to the consumer timeout timestamp, force a poll so that
//we do not exceed the timeout. This parameter controls how close we can get
Expand All @@ -68,6 +62,7 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
private final AtomicBoolean running = new AtomicBoolean(true);

private final boolean isAutoCommitEnabled;
private final long startTime = System.currentTimeMillis();

/**
* Constructor for the event consumer.
Expand Down Expand Up @@ -112,6 +107,7 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
this.messageProcessingStatusCheckIntervalMs = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.COMMIT_CHECK_INTERVAL_MS);
long messageTimeoutMins = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_EVENT_PROCESSING_TIMEOUT_MINS);
this.messageProcessingTimeoutMs = messageTimeoutMins < 0 ? messageTimeoutMins : TimeUnit.MILLISECONDS.convert(messageTimeoutMins, TimeUnit.MINUTES);

}


Expand All @@ -135,8 +131,7 @@ private void updateNextMaxPollTimestamp() {
public void run()
{
final String actionDescription = "run";
KafkaOpenMetadataTopicConnectorAuditCode auditCode;


while (isRunning())
{
try
Expand Down Expand Up @@ -452,7 +447,7 @@ private int getNumberOfUnprocessedMessages() {
private void awaitNextPollingTime() {
try
{
Thread.sleep(1000);
sleep(1000);
}
catch (InterruptedException e)
{
Expand All @@ -468,7 +463,7 @@ private void recoverAfterError()

try
{
Thread.sleep(recoverySleepTimeSec * 1000L);
sleep(recoverySleepTimeSec * 1000L);
}
catch (InterruptedException e1)
{
Expand Down Expand Up @@ -514,15 +509,52 @@ private void stopRunning()
}


private class HandleRebalance implements ConsumerRebalanceListener
{
private class HandleRebalance implements ConsumerRebalanceListener {
AuditLog auditLog = null;

/**
 * Creates the rebalance listener, capturing the audit log used for reporting.
 *
 * @param auditLog destination for audit messages; stored as-is (may be null)
 */
public HandleRebalance(AuditLog auditLog) {
    this.auditLog = auditLog;
}

public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved

// Check if we need to rewind to handle initial startup case -- but only on first assignment
try {
if (initialPartitionAssignment) {
log.info("Received initial rebalance event");

long partitionCount = partitions.size();

if (partitionCount != 1) {
log.info("Received rebalance event with " + partitionCount + " partitions. This is not supported.");
planetf1 marked this conversation as resolved.
Show resolved Hide resolved
} else {
// there is only one partition, so we can just grab the first one - and we'll try this once only
initialPartitionAssignment = false;
planetf1 marked this conversation as resolved.
Show resolved Hide resolved
TopicPartition partition = partitions.iterator().next();

// query offset by timestamp (when we started connector)
OffsetAndTimestamp otByStartTime = consumer.offsetsForTimes(Collections.singletonMap(partition,
KafkaOpenMetadataEventConsumer.this.startTime)).get(partition);
long maxOffsetWanted = otByStartTime.offset();

// get the current offset
long currentOffset = consumer.position(partition);

// if the current offset is later than the start time we want, rewind to the start time
if (currentOffset > maxOffsetWanted) {

log.info("Seeking to {} for partition {} and topic {} as current offset {} is too late", maxOffsetWanted, partition.partition(),
partition.topic(), currentOffset);
consumer.seek(partition, maxOffsetWanted);
} else
log.info("Not Seeking to {} for partition {} and topic {} as current offset {} is older", maxOffsetWanted, partition.partition(),
partition.topic(), currentOffset);
}
}
} catch (Exception e) {
// We leave the offset as-is if anything goes wrong. Eventually other messages will cause the effective state to be updated
log.info("Error correcting seek position, continuing with defaults", e);
}
}

public void onPartitionsRevoked(Collection<TopicPartition> partitions)
Expand Down