Commit a21d00d
#6549 update after review comments
Signed-off-by: Nigel Jones <nigel.l.jones+git@gmail.com>
planetf1 committed Jul 8, 2022
1 parent c903a6c commit a21d00d
Showing 2 changed files with 59 additions and 53 deletions.
KafkaOpenMetadataEventConsumer.java
@@ -3,21 +3,21 @@
 package org.odpi.openmetadata.adapters.eventbus.topic.kafka;
 
 import java.time.Duration;
-import java.util.*;
+import java.util.*;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.lang.Thread.sleep;
+import static java.lang.Thread.sleep;
 
 
 /**
@@ -43,10 +43,10 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
     private long nextMessageProcessingStatusCheckTime = System.currentTimeMillis();
     private long maxNextPollTimestampToAvoidConsumerTimeout = 0;
     private final long maxMsBetweenPolls;
 
-    // Keep track of when an initial rebalance is done
-    private static boolean initialPartitionAssignment = true;
+    // Keep track of when an initial rebalance is done
+    private boolean initialPartitionAssignment = true;
 
 
     //If we get close enough to the consumer timeout timestamp, force a poll so that
     //we do not exceed the timeout. This parameter controls how close we can get
@@ -62,7 +62,7 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
     private final AtomicBoolean running = new AtomicBoolean(true);
 
     private final boolean isAutoCommitEnabled;
-    private final long startTime = System.currentTimeMillis();
+    private final long startTime = System.currentTimeMillis();
 
     /**
      * Constructor for the event consumer.
@@ -107,7 +107,7 @@ public class KafkaOpenMetadataEventConsumer implements Runnable
         this.messageProcessingStatusCheckIntervalMs = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.COMMIT_CHECK_INTERVAL_MS);
         long messageTimeoutMins = config.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_EVENT_PROCESSING_TIMEOUT_MINS);
         this.messageProcessingTimeoutMs = messageTimeoutMins < 0 ? messageTimeoutMins : TimeUnit.MILLISECONDS.convert(messageTimeoutMins, TimeUnit.MINUTES);
-
+
     }
 
 
@@ -131,7 +131,7 @@ private void updateNextMaxPollTimestamp() {
     public void run()
     {
         final String actionDescription = "run";
-
+
         while (isRunning())
         {
             try
@@ -447,7 +447,7 @@ private int getNumberOfUnprocessedMessages() {
     private void awaitNextPollingTime() {
         try
         {
-            sleep(1000);
+            sleep(1000);
         }
         catch (InterruptedException e)
         {
@@ -463,7 +463,7 @@ private void recoverAfterError()

         try
         {
-            sleep(recoverySleepTimeSec * 1000L);
+            sleep(recoverySleepTimeSec * 1000L);
         }
         catch (InterruptedException e1)
         {
@@ -509,54 +509,57 @@ private void stopRunning()
     }
 
 
-    private class HandleRebalance implements ConsumerRebalanceListener {
+    private class HandleRebalance implements ConsumerRebalanceListener {
         AuditLog auditLog = null;
 
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-
-            // Check if we need to rewind to handle initial startup case -- but only on first assignment
-            try {
-                if (initialPartitionAssignment) {
-                    log.info("Received initial rebalance event");
-
-                    long partitionCount = partitions.size();
-
-                    if (partitionCount != 1) {
-                        log.info("Received rebalance event with " + partitionCount + " partitions. This is not supported.");
-                    } else {
-                        // there is only one partition, so we can just grab the first one - and we'll try this once only
-                        initialPartitionAssignment = false;
-                        TopicPartition partition = partitions.iterator().next();
-
-                        // query offset by timestamp (when we started connector)
-                        OffsetAndTimestamp otByStartTime = consumer.offsetsForTimes(Collections.singletonMap(partition,
-                                KafkaOpenMetadataEventConsumer.this.startTime)).get(partition);
-                        long maxOffsetWanted = otByStartTime.offset();
-
-                        // get the current offset
-                        long currentOffset = consumer.position(partition);
-
-                        // if the current offset is later than the start time we want, rewind to the start time
-                        if (currentOffset > maxOffsetWanted) {
-
-                            log.info("Seeking to {} for partition {} and topic {} as current offset {} is too late", maxOffsetWanted, partition.partition(),
-                                    partition.topic(), currentOffset);
-                            consumer.seek(partition, maxOffsetWanted);
-                        } else
-                            log.info("Not Seeking to {} for partition {} and topic {} as current offset {} is older", maxOffsetWanted, partition.partition(),
-                                    partition.topic(), currentOffset);
-                    }
-                }
-            } catch (Exception e) {
-                // We leave the offset as-is if anything goes wrong. Eventually other messages will cause the effective state to be updated
-                log.info("Error correcting seek position, continuing with defaults", e);
-            }
-        }
+        public HandleRebalance(AuditLog auditLog) {
+            this.auditLog = auditLog;
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+
+            // Check if we need to rewind to handle initial startup case -- but only on first assignment
+            try {
+                if (initialPartitionAssignment) {
+                    log.info("Received initial rebalance event");
+
+                    long partitionCount = partitions.size();
+
+                    if (partitionCount != 1) {
+                        log.info("Received rebalance event with " + partitionCount + " partitions. This is not supported.");
+                    } else {
+                        // there is only one partition, so we can just grab the first one - and we'll try this once only
+                        initialPartitionAssignment = false;
+                        TopicPartition partition = partitions.iterator().next();
+
+                        // query offset by timestamp (when we started connector)
+                        OffsetAndTimestamp otByStartTime = consumer.offsetsForTimes(Collections.singletonMap(partition,
+                                KafkaOpenMetadataEventConsumer.this.startTime)).get(partition);
+                        long maxOffsetWanted = otByStartTime.offset();
+
+                        // get the current offset
+                        long currentOffset = consumer.position(partition);
+
+                        // if the current offset is later than the start time we want, rewind to the start time
+                        if (currentOffset > maxOffsetWanted) {
+
+                            log.info("Seeking to {} for partition {} and topic {} as current offset {} is too late", maxOffsetWanted, partition.partition(),
+                                    partition.topic(), currentOffset);
+                            consumer.seek(partition, maxOffsetWanted);
+                        } else
+                            log.info("Not Seeking to {} for partition {} and topic {} as current offset {} is older", maxOffsetWanted, partition.partition(),
+                                    partition.topic(), currentOffset);
+                    }
+                }
+            } catch (Exception e) {
+                // We leave the offset as-is if anything goes wrong. Eventually other messages will cause the effective state to be updated
+                log.info("Error correcting seek position, continuing with defaults", e);
+            }
+        }
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions)
         {
             final String methodName = "onPartitionsRevoked.commitSync";
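For readers following the rebalance hunk above, the technique it applies (map a wall-clock start time to a Kafka offset with offsetsForTimes, then seek back only if the consumer is already past that offset) is shown below as a minimal standalone sketch. This is illustrative only, not the connector's actual wiring: the broker address, group id, topic, and class name are placeholders, and the explicit null check on the offsetsForTimes result (null when no record is as recent as the timestamp) is added here, a case the committed code instead covers with its catch block.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class SeekToStartTimeSketch
    {
        public static void main(String[] args)
        {
            // Analogous to the connector's startTime / connectorStartTime fields
            long startTime = System.currentTimeMillis();

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            props.put("group.id", "sketch-group");              // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
            {
                // The connector's logic assumes a single partition; partition 0 of a placeholder topic here
                TopicPartition partition = new TopicPartition("sketch-topic", 0);
                consumer.assign(Collections.singletonList(partition));

                // Map the start time to the earliest offset whose record timestamp is equal or later
                Map<TopicPartition, OffsetAndTimestamp> offsets =
                        consumer.offsetsForTimes(Collections.singletonMap(partition, startTime));
                OffsetAndTimestamp startOffset = offsets.get(partition);

                long currentOffset = consumer.position(partition);

                // Rewind only when the current position has already moved past the start-time offset;
                // startOffset is null when no record is as recent as the timestamp
                if (startOffset != null && currentOffset > startOffset.offset())
                {
                    consumer.seek(partition, startOffset.offset());
                }
            }
        }
    }

Guarding the seek the way the commit does (once per consumer, via the now non-static initialPartitionAssignment flag) means each consumer instance rewinds at most once, on its first assignment.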
KafkaOpenMetadataTopicConnector.java
@@ -49,6 +49,9 @@ public class KafkaOpenMetadataTopicConnector extends OpenMetadataTopicConnector
     private String topicName = null;
     private String serverId = null;
 
+    // Start time needed for setting topic offset
+    private final long connectorStartTime = System.currentTimeMillis();
+
     /* this buffer is for consumed events */
     private final List<IncomingEvent> incomingEventsList = Collections.synchronizedList(new ArrayList<>());
 
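The second file records the connector's start time, which the consumer side uses to decide how far back to seek, while the reworked HandleRebalance now receives its AuditLog through a constructor instead of outer static state. As a complement, the sketch below shows the usual way a ConsumerRebalanceListener of this shape is attached to a consumer via subscribe; the call site, topic name, and configuration values are assumptions for illustration, since the actual subscription code is outside this diff.

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RebalanceListenerWiringSketch
    {
        public static void main(String[] args)
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            props.put("group.id", "sketch-group");              // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
            {
                // A listener passed to subscribe() is invoked on the polling thread
                consumer.subscribe(Collections.singletonList("sketch-topic"), new ConsumerRebalanceListener()
                {
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
                    {
                        // Called after each rebalance; this is the hook where HandleRebalance
                        // decides whether to seek back to the offset matching the start time.
                    }

                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
                    {
                        // Called before partitions are taken away; a natural place to commit
                        // offsets, as the connector's onPartitionsRevoked does with commitSync.
                    }
                });
            }
        }
    }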
