Skip to content

Commit

Permalink
Use the com.google.common.collect.Iterators.partition method instead of the custom BatchIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Symbianx committed Jun 2, 2020
1 parent 4252273 commit 02dd5a4
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 51 deletions.
2 changes: 1 addition & 1 deletion docs/source/config_options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ General config properties for this connector.
``policy.batch_size``
    Maximum number of files to handle in each batch. Non-positive values disable batching.

* Type: long
* Type: int
* Default: ``0``
* Importance: medium

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public static ConfigDef conf() {
POLICY_REGEXP_DISPLAY
).define(
POLICY_BATCH_SIZE,
ConfigDef.Type.LONG,
0l,
ConfigDef.Type.INT,
0,
ConfigDef.Importance.MEDIUM,
POLICY_BATCH_SIZE_DOC,
POLICY_GROUP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig;
import com.github.mmolimar.kafka.connect.fs.file.FileMetadata;
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
import com.github.mmolimar.kafka.connect.fs.util.BatchIterator;
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
import com.github.mmolimar.kafka.connect.fs.util.TailCall;
import com.google.common.collect.Iterators;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
Expand Down Expand Up @@ -37,8 +38,8 @@ abstract class AbstractPolicy implements Policy {
private final FsSourceTaskConfig conf;
private final AtomicLong executions;
private final boolean recursive;
private final long batchSize;
private Iterator<FileMetadata> previous;
private final int batchSize;
private Iterator<List<FileMetadata>> previous;
private boolean interrupted;

public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
Expand All @@ -47,7 +48,7 @@ public AbstractPolicy(FsSourceTaskConfig conf) throws IOException {
this.executions = new AtomicLong(0);
this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE);
this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP));
this.batchSize = conf.getLong(FsSourceTaskConfig.POLICY_BATCH_SIZE);
this.batchSize = conf.getInt(FsSourceTaskConfig.POLICY_BATCH_SIZE);
this.interrupted = false;
this.previous = Collections.emptyIterator();

Expand Down Expand Up @@ -110,9 +111,9 @@ public final Iterator<FileMetadata> execute() throws IOException {
if (hasEnded()) {
throw new IllegalWorkerStateException("Policy has ended. Cannot be retried.");
}
if (batchSize > 0 && previous != null && previous.hasNext()) {
return BatchIterator.batchIterator(previous, batchSize);

if (batchSize > 0 && previous.hasNext()) {
return previous.next().iterator();
}

preCheck();
Expand All @@ -122,12 +123,14 @@ public final Iterator<FileMetadata> execute() throws IOException {
for (FileSystem fs : fileSystems) {
files = concat(files, listFiles(fs));
}
previous = files;

postCheck();

if(batchSize > 0)
return BatchIterator.batchIterator(files, batchSize);
if (batchSize > 0) {
previous = Iterators.partition(files, batchSize);
if (!previous.hasNext())
return Collections.emptyIterator();
return previous.next().iterator();
}

return files;
}
Expand Down Expand Up @@ -157,8 +160,7 @@ private TailCall<Boolean> hasNextRec() {
current = it.next();
return this::hasNextRec;
}
if (current.isFile() &
fileRegexp.matcher(current.getPath().getName()).find()) {
if (current.isFile() & fileRegexp.matcher(current.getPath().getName()).find()) {
return TailCall.done(true);
}
current = null;
Expand Down Expand Up @@ -214,13 +216,12 @@ FileMetadata toMetadata(LocatedFileStatus fileStatus) {
@Override
public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorageReader) {
FileSystem current = fileSystems.stream()
.filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString()))
.findFirst()
.filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString())).findFirst()
.orElse(null);
try {
FileReader reader = ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
current, new Path(metadata.getPath()), conf.originals());
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS), current,
new Path(metadata.getPath()), conf.originals());
Map<String, Object> partition = Collections.singletonMap("path", metadata.getPath());
Map<String, Object> offset = offsetStorageReader.offset(partition);
if (offset != null && offset.get("offset") != null) {
Expand All @@ -233,8 +234,7 @@ public FileReader offer(FileMetadata metadata, OffsetStorageReader offsetStorage
}
}

private Iterator<FileMetadata> concat(final Iterator<FileMetadata> it1,
final Iterator<FileMetadata> it2) {
private Iterator<FileMetadata> concat(final Iterator<FileMetadata> it1, final Iterator<FileMetadata> it2) {
return new Iterator<FileMetadata>() {

@Override
Expand Down

This file was deleted.

0 comments on commit 02dd5a4

Please sign in to comment.