Skip to content

Commit a2fb4a5

Browse files
committed
sub-task(plugin): add new interface TaskPartitioner
1 parent bbeebda commit a2fb4a5

File tree

16 files changed

+406
-208
lines changed

16 files changed

+406
-208
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/FileSystemMonitor.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,43 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.fs;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
2122
import org.apache.kafka.connect.connector.ConnectorContext;
2223

23-
import java.net.URI;
2424
import java.util.List;
2525

2626
/**
27-
* A {@code FileSystemMonitor} is responsible for scanning a specific file system for new files to stream into Kafka.
27+
* A {@code FileSystemMonitor} is responsible for monitoring a specific file-system
28+
* for new files to stream into Kafka.
2829
*/
2930
public interface FileSystemMonitor {
3031

3132
/**
32-
* Run a single filesystem scan using the specified context.
33+
* Run a single filesystem scan using the specified context. This method must invoke
34+
* the {@link ConnectorContext#requestTaskReconfiguration()} when new object-files can be scheduled.
35+
*
3336
* @param context the connector context.
3437
*/
3538
void invoke(final ConnectorContext context);
3639

3740
/**
38-
* Gets newest files found during last scan partitioned for the specified number of groups.
41+
* Retrieves the list of objects-files that were found during the last the {@link #invoke(ConnectorContext)} call.
42+
* This method should not return more than the given maximum.
3943
*
40-
* @param maxGroups the maximum number of groups.
41-
* @return list of files to execute.
44+
* @return the list of {@link FileObjectMeta} to schedule.
4245
*/
43-
default List<List<URI>> partitionFilesAndGet(final int maxGroups) {
44-
return partitionFilesAndGet(maxGroups, Integer.MAX_VALUE);
45-
}
46+
default List<FileObjectMeta> listFilesToSchedule() {
47+
return listFilesToSchedule(Integer.MAX_VALUE);
48+
}
4649

4750
/**
48-
* Gets newest files found during last scan partitioned for the specified number of groups.
51+
* Retrieves the list of objects-files that were found during the last the {@link #invoke(ConnectorContext)} call.
52+
* This method should not return more than the given maximum.
4953
*
50-
* @param maxGroups the maximum number of groups.
51-
* @param maxFilesToSchedule the maximum number of files to scheduled.
52-
* @return list of files to execute.
54+
* @param maxFilesToSchedule the maximum number of files that can be schedules to tasks.
55+
* @return the list of {@link FileObjectMeta} to schedule.
5356
*/
54-
List<List<URI>> partitionFilesAndGet(final int maxGroups, int maxFilesToSchedule);
57+
List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule);
5558

