Navigation Menu

Skip to content

Commit

Permalink
fix(filesystems): fix FileSystemMonitorThread should not fail if file…
Browse files Browse the repository at this point in the history
… metadata cannot be retrieved (#150)

Resolves: #150
  • Loading branch information
fhussonnois committed Jul 31, 2021
1 parent aa62022 commit 370a1d6
Showing 1 changed file with 19 additions and 5 deletions.
Expand Up @@ -18,11 +18,12 @@
*/
package io.streamthoughts.kafka.connect.filepulse.fs;

import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.fs.codec.CodecHandler;
import io.streamthoughts.kafka.connect.filepulse.fs.codec.CodecManager;
import io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalFileStorage;
import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
Expand All @@ -63,7 +65,7 @@ public LocalFSDirectoryListing() {
/**
* Creates a new {@link LocalFSDirectoryListing} instance.
*
* @param filters the list of filters
* @param filters the list of filters
*/
public LocalFSDirectoryListing(final List<FileListFilter> filters) {
Objects.requireNonNull(filters, "filters can't be null");
Expand All @@ -90,8 +92,19 @@ public Collection<FileObjectMeta> listObjects() throws IllegalArgumentException

private Collection<FileObjectMeta> toSourceObjects(final Collection<File> allFiles) {
return allFiles.stream()
.map(LocalFileObjectMeta::new)
.collect(Collectors.toList());
.map(f -> {
try {
return Optional.of(new LocalFileObjectMeta(f));
} catch (ConnectFilePulseException e) {
LOG.warn(
"Failed to read metadata. Object file is ignored: {}",
e.getMessage()
);
return Optional.<LocalFileObjectMeta>empty();
}
})
.flatMap(Optional::stream)
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -147,7 +160,8 @@ private List<File> listEligibleFiles(final File input) {
listingLocalFiles.addAll(directories.stream()
.filter(f -> !decompressedDirs.contains(f))
.flatMap(f -> listEligibleFiles(f).stream())
.collect(Collectors.toList()));
.collect(Collectors.toList())
);
}
return listingLocalFiles;
}
Expand Down

0 comments on commit 370a1d6

Please sign in to comment.