Skip to content

Commit 57da04c

Browse files
committed
subtask(all): refactor FilePulse API to support remote storages (#100)
This commit contains a large refactoring of the Connect FilePulse API to remove the use of the Java class File for referencing an input file. This is a prerequisite for supporting streaming files from remote storage (Blob, S3, etc.). Breaking changes: - add new methods to FileInputReader: readMetadata, isAccessible - update and rename interface FSDirectoryWalker to FileSystemListing - update interface FileListFilter to not use the File class - refactor class OffsetManager to SourceOffsetPolicy - add new class DefaultOffsetPolicy - rename SourceFile to FileObject - rename SourceOffset to FileObjectOffset - rename SourceStatus to FileObjectStatus - rename SourceMetadata to FileObjectMeta - add new top-level module filepulse-filesystems - add new module filepulse-local-fs - relocate readers for local filesystems Changed configuration properties: - fs.scanner.class -> fs.listing.class - fs.scan.filters -> fs.listing.filters - fs.scan.directory.path -> fs.listing.directory.path - fs.recursive.scan.enable -> fs.listing.recursive.enabled Resolves: #100
1 parent be2d90e commit 57da04c

File tree

129 files changed

+1972
-1489
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

129 files changed

+1972
-1489
lines changed

checkstyle/suppressions.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@
5353
<suppress checks="LineLength" files="ConnectorConfig.java"/>
5454
<suppress checks="LineLength" files="TaskConfig.java"/>
5555
<suppress checks="LineLength" files="CommonConfig.java"/>
56-
<suppress checks="LineLength" files="LocalFileSystemScanner.java"/>
56+
<suppress checks="LineLength" files="DefaultFileSystemScanner.java"/>
5757
<suppress checks="LineLength" files="ComposeOffsetStrategy.java"/>
58+
<suppress checks="LineLength" files="DefaultOffsetPolicyConfig.java"/>
5859
<!-- Those classes are copy from kafka-connect api-->
5960
<suppress checks="LineLength" files="KafkaBasedLog.java"/>
6061
<suppress checks="NPathComplexity" files="KafkaBasedLog.java"/>

config/connect-file-pulse-example-override-topic-and-key.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212
"filters.ParseDelimitedRow.trimColumn": "true",
1313
"filters.ParseDelimitedRow.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
1414
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
15-
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
15+
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.LocalFSDirectoryListing",
1616
"fs.scan.directory.path": "/tmp/kafka-connect/examples/",
1717
"fs.scan.interval.ms": "10000",
1818
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
1919
"internal.kafka.reporter.topic": "connect-file-pulse-status",
2020
"offset.strategy": "name+hash",
2121
"skip.headers": "1",
22-
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
22+
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.reader.RowFileInputReader",
2323
"topic": "connect-file-pulse-quickstart-csv",
2424
"tasks.max": 1
2525
},

config/connect-file-pulse-quickstart-avro.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
"config": {
33
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
44
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
5-
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
5+
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.LocalFSDirectoryListing",
66
"fs.scan.directory.path": "/tmp/kafka-connect/examples/",
77
"fs.scan.interval.ms": "10000",
88
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
99
"internal.kafka.reporter.topic": "connect-file-pulse-status",
1010
"offset.strategy": "name",
1111
"read.max.wait.ms": "5000",
12-
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.AvroFileInputReader",
12+
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.reader.AvroFileInputReader",
1313
"topic": "connect-file-pulse-quickstart-avro",
1414
"tasks.max": 1
1515
},

config/connect-file-pulse-quickstart-csv.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
"filters.ParseDelimitedRow.trimColumn": "true",
1010
"filters.ParseDelimitedRow.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
1111
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
12-
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
12+
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.LocalFSDirectoryListing",
1313
"fs.scan.directory.path": "/tmp/kafka-connect/examples/",
1414
"fs.scan.interval.ms": "10000",
1515
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
1616
"internal.kafka.reporter.topic": "connect-file-pulse-status",
1717
"offset.strategy": "name+hash",
1818
"skip.headers": "1",
19-
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
19+
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.reader.RowFileInputReader",
2020
"topic": "connect-file-pulse-quickstart-csv",
2121
"tasks.max": 1
2222
},

config/connect-file-pulse-quickstart-log4j.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111
"filters.ParseLog4jLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
1212
"filters.ParseLog4jLog.ignoreFailure": "true",
1313
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
14-
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
14+
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.LocalFSDirectoryListing",
1515
"fs.scan.directory.path": "/var/log/kafka/",
1616
"fs.scan.interval.ms": "10000",
1717
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
1818
"internal.kafka.reporter.topic": "connect-file-pulse-status",
1919
"offset.strategy": "name",
2020
"read.max.wait.ms": "5000",
21-
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
21+
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.local.reader.RowFileInputReader",
2222
"topic": "connect-file-pulse-quickstart-log4j",
2323
"tasks.max": 1
2424
},

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/clean/BatchFileCleanupPolicy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.clean;
2020

