Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
grantatspothero committed May 26, 2020
1 parent 9af7a02 commit cac704a
Showing 1 changed file with 52 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -41,6 +43,8 @@ public class FsSourceTaskTest {
new HdfsFsConfig()
);
private static final int NUM_RECORDS = 10;
private static final String SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED = "123.txt";
private static final int NUM_BYTES_PER_FILE = 390;

@BeforeAll
public static void initFs() throws IOException {
Expand Down Expand Up @@ -79,14 +83,39 @@ public void initTask() {
EasyMock.expect(taskContext.offsetStorageReader())
.andReturn(offsetStorageReader);

EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
Capture<Map<String, Object>> captureOne = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(
offsetStorageReader.offset(EasyMock.capture(captureOne))
).andAnswer(() -> {
Map<String, Object> captured = captureOne.getValue();
if(((String)(captured.get("path"))).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)){
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS));
put("fileSizeBytes", (long)NUM_BYTES_PER_FILE);
}};
}else{
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}};
}
});

Capture<Map<String, Object>> captureTwo = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(
offsetStorageReader.offset(EasyMock.capture(captureTwo))
).andAnswer(() -> {
Map<String, Object> captured = captureOne.getValue();
if(((String)(captured.get("path"))).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)){
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS));
put("fileSizeBytes", (long)NUM_BYTES_PER_FILE);
}};
}else{
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}};
}
});

EasyMock.checkOrder(taskContext, false);
EasyMock.replay(taskContext);
Expand Down Expand Up @@ -158,6 +187,21 @@ public void oneFilePerFs(TaskFsTestConfig fsConfig) throws IOException {
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void skipsFetchingFileIfByteOffsetExistsAndMatchesFileLength(TaskFsTestConfig fsConfig) throws IOException {
for (Path dir : fsConfig.getDirectories()) {
//this file will be skipped since the byte offset for the file is equal to the byte size of the file
Path dataFile = new Path(dir, SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED);
createDataFile(fsConfig.getFs(), dataFile);
}

fsConfig.getTask().start(fsConfig.getTaskConfig());
List<SourceRecord> records = fsConfig.getTask().poll();
assertEquals(0, records.size());
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void nonExistentUri(TaskFsTestConfig fsConfig) {
Expand Down

0 comments on commit cac704a

Please sign in to comment.