Skip to content

Commit

Permalink
Set up batching in the AbstractPolicy instead of a separate Policy.
Browse files Browse the repository at this point in the history
  • Loading branch information
Symbianx committed May 25, 2020
1 parent f1620ce commit 7d1e4e9
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 205 deletions.
21 changes: 7 additions & 14 deletions docs/source/config_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ General config properties for this connector.
* Default: ``false``
* Importance: medium

``policy.batch_size``
Number of files that should be handled at a time. Non-positive values disable batching.

* Type: long
* Default: ``0``
* Importance: medium

``policy.<policy_name>.<policy_property>``
This represents custom properties you can include based on the policy class specified.

Expand Down Expand Up @@ -202,20 +209,6 @@ In order to configure custom properties for this policy, the name you must use i
* Default: ``20000``
* Importance: medium

.. _config_options-policies-simple_batch:

Simple Batch
--------------------------------------------

In order to configure custom properties for this policy, the name you must use is ``simple_batch``.

``policy.simple_batch.batch_size``
Number of files to process per execution.

* Type: int
* Default: ``200``
* Importance: High

.. _config_options-filereaders:

File readers
Expand Down
8 changes: 0 additions & 8 deletions docs/source/policies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,3 @@ You can learn more about the properties of this policy :ref:`here<config_options
.. attention:: The URIs included in the general property ``fs.uris`` will be filtered and only those
ones which start with the prefix ``hdfs://`` will be watched. Also, this policy
will only work for Hadoop versions 2.6.0 or higher.

Simple Batch
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This policy's behaviour is similar to the Simple policy but it will process files in batches instead
of processing all the files at once.

You can learn more about the properties of this policy :ref:`here<config_options-policies-simple_batch>`.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig {
private static final String POLICY_REGEXP_DOC = "Regular expression to filter files from the FS.";
private static final String POLICY_REGEXP_DISPLAY = "File filter regex";

public static final String POLICY_BATCH_SIZE = POLICY_PREFIX + "batch_size";
private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. Non-positive values disable batching.";
private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch";

public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs.";

public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class";
Expand Down Expand Up @@ -76,6 +80,16 @@ public static ConfigDef conf() {
++order,
ConfigDef.Width.MEDIUM,
POLICY_REGEXP_DISPLAY
).define(
POLICY_BATCH_SIZE,
ConfigDef.Type.LONG,
0l,
ConfigDef.Importance.MEDIUM,
POLICY_BATCH_SIZE_DOC,
POLICY_GROUP,
++order,
ConfigDef.Width.MEDIUM,
POLICY_BATCH_SIZE_DISPLAY
).define(
FILE_READER_CLASS,
ConfigDef.Type.CLASS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.util.BatchIterator;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.TailCall;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -36,6 +37,8 @@ abstract class AbstractPolicy implements Policy {
private final FsSourceTaskConfig conf;
private final AtomicLong executions;
private final boolean recursive;
private final long batchSize;
private Iterator<FileMetadata> currentIterator;
private boolean interrupted;

public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
Expand All @@ -44,6 +47,7 @@ public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
this.executions = new AtomicLong(0);
this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE);
this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP));
this.batchSize = conf.getLong(FsSourceTaskConfig.POLICY_BATCH_SIZE);
this.interrupted = false;

Map<String, Object> customConfigs = customConfigs();
Expand Down Expand Up @@ -105,16 +109,25 @@ public final Iterator<FileMetadata> execute() throws IOException {
if (hasEnded()) {
throw new IllegalWorkerStateException("Policy has ended. Cannot be retried.");
}

if (batchSize > 0 && currentIterator != null && currentIterator.hasNext()) {
return BatchIterator.batchIterator(currentIterator, batchSize);
}

preCheck();

executions.incrementAndGet();
Iterator<FileMetadata> files = Collections.emptyIterator();
for (FileSystem fs : fileSystems) {
files = concat(files, listFiles(fs));
}

currentIterator = files;

postCheck();

if(batchSize > 0)
return BatchIterator.batchIterator(files, batchSize);

return files;
}

Expand Down Expand Up @@ -173,7 +186,13 @@ public FileMetadata next() {

@Override
public final boolean hasEnded() {
    if (interrupted) {
        return true;
    }
    // With batching enabled there may still be files pending in the current
    // iterator; the policy has only ended once those are drained as well.
    boolean noPendingFiles = currentIterator == null || !currentIterator.hasNext();
    return noPendingFiles && isPolicyCompleted();
}

protected abstract boolean isPolicyCompleted();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.github.mmolimar.kafka.connect.fs.util;

import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility for limiting an {@link Iterator} to a fixed number of elements per batch.
 */
public class BatchIterator {
    private static final Logger log = LoggerFactory.getLogger(BatchIterator.class);

    private BatchIterator() {
        // Utility class: no instances.
    }

    /**
     * Wraps {@code iterator} so that at most {@code elementsPerBatch} elements
     * are returned. The underlying iterator is consumed lazily, so repeated
     * calls with the same source iterator yield successive batches.
     *
     * @param iterator         source of elements; advanced as the batch is consumed
     * @param elementsPerBatch maximum number of elements this view will return
     * @return an iterator that ends after {@code elementsPerBatch} elements or
     *         when the source is exhausted, whichever comes first
     */
    public static <T> Iterator<T> batchIterator(Iterator<T> iterator, long elementsPerBatch) {
        return new Iterator<T>() {
            private long count = 0;

            @Override
            public boolean hasNext() {
                log.debug("Current index is {}. Batch size is {}.", count, elementsPerBatch);
                return (count < elementsPerBatch) && iterator.hasNext();
            }

            @Override
            public T next() {
                // Enforce the Iterator contract: without this check, callers that
                // ignore hasNext() would silently read past the batch boundary
                // into elements belonging to the next batch.
                if (!hasNext()) {
                    throw new NoSuchElementException(
                            "Batch of " + elementsPerBatch + " elements already consumed.");
                }
                T next = iterator.next();
                count++;
                return next;
            }
        };
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,62 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException {
});
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
// Verifies that with policy.batch_size=1 each execute() returns at most one
// matching file, and successive executions hand out the remaining files
// until none are left.
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig);

fsConfig.setPolicy(policy);

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}


Iterator<FileMetadata> it = fsConfig.getPolicy().execute();

// First batch of files (1 file)
assertTrue(it.hasNext());
String firstPath = it.next().getPath();

assertFalse(it.hasNext());

// Second batch of files (1 file); must be a different file than the first batch
it = fsConfig.getPolicy().execute();
assertTrue(it.hasNext());

assertNotEquals(firstPath, it.next().getPath());

assertFalse(it.hasNext());

// Third batch: all files consumed, so the iterator is empty
it = fsConfig.getPolicy().execute();
assertFalse(it.hasNext());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void invalidBatchSize(PolicyFsTestConfig fsConfig) {
    // A non-numeric policy.batch_size must be rejected at config-parse time.
    Map<String, String> props = fsConfig.getSourceTaskConfig().originalsStrings();
    props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "one");
    assertThrows(ConfigException.class, () -> new FsSourceTaskConfig(props));
}

protected abstract FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories);

}
Loading

0 comments on commit 7d1e4e9

Please sign in to comment.