From 21ac3ab218918ebefa9137c98225e0063377ab32 Mon Sep 17 00:00:00 2001 From: nigelduffy Date: Sat, 8 Feb 2014 14:15:27 -0800 Subject: [PATCH] Reworking of reducers / sinks. --- .../nickel/blobStore/S3BlobStore.java | 4 +- .../DistributedCollectionReducer.java | 50 ++++++++++++++++ .../nickel/collections/InnerNode.java | 2 + .../nickel/dataflow/Collector.java | 60 +++++++++++++++++++ .../nickel/dataflow/CollectorInterface.java | 28 +++++++++ .../nickel/dataflow/Partitionable.java | 5 ++ .../nickelproject/nickel/dataflow/Sink.java | 26 ++++++++ .../nickelproject/nickel/dataflow/Source.java | 2 +- .../nickel/dataflow/Sources.java | 6 +- .../nickelproject/util/csvUtil/CsvSource.java | 2 +- .../util/sources/FileLineSource.java | 2 +- .../util/sources/S3MultiFileSource.java | 2 +- .../mapReduce/BaseMapReduceUtilTest.java | 1 - .../SynchronousMapReduceUtilTest.java | 1 - .../mapReduce/ThreadedMapReduceUtilTest.java | 1 - 15 files changed, 180 insertions(+), 12 deletions(-) create mode 100644 src/main/java/org/nickelproject/nickel/collections/DistributedCollectionReducer.java create mode 100644 src/main/java/org/nickelproject/nickel/dataflow/Collector.java create mode 100644 src/main/java/org/nickelproject/nickel/dataflow/CollectorInterface.java create mode 100644 src/main/java/org/nickelproject/nickel/dataflow/Partitionable.java create mode 100644 src/main/java/org/nickelproject/nickel/dataflow/Sink.java diff --git a/src/main/java/org/nickelproject/nickel/blobStore/S3BlobStore.java b/src/main/java/org/nickelproject/nickel/blobStore/S3BlobStore.java index 866861a..8ca3208 100644 --- a/src/main/java/org/nickelproject/nickel/blobStore/S3BlobStore.java +++ b/src/main/java/org/nickelproject/nickel/blobStore/S3BlobStore.java @@ -30,7 +30,7 @@ import com.google.inject.name.Named; public class S3BlobStore extends BlobStoreBase { - private final static int NotFoundCode = 404; + private static final int notFoundCode = 404; private final AmazonS3 s3Client; private final String bucketName; @@ -68,7 +68,7 @@ public final boolean contains(final BlobRef blobRef) { try { s3Client.getObjectMetadata(bucketName, blobRef.toString()); } catch (final AmazonS3Exception e) { - if(e.getStatusCode() == NotFoundCode) { + if (e.getStatusCode() == notFoundCode) { retVal = false; } else { throw RethrownException.rethrow(e); diff --git a/src/main/java/org/nickelproject/nickel/collections/DistributedCollectionReducer.java b/src/main/java/org/nickelproject/nickel/collections/DistributedCollectionReducer.java new file mode 100644 index 0000000..0290e87 --- /dev/null +++ b/src/main/java/org/nickelproject/nickel/collections/DistributedCollectionReducer.java @@ -0,0 +1,50 @@ +package org.nickelproject.nickel.collections; + +import java.util.List; + +import org.nickelproject.nickel.dataflow.Reductor; +import org.nickelproject.nickel.dataflow.Sink; + +import com.google.common.collect.Lists; + +public final class DistributedCollectionReducer implements Sink> { + private static final long serialVersionUID = 1L; + private final int nodeSize; + + public DistributedCollectionReducer(final int nodeSize) { + this.nodeSize = nodeSize; + } + + @Override + public Reductor> reductor() { + return new DistributedCollectionReductor(nodeSize); + } + + private static final class DistributedCollectionReductor + implements Reductor> { + private final List data = Lists.newArrayList(); + private final List> nodes = Lists.newArrayList(); + private final int nodeSize; + + public DistributedCollectionReductor(final int nodeSize) { + this.nodeSize = nodeSize; + } + + @Override + public void collect(final T pVal) { + data.add(pVal); + if (data.size() > nodeSize) { + nodes.add(DistributedCollectionUtil.from(data)); + data.clear(); + } + } + + @Override + public DistributedCollection reduce() { + if (data.size() > 0) { + nodes.add(DistributedCollectionUtil.from(data)); + } + return DistributedCollectionUtil.concat(nodes); + } + } +} diff --git a/src/main/java/org/nickelproject/nickel/collections/InnerNode.java b/src/main/java/org/nickelproject/nickel/collections/InnerNode.java index c82aaa1..8745737 100644 --- a/src/main/java/org/nickelproject/nickel/collections/InnerNode.java +++ b/src/main/java/org/nickelproject/nickel/collections/InnerNode.java @@ -29,7 +29,9 @@ final class InnerNode implements DistributedCollection { private static final long serialVersionUID = 1L; + //CHECKSTYLE:OFF private final ExternalReference[]> nodes; + //CHECKSTYLE:ON private final int size; public InnerNode(final DistributedCollection[] nodes) { diff --git a/src/main/java/org/nickelproject/nickel/dataflow/Collector.java b/src/main/java/org/nickelproject/nickel/dataflow/Collector.java new file mode 100644 index 0000000..1bb0f55 --- /dev/null +++ b/src/main/java/org/nickelproject/nickel/dataflow/Collector.java @@ -0,0 +1,60 @@ +package org.nickelproject.nickel.dataflow; + +import com.google.common.base.Function; +import com.google.common.base.Functions; + +public final class Collector implements CollectorInterface { + private static final long serialVersionUID = 1L; + private final Function preFunctor; + private final Reducer reducer; + private final Function postFunctor; + + private Collector(final Function preFunctor, final Reducer reducer, + final Function postFunctor) { + this.preFunctor = preFunctor; + this.reducer = reducer; + this.postFunctor = postFunctor; + } + + public static Collector create(final Function preFunctor, + final Reducer reducer, + final Function postFunctor) { + return new Collector(preFunctor, reducer, postFunctor); + } + + public static Collector create(final Function preFunctor, + final Collector collector, + final Function postFunctor) { + final Function newPreFunction = Functions.compose(collector.preFunctor, preFunctor); + final Function newPostFunction = Functions.compose(postFunctor, collector.postFunctor); + return new Collector(newPreFunction, collector.reducer, newPostFunction); + } + + @Override + public Reductor reductor() { + return new ComposingReductor(preFunctor, reducer.reductor(), postFunctor); + } + + private static final class ComposingReductor implements Reductor { + private final Function preFunction; + private final Function postFunction; + private final Reductor reductor; + + ComposingReductor(final Function preFunction, final Reductor reductor, + final Function postFunction) { + this.preFunction = preFunction; + this.postFunction = postFunction; + this.reductor = reductor; + } + + @Override + public void collect(final S pVal) { + reductor.collect(preFunction.apply(pVal)); + } + + @Override + public V reduce() { + return postFunction.apply(reductor.reduce()); + } + } +} diff --git a/src/main/java/org/nickelproject/nickel/dataflow/CollectorInterface.java b/src/main/java/org/nickelproject/nickel/dataflow/CollectorInterface.java new file mode 100644 index 0000000..46c0151 --- /dev/null +++ b/src/main/java/org/nickelproject/nickel/dataflow/CollectorInterface.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2013 Nigel Duffy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.nickelproject.nickel.dataflow; + +import java.io.Serializable; + +/** + * Do not extend this class. + * + * @param + * @param + */ +interface CollectorInterface extends Serializable { + Reductor reductor(); +} diff --git a/src/main/java/org/nickelproject/nickel/dataflow/Partitionable.java b/src/main/java/org/nickelproject/nickel/dataflow/Partitionable.java new file mode 100644 index 0000000..39a46a2 --- /dev/null +++ b/src/main/java/org/nickelproject/nickel/dataflow/Partitionable.java @@ -0,0 +1,5 @@ +package org.nickelproject.nickel.dataflow; + +public interface Partitionable { + Source> partition(final int partitionSize); +} diff --git a/src/main/java/org/nickelproject/nickel/dataflow/Sink.java b/src/main/java/org/nickelproject/nickel/dataflow/Sink.java new file mode 100644 index 0000000..17bb5b0 --- /dev/null +++ b/src/main/java/org/nickelproject/nickel/dataflow/Sink.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2013 Nigel Duffy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.nickelproject.nickel.dataflow; + +/** + * A Sink is a Collector that cannot be nested. + * Usually, this should involve output to a file, terminal, database, etc. + * + * @param + * @param + */ +public interface Sink extends CollectorInterface { +} diff --git a/src/main/java/org/nickelproject/nickel/dataflow/Source.java b/src/main/java/org/nickelproject/nickel/dataflow/Source.java index 88ff805..2b27dc2 100644 --- a/src/main/java/org/nickelproject/nickel/dataflow/Source.java +++ b/src/main/java/org/nickelproject/nickel/dataflow/Source.java @@ -18,7 +18,7 @@ import java.io.Serializable; public interface Source extends Iterable, Serializable { - public final static int UNKOWN_SIZE = -1; + int unknownSize = -1; // Size is just a guideline here. Source> partition(final int sizeGuideline); diff --git a/src/main/java/org/nickelproject/nickel/dataflow/Sources.java b/src/main/java/org/nickelproject/nickel/dataflow/Sources.java index fb5d764..b6efe73 100644 --- a/src/main/java/org/nickelproject/nickel/dataflow/Sources.java +++ b/src/main/java/org/nickelproject/nickel/dataflow/Sources.java @@ -77,7 +77,7 @@ public ConcatSource(final Source> sources) { @Override public int size() { - return Source.UNKOWN_SIZE; + return Source.unknownSize; } @Override @@ -109,7 +109,7 @@ public IterableSource(final Iterable iterable, final int size) { } public IterableSource(final Iterable iterable) { - this(iterable, Source.UNKOWN_SIZE); + this(iterable, Source.unknownSize); } @Override @@ -180,7 +180,7 @@ public FilteredSource(final Source wrappedSource, final Predicate predicat @Override public int size() { - return Source.UNKOWN_SIZE; + return Source.unknownSize; } @Override diff --git a/src/main/java/org/nickelproject/util/csvUtil/CsvSource.java b/src/main/java/org/nickelproject/util/csvUtil/CsvSource.java index ef5cf5e..56602df 100644 --- a/src/main/java/org/nickelproject/util/csvUtil/CsvSource.java +++ b/src/main/java/org/nickelproject/util/csvUtil/CsvSource.java @@ -42,7 +42,7 @@ public CsvSource(final InputStreamFactory inputStreamFactory, final RecordDataTy @Override public int size() { - return Source.UNKOWN_SIZE; + return Source.unknownSize; } @Override diff --git a/src/main/java/org/nickelproject/util/sources/FileLineSource.java b/src/main/java/org/nickelproject/util/sources/FileLineSource.java index 4bc69f4..21a345c 100755 --- a/src/main/java/org/nickelproject/util/sources/FileLineSource.java +++ b/src/main/java/org/nickelproject/util/sources/FileLineSource.java @@ -38,7 +38,7 @@ public FileLineSource(final InputStreamFactory inputStreamFactory) { @Override public int size() { - return Source.UNKOWN_SIZE; + return Source.unknownSize; } @Override diff --git a/src/main/java/org/nickelproject/util/sources/S3MultiFileSource.java b/src/main/java/org/nickelproject/util/sources/S3MultiFileSource.java index 4f32971..74cf326 100644 --- a/src/main/java/org/nickelproject/util/sources/S3MultiFileSource.java +++ b/src/main/java/org/nickelproject/util/sources/S3MultiFileSource.java @@ -48,7 +48,7 @@ public S3MultiFileSource(final String bucketName, final String key, final Record @Override public int size() { - return Source.UNKOWN_SIZE; + return Source.unknownSize; } @Override diff --git a/src/test/java/org/nickelproject/nickel/mapReduce/BaseMapReduceUtilTest.java b/src/test/java/org/nickelproject/nickel/mapReduce/BaseMapReduceUtilTest.java index 63c72cb..690e585 100644 --- a/src/test/java/org/nickelproject/nickel/mapReduce/BaseMapReduceUtilTest.java +++ b/src/test/java/org/nickelproject/nickel/mapReduce/BaseMapReduceUtilTest.java @@ -25,7 +25,6 @@ import org.nickelproject.nickel.dataflow.MapReduceUtil; import org.nickelproject.nickel.dataflow.Source; import org.nickelproject.nickel.dataflow.Sources; -import org.nickelproject.nickel.mapReduce.Mapper; import org.nickelproject.util.reducers.IntegerSumReducer; import org.nickelproject.util.sources.Sequences; import org.nickelproject.util.testUtil.UnitAnnotation; diff --git a/src/test/java/org/nickelproject/nickel/mapReduce/SynchronousMapReduceUtilTest.java b/src/test/java/org/nickelproject/nickel/mapReduce/SynchronousMapReduceUtilTest.java index 1c82649..fcba6ff 100644 --- a/src/test/java/org/nickelproject/nickel/mapReduce/SynchronousMapReduceUtilTest.java +++ b/src/test/java/org/nickelproject/nickel/mapReduce/SynchronousMapReduceUtilTest.java @@ -15,7 +15,6 @@ */ package org.nickelproject.nickel.mapReduce; -import org.nickelproject.nickel.dataflow.MapReduceUtil; import org.nickelproject.util.testUtil.UnitAnnotation; diff --git a/src/test/java/org/nickelproject/nickel/mapReduce/ThreadedMapReduceUtilTest.java b/src/test/java/org/nickelproject/nickel/mapReduce/ThreadedMapReduceUtilTest.java index c37d488..537460a 100644 --- a/src/test/java/org/nickelproject/nickel/mapReduce/ThreadedMapReduceUtilTest.java +++ b/src/test/java/org/nickelproject/nickel/mapReduce/ThreadedMapReduceUtilTest.java @@ -15,7 +15,6 @@ */ package org.nickelproject.nickel.mapReduce; -import org.nickelproject.nickel.dataflow.MapReduceUtil; import org.nickelproject.util.testUtil.UnitAnnotation;