Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions .gitignore

This file was deleted.

36 changes: 18 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
# java-utils [![Build Status](https://snap-ci.com/rovats/java-utils/branch/master/build_image)](https://snap-ci.com/rovats/java-utils/branch/master)
A collection of useful Java utilities

### Batch Collector
A stream collector that allows batch processing of elements with a given `Consumer` (batch processor).

Use the `StreamUtils` utility class to obtain new instances:

```java
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
```
# java-utils [![Build Status](https://snap-ci.com/rovats/java-utils/branch/master/build_image)](https://snap-ci.com/rovats/java-utils/branch/master)
A collection of useful Java utilities
### Batch Collector
A stream collector that allows batch processing of elements with a given `Consumer` (batch processor).
Use the `StreamUtils` utility class to obtain new instances:
```java
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();
int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);
input.stream()
.collect(StreamUtils.batchCollector(batchSize, batchProcessor));
```
70 changes: 35 additions & 35 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.rovats.util</groupId>
<artifactId>com.github.rovats.util</artifactId>
<version>1.0-SNAPSHOT</version>


<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<useIncrementalCompilation>false</useIncrementalCompilation>
</configuration>
</plugin>
</plugins>
</build>

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the java-utils project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates; note that the artifactId repeats the groupId. -->
<groupId>com.github.rovats.util</groupId>
<artifactId>com.github.rovats.util</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- JUnit is the only declared dependency and is restricted to the test scope. -->
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<!-- NOTE(review): RELEASE is not a fixed version number, so the JUnit version
     may differ between builds. Consider pinning an explicit version; confirm
     against the Maven documentation. -->
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile at Java 8 source/target level with incremental compilation switched off. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<useIncrementalCompilation>false</useIncrementalCompilation>
</configuration>
</plugin>
</plugins>
</build>
</project>
200 changes: 120 additions & 80 deletions src/main/java/com/github/rovats/utils/stream/BatchCollector.java
Original file line number Diff line number Diff line change
@@ -1,80 +1,120 @@
package com.github.rovats.utils.stream;


import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects the elements of a stream into batches and hands every batch to the
 * supplied batch processor as soon as the configured batch size is reached.
 *
 * <p>Elements left over at the end of the stream are delivered as one final,
 * smaller batch. In case of a parallel stream the partially filled lists of
 * the individual sub-tasks are delivered as they are when they get combined,
 * so the batch processor may be called with fewer elements than the batch
 * size. With a parallel stream the processor may also be invoked from several
 * threads, so it has to be safe for concurrent use in that case.
 *
 * <p>The batch processor is never called with an empty list, and every list
 * it receives is not modified by this collector afterwards, so the processor
 * may safely keep a reference to it.
 *
 * <p>The elements are not kept in memory, and the final result will always be
 * an empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;

    /**
     * Constructs the batch collector.
     *
     * @param batchSize the batch size after which the batchProcessor should be
     *        called; a value of 1 or less causes every element to be delivered
     *        on its own
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @throws NullPointerException if {@code batchProcessor} is {@code null}
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        this.batchSize = batchSize;
        this.batchProcessor = requireNonNull(batchProcessor, "batchProcessor");
    }

    /** Supplies the mutable list that buffers the elements of the current batch. */
    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    /**
     * Buffers one element and delivers the buffer's content once it holds
     * {@code batchSize} elements.
     */
    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                // The buffer is reused for the next batch, so the processor gets
                // its own snapshot; handing out the buffer itself would wipe any
                // list the processor decided to keep.
                deliver(new ArrayList<>(ts));
                ts.clear();
            }
        };
    }

    /**
     * Delivers both partial lists without checking for the batch size, which
     * avoids copying the elements of one list into the other.
     */
    @Override
    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            deliver(ts);
            deliver(ots);
            // Everything has been handed over; nothing is left to carry forward.
            return Collections.emptyList();
        };
    }

    /** Delivers the remaining elements, if any, and returns an empty list. */
    @Override
    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            deliver(ts);
            return Collections.emptyList();
        };
    }

    /** Declares no special characteristics. */
    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }

    /** Passes the batch to the batch processor unless the batch is empty. */
    private void deliver(List<T> batch) {
        if (!batch.isEmpty()) {
            batchProcessor.accept(batch);
        }
    }
}
package com.github.rovats.utils.stream;


