From 45c0709b6b3bb3170938e6a2290e780b0c3eb0c2 Mon Sep 17 00:00:00 2001 From: Miguel Alexandre Date: Mon, 25 May 2020 17:56:35 +0200 Subject: [PATCH] Set up batching in the AbstractPolicy instead of a separate Policy. --- .../kafka/connect/fs/FsSourceTaskConfig.java | 14 +++ .../connect/fs/policy/AbstractPolicy.java | 26 ++++- .../kafka/connect/fs/policy/SimplePolicy.java | 1 - .../kafka/connect/fs/util/BatchIterator.java | 29 +++++ .../connect/fs/policy/PolicyTestBase.java | 56 ++++++++++ .../fs/policy/SimpleBatchPolicyTest.java | 101 ------------------ .../connect/fs/policy/SimplePolicyTest.java | 54 ++++++++++ .../connect/fs/task/FsSourceTaskTest.java | 101 ++++++++++++++++++ 8 files changed, 278 insertions(+), 104 deletions(-) create mode 100644 src/main/java/com/github/mmolimar/kafka/connect/fs/util/BatchIterator.java delete mode 100644 src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicyTest.java diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java index 58231fd..fccc0a5 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java @@ -21,6 +21,10 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig { private static final String POLICY_REGEXP_DOC = "Regular expression to filter files from the FS."; private static final String POLICY_REGEXP_DISPLAY = "File filter regex"; + public static final String POLICY_BATCH_SIZE = POLICY_PREFIX + "batch_size"; + private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. 
Non-positive values disable batching."; + private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch"; + public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs."; public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class"; @@ -76,6 +80,16 @@ public static ConfigDef conf() { ++order, ConfigDef.Width.MEDIUM, POLICY_REGEXP_DISPLAY + ).define( + POLICY_BATCH_SIZE, + ConfigDef.Type.LONG, + 0l, + ConfigDef.Importance.MEDIUM, + POLICY_BATCH_SIZE_DOC, + POLICY_GROUP, + ++order, + ConfigDef.Width.MEDIUM, + POLICY_BATCH_SIZE_DISPLAY ).define( FILE_READER_CLASS, ConfigDef.Type.CLASS, diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java index d250e76..6ef082c 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java @@ -3,6 +3,7 @@ import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader; +import com.github.mmolimar.kafka.connect.fs.util.BatchIterator; import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; import com.github.mmolimar.kafka.connect.fs.util.TailCall; import org.apache.hadoop.conf.Configuration; @@ -35,15 +36,20 @@ abstract class AbstractPolicy implements Policy { private final FsSourceTaskConfig conf; private final AtomicLong executions; + private final AtomicLong batchesCreated; private final boolean recursive; + private final long batchSize; + private Iterator currentIterator; private boolean interrupted; public AbstractPolicy(FsSourceTaskConfig conf) throws IOException { this.fileSystems = new ArrayList<>(); this.conf = conf; this.executions = new AtomicLong(0); + this.batchesCreated = new AtomicLong(0); this.recursive = 
conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE); this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP)); + this.batchSize = conf.getLong(FsSourceTaskConfig.POLICY_BATCH_SIZE); this.interrupted = false; Map customConfigs = customConfigs(); @@ -105,6 +111,12 @@ public final Iterator execute() throws IOException { if (hasEnded()) { throw new IllegalWorkerStateException("Policy has ended. Cannot be retried."); } + + if (batchSize > 0 && currentIterator != null && currentIterator.hasNext()) { + batchesCreated.incrementAndGet(); + return BatchIterator.chunkIterator(currentIterator, batchSize); + } + preCheck(); executions.incrementAndGet(); @@ -112,9 +124,13 @@ public final Iterator execute() throws IOException { for (FileSystem fs : fileSystems) { files = concat(files, listFiles(fs)); } - + currentIterator = files; + postCheck(); + if(batchSize > 0) + return BatchIterator.chunkIterator(files, batchSize); + return files; } @@ -173,7 +189,13 @@ public FileMetadata next() { @Override public final boolean hasEnded() { - return interrupted || isPolicyCompleted(); + if (interrupted) + return true; + + if (currentIterator == null) + return isPolicyCompleted(); + + return !currentIterator.hasNext() && isPolicyCompleted(); } protected abstract boolean isPolicyCompleted(); diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicy.java index 1f27c25..bbbefb8 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicy.java @@ -6,7 +6,6 @@ import java.util.Map; public class SimplePolicy extends AbstractPolicy { - public SimplePolicy(FsSourceTaskConfig conf) throws IOException { super(conf); } diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/BatchIterator.java 
b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/BatchIterator.java new file mode 100644 index 0000000..bbe809d --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/BatchIterator.java @@ -0,0 +1,29 @@ +package com.github.mmolimar.kafka.connect.fs.util; + +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchIterator { + private static final Logger log = LoggerFactory.getLogger(BatchIterator.class); + + public static Iterator chunkIterator(Iterator iterator, long elementsPerBatch) { + return new Iterator() { + private long count = 0; + + @Override + public boolean hasNext() { + log.debug("Current index is {}. Batch size is {}.", count, elementsPerBatch); + return (count < elementsPerBatch) && iterator.hasNext(); + } + + @Override + public T next() { + T next = iterator.next(); + count++; + return next; + } + }; + } +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java index ba77775..7b944fc 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java @@ -221,6 +221,62 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException { }); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals); + + Policy policy = ReflectionUtils.makePolicy( + (Class) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS), + sourceTaskConfig); + + fsConfig.setPolicy(policy); + + FileSystem fs = fsConfig.getFs(); + 
for (Path dir : fsConfig.getDirectories()) { + fs.createNewFile(new Path(dir, System.nanoTime() + ".txt")); + //this file does not match the regexp + fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid")); + + //we wait till FS has registered the files + Thread.sleep(3000); + } + + + Iterator it = fsConfig.getPolicy().execute(); + + // First batch of files (1 file) + assertTrue(it.hasNext()); + String firstPath = it.next().getPath(); + + assertFalse(it.hasNext()); + + // Second batch of files (1 file) + it = fsConfig.getPolicy().execute(); + assertTrue(it.hasNext()); + + assertNotEquals(firstPath, it.next().getPath()); + + assertFalse(it.hasNext()); + + // Second batch of files (1 file) + it = fsConfig.getPolicy().execute(); + assertFalse(it.hasNext()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void invalidBatchSize(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "one"); + assertThrows(ConfigException.class, () -> { + new FsSourceTaskConfig(originals); + }); + + } + protected abstract FsSourceTaskConfig buildSourceTaskConfig(List directories); } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicyTest.java deleted file mode 100644 index 1b94e8c..0000000 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicyTest.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.github.mmolimar.kafka.connect.fs.policy; - -import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; -import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; -import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader; -import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import 
org.apache.kafka.common.config.ConfigException; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class SimpleBatchPolicyTest extends PolicyTestBase { - - @Override - protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { - return new FsSourceTaskConfig(buildConfigMap(directories)); - } - - private Map buildConfigMap(List directories) { - return new HashMap() { - { - String[] uris = directories.stream().map(Path::toString).toArray(String[]::new); - put(FsSourceTaskConfig.FS_URIS, String.join(",", uris)); - put(FsSourceTaskConfig.TOPIC, "topic_test"); - put(FsSourceTaskConfig.POLICY_CLASS, SimpleBatchPolicy.class.getName()); - put(FsSourceTaskConfig.FILE_READER_CLASS, TextFileReader.class.getName()); - put(FsSourceTaskConfig.POLICY_REGEXP, "^[0-9]*\\.txt$"); - put(FsSourceTaskConfig.POLICY_PREFIX_FS + "dfs.data.dir", "test"); - put(FsSourceTaskConfig.POLICY_PREFIX_FS + "fs.default.name", "hdfs://test/"); - } - }; - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException { - Map configMap = buildConfigMap(fsConfig.getDirectories()); - configMap.put(SimpleBatchPolicy.BATCH_POLICY_BATCH_SIZE, "1"); - FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(configMap); - - Path dir = fsConfig.getDirectories().get(0); - - fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt")); - // this file does not match the regexp - fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt")); - 
- Policy policy = ReflectionUtils.makePolicy( - (Class) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS), - sourceTaskConfig); - fsConfig.setPolicy(policy); - - Iterator it = fsConfig.getPolicy().execute(); - - // First batch of files (1 file) - assertFalse(fsConfig.getPolicy().hasEnded()); - assertTrue(it.hasNext()); - String firstPath = it.next().getPath(); - - assertFalse(it.hasNext()); - assertFalse(fsConfig.getPolicy().hasEnded()); - - // Second batch of files (1 file) - it = fsConfig.getPolicy().execute(); - assertTrue(it.hasNext()); - - assertNotEquals(firstPath, it.next().getPath()); - - assertFalse(it.hasNext()); - assertTrue(fsConfig.getPolicy().hasEnded()); - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void invalidBatchSize(PolicyFsTestConfig fsConfig) { - Map configMap = buildConfigMap(fsConfig.getDirectories()); - configMap.put(SimpleBatchPolicy.BATCH_POLICY_BATCH_SIZE, "one"); - FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(configMap); - assertThrows(ConfigException.class, () -> { - try { - ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() - .getClass(FsSourceTaskConfig.POLICY_CLASS), sourceTaskConfig); - } catch (Exception e) { - throw e.getCause(); - } - }); - - } -} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java index 279a775..97c2b1f 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java @@ -1,10 +1,20 @@ package com.github.mmolimar.kafka.connect.fs.policy; import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; +import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader; +import 
com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; + +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.junit.jupiter.api.Assertions.*; +import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -26,4 +36,48 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { return new FsSourceTaskConfig(cfg); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void execPolicyEndsAfterBatching(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals); + + Policy policy = ReflectionUtils.makePolicy( + (Class) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS), + sourceTaskConfig); + + fsConfig.setPolicy(policy); + + FileSystem fs = fsConfig.getFs(); + for (Path dir : fsConfig.getDirectories()) { + fs.createNewFile(new Path(dir, System.nanoTime() + ".txt")); + //this file does not match the regexp + fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid")); + + //we wait till FS has registered the files + Thread.sleep(3000); + } + + + Iterator it = fsConfig.getPolicy().execute(); + + // First batch of files (1 file) + assertFalse(fsConfig.getPolicy().hasEnded()); + assertTrue(it.hasNext()); + String firstPath = it.next().getPath(); + + assertFalse(it.hasNext()); + assertFalse(fsConfig.getPolicy().hasEnded()); + + // Second batch of files (1 file) + it = fsConfig.getPolicy().execute(); + assertTrue(it.hasNext()); + + assertNotEquals(firstPath, it.next().getPath()); + + assertFalse(it.hasNext()); + assertTrue(fsConfig.getPolicy().hasEnded()); + } + } diff --git 
a/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java index b4b5a4e..ed08dec 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java @@ -28,6 +28,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.time.Duration; import java.util.*; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -270,6 +271,106 @@ public void checkVersion(TaskFsTestConfig fsConfig) { assertFalse("unknown".equalsIgnoreCase(fsConfig.getTask().version())); } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void pollNoDataWithBatch(TaskFsTestConfig fsConfig) { + Map props = new HashMap<>(fsConfig.getTaskConfig()); + props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + fsConfig.getTask().start(props); + + assertEquals(0, fsConfig.getTask().poll().size()); + //policy has ended + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void emptyFilesToProcessWithBatch(TaskFsTestConfig fsConfig) throws IOException { + for (Path dir : fsConfig.getDirectories()) { + fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt")); + //this file does not match the regexp + fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); + } + Map props = new HashMap<>(fsConfig.getTaskConfig()); + props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + fsConfig.getTask().start(props); + + List records = new ArrayList<>(); + do { + List fresh = fsConfig.getTask().poll(); + + if (fresh == null) + break; + records.addAll(fresh); + + } while (true); + + assertEquals(0, records.size()); + + //policy has ended + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + 
@MethodSource("fileSystemConfigProvider") + public void oneFilePerFsWithBatch(TaskFsTestConfig fsConfig) throws IOException { + for (Path dir : fsConfig.getDirectories()) { + Path dataFile = new Path(dir, System.nanoTime() + ".txt"); + createDataFile(fsConfig.getFs(), dataFile); + //this file does not match the regexp + fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); + } + + Map props = new HashMap<>(fsConfig.getTaskConfig()); + props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + fsConfig.getTask().start(props); + + List records = new ArrayList<>(); + do { + List fresh = fsConfig.getTask().poll(); + + if (fresh == null) + break; + records.addAll(fresh); + + } while (true); + + assertEquals((NUM_RECORDS * fsConfig.getDirectories().size()) / 2, records.size()); + checkRecords(records); + //policy has ended + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void shouldNotSleepBetweenBatches(TaskFsTestConfig fsConfig) throws IOException { + Map props = new HashMap<>(fsConfig.getTaskConfig()); + props.put(FsSourceTaskConfig.POLL_INTERVAL_MS, "10000"); + props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + + for (Path dir : fsConfig.getDirectories()) { + Path dataFile = new Path(dir, System.nanoTime() + ".txt"); + createDataFile(fsConfig.getFs(), dataFile); + //this file does not match the regexp + fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); + } + + fsConfig.getTask().start(props); + + List records = new ArrayList<>(); + assertTimeoutPreemptively(Duration.ofSeconds(2), () -> { + records.addAll(fsConfig.getTask().poll()); + records.addAll(fsConfig.getTask().poll()); + }); + + assertEquals((NUM_RECORDS * fsConfig.getDirectories().size()) / 2, records.size()); + checkRecords(records); + //policy has ended + assertNull(fsConfig.getTask().poll()); + } + + protected void checkRecords(List records) { records.forEach(record -> { 
assertEquals("topic_test", record.topic());