Add files batching capabilities to the Policies #59

Merged: 8 commits, Jun 11, 2020
Changes from 5 commits
docs/source/config_options.rst — 7 changes: 7 additions & 0 deletions
@@ -91,6 +91,13 @@ General config properties for this connector.
* Default: ``false``
* Importance: medium

``policy.batch_size``
Number of files that should be handled at a time. Non-positive values disable batching.

* Type: long
* Default: ``0``
* Importance: medium
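
A minimal sketch of a worker configuration using the new option (values are illustrative; the other property names are taken from the connector's existing configuration reference):

    name=fs-source-batched
    connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
    fs.uris=file:///data/input
    topic=fs-topic
    policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
    policy.regexp=^.*\.txt$
    # hand files to the task 200 at a time; 0 (the default) disables batching
    policy.batch_size=200
    file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader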

``policy.<policy_name>.<policy_property>``
This represents custom properties you can include based on the policy class specified.
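For instance, with the SleepyPolicy configured, a property such as ``policy.sleepy.sleep`` would be passed through to that policy (property name based on the connector's policy documentation).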

FsSourceTaskConfig.java
@@ -21,6 +21,10 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig {
private static final String POLICY_REGEXP_DOC = "Regular expression to filter files from the FS.";
private static final String POLICY_REGEXP_DISPLAY = "File filter regex";

public static final String POLICY_BATCH_SIZE = POLICY_PREFIX + "batch_size";
private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. Non-positive values disable batching.";
private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch";

public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs.";

public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class";
@@ -76,6 +80,16 @@ public static ConfigDef conf() {
++order,
ConfigDef.Width.MEDIUM,
POLICY_REGEXP_DISPLAY
).define(
POLICY_BATCH_SIZE,
ConfigDef.Type.LONG,
0L,
ConfigDef.Importance.MEDIUM,
POLICY_BATCH_SIZE_DOC,
POLICY_GROUP,
++order,
ConfigDef.Width.MEDIUM,
POLICY_BATCH_SIZE_DISPLAY
).define(
FILE_READER_CLASS,
ConfigDef.Type.CLASS,
AbstractPolicy.java
@@ -3,6 +3,7 @@
import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.util.BatchIterator;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.TailCall;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,8 @@ abstract class AbstractPolicy implements Policy {
private final FsSourceTaskConfig conf;
private final AtomicLong executions;
private final boolean recursive;
private final long batchSize;
private Iterator<FileMetadata> currentIterator;
mmolimar marked this conversation as resolved.
private boolean interrupted;

public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
@@ -44,6 +47,7 @@ public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
this.executions = new AtomicLong(0);
this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE);
this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP));
this.batchSize = conf.getLong(FsSourceTaskConfig.POLICY_BATCH_SIZE);
this.interrupted = false;

Map<String, Object> customConfigs = customConfigs();
@@ -105,16 +109,25 @@ public final Iterator<FileMetadata> execute() throws IOException {
if (hasEnded()) {
throw new IllegalWorkerStateException("Policy has ended. Cannot be retried.");
}

if (batchSize > 0 && currentIterator != null && currentIterator.hasNext()) {
Owner: I think it's better to avoid this check and set the files iterator with the previous value.

Contributor Author: If we skip this check, the preCheck and postCheck methods will be executed, which will cause the CronPolicy, SleepyPolicy and HdfsFileWatcherPolicy to sleep between batches. That breaks the current behaviour of sleeping only after handling all the files. Is this what you want?

Owner: Makes total sense, but what about just returning previous (in case it has elements, ignoring the batch size), ok?

Contributor Author: That sounds ok. Since previous is the original iterator (not a BatchIterator), I will need to create a custom class extending Iterator with a method to reset the counter to 0. Then we can store the BatchIterator in the previous variable instead. I will change it tomorrow.

Contributor Author: Changed it to use com.google.common.collect.Iterators.partition instead, which simplified the implementation and is more standard.
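
For reference, a minimal sketch of the Guava approach mentioned above (illustrative, not the PR's final code; note that Iterators.partition takes an int size and yields an Iterator<List<T>>, one list per batch, so adapting it to the policy's Iterator<FileMetadata> return type still requires flattening):

    import com.google.common.collect.Iterators;

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class PartitionSketch {
        public static void main(String[] args) {
            Iterator<String> files = Arrays.asList("a.txt", "b.txt", "c.txt").iterator();
            // each call to next() returns one batch of up to 2 elements
            Iterator<List<String>> batches = Iterators.partition(files, 2);
            while (batches.hasNext()) {
                System.out.println(batches.next()); // [a.txt, b.txt] then [c.txt]
            }
        }
    }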

return BatchIterator.batchIterator(currentIterator, batchSize);
}

preCheck();

executions.incrementAndGet();
Iterator<FileMetadata> files = Collections.emptyIterator();
for (FileSystem fs : fileSystems) {
files = concat(files, listFiles(fs));
}

currentIterator = files;

postCheck();

if (batchSize > 0)
return BatchIterator.batchIterator(files, batchSize);

return files;
}
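
To make the batching contract concrete, a hedged sketch of a consumer loop (hypothetical caller, not code from this PR): with a positive batch size, each execute() call returns the next batch from the same listing until it is drained, and hasEnded() (below) stays false while batches remain.

    while (!policy.hasEnded()) {
        Iterator<FileMetadata> batch = policy.execute();
        while (batch.hasNext()) {
            process(batch.next()); // process(...) is a hypothetical per-file handler
        }
    }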

@@ -173,7 +186,13 @@ public FileMetadata next() {

@Override
public final boolean hasEnded() {
return interrupted || isPolicyCompleted();
if (interrupted)
return true;

if (currentIterator == null)
return isPolicyCompleted();

return !currentIterator.hasNext() && isPolicyCompleted();
}

protected abstract boolean isPolicyCompleted();
BatchIterator.java (new file)
@@ -0,0 +1,29 @@
package com.github.mmolimar.kafka.connect.fs.util;

import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchIterator {
Owner: We could add this functionality inside the listFiles method in the AbstractPolicy class.

Contributor Author: If we put it in the listFiles method we will be batching each file system independently. It feels to me that is not the intended behaviour.

Owner: Ok. Maybe rename this class to Iterators, with a partition method taking the batch size and a "duplicates" flag to allow removing duplicates in the same partition.

Contributor Author: Sorry, I didn't understand what you mean by "allow removing duplicates in the same partition".

Owner: For example: in the first execution of the policy you get an iterator with 5 records (with a batch size of 3), so 2 remain. In the next execution (let's say we're using the Cron policy) we concat the previous 2 with another 5 records. We re-split that iterator, and this batch could "maybe" contain a duplicated item, so we should ignore that record and get another one.

Contributor Author: I don't think that would be in line with the present implementation. An execution is only finished when the iterator is fully traversed; only after that do we sleep. If we mix files from the current iterator with the next one we will create weird behaviour in the policies.

private static final Logger log = LoggerFactory.getLogger(BatchIterator.class);

public static <T> Iterator<T> batchIterator(Iterator<T> iterator, long elementsPerBatch) {
return new Iterator<T>() {
private long count = 0;

@Override
public boolean hasNext() {
log.debug("Current index is {}. Batch size is {}.", count, elementsPerBatch);
return (count < elementsPerBatch) && iterator.hasNext();
}

@Override
public T next() {
T next = iterator.next();
count++;
return next;
}
};
}
}
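
A quick usage sketch of this helper (assumed sample data, java.util imports implied; not part of the PR): because each wrapper keeps its own count, re-wrapping the same underlying iterator yields successive batches.

    Iterator<String> source = Arrays.asList("a", "b", "c", "d", "e").iterator();
    while (source.hasNext()) {
        // a fresh wrapper resets the per-batch count; prints "a b", "c d", "e"
        Iterator<String> batch = BatchIterator.batchIterator(source, 2);
        while (batch.hasNext()) {
            System.out.print(batch.next() + " ");
        }
        System.out.println();
    }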
@@ -89,15 +89,16 @@ public void invalidEndDate(PolicyFsTestConfig fsConfig) {
@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void canBeInterrupted(PolicyFsTestConfig fsConfig) throws IOException {
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+        try (Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
                 .getClass(FsSourceTaskConfig.POLICY_CLASS),
-                fsConfig.getSourceTaskConfig());
+                fsConfig.getSourceTaskConfig())) {

-        for (int i = 0; i < 5; i++) {
-            assertFalse(policy.hasEnded());
-            policy.execute();
+            for (int i = 0; i < 5; i++) {
+                assertFalse(policy.hasEnded());
+                policy.execute();
+            }
+            policy.interrupt();
+            assertTrue(policy.hasEnded());
         }
-        policy.interrupt();
-        assertTrue(policy.hasEnded());
}
}
@@ -81,21 +81,22 @@ public void execPolicyAlreadyEnded(PolicyFsTestConfig fsConfig) throws IOException {

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
-    public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException {
+    public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException, IOException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.FS_URIS, "hdfs://localhost:65432/data");
originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_POLL_MS, "0");
originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_RETRY_MS, "0");
FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals);
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
-                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg);
-        int count = 0;
-        while (!policy.hasEnded() && count < 10) {
-            Thread.sleep(500);
-            count++;
+        try (Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) {
+            int count = 0;
+            while (!policy.hasEnded() && count < 10) {
+                Thread.sleep(500);
+                count++;
+            }
+            assertTrue(count < 10);
+            assertTrue(policy.hasEnded());
         }
-        assertTrue(count < 10);
-        assertTrue(policy.hasEnded());
}

@ParameterizedTest
@@ -180,24 +180,25 @@ public void dynamicURIs(PolicyFsTestConfig fsConfig) throws IOException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.FS_URIS, dynamic.toString());
FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals);
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
-                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg);
-        fsConfig.setPolicy(policy);
-        assertEquals(1, fsConfig.getPolicy().getURIs().size());
-
-        LocalDateTime dateTime = LocalDateTime.now();
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("G");
-        StringBuilder uri = new StringBuilder(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("yyyy");
-        uri.append(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("MM");
-        uri.append(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("W");
-        uri.append(dateTime.format(formatter));
-        assertTrue(fsConfig.getPolicy().getURIs().get(0).endsWith(uri.toString()));
+        try (Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) {
+
+            assertEquals(1, policy.getURIs().size());
+
+            LocalDateTime dateTime = LocalDateTime.now();
+            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("G");
+            StringBuilder uri = new StringBuilder(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("yyyy");
+            uri.append(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("MM");
+            uri.append(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("W");
+            uri.append(dateTime.format(formatter));
+            assertTrue(policy.getURIs().get(0).endsWith(uri.toString()));
+        }
}

@ParameterizedTest
@@ -221,6 +222,57 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException {
});
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

try (Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig)) {

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}


Iterator<FileMetadata> it = policy.execute();

// First batch of files (1 file)
assertTrue(it.hasNext());
String firstPath = it.next().getPath();

assertFalse(it.hasNext());

// Second batch of files (1 file)
it = policy.execute();
assertTrue(it.hasNext());

assertNotEquals(firstPath, it.next().getPath());

assertFalse(it.hasNext());
}
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void invalidBatchSize(PolicyFsTestConfig fsConfig) {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "one");
assertThrows(ConfigException.class, () -> {
new FsSourceTaskConfig(originals);
});

}

protected abstract FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories);

}
@@ -1,10 +1,20 @@
package com.github.mmolimar.kafka.connect.fs.policy;

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -26,4 +36,47 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories) {
return new FsSourceTaskConfig(cfg);
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyEndsAfterBatching(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

try (Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig)) {

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}


Iterator<FileMetadata> it = policy.execute();

// First batch of files (1 file)
assertFalse(policy.hasEnded());
assertTrue(it.hasNext());
String firstPath = it.next().getPath();

assertFalse(it.hasNext());
assertFalse(policy.hasEnded());

// Second batch of files (1 file)
it = policy.execute();
assertTrue(it.hasNext());

assertNotEquals(firstPath, it.next().getPath());

assertFalse(it.hasNext());
assertTrue(policy.hasEnded());
}
}

}