Skip to content

Commit 39c37d9

Browse files
committed
feat(plugin): add new prop to configure if task should halt on error (#164)
Resolves: #164
1 parent 7c219b8 commit 39c37d9

File tree

4 files changed

+115
-85
lines changed

4 files changed

+115
-85
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ public class CommonSourceConfig extends AbstractConfig {
5050
private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files.";
5151

5252
public static final String TASKS_FILE_READER_CLASS_CONFIG = "tasks.reader.class";
53-
private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file.";
53+
private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file.";
54+
55+
public static final String TASKS_HALT_ON_ERROR_CONFIG = "tasks.halt.on.error";
56+
private static final String TASKS_HALT_ON_ERROR_DOC = "Should a task halt when it encounters an error or continue to the next file.";
5457

5558
public static final String OFFSET_STRATEGY_CLASS_CONFIG = "offset.policy.class";
5659
private static final String OFFSET_STRATEGY_CLASS_DOC = "Class which is used to determine the source partition and offset that uniquely identify an input record";
@@ -61,10 +64,10 @@ public class CommonSourceConfig extends AbstractConfig {
6164
private static final String FILTER_DOC = "List of filters aliases to apply on each value (order is important).";
6265

6366
public static final String TASKS_FILE_STATUS_STORAGE_CLASS_CONFIG = "tasks.file.status.storage.class";
64-
public static final String TASKS_FILE_STATUS_STORAGE_CLASS_DOC = "The FileObjectStateBackingStore class to be used for storing status state of file objects.";
67+
private static final String TASKS_FILE_STATUS_STORAGE_CLASS_DOC = "The FileObjectStateBackingStore class to be used for storing status state of file objects.";
6568

6669
public static final String TASK_PARTITIONER_CLASS_CONFIG = "task.partitioner.class";
67-
public static final String TASK_PARTITIONER_CLASS_DOC = "The TaskPartitioner to be used for partitioning files to tasks";
70+
private static final String TASK_PARTITIONER_CLASS_DOC = "The TaskPartitioner to be used for partitioning files to tasks";
6871

6972
/**
7073
* Creates a new {@link CommonSourceConfig} instance.
@@ -100,6 +103,17 @@ public static ConfigDef getConfigDev() {
100103
ConfigDef.Width.NONE,
101104
TASKS_FILE_READER_CLASS_CONFIG
102105
)
106+
.define(
107+
TASKS_HALT_ON_ERROR_CONFIG,
108+
ConfigDef.Type.BOOLEAN,
109+
false,
110+
ConfigDef.Importance.HIGH,
111+
TASKS_HALT_ON_ERROR_DOC,
112+
GROUP,
113+
groupCounter++,
114+
ConfigDef.Width.NONE,
115+
TASKS_HALT_ON_ERROR_CONFIG
116+
)
103117
.define(
104118
OUTPUT_TOPIC_CONFIG,
105119
ConfigDef.Type.STRING,
@@ -155,8 +169,7 @@ public static ConfigDef getConfigDev() {
155169
TASK_PARTITIONER_CLASS_DOC
156170
);
157171
}
158-
159-
172+
160173
public FileSystemListing<?> getFileSystemListing() {
161174
return getConfiguredInstance(FS_LISTING_CLASS_CONFIG, FileSystemListing.class);
162175
}
@@ -169,6 +182,10 @@ public TaskPartitioner getTaskPartitioner() {
169182
return this.getConfiguredInstance(TASK_PARTITIONER_CLASS_CONFIG, TaskPartitioner.class);
170183
}
171184

185+
public boolean isTaskHaltOnError() {
186+
return this.getBoolean(TASKS_HALT_ON_ERROR_CONFIG);
187+
}
188+
172189
public SourceOffsetPolicy getSourceOffsetPolicy() {
173190
return this.getConfiguredInstance(OFFSET_STRATEGY_CLASS_CONFIG, SourceOffsetPolicy.class);
174191
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/DefaultFileRecordsPollingConsumer.java

Lines changed: 69 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2222
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
23-
import io.streamthoughts.kafka.connect.filepulse.filter.FilterException;
2423
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
2524
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
2625
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
@@ -52,7 +51,7 @@ public class DefaultFileRecordsPollingConsumer implements FileRecordsPollingCons
5251
private final SourceTaskContext taskContext;
5352
private final AtomicBoolean closed = new AtomicBoolean(false);
5453

55-
private FileRecord<TypedStruct> latestPollRecord;
54+
private FileRecord<TypedStruct> latestPolledRecord;
5655

5756
private FileInputIterator<FileRecord<TypedStruct>> currentIterator;
5857

@@ -134,11 +133,11 @@ void addAll(final List<URI> files) {
134133
public FileContext context() {
135134
if (currentIterator != null) {
136135
FileContext context = currentIterator.context();
137-
if (latestPollRecord != null) {
136+
if (latestPolledRecord != null) {
138137
context = new FileContext(
139138
context.key(),
140139
context.metadata(),
141-
latestPollRecord.offset().toSourceOffset());
140+
latestPolledRecord.offset().toSourceOffset());
142141
}
143142
return context;
144143
}
@@ -162,39 +161,31 @@ public RecordsIterable<FileRecord<TypedStruct>> next() {
162161
throw new IllegalStateException("FileRecordsPollingConsumer is closed, no more element can be returned");
163162
}
164163

165-
if (!hasNext()) return RecordsIterable.empty();
164+
if (!hasNext())
165+
return RecordsIterable.empty();
166166

167-
// Quickly iterate to lookup for a valid iterator
168-
do {
169-
final DelegateFileInputIterator candidate = queue.peek();
170-
if (candidate.isOpen()) {
171-
currentIterator = getOrCloseIteratorIfNoMoreRecord(candidate);
172-
} else {
173-
currentIterator = openAndGetIteratorOrNullIfInvalid(taskContext, candidate);
174-
}
175-
} while (hasNext() && currentIterator == null);
167+
currentIterator = findNextFileObjectIterator();
176168

177-
if (currentIterator == null) {
169+
if (currentIterator == null)
178170
return RecordsIterable.empty();
179-
}
180-
181-
final RecordsIterable<FileRecord<TypedStruct>> records = currentIterator.next();
182171

183172
Exception exception = null;
184173
try {
174+
// Read the next records from the current iterator
175+
final RecordsIterable<FileRecord<TypedStruct>> records = currentIterator.next();
176+
177+
// Apply the filter-chain in the returned records
185178
final RecordsIterable<FileRecord<TypedStruct>> filtered = pipeline.apply(
186179
records,
187180
currentIterator.hasNext()
188181
);
182+
// May update the last polled records.
189183
if (!filtered.isEmpty()) {
190-
latestPollRecord = filtered.last();
184+
latestPolledRecord = filtered.last();
191185
}
192-
return filtered;
193-
} catch (final FilterException e) {
194-
exception = e;
195-
// ignore the error - and skip the current file.
196-
return RecordsIterable.empty();
197186

187+
// Return record to the connect SourceTask
188+
return filtered;
198189
} catch (final ConnectFilePulseException e) {
199190
exception = e;
200191
throw e;
@@ -210,6 +201,40 @@ public RecordsIterable<FileRecord<TypedStruct>> next() {
210201
}
211202
}
212203

204+
private FileInputIterator<FileRecord<TypedStruct>> findNextFileObjectIterator() {
205+
206+
if (queue.isEmpty()) return null;
207+
208+
FileInputIterator<FileRecord<TypedStruct>> ret;
209+
// Quickly iterate to lookup for a valid iterator
210+
do {
211+
final DelegateFileInputIterator candidate = queue.peek();
212+
if (candidate.isOpen()) {
213+
ret = getOrCloseIteratorIfNoMoreRecord(candidate);
214+
} else {
215+
try {
216+
ret = openAndGetIteratorOrNullIfInvalid(candidate);
217+
if (ret == null) {
218+
var objectMeta = new GenericFileObjectMeta(candidate.getObjectURI());
219+
// Remove the current iterator and continue
220+
deleteFileQueueAndInvokeListener(new FileContext(candidate.key(), objectMeta), null);
221+
}
222+
} catch (Exception e) {
223+
LOG.error(
224+
"Failed to open and initialize new iterator for object: {}.",
225+
candidate.getObjectURI()
226+
);
227+
final FileObjectMeta objectMeta = new GenericFileObjectMeta(candidate.getObjectURI());
228+
deleteFileQueueAndInvokeListener(new FileContext(candidate.key(), objectMeta), e);
229+
// Rethrow the exception.
230+
throw e;
231+
}
232+
}
233+
} while (hasNext() && ret == null);
234+
235+
return ret;
236+
}
237+
213238
/**
214239
* {@inheritDoc}
215240
*/
@@ -259,19 +284,16 @@ public void setStateListener(final StateListener listener) {
259284
* This method will return {@code null} if the iterator point
260285
* to either an invalid file or to an already been completed file.
261286
*
262-
* @param context the connect source task context
263287
* @param iterator the source file iterator
264288
* @return a new {@link FileInputIterator} instance or {@code null} if the iterator is invalid.
265289
*/
266290
private FileInputIterator<FileRecord<TypedStruct>> openAndGetIteratorOrNullIfInvalid(
267-
final SourceTaskContext context,
268291
final DelegateFileInputIterator iterator
269292
) {
270293
// Re-check if the file is still valid.
271294
if (!iterator.isValid()) {
272-
LOG.error("File does not exist or is not readable, skip entry and continue '{}'", iterator.getObjectURI());
273-
var objectMeta = new GenericFileObjectMeta(iterator.getObjectURI());
274-
deleteFileQueueAndInvokeListener(new FileContext(iterator.key(), objectMeta), null);
295+
LOG.warn("File does not exist or is not readable. Skip entry and continue '{}'", iterator.getObjectURI());
296+
// Return NULL so that the calling method can properly close the iterator.
275297
return null;
276298
}
277299

@@ -282,58 +304,42 @@ private FileInputIterator<FileRecord<TypedStruct>> openAndGetIteratorOrNullIfInv
282304

283305
if (!ignoreCommittedOffsets) {
284306
committedOffset = offsetPolicy
285-
.getOffsetFor(context, metadata)
307+
.getOffsetFor(taskContext, metadata)
286308
.orElse(FileObjectOffset.empty());
287309
} else {
288310
committedOffset = FileObjectOffset.empty();
289311
}
290312
} catch (final Exception e) {
291-
if (metadata == null) {
292-
LOG.error(
293-
"Failed to load metadata for object file {}. Error: {}",
294-
iterator.getObjectURI(),
295-
e.getMessage()
296-
);
297-
metadata = new GenericFileObjectMeta(iterator.getObjectURI());
298-
deleteFileQueueAndInvokeListener(new FileContext(iterator.key(), metadata), e);
299-
return null;
300-
} else {
313+
if (metadata != null) {
301314
LOG.warn(
302-
"Failed to load committed offset for object file {}." +
303-
" Previous offset will be ignored. Error: {}",
315+
"Failed to load committed offset for object file {}. " +
316+
"Previous offset will be ignored. Error: {}",
304317
iterator.getObjectURI(),
305318
e.getMessage()
306319
);
307320
committedOffset = FileObjectOffset.empty();
321+
} else {
322+
// Rethrow the exception if metadata cannot be loaded.
323+
// The exception handling logic will be delegated to the calling method.
324+
throw e;
325+
308326
}
309327
}
310328

311329
// Quickly check if we can considered this file completed based on the content-length.
312330
boolean isAlreadyCompleted = committedOffset.position() >= metadata.contentLength();
313331
if (!ignoreCommittedOffsets && isAlreadyCompleted) {
314332
LOG.warn(
315-
"Detected source file already completed, skip entry and continue '{}'",
316-
iterator.getObjectURI());
317-
deleteFileQueueAndInvokeListener(
318-
new FileContext(iterator.key(), metadata, committedOffset),
319-
null
333+
"Detected source file already completed. Skip entry and continue '{}'",
334+
iterator.getObjectURI()
320335
);
336+
// Return NULL so that the calling method can properly close the iterator.
321337
return null;
322338
}
323339

324-
try {
325-
iterator.open();
326-
iterator.seekTo(committedOffset);
327-
pipeline.init(iterator.context());
328-
} catch (final Exception e) {
329-
LOG.error(
330-
"Failed to initialized a new iterator for object file '{}'.",
331-
iterator.getObjectURI(),
332-
e
333-
);
334-
deleteFileQueueAndInvokeListener(new FileContext(iterator.key(), metadata), e);
335-
return null;
336-
}
340+
iterator.open();
341+
iterator.seekTo(committedOffset);
342+
pipeline.init(iterator.context());
337343
if (hasListener()) {
338344
listener.onStart(iterator.context());
339345
}
@@ -365,14 +371,14 @@ private void closeIterator(final FileInputIterator<FileRecord<TypedStruct>> iter
365371
}
366372
}
367373

368-
private void deleteFileQueueAndInvokeListener(final FileContext taskContext,
374+
private void deleteFileQueueAndInvokeListener(final FileContext fileContext,
369375
final Throwable exception) {
370376
queue.remove();
371377
if (hasListener()) {
372378
if (exception != null) {
373-
listener.onFailure(taskContext, exception);
379+
listener.onFailure(fileContext, exception);
374380
} else {
375-
listener.onCompleted(taskContext);
381+
listener.onCompleted(fileContext);
376382
}
377383
}
378384
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -174,22 +174,29 @@ public List<SourceRecord> poll() throws InterruptedException {
174174
}
175175
} else {
176176

177-
final RecordsIterable<FileRecord<TypedStruct>> records = consumer.next();
178-
179-
if (!records.isEmpty()) {
180-
final FileContext context = consumer.context();
181-
LOG.debug("Returning {} records for {}", records.size(), context.metadata());
182-
results = records.stream()
183-
.map(r -> buildSourceRecord(context, r))
184-
.collect(Collectors.toList());
185-
186-
// Check if the SourceTask is still running to
187-
// return immediately instead of waiting
188-
} else if (running.get() &&
189-
consumer.hasNext() &&
190-
consecutiveWaits.checkAndDecrement()) {
191-
busyWait();
192-
continue;
177+
try {
178+
final RecordsIterable<FileRecord<TypedStruct>> records = consumer.next();
179+
if (!records.isEmpty()) {
180+
final FileContext context = consumer.context();
181+
LOG.debug("Returning {} records for {}", records.size(), context.metadata());
182+
results = records.stream()
183+
.map(r -> buildSourceRecord(context, r))
184+
.collect(Collectors.toList());
185+
186+
// Check if the SourceTask is still running to
187+
// return immediately instead of waiting
188+
} else if (running.get() &&
189+
consumer.hasNext() &&
190+
consecutiveWaits.checkAndDecrement()) {
191+
busyWait();
192+
continue;
193+
}
194+
} catch (ConnectFilePulseException e) {
195+
if (taskConfig.isTaskHaltOnError()) {
196+
throw e;
197+
} else {
198+
LOG.error("Caught unexpected error while processing file. Ignore and continue", e);
199+
}
193200
}
194201
}
195202

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/config/SourceTaskConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void should_configure_filter_given_on_failure() {
4141
final SourceTaskConfig config = new SourceTaskConfig(new HashMap<>() {{
4242
put(CommonSourceConfig.OUTPUT_TOPIC_CONFIG, "Test");
4343
put(CommonSourceConfig.FS_LISTING_CLASS_CONFIG, MockFileSystemListing.class.getName());
44-
put(CommonSourceConfig.TASKS_FILE_READER_CLASS_CONFIG, LocalRowFileInputReader.class.getName());
44+
put(CommonSourceConfig.TASKS_HALT_ON_ERROR_CONFIG, LocalRowFileInputReader.class.getName());
4545
put(DefaultTaskFileURIProvider.Config.FILE_OBJECT_URIS_CONFIG, "/tmp");
4646
put(CommonSourceConfig.FILTER_CONFIG, "Test");
4747
put(CommonSourceConfig.FILTER_CONFIG + ".Test.type", AppendFilter.class.getName());

0 commit comments

Comments (0)