Skip to content

Commit

Permalink
More work on changing reducers.
Browse files Browse the repository at this point in the history
  • Loading branch information
nigelduffy committed Feb 10, 2014
1 parent 7575d4b commit 4f0ec6a
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 42 deletions.
106 changes: 69 additions & 37 deletions src/main/java/org/nickelproject/nickel/dataflow/Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,90 @@
import com.google.common.base.Function;
import com.google.common.base.Functions;

public final class Collector<S, T, V> implements CollectorInterface<S, V> {
private static final long serialVersionUID = 1L;
private final Function<S, T> preFunctor;
private final Reducer<T> reducer;
private final Function<T, V> postFunctor;
/**
* A wrapper to hide the irrelevant intermediate class in CollectorBase.
*
* @param <T>
* @param <U>
*/
public final class Collector<T, U> implements CollectorInterface<T, U> {
private static final long serialVersionUID = 1L;
private final CollectorBase<T, ?, U> collectorBase;

private Collector(final Function<S, T> preFunctor, final Reducer<T> reducer,
final Function<T, V> postFunctor) {
this.preFunctor = preFunctor;
this.reducer = reducer;
this.postFunctor = postFunctor;
private Collector(final CollectorBase<T, ?, U> collectorBase) {
this.collectorBase = collectorBase;
}

public static <S, T, V> Collector<S, T, V> create(final Function<S, T> preFunctor,
public static <S, T, V> Collector<S, V> create(final Function<S, T> preFunctor,
final Reducer<T> reducer,
final Function<T, V> postFunctor) {
return new Collector<S, T, V>(preFunctor, reducer, postFunctor);
return new Collector<S, V>(new CollectorBase<S, T, V>(preFunctor, reducer, postFunctor));
}

public static <S, T, X, Y, V> Collector<S, X, V> create(final Function<S, T> preFunctor,
final Collector<T, X, Y> collector,
public static <S, T> Collector<S, T> create(final Function<S, T> preFunction, final Reducer<T> reducer) {
return create(preFunction, reducer, Functions.<T>identity());
}

public static <S, T, X, Y, V> Collector<S, V> create(final Function<S, T> preFunctor,
final Collector<T, Y> collector,
final Function<Y, V> postFunctor) {
final Function<S, X> newPreFunction = Functions.compose(collector.preFunctor, preFunctor);
final Function<X, V> newPostFunction = Functions.compose(postFunctor, collector.postFunctor);
return new Collector<S, X, V>(newPreFunction, collector.reducer, newPostFunction);
@SuppressWarnings("unchecked")
final CollectorBase<T, X, Y> wrapped = (CollectorBase<T, X, Y>) collector.collectorBase;
final Function<S, X> newPreFunction = Functions.compose(wrapped.preFunctor, preFunctor);
final Function<X, V> newPostFunction = Functions.compose(postFunctor, wrapped.postFunctor);
return new Collector<S, V>(
new CollectorBase<S, X, V>(newPreFunction, wrapped.reducer, newPostFunction));
}


public static <S, T, V> Collector<S, V> create(final Function<S, T> preFunction,
final Collector<T, V> collector) {
return create(preFunction, collector, Functions.<V>identity());
}

@Override
public Reductor<S, V> reductor() {
return new ComposingReductor<S, T, V>(preFunctor, reducer.reductor(), postFunctor);
public Reductor<T, U> reductor() {
return collectorBase.reductor();
}

private static final class CollectorBase<S, T, V> implements CollectorInterface<S, V> {
private static final long serialVersionUID = 1L;
private final Function<S, T> preFunctor;
private final Reducer<T> reducer;
private final Function<T, V> postFunctor;

private static final class ComposingReductor<S, T, V> implements Reductor<S, V> {
private final Function<S, T> preFunction;
private final Function<T, V> postFunction;
private final Reductor<T, T> reductor;

ComposingReductor(final Function<S, T> preFunction, final Reductor<T, T> reductor,
final Function<T, V> postFunction) {
this.preFunction = preFunction;
this.postFunction = postFunction;
this.reductor = reductor;
private CollectorBase(final Function<S, T> preFunctor, final Reducer<T> reducer,
final Function<T, V> postFunctor) {
this.preFunctor = preFunctor;
this.reducer = reducer;
this.postFunctor = postFunctor;
}

@Override
public void collect(final S pVal) {
reductor.collect(preFunction.apply(pVal));
public Reductor<S, V> reductor() {
return new ComposingReductor<S, T, V>(preFunctor, reducer.reductor(), postFunctor);
}

@Override
public V reduce() {
return postFunction.apply(reductor.reduce());
private static final class ComposingReductor<S, T, V> implements Reductor<S, V> {
private final Function<S, T> preFunction;
private final Function<T, V> postFunction;
private final Reductor<T, T> reductor;

ComposingReductor(final Function<S, T> preFunction, final Reductor<T, T> reductor,
final Function<T, V> postFunction) {
this.preFunction = preFunction;
this.postFunction = postFunction;
this.reductor = reductor;
}

@Override
public void collect(final S pVal) {
reductor.collect(preFunction.apply(pVal));
}

@Override
public V reduce() {
return postFunction.apply(reductor.reduce());
}
}
}
}
16 changes: 16 additions & 0 deletions src/main/java/org/nickelproject/util/functions/ToListFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.nickelproject.util.functions;

import java.util.Collections;
import java.util.List;

import javax.annotation.Nonnull;

import com.google.common.base.Function;

public final class ToListFunction<T> implements Function<T, List<T>> {

@Override
public List<T> apply(@Nonnull final T input) {
return Collections.singletonList(input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import com.google.common.collect.Lists;

public final class ToListReducer<T> implements Reducer<List<T>> {
public final class MergeListReducer<T> implements Reducer<List<T>> {
private static final long serialVersionUID = 1L;

@Override
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/org/nickelproject/util/reducers/ReducerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package org.nickelproject.util.reducers;

import java.util.List;

import javax.annotation.Nonnull;

import org.nickelproject.nickel.dataflow.Collector;
import org.nickelproject.nickel.dataflow.Reducer;
import org.nickelproject.util.functions.FunctionUtil;
import org.nickelproject.util.functions.PairFunction;
import org.nickelproject.util.functions.ToListFunction;
import org.nickelproject.util.tuple.Pair;

import com.google.common.base.Function;
Expand All @@ -32,11 +35,11 @@ private ReducerUtil() {
// Prevents construction
}

public static Collector<Object, Integer, Integer> count() {
return Collector.create(FunctionUtil.constant(1), new IntegerSumReducer(), Functions.<Integer>identity());
public static <T> Collector<T, Integer> count() {
return Collector.create(FunctionUtil.<T, Integer>constant(1), new IntegerSumReducer());
}

public static Collector<Double, Pair<Integer, Double>, Double> average() {
public static Collector<Double, Double> average() {
final Reducer<Pair<Integer, Double>> reducer =
new PairReducer<Integer, Double>(new IntegerSumReducer(), new DoubleSumReducer());
final Function<Double, Pair<Integer, Double>> function =
Expand All @@ -49,4 +52,8 @@ public Double apply(@Nonnull final Pair<Integer, Double> from) {
};
return Collector.create(function, reducer, divide);
}

public static <T> Collector<T, List<T>> toList() {
return Collector.create(new ToListFunction<T>(), new MergeListReducer<T>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void integerSumReducerTest2() {

@Test
public void toListReducerTest() {
final Reductor<List<Integer>, List<Integer>> toListReductor = new ToListReducer<Integer>().reductor();
final Reductor<List<Integer>, List<Integer>> toListReductor = new MergeListReducer<Integer>().reductor();
for (final Integer datum : Sequences.integer(0, max)) {
toListReductor.collect(Collections.singletonList(datum));
}
Expand Down

0 comments on commit 4f0ec6a

Please sign in to comment.