
Commit

Improve performance of connector by skipping unchanged files and batching access to kafka connect offsets
grantatspothero committed May 21, 2020
1 parent 588d310 commit 03d81ba
Showing 7 changed files with 233 additions and 41 deletions.
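
In outline, the commit makes two changes to the poll loop: files are grouped into chunks whose stored offsets are fetched with a single batched lookup per chunk, and any file whose current length equals the previously committed fileSizeBytes is skipped without being re-read. The sketch below illustrates that pattern with simplified stand-ins (the FileMetadata class, the offset store, and the chunking here are mock-ups for illustration, not the connector's actual code):

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Simplified, self-contained sketch of the commit's two optimizations; everything except
// the "path"/"offset"/"fileSizeBytes" keys is a stand-in name.
public class ChunkedOffsetLookupSketch {

    // Stand-in for the connector's FileMetadata (path plus current length).
    static class FileMetadata {
        final String path;
        final long len;
        FileMetadata(String path, long len) { this.path = path; this.len = len; }
    }

    public static void main(String[] args) {
        // Stand-in for Kafka Connect's offset storage: source partition -> committed offset.
        Map<Map<String, Object>, Map<String, Object>> offsetStore = new HashMap<>();
        Map<String, Object> storedOffset = new HashMap<>();
        storedOffset.put("offset", 10L);
        storedOffset.put("fileSizeBytes", 120L);
        offsetStore.put(Collections.singletonMap("path", "/data/a.txt"), storedOffset);

        // One chunk of files discovered by the policy (files.chunk.size controls the size).
        List<FileMetadata> chunk = new ArrayList<>();
        chunk.add(new FileMetadata("/data/a.txt", 120L)); // unchanged since last run -> skipped
        chunk.add(new FileMetadata("/data/b.txt", 300L)); // no stored offset -> processed

        Function<FileMetadata, Map<String, Object>> makePartitionKey =
                metadata -> Collections.singletonMap("path", metadata.path);

        // One batched lookup per chunk; the real task calls
        // context.offsetStorageReader().offsets(partitions) here instead of one offset() per file.
        List<Map<String, Object>> partitions =
                chunk.stream().map(makePartitionKey).collect(Collectors.toList());
        Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        partitions.forEach(p -> offsets.put(p, offsetStore.get(p)));

        for (FileMetadata metadata : chunk) {
            Map<String, Object> offset = offsets.get(makePartitionKey.apply(metadata));
            boolean unchanged = offset != null
                    && Long.valueOf(metadata.len).equals(offset.get("fileSizeBytes"));
            System.out.println((unchanged ? "Skipping unchanged file " : "Processing file ") + metadata.path);
        }
    }
}

The diff below implements the same flow with IteratorUtils.chunkIterator, OffsetStorageReader.offsets, and a null return from Policy.offer for unchanged files.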
@@ -3,6 +3,7 @@
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.policy.Policy;
import com.github.mmolimar.kafka.connect.fs.util.IteratorUtils;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.Version;
import org.apache.kafka.common.config.ConfigException;
@@ -18,9 +19,9 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FsSourceTask extends SourceTask {

@@ -74,22 +75,42 @@ public void start(Map<String, String> properties) {
public List<SourceRecord> poll() {
while (!stop.get() && policy != null && !policy.hasEnded()) {
log.trace("Polling for new data...");

List<SourceRecord> totalRecords = filesToProcess().map(metadata -> {
List<SourceRecord> records = new ArrayList<>();
try (FileReader reader = policy.offer(metadata, context.offsetStorageReader())) {
log.info("Processing records for file {}.", metadata);
while (reader.hasNext()) {
records.add(convert(metadata, reader.currentOffset() + 1, reader.next()));
Function<FileMetadata, Map<String, Object>> makePartitionKey = (FileMetadata metadata) ->
Collections.singletonMap("path", metadata.getPath());

List<SourceRecord> totalRecords = filesToProcess().flatMap(chunkedMetadata -> {
List<Map<String, Object>> partitions = chunkedMetadata.stream()
.map(makePartitionKey)
.collect(Collectors.toList());

Map<Map<String, Object>, Map<String, Object>> offsets = context.offsetStorageReader().offsets(partitions);

Stream<SourceRecord> chunkedRecords = chunkedMetadata.stream().flatMap( metadata -> {
List<SourceRecord> records = new ArrayList<>();
Map<String, Object> partitionKey = makePartitionKey.apply(metadata);
Map<String, Object> offset = offsets.get(partitionKey);

try (FileReader reader = policy.offer(metadata, offset)) {
if(reader == null){
log.info("Skipping processing file {} as it is unchanged", metadata);
return records.stream();
}
log.info("Processing records for file {}.", metadata);
while (reader.hasNext()) {
records.add(convert(metadata, reader.currentOffset() + 1, reader.next()));
}
} catch (ConnectException | IOException e) {
//when an exception happens reading a file, the connector continues
log.error("Error reading file [{}]. Keep going...", metadata.getPath(), e);
}
log.debug("Read [{}] records from file [{}].", records.size(), metadata.getPath());

return records.stream();
});

return chunkedRecords;

return records;
}).flatMap(Collection::stream).collect(Collectors.toList());
}).collect(Collectors.toList());

log.debug("Returning [{}] records in execution number [{}] for policy [{}].",
totalRecords.size(), policy.getExecutions(), policy.getClass().getName());
@@ -103,10 +124,16 @@ public List<SourceRecord> poll() {
return null;
}

private Stream<FileMetadata> filesToProcess() {
private Stream<List<FileMetadata>> filesToProcess() {
try {
return asStream(policy.execute())
.filter(metadata -> metadata.getLen() > 0);
int chunkSize = config.getInt(FsSourceTaskConfig.FILES_CHUNK_SIZE);
Iterator<List<FileMetadata>> chunked = IteratorUtils.chunkIterator(policy.execute(), chunkSize);
return IteratorUtils.asStream(chunked)
.map(chunk ->
chunk.stream().filter(
metadata -> metadata.getLen() > 0
).collect(Collectors.toList())
);
} catch (IOException | ConnectException e) {
//when an exception happens executing the policy, the connector continues
log.error("Cannot retrieve files to process from the FS: {}. " +
@@ -116,15 +143,14 @@ private Stream<FileMetadata> filesToProcess() {
}
}

private <T> Stream<T> asStream(Iterator<T> src) {
Iterable<T> iterable = () -> src;
return StreamSupport.stream(iterable.spliterator(), false);
}

private SourceRecord convert(FileMetadata metadata, long offset, Struct struct) {
Map<String, Long> offsetMap = new HashMap<>();
offsetMap.put("offset", offset);
offsetMap.put("fileSizeBytes", metadata.getLen());

return new SourceRecord(
Collections.singletonMap("path", metadata.getPath()),
Collections.singletonMap("offset", offset),
offsetMap,
config.getTopic(),
struct.schema(),
struct
@@ -32,6 +32,11 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig {
public static final int POLL_INTERVAL_MS_DEFAULT = 10000;
private static final String POLL_INTERVAL_MS_DISPLAY = "Poll Interval (ms)";

public static final String FILES_CHUNK_SIZE = "files.chunk.size";
private static final String FILES_CHUNK_SIZE_DOC = "The number of files that will be chunked together for grabbing offsets from kafka connect. Tune this value for higher throughput if you notice a delay between processing files.";
public static final int FILES_CHUNK_SIZE_DEFAULT = 20;
private static final String FILES_CHUNK_SIZE_DISPLAY = "Files chunk size";

private static final String POLICY_GROUP = "Policy";
private static final String CONNECTOR_GROUP = "Connector";

@@ -96,6 +101,16 @@ public static ConfigDef conf() {
++order,
ConfigDef.Width.SHORT,
POLL_INTERVAL_MS_DISPLAY
).define(
FILES_CHUNK_SIZE,
ConfigDef.Type.INT,
FILES_CHUNK_SIZE_DEFAULT,
ConfigDef.Importance.MEDIUM,
FILES_CHUNK_SIZE_DOC,
CONNECTOR_GROUP,
++order,
ConfigDef.Width.MEDIUM,
FILES_CHUNK_SIZE_DISPLAY
);
}
}
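
A hedged usage sketch for the new option: only files.chunk.size and the getInt lookup come from this diff; the constructor call mirrors how the task presumably builds its config, and the other connector settings a deployment needs are omitted.

import java.util.HashMap;
import java.util.Map;

// Sketch only: raise the chunk size from the default of 20 to 50. Other required
// connector properties (policy class, file reader class, topic, ...) are left out here.
Map<String, String> props = new HashMap<>();
props.put(FsSourceTaskConfig.FILES_CHUNK_SIZE, "50");

FsSourceTaskConfig config = new FsSourceTaskConfig(props); // assumed constructor
int chunkSize = config.getInt(FsSourceTaskConfig.FILES_CHUNK_SIZE); // 50, as read in filesToProcess()

With, say, 1,000 eligible files and a chunk size of 50, each poll makes 20 batched offsets() lookups instead of 1,000 per-file offset() calls.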
@@ -10,6 +10,7 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
@@ -22,6 +23,7 @@
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -192,22 +194,39 @@ FileMetadata toMetadata(LocatedFileStatus fileStatus) {
}

@Override
public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) {
public FileReader offer(FileMetadata metadata, Map<String, Object> offset) {
FileSystem current = fileSystems.stream()
.filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString()))
.findFirst()
.orElse(null);

Supplier<FileReader> makeReader = () -> ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals()
);

try {
FileReader reader = ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals());
Map<String, Object> partition = Collections.singletonMap("path", metadata.getPath());
Map<String, Object> offset = offsetStorageReader.offset(partition);
if (offset != null && offset.get("offset") != null) {
Object offsetObject = offset.get("offset");
Object fileSizeBytesObject = offset.get("fileSizeBytes");

// Only new versions of kafka-connect-fs store the file size bytes
// If we have the byte offset, we can skip reading the entire file if the file size has not changed
if(fileSizeBytesObject != null) {
Long byteOffset = (Long)fileSizeBytesObject;
if (metadata.getLen() == byteOffset){
log.info("File {} has byte length and byte offset of: {}, skipping reading the file as it is unchanged since the last execution", metadata.getPath(), byteOffset);
return null;
}
}

log.info("Seeking to offset [{}] for file [{}].", offset.get("offset"), metadata.getPath());
reader.seek((Long) offset.get("offset"));
FileReader reader = makeReader.get();
reader.seek((Long) offsetObject);
return reader;

}
return reader;
return makeReader.get();
} catch (Exception e) {
throw new ConnectException("An error has occurred when creating reader for file: " + metadata.getPath(), e);
}
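
The skip logic above depends on the shape of the offset map that convert() now commits. A small illustration with made-up values:

import java.util.HashMap;
import java.util.Map;

// Source offset committed by convert() after this change (example values only).
Map<String, Object> offset = new HashMap<>();
offset.put("offset", 1500L);          // last record offset delivered from the file
offset.put("fileSizeBytes", 73400L);  // file length when those records were read

// The check offer() applies on the next poll; offsets written by older versions
// have no "fileSizeBytes" entry, so they never trigger a skip.
long currentLen = 73400L; // metadata.getLen()
Object fileSizeBytes = offset.get("fileSizeBytes");
boolean skip = fileSizeBytes != null && currentLen == (Long) fileSizeBytes;
// skip == true -> offer() returns null and poll() logs that the file is unchanged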
@@ -8,12 +8,13 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public interface Policy extends Closeable {

Iterator<FileMetadata> execute() throws IOException;

FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) throws IOException;
FileReader offer(FileMetadata metadata, Map<String, Object> offset) throws IOException;

boolean hasEnded();

@@ -0,0 +1,34 @@
package com.github.mmolimar.kafka.connect.fs.util;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorUtils {
public static <T> Iterator<List<T>> chunkIterator(Iterator<T> iterator, int elementsPerChunk){
if(elementsPerChunk <= 0){
throw new IllegalArgumentException(String.format("elementsPerChunk must be greater than 0 but was set to %d", elementsPerChunk));
}
return new Iterator<List<T>>() {

public boolean hasNext() {
return iterator.hasNext();
}

public List<T> next() {
List<T> result = new ArrayList<>(elementsPerChunk);
for (int i = 0; i < elementsPerChunk && iterator.hasNext(); i++) {
result.add(iterator.next());
}
return result;
}
};
}

public static <T> Stream<T> asStream(Iterator<T> src) {
Iterable<T> iterable = () -> src;
return StreamSupport.stream(iterable.spliterator(), false);
}
}
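
A quick usage sketch for the helper (the chunk size of 4 is arbitrary; the tests below exercise sizes 2, 5, and the invalid value 0):

import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Chunking 0..9 into groups of 4 yields [0, 1, 2, 3], [4, 5, 6, 7], [8, 9].
Iterator<Integer> numbers = IntStream.rangeClosed(0, 9).boxed().iterator();
Iterator<List<Integer>> chunks = IteratorUtils.chunkIterator(numbers, 4);
IteratorUtils.asStream(chunks).forEach(System.out::println);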
@@ -13,6 +13,8 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -34,6 +36,10 @@

import static org.junit.jupiter.api.Assertions.*;

import org.easymock.EasyMock;
import org.easymock.IAnswer;


public class FsSourceTaskTest {

private static final List<TaskFsTestConfig> TEST_FILE_SYSTEMS = Arrays.asList(
@@ -79,14 +85,16 @@ public void initTask() {
EasyMock.expect(taskContext.offsetStorageReader())
.andReturn(offsetStorageReader);

EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject()))
.andReturn(new HashMap<String, Object>() {{
put("offset", (long) (NUM_RECORDS / 2));
}});
Capture<Collection<Map<String, Object>>> captureOne = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(
offsetStorageReader.offsets(EasyMock.capture(captureOne))
).andAnswer(() -> {
Map<Map<String, Object>, Map<String, Object>> map = new HashMap<>();
captureOne.getValue().forEach(part -> map.put(part, new HashMap<String, Object>(){{
put("offset", (long)(NUM_RECORDS/2));
}}));
return map;
});

EasyMock.checkOrder(taskContext, false);
EasyMock.replay(taskContext);
@@ -0,0 +1,89 @@
package com.github.mmolimar.kafka.connect.fs.util;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.github.mmolimar.kafka.connect.fs.util.IteratorUtils.asStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class IteratorUtilsTest {

@Test
public void testIteratorChunkingWorks() {
Iterator<Integer> iterator = IntStream.rangeClosed(0, 9).boxed().iterator();
Iterator<List<Integer>> chunkedIterator = IteratorUtils.chunkIterator(iterator, 2);

List<List<Integer>> materializedChunkedIterator = asStream(chunkedIterator).collect(Collectors.toList());
ArrayList<ArrayList<Integer>> expected = new ArrayList<ArrayList<Integer>>(){{
add(new ArrayList<Integer>(){{
add(0); add(1);
}});
add(new ArrayList<Integer>(){{
add(2); add(3);
}});
add(new ArrayList<Integer>(){{
add(4); add(5);
}});
add(new ArrayList<Integer>(){{
add(6); add(7);
}});
add(new ArrayList<Integer>(){{
add(8); add(9);
}});
}};

assertEquals(expected, materializedChunkedIterator);
}

@Test
public void testIteratorChunkingWorksWithUnevenChunks() {
Iterator<Integer> iterator = IntStream.rangeClosed(0, 4).boxed().iterator();
Iterator<List<Integer>> chunkedIterator = IteratorUtils.chunkIterator(iterator, 2);

List<List<Integer>> materializedChunkedIterator = asStream(chunkedIterator).collect(Collectors.toList());
ArrayList<ArrayList<Integer>> expected = new ArrayList<ArrayList<Integer>>(){{
add(new ArrayList<Integer>(){{
add(0); add(1);
}});
add(new ArrayList<Integer>(){{
add(2); add(3);
}});
add(new ArrayList<Integer>(){{
add(4);
}});
}};

assertEquals(expected, materializedChunkedIterator);
}

@Test
public void testIteratorChunkingWithChunkGreaterThanNumElementsWorks() {
Iterator<Integer> iterator = IntStream.rangeClosed(0, 2).boxed().iterator();
Iterator<List<Integer>> chunkedIterator = IteratorUtils.chunkIterator(iterator, 5);

List<List<Integer>> materializedChunkedIterator = asStream(chunkedIterator).collect(Collectors.toList());
ArrayList<ArrayList<Integer>> expected = new ArrayList<ArrayList<Integer>>(){{
add(new ArrayList<Integer>(){{
add(0); add(1); add(2);
}});
}};

assertEquals(expected, materializedChunkedIterator);
}

@Test
public void testIteratorChunkingThrowsWithInvalidChunkSize() {
Iterator<Integer> iterator = IntStream.rangeClosed(0, 2).boxed().iterator();

assertThrows(IllegalArgumentException.class, () -> {
IteratorUtils.chunkIterator(iterator, 0);
});

}
}
