Skip to content

Commit

Permalink
[BEAM-5195] Decomposition of TopPerKey was fixed. Documentatioin ex…
Browse files Browse the repository at this point in the history
…ample and test added.
  • Loading branch information
VaclavPlajt committed Aug 24, 2018
1 parent 4f3dfbe commit 2e65bb2
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 118 deletions.
Expand Up @@ -20,7 +20,6 @@
import static java.util.Objects.requireNonNull;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
Expand All @@ -31,18 +30,13 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.CombinableReduceFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.StateAwareWindowWiseSingleInputOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.State;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateContext;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StorageProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorage;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.windowing.WindowingDesc;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeAware;
import org.apache.beam.sdk.extensions.euphoria.core.client.type.TypeUtils;
Expand All @@ -59,7 +53,7 @@
* Emits top element for defined keys and windows. The elements are compared by comparable objects
* extracted by user defined function applied on input elements.
*
* <p>Custom {@link Windowing} can be set, otherwise values from input operator are used.
* <p>Custom {@link Windowing} can be set.
*
* <p>Example:
*
Expand Down Expand Up @@ -96,10 +90,10 @@ public class TopPerKey<InputT, K, V, ScoreT extends Comparable<ScoreT>, W extend
InputT, InputT, K, Triple<K, V, ScoreT>, W, TopPerKey<InputT, K, V, ScoreT, W>>
implements TypeAware.Value<V> {

private final UnaryFunction<InputT, V> valueFn;
private final UnaryFunction<InputT, V> valueExtractor;
private final TypeDescriptor<V> valueType;

private final UnaryFunction<InputT, ScoreT> scoreFn;
private final UnaryFunction<InputT, ScoreT> scoreCalculator;
private final TypeDescriptor<ScoreT> scoreType;

TopPerKey(
Expand All @@ -108,9 +102,9 @@ public class TopPerKey<InputT, K, V, ScoreT extends Comparable<ScoreT>, W extend
Dataset<InputT> input,
UnaryFunction<InputT, K> keyExtractor,
@Nullable TypeDescriptor<K> keyType,
UnaryFunction<InputT, V> valueFn,
UnaryFunction<InputT, V> valueExtractor,
TypeDescriptor<V> valueType,
@Nullable UnaryFunction<InputT, ScoreT> scoreFn,
@Nullable UnaryFunction<InputT, ScoreT> scoreCalculator,
TypeDescriptor<ScoreT> scoreType,
@Nullable WindowingDesc<Object, W> windowing,
@Nullable Windowing euphoriaWindowing,
Expand All @@ -127,9 +121,9 @@ public class TopPerKey<InputT, K, V, ScoreT extends Comparable<ScoreT>, W extend
euphoriaWindowing,
outputHints);

this.valueFn = valueFn;
this.valueExtractor = valueExtractor;
this.valueType = valueType;
this.scoreFn = scoreFn;
this.scoreCalculator = scoreCalculator;
this.scoreType = scoreType;
}

Expand Down Expand Up @@ -157,49 +151,61 @@ public static OfBuilder named(String name) {
}

public UnaryFunction<InputT, V> getValueExtractor() {
return valueFn;
return valueExtractor;
}

public UnaryFunction<InputT, ScoreT> getScoreExtractor() {
return scoreFn;
return scoreCalculator;
}

