Skip to content

Commit

Permalink
Add EmptyFileReader
Browse files Browse the repository at this point in the history
  • Loading branch information
grantatspothero committed May 27, 2020
1 parent 9acbff0 commit 0e6f300
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ public List<SourceRecord> poll() {
List<SourceRecord> totalRecords = filesToProcess().map(metadata -> {
List<SourceRecord> records = new ArrayList<>();
try (FileReader reader = policy.offer(metadata, context.offsetStorageReader())) {
if(reader == null){
log.info("Skipping processing file {} as it is unchanged", metadata);
return records;
}
log.info("Processing records for file {}.", metadata);
while (reader.hasNext()) {
records.add(convert(metadata, reader.currentOffset() + 1, reader.next()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.github.mmolimar.kafka.connect.fs.file.reader;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.connect.data.Struct;

import java.util.Map;

public class EmptyFileReader extends AbstractFileReader<Void>{
/*
An empty file reader that will always return no records
Used as a null object instead of returning null
*/
boolean closed;

public EmptyFileReader(FileSystem fs, Path filePath, Map<String, Object> config) {
super(fs, filePath, new FakeReaderAdapter(), config);
this.closed = false;
}

@Override
protected void configure(Map<String, String> config) {}

@Override
protected Void nextRecord() {
return null;
}

@Override
protected boolean hasNextRecord() {
return false;
}

@Override
public void seekFile(long offset) {}

@Override
public boolean isClosed(){
return this.closed;
}

@Override
public void close() {
closed = true;
}

static class FakeReaderAdapter implements ReaderAdapter<Void> {

@Override
public Struct apply(Void record) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.EmptyFileReader;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.TailCall;
Expand Down Expand Up @@ -216,7 +217,7 @@ current, new Path(metadata.getPath()), conf.originals()
Long byteOffset = (Long)fileSizeBytesObject;
if (metadata.getLen() == byteOffset){
log.info("Skipping file: file {} has byte length and byte offset of: {}", metadata.getPath(), byteOffset);
return null;
return new EmptyFileReader(current, new Path(metadata.getPath()), conf.originals());
}
}

Expand Down

0 comments on commit 0e6f300

Please sign in to comment.