Skip to content

Commit

Permalink
Merge cac704a into 588d310
Browse files Browse the repository at this point in the history
  • Loading branch information
grantatspothero committed May 26, 2020
2 parents 588d310 + cac704a commit 7cad704
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,18 @@ public List<SourceRecord> poll() {
List<SourceRecord> totalRecords = filesToProcess().map(metadata -> {
List<SourceRecord> records = new ArrayList<>();
try (FileReader reader = policy.offer(metadata, context.offsetStorageReader())) {
if(reader == null){
log.info("Skipping processing file {} as it is unchanged", metadata);
return records;
}
log.info("Processing records for file {}.", metadata);
while (reader.hasNext()) {
records.add(convert(metadata, reader.currentOffset() + 1, reader.next()));
}
} catch (ConnectException | IOException e) {
//when an exception happens reading a file, the connector continues
log.error("Error reading file [{}]. Keep going...", metadata.getPath(), e);
return new ArrayList<SourceRecord>();
}
log.debug("Read [{}] records from file [{}].", records.size(), metadata.getPath());

Expand Down Expand Up @@ -122,9 +127,12 @@ private <T> Stream<T> asStream(Iterator<T> src) {
}

private SourceRecord convert(FileMetadata metadata, long offset, Struct struct) {
Map<String, Long> offsetMap = new HashMap<>();
offsetMap.put("offset", offset);
offsetMap.put("fileSizeBytes", metadata.getLen());
return new SourceRecord(
Collections.singletonMap("path", metadata.getPath()),
Collections.singletonMap("offset", offset),
offsetMap,
config.getTopic(),
struct.schema(),
struct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -197,17 +198,34 @@ public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorage
.filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString()))
.findFirst()
.orElse(null);

Supplier<FileReader> makeReader = () -> ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals()
);
try {
FileReader reader = ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals());
Map<String, Object> partition = Collections.singletonMap("path", metadata.getPath());
Map<String, Object> offset = offsetStorageReader.offset(partition);
if (offset != null && offset.get("offset") != null) {
Object offsetObject = offset.get("offset");
Object fileSizeBytesObject = offset.get("fileSizeBytes");

// Only new versions of kafka-connect-fs store the file size bytes
// If we have the byte offset, we can skip reading the entire file if the file size has not changed
if(fileSizeBytesObject != null) {
Long byteOffset = (Long)fileSizeBytesObject;
if (metadata.getLen() == byteOffset){
log.info("File {} has byte length and byte offset of: {}, skipping reading the file as it is unchanged since the last execution", metadata.getPath(), byteOffset);
return null;
}
}

log.info("Seeking to offset [{}] for file [{}].", offset.get("offset"), metadata.getPath());
reader.seek((Long) offset.get("offset"));
FileReader reader = makeReader.get();
reader.seek((Long) offsetObject);
return reader;
}
return reader;
return makeReader.get();
} catch (Exception e) {
throw new ConnectException("An error has occurred when creating reader for file: " + metadata.getPath(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -41,6 +43,8 @@ public class FsSourceTaskTest {
new HdfsFsConfig()
);
private static final int NUM_RECORDS = 10;
private static final String SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED = "123.txt";
private static final int NUM_BYTES_PER_FILE = 390;

@BeforeAll
public static void initFs() throws IOException {
Expand Down Expand Up @@ -79,14 +83,39 @@ public void initTask() {
EasyMock.expect(taskContext.offsetStorageReader())
.andReturn(offsetStorageReader);

EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
Capture<Map<String, Object>> captureOne = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(
offsetStorageReader.offset(EasyMock.capture(captureOne))
).andAnswer(() -> {
Map<String, Object> captured = captureOne.getValue();
if(((String)(captured.get("path"))).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)){
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS));
put("fileSizeBytes", (long)NUM_BYTES_PER_FILE);
}};
}else{
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}};
}
});

// Second stubbed call to offsetStorageReader.offset(...): must mirror the
// first so a repeated poll() sees the same stored offsets per file.
// BUG FIX: the answer previously read captureOne.getValue() — the capture
// bound to the FIRST expectation — so this stub branched on a stale path
// instead of the partition actually passed to this call. It must inspect
// captureTwo, the capture attached to THIS expectation.
Capture<Map<String, Object>> captureTwo = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(
        offsetStorageReader.offset(EasyMock.capture(captureTwo))
).andAnswer(() -> {
    Map<String, Object> captured = captureTwo.getValue();
    if (((String) captured.get("path")).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)) {
        // Fully-processed file: report both the record offset and the stored
        // byte size so the policy can skip re-reading an unchanged file.
        return new HashMap<String, Object>() {{
            put("offset", (long) NUM_RECORDS);
            put("fileSizeBytes", (long) NUM_BYTES_PER_FILE);
        }};
    } else {
        // Partially-processed file: record offset only — older connector
        // versions did not persist fileSizeBytes.
        return new HashMap<String, Object>() {{
            put("offset", (long) (NUM_RECORDS / 2));
        }};
    }
});

EasyMock.checkOrder(taskContext, false);
EasyMock.replay(taskContext);
Expand Down Expand Up @@ -158,6 +187,21 @@ public void oneFilePerFs(TaskFsTestConfig fsConfig) throws IOException {
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void skipsFetchingFileIfByteOffsetExistsAndMatchesFileLength(TaskFsTestConfig fsConfig) throws IOException {
    // Seed each watched directory with the "already processed" file. The
    // mocked offset storage reports a fileSizeBytes equal to the file's
    // current length, so the task should skip reading it entirely.
    for (Path directory : fsConfig.getDirectories()) {
        Path alreadyProcessed = new Path(directory, SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED);
        createDataFile(fsConfig.getFs(), alreadyProcessed);
    }

    fsConfig.getTask().start(fsConfig.getTaskConfig());

    // First poll: every file is unchanged, so no records are produced.
    List<SourceRecord> polled = fsConfig.getTask().poll();
    assertEquals(0, polled.size());
    // Second poll: nothing left to process, signalled by null.
    assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void nonExistentUri(TaskFsTestConfig fsConfig) {
Expand Down

0 comments on commit 7cad704

Please sign in to comment.