Skip to content

Commit 7b49b81

Browse files
committed
feat(plugin): add new property max.scheduled.files (#122) (#123)
Resolves: #122 Resolves: #123
1 parent 685618a commit 7b49b81

File tree

7 files changed

+247
-152
lines changed

7 files changed

+247
-152
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
-->
4343
<suppress checks="LineLength" files="RecordFilter.java"/>
4444
<suppress checks="LineLength" files="GrokPatternCompiler.java"/>
45-
<suppress checks="LineLength" files="DefaultFileSystemScanner.java"/>
45+
<suppress checks="LineLength" files="DefaultFileSystemMonitor.java"/>
4646
<suppress checks="LineLength" files="ComposeOffsetStrategy.java"/>
4747
<suppress checks="LineLength" files=".*Config.java"/>
4848
<!-- Those classes are copied from the kafka-connect API -->
Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,34 @@
2424
import java.util.List;
2525

2626
/**
27-
* A {@link FileSystemScanner} is responsible to scan a specific file system
28-
* for new files to stream into Kafka;
27+
* A {@code FileSystemMonitor} is responsible for scanning a specific file system for new files to stream into Kafka.
2928
*/
30-
public interface FileSystemScanner {
29+
public interface FileSystemMonitor {
3130

3231
/**
33-
* Run a single file system scan using the specified context.
32+
* Run a single filesystem scan using the specified context.
3433
* @param context the connector context.
3534
*/
36-
void scan(final ConnectorContext context);
35+
void invoke(final ConnectorContext context);
3736

3837
/**
3938
* Gets newest files found during last scan partitioned for the specified number of groups.
4039
*
41-
* @param maxGroups the maximum number of groups.
40+
* @param maxGroups the maximum number of groups.
41+
* @return list of files to execute.
42+
*/
43+
default List<List<URI>> partitionFilesAndGet(final int maxGroups) {
44+
return partitionFilesAndGet(maxGroups, Integer.MAX_VALUE);
45+
}
46+
47+
/**
48+
* Gets newest files found during last scan partitioned for the specified number of groups.
49+
*
50+
* @param maxGroups the maximum number of groups.
51+
* @param maxFilesToSchedule the maximum number of files to schedule.
4252
* @return list of files to execute.
4353
*/
44-
List<List<URI>> partitionFilesAndGet(final int maxGroups);
54+
List<List<URI>> partitionFilesAndGet(final int maxGroups, int maxFilesToSchedule);
4555

4656
/**
4757
* Close underlying I/O resources.

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/ConnectorConfig.java

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public class ConnectorConfig extends CommonConfig {
3737
public static final String FS_LISTING_FILTERS_CONFIG = "fs.listing.filters";
3838
private static final String FS_SCAN_FILTERS_DOC = "Filters classes which are used to apply list input files.";
3939

40+
public static final String MAX_SCHEDULED_FILES_CONFIG = "max.scheduled.files";
41+
public static final String MAX_SCHEDULED_FILES_DOC = "Maximum number of files that can be schedules to tasks.";
42+
public static final int MAX_SCHEDULED_FILES_DEFAULT = 1000;
43+
4044
public static final String ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_CONFIG = "allow.tasks.reconfiguration.after.timeout.ms";
4145
public static final String ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for the connector to allow tasks to be reconfigured when new files are detected, even if some tasks are still being processed.";
4246

@@ -65,23 +69,60 @@ public ConnectorConfig(final Map<?, ?> originals) {
6569

6670
public static ConfigDef getConf() {
6771
return CommonConfig.getConf()
68-
.define(FS_LISTING_CLASS_CONFIG, ConfigDef.Type.CLASS,
69-
LocalFSDirectoryListing.class, ConfigDef.Importance.HIGH, FS_LISTING_CLASS_DOC)
70-
71-
.define(FS_LISTING_FILTERS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(),
72-
ConfigDef.Importance.MEDIUM, FS_SCAN_FILTERS_DOC)
73-
74-
.define(FS_SCAN_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, FS_SCAN_INTERVAL_MS_DEFAULT,
75-
ConfigDef.Importance.HIGH, FS_SCAN_INTERVAL_MS_DOC)
76-
77-
.define(FILE_CLEANER_CLASS_CONFIG,
78-
ConfigDef.Type.CLASS, ConfigDef.Importance.HIGH, FILE_CLEANER_CLASS_DOC)
79-
80-
.define(ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, Long.MAX_VALUE,
81-
ConfigDef.Importance.MEDIUM, ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_DOC)
82-
83-
.define(INTERNAL_REPORTER_GROUP_ID, ConfigDef.Type.STRING, null,
84-
ConfigDef.Importance.MEDIUM, INTERNAL_REPORTER_GROUP_ID_DOC);
72+
.define(
73+
FS_LISTING_CLASS_CONFIG,
74+
ConfigDef.Type.CLASS,
75+
LocalFSDirectoryListing.class,
76+
ConfigDef.Importance.HIGH,
77+
FS_LISTING_CLASS_DOC
78+
)
79+
80+
.define(
81+
FS_LISTING_FILTERS_CONFIG,
82+
ConfigDef.Type.LIST,
83+
Collections.emptyList(),
84+
ConfigDef.Importance.MEDIUM,
85+
FS_SCAN_FILTERS_DOC
86+
)
87+
88+
.define(
89+
FS_SCAN_INTERVAL_MS_CONFIG,
90+
ConfigDef.Type.LONG,
91+
FS_SCAN_INTERVAL_MS_DEFAULT,
92+
ConfigDef.Importance.HIGH,
93+
FS_SCAN_INTERVAL_MS_DOC
94+
)
95+
96+
.define(
97+
FILE_CLEANER_CLASS_CONFIG,
98+
ConfigDef.Type.CLASS,
99+
ConfigDef.Importance.HIGH,
100+
FILE_CLEANER_CLASS_DOC
101+
)
102+
103+
.define(
104+
ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_CONFIG,
105+
ConfigDef.Type.LONG,
106+
Long.MAX_VALUE,
107+
ConfigDef.Importance.MEDIUM,
108+
ALLOW_TASKS_RECONFIG_AFTER_TIMEOUT_MS_DOC
109+
)
110+
111+
.define(
112+
INTERNAL_REPORTER_GROUP_ID,
113+
ConfigDef.Type.STRING,
114+
null,
115+
ConfigDef.Importance.MEDIUM,
116+
INTERNAL_REPORTER_GROUP_ID_DOC
117+
)
118+
119+
.define(
120+
MAX_SCHEDULED_FILES_CONFIG,
121+
ConfigDef.Type.INT,
122+
MAX_SCHEDULED_FILES_DEFAULT,
123+
ConfigDef.Importance.MEDIUM,
124+
MAX_SCHEDULED_FILES_DOC
125+
);
85126
}
86127

87128
public Long allowTasksReconfigurationAfterTimeoutMs() {
@@ -92,6 +133,10 @@ public String getTasksReporterGroupId() {
92133
return getString(INTERNAL_REPORTER_GROUP_ID);
93134
}
94135

136+
public int getMaxScheduledFiles() {
137+
return getInt(MAX_SCHEDULED_FILES_CONFIG);
138+
}
139+
95140
public FileCleanupPolicy cleanupPolicy() {
96141
return getConfiguredInstance(FILE_CLEANER_CLASS_CONFIG, FileCleanupPolicy.class);
97142
}

0 commit comments

Comments
 (0)