5659
/**
5760
* Close underlying I/O resources.

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/FileURIProvider.java renamed to connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/TaskFileURIProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Class that can be used to provide to {@link org.apache.kafka.connect.source.SourceTask}
2929
* the next URIs of the object files to process.
3030
*/
31-
public interface FileURIProvider extends Configurable {
31+
public interface TaskFileURIProvider extends Configurable {
3232

3333
/**
3434
* {@inheritDoc}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.source;
20+
21+
import java.net.URI;
22+
import java.util.List;
23+
24+
public interface TaskPartitioner {
25+
26+
/**
27+
* Partitions the specified object-file URIs.
28+
*
29+
* @param files the object-file URIs to partition.
30+
* @param taskCount the total number of tasks.
31+
* @return the list of URIs for the given {@literal taskId}.
32+
*/
33+
List<List<URI>> partition(final List<FileObjectMeta> files, final int taskCount);
34+
35+
/**
36+
* Partitions the specified object-file URIs.
37+
*
38+
* @param files the object-file URIs to partition.
39+
* @param taskCount the total number of tasks.
40+
* @param taskId the task id.
41+
* @return the list of URIs for the given {@literal taskId}.
42+
*/
43+
default List<URI> partitionForTask(final List<FileObjectMeta> files,
44+
final int taskCount,
45+
final int taskId) {
46+
return partition(files, taskCount).get(taskId);
47+
}
48+
}
Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,37 @@
1818
*/
1919
package io.streamthoughts.kafka.connect.filepulse.config;
2020

21+
import io.streamthoughts.kafka.connect.filepulse.fs.FileListFilter;
22+
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
2123
import io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy;
24+
import io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner;
2225
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
26+
import io.streamthoughts.kafka.connect.filepulse.source.TaskPartitioner;
2327
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStore;
2428
import io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore;
2529
import org.apache.kafka.common.config.AbstractConfig;
2630
import org.apache.kafka.common.config.ConfigDef;
2731

2832
import java.util.Collections;
33+
import java.util.List;
2934
import java.util.Map;
3035

3136
/**
3237
*
3338
*/
34-
public class CommonConfig extends AbstractConfig {
39+
public class CommonSourceConfig extends AbstractConfig {
3540

3641
private static final String GROUP = "Common";
3742

3843
public static final String OUTPUT_TOPIC_CONFIG = "topic";
3944
private static final String OUTPUT_TOPIC_DOC = "The Kafka topic to write the value to.";
4045

46+
public static final String FS_LISTING_CLASS_CONFIG = "fs.listing.class";
47+
private static final String FS_LISTING_CLASS_DOC = "Class which is used to list eligible files from the scanned file system.";
48+
49+
public static final String FS_LISTING_FILTERS_CONFIG = "fs.listing.filters";
50+
private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files.";
51+
4152
public static final String TASKS_FILE_READER_CLASS_CONFIG = "tasks.reader.class";
4253
private static final String TASKS_FILE_READER_CLASS_DOC = "Class which is used by tasks to read an input file.";
4354

@@ -52,16 +63,33 @@ public class CommonConfig extends AbstractConfig {
5263
public static final String TASKS_FILE_STATUS_STORAGE_CLASS_CONFIG = "tasks.file.status.storage.class";
5364
public static final String TASKS_FILE_STATUS_STORAGE_CLASS_DOC = "The FileObjectStateBackingStore class to be used for storing status state of file objects.";
5465

66+
public static final String TASK_PARTITIONER_CLASS_CONFIG = "task.partitioner.class";
67+
public static final String TASK_PARTITIONER_CLASS_DOC = "The TaskPartitioner to be used for partitioning files to tasks";
68+
5569
/**
56-
* Creates a new {@link CommonConfig} instance.
70+
* Creates a new {@link CommonSourceConfig} instance.
5771
*/
58-
CommonConfig(final ConfigDef definition, final Map<?, ?> originals) {
72+
public CommonSourceConfig(final ConfigDef definition, final Map<?, ?> originals) {
5973
super(definition, originals, false);
6074
}
6175

62-
static ConfigDef getConf() {
76+
public static ConfigDef getConfigDev() {
6377
int groupCounter = 0;
6478
return new ConfigDef()
79+
.define(
80+
FS_LISTING_CLASS_CONFIG,
81+
ConfigDef.Type.CLASS,
82+
ConfigDef.Importance.HIGH,
83+
FS_LISTING_CLASS_DOC
84+
)
85+
86+
.define(
87+
FS_LISTING_FILTERS_CONFIG,
88+
ConfigDef.Type.LIST,
89+
Collections.emptyList(),
90+
ConfigDef.Importance.MEDIUM,
91+
FS_SCAN_FILTERS_DOC
92+
)
6593
.define(
6694
TASKS_FILE_READER_CLASS_CONFIG,
6795
ConfigDef.Type.CLASS,
@@ -114,9 +142,33 @@ static ConfigDef getConf() {
114142
-1,
115143
ConfigDef.Width.NONE,
116144
FILTER_CONFIG
145+
)
146+
.define(
147+
TASK_PARTITIONER_CLASS_CONFIG,
148+
ConfigDef.Type.CLASS,
149+
DefaultTaskPartitioner.class,
150+
ConfigDef.Importance.HIGH,
151+
FILTER_DOC,
152+
FILTERS_GROUP,
153+
-1,
154+
ConfigDef.Width.NONE,
155+
TASK_PARTITIONER_CLASS_DOC
117156
);
118157
}
119158

159+
160+
public FileSystemListing<?> getFileSystemListing() {
161+
return getConfiguredInstance(FS_LISTING_CLASS_CONFIG, FileSystemListing.class);
162+
}
163+
164+
public List<FileListFilter> getFileSystemListingFilter() {
165+
return getConfiguredInstances(FS_LISTING_FILTERS_CONFIG, FileListFilter.class);
166+
}
167+
168+
public TaskPartitioner getTaskPartitioner() {
169+
return this.getConfiguredInstance(TASK_PARTITIONER_CLASS_CONFIG, TaskPartitioner.class);
170+
}
171+
120172
public SourceOffsetPolicy getSourceOffsetPolicy() {
121173
return this.getConfiguredInstance(OFFSET_STRATEGY_CLASS_CONFIG, SourceOffsetPolicy.class);
122174
}
Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,18 @@
1919
package io.streamthoughts.kafka.connect.filepulse.config;
2020

2121
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
22-
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
23-
import io.streamthoughts.kafka.connect.filepulse.fs.FileListFilter;
2422
import org.apache.kafka.common.config.ConfigDef;
2523

26-
import java.util.Collections;
27-
import java.util.List;
2824
import java.util.Map;
2925

30-
public class ConnectorConfig extends CommonConfig {
26+
public class SourceConnectorConfig extends CommonSourceConfig {
3127

3228
/* Settings for DefaultFileSystemMonitor */
33-
public static final String FS_LISTING_CLASS_CONFIG = "fs.listing.class";
34-
private static final String FS_LISTING_CLASS_DOC = "Class which is used to list eligible files from the scanned file system.";
35-
36-
public static final String FS_LISTING_FILTERS_CONFIG = "fs.listing.filters";
37-
private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files.";
38-
3929
public static final String ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_CONFIG = "allow.tasks.reconfiguration.after.timeout.ms";
40-
public static final String ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for the connector to allow tasks to be reconfigured when new files are detected, even if some tasks are still being processed.";
30+
private static final String ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for the connector to allow tasks to be reconfigured when new files are detected, even if some tasks are still being processed.";
4131

4232
public static final String FILE_CLEANER_CLASS_CONFIG = "fs.cleanup.policy.class";
43-
public static final String FILE_CLEANER_CLASS_DOC = "The class used to cleanup files that have been processed by tasks.";
33+
private static final String FILE_CLEANER_CLASS_DOC = "The class used to cleanup files that have been processed by tasks.";
4434

4535
/* Settings for FileSystemMonitorThread */
4636
public static final String FS_LISTING_INTERVAL_MS_CONFIG = "fs.listing.interval.ms";
@@ -49,34 +39,19 @@ public class ConnectorConfig extends CommonConfig {
4939

5040
/* Settings for FilePulseSourceConnector */
5141
public static final String MAX_SCHEDULED_FILES_CONFIG = "max.scheduled.files";
52-
public static final String MAX_SCHEDULED_FILES_DOC = "Maximum number of files that can be schedules to tasks.";
53-
public static final int MAX_SCHEDULED_FILES_DEFAULT = 1000;
42+
private static final String MAX_SCHEDULED_FILES_DOC = "Maximum number of files that can be schedules to tasks.";
43+
private static final int MAX_SCHEDULED_FILES_DEFAULT = 1000;
5444

5545
/**
56-
* Creates a new {@link ConnectorConfig} instance.
46+
* Creates a new {@link SourceConnectorConfig} instance.
5747
* @param originals the originals configuration.
5848
*/
59-
public ConnectorConfig(final Map<?, ?> originals) {
49+
public SourceConnectorConfig(final Map<?, ?> originals) {
6050
super(getConf(), originals);
6151
}
6252

6353
public static ConfigDef getConf() {
64-
return CommonConfig.getConf()
65-
.define(
66-
FS_LISTING_CLASS_CONFIG,
67-
ConfigDef.Type.CLASS,
68-
ConfigDef.Importance.HIGH,
69-
FS_LISTING_CLASS_DOC
70-
)
71-
72-
.define(
73-
FS_LISTING_FILTERS_CONFIG,
74-
ConfigDef.Type.LIST,
75-
Collections.emptyList(),
76-
ConfigDef.Importance.MEDIUM,
77-
FS_SCAN_FILTERS_DOC
78-
)
79-
54+
return CommonSourceConfig.getConfigDev()
8055
.define(
8156
FS_LISTING_INTERVAL_MS_CONFIG,
8257
ConfigDef.Type.LONG,
@@ -121,15 +96,8 @@ public FileCleanupPolicy cleanupPolicy() {
12196
return getConfiguredInstance(FILE_CLEANER_CLASS_CONFIG, FileCleanupPolicy.class);
12297
}
12398

124-
public FileSystemListing fileSystemListing() {
125-
return getConfiguredInstance(FS_LISTING_CLASS_CONFIG, FileSystemListing.class);
126-
}
127-
12899
public long scanInternalMs() {
129100
return this.getLong(FS_LISTING_INTERVAL_MS_CONFIG);
130101
}
131102

132-
public List<FileListFilter> fileSystemListingFilter() {
133-
return getConfiguredInstances(FS_LISTING_FILTERS_CONFIG, FileListFilter.class);
134-
}
135103
}

0 commit comments

Comments
 (0)