Skip to content

Commit

Permalink
Reworking of reducers / sinks.
Browse files Browse the repository at this point in the history
  • Loading branch information
nigelduffy committed Feb 8, 2014
1 parent 404cd7a commit 21ac3ab
Show file tree
Hide file tree
Showing 15 changed files with 180 additions and 12 deletions.
Expand Up @@ -30,7 +30,7 @@
import com.google.inject.name.Named;

public class S3BlobStore extends BlobStoreBase {
private final static int NotFoundCode = 404;
private static final int notFoundCode = 404;
private final AmazonS3 s3Client;
private final String bucketName;

Expand Down Expand Up @@ -68,7 +68,7 @@ public final boolean contains(final BlobRef blobRef) {
try {
s3Client.getObjectMetadata(bucketName, blobRef.toString());
} catch (final AmazonS3Exception e) {
if(e.getStatusCode() == NotFoundCode) {
if (e.getStatusCode() == notFoundCode) {
retVal = false;
} else {
throw RethrownException.rethrow(e);
Expand Down
@@ -0,0 +1,50 @@
package org.nickelproject.nickel.collections;

import java.util.List;

import org.nickelproject.nickel.dataflow.Reductor;
import org.nickelproject.nickel.dataflow.Sink;

import com.google.common.collect.Lists;

public final class DistributedCollectionReducer<T> implements Sink<T, DistributedCollection<T>> {
private static final long serialVersionUID = 1L;
private final int nodeSize;

public DistributedCollectionReducer(final int nodeSize) {
this.nodeSize = nodeSize;
}

@Override
public Reductor<T, DistributedCollection<T>> reductor() {
return new DistributedCollectionReductor<T>(nodeSize);
}

private static final class DistributedCollectionReductor<T>
implements Reductor<T, DistributedCollection<T>> {
private final List<T> data = Lists.newArrayList();
private final List<DistributedCollection<T>> nodes = Lists.newArrayList();
private final int nodeSize;

public DistributedCollectionReductor(final int nodeSize) {
this.nodeSize = nodeSize;
}

@Override
public void collect(final T pVal) {
data.add(pVal);
if (data.size() > nodeSize) {
nodes.add(DistributedCollectionUtil.from(data));
data.clear();
}
}

@Override
public DistributedCollection<T> reduce() {
if (data.size() > 0) {
nodes.add(DistributedCollectionUtil.from(data));
}
return DistributedCollectionUtil.concat(nodes);
}
}
}
Expand Up @@ -29,7 +29,9 @@

final class InnerNode<T> implements DistributedCollection<T> {
private static final long serialVersionUID = 1L;
//CHECKSTYLE:OFF
private final ExternalReference<DistributedCollection<T>[]> nodes;
//CHECKSTYLE:ON
private final int size;

public InnerNode(final DistributedCollection<T>[] nodes) {
Expand Down
60 changes: 60 additions & 0 deletions src/main/java/org/nickelproject/nickel/dataflow/Collector.java
@@ -0,0 +1,60 @@
package org.nickelproject.nickel.dataflow;

import com.google.common.base.Function;
import com.google.common.base.Functions;

public final class Collector<S, T, V> implements CollectorInterface<S, V> {
private static final long serialVersionUID = 1L;
private final Function<S, T> preFunctor;
private final Reducer<T> reducer;
private final Function<T, V> postFunctor;

private Collector(final Function<S, T> preFunctor, final Reducer<T> reducer,
final Function<T, V> postFunctor) {
this.preFunctor = preFunctor;
this.reducer = reducer;
this.postFunctor = postFunctor;
}

public static <S, T, V> Collector<S, T, V> create(final Function<S, T> preFunctor,
final Reducer<T> reducer,
final Function<T, V> postFunctor) {
return new Collector<S, T, V>(preFunctor, reducer, postFunctor);
}

public static <S, T, X, Y, V> Collector<S, X, V> create(final Function<S, T> preFunctor,
final Collector<T, X, Y> collector,
final Function<Y, V> postFunctor) {
final Function<S, X> newPreFunction = Functions.compose(collector.preFunctor, preFunctor);
final Function<X, V> newPostFunction = Functions.compose(postFunctor, collector.postFunctor);
return new Collector<S, X, V>(newPreFunction, collector.reducer, newPostFunction);
}

@Override
public Reductor<S, V> reductor() {
return new ComposingReductor<S, T, V>(preFunctor, reducer.reductor(), postFunctor);
}

private static final class ComposingReductor<S, T, V> implements Reductor<S, V> {
private final Function<S, T> preFunction;
private final Function<T, V> postFunction;
private final Reductor<T, T> reductor;

ComposingReductor(final Function<S, T> preFunction, final Reductor<T, T> reductor,
final Function<T, V> postFunction) {
this.preFunction = preFunction;
this.postFunction = postFunction;
this.reductor = reductor;
}

@Override
public void collect(final S pVal) {
reductor.collect(preFunction.apply(pVal));
}

@Override
public V reduce() {
return postFunction.apply(reductor.reduce());
}
}
}
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2013 Nigel Duffy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.nickelproject.nickel.dataflow;

import java.io.Serializable;

/**
* Do not extend this class.
*
* @param <T>
* @param <U>
*/
interface CollectorInterface<T, U> extends Serializable {
Reductor<T, U> reductor();
}
@@ -0,0 +1,5 @@
package org.nickelproject.nickel.dataflow;

public interface Partitionable<T> {
Source<? extends Source<T>> partition(final int partitionSize);
}
26 changes: 26 additions & 0 deletions src/main/java/org/nickelproject/nickel/dataflow/Sink.java
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2013 Nigel Duffy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.nickelproject.nickel.dataflow;

/**
* A Sink is a Collector that cannot be nested.
* Usually, this should involve output to a file, terminal, database, etc.
*
* @param <T>
* @param <U>
*/
public interface Sink<T, U> extends CollectorInterface<T, U> {
}
Expand Up @@ -18,7 +18,7 @@
import java.io.Serializable;

public interface Source<T> extends Iterable<T>, Serializable {
public final static int UNKOWN_SIZE = -1;
int unknownSize = -1;

// Size is just a guideline here.
Source<? extends Source<T>> partition(final int sizeGuideline);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/nickelproject/nickel/dataflow/Sources.java
Expand Up @@ -77,7 +77,7 @@ public ConcatSource(final Source<Source<S>> sources) {

@Override
public int size() {
return Source.UNKOWN_SIZE;
return Source.unknownSize;
}

@Override
Expand Down Expand Up @@ -109,7 +109,7 @@ public IterableSource(final Iterable<S> iterable, final int size) {
}

public IterableSource(final Iterable<S> iterable) {
this(iterable, Source.UNKOWN_SIZE);
this(iterable, Source.unknownSize);
}

@Override
Expand Down Expand Up @@ -180,7 +180,7 @@ public FilteredSource(final Source<T> wrappedSource, final Predicate<T> predicat

@Override
public int size() {
return Source.UNKOWN_SIZE;
return Source.unknownSize;
}

@Override
Expand Down
Expand Up @@ -42,7 +42,7 @@ public CsvSource(final InputStreamFactory inputStreamFactory, final RecordDataTy

@Override
public int size() {
return Source.UNKOWN_SIZE;
return Source.unknownSize;
}

@Override
Expand Down
Expand Up @@ -38,7 +38,7 @@ public FileLineSource(final InputStreamFactory inputStreamFactory) {

@Override
public int size() {
return Source.UNKOWN_SIZE;
return Source.unknownSize;
}

@Override
Expand Down
Expand Up @@ -48,7 +48,7 @@ public S3MultiFileSource(final String bucketName, final String key, final Record

@Override
public int size() {
return Source.UNKOWN_SIZE;
return Source.unknownSize;
}

@Override
Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.nickelproject.nickel.dataflow.MapReduceUtil;
import org.nickelproject.nickel.dataflow.Source;
import org.nickelproject.nickel.dataflow.Sources;
import org.nickelproject.nickel.mapReduce.Mapper;
import org.nickelproject.util.reducers.IntegerSumReducer;
import org.nickelproject.util.sources.Sequences;
import org.nickelproject.util.testUtil.UnitAnnotation;
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/
package org.nickelproject.nickel.mapReduce;

import org.nickelproject.nickel.dataflow.MapReduceUtil;
import org.nickelproject.util.testUtil.UnitAnnotation;


Expand Down
Expand Up @@ -15,7 +15,6 @@
*/
package org.nickelproject.nickel.mapReduce;

import org.nickelproject.nickel.dataflow.MapReduceUtil;
import org.nickelproject.util.testUtil.UnitAnnotation;


Expand Down

0 comments on commit 21ac3ab

Please sign in to comment.