From 370a1d6fc036886f8685ee87f012f2c9a13cdaf6 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Wed, 28 Jul 2021 09:54:28 +0200 Subject: [PATCH] fix(filesystems): fix FileSystemMonitorThread should not fail if file metadata cannot be retrieved (#150) Resolves: #150 --- .../filepulse/fs/LocalFSDirectoryListing.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java b/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java index b6606476a..e22a4db5c 100644 --- a/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java +++ b/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java @@ -18,11 +18,12 @@ */ package io.streamthoughts.kafka.connect.filepulse.fs; +import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException; import io.streamthoughts.kafka.connect.filepulse.fs.codec.CodecHandler; import io.streamthoughts.kafka.connect.filepulse.fs.codec.CodecManager; import io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalFileStorage; -import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta; import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta; +import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -63,7 +65,7 @@ public LocalFSDirectoryListing() { /** * Creates a new {@link LocalFSDirectoryListing} instance. * - * @param filters the list of filters + * @param filters the list of filters */ public LocalFSDirectoryListing(final List filters) { Objects.requireNonNull(filters, "filters can't be null"); @@ -90,8 +92,19 @@ public Collection listObjects() throws IllegalArgumentException private Collection toSourceObjects(final Collection allFiles) { return allFiles.stream() - .map(LocalFileObjectMeta::new) - .collect(Collectors.toList()); + .map(f -> { + try { + return Optional.of(new LocalFileObjectMeta(f)); + } catch (ConnectFilePulseException e) { + LOG.warn( + "Failed to read metadata. Object file is ignored: {}", + e.getMessage() + ); + return Optional.empty(); + } + }) + .flatMap(Optional::stream) + .collect(Collectors.toList()); } /** @@ -147,7 +160,8 @@ private List listEligibleFiles(final File input) { listingLocalFiles.addAll(directories.stream() .filter(f -> !decompressedDirs.contains(f)) .flatMap(f -> listEligibleFiles(f).stream()) - .collect(Collectors.toList())); + .collect(Collectors.toList()) + ); } return listingLocalFiles; }