Skip to content

Commit e9462c8

Browse files
committed
fix(plugin): fix NPE using version 2.0 with KafkaFileObjectStateBackingStore (#149)
Resolves: #149
1 parent df7f193 commit e9462c8

File tree

4 files changed

+30
-12
lines changed

4 files changed

+30
-12
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaBasedLog.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,18 @@ public class KafkaBasedLog<K, V> {
5252
private static final Logger LOG = LoggerFactory.getLogger(KafkaBasedLog.class);
5353
private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
5454

55-
private Time time;
55+
private final Time time;
5656
private final String topic;
5757
private final Map<String, Object> producerConfigs;
5858
private final Map<String, Object> consumerConfigs;
5959
private final Callback<ConsumerRecord<K, V>> consumedCallback;
6060
private Consumer<K, V> consumer;
61-
private Producer<K, V> producer;
61+
private volatile Producer<K, V> producer;
6262

6363
private Thread thread;
6464
private boolean stopRequested;
65-
private Queue<Callback<Void>> readLogEndOffsetCallbacks;
66-
private Runnable initializer;
65+
private final Queue<Callback<Void>> readLogEndOffsetCallbacks;
66+
private final Runnable initializer;
6767

6868
private volatile States state;
6969

@@ -230,14 +230,22 @@ public Future<Void> readToEnd() {
230230
return future;
231231
}
232232

233-
public void send(K key, V value) {
233+
public void send(final K key, final V value) {
234+
checkIsRunning();
234235
send(key, value, null);
235236
}
236237

237-
public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
238+
public void send(final K key, final V value, final org.apache.kafka.clients.producer.Callback callback) {
239+
checkIsRunning();
238240
producer.send(new ProducerRecord<>(topic, key, value), callback);
239241
}
240242

243+
private synchronized void checkIsRunning() {
244+
if (state != States.RUNNING) {
245+
throw new IllegalStateException("KafkaBasedLog is already not running.");
246+
}
247+
}
248+
241249

242250
private Producer<K, V> createProducer() {
243251
// Always require producer acks to all to ensure durable writes

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ private synchronized void setState(final States status) {
8585
* {@inheritDoc}
8686
*/
8787
@Override
88-
public void start() {
88+
public synchronized void start() {
8989
if (isStarted()) {
9090
throw new IllegalStateException("Cannot init again.");
9191
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline;
2424
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
2525
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
26+
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStore;
2627
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStoreManager;
2728
import io.streamthoughts.kafka.connect.filepulse.state.internal.OpaqueMemoryResource;
2829
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
@@ -189,7 +190,13 @@ private void initSharedStateBackingStore(final String connectorGroupName) {
189190
sharedStore = FileObjectStateBackingStoreManager.INSTANCE
190191
.getOrCreateSharedStore(
191192
connectorGroupName,
192-
taskConfig::getStateBackingStore,
193+
() -> {
194+
final FileObjectStateBackingStore store = taskConfig.getStateBackingStore();
195+
// Always invoke the start() method when store is created from Task
196+
// because this means the connector is running on a remote worker.
197+
store.start();
198+
return store;
199+
},
193200
new Object()
194201
);
195202
} catch (Exception exception) {

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/AbstractKafkaConnectTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,17 @@
3838
import org.testcontainers.containers.GenericContainer;
3939
import org.testcontainers.containers.Network;
4040
import org.testcontainers.containers.output.Slf4jLogConsumer;
41-
import org.testcontainers.containers.wait.Wait;
41+
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
4242

43+
import java.time.Duration;
4344
import java.util.Collections;
4445
import java.util.HashMap;
4546
import java.util.Map;
4647
import java.util.UUID;
4748
import java.util.stream.Stream;
4849

50+
import static org.testcontainers.containers.wait.strategy.Wait.forHttp;
51+
4952
public class AbstractKafkaConnectTest {
5053

5154
private static final Logger LOG = LoggerFactory.getLogger(FilePulseIT.class);
@@ -63,7 +66,7 @@ public class AbstractKafkaConnectTest {
6366
private static final String CONNECT_PLUGIN_PATH = "/usr/share/java/";
6467

6568
private static final String DOCKER_USERNAME = "confluentinc";
66-
private static final String DOCKER_CONFLUENT_TAG = "5.2.1";
69+
private static final String DOCKER_CONFLUENT_TAG = "6.2.0";
6770
private static final String CP_ZOOKEEPER_IMAGE = "cp-zookeeper";
6871
private static final String CP_KAFKA_IMAGE = "cp-kafka";
6972
private static final String CP_CONNECT_IMAGE = "cp-kafka-connect-base";
@@ -137,7 +140,7 @@ private static GenericContainer createConnectWorkerContainer() {
137140
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", CP_CONNECT_IMAGE)
138141
.withEnv("CONNECT_PLUGIN_PATH", CONNECT_PLUGIN_PATH)
139142
.withEnv("CONNECT_BOOTSTRAP_SERVERS", KAFKA_NETWORK_ALIAS + ":29092")
140-
.waitingFor(Wait.forHttp("/"))
143+
.waitingFor(forHttp("/"))
141144
.withFileSystemBind(getConnectPluginsDistDir(), CONNECT_PLUGIN_PATH + CONNECTOR_DIR_NAME + "/", BindMode.READ_WRITE);
142145
}
143146

@@ -203,7 +206,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
203206
}
204207
});
205208
while (true) {
206-
ConsumerRecords<String, String> records = consumer.poll(100);
209+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
207210
records.forEach(System.out::println);
208211

209212
}

0 commit comments

Comments
 (0)