Skip to content

Commit bbeebda

Browse files
committed
subtask(plugin): add new interface FileURIProvider
This commit adds a new interface named FileURIProvider which is used to provide file URIs to the SourceTask. The DefaultFileURIProvider implementation just returns the URIs passed through the task's configuration.
1 parent 47c166e commit bbeebda

File tree

6 files changed

+246
-44
lines changed

6 files changed

+246
-44
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.fs;
20+
21+
import org.apache.kafka.common.Configurable;
22+
23+
import java.net.URI;
24+
import java.util.List;
25+
import java.util.Map;
26+
27+
/**
28+
* Class that can be used to provide to {@link org.apache.kafka.connect.source.SourceTask}
29+
* the next URIs of the object files to process.
30+
*/
31+
public interface FileURIProvider extends Configurable {
32+
33+
/**
34+
* {@inheritDoc}
35+
*/
36+
@Override
37+
default void configure(final Map<String, ?> configs) { }
38+
39+
/**
40+
* Retrieves the {@link URI}s of the next object files to read.
41+
*
42+
* @throws java.util.NoSuchElementException if the provider has no more elements.
43+
* @return the URIs of the object file to read.
44+
*/
45+
List<URI> nextURIs();
46+
47+
/**
48+
* Returns {@code true} if the provider has more URIs.
49+
* (In other words, returns {@code true} if {@link #nextURIs} would
50+
* return an element rather than throwing an exception.)
51+
*
52+
* @return {@code true} if the provider has more URIs.
53+
*/
54+
boolean hasMore();
55+
56+
/**
57+
* Close underlying I/O resources.
58+
*/
59+
default void close() {
60+
61+
}
62+
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/TaskConfig.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,15 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.config;
2020

21-
import java.net.URI;
21+
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilter;
22+
import io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileURIProvider;
23+
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
24+
import io.streamthoughts.kafka.connect.filepulse.fs.FileURIProvider;
25+
import org.apache.kafka.common.config.AbstractConfig;
26+
import org.apache.kafka.common.config.ConfigDef;
27+
import org.apache.kafka.common.config.ConfigException;
28+
import org.apache.kafka.connect.errors.ConnectException;
29+
2230
import java.util.ArrayList;
2331
import java.util.Arrays;
2432
import java.util.Collection;
@@ -28,20 +36,13 @@
2836
import java.util.Set;
2937
import java.util.stream.Collectors;
3038

31-
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilter;
32-
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
33-
import org.apache.kafka.common.config.AbstractConfig;
34-
import org.apache.kafka.common.config.ConfigDef;
35-
import org.apache.kafka.common.config.ConfigException;
36-
import org.apache.kafka.connect.errors.ConnectException;
37-
3839
/**
3940
*
4041
*/
4142
public class TaskConfig extends CommonConfig {
4243

43-
public static final String FILE_OBJECT_URIS_CONFIG = "file.object.uris";
44-
private static final String FILE_OBJECT_URIS_DOC = "The list of files task must proceed.";
44+
private static final String FILE_URIS_PROVIDER_CONFIG = "file.uris.provider";
45+
private static final String FILE_URIS_PROVIDER_DOC = "The FileURIProvider class to be used for retrieving the file URIs to process";
4546

4647
private static final String OMIT_READ_COMMITTED_FILE_CONFIG = "ignore.committed.offsets";
4748
private static final String OMIT_READ_COMMITTED_FILE_DOC = "Boolean indicating whether offsets check has to be performed, to avoid multiple (default : false)";
@@ -51,10 +52,11 @@ public class TaskConfig extends CommonConfig {
5152
static ConfigDef getConf() {
5253
return CommonConfig.getConf()
5354
.define(
54-
FILE_OBJECT_URIS_CONFIG,
55-
ConfigDef.Type.LIST,
55+
FILE_URIS_PROVIDER_CONFIG,
56+
ConfigDef.Type.CLASS,
57+
DefaultFileURIProvider.class,
5658
ConfigDef.Importance.HIGH,
57-
FILE_OBJECT_URIS_DOC
59+
FILE_URIS_PROVIDER_DOC
5860
)
5961
.define(
6062
OMIT_READ_COMMITTED_FILE_CONFIG,
@@ -160,8 +162,8 @@ private static void addConfigDefForFilter(final Map<String, String> props,
160162
newDef.embed(prefix, group, orderInGroup, filterConfigDef);
161163
}
162164

163-
public List<URI> files() {
164-
return this.getList(FILE_OBJECT_URIS_CONFIG).stream().map(URI::create).collect(Collectors.toList());
165+
public FileURIProvider getFileURIProvider() {
166+
return this.getConfiguredInstance(FILE_URIS_PROVIDER_CONFIG, FileURIProvider.class);
165167
}
166168

167169
public boolean isReadCommittedFile() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.fs;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.config.TaskConfig;
22+
import org.apache.kafka.common.config.AbstractConfig;
23+
import org.apache.kafka.common.config.ConfigDef;
24+
25+
import java.net.URI;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.NoSuchElementException;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.stream.Collectors;
31+
32+
public class DefaultFileURIProvider implements FileURIProvider {
33+
34+
private List<URI> objectURIs;
35+
36+
private final AtomicBoolean hasMore = new AtomicBoolean(true);
37+
38+
/**
39+
* {@inheritDoc}
40+
*/
41+
@Override
42+
public void configure(final Map<String, ?> configs) {
43+
objectURIs = new Config(configs).objectURIs();
44+
}
45+
46+
/**
47+
* {@inheritDoc}
48+
*/
49+
@Override
50+
public List<URI> nextURIs() {
51+
if (hasMore.compareAndSet(true, false)) {
52+
return objectURIs;
53+
} else {
54+
throw new NoSuchElementException("No more URIs can be retrieved from this provider.");
55+
}
56+
}
57+
58+
/**
59+
* {@inheritDoc}
60+
*/
61+
@Override
62+
public boolean hasMore() {
63+
return hasMore.get();
64+
}
65+
66+
public static final class Config extends AbstractConfig {
67+
68+
public static final String FILE_OBJECT_URIS_CONFIG = "file.object.uris";
69+
private static final String FILE_OBJECT_URIS_DOC = "The list of files task must proceed.";
70+
71+
/**
72+
* Creates a new {@link TaskConfig} instance.
73+
*
74+
* @param originals the original configs.
75+
*/
76+
public Config(final Map<String, ?> originals) {
77+
super(getConf(), originals, false);
78+
}
79+
80+
static ConfigDef getConf() {
81+
return new ConfigDef()
82+
.define(
83+
FILE_OBJECT_URIS_CONFIG,
84+
ConfigDef.Type.LIST,
85+
ConfigDef.Importance.HIGH,
86+
FILE_OBJECT_URIS_DOC
87+
);
88+
}
89+
90+
public List<URI> objectURIs() {
91+
return this.getList(FILE_OBJECT_URIS_CONFIG).stream().map(URI::create).collect(Collectors.toList());
92+
}
93+
}
94+
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import io.streamthoughts.kafka.connect.filepulse.Version;
2222
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
2323
import io.streamthoughts.kafka.connect.filepulse.config.ConnectorConfig;
24-
import io.streamthoughts.kafka.connect.filepulse.config.TaskConfig;
2524
import io.streamthoughts.kafka.connect.filepulse.fs.CompositeFileListFilter;
2625
import io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor;
26+
import io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileURIProvider;
2727
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
2828
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemMonitor;
2929
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStoreManager;
@@ -179,7 +179,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
179179

180180
private Map<String, String> createTaskConfig(final String URIs) {
181181
final Map<String, String> taskProps = new HashMap<>(configProperties);
182-
taskProps.put(TaskConfig.FILE_OBJECT_URIS_CONFIG, URIs);
182+
taskProps.put(DefaultFileURIProvider.Config.FILE_OBJECT_URIS_CONFIG, URIs);
183183
return taskProps;
184184
}
185185

0 commit comments

Comments
 (0)