@Override
public DAG<Operator<?, ?>> getBasicOps() {
Flow flow = getFlow();

StateSupport.MergeFromStateMerger<KV<V, ScoreT>, KV<V, ScoreT>, MaxScored<V, ScoreT>>
stateCombiner = new StateSupport.MergeFromStateMerger<>();

TypeDescriptor<KV<V, ScoreT>> rsbkValueType = TypeUtils.keyValues(valueType, scoreType);

ReduceStateByKey<InputT, K, KV<V, ScoreT>, KV<V, ScoreT>, MaxScored<V, ScoreT>, W> reduce =
new ReduceStateByKey<>(
getName() + "::ReduceStateByKey",
// Firs we need to remap input elements to elements containing value and score
// in order to make following ReduceByKey combinable
MapElements<InputT, Triple<K, V, ScoreT>> inputMapperToScoredKvs =
new MapElements<>(
getName() + ":: ExtractKeyValueAndScore",
flow,
input,
keyExtractor,
(InputT element) ->
Triple.of(
keyExtractor.apply(element),
valueExtractor.apply(element),
scoreCalculator.apply(element)),
outputType);

ReduceByKey<Triple<K, V, ScoreT>, K, Triple<K, V, ScoreT>, Triple<K, V, ScoreT>, W> reduce =
new ReduceByKey<>(
getName() + ":: ReduceByKey",
flow,
inputMapperToScoredKvs.output(),
Triple::getFirst,
keyType,
e -> KV.of(valueFn.apply(e), scoreFn.apply(e)),
rsbkValueType,
UnaryFunction.identity(),
outputType,
windowing,
euphoriaWindowing,
(StateContext context, Collector<KV<V, ScoreT>> collector) ->
new MaxScored<>(context.getStorageProvider()),
stateCombiner,
TypeUtils.keyValues(keyType, rsbkValueType),
Collections.emptySet());
(CombinableReduceFunction<Triple<K, V, ScoreT>>)
(triplets) ->
triplets
.reduce((a, b) -> a.getThird().compareTo(b.getThird()) > 0 ? a : b)
.orElseThrow(IllegalStateException::new),
getHints(),
TypeUtils.keyValues(keyType, outputType));

MapElements<KV<K, KV<V, ScoreT>>, Triple<K, V, ScoreT>> format =
MapElements<KV<K, Triple<K, V, ScoreT>>, Triple<K, V, ScoreT>> format =
new MapElements<>(
getName() + "::MapElements",
getName() + "::MapToOutputFormat",
flow,
reduce.output(),
e -> Triple.of(e.getKey(), e.getValue().getKey(), e.getValue().getValue()),
KV::getValue,
getHints(),
outputType);

DAG<Operator<?, ?>> dag = DAG.of(reduce);
DAG<Operator<?, ?>> dag = DAG.of(inputMapperToScoredKvs);
dag.add(reduce, inputMapperToScoredKvs);
dag.add(format, reduce);

return dag;
Expand Down Expand Up @@ -228,60 +234,13 @@ private static final class BuiderParams<
UnaryFunction<InputT, ScoreT> scoreFn;
TypeDescriptor<ScoreT> scoreType;

public BuiderParams(String name, Dataset<InputT> input) {
BuiderParams(String name, Dataset<InputT> input) {
this.name = name;
this.input = input;
}
}

/** TODO: complete javadoc. */
private static final class MaxScored<V, CompareT extends Comparable<CompareT>>
implements State<KV<V, CompareT>, KV<V, CompareT>>,
StateSupport.MergeFrom<MaxScored<V, CompareT>> {

static final ValueStorageDescriptor<KV> MAX_STATE_DESCR =
ValueStorageDescriptor.of("max", KV.class, KV.of(null, null));

final ValueStorage<KV<V, CompareT>> curr;

@SuppressWarnings("unchecked")
MaxScored(StorageProvider storageProvider) {
curr = (ValueStorage) storageProvider.getValueStorage(MAX_STATE_DESCR);
}

@Override
public void add(KV<V, CompareT> element) {
KV<V, CompareT> c = curr.get();
if (c.getKey() == null || element.getValue().compareTo(c.getValue()) > 0) {
curr.set(element);
}
}

@Override
public void flush(Collector<KV<V, CompareT>> context) {
KV<V, CompareT> c = curr.get();
if (c.getKey() != null) {
context.collect(c);
}
}

@Override
public void close() {
curr.clear();
}

@Override
public void mergeFrom(MaxScored<V, CompareT> other) {
KV<V, CompareT> o = other.curr.get();
if (o.getKey() != null) {
this.add(o);
}
}
}

// ~ -----------------------------------------------------------------------------

/** TODO: complete javadoc. */
/** Star of builders chain. */
public static class OfBuilder implements Builders.Of {

private final String name;
Expand All @@ -296,7 +255,7 @@ public <InputT> KeyByBuilder<InputT> of(Dataset<InputT> input) {
}
}

/** TODO: complete javadoc. */
/** Key extractor defining builder. */
public static class KeyByBuilder<InputT> implements Builders.KeyBy<InputT> {

private final BuiderParams<InputT, ?, ?, ?, ?> params;
Expand All @@ -323,7 +282,7 @@ public <K> ValueByBuilder<InputT, K> keyBy(
}
}

/** TODO: complete javadoc. */
/** Value extractor defining builder. */
public static class ValueByBuilder<InputT, K> {

private final BuiderParams<InputT, K, ?, ?, ?> params;
Expand All @@ -347,7 +306,7 @@ public <V> ScoreByBuilder<InputT, K, V> valueBy(
}
}

/** TODO: complete javadoc. */
/** Score calculator defining builder. */
public static class ScoreByBuilder<InputT, K, V> {

private final BuiderParams<InputT, K, V, ?, ?> params;
Expand All @@ -374,7 +333,7 @@ public <ScoreT extends Comparable<ScoreT>> WindowByBuilder<InputT, K, V, ScoreT>
}
}

