
Commit

Merge 2751510 into 588d310
grantatspothero committed May 26, 2020
2 parents 588d310 + 2751510 commit 23673c1
Showing 7 changed files with 269 additions and 41 deletions.
@@ -3,6 +3,7 @@
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.policy.Policy;
import com.github.mmolimar.kafka.connect.fs.util.IteratorUtils;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.Version;
import org.apache.kafka.common.config.ConfigException;
@@ -18,9 +19,9 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FsSourceTask extends SourceTask {

@@ -74,22 +75,43 @@ public void start(Map<String, String> properties) {
public List<SourceRecord> poll() {
while (!stop.get() && policy != null && !policy.hasEnded()) {
log.trace("Polling for new data...");

List<SourceRecord> totalRecords = filesToProcess().map(metadata -> {
List<SourceRecord> records = new ArrayList<>();
try (FileReader reader = policy.offer(metadata, context.offsetStorageReader())) {
log.info("Processing records for file {}.", metadata);
while (reader.hasNext()) {
records.add(convert(metadata, reader.currentOffset() + 1, reader.next()));
Function<FileMetadata, Map<String, Object>> makePartitionKey = (FileMetadata metadata) ->
Collections.singletonMap("path", metadata.getPath());

List<SourceRecord> totalRecords = filesToProcess().flatMap(chunkedMetadata -> {
List<Map<String, Object>> partitions = chunkedMetadata.stream()
.map(makePartitionKey)
.collect(Collectors.toList());

Map<Map<String, Object>, Map<String, Object>> offsets = context.offsetStorageReader().offsets(partitions);

Stream<SourceRecord> chunkedRecords = chunkedMetadata.stream().flatMap( metadata -> {
List<SourceRecord> records = new ArrayList<>();
Map<String, Object> partitionKey = makePartitionKey.apply(metadata);
Map<String, Object> offset = offsets.get(partitionKey);

try (FileReader reader = policy.offer(metadata, offset)) {
if(reader == null){
log.info("Skipping processing file {} as it is unchanged", metadata);
return records.stream();
}
log.info("Processing records for file {}.", metadata);
while (reader.hasNext()) {
records.add(convert(metadata, reader.currentOffset() + 1, reader.next()));
}
} catch (ConnectException | IOException e) {
//when an exception happens reading a file, the connector continues
log.error("Error reading file [{}]. Keep going...", metadata.getPath(), e);
return new ArrayList<SourceRecord>().stream();
}
} catch (ConnectException | IOException e) {
//when an exception happens reading a file, the connector continues
log.error("Error reading file [{}]. Keep going...", metadata.getPath(), e);
}
log.debug("Read [{}] records from file [{}].", records.size(), metadata.getPath());
log.debug("Read [{}] records from file [{}].", records.size(), metadata.getPath());

return records.stream();
});

return chunkedRecords;

return records;
}).flatMap(Collection::stream).collect(Collectors.toList());
}).collect(Collectors.toList());

log.debug("Returning [{}] records in execution number [{}] for policy [{}].",
totalRecords.size(), policy.getExecutions(), policy.getClass().getName());
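
As a reading aid for the hunk above: the poll loop no longer looks up one offset per file. It maps each chunk of files to its partition keys, fetches all of their offsets with a single offsets() call, and then hands each file to the policy together with its (possibly absent) stored offset. Below is a minimal, self-contained sketch of that batching idea; BatchedOffsetLookupSketch, the FileInfo class, and the in-memory store are hypothetical stand-ins for FileMetadata and Kafka Connect's OffsetStorageReader, while the "path", "offset", and "fileSizeBytes" keys match the maps used in the diff.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchedOffsetLookupSketch {

    // Hypothetical stand-in for FileMetadata: just a path and a length.
    static class FileInfo {
        final String path;
        final long len;
        FileInfo(String path, long len) { this.path = path; this.len = len; }
    }

    // Partition key layout used by the connector: a single "path" entry.
    static Map<String, Object> partitionKey(FileInfo file) {
        Map<String, Object> key = new HashMap<>();
        key.put("path", file.path);
        return key;
    }

    // Stand-in for OffsetStorageReader.offsets(partitions): one batched lookup per chunk.
    static Map<Map<String, Object>, Map<String, Object>> offsets(
            Collection<Map<String, Object>> partitions,
            Map<Map<String, Object>, Map<String, Object>> store) {
        Map<Map<String, Object>, Map<String, Object>> found = new HashMap<>();
        for (Map<String, Object> partition : partitions) {
            if (store.containsKey(partition)) {
                found.put(partition, store.get(partition));
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Pretend offset store: /data/a.txt was fully read in a previous execution.
        Map<String, Object> storedOffset = new HashMap<>();
        storedOffset.put("offset", 10L);
        storedOffset.put("fileSizeBytes", 390L);
        Map<Map<String, Object>, Map<String, Object>> store = new HashMap<>();
        store.put(partitionKey(new FileInfo("/data/a.txt", 390)), storedOffset);

        // One chunk of files, one batched offsets() call, then a per-file lookup in the result.
        List<FileInfo> chunk = List.of(new FileInfo("/data/a.txt", 390), new FileInfo("/data/b.txt", 120));
        List<Map<String, Object>> partitions = new ArrayList<>();
        chunk.forEach(file -> partitions.add(partitionKey(file)));
        Map<Map<String, Object>, Map<String, Object>> found = offsets(partitions, store);

        for (FileInfo file : chunk) {
            System.out.printf("%s -> stored offset: %s%n", file.path, found.get(partitionKey(file)));
        }
    }
}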
@@ -103,10 +125,16 @@ public List<SourceRecord> poll()
return null;
}

private Stream<FileMetadata> filesToProcess() {
private Stream<List<FileMetadata>> filesToProcess() {
try {
return asStream(policy.execute())
.filter(metadata -> metadata.getLen() > 0);
int chunkSize = config.getInt(FsSourceTaskConfig.FILES_CHUNK_SIZE);
Iterator<List<FileMetadata>> chunked = IteratorUtils.chunkIterator(policy.execute(), chunkSize);
return IteratorUtils.asStream(chunked)
.map(chunk ->
chunk.stream().filter(
metadata -> metadata.getLen() > 0
).collect(Collectors.toList())
);
} catch (IOException | ConnectException e) {
//when an exception happens executing the policy, the connector continues
log.error("Cannot retrieve files to process from the FS: {}. " +
@@ -116,15 +144,14 @@ private Stream<FileMetadata> filesToProcess() {
}
}

private <T> Stream<T> asStream(Iterator<T> src) {
Iterable<T> iterable = () -> src;
return StreamSupport.stream(iterable.spliterator(), false);
}

private SourceRecord convert(FileMetadata metadata, long offset, Struct struct) {
Map<String, Long> offsetMap = new HashMap<>();
offsetMap.put("offset", offset);
offsetMap.put("fileSizeBytes", metadata.getLen());

return new SourceRecord(
Collections.singletonMap("path", metadata.getPath()),
Collections.singletonMap("offset", offset),
offsetMap,
config.getTopic(),
struct.schema(),
struct
@@ -32,6 +32,11 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig {
public static final int POLL_INTERVAL_MS_DEFAULT = 10000;
private static final String POLL_INTERVAL_MS_DISPLAY = "Poll Interval (ms)";

public static final String FILES_CHUNK_SIZE = "files.chunk.size";
private static final String FILES_CHUNK_SIZE_DOC = "The number of files that will be chunked together when fetching offsets from Kafka Connect. Tune this value for higher throughput if you notice a delay between processing files.";
public static final int FILES_CHUNK_SIZE_DEFAULT = 20;
private static final String FILES_CHUNK_SIZE_DISPLAY = "Files chunk size";

private static final String POLICY_GROUP = "Policy";
private static final String CONNECTOR_GROUP = "Connector";

@@ -96,6 +101,16 @@ public static ConfigDef conf() {
++order,
ConfigDef.Width.SHORT,
POLL_INTERVAL_MS_DISPLAY
).define(
FILES_CHUNK_SIZE,
ConfigDef.Type.INT,
FILES_CHUNK_SIZE_DEFAULT,
ConfigDef.Importance.MEDIUM,
FILES_CHUNK_SIZE_DOC,
CONNECTOR_GROUP,
++order,
ConfigDef.Width.MEDIUM,
FILES_CHUNK_SIZE_DISPLAY
);
}
}
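
For context, the new property would be set alongside the connector's existing settings. The sketch below shows a task configuration as a Java map; only files.chunk.size comes from this commit, and the other keys and values are placeholders for whatever the connector is already configured with.

import java.util.HashMap;
import java.util.Map;

public class ChunkSizeConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Placeholder settings assumed to exist already; names and values are illustrative only.
        props.put("fs.uris", "file:///data/incoming/");
        props.put("topic", "ingested-files");
        // The property added by this commit: how many files share one offset lookup (default 20).
        props.put("files.chunk.size", "50");

        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}

A larger chunk means fewer round trips to offset storage per poll, at the cost of holding more partition keys in memory for each lookup.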
@@ -10,6 +10,7 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
@@ -22,6 +23,7 @@
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -192,22 +194,39 @@ FileMetadata toMetadata(LocatedFileStatus fileStatus) {
}

@Override
public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) {
public FileReader offer(FileMetadata metadata, Map<String, Object> offset) {
FileSystem current = fileSystems.stream()
.filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString()))
.findFirst()
.orElse(null);

Supplier<FileReader> makeReader = () -> ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals()
);

try {
FileReader reader = ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals());
Map<String, Object> partition = Collections.singletonMap("path", metadata.getPath());
Map<String, Object> offset = offsetStorageReader.offset(partition);
if (offset != null && offset.get("offset") != null) {
Object offsetObject = offset.get("offset");
Object fileSizeBytesObject = offset.get("fileSizeBytes");

// Only new versions of kafka-connect-fs store the file size bytes
// If we have the byte offset, we can skip reading the entire file if the file size has not changed
if(fileSizeBytesObject != null) {
Long byteOffset = (Long)fileSizeBytesObject;
if (metadata.getLen() == byteOffset){
log.info("File {} has byte length and byte offset of: {}, skipping reading the file as it is unchanged since the last execution", metadata.getPath(), byteOffset);
return null;
}
}

log.info("Seeking to offset [{}] for file [{}].", offset.get("offset"), metadata.getPath());
reader.seek((Long) offset.get("offset"));
FileReader reader = makeReader.get();
reader.seek((Long) offsetObject);
return reader;

}
return reader;
return makeReader.get();
} catch (Exception e) {
throw new ConnectException("An error has occurred when creating reader for file: " + metadata.getPath(), e);
}
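
The heart of the change to offer() is the early exit: if the stored offset carries a fileSizeBytes equal to the file's current length, the file has not changed since it was last read, so offer() returns null instead of opening a reader; otherwise the reader is created and, when a record offset exists, seeked. Below is a minimal sketch of that decision only; SkipUnchangedFileSketch and isUnchanged are hypothetical names, while the "offset" and "fileSizeBytes" keys follow the offset map written by convert() in this commit.

import java.util.HashMap;
import java.util.Map;

public class SkipUnchangedFileSketch {

    // Returns true when the stored offset shows the file has not grown since it was last read.
    static boolean isUnchanged(Map<String, Object> storedOffset, long currentFileLen) {
        if (storedOffset == null) {
            return false;                  // never seen before: read it
        }
        Object sizeBytes = storedOffset.get("fileSizeBytes");
        if (sizeBytes == null) {
            return false;                  // offset written by an older version: fall back to seeking
        }
        return (Long) sizeBytes == currentFileLen;
    }

    public static void main(String[] args) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("offset", 10L);
        offset.put("fileSizeBytes", 390L);

        System.out.println(isUnchanged(offset, 390L));  // true  -> offer() would return null
        System.out.println(isUnchanged(offset, 512L));  // false -> offer() would seek to record 10
        System.out.println(isUnchanged(null, 390L));    // false -> new file, read from the start
    }
}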
@@ -8,12 +8,13 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public interface Policy extends Closeable {

Iterator<FileMetadata> execute() throws IOException;

FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) throws IOException;
FileReader offer(FileMetadata metadata, Map<String, Object> offset) throws IOException;

boolean hasEnded();

@@ -0,0 +1,34 @@
package com.github.mmolimar.kafka.connect.fs.util;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorUtils {
public static <T> Iterator<List<T>> chunkIterator(Iterator<T> iterator, int elementsPerChunk){
if(elementsPerChunk <= 0){
throw new IllegalArgumentException(String.format("elementsPerChunk must be greater than 0 but was set to %d", elementsPerChunk));
}
return new Iterator<List<T>>() {

public boolean hasNext() {
return iterator.hasNext();
}

public List<T> next() {
List<T> result = new ArrayList<>(elementsPerChunk);
for (int i = 0; i < elementsPerChunk && iterator.hasNext(); i++) {
result.add(iterator.next());
}
return result;
}
};
}

public static <T> Stream<T> asStream(Iterator<T> src) {
Iterable<T> iterable = () -> src;
return StreamSupport.stream(iterable.spliterator(), false);
}
}
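
A quick usage sketch for the new helpers: chunk an iterator into fixed-size groups and stream over the chunks. IteratorUtils and its package are taken from the diff above; the example class and data are illustrative. Note that the final chunk may be smaller than elementsPerChunk, and a non-positive chunk size throws IllegalArgumentException.

import com.github.mmolimar.kafka.connect.fs.util.IteratorUtils;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorUtilsExample {
    public static void main(String[] args) {
        Iterator<Integer> source = Arrays.asList(1, 2, 3, 4, 5).iterator();

        // Groups of two: the last chunk only holds the leftover element.
        Iterator<List<Integer>> chunks = IteratorUtils.chunkIterator(source, 2);
        IteratorUtils.asStream(chunks)
                .forEach(System.out::println);  // prints [1, 2], [3, 4] and [5] on separate lines
    }
}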
@@ -13,6 +13,8 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -34,13 +36,16 @@

import static org.junit.jupiter.api.Assertions.*;


public class FsSourceTaskTest {

private static final List<TaskFsTestConfig> TEST_FILE_SYSTEMS = Arrays.asList(
new LocalFsConfig(),
new HdfsFsConfig()
);
private static final int NUM_RECORDS = 10;
private static final String SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED = "123.txt";
private static final int NUM_BYTES_PER_FILE = 390;

@BeforeAll
public static void initFs() throws IOException {
@@ -79,14 +84,26 @@ public void initTask() {
EasyMock.expect(taskContext.offsetStorageReader())
.andReturn(offsetStorageReader);

EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
Capture<Collection<Map<String, Object>>> captureOne = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(
offsetStorageReader.offsets(EasyMock.capture(captureOne))
).andAnswer(() -> {
Map<Map<String, Object>, Map<String, Object>> map = new HashMap<>();
captureOne.getValue().forEach(part -> {
if(((String)(part.get("path"))).endsWith(SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED)){
map.put(part, new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS));
put("fileSizeBytes", (long)NUM_BYTES_PER_FILE);

}});
}else{
map.put(part, new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}});
}
});
return map;
});

EasyMock.checkOrder(taskContext, false);
EasyMock.replay(taskContext);
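
The mock above answers offsets() dynamically: it captures whatever partition collection the task passes in and builds a response keyed on those same maps, returning a "fully read" offset only for the special file name. For readers less familiar with this EasyMock idiom, here is an isolated sketch of the capture-and-answer pattern against a small hypothetical Lookup interface (not part of the connector); the EasyMock calls themselves are standard API.

import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CaptureAnswerSketch {

    // Hypothetical collaborator used only to demonstrate the pattern.
    public interface Lookup {
        Map<String, Integer> lengths(List<String> keys);
    }

    public static void main(String[] args) {
        Lookup lookup = EasyMock.createMock(Lookup.class);

        // Capture the argument so the answer can be computed from whatever the caller passed in.
        Capture<List<String>> captured = Capture.newInstance(CaptureType.ALL);
        EasyMock.expect(lookup.lengths(EasyMock.capture(captured)))
                .andAnswer(() -> captured.getValue().stream()
                        .collect(Collectors.toMap(k -> k, String::length)));
        EasyMock.replay(lookup);

        System.out.println(lookup.lengths(List.of("alpha", "ab")));  // e.g. {alpha=5, ab=2}
        EasyMock.verify(lookup);
    }
}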
@@ -158,6 +175,21 @@ public void oneFilePerFs(TaskFsTestConfig fsConfig) throws IOException {
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void skipsFetchingFileIfByteOffsetExistsAndMatchesFileLength(TaskFsTestConfig fsConfig) throws IOException {
for (Path dir : fsConfig.getDirectories()) {
//this file will be skipped since the byte offset for the file is equal to the byte size of the file
Path dataFile = new Path(dir, SPECIAL_FILE_NAME_INDICATING_FILE_ALREADY_PROCESSED);
createDataFile(fsConfig.getFs(), dataFile);
}

fsConfig.getTask().start(fsConfig.getTaskConfig());
List<SourceRecord> records = fsConfig.getTask().poll();
assertEquals(0, records.size());
assertNull(fsConfig.getTask().poll());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void nonExistentUri(TaskFsTestConfig fsConfig) {