Add files batching capabilities to the Policies #59

Merged · 8 commits · Jun 11, 2020
7 changes: 7 additions & 0 deletions docs/source/config_options.rst
@@ -91,6 +91,13 @@ General config properties for this connector.
* Default: ``false``
* Importance: medium

``policy.batch_size``
Maximum number of files to process per policy execution (batch). Non-positive values disable batching (see the example below).

* Type: int
* Default: ``0``
* Importance: medium
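
For example, a minimal connector configuration enabling batches of 100 files might look like this (the values are illustrative, not defaults)::

    fs.uris=file:///data
    policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
    policy.regexp=.*\.txt$
    policy.batch_size=100

With this setting, each policy execution returns at most 100 files; subsequent executions continue with the remaining batches until the file listing is exhausted.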

``policy.<policy_name>.<policy_property>``
Custom properties you can include, depending on the policy class specified.
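For instance, a hypothetical policy registered under the name ``my_policy`` that exposes a ``retries`` option would read it from ``policy.my_policy.retries``.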

src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java
@@ -21,6 +21,10 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig {
private static final String POLICY_REGEXP_DOC = "Regular expression to filter files from the FS.";
private static final String POLICY_REGEXP_DISPLAY = "File filter regex";

public static final String POLICY_BATCH_SIZE = POLICY_PREFIX + "batch_size";
private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. Non-positive values disable batching.";
private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch";

public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs.";

public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class";
@@ -76,6 +80,16 @@ public static ConfigDef conf() {
++order,
ConfigDef.Width.MEDIUM,
POLICY_REGEXP_DISPLAY
).define(
POLICY_BATCH_SIZE,
ConfigDef.Type.INT,
0,
ConfigDef.Importance.MEDIUM,
POLICY_BATCH_SIZE_DOC,
POLICY_GROUP,
++order,
ConfigDef.Width.MEDIUM,
POLICY_BATCH_SIZE_DISPLAY
).define(
FILE_READER_CLASS,
ConfigDef.Type.CLASS,
src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java
@@ -5,6 +5,8 @@
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.TailCall;
import com.google.common.collect.Iterators;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -36,6 +38,8 @@ abstract class AbstractPolicy implements Policy {
private final FsSourceTaskConfig conf;
private final AtomicLong executions;
private final boolean recursive;
private final int batchSize;
private Iterator<List<FileMetadata>> previous;
private boolean interrupted;

public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
@@ -44,7 +48,9 @@
this.executions = new AtomicLong(0);
this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE);
this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP));
this.batchSize = conf.getInt(FsSourceTaskConfig.POLICY_BATCH_SIZE);
this.interrupted = false;
this.previous = Collections.emptyIterator();

Map<String, Object> customConfigs = customConfigs();
logAll(customConfigs);
@@ -105,16 +111,27 @@ public final Iterator<FileMetadata> execute() throws IOException {
if (hasEnded()) {
throw new IllegalWorkerStateException("Policy has ended. Cannot be retried.");
}

if (batchSize > 0 && previous.hasNext()) {
return previous.next().iterator();
}

preCheck();

executions.incrementAndGet();
Iterator<FileMetadata> files = Collections.emptyIterator();
for (FileSystem fs : fileSystems) {
files = concat(files, listFiles(fs));
}

postCheck();

if (batchSize > 0) {
previous = Iterators.partition(files, batchSize);
if (!previous.hasNext())
return Collections.emptyIterator();
return previous.next().iterator();
}

return files;
}
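
For context, Guava's Iterators.partition (used above) lazily wraps an iterator into consecutive batches of at most the given size, with the final batch possibly smaller; it also rejects non-positive sizes, which is why the code only partitions when batchSize > 0. A minimal, self-contained sketch of these semantics (class and file names are illustrative):

import com.google.common.collect.Iterators;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionSketch {
    public static void main(String[] args) {
        Iterator<String> files = Arrays.asList("a.txt", "b.txt", "c.txt").iterator();
        // Batches of at most 2: [a.txt, b.txt], then [c.txt]; an empty source yields no batches.
        Iterator<List<String>> batches = Iterators.partition(files, 2);
        while (batches.hasNext()) {
            System.out.println(batches.next()); // each batch is materialized only on next()
        }
    }
}

Because the partitioned iterator is consumed lazily, the policy can keep it in the previous field and hand out exactly one batch per execute() call.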

@@ -143,8 +160,7 @@ private TailCall<Boolean> hasNextRec() {
current = it.next();
return this::hasNextRec;
}
-            if (current.isFile() &
-                    fileRegexp.matcher(current.getPath().getName()).find()) {
+            if (current.isFile() && fileRegexp.matcher(current.getPath().getName()).find()) {
return TailCall.done(true);
}
current = null;
@@ -173,7 +189,11 @@ public FileMetadata next() {

@Override
public final boolean hasEnded() {
-        return interrupted || isPolicyCompleted();
+        if (interrupted) {
+            return true;
+        }
+
+        return !previous.hasNext() && isPolicyCompleted();
}

protected abstract boolean isPolicyCompleted();
@@ -200,7 +220,9 @@ public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorage
try {
FileReader reader = ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
-                    current, new Path(metadata.getPath()), conf.originals());
+                    current,
+                    new Path(metadata.getPath()), conf.originals()
+            );
Map<String, Object> partition = Collections.singletonMap("path", metadata.getPath());
Map<String, Object> offset = offsetStorageReader.offset(partition);
if (offset != null && offset.get("offset") != null) {
@@ -213,8 +235,7 @@ public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorage
}
}

-    private Iterator<FileMetadata> concat(final Iterator<FileMetadata> it1,
-                                          final Iterator<FileMetadata> it2) {
+    private Iterator<FileMetadata> concat(final Iterator<FileMetadata> it1, final Iterator<FileMetadata> it2) {
return new Iterator<FileMetadata>() {

@Override
@@ -89,15 +89,16 @@ public void invalidEndDate(PolicyFsTestConfig fsConfig) {
@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void canBeInterrupted(PolicyFsTestConfig fsConfig) throws IOException {
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+        try (Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
                 .getClass(FsSourceTaskConfig.POLICY_CLASS),
-                fsConfig.getSourceTaskConfig());
+                fsConfig.getSourceTaskConfig())) {

-        for (int i = 0; i < 5; i++) {
-            assertFalse(policy.hasEnded());
-            policy.execute();
-        }
-        policy.interrupt();
-        assertTrue(policy.hasEnded());
+            for (int i = 0; i < 5; i++) {
+                assertFalse(policy.hasEnded());
+                policy.execute();
+            }
+            policy.interrupt();
+            assertTrue(policy.hasEnded());
+        }
}
}
@@ -81,21 +81,22 @@ public void execPolicyAlreadyEnded(PolicyFsTestConfig fsConfig) throws IOExcepti

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
-    public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException {
+    public void notReachableFileSystem(PolicyFsTestConfig fsConfig) throws InterruptedException, IOException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.FS_URIS, "hdfs://localhost:65432/data");
originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_POLL_MS, "0");
originals.put(HdfsFileWatcherPolicy.HDFS_FILE_WATCHER_POLICY_RETRY_MS, "0");
FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals);
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
-                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg);
-        int count = 0;
-        while (!policy.hasEnded() && count < 10) {
-            Thread.sleep(500);
-            count++;
-        }
-        assertTrue(count < 10);
-        assertTrue(policy.hasEnded());
+        try(Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) {
+            int count = 0;
+            while (!policy.hasEnded() && count < 10) {
+                Thread.sleep(500);
+                count++;
+            }
+            assertTrue(count < 10);
+            assertTrue(policy.hasEnded());
+        }
}

@ParameterizedTest
@@ -180,24 +180,25 @@ public void dynamicURIs(PolicyFsTestConfig fsConfig) throws IOException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.FS_URIS, dynamic.toString());
FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals);
-        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
-                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg);
-        fsConfig.setPolicy(policy);
-        assertEquals(1, fsConfig.getPolicy().getURIs().size());
-
-        LocalDateTime dateTime = LocalDateTime.now();
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("G");
-        StringBuilder uri = new StringBuilder(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("yyyy");
-        uri.append(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("MM");
-        uri.append(dateTime.format(formatter));
-        uri.append("/");
-        formatter = DateTimeFormatter.ofPattern("W");
-        uri.append(dateTime.format(formatter));
-        assertTrue(fsConfig.getPolicy().getURIs().get(0).endsWith(uri.toString()));
+        try (Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
+                .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) {
+
+            assertEquals(1, policy.getURIs().size());
+
+            LocalDateTime dateTime = LocalDateTime.now();
+            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("G");
+            StringBuilder uri = new StringBuilder(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("yyyy");
+            uri.append(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("MM");
+            uri.append(dateTime.format(formatter));
+            uri.append("/");
+            formatter = DateTimeFormatter.ofPattern("W");
+            uri.append(dateTime.format(formatter));
+            assertTrue(policy.getURIs().get(0).endsWith(uri.toString()));
+        }
}

@ParameterizedTest
Expand All @@ -221,6 +222,53 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException {
});
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

try (Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig)) {

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}

Iterator<FileMetadata> it = policy.execute();

// First batch of files (1 file)
assertTrue(it.hasNext());
String firstPath = it.next().getPath();
assertFalse(it.hasNext());

// Second batch of files (1 file)
it = policy.execute();
assertTrue(it.hasNext());
assertNotEquals(firstPath, it.next().getPath());
assertFalse(it.hasNext());
}
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void invalidBatchSize(PolicyFsTestConfig fsConfig) {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "one");
assertThrows(ConfigException.class, () -> {
new FsSourceTaskConfig(originals);
});

}

protected abstract FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories);

}
@@ -1,10 +1,20 @@
package com.github.mmolimar.kafka.connect.fs.policy;

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -26,4 +36,43 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories) {
return new FsSourceTaskConfig(cfg);
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyEndsAfterBatching(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

try (Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig)) {

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}

Iterator<FileMetadata> it = policy.execute();

// First batch of files (1 file)
assertFalse(policy.hasEnded());
assertTrue(it.hasNext());
String firstPath = it.next().getPath();
assertFalse(it.hasNext());
assertFalse(policy.hasEnded());

// Second batch of files (1 file)
it = policy.execute();
assertTrue(it.hasNext());
assertNotEquals(firstPath, it.next().getPath());
assertFalse(it.hasNext());
assertTrue(policy.hasEnded());
}
}

}