Skip to content

Commit 17e9efb

Browse files
committed
fix(plugin): fix regression cleanup object files should not be rescheduled (#178)
1 parent 0644cb9 commit 17e9efb

File tree

4 files changed

+33
-21
lines changed

4 files changed

+33
-21
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectStatus.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,15 @@ public enum FileObjectStatus {
6060
FAILED,
6161

6262
/**
63-
* The file has been successfully clean up (depending of the configured strategy).
63+
* The file has been successfully cleaned up (depending on the configured strategy).
6464
*/
6565
CLEANED;
6666

6767
public boolean isOneOf(final FileObjectStatus...states) {
6868
return Arrays.asList(states).contains(this);
6969
}
70+
71+
public boolean isDone() {
72+
return isOneOf(COMMITTED, FAILED, CLEANED);
73+
}
7074
}

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitor.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,10 @@ private synchronized boolean updateFiles() {
289289
offsetPolicy,
290290
fileObjectKey -> {
291291
final FileObject fileObject = snapshot.getForKey(fileObjectKey.original());
292-
return !(fileObject != null && cleanablePredicate.test(fileObject.status()));
292+
if (fileObject == null) return true;
293+
294+
final FileObjectStatus status = fileObject.status();
295+
return !(cleanablePredicate.test(status) || status.isDone());
293296
},
294297
objects
295298
);
@@ -378,6 +381,7 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
378381
LOG.warn("Failed to read state changelog while scheduling object files. Timeout.");
379382
}
380383

384+
// Check if all scanned object-files can be scheduled.
381385
if (scanned.size() <= maxFilesToSchedule) {
382386
scheduled.putAll(scanned);
383387
} else {
@@ -388,10 +392,7 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
388392
}
389393
}
390394

391-
partitions = scanned.values()
392-
.stream()
393-
.sorted(BY_LAST_MODIFIED)
394-
.collect(Collectors.toList());
395+
partitions = new ArrayList<>(scheduled.values());
395396

396397
attempts++;
397398
if (changed.get()) {
@@ -411,10 +412,17 @@ public List<FileObjectMeta> listFilesToSchedule(final int maxFilesToSchedule) {
411412
}
412413

413414
if (partitions.isEmpty()) {
414-
LOG.warn("Filesystem could not be scanned quickly enough, " +
415-
"or no object file was detected after starting the connector.");
415+
LOG.warn(
416+
"Filesystem could not be scanned quickly enough, " +
417+
"or no object file was detected after starting the connector."
418+
);
416419
}
417-
return partitions;
420+
421+
// Sort all object files by the last-modified date before returning
422+
return partitions
423+
.stream()
424+
.sorted(BY_LAST_MODIFIED)
425+
.collect(Collectors.toList());
418426
} finally {
419427
scanned.clear();
420428
taskReconfigurationRequested.set(false);

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/DelegateTaskFileURIProvider.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.streamthoughts.kafka.connect.filepulse.config.CommonSourceConfig;
2222
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
2323
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
24+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
2425
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
2526
import io.streamthoughts.kafka.connect.filepulse.source.TaskPartitioner;
2627
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
@@ -95,16 +96,15 @@ public List<URI> nextURIs() {
9596
if (fileObject == null)
9697
return true;
9798

98-
switch (fileObject.status()) {
99-
case COMMITTED:
100-
case FAILED:
101-
case CLEANED:
102-
return false;
103-
case COMPLETED:
104-
return isFirstCall;
105-
default:
106-
return true;
107-
}
99+
final FileObjectStatus status = fileObject.status();
100+
101+
// If an object file is marked as COMPLETED (i.e. not COMMITTED) we should only consider
102+
// it processable the first time this method is called.
103+
if (status == FileObjectStatus.COMPLETED)
104+
return isFirstCall;
105+
106+
// Otherwise, only return true if the file is not already done.
107+
return !status.isDone();
108108
},
109109
fileSystemListing.listObjects()
110110
).values();

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/FileObjectCandidatesFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import java.util.stream.Stream;
3737

3838
/**
39-
* Class which is used to determinate if a a list of file can be processed.
39+
* Class which is used to determine if a list of files can be processed.
4040
*/
4141
public class FileObjectCandidatesFilter {
4242

@@ -70,7 +70,7 @@ public Map<FileObjectKey, FileObjectMeta> filter(final Collection<FileObjectMeta
7070
.filter(kv -> predicate.test(FileObjectKey.of(kv.key)))
7171
.collect(Collectors.toList());
7272

73-
// Looking for duplicates in sources files, i.e the OffsetPolicy generate two identical offsets for two files.
73+
// Looking for duplicates in object files, i.e., the OffsetPolicy generates two identical offsets for two files.
7474
final Stream<Map.Entry<String, List<KeyValuePair<String, FileObjectMeta>>>> entryStream = toScheduled
7575
.stream()
7676
.collect(Collectors.groupingBy(kv -> kv.key))

0 commit comments

Comments
 (0)