diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java index 8581f5f..da097ef 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java @@ -36,9 +36,6 @@ import static org.junit.jupiter.api.Assertions.*; -import org.easymock.EasyMock; -import org.easymock.IAnswer; - public class FsSourceTaskTest { @@ -47,6 +44,8 @@ public class FsSourceTaskTest { new HdfsFsConfig() ); private static final int NUM_RECORDS = 10; + private static final String SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED = "123.txt"; + private static final int NUM_BYTES_PER_FILE = 390; @BeforeAll public static void initFs() throws IOException { @@ -90,9 +89,19 @@ public void initTask() { offsetStorageReader.offsets(EasyMock.capture(captureOne)) ).andAnswer(() -> { Map, Map> map = new HashMap<>(); - captureOne.getValue().forEach(part -> map.put(part, new HashMap(){{ - put("offset", (long)(NUM_RECORDS/2)); - }})); + captureOne.getValue().forEach(part -> { + if(((String)(part.get("path"))).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)){ + map.put(part, new HashMap(){{ + put("offset", (long)(NUM_RECORDS)); + put("fileSizeBytes", (long)NUM_BYTES_PER_FILE); + + }}); + }else{ + map.put(part, new HashMap(){{ + put("offset", (long)(NUM_RECORDS/2)); + }}); + } + }); return map; }); @@ -166,6 +175,21 @@ public void oneFilePerFs(TaskFsTestConfig fsConfig) throws IOException { assertNull(fsConfig.getTask().poll()); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void skipsFetchingFileIfByteOffsetExistsAndMatchesFileLength(TaskFsTestConfig fsConfig) throws IOException { + for (Path dir : fsConfig.getDirectories()) { + //this file will be skipped since the byte offset for the file is equal to the byte size of the file + Path dataFile = new Path(dir, SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED); + createDataFile(fsConfig.getFs(), dataFile); + } + + fsConfig.getTask().start(fsConfig.getTaskConfig()); + List records = fsConfig.getTask().poll(); + assertEquals(0, records.size()); + assertNull(fsConfig.getTask().poll()); + } + @ParameterizedTest @MethodSource("fileSystemConfigProvider") public void nonExistentUri(TaskFsTestConfig fsConfig) { diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/util/IteratorUtilsTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/util/IteratorUtilsTest.java index f5663c8..818b191 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/util/IteratorUtilsTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/util/IteratorUtilsTest.java @@ -77,6 +77,17 @@ public void testIteratorChunkingWithChunkGreaterThanNumElementsWorks() { assertEquals(expected, materializedChunkedIterator); } + @Test + public void testIteratorChunkingWithEmptyIteratorWorks() { + Iterator iterator = IntStream.range(0, 0).boxed().iterator(); + Iterator> chunkedIterator = IteratorUtils.chunkIterator(iterator, 5); + + List> materializedChunkedIterator = asStream(chunkedIterator).collect(Collectors.toList()); + ArrayList> expected = new ArrayList<>(); + + assertEquals(expected, materializedChunkedIterator); + } + @Test public void testIteratorChunkingThrowsWithInvalidChunkSize() { Iterator iterator = IntStream.rangeClosed(0, 2).boxed().iterator();