Add files batching capabilities to the Policies (#59)
* Add a simple batch policy for file systems with a lot of files

* Improve code test coverage

* Set up batching in the AbstractPolicy instead of a separate Policy.

* Fix unit tests

* Fix unit tests by isolating newly created policies

* Rename iterator

* Use com.google.common.collect.Iterators.partition method instead

* Minor changes

Co-authored-by: Mario Molina <mmolimar@gmail.com>
Symbianx and mmolimar committed Jun 11, 2020
1 parent 559ec4b commit 7a35c9b
Showing 9 changed files with 293 additions and 57 deletions.
7 changes: 7 additions & 0 deletions docs/source/config_options.rst
@@ -91,6 +91,13 @@ General config properties for this connector.
* Default: ``false``
* Importance: medium

``policy.batch_size``
Number of files that should be handled at a time. Non-positive values disable batching.

* Type: int
* Default: ``0``
* Importance: medium
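
As an illustration, here is a minimal Java sketch (not part of this commit; it assumes
``baseProps`` already holds the connector's required settings) that enables batches of
100 files through the same Map-based configuration the tests use::

    Map<String, String> props = new HashMap<>(baseProps); // baseProps is an assumed placeholder
    props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "100"); // up to 100 files per policy execution
    FsSourceTaskConfig config = new FsSourceTaskConfig(props);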

``policy.<policy_name>.<policy_property>``
This represents custom properties you can include based on the policy class specified.

@@ -21,6 +21,10 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig {
private static final String POLICY_REGEXP_DOC = "Regular expression to filter files from the FS.";
private static final String POLICY_REGEXP_DISPLAY = "File filter regex";

public static final String POLICY_BATCH_SIZE = POLICY_PREFIX + "batch_size";
private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. Non-positive values disable batching.";
private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch";

public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs.";

public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class";
@@ -76,6 +80,16 @@ public static ConfigDef conf() {
++order,
ConfigDef.Width.MEDIUM,
POLICY_REGEXP_DISPLAY
).define(
POLICY_BATCH_SIZE,
ConfigDef.Type.INT,
0,
ConfigDef.Importance.MEDIUM,
POLICY_BATCH_SIZE_DOC,
POLICY_GROUP,
++order,
ConfigDef.Width.MEDIUM,
POLICY_BATCH_SIZE_DISPLAY
).define(
FILE_READER_CLASS,
ConfigDef.Type.CLASS,
@@ -5,6 +5,8 @@
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.TailCall;
import com.google.common.collect.Iterators;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -36,6 +38,8 @@ abstract class AbstractPolicy implements Policy {
private final FsSourceTaskConfig conf;
private final AtomicLong executions;
private final boolean recursive;
private final int batchSize;
private Iterator<List<FileMetadata>> previous;
private boolean interrupted;

public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
@@ -44,7 +48,9 @@ public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
this.executions = new AtomicLong(0);
this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE);
this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP));
this.batchSize = conf.getInt(FsSourceTaskConfig.POLICY_BATCH_SIZE);
this.interrupted = false;
this.previous = Collections.emptyIterator();

Map<String, Object> customConfigs = customConfigs();
logAll(customConfigs);
@@ -105,16 +111,27 @@ public final Iterator<FileMetadata> execute() throws IOException {
if (hasEnded()) {
throw new IllegalWorkerStateException("Policy has ended. Cannot be retried.");
}

if (batchSize > 0 && previous.hasNext()) {
return previous.next().iterator();
}

preCheck();

executions.incrementAndGet();
Iterator<FileMetadata> files = Collections.emptyIterator();
for (FileSystem fs : fileSystems) {
files = concat(files, listFiles(fs));
}

postCheck();

if (batchSize > 0) {
previous = Iterators.partition(files, batchSize);
if (!previous.hasNext())
return Collections.emptyIterator();
return previous.next().iterator();
}

return files;
}
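
For reference, a small standalone sketch (not from this commit) of the Guava behavior that
execute() now relies on: Iterators.partition wraps an iterator into consecutive fixed-size
lists, the final list may be smaller when the input does not divide evenly, and the method
rejects non-positive sizes, which is why it is only consulted when batchSize > 0.

    // Sketch of com.google.common.collect.Iterators.partition semantics
    Iterator<Integer> input = Arrays.asList(1, 2, 3, 4, 5).iterator();
    Iterator<List<Integer>> batches = Iterators.partition(input, 2);
    // batches yields [1, 2], then [3, 4], then [5]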

@@ -143,8 +160,7 @@ private TailCall<Boolean> hasNextRec() {
current = it.next();
return this::hasNextRec;
}
-            if (current.isFile() &
-                    fileRegexp.matcher(current.getPath().getName()).find()) {
+            if (current.isFile() && fileRegexp.matcher(current.getPath().getName()).find()) {
return TailCall.done(true);
}
current = null;
@@ -173,7 +189,11 @@ public FileMetadata next() {

@Override
public final boolean hasEnded() {
-        return interrupted || isPolicyCompleted();
+        if (interrupted) {
+            return true;
+        }
+
+        return !previous.hasNext() && isPolicyCompleted();
}

protected abstract boolean isPolicyCompleted();
@@ -200,7 +220,9 @@ public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) {
try {
FileReader reader = ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
-                    current, new Path(metadata.getPath()), conf.originals());
+                    current,
+                    new Path(metadata.getPath()), conf.originals()
+            );
Map<String, Object> partition = Collections.singletonMap("path", metadata.getPath());
Map<String, Object> offset = offsetStorageReader.offset(partition);
if (offset != null && offset.get("offset") != null) {
@@ -213,8 +235,7 @@ public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) {
}
}

-    private Iterator<FileMetadata> concat(final Iterator<FileMetadata> it1,
-                                          final Iterator<FileMetadata> it2) {
+    private Iterator<FileMetadata> concat(final Iterator<FileMetadata> it1, final Iterator<FileMetadata> it2) {
return new Iterator<FileMetadata>() {

@Override
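
A side note on the helper above: since com.google.common.collect.Iterators is now imported
for partitioning, the same concatenation is also available from Guava. A sketch of that
alternative (not what the commit does; the commit keeps the hand-rolled iterator):

    private Iterator<FileMetadata> concat(final Iterator<FileMetadata> it1, final Iterator<FileMetadata> it2) {
        return Iterators.concat(it1, it2); // Guava's lazy iterator concatenation
    }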
@@ -89,15 +89,16 @@ public void invalidEndDate(PolicyFsTestConfig fsConfig) {
@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void canBeInterrupted(PolicyFsTestConfig fsConfig) throws IOException {
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+        try (Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
                 .getClass(FsSourceTaskConfig.POLICY_CLASS),
-                fsConfig.getSourceTaskConfig());
+                fsConfig.getSourceTaskConfig())) {

-        for (int i = 0; i < 5; i++) {
-            assertFalse(policy.hasEnded());
-            policy.execute();
-        }
-        policy.interrupt();
-        assertTrue(policy.hasEnded());
+            for (int i = 0; i < 5; i++) {
+                assertFalse(policy.hasEnded());
+                policy.execute();
+            }
+            policy.interrupt();
+            assertTrue(policy.hasEnded());
+        }
    }
}
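
The tests in this commit switch to try-with-resources, which assumes Policy extends
java.io.Closeable; the pattern guarantees close() runs even when an assertion fails, so each
test releases its file systems deterministically. The shape of the pattern, with policyClass
and config as illustrative stand-ins:

    try (Policy policy = ReflectionUtils.makePolicy(policyClass, config)) {
        policy.execute();
    } // policy.close() runs here, even on exceptions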
@@ -81,21 +81,22 @@ public void execPolicyAlreadyEnded(PolicyFsTestConfig fsConfig) throws IOException {

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
-    public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException {
+    public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException, IOException {
        Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
        originals.put(FsSourceTaskConfig.FS_URIS, "hdfs://localhost:65432/data");
        originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_POLL_MS, "0");
        originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_RETRY_MS, "0");
        FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals);
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
-                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg);
-        int count = 0;
-        while (!policy.hasEnded() && count < 10) {
-            Thread.sleep(500);
-            count++;
+        try(Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) {
+            int count = 0;
+            while (!policy.hasEnded() && count < 10) {
+                Thread.sleep(500);
+                count++;
+            }
+            assertTrue(count < 10);
+            assertTrue(policy.hasEnded());
        }
-        assertTrue(count < 10);
-        assertTrue(policy.hasEnded());
    }

@ParameterizedTest
@@ -180,24 +180,25 @@ public void dynamicURIs(PolicyFsTestConfig fsConfig) throws IOException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.FS_URIS, dynamic.toString());
FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals);
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
-                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg);
-        fsConfig.setPolicy(policy);
-        assertEquals(1, fsConfig.getPolicy().getURIs().size());
-
-        LocalDateTime dateTime = LocalDateTime.now();
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("G");
-        StringBuilder uri = new StringBuilder(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("yyyy");
-        uri.append(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("MM");
-        uri.append(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("W");
-        uri.append(dateTime.format(formatter));
-        assertTrue(fsConfig.getPolicy().getURIs().get(0).endsWith(uri.toString()));
+        try (Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) {
+
+            assertEquals(1, policy.getURIs().size());
+
+            LocalDateTime dateTime = LocalDateTime.now();
+            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("G");
+            StringBuilder uri = new StringBuilder(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("yyyy");
+            uri.append(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("MM");
+            uri.append(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("W");
+            uri.append(dateTime.format(formatter));
+            assertTrue(policy.getURIs().get(0).endsWith(uri.toString()));
+        }
    }

@ParameterizedTest
@@ -221,6 +222,53 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException {
});
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

try (Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig)) {

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}

Iterator<FileMetadata> it = policy.execute();

// First batch of files (1 file)
assertTrue(it.hasNext());
String firstPath = it.next().getPath();
assertFalse(it.hasNext());

// Second batch of files (1 file)
it = policy.execute();
assertTrue(it.hasNext());
assertNotEquals(firstPath, it.next().getPath());
assertFalse(it.hasNext());
}
}
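
Read together with hasEnded(), these assertions pin down the batching contract: each
execute() call returns at most policy.batch_size files, and the policy only reports
completion once all batches are drained. A hypothetical consumer loop (process() is a
stand-in, not a real method in the codebase):

    while (!policy.hasEnded()) {
        Iterator<FileMetadata> batch = policy.execute();
        while (batch.hasNext()) {
            process(batch.next()); // e.g. hand off to offer(...) and a FileReader
        }
    }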

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void invalidBatchSize(PolicyFsTestConfig fsConfig) {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "one");
assertThrows(ConfigException.class, () -> {
new FsSourceTaskConfig(originals);
});

}

protected abstract FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories);

}
@@ -1,10 +1,20 @@
package com.github.mmolimar.kafka.connect.fs.policy;

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -26,4 +36,43 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories) {
return new FsSourceTaskConfig(cfg);
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyEndsAfterBatching(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

try (Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig)) {

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}

Iterator<FileMetadata> it = policy.execute();

// First batch of files (1 file)
assertFalse(policy.hasEnded());
assertTrue(it.hasNext());
String firstPath = it.next().getPath();
assertFalse(it.hasNext());
assertFalse(policy.hasEnded());

// Second batch of files (1 file)
it = policy.execute();
assertTrue(it.hasNext());
assertNotEquals(firstPath, it.next().getPath());
assertFalse(it.hasNext());
assertTrue(policy.hasEnded());
}
}

}