diff --git a/.gitignore b/.gitignore deleted file mode 100644 index f279b44..0000000 --- a/.gitignore +++ /dev/null @@ -1,7 +0,0 @@ -.idea -target -dist -*.iml -*.ipr -*.iws -*.ids \ No newline at end of file diff --git a/README.md b/README.md index dde00ba..3918cc5 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,18 @@ -# java-utils [![Build Status](https://snap-ci.com/rovats/java-utils/branch/master/build_image)](https://snap-ci.com/rovats/java-utils/branch/master) -A collection of useful Java utilities - -### Batch Collector -A stream collector that allows batch processing of elements with a given `Consumer` (batch processor). - -Use the supplied utility class to get new instances: - -```java -List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); -List output = new ArrayList<>(); - -int batchSize = 3; -Consumer> batchProcessor = xs -> output.addAll(xs); - -input.stream() - .collect(StreamUtils.batchCollector(batchSize, batchProcessor)); -``` +# java-utils [![Build Status](https://snap-ci.com/rovats/java-utils/branch/master/build_image)](https://snap-ci.com/rovats/java-utils/branch/master) +A collection of useful Java utilities + +### Batch Collector +A stream collector that allows batch processing of elements with a given `Consumer` (batch processor). + +Use the supplied utility class to get new instances: + +```java +List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); +List output = new ArrayList<>(); + +int batchSize = 3; +Consumer> batchProcessor = xs -> output.addAll(xs); + +input.stream() + .collect(StreamUtils.batchCollector(batchSize, batchProcessor)); +``` diff --git a/pom.xml b/pom.xml index 00f272c..ec1a47d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,36 +1,36 @@ - - - 4.0.0 - - com.github.rovats.util - com.github.rovats.util - 1.0-SNAPSHOT - - - - - junit - junit - RELEASE - test - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.3 - - 1.8 - 1.8 - false - - - - - + + + 4.0.0 + + com.github.rovats.util + com.github.rovats.util + 1.0-SNAPSHOT + + + + + junit + junit + RELEASE + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + false + + + + + \ No newline at end of file diff --git a/src/main/java/com/github/rovats/utils/stream/BatchCollector.java b/src/main/java/com/github/rovats/utils/stream/BatchCollector.java index 62a54c6..0da46ac 100644 --- a/src/main/java/com/github/rovats/utils/stream/BatchCollector.java +++ b/src/main/java/com/github/rovats/utils/stream/BatchCollector.java @@ -1,80 +1,120 @@ -package com.github.rovats.utils.stream; - - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.function.*; -import java.util.stream.Collector; - -import static java.util.Objects.requireNonNull; - - -/** - * Collects elements in the stream and calls the supplied batch processor - * after the configured batch size is reached. - * - * In case of a parallel stream, the batch processor may be called with - * elements less than the batch size. - * - * The elements are not kept in memory, and the final result will be an - * empty list. - * - * @param Type of the elements being collected - */ -class BatchCollector implements Collector, List> { - - private final int batchSize; - private final Consumer> batchProcessor; - - - /** - * Constructs the batch collector - * - * @param batchSize the batch size after which the batchProcessor should be called - * @param batchProcessor the batch processor which accepts batches of records to process - */ - BatchCollector(int batchSize, Consumer> batchProcessor) { - batchProcessor = requireNonNull(batchProcessor); - - this.batchSize = batchSize; - this.batchProcessor = batchProcessor; - } - - public Supplier> supplier() { - return ArrayList::new; - } - - public BiConsumer, T> accumulator() { - return (ts, t) -> { - ts.add(t); - if (ts.size() >= batchSize) { - batchProcessor.accept(ts); - ts.clear(); - } - }; - } - - public BinaryOperator> combiner() { - return (ts, ots) -> { - // process each parallel list without checking for batch size - // avoids adding all elements of one to another - // can be modified if a strict batching mode is required - batchProcessor.accept(ts); - batchProcessor.accept(ots); - return Collections.emptyList(); - }; - } - - public Function, List> finisher() { - return ts -> { - batchProcessor.accept(ts); - return Collections.emptyList(); - }; - } - - public Set characteristics() { - return Collections.emptySet(); - } -} +package com.github.rovats.utils.stream; + + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.function.*; +import java.util.stream.Collector; + +import static java.util.Objects.requireNonNull; + + +/** + * Collects elements in the stream and calls the supplied batch processor + * after the configured batch size is reached. + * + * In case of a parallel stream, the batch processor may be called with + * elements less than the batch size. + * + * The elements are not kept in memory, and the final result will be an + * empty list. + * + * @param Type of the elements being collected + */ +class BatchCollector implements Collector, List> { + + private final int batchSize; + private final boolean enforceStrictBatching; + private final Consumer> batchProcessor; + + /** + * Constructs the batch collector + * + * @param batchSize + * the batch size after which the batchProcessor should be called + * @param enforceStrictBatching + * If true, the collector enforces processing in strict batches of + * the specified size + * @param batchProcessor + * the batch processor which accepts batches of records to + * process + */ + BatchCollector(int batchSize, boolean enforceStrictBatching, Consumer> batchProcessor) { + this.batchSize = batchSize; + this.enforceStrictBatching = enforceStrictBatching; + this.batchProcessor = requireNonNull(batchProcessor); + } + + /** + * Constructs the batch collector + * + * @param batchSize the batch size after which the batchProcessor should be called + * @param batchProcessor the batch processor which accepts batches of records to process + */ + BatchCollector(int batchSize, Consumer> batchProcessor) { + this(batchSize, false, batchProcessor); + } + + public Supplier> supplier() { + return ArrayList::new; + } + + public BiConsumer, T> accumulator() { + return (ts, t) -> { + ts.add(t); + if (ts.size() >= batchSize) { + batchProcessor.accept(ts); + ts.clear(); + } + }; + } + + public BinaryOperator> combiner() { + return (ts, ots) -> { + if (enforceStrictBatching) { + // implement strict batching mode is required + List combinedTarget = ts; + int numOther = ots.size(); + for (int iIdx = 0; iIdx < numOther; iIdx++) { + combinedTarget = processCompletedBatch(combinedTarget); + combinedTarget.add(ots.get(iIdx)); + } + return processCompletedBatch(combinedTarget); + } + else { + // process each parallel list without checking for batch size + // avoids adding all elements of one to another + batchProcessor.accept(ts); + batchProcessor.accept(ots); + return Collections.emptyList(); + } + }; + } + + /** + * if the list has reached the 'batchSize', invoke the batchProcessor + * + * @param combinedTarget + * @return + */ + protected List processCompletedBatch(List combinedTarget) { + if (combinedTarget.size() == batchSize) { + batchProcessor.accept(combinedTarget); + return new ArrayList<>(); + } + return combinedTarget; + } + + public Function, List> finisher() { + return ts -> { + batchProcessor.accept(ts); + return Collections.emptyList(); + }; + } + + public Set characteristics() { + return Collections.emptySet(); + } +} diff --git a/src/main/java/com/github/rovats/utils/stream/StreamUtils.java b/src/main/java/com/github/rovats/utils/stream/StreamUtils.java index 7fd554c..b9c5f7c 100644 --- a/src/main/java/com/github/rovats/utils/stream/StreamUtils.java +++ b/src/main/java/com/github/rovats/utils/stream/StreamUtils.java @@ -1,20 +1,24 @@ -package com.github.rovats.utils.stream; - - -import java.util.List; -import java.util.function.Consumer; -import java.util.stream.Collector; - -public class StreamUtils { - - /** - * Creates a new batch collector - * @param batchSize the batch size after which the batchProcessor should be called - * @param batchProcessor the batch processor which accepts batches of records to process - * @param the type of elements being processed - * @return a batch collector instance - */ - public static Collector, List> batchCollector(int batchSize, Consumer> batchProcessor) { - return new BatchCollector(batchSize, batchProcessor); - } -} +package com.github.rovats.utils.stream; + + +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collector; + +public class StreamUtils { + + /** + * Creates a new batch collector + * @param batchSize the batch size after which the batchProcessor should be called + * @param batchProcessor the batch processor which accepts batches of records to process + * @param the type of elements being processed + * @return a batch collector instance + */ + public static Collector, List> batchCollector(int batchSize, boolean enforceStrictBatching, Consumer> batchProcessor) { + return new BatchCollector<>(batchSize, enforceStrictBatching, batchProcessor); + } + + public static Collector, List> batchCollector(int batchSize, Consumer> batchProcessor) { + return batchCollector(batchSize, false, batchProcessor); + } +} diff --git a/src/test/java/com/github/rovats/utils/stream/BatchCollectorTest.java b/src/test/java/com/github/rovats/utils/stream/BatchCollectorTest.java index bdbab1d..c29a9a6 100644 --- a/src/test/java/com/github/rovats/utils/stream/BatchCollectorTest.java +++ b/src/test/java/com/github/rovats/utils/stream/BatchCollectorTest.java @@ -1,60 +1,83 @@ -package com.github.rovats.utils.stream; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import static org.junit.Assert.assertArrayEquals; - -public class BatchCollectorTest { - - @Test - public void serialStream() { - List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - List output = new ArrayList<>(); - - input.stream() - .collect(StreamUtils.batchCollector(3, xs -> output.addAll(xs))); - - assertArrayEquals(input.toArray(), output.toArray()); - } - - @Test - public void parallelStream() { - List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - List output = new CopyOnWriteArrayList<>(); - - input.parallelStream() - .collect(StreamUtils.batchCollector(3, xs -> output.addAll(xs))); - - output.sort(Integer::compareTo); - assertArrayEquals(input.toArray(), output.toArray()); - } - - @Test - public void exactBatchSize() { - List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - List output = new ArrayList<>(); - - input.stream() - .collect(StreamUtils.batchCollector(10, xs -> output.addAll(xs))); - - assertArrayEquals(input.toArray(), output.toArray()); - } - - @Test - public void biggerBatchSize() { - List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - List output = new ArrayList<>(); - - input.stream() - .collect(StreamUtils.batchCollector(20, xs -> output.addAll(xs))); - - assertArrayEquals(input.toArray(), output.toArray()); - } - - +package com.github.rovats.utils.stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.junit.Assert; +import org.junit.Test; + +public class BatchCollectorTest { + + @Test + public void serialStream() { + List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List output = new ArrayList<>(); + + input.stream() + .collect(StreamUtils.batchCollector(3, xs -> output.addAll(xs))); + + assertArrayEquals(input.toArray(), output.toArray()); + } + + @Test + public void parallelStream() { + List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List output = new CopyOnWriteArrayList<>(); + + input.parallelStream() + .collect(StreamUtils.batchCollector(3, xs -> output.addAll(xs))); + + output.sort(Integer::compareTo); + assertArrayEquals(input.toArray(), output.toArray()); + } + + @Test + public void exactBatchSize() { + List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List output = new ArrayList<>(); + + input.stream() + .collect(StreamUtils.batchCollector(10, xs -> output.addAll(xs))); + + assertArrayEquals(input.toArray(), output.toArray()); + } + + @Test + public void biggerBatchSize() { + List input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List output = new ArrayList<>(); + + input.stream() + .collect(StreamUtils.batchCollector(20, xs -> output.addAll(xs))); + + assertArrayEquals(input.toArray(), output.toArray()); + } + + @Test + public void parallelStrictBatchSizeBatchCollector() { + List dataSet = Arrays.asList("S1", "S2", "S3", "S4", "S5", "S6", "S7", "S8", "S9", "S10"); + + int dataSize = dataSet.size(); + int batchSize = 3; + List finalList = dataSet + .stream() + .parallel() + .map(e -> e) + .collect( + StreamUtils.batchCollector( + batchSize, + true, // enforce strict batching + e -> assertTrue(e.size() == batchSize || e.size() == dataSize % batchSize) + ) + ); + Assert.assertTrue(finalList.size() == 0); + + + } + } \ No newline at end of file diff --git a/src/test/java/com/github/rovats/utils/stream/StreamUtilsTest.java b/src/test/java/com/github/rovats/utils/stream/StreamUtilsTest.java new file mode 100644 index 0000000..411fde3 --- /dev/null +++ b/src/test/java/com/github/rovats/utils/stream/StreamUtilsTest.java @@ -0,0 +1,16 @@ +/** + * StreamUtilsTest.java + */ +package com.github.rovats.utils.stream; + +import org.junit.Assert; +import org.junit.Test; + +public class StreamUtilsTest { + + @Test + public void testBatchCollector() { + Assert.assertTrue(StreamUtils.batchCollector(100, e -> {}) != null); + } + +}