/** TODO: complete javadoc. */
/** First of windowing defining builders. */
public static class WindowByBuilder<InputT, K, V, ScoreT extends Comparable<ScoreT>>
implements Builders.WindowBy<TriggerByBuilder<InputT, K, V, ScoreT, ?>>,
Builders.Output<Triple<K, V, ScoreT>>,
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.extensions.euphoria.core.testkit.ReduceByKeyTest;
import org.apache.beam.sdk.extensions.euphoria.core.testkit.ReduceWindowTest;
import org.apache.beam.sdk.extensions.euphoria.core.testkit.SumByKeyTest;
import org.apache.beam.sdk.extensions.euphoria.core.testkit.TopPerKeyTest;
import org.apache.beam.sdk.extensions.euphoria.core.testkit.UnionTest;
import org.apache.beam.sdk.extensions.euphoria.core.testkit.junit.TestSuiteRunner;
import org.junit.runner.RunWith;
Expand All @@ -44,7 +45,7 @@
ReduceByKeyTest.class,
// ReduceStateByKeyTest.class,
SumByKeyTest.class,
// TopPerKeyTest.class, - uncomment when ReduceStateByKey is supported
TopPerKeyTest.class,
UnionTest.class,
// WindowingTest.class,
ReduceWindowTest.class
Expand Down
Expand Up @@ -44,9 +44,11 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.SumByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.TopPerKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Util;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Fold;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
import org.apache.beam.sdk.extensions.euphoria.core.translate.BeamFlow;
import org.apache.beam.sdk.extensions.euphoria.core.translate.EuphoriaPTransform;
import org.apache.beam.sdk.extensions.euphoria.core.translate.coder.KryoCoder;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class DocumentationExamplesTest {
"Quisque ipsum fermentum nisl at libero accumsan consectetur.",
"Praesent lobortis ex eget ex rhoncus, quis malesuada risus tristique.",
"Aliquam risus at orci, porttitor eu turpis et, porttitor semper ligula.");

@Rule public final TestPipeline pipeline = TestPipeline.create();

@Ignore("We do not want to actually write output files from this test.")
Expand Down Expand Up @@ -843,4 +846,43 @@ public void testReduceWithWindowOperator() {

pipeline.run();
}

@Test
public void testTopPerKeyOperator() {
BeamFlow flow = BeamFlow.of(pipeline);

Dataset<String> animals =
flow.createInput(
ListDataSource.bounded(
asList(
"mouse",
"elk",
"rat",
"mule",
"elephant",
"dinosaur",
"cat",
"duck",
"caterpillar")));

// suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
Dataset<Triple<Character, String, Integer>> longestNamesByLetter =
TopPerKey.named("longest-animal-names")
.of(animals)
.keyBy(name -> name.charAt(0)) // first character is the key
.valueBy(UnaryFunction.identity()) // value type is the same as input element type
.scoreBy(String::length) // length defines score, note that Integer implements Comparable<Integer>
.output();
//longestNamesByLetter wil contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]

PAssert.that(flow.unwrapped(longestNamesByLetter))
.containsInAnyOrder(
Triple.of('m', "mouse", 5),
Triple.of('r', "rat", 3),
Triple.of('e', "elephant", 8),
Triple.of('d', "dinosaur", 8),
Triple.of('c', "caterpillar", 11));

pipeline.run();
}
}

0 comments on commit 2e65bb2

Please sign in to comment.