Skip to content

Commit

Permalink
Merge 0e6f300 into 588d310
Browse files Browse the repository at this point in the history
  • Loading branch information
grantatspothero authored May 27, 2020
2 parents 588d310 + 0e6f300 commit c1fa47d
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public List<SourceRecord> poll() {
} catch (ConnectException | IOException e) {
//when an exception happens reading a file, the connector continues
log.error("Error reading file [{}]. Keep going...", metadata.getPath(), e);
return new ArrayList<SourceRecord>();
}
log.debug("Read [{}] records from file [{}].", records.size(), metadata.getPath());

Expand Down Expand Up @@ -122,9 +123,12 @@ private <T> Stream<T> asStream(Iterator<T> src) {
}

private SourceRecord convert(FileMetadata metadata, long offset, Struct struct) {
Map<String, Long> offsetMap = new HashMap<>();
offsetMap.put("offset", offset);
offsetMap.put("fileSizeBytes", metadata.getLen());
return new SourceRecord(
Collections.singletonMap("path", metadata.getPath()),
Collections.singletonMap("offset", offset),
offsetMap,
config.getTopic(),
struct.schema(),
struct
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.github.mmolimar.kafka.connect.fs.file.reader;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.connect.data.Struct;

import java.util.Map;

public class EmptyFileReader extends AbstractFileReader<Void>{
/*
An empty file reader that will always return no records
Used as a null object instead of returning null
*/
boolean closed;

public EmptyFileReader(FileSystem fs, Path filePath, Map<String, Object> config) {
super(fs, filePath, new FakeReaderAdapter(), config);
this.closed = false;
}

@Override
protected void configure(Map<String, String> config) {}

@Override
protected Void nextRecord() {
return null;
}

@Override
protected boolean hasNextRecord() {
return false;
}

@Override
public void seekFile(long offset) {}

@Override
public boolean isClosed(){
return this.closed;
}

@Override
public void close() {
closed = true;
}

static class FakeReaderAdapter implements ReaderAdapter<Void> {

@Override
public Struct apply(Void record) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.EmptyFileReader;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.TailCall;
Expand All @@ -22,6 +23,7 @@
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -197,17 +199,34 @@ public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorage
.filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString()))
.findFirst()
.orElse(null);

Supplier<FileReader> makeReader = () -> ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals()
);
try {
FileReader reader = ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals());
Map<String, Object> partition = Collections.singletonMap("path", metadata.getPath());
Map<String, Object> offset = offsetStorageReader.offset(partition);
if (offset != null && offset.get("offset") != null) {
Object offsetObject = offset.get("offset");
Object fileSizeBytesObject = offset.get("fileSizeBytes");

// Only new versions of kafka-connect-fs store the file size bytes
// If we have the byte offset, we can skip reading the entire file if the file size has not changed
if(fileSizeBytesObject != null) {
Long byteOffset = (Long)fileSizeBytesObject;
if (metadata.getLen() == byteOffset){
log.info("Skipping file: file {} has byte length and byte offset of: {}", metadata.getPath(), byteOffset);
return new EmptyFileReader(current, new Path(metadata.getPath()), conf.originals());
}
}

log.info("Seeking to offset [{}] for file [{}].", offset.get("offset"), metadata.getPath());
reader.seek((Long) offset.get("offset"));
FileReader reader = makeReader.get();
reader.seek((Long) offsetObject);
return reader;
}
return reader;
return makeReader.get();
} catch (Exception e) {
throw new ConnectException("An error has occurred when creating reader for file: " + metadata.getPath(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -41,6 +43,8 @@ public class FsSourceTaskTest {
new HdfsFsConfig()
);
private static final int NUM_RECORDS = 10;
private static final String SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED = "123.txt";
private static final int NUM_BYTES_PER_FILE = 390;

@BeforeAll
public static void initFs() throws IOException {
Expand Down Expand Up @@ -79,14 +83,39 @@ public void initTask() {
EasyMock.expect(taskContext.offsetStorageReader())
.andReturn(offsetStorageReader);

EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
Capture<Map<String, Object>> captureOne = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(
offsetStorageReader.offset(EasyMock.capture(captureOne))
).andAnswer(() -> {
Map<String, Object> captured = captureOne.getValue();
if(((String)(captured.get("path"))).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)){
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS));
put("fileSizeBytes", (long)NUM_BYTES_PER_FILE);
}};
}else{
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}};
}
});

Capture<Map<String, Object>> captureTwo = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(
offsetStorageReader.offset(EasyMock.capture(captureTwo))
).andAnswer(() -> {
Map<String, Object> captured = captureOne.getValue();
if(((String)(captured.get("path"))).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)){
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS));
put("fileSizeBytes", (long)NUM_BYTES_PER_FILE);
}};
}else{
return new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}};
}
});

EasyMock.checkOrder(taskContext, false);
EasyMock.replay(taskContext);
Expand Down Expand Up @@ -158,6 +187,21 @@ public void oneFilePerFs(TaskFsTestConfig fsConfig) throws IOException {
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void skipsFetchingFileIfByteOffsetExistsAndMatchesFileLength(TaskFsTestConfig fsConfig) throws IOException {
for (Path dir : fsConfig.getDirectories()) {
//this file will be skipped since the byte offset for the file is equal to the byte size of the file
Path dataFile = new Path(dir, SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED);
createDataFile(fsConfig.getFs(), dataFile);
}

fsConfig.getTask().start(fsConfig.getTaskConfig());
List<SourceRecord> records = fsConfig.getTask().poll();
assertEquals(0, records.size());
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void nonExistentUri(TaskFsTestConfig fsConfig) {
Expand Down

0 comments on commit c1fa47d

Please sign in to comment.