import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects the elements of a stream into batches and hands every batch to the
 * supplied batch processor as soon as the configured batch size is reached.
 *
 * <p>Elements left over at the end of the stream are delivered as one final,
 * smaller batch. In case of a parallel stream the behaviour depends on the
 * strict batching flag: without it, the partially filled lists of the
 * individual sub-tasks are delivered as they are when they get combined, so
 * the batch processor may be called with fewer elements than the batch size;
 * with it, the partial lists are merged first and only full batches are
 * delivered before the end of the stream. With a parallel stream the processor
 * may also be invoked from several threads, so it has to be safe for
 * concurrent use in that case.
 *
 * <p>The batch processor is never called with an empty list, and every list
 * it receives is not modified by this collector afterwards, so the processor
 * may safely keep a reference to it.
 *
 * <p>The elements are not kept in memory, and the final result will always be
 * an empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final boolean enforceStrictBatching;
    private final Consumer<List<T>> batchProcessor;

    /**
     * Constructs the batch collector.
     *
     * @param batchSize
     *            the batch size after which the batchProcessor should be
     *            called; a value of 1 or less causes every element to be
     *            delivered on its own
     * @param enforceStrictBatching
     *            if true, the collector enforces processing in strict batches
     *            of the specified size
     * @param batchProcessor
     *            the batch processor which accepts batches of records to
     *            process
     * @throws NullPointerException if {@code batchProcessor} is {@code null}
     */
    BatchCollector(int batchSize, boolean enforceStrictBatching, Consumer<List<T>> batchProcessor) {
        this.batchSize = batchSize;
        this.enforceStrictBatching = enforceStrictBatching;
        this.batchProcessor = requireNonNull(batchProcessor, "batchProcessor");
    }

    /**
     * Constructs the batch collector with strict batching switched off.
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @throws NullPointerException if {@code batchProcessor} is {@code null}
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        this(batchSize, false, batchProcessor);
    }

    /** Supplies the mutable list that buffers the elements of the current batch. */
    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    /**
     * Buffers one element and delivers the buffer's content once it holds
     * {@code batchSize} elements.
     */
    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                // The buffer is reused for the next batch, so the processor gets
                // its own snapshot; handing out the buffer itself would wipe any
                // list the processor decided to keep.
                deliver(new ArrayList<>(ts));
                ts.clear();
            }
        };
    }

    /**
     * Merges two partial lists. In strict mode the elements of the second list
     * are appended to the first and a batch is delivered each time the batch
     * size is reached; the not yet delivered rest is returned. Otherwise both
     * lists are delivered as they are and an empty list is returned.
     */
    @Override
    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            if (enforceStrictBatching) {
                List<T> combinedTarget = ts;
                for (T element : ots) {
                    combinedTarget.add(element);
                    combinedTarget = processCompletedBatch(combinedTarget);
                }
                return combinedTarget;
            }
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            deliver(ts);
            deliver(ots);
            return Collections.emptyList();
        };
    }

    /**
     * Invokes the batch processor if the list has reached the batch size.
     *
     * @param combinedTarget the list holding the elements gathered so far
     * @return a new, empty list if {@code combinedTarget} was handed to the
     *         batch processor, otherwise {@code combinedTarget} itself
     */
    protected List<T> processCompletedBatch(List<T> combinedTarget) {
        // Same threshold test as the accumulator uses.
        if (combinedTarget.size() >= batchSize) {
            deliver(combinedTarget);
            // The delivered list now belongs to the processor; continue with a fresh one.
            return new ArrayList<>();
        }
        return combinedTarget;
    }

    /** Delivers the remaining elements, if any, and returns an empty list. */
    @Override
    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            deliver(ts);
            return Collections.emptyList();
        };
    }

    /** Declares no special characteristics. */
    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }

    /** Passes the batch to the batch processor unless the batch is empty. */
    private void deliver(List<T> batch) {
        if (!batch.isEmpty()) {
            batchProcessor.accept(batch);
        }
    }
}
44 changes: 24 additions & 20 deletions src/main/java/com/github/rovats/utils/stream/StreamUtils.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package com.github.rovats.utils.stream;


import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Builds a collector that groups the elements of a stream into batches and
     * passes each batch to the given processor.
     *
     * @param batchSize number of elements after which the batchProcessor is invoked
     * @param batchProcessor consumer that receives each batch of elements
     * @param <T> element type of the stream
     * @return a newly created batch collector
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        final Collector<T, List<T>, List<T>> collector = new BatchCollector<>(batchSize, batchProcessor);
        return collector;
    }
}
package com.github.rovats.utils.stream;


import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Builds a collector that groups the elements of a stream into batches and
     * passes each batch to the given processor.
     *
     * @param batchSize number of elements after which the batchProcessor is invoked
     * @param enforceStrictBatching if true, the collector enforces processing in
     *        strict batches of the specified size
     * @param batchProcessor consumer that receives each batch of elements
     * @param <T> element type of the stream
     * @return a newly created batch collector
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, boolean enforceStrictBatching, Consumer<List<T>> batchProcessor) {
        final Collector<T, List<T>, List<T>> collector =
                new BatchCollector<>(batchSize, enforceStrictBatching, batchProcessor);
        return collector;
    }

    /**
     * Builds a batch collector with strict batching switched off.
     *
     * @param batchSize number of elements after which the batchProcessor is invoked
     * @param batchProcessor consumer that receives each batch of elements
     * @param <T> element type of the stream
     * @return a newly created batch collector
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        final boolean strictBatching = false;
        return batchCollector(batchSize, strictBatching, batchProcessor);
    }
}
Loading