Set up batching in the AbstractPolicy instead of a separate Policy.
Symbianx committed May 25, 2020
1 parent 04be206 commit dec8c77
Showing 8 changed files with 278 additions and 104 deletions.
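Before the diff, a minimal sketch of how a connector might enable the new batching option. Only policy.batch_size comes from this commit; the other keys, their values, and the assumption that POLICY_PREFIX resolves to "policy." are illustrative.

import java.util.HashMap;
import java.util.Map;

// Sketch only: the property map an FsSourceTaskConfig might be built from.
// Every key except policy.batch_size is assumed from the existing naming
// convention; adjust them to your actual connector configuration.
public class BatchingConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("fs.uris", "file:///data/input");   // assumed existing key
        props.put("topic", "files-topic");            // assumed existing key
        props.put("policy.class",
                "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy");
        props.put("policy.regexp", ".*\\.txt");       // assumed existing key
        props.put("file_reader.class",
                "com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader");
        // New in this commit: each execute() call lists at most 5 files.
        // Non-positive values (the default is 0) disable batching.
        props.put("policy.batch_size", "5");

        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}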
@@ -21,6 +21,10 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig {
private static final String POLICY_REGEXP_DOC = "Regular expression to filter files from the FS.";
private static final String POLICY_REGEXP_DISPLAY = "File filter regex";

public static final String POLICY_BATCH_SIZE = POLICY_PREFIX + "batch_size";
private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. Non-positive values disable batching.";
private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch";

public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs.";

public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class";
@@ -76,6 +80,16 @@ public static ConfigDef conf() {
++order,
ConfigDef.Width.MEDIUM,
POLICY_REGEXP_DISPLAY
).define(
POLICY_BATCH_SIZE,
ConfigDef.Type.LONG,
0L,
ConfigDef.Importance.MEDIUM,
POLICY_BATCH_SIZE_DOC,
POLICY_GROUP,
++order,
ConfigDef.Width.MEDIUM,
POLICY_BATCH_SIZE_DISPLAY
).define(
FILE_READER_CLASS,
ConfigDef.Type.CLASS,
Expand Up @@ -3,6 +3,7 @@
import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.util.BatchIterator;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.TailCall;
import org.apache.hadoop.conf.Configuration;
@@ -35,15 +36,20 @@ abstract class AbstractPolicy implements Policy {

private final FsSourceTaskConfig conf;
private final AtomicLong executions;
private final AtomicLong batchesCreated;
private final boolean recursive;
private final long batchSize;
private Iterator<FileMetadata> currentIterator;
private boolean interrupted;

public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
this.fileSystems = new ArrayList<>();
this.conf = conf;
this.executions = new AtomicLong(0);
this.batchesCreated = new AtomicLong(0);
this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE);
this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP));
this.batchSize = conf.getLong(FsSourceTaskConfig.POLICY_BATCH_SIZE);
this.interrupted = false;

Map<String, Object> customConfigs = customConfigs();
@@ -105,16 +111,26 @@ public final Iterator<FileMetadata> execute() throws IOException {
if (hasEnded()) {
throw new IllegalWorkerStateException("Policy has ended. Cannot be retried.");
}

if (batchSize > 0 && currentIterator != null && currentIterator.hasNext()) {
batchesCreated.incrementAndGet();
return BatchIterator.chunkIterator(currentIterator, batchSize);
}

preCheck();

executions.incrementAndGet();
Iterator<FileMetadata> files = Collections.emptyIterator();
for (FileSystem fs : fileSystems) {
files = concat(files, listFiles(fs));
}

currentIterator = files;

postCheck();

if (batchSize > 0)
return BatchIterator.chunkIterator(files, batchSize);

return files;
}

@@ -173,7 +189,13 @@ public FileMetadata next() {

@Override
public final boolean hasEnded() {
return interrupted || isPolicyCompleted();
if (interrupted)
return true;

if (currentIterator == null)
return isPolicyCompleted();

return !currentIterator.hasNext() && isPolicyCompleted();
}

protected abstract boolean isPolicyCompleted();
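To show how the reworked execute() and hasEnded() interact from a caller's point of view, here is a minimal polling-loop sketch, assuming a Policy instance built elsewhere; processFile is a hypothetical placeholder for whatever per-file work the source task does.

import java.io.IOException;
import java.util.Iterator;

import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.policy.Policy;

// Sketch only. With policy.batch_size > 0, each execute() call returns at most
// batch_size files; hasEnded() stays false while the iterator cached from the
// last listing still has files, so the loop keeps draining batch after batch.
public class BatchDrainSketch {
    static void drain(Policy policy) throws IOException {
        while (!policy.hasEnded()) {
            Iterator<FileMetadata> batch = policy.execute();
            while (batch.hasNext()) {
                FileMetadata file = batch.next();
                processFile(file); // hypothetical per-file processing
            }
        }
    }

    private static void processFile(FileMetadata file) {
        System.out.println("Processing " + file.getPath());
    }
}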
@@ -6,7 +6,6 @@
import java.util.Map;

public class SimplePolicy extends AbstractPolicy {

public SimplePolicy(FsSourceTaskConfig conf) throws IOException {
super(conf);
}
@@ -0,0 +1,29 @@
package com.github.mmolimar.kafka.connect.fs.util;

import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchIterator {
private static final Logger log = LoggerFactory.getLogger(BatchIterator.class);

public static <T> Iterator<T> chunkIterator(Iterator<T> iterator, long elementsPerBatch) {
return new Iterator<T>() {
private long count = 0;

@Override
public boolean hasNext() {
log.debug("Current index is {}. Batch size is {}.", count, elementsPerBatch);
return (count < elementsPerBatch) && iterator.hasNext();
}

@Override
public T next() {
T next = iterator.next();
count++;
return next;
}
};
}
}
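A minimal sketch of how chunkIterator behaves over a shared source iterator; the file names and batch size are illustrative only.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import com.github.mmolimar.kafka.connect.fs.util.BatchIterator;

// Sketch only: successive chunks advance the same underlying iterator,
// so each call to chunkIterator picks up where the previous chunk stopped.
public class BatchIteratorSketch {
    public static void main(String[] args) {
        List<String> files = Arrays.asList("a.txt", "b.txt", "c.txt");
        Iterator<String> source = files.iterator();

        Iterator<String> first = BatchIterator.chunkIterator(source, 2);
        first.forEachRemaining(System.out::println);  // a.txt, b.txt

        Iterator<String> second = BatchIterator.chunkIterator(source, 2);
        second.forEachRemaining(System.out::println); // c.txt
    }
}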
@@ -221,6 +221,62 @@ public void invalidDynamicURIs(PolicyFsTestConfig fsConfig) throws IOException {
});
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig);

fsConfig.setPolicy(policy);

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}


Iterator<FileMetadata> it = fsConfig.getPolicy().execute();

// First batch of files (1 file)
assertTrue(it.hasNext());
String firstPath = it.next().getPath();

assertFalse(it.hasNext());

// Second batch of files (1 file)
it = fsConfig.getPolicy().execute();
assertTrue(it.hasNext());

assertNotEquals(firstPath, it.next().getPath());

assertFalse(it.hasNext());

// Third execution: no files left to process
it = fsConfig.getPolicy().execute();
assertFalse(it.hasNext());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void invalidBatchSize(PolicyFsTestConfig fsConfig) {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "one");
assertThrows(ConfigException.class, () -> {
new FsSourceTaskConfig(originals);
});
}

protected abstract FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories);

}

This file was deleted.

@@ -1,10 +1,20 @@
package com.github.mmolimar.kafka.connect.fs.policy;

import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -26,4 +36,48 @@ protected FsSourceTaskConfig buildSourceTaskConfig(List<Path> directories) {
return new FsSourceTaskConfig(cfg);
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void execPolicyEndsAfterBatching(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException {
Map<String, String> originals = fsConfig.getSourceTaskConfig().originalsStrings();
originals.put(FsSourceTaskConfig.POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(originals);

Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig);

fsConfig.setPolicy(policy);

FileSystem fs = fsConfig.getFs();
for (Path dir : fsConfig.getDirectories()) {
fs.createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
fs.createNewFile(new Path(dir, System.nanoTime() + ".invalid"));

//we wait till FS has registered the files
Thread.sleep(3000);
}


Iterator<FileMetadata> it = fsConfig.getPolicy().execute();

// First batch of files (1 file)
assertFalse(fsConfig.getPolicy().hasEnded());
assertTrue(it.hasNext());
String firstPath = it.next().getPath();

assertFalse(it.hasNext());
assertFalse(fsConfig.getPolicy().hasEnded());

// Second batch of files (1 file)
it = fsConfig.getPolicy().execute();
assertTrue(it.hasNext());

assertNotEquals(firstPath, it.next().getPath());

assertFalse(it.hasNext());
assertTrue(fsConfig.getPolicy().hasEnded());
}

}