Skip to content

Commit

Permalink
Merge 94b4fbf into 588d310
Browse files Browse the repository at this point in the history
  • Loading branch information
Symbianx committed May 20, 2020
2 parents 588d310 + 94b4fbf commit e655f4e
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 0 deletions.
14 changes: 14 additions & 0 deletions docs/source/config_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,20 @@ In order to configure custom properties for this policy, the name you must use i
* Default: ``20000``
* Importance: medium

.. _config_options-policies-simple_batch:

Simple Batch
--------------------------------------------

In order to configure custom properties for this policy, the name you must use is ``simple_batch``.

``policy.simple_batch.batch_size``
Number of files to process per execution.

* Type: int
* Default: ``200``
  * Importance: high

.. _config_options-filereaders:

File readers
Expand Down
8 changes: 8 additions & 0 deletions docs/source/policies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ You can learn more about the properties of this policy :ref:`here<config_options
.. attention:: The URIs included in the general property ``fs.uris`` will be filtered and only those
ones which start with the prefix ``hdfs://`` will be watched. Also, this policy
will only work for Hadoop versions 2.6.0 or higher.

Simple Batch
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This policy's behaviour is similar to the Simple policy but it will process files in batches instead
of processing them all at once.

You can learn more about the properties of this policy :ref:`here<config_options-policies-simple_batch>`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.github.mmolimar.kafka.connect.fs.policy;

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;

import org.apache.hadoop.fs.FileSystem;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

/**
 * Policy that behaves like the simple policy but caps each execution to a fixed
 * number of files ({@code policy.simple_batch.batch_size}, default 200). The
 * underlying file listing for each {@link FileSystem} is cached so successive
 * executions continue from where the previous batch stopped.
 */
public class SimpleBatchPolicy extends AbstractPolicy {

    private static final Logger log = LoggerFactory.getLogger(SimpleBatchPolicy.class);

    private static final int DEFAULT_BATCH_SIZE = 200;

    private static final String BATCH_POLICY_PREFIX = FsSourceTaskConfig.POLICY_PREFIX + "simple_batch.";
    public static final String BATCH_POLICY_BATCH_SIZE = BATCH_POLICY_PREFIX + "batch_size";

    // Maximum number of files returned per execution.
    private int batchSize;
    // Cached listing iterators, one per file system, shared across executions
    // so each batch resumes where the previous one ended.
    private Map<FileSystem, Iterator<FileMetadata>> innerIterators = new HashMap<>();

    public SimpleBatchPolicy(FsSourceTaskConfig conf) throws IOException {
        super(conf);
    }

    /**
     * Reads the batch size from the custom configs, falling back to the default.
     *
     * @throws ConfigException if the configured value is not a valid integer
     */
    @Override
    protected void configPolicy(Map<String, Object> customConfigs) {
        try {
            this.batchSize = Integer.parseInt(
                    (String) customConfigs.getOrDefault(BATCH_POLICY_BATCH_SIZE, String.valueOf(DEFAULT_BATCH_SIZE)));
        } catch (NumberFormatException nfe) {
            throw new ConfigException(BATCH_POLICY_BATCH_SIZE + " property is required and must be a "
                    + "number (int). Got: " + customConfigs.get(BATCH_POLICY_BATCH_SIZE));
        }
    }

    /**
     * Returns an iterator over at most {@code batchSize} files, resuming from the
     * cached listing of the given file system when one exists.
     */
    @Override
    public Iterator<FileMetadata> listFiles(final FileSystem fs) throws IOException {
        if (!innerIterators.containsKey(fs)) {
            innerIterators.put(fs, super.listFiles(fs));
        }

        return new Iterator<FileMetadata>() {
            private int currentFileIndex = 0;
            private Iterator<FileMetadata> iterator = innerIterators.get(fs);

            @Override
            public boolean hasNext() {
                log.debug("Current file index is {}. Batch size is {}.", currentFileIndex, batchSize);
                return (currentFileIndex < batchSize) && iterator.hasNext();
            }

            @Override
            public FileMetadata next() {
                // Honor the Iterator contract: fail when the batch limit is
                // reached instead of silently reading past it.
                if (!hasNext()) {
                    throw new NoSuchElementException("No more files in the current batch.");
                }
                FileMetadata metadata = iterator.next();
                currentFileIndex++;
                return metadata;
            }
        };
    }

    /**
     * The policy is completed once every cached listing is fully consumed.
     * Before the first execution there are no cached listings, so it is not
     * completed yet.
     */
    @Override
    protected boolean isPolicyCompleted() {
        if (innerIterators.isEmpty()) {
            return false;
        }

        for (Iterator<FileMetadata> iterator : innerIterators.values()) {
            if (iterator.hasNext()) {
                return false;
            }
        }

        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.github.mmolimar.kafka.connect.fs.policy;

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Tests for {@code SimpleBatchPolicy}, verifying that files are processed in
 * batches across successive policy executions.
 */
public class SimpleBatchPolicyTest extends PolicyTestBase {

    @Override
    protected FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories) {
        return new FsSourceTaskConfig(buildConfigMap(directories));
    }

    // Builds the base connector config pointing at the given directories and
    // using the SimpleBatchPolicy with a text reader.
    private Map<String, String> buildConfigMap(List<Path> directories) {
        return new HashMap<String, String>() {
            {
                String[] uris = directories.stream().map(Path::toString).toArray(String[]::new);
                put(FsSourceTaskConfig.FS_URIS, String.join(",", uris));
                put(FsSourceTaskConfig.TOPIC, "topic_test");
                put(FsSourceTaskConfig.POLICY_CLASS, SimpleBatchPolicy.class.getName());
                put(FsSourceTaskConfig.FILE_READER_CLASS, TextFileReader.class.getName());
                put(FsSourceTaskConfig.POLICY_REGEXP, "^[0-9]*\\.txt$");
                put(FsSourceTaskConfig.POLICY_PREFIX_FS + "dfs.data.dir", "test");
                put(FsSourceTaskConfig.POLICY_PREFIX_FS + "fs.default.name", "hdfs://test/");
            }
        };
    }

    // With batch_size=1 and two matching files, each execution should yield
    // exactly one file, and the policy should end only after the second batch.
    @ParameterizedTest
    @MethodSource("fileSystemConfigProvider")
    public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException {
        Map<String, String> configMap = buildConfigMap(fsConfig.getDirectories());
        configMap.put(SimpleBatchPolicy.BATCH_POLICY_BATCH_SIZE, "1");
        FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(configMap);

        Path dir = fsConfig.getDirectories().get(0);

        // Create two files that both match the policy regexp (^[0-9]*\.txt$),
        // so they must be returned one per batch.
        fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt"));
        fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt"));

        Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
                .getClass(FsSourceTaskConfig.POLICY_CLASS), sourceTaskConfig);
        fsConfig.setPolicy(policy);

        Iterator<FileMetadata> it = fsConfig.getPolicy().execute();

        // First batch of files (1 file)
        assertFalse(fsConfig.getPolicy().hasEnded());
        assertTrue(it.hasNext());
        String firstPath = it.next().getPath();

        // Batch limit reached: iterator is exhausted but the policy is not done.
        assertFalse(it.hasNext());
        assertFalse(fsConfig.getPolicy().hasEnded());

        // Second batch of files (1 file)
        it = fsConfig.getPolicy().execute();
        assertTrue(it.hasNext());

        // The second batch must yield the other file, not repeat the first.
        assertNotEquals(firstPath, it.next().getPath());

        assertFalse(it.hasNext());
        assertTrue(fsConfig.getPolicy().hasEnded());
    }
}

0 comments on commit e655f4e

Please sign in to comment.