21-
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
21+
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2222

2323
import java.util.List;
2424

2525
/**
2626
* Policy for cleaning a batch of completed source files.
2727
*/
28-
public interface BatchFileCleanupPolicy extends GenericFileCleanupPolicy<List<SourceFile>,FileCleanupPolicyResultSet> {
28+
public interface BatchFileCleanupPolicy
29+
extends GenericFileCleanupPolicy<List<FileObject>,FileCleanupPolicyResultSet> {
2930

3031
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/clean/DelegateBatchFileCleanupPolicy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.clean;
2020

21-
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
21+
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2222

2323
import java.util.List;
2424
import java.util.Map;
@@ -50,9 +50,9 @@ public void configure(final Map<String, ?> configs) {
5050
* {@inheritDoc}
5151
*/
5252
@Override
53-
public FileCleanupPolicyResultSet apply(final List<SourceFile> sources) {
53+
public FileCleanupPolicyResultSet apply(final List<FileObject> sources) {
5454
FileCleanupPolicyResultSet rs = new FileCleanupPolicyResultSet();
55-
for (SourceFile source : sources) {
55+
for (FileObject source : sources) {
5656
if (delegate.apply(source)) {
5757
rs.add(source, FileCleanupPolicyResult.SUCCEED);
5858
} else {

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/clean/FileCleanupPolicy.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,23 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.clean;
2020

21-
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
22-
import io.streamthoughts.kafka.connect.filepulse.source.SourceStatus;
21+
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
22+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
2323

2424
/**
2525
* Policy for cleaning individual completed source files.
2626
*/
27-
public interface FileCleanupPolicy extends GenericFileCleanupPolicy<SourceFile, Boolean> {
27+
public interface FileCleanupPolicy extends GenericFileCleanupPolicy<FileObject, Boolean> {
2828

2929
/**
3030
* {@inheritDoc}
3131
*/
3232
@Override
33-
default Boolean apply(final SourceFile source) {
34-
return source.status().equals(SourceStatus.COMPLETED) ? onSuccess(source) : onFailure(source);
33+
default Boolean apply(final FileObject source) {
34+
return source.status().equals(FileObjectStatus.COMPLETED) ? onSuccess(source) : onFailure(source);
3535
}
3636

37-
boolean onSuccess(final SourceFile source);
37+
boolean onSuccess(final FileObject source);
3838

39-
boolean onFailure(final SourceFile source);
39+
boolean onFailure(final FileObject source);
4040
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/clean/FileCleanupPolicyResultSet.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package io.streamthoughts.kafka.connect.filepulse.clean;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.internal.KeyValuePair;
22-
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
22+
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2323

2424
import java.util.Collection;
2525
import java.util.LinkedList;
@@ -28,7 +28,7 @@
2828

2929
public class FileCleanupPolicyResultSet {
3030

31-
private final Collection<KeyValuePair<SourceFile, FileCleanupPolicyResult>> resultSet;
31+
private final Collection<KeyValuePair<FileObject, FileCleanupPolicyResult>> resultSet;
3232

3333
/**
3434
* Creates a new {@link FileCleanupPolicyResultSet} instance.
@@ -38,15 +38,15 @@ public FileCleanupPolicyResultSet() {
3838
}
3939

4040

41-
public void add(final SourceFile source, FileCleanupPolicyResult result) {
41+
public void add(final FileObject source, FileCleanupPolicyResult result) {
4242
Objects.requireNonNull(source, "source cannot be null");
4343
Objects.requireNonNull(result, "result cannot be null");
4444
resultSet.add(KeyValuePair.of(source, result));
4545
}
4646

47-
public void forEach(final BiConsumer<? super SourceFile, FileCleanupPolicyResult> action) {
47+
public void forEach(final BiConsumer<? super FileObject, FileCleanupPolicyResult> action) {
4848
Objects.requireNonNull(action);
49-
for (KeyValuePair<SourceFile, FileCleanupPolicyResult> kv : resultSet) {
49+
for (KeyValuePair<FileObject, FileCleanupPolicyResult> kv : resultSet) {
5050
action.accept(kv.key, kv.value);
5151
}
5252
}

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
2424
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
2525
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
26-
import io.streamthoughts.kafka.connect.filepulse.source.SourceMetadata;
26+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
2727
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
@@ -102,7 +102,7 @@ public RecordsIterable<FileRecord<TypedStruct>> apply(final RecordsIterable<File
102102
}
103103

104104
private FilterContext getContextFor(final FileRecord<TypedStruct> record,
105-
final SourceMetadata metadata) {
105+
final FileObjectMeta metadata) {
106106
return FilterContextBuilder
107107
.newBuilder()
108108
.withMetadata(metadata)

0 commit comments

Comments
 (0)