Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
grantatspothero committed May 21, 2020
1 parent 03d81ba commit a1705c6
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@

import static org.junit.jupiter.api.Assertions.*;

import org.easymock.EasyMock;
import org.easymock.IAnswer;


public class FsSourceTaskTest {

Expand All @@ -47,6 +44,8 @@ public class FsSourceTaskTest {
new HdfsFsConfig()
);
private static final int NUM_RECORDS = 10;
private static final String SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED = "123.txt";
private static final int NUM_BYTES_PER_FILE = 390;

@BeforeAll
public static void initFs() throws IOException {
Expand Down Expand Up @@ -90,9 +89,19 @@ public void initTask() {
offsetStorageReader.offsets(EasyMock.capture(captureOne))
).andAnswer(() -> {
Map<Map<String, Object>, Map<String, Object>> map = new HashMap<>();
captureOne.getValue().forEach(part -> map.put(part, new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}}));
captureOne.getValue().forEach(part -> {
if(((String)(part.get("path"))).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)){
map.put(part, new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS));
put("fileSizeBytes", (long)NUM_BYTES_PER_FILE);

}});
}else{
map.put(part, new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}});
}
});
return map;
});

Expand Down Expand Up @@ -166,6 +175,21 @@ public void oneFilePerFs(TaskFsTestConfig fsConfig) throws IOException {
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void skipsFetchingFileIfByteOffsetExistsAndMatchesFileLength(TaskFsTestConfig fsConfig) throws IOException {
for (Path dir : fsConfig.getDirectories()) {
//this file will be skipped since the byte offset for the file is equal to the byte size of the file
Path dataFile = new Path(dir, SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED);
createDataFile(fsConfig.getFs(), dataFile);
}

fsConfig.getTask().start(fsConfig.getTaskConfig());
List<SourceRecord> records = fsConfig.getTask().poll();
assertEquals(0, records.size());
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void nonExistentUri(TaskFsTestConfig fsConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public void testIteratorChunkingWithChunkGreaterThanNumElementsWorks() {
assertEquals(expected, materializedChunkedIterator);
}

@Test
public void testIteratorChunkingWithEmptyIteratorWorks() {
Iterator<Integer> iterator = IntStream.range(0, 0).boxed().iterator();
Iterator<List<Integer>> chunkedIterator = IteratorUtils.chunkIterator(iterator, 5);

List<List<Integer>> materializedChunkedIterator = asStream(chunkedIterator).collect(Collectors.toList());
ArrayList<ArrayList<Integer>> expected = new ArrayList<>();

assertEquals(expected, materializedChunkedIterator);
}

@Test
public void testIteratorChunkingThrowsWithInvalidChunkSize() {
Iterator<Integer> iterator = IntStream.rangeClosed(0, 2).boxed().iterator();
Expand Down

0 comments on commit a1705c6

Please sign in to comment.