Skip to content

Commit d86804b

Browse files
committed
feat(plugin): add new config tasks.empty.poll.wait.ms
This commit adds the new prop `tasks.empty.poll.wait.ms` to fix the amount of time in millisecond a tasks should wait if a poll returns an empty list of records.
1 parent 39c37d9 commit d86804b

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/CommonSourceConfig.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public class CommonSourceConfig extends AbstractConfig {
5555
public static final String TASKS_HALT_ON_ERROR_CONFIG = "tasks.halt.on.error";
5656
private static final String TASKS_HALT_ON_ERROR_DOC = "Should a task halt when it encounters an error or continue to the next file.";
5757

58+
public static final String TASKS_EMPTY_POLL_WAIT_MS_CONFIG = "tasks.empty.poll.wait.ms";
59+
public static final String TASKS_EMPTY_POLL_WAIT_MS_DOC = "The amount of time in millisecond a tasks should wait if a poll returns an empty list of records.";
60+
5861
public static final String OFFSET_STRATEGY_CLASS_CONFIG = "offset.policy.class";
5962
private static final String OFFSET_STRATEGY_CLASS_DOC = "Class which is used to determine the source partition and offset that uniquely identify a input record";
6063
private static final String OFFSET_STRATEGY_CLASS_DEFAULT = DefaultSourceOffsetPolicy.class.getName();
@@ -114,6 +117,17 @@ public static ConfigDef getConfigDev() {
114117
ConfigDef.Width.NONE,
115118
TASKS_HALT_ON_ERROR_CONFIG
116119
)
120+
.define(
121+
TASKS_EMPTY_POLL_WAIT_MS_CONFIG,
122+
ConfigDef.Type.LONG,
123+
500,
124+
ConfigDef.Importance.LOW,
125+
TASKS_EMPTY_POLL_WAIT_MS_DOC,
126+
GROUP,
127+
groupCounter++,
128+
ConfigDef.Width.NONE,
129+
TASKS_EMPTY_POLL_WAIT_MS_CONFIG
130+
)
117131
.define(
118132
OUTPUT_TOPIC_CONFIG,
119133
ConfigDef.Type.STRING,
@@ -169,7 +183,7 @@ public static ConfigDef getConfigDev() {
169183
TASK_PARTITIONER_CLASS_DOC
170184
);
171185
}
172-
186+
173187
public FileSystemListing<?> getFileSystemListing() {
174188
return getConfiguredInstance(FS_LISTING_CLASS_CONFIG, FileSystemListing.class);
175189
}
@@ -186,6 +200,10 @@ public boolean isTaskHaltOnError() {
186200
return this.getBoolean(TASKS_HALT_ON_ERROR_CONFIG);
187201
}
188202

203+
public long getTaskEmptyPollWaitMs() {
204+
return this.getLong(TASKS_EMPTY_POLL_WAIT_MS_CONFIG);
205+
}
206+
189207
public SourceOffsetPolicy getSourceOffsetPolicy() {
190208
return this.getConfiguredInstance(OFFSET_STRATEGY_CLASS_CONFIG, SourceOffsetPolicy.class);
191209
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ public class FilePulseSourceTask extends SourceTask {
5151

5252
private static final Integer NO_PARTITION = null;
5353

54-
private static final int DEFAULT_POLL_WAIT_MS = 500;
55-
5654
public SourceTaskConfig taskConfig;
5755

5856
private String topic;
@@ -216,8 +214,8 @@ public List<SourceRecord> poll() throws InterruptedException {
216214
}
217215

218216
private void busyWait() throws InterruptedException {
219-
LOG.trace("Waiting {} ms to poll next records", DEFAULT_POLL_WAIT_MS);
220-
Thread.sleep(DEFAULT_POLL_WAIT_MS);
217+
LOG.trace("Waiting {} ms to poll next records", taskConfig.getTaskEmptyPollWaitMs());
218+
Thread.sleep(taskConfig.getTaskEmptyPollWaitMs());
221219
}
222220

223221
/**

0 commit comments

Comments
 (0)