Skip to content

Commit ef8fbc3

Browse files
committed
feat(plugin): add support for task file listing delegation
This commit adds the ability to delegate the file listing process to tasks. This new capability is intended for use in contexts where a large number of files are created in very short periods on the local file system. Changes: - add new config property fs.listing.tasks.delegation.enabled
1 parent 137c1a6 commit ef8fbc3

19 files changed

+648
-171
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/FileSystemMonitor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ public interface FileSystemMonitor {
3737
*/
3838
void invoke(final ConnectorContext context);
3939

40+
/**
41+
* Enables or disables the the file-listing process either temporarily or permanently.
42+
* In other words, if disabled then {@link #listFilesToSchedule()} will always return an empty list.
43+
*
44+
* @param enabled is the file-listing process enabled.
45+
*/
46+
void setFileSystemListingEnabled(final boolean enabled);
47+
4048
/**
4149
* Retrieves the list of objects-files that were found during the last the {@link #invoke(ConnectorContext)} call.
4250
* This method should not return more than the given maximum.

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileRecordsPollingConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,5 @@ public interface FileRecordsPollingConsumer<T> extends FileInputIterator<T> {
3939
*
4040
* @param listener the {@link StateListener} instance to be used.
4141
*/
42-
void setFileListener(final StateListener listener);
42+
void setStateListener(final StateListener listener);
4343
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/StateListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.source;
2020

21-
interface StateListener {
21+
public interface StateListener {
2222

2323
/**
2424
* This method is invoked when a file is scheduled by the task.

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/TaskPartitioner.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package io.streamthoughts.kafka.connect.filepulse.source;
2020

2121
import java.net.URI;
22+
import java.util.Collection;
23+
import java.util.Collections;
2224
import java.util.List;
2325

24-
public interface TaskPartitioner {
26+
public interface TaskPartitioner extends AutoCloseable {
2527

2628
/**
2729
* Partitions the specified object-file URIs.
@@ -30,7 +32,7 @@ public interface TaskPartitioner {
3032
* @param taskCount the total number of tasks.
3133
* @return the list of URIs for the given {@literal taskId}.
3234
*/
33-
List<List<URI>> partition(final List<FileObjectMeta> files, final int taskCount);
35+
List<List<URI>> partition(final Collection<FileObjectMeta> files, final int taskCount);
3436

3537
/**
3638
* Partitions the specified object-file URIs.
@@ -40,9 +42,15 @@ public interface TaskPartitioner {
4042
* @param taskId the task id.
4143
* @return the list of URIs for the given {@literal taskId}.
4244
*/
43-
default List<URI> partitionForTask(final List<FileObjectMeta> files,
45+
default List<URI> partitionForTask(final Collection<FileObjectMeta> files,
4446
final int taskCount,
4547
final int taskId) {
46-
return partition(files, taskCount).get(taskId);
48+
return files == null || files.isEmpty() ? Collections.emptyList() : partition(files, taskCount).get(taskId);
4749
}
50+
51+
/**
52+
* {@inheritDoc}
53+
*/
54+
@Override
55+
default void close() { }
4856
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/SourceConnectorConfig.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class SourceConnectorConfig extends CommonSourceConfig {
4242
private static final String MAX_SCHEDULED_FILES_DOC = "Maximum number of files that can be schedules to tasks.";
4343
private static final int MAX_SCHEDULED_FILES_DEFAULT = 1000;
4444

45+
public static final String FS_LISTING_TASK_DELEGATION_ENABLED_CONFIG = "fs.listing.task.delegation.enabled";
46+
private static final String FS_LISTING_TASK_DELEGATION_ENABLED_DOC = "Boolean indicating whether the file listing process should be delegated to tasks.";
47+
4548
/**
4649
* Creates a new {@link SourceConnectorConfig} instance.
4750
* @param originals the originals configuration.
@@ -81,6 +84,14 @@ public static ConfigDef getConf() {
8184
MAX_SCHEDULED_FILES_DEFAULT,
8285
ConfigDef.Importance.MEDIUM,
8386
MAX_SCHEDULED_FILES_DOC
87+
)
88+
89+
.define(
90+
FS_LISTING_TASK_DELEGATION_ENABLED_CONFIG,
91+
ConfigDef.Type.BOOLEAN,
92+
false,
93+
ConfigDef.Importance.LOW,
94+
FS_LISTING_TASK_DELEGATION_ENABLED_DOC
8495
);
8596
}
8697

@@ -92,12 +103,16 @@ public int getMaxScheduledFiles() {
92103
return getInt(MAX_SCHEDULED_FILES_CONFIG);
93104
}
94105

95-
public FileCleanupPolicy cleanupPolicy() {
106+
public FileCleanupPolicy getFileCleanupPolicy() {
96107
return getConfiguredInstance(FILE_CLEANER_CLASS_CONFIG, FileCleanupPolicy.class);
97108
}
98109

99-
public long scanInternalMs() {
110+
public long getListingInterval() {
100111
return this.getLong(FS_LISTING_INTERVAL_MS_CONFIG);
101112
}
102113

114+
public boolean isFileListingTaskDelegationEnabled() {
115+
return getBoolean(FS_LISTING_TASK_DELEGATION_ENABLED_CONFIG);
116+
}
117+
103118
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/SourceTaskConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
*/
4242
public class SourceTaskConfig extends CommonSourceConfig {
4343

44-
private static final String FILE_URIS_PROVIDER_CONFIG = "file.uris.provider";
44+
public static final String FILE_URIS_PROVIDER_CONFIG = "file.uris.provider";
4545
private static final String FILE_URIS_PROVIDER_DOC = "The FileURIProvider class to be used for retrieving the file URIs to process";
4646

4747
private static final String OMIT_READ_COMMITTED_FILE_CONFIG = "ignore.committed.offsets";

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.java

Lines changed: 14 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResult;
2525
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResultSet;
2626
import io.streamthoughts.kafka.connect.filepulse.clean.GenericFileCleanupPolicy;
27-
import io.streamthoughts.kafka.connect.filepulse.config.SourceConnectorConfig;
28-
import io.streamthoughts.kafka.connect.filepulse.internal.KeyValuePair;
2927
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
3028
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectKey;
3129
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
@@ -54,7 +52,6 @@
5452
import java.util.concurrent.TimeoutException;
5553
import java.util.concurrent.atomic.AtomicBoolean;
5654
import java.util.stream.Collectors;
57-
import java.util.stream.Stream;
5855

5956
/**
6057
* A default {@link FileSystemMonitor} that can be used to trigger file
@@ -102,6 +99,8 @@ public class DefaultFileSystemMonitor implements FileSystemMonitor {
10299

103100
private final AtomicBoolean changed = new AtomicBoolean(false);
104101

102+
private final AtomicBoolean fileSystemListingEnabled = new AtomicBoolean(true);
103+
105104
/**
106105
* Creates a new {@link DefaultFileSystemMonitor} instance.
107106
*
@@ -211,7 +210,7 @@ private boolean readStatesToEnd(final Duration timeout) {
211210
public void invoke(final ConnectorContext context) {
212211
// It seems to be OK to always run cleanup even if connector is not yet started or is being shut down.
213212
cleanUpCompletedFiles();
214-
if (running.get()) {
213+
if (running.get() && fileSystemListingEnabled.get()) {
215214
if (!taskReconfigurationRequested.get()) {
216215
if (updateFiles()) {
217216
LOG.info("Requesting task reconfiguration");
@@ -221,11 +220,19 @@ public void invoke(final ConnectorContext context) {
221220
} else {
222221
LOG.info("Task reconfiguration requested. Skip filesystem listing.");
223222
}
224-
} else {
223+
} else if (fileSystemListingEnabled.get()) {
225224
LOG.info("The connector is not completely started or is being shut down. Skip filesystem listing.");
226225
}
227226
}
228227

228+
/**
229+
* {@inheritDoc}
230+
*/
231+
@Override
232+
public void setFileSystemListingEnabled(final boolean enabled) {
233+
this.fileSystemListingEnabled.set(enabled);
234+
}
235+
229236
private void cleanUpCompletedFiles() {
230237
if (completed.isEmpty()) {
231238
LOG.debug("Skipped cleanup. No object file completed.");
@@ -271,8 +278,8 @@ private synchronized boolean updateFiles() {
271278
long took = Time.SYSTEM.milliseconds() - started;
272279
LOG.info("Completed object files listing. '{}' object files found in {}ms", objects.size(), took);
273280

274-
final StateSnapshot<FileObject> snapshot = store.snapshot();
275-
final Map<FileObjectKey, FileObjectMeta> toScheduled = toScheduled(objects, snapshot);
281+
final Map<FileObjectKey, FileObjectMeta> toScheduled = FileObjectCandidatesFilter
282+
.filter(offsetPolicy, store.snapshot(), objects);
276283

277284
// Some scheduled files are still being processed, but new files are detected
278285
if (!noScheduledFiles) {
@@ -310,52 +317,6 @@ private synchronized boolean updateFiles() {
310317
return !scanned.isEmpty() && running.get();
311318
}
312319

313-
private Map<FileObjectKey, FileObjectMeta> toScheduled(final Collection<FileObjectMeta> scanned,
314-
final StateSnapshot<FileObject> snapshot) {
315-
316-
final List<KeyValuePair<String, FileObjectMeta>> toScheduled = scanned.stream()
317-
.map(source -> KeyValuePair.of(offsetPolicy.toPartitionJson(source), source))
318-
.filter(kv -> maybeScheduled(snapshot, kv.key))
319-
.collect(Collectors.toList());
320-
321-
// Looking for duplicates in sources files, i.e the OffsetPolicy generate two identical offsets for two files.
322-
final Stream<Map.Entry<String, List<KeyValuePair<String, FileObjectMeta>>>> entryStream = toScheduled
323-
.stream()
324-
.collect(Collectors.groupingBy(kv -> kv.key))
325-
.entrySet()
326-
.stream()
327-
.filter(entry -> entry.getValue().size() > 1);
328-
329-
final Map<String, List<String>> duplicates = entryStream
330-
.collect(Collectors.toMap(
331-
Map.Entry::getKey,
332-
e -> e.getValue().stream().map(m -> m.value.stringURI()).collect(Collectors.toList()))
333-
);
334-
335-
if (!duplicates.isEmpty()) {
336-
final String formatted = duplicates
337-
.entrySet()
338-
.stream()
339-
.map(e -> "partition_key=" + e.getKey() + ", files=" + e.getValue())
340-
.collect(Collectors.joining("\n\t", "\n\t", "\n"));
341-
LOG.error(
342-
"Duplicates object files detected. " +
343-
"Consider changing the configuration for '{}'. " +
344-
"Scan is ignored: {}",
345-
SourceConnectorConfig.OFFSET_STRATEGY_CLASS_CONFIG,
346-
formatted
347-
);
348-
return Collections.emptyMap(); // ignore all sources files
349-
}
350-
351-
return toScheduled.stream().collect(Collectors.toMap(kv -> FileObjectKey.of(kv.key), kv -> kv.value));
352-
}
353-
354-
private boolean maybeScheduled(final StateSnapshot<FileObject> snapshot,
355-
final String partition) {
356-
return !snapshot.contains(partition) ||
357-
snapshot.getForKey(partition).status().isOneOf(FileObjectStatus.started());
358-
}
359320

360321
/**
361322
* {@inheritDoc}

0 commit comments

Comments
 (0)