Kafka upgrade (#881)
* Kafka upgrade

* Kafka upgrade

* Removing temp version

* Addressing comments: removing flag update after close to prevent thread misbehavior

* Removing dependency on li-apache-kafka-clients
srinagaraj committed Feb 2, 2022
1 parent 2cd2b8b commit 4d867ef
Showing 10 changed files with 14 additions and 237 deletions.
6 changes: 1 addition & 5 deletions build.gradle
@@ -141,7 +141,7 @@ project(':datastream-common') {
     compile "com.linkedin.pegasus:restli-server:$pegasusVersion"
     compile "com.intellij:annotations:$intellijAnnotationsVersion"
     compile "com.google.guava:guava:$guavaVersion"
-    compile "com.linkedin.kafka.clients:li-apache-kafka-clients:$LIKafkaVersion"
+    compile "com.linkedin.kafka:kafka-clients:$kafkaVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
   }
 }
@@ -150,7 +150,6 @@ project(':datastream-server-api') {
   dependencies {
     compile project(':datastream-common')
     compile project(':datastream-utils')
-    compile "com.linkedin.kafka.clients:li-apache-kafka-clients:$LIKafkaVersion"
   }
 }
 
@@ -189,7 +188,6 @@ project(':datastream-directory') {
 project(":datastream-kafka_$scalaSuffix") {
   dependencies {
     compile "com.linkedin.kafka:kafka_$scalaSuffix:$kafkaVersion"
-    compile "com.linkedin.kafka.clients:li-apache-kafka-clients:$LIKafkaVersion"
     compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion"
 
     compile project(':datastream-server')
@@ -214,7 +212,6 @@ project(":datastream-kafka_$scalaSuffix") {
 project(':datastream-kafka-factory-impl') {
   dependencies {
     compile project(':datastream-kafka-connector')
-    compile "com.linkedin.kafka.clients:li-apache-kafka-clients:$LIKafkaVersion"
 
     testCompile project(":datastream-kafka_$scalaSuffix")
   }
@@ -225,7 +222,6 @@ project(':datastream-kafka-connector') {
     compile project(':datastream-server-api')
     compile project(':datastream-common')
     compile project(":datastream-kafka_$scalaSuffix")
-    compile "com.linkedin.kafka.clients:li-apache-kafka-clients:$LIKafkaVersion"
     compile "org.apache.httpcomponents:httpclient:$apacheHttpClientVersion"
     compile "commons-validator:commons-validator:$commonsValidatorVersion"
     compile "org.apache.commons:commons-lang3:$commonslang3Version"
AbstractKafkaBasedConnectorTask.java
@@ -90,6 +90,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, ConsumerRebalanceListener
   // lifecycle
   private volatile Thread _connectorTaskThread;
   protected volatile boolean _shutdown = false;
+  private volatile boolean _failure = false;
   protected volatile long _lastPolledTimeMillis = System.currentTimeMillis();
   protected volatile long _lastPollCompletedTimeMillis = 0;
   protected final CountDownLatch _startedLatch = new CountDownLatch(1);
@@ -303,6 +304,8 @@ protected void rewindAndPausePartitionOnException(TopicPartition srcTopicPartition
     } catch (Exception e) {
       // Seek to last checkpoint failed. Throw an exception to avoid any data loss scenarios where the consumed
       // offset can be committed even though the send for that offset has failed.
+      // This flag is used to address Kafka 2.4 behavior changes for onPartitionsRevoked calls.
+      _failure = true;
       String errorMessage = String.format("Partition rewind for %s failed due to ", srcTopicPartition);
       throw new DatastreamRuntimeException(errorMessage, e);
     }
@@ -766,7 +769,7 @@ protected void updateConsumerAssignment(Collection<TopicPartition> partitions) {
   public void onPartitionsRevoked(Collection<TopicPartition> topicPartitions) {
     _logger.info("Partition ownership revoked for {}, checkpointing.", topicPartitions);
     _kafkaTopicPartitionTracker.onPartitionsRevoked(topicPartitions);
-    if (!_shutdown && !topicPartitions.isEmpty()) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode.
+    if (!_shutdown && !topicPartitions.isEmpty() && !_failure) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode.
       try {
         maybeCommitOffsets(_consumer, true); // happens inline as part of poll
       } catch (Exception e) {
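
Note on the new _failure flag: Kafka 2.4 changed when onPartitionsRevoked fires (notably, it is also invoked when the consumer is closed), so a task whose rewind-to-checkpoint failed could otherwise commit offsets for records whose send never succeeded. Below is a minimal, self-contained sketch of this pattern, assuming nothing beyond the public kafka-clients API; the class and method names are illustrative, not Brooklin's actual code.

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch of the failure-flag pattern; not Brooklin's actual class.
public class RewindAwareRebalanceListener implements ConsumerRebalanceListener {
  private final Consumer<byte[], byte[]> _consumer;
  private volatile boolean _shutdown = false;
  // Set when a rewind to the last checkpoint fails; committing afterwards would be
  // unsafe because the consumed position may be ahead of what was actually sent.
  private volatile boolean _failure = false;

  public RewindAwareRebalanceListener(Consumer<byte[], byte[]> consumer) {
    _consumer = consumer;
  }

  void rewindToCheckpoint(TopicPartition tp, long checkpointOffset) {
    try {
      _consumer.seek(tp, checkpointOffset);
    } catch (Exception e) {
      // Mark the task failed instead of updating state after close(): in Kafka 2.4,
      // close() also triggers onPartitionsRevoked, which would otherwise commit
      // offsets for records that were never delivered.
      _failure = true;
      throw new RuntimeException("Partition rewind for " + tp + " failed", e);
    }
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Skip the commit when shutting down (the run loop commits at the end) or when
    // a rewind failure makes the current consumer position untrustworthy.
    if (!_shutdown && !partitions.isEmpty() && !_failure) {
      _consumer.commitSync();
    }
  }
}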
KafkaMirrorMakerConnectorTestUtils.java
@@ -27,8 +27,6 @@
 import com.linkedin.datastream.connectors.kafka.KafkaBasedConnectorConfig;
 import com.linkedin.datastream.connectors.kafka.KafkaBasedConnectorConfigBuilder;
 import com.linkedin.datastream.connectors.kafka.LiKafkaConsumerFactory;
-import com.linkedin.datastream.connectors.kafka.NoOpAuditor;
-import com.linkedin.datastream.connectors.kafka.NoOpSegmentDeserializer;
 import com.linkedin.datastream.server.DatastreamTaskImpl;
 import com.linkedin.datastream.testutil.DatastreamEmbeddedZookeeperKafkaCluster;
 
@@ -163,24 +161,10 @@ static Thread runKafkaMirrorMakerConnectorTask(KafkaMirrorMakerConnectorTask con
 
   static KafkaBasedConnectorConfigBuilder getKafkaBasedConnectorConfigBuilder() {
     return new KafkaBasedConnectorConfigBuilder()
-        .setConsumerProps(getKafkaConsumerProperties())
         .setPausePartitionOnError(true)
         .setPauseErrorPartitionDuration(Duration.ofSeconds(5));
   }
 
-  /**
-   * Returns properties that will be used to configure the Kafka consumer in BMM.
-   * Right now it returns a no-op segment deserializer and a no-op auditor, as BMM doesn't need to
-   * assemble/disassemble any message; it just needs to do byte-to-byte copying.
-   * @return Properties to be used by the Kafka consumer in BMM.
-   */
-  static Properties getKafkaConsumerProperties() {
-    Properties props = new Properties();
-    props.put("segment.deserializer.class", NoOpSegmentDeserializer.class.getCanonicalName());
-    props.put("auditor.class", NoOpAuditor.class.getCanonicalName());
-    return props;
-  }
-
   /**
    * Get the default config properties of a Kafka-based connector
    * @param override Configuration properties to override default config properties
TestKafkaMirrorMakerConnectorTask.java
@@ -801,7 +801,7 @@ public void testAutoPauseOnSendFailure() throws Exception {
         new MockDatastreamEventProducer((r) -> new String((byte[]) r.getEvents().get(0).key().get()).equals("key-2"));
     task.setEventProducer(datastreamProducer);
 
-    Properties consumerProps = KafkaMirrorMakerConnectorTestUtils.getKafkaConsumerProperties();
+    Properties consumerProps = new Properties();
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     KafkaMirrorMakerConnectorTask connectorTask =
         KafkaMirrorMakerConnectorTestUtils.createKafkaMirrorMakerConnectorTask(task,
@@ -884,7 +884,7 @@ public void testAutoPauseAndResumeOnSendFailure() throws Exception {
         new MockDatastreamEventProducer((r) -> new String((byte[]) r.getEvents().get(0).key().get()).equals("key-2"));
     task.setEventProducer(datastreamProducer);
 
-    Properties consumerProps = KafkaMirrorMakerConnectorTestUtils.getKafkaConsumerProperties();
+    Properties consumerProps = new Properties();
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     KafkaMirrorMakerConnectorTask connectorTask =
         KafkaMirrorMakerConnectorTestUtils.createKafkaMirrorMakerConnectorTask(task,
@@ -960,7 +960,7 @@ private void testValidateTaskDiesOnRewindFailure(boolean failOnGetLastCheckpoint
 
     createAndConnectZkAdapter(task);
 
-    Properties consumerProps = KafkaMirrorMakerConnectorTestUtils.getKafkaConsumerProperties();
+    Properties consumerProps = new Properties();
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     KafkaBasedConnectorConfig connectorConfig = KafkaMirrorMakerConnectorTestUtils
         .getKafkaBasedConnectorConfigBuilder()
LiKafkaConsumerFactory.java
@@ -8,10 +8,10 @@
 import java.util.Properties;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import com.linkedin.datastream.kafka.factory.KafkaConsumerFactory;
-import com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl;
 
 
 /**
@@ -23,6 +23,6 @@ public class LiKafkaConsumerFactory implements KafkaConsumerFactory<byte[], byte[]>
   public Consumer<byte[], byte[]> createConsumer(Properties properties) {
     properties.put("key.deserializer", ByteArrayDeserializer.class.getCanonicalName());
     properties.put("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());
-    return new LiKafkaConsumerImpl<>(properties);
+    return new KafkaConsumer<>(properties);
  }
}
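
With the LiKafka wrapper removed, the factory now hands back a plain Apache KafkaConsumer. A short usage sketch under stated assumptions — the broker address, group id, topic, and the surrounding example class are placeholders, not part of the repository:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import com.linkedin.datastream.connectors.kafka.LiKafkaConsumerFactory;

// Illustrative usage of the factory; placeholder names are assumptions.
public class ConsumerFactoryExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "example-group");           // placeholder group id
    // key.deserializer and value.deserializer are set by the factory itself.
    Consumer<byte[], byte[]> consumer = new LiKafkaConsumerFactory().createConsumer(props);
    consumer.subscribe(Collections.singletonList("example-topic")); // placeholder topic
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<byte[], byte[]> record : records) {
      System.out.printf("offset=%d, key bytes=%d%n", record.offset(),
          record.key() == null ? 0 : record.key().length);
    }
    consumer.close();
  }
}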

This file was deleted.

This file was deleted.

EmbeddedKafkaCluster.java
@@ -13,6 +13,7 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 
 import kafka.server.KafkaConfig;
@@ -122,7 +123,7 @@ public void startup() {
   }
 
   private KafkaServer startBroker(Properties props) {
-    KafkaServer server = new KafkaServer(KafkaConfig.fromProps(props), new SystemTime(),
+    KafkaServer server = new KafkaServer(KafkaConfig.fromProps(props), new EmbeddedSystemTime(),
         scala.Option.apply(""), scala.collection.JavaConversions.asScalaBuffer(Collections.emptyList()));
     server.startup();
     return server;
@@ -179,7 +180,7 @@ public String toString() {
     return sb.toString();
   }
 
-  static class SystemTime implements Time {
+  static class EmbeddedSystemTime extends SystemTime implements Time {
     public long milliseconds() {
       return System.currentTimeMillis();
     }
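
A plausible reading of this change: Kafka 2.4's Time interface has grown methods beyond milliseconds() (nanoseconds(), sleep(), and others), so the old hand-rolled SystemTime no longer satisfies it, while extending Kafka's own org.apache.kafka.common.utils.SystemTime inherits implementations of everything else. A minimal sketch of the resulting class:

import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

// Sketch: SystemTime already satisfies the full Kafka 2.4 Time interface;
// overriding milliseconds() just preserves the original class's behavior.
class EmbeddedSystemTime extends SystemTime implements Time {
  @Override
  public long milliseconds() {
    return System.currentTimeMillis();
  }
}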

This file was deleted.

3 changes: 1 addition & 2 deletions gradle/dependency-versions.gradle
@@ -1,5 +1,4 @@
 ext {
-  LIKafkaVersion = "1.0.65"
   apacheHttpClientVersion = "4.5.3"
   avroVersion = "1.7.7"
   commonsCliVersion = "1.2"
@@ -10,7 +9,7 @@ ext {
   guavaVersion = "25.0-jre"
   intellijAnnotationsVersion = "12.0"
   jacksonVersion = "1.8.5"
-  kafkaVersion = "2.0.0.25"
+  kafkaVersion = "2.4.1.42"
   log4jVersion = "1.2.17"
   metricsCoreVersion = "4.1.0"
   mockitoVersion = "1.10.19"
