diff --git a/docs/source/config_options.rst b/docs/source/config_options.rst index 17b19e0..eee5812 100644 --- a/docs/source/config_options.rst +++ b/docs/source/config_options.rst @@ -91,6 +91,13 @@ General config properties for this connector. * Default: ``false`` * Importance: medium +``policy.batch_size`` + Number of files that should be handled at a time. Non-positive values disable batching. + + * Type: long + * Default: ``0`` + * Importance: medium + ``policy..`` This represents custom properties you can include based on the policy class specified. @@ -202,20 +209,6 @@ In order to configure custom properties for this policy, the name you must use i * Default: ``20000`` * Importance: medium -.. _config_options-policies-simple_batch: - -Simple Batch --------------------------------------------- - -In order to configure custom properties for this policy, the name you must use is ``simple_batch``. - -``policy.simple_batch.batch_size`` - Number of files to process per execution. - - * Type: int - * Default: ``200`` - * Importance: High - .. _config_options-filereaders: File readers diff --git a/docs/source/policies.rst b/docs/source/policies.rst index 4d7d88a..1a5f654 100644 --- a/docs/source/policies.rst +++ b/docs/source/policies.rst @@ -36,11 +36,3 @@ You can learn more about the properties of this policy :ref:`here`. diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java index 58231fd..fccc0a5 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java @@ -21,6 +21,10 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig { private static final String POLICY_REGEXP_DOC = "Regular expression to filter files from the FS."; private static final String POLICY_REGEXP_DISPLAY = "File filter regex"; + public static final String POLICY_BATCH_SIZE = POLICY_PREFIX + "batch_size"; + private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. Non-positive values disable batching."; + private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch"; + public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs."; public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class"; @@ -76,6 +80,16 @@ public static ConfigDef conf() { ++order, ConfigDef.Width.MEDIUM, POLICY_REGEXP_DISPLAY + ).define( + POLICY_BATCH_SIZE, + ConfigDef.Type.LONG, + 0l, + ConfigDef.Importance.MEDIUM, + POLICY_BATCH_SIZE_DOC, + POLICY_GROUP, + ++order, + ConfigDef.Width.MEDIUM, + POLICY_BATCH_SIZE_DISPLAY ).define( FILE_READER_CLASS, ConfigDef.Type.CLASS, diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java index d250e76..6e18d61 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java @@ -3,6 +3,7 @@ import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader; +import com.github.mmolimar.kafka.connect.fs.util.BatchIterator; import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; import com.github.mmolimar.kafka.connect.fs.util.TailCall; import org.apache.hadoop.conf.Configuration; @@ -36,6 +37,8 @@ abstract class AbstractPolicy implements Policy { private final FsSourceTaskConfig conf; private final AtomicLong executions; private final boolean recursive; + private final long batchSize; + private Iterator currentIterator; private boolean interrupted; public AbstractPolicy(FsSourceTaskConfig conf) throws IOException { @@ -44,6 +47,7 @@ public AbstractPolicy(FsSourceTaskConfig conf) throws IOException { this.executions = new AtomicLong(0); this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE); this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP)); + this.batchSize = conf.getLong(FsSourceTaskConfig.POLICY_BATCH_SIZE); this.interrupted = false; Map customConfigs = customConfigs(); @@ -105,6 +109,11 @@ public final Iterator execute() throws IOException { if (hasEnded()) { throw new IllegalWorkerStateException("Policy has ended. Cannot be retried."); } + + if (batchSize > 0 && currentIterator != null && currentIterator.hasNext()) { + return BatchIterator.batchIterator(currentIterator, batchSize); + } + preCheck(); executions.incrementAndGet(); @@ -112,9 +121,13 @@ public final Iterator execute() throws IOException { for (FileSystem fs : fileSystems) { files = concat(files, listFiles(fs)); } - + currentIterator = files; + postCheck(); + if(batchSize > 0) + return BatchIterator.batchIterator(files, batchSize); + return files; } @@ -173,7 +186,13 @@ public FileMetadata next() { @Override public final boolean hasEnded() { - return interrupted || isPolicyCompleted(); + if (interrupted) + return true; + + if (currentIterator == null) + return isPolicyCompleted(); + + return !currentIterator.hasNext() && isPolicyCompleted(); } protected abstract boolean isPolicyCompleted(); diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicy.java deleted file mode 100644 index fe53521..0000000 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicy.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.github.mmolimar.kafka.connect.fs.policy; - -import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; -import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.kafka.common.config.ConfigException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -public class SimpleBatchPolicy extends AbstractPolicy { - - private static final Logger log = LoggerFactory.getLogger(SimpleBatchPolicy.class); - - private static final int DEFAULT_BATCH_SIZE = 200; - - private static final String BATCH_POLICY_PREFIX = FsSourceTaskConfig.POLICY_PREFIX + "simple_batch."; - public static final String BATCH_POLICY_BATCH_SIZE = BATCH_POLICY_PREFIX + "batch_size"; - - private int batchSize; - private Map> innerIterators = new HashMap<>(); - - public SimpleBatchPolicy(FsSourceTaskConfig conf) throws IOException { - super(conf); - } - - @Override - protected void configPolicy(Map customConfigs) { - try { - this.batchSize = Integer.parseInt( - (String) customConfigs.getOrDefault(BATCH_POLICY_BATCH_SIZE, String.valueOf(DEFAULT_BATCH_SIZE))); - } catch (NumberFormatException nfe) { - throw new ConfigException(BATCH_POLICY_BATCH_SIZE + " property is required and must be a " - + "number (int). Got: " + customConfigs.get(BATCH_POLICY_BATCH_SIZE)); - } - } - - @Override - public Iterator listFiles(final FileSystem fs) throws IOException { - if (!innerIterators.containsKey(fs)) { - innerIterators.put(fs, super.listFiles(fs)); - } - - return new Iterator() { - private int currentFileIndex = 0; - private Iterator iterator = innerIterators.get(fs); - - @Override - public boolean hasNext() { - log.debug("Current file index is {}. Batch size is {}.", currentFileIndex, batchSize); - return (currentFileIndex < batchSize) && iterator.hasNext(); - } - - @Override - public FileMetadata next() { - FileMetadata metadata = iterator.next(); - currentFileIndex++; - return metadata; - } - }; - } - - @Override - protected boolean isPolicyCompleted() { - if (innerIterators.size() == 0) - return false; - - for (Iterator iterator : innerIterators.values()) { - if(iterator.hasNext()) - return false; - } - - return true; - } -} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/util/BatchIterator.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/BatchIterator.java new file mode 100644 index 0000000..499f985 --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/util/BatchIterator.java @@ -0,0 +1,29 @@ +package com.github.mmolimar.kafka.connect.fs.util; + +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchIterator { + private static final Logger log = LoggerFactory.getLogger(BatchIterator.class); + + public static Iterator batchIterator(Iterator iterator, long elementsPerBatch) { + return new Iterator() { + private long count = 0; + + @Override + public boolean hasNext() { + log.debug("Current index is {}. Batch size is {}.", count, elementsPerBatch); + return (count < elementsPerBatch) && iterator.hasNext(); + } + + @Override + public T next() { + T next = iterator.next(); + count++; + return next; + } + }; + } +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java index ba77775..7b944fc 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java @@ -221,6 +221,62 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException { }); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals); + + Policy policy = ReflectionUtils.makePolicy( + (Class) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS), + sourceTaskConfig); + + fsConfig.setPolicy(policy); + + FileSystem fs = fsConfig.getFs(); + for (Path dir : fsConfig.getDirectories()) { + fs.createNewFile(new Path(dir, System.nanoTime() + ".txt")); + //this file does not match the regexp + fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid")); + + //we wait till FS has registered the files + Thread.sleep(3000); + } + + + Iterator it = fsConfig.getPolicy().execute(); + + // First batch of files (1 file) + assertTrue(it.hasNext()); + String firstPath = it.next().getPath(); + + assertFalse(it.hasNext()); + + // Second batch of files (1 file) + it = fsConfig.getPolicy().execute(); + assertTrue(it.hasNext()); + + assertNotEquals(firstPath, it.next().getPath()); + + assertFalse(it.hasNext()); + + // Second batch of files (1 file) + it = fsConfig.getPolicy().execute(); + assertFalse(it.hasNext()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void invalidBatchSize(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "one"); + assertThrows(ConfigException.class, () -> { + new FsSourceTaskConfig(originals); + }); + + } + protected abstract FsSourceTaskConfig buildSourceTaskConfig(List directories); } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicyTest.java deleted file mode 100644 index 1b94e8c..0000000 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimpleBatchPolicyTest.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.github.mmolimar.kafka.connect.fs.policy; - -import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; -import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; -import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader; -import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kafka.common.config.ConfigException; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class SimpleBatchPolicyTest extends PolicyTestBase { - - @Override - protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { - return new FsSourceTaskConfig(buildConfigMap(directories)); - } - - private Map buildConfigMap(List directories) { - return new HashMap() { - { - String[] uris = directories.stream().map(Path::toString).toArray(String[]::new); - put(FsSourceTaskConfig.FS_URIS, String.join(",", uris)); - put(FsSourceTaskConfig.TOPIC, "topic_test"); - put(FsSourceTaskConfig.POLICY_CLASS, SimpleBatchPolicy.class.getName()); - put(FsSourceTaskConfig.FILE_READER_CLASS, TextFileReader.class.getName()); - put(FsSourceTaskConfig.POLICY_REGEXP, "^[0-9]*\\.txt$"); - put(FsSourceTaskConfig.POLICY_PREFIX_FS + "dfs.data.dir", "test"); - put(FsSourceTaskConfig.POLICY_PREFIX_FS + "fs.default.name", "hdfs://test/"); - } - }; - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException { - Map configMap = buildConfigMap(fsConfig.getDirectories()); - configMap.put(SimpleBatchPolicy.BATCH_POLICY_BATCH_SIZE, "1"); - FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(configMap); - - Path dir = fsConfig.getDirectories().get(0); - - fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt")); - // this file does not match the regexp - fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt")); - - Policy policy = ReflectionUtils.makePolicy( - (Class) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS), - sourceTaskConfig); - fsConfig.setPolicy(policy); - - Iterator it = fsConfig.getPolicy().execute(); - - // First batch of files (1 file) - assertFalse(fsConfig.getPolicy().hasEnded()); - assertTrue(it.hasNext()); - String firstPath = it.next().getPath(); - - assertFalse(it.hasNext()); - assertFalse(fsConfig.getPolicy().hasEnded()); - - // Second batch of files (1 file) - it = fsConfig.getPolicy().execute(); - assertTrue(it.hasNext()); - - assertNotEquals(firstPath, it.next().getPath()); - - assertFalse(it.hasNext()); - assertTrue(fsConfig.getPolicy().hasEnded()); - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void invalidBatchSize(PolicyFsTestConfig fsConfig) { - Map configMap = buildConfigMap(fsConfig.getDirectories()); - configMap.put(SimpleBatchPolicy.BATCH_POLICY_BATCH_SIZE, "one"); - FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(configMap); - assertThrows(ConfigException.class, () -> { - try { - ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() - .getClass(FsSourceTaskConfig.POLICY_CLASS), sourceTaskConfig); - } catch (Exception e) { - throw e.getCause(); - } - }); - - } -} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java index 279a775..97c2b1f 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/SimplePolicyTest.java @@ -1,10 +1,20 @@ package com.github.mmolimar.kafka.connect.fs.policy; import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig; +import com.github.mmolimar.kafka.connect.fs.file.FileMetadata; import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader; +import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; + +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.junit.jupiter.api.Assertions.*; +import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -26,4 +36,48 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List directories) { return new FsSourceTaskConfig(cfg); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void execPolicyEndsAfterBatching(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals); + + Policy policy = ReflectionUtils.makePolicy( + (Class) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS), + sourceTaskConfig); + + fsConfig.setPolicy(policy); + + FileSystem fs = fsConfig.getFs(); + for (Path dir : fsConfig.getDirectories()) { + fs.createNewFile(new Path(dir, System.nanoTime() + ".txt")); + //this file does not match the regexp + fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid")); + + //we wait till FS has registered the files + Thread.sleep(3000); + } + + + Iterator it = fsConfig.getPolicy().execute(); + + // First batch of files (1 file) + assertFalse(fsConfig.getPolicy().hasEnded()); + assertTrue(it.hasNext()); + String firstPath = it.next().getPath(); + + assertFalse(it.hasNext()); + assertFalse(fsConfig.getPolicy().hasEnded()); + + // Second batch of files (1 file) + it = fsConfig.getPolicy().execute(); + assertTrue(it.hasNext()); + + assertNotEquals(firstPath, it.next().getPath()); + + assertFalse(it.hasNext()); + assertTrue(fsConfig.getPolicy().hasEnded()); + } + } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java index b4b5a4e..ed08dec 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/task/FsSourceTaskTest.java @@ -28,6 +28,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.time.Duration; import java.util.*; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -270,6 +271,106 @@ public void checkVersion(TaskFsTestConfig fsConfig) { assertFalse("unknown".equalsIgnoreCase(fsConfig.getTask().version())); } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void pollNoDataWithBatch(TaskFsTestConfig fsConfig) { + Map props = new HashMap<>(fsConfig.getTaskConfig()); + props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + fsConfig.getTask().start(props); + + assertEquals(0, fsConfig.getTask().poll().size()); + //policy has ended + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void emptyFilesToProcessWithBatch(TaskFsTestConfig fsConfig) throws IOException { + for (Path dir : fsConfig.getDirectories()) { + fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt")); + //this file does not match the regexp + fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); + } + Map props = new HashMap<>(fsConfig.getTaskConfig()); + props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + fsConfig.getTask().start(props); + + List records = new ArrayList<>(); + do { + List fresh = fsConfig.getTask().poll(); + + if (fresh == null) + break; + records.addAll(fresh); + + } while (true); + + assertEquals(0, records.size()); + + //policy has ended + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void oneFilePerFsWithBatch(TaskFsTestConfig fsConfig) throws IOException { + for (Path dir : fsConfig.getDirectories()) { + Path dataFile = new Path(dir, System.nanoTime() + ".txt"); + createDataFile(fsConfig.getFs(), dataFile); + //this file does not match the regexp + fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); + } + + Map props = new HashMap<>(fsConfig.getTaskConfig()); + props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + fsConfig.getTask().start(props); + + List records = new ArrayList<>(); + do { + List fresh = fsConfig.getTask().poll(); + + if (fresh == null) + break; + records.addAll(fresh); + + } while (true); + + assertEquals((NUM_RECORDS * fsConfig.getDirectories().size()) / 2, records.size()); + checkRecords(records); + //policy has ended + assertNull(fsConfig.getTask().poll()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void shouldNotSleepBetweenBatches(TaskFsTestConfig fsConfig) throws IOException { + Map props = new HashMap<>(fsConfig.getTaskConfig()); + props.put(FsSourceTaskConfig.POLL_INTERVAL_MS, "10000"); + props.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1"); + + for (Path dir : fsConfig.getDirectories()) { + Path dataFile = new Path(dir, System.nanoTime() + ".txt"); + createDataFile(fsConfig.getFs(), dataFile); + //this file does not match the regexp + fsConfig.getFs().createNewFile(new Path(dir, String.valueOf(System.nanoTime()))); + } + + fsConfig.getTask().start(props); + + List records = new ArrayList<>(); + assertTimeoutPreemptively(Duration.ofSeconds(2), () -> { + records.addAll(fsConfig.getTask().poll()); + records.addAll(fsConfig.getTask().poll()); + }); + + assertEquals((NUM_RECORDS * fsConfig.getDirectories().size()) / 2, records.size()); + checkRecords(records); + //policy has ended + assertNull(fsConfig.getTask().poll()); + } + + protected void checkRecords(List records) { records.forEach(record -> { assertEquals("topic_test", record.topic());