Skip to content

Commit

Permalink
Merge pull request apache#211 from tgroh/explode_windowed_value
Browse files Browse the repository at this point in the history
Add WindowedValue#explodeWindows
  • Loading branch information
bjchambers committed Apr 20, 2016
2 parents ef0738b + 1f75fa9 commit 3279684
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,8 @@ public void processElement(WindowedValue<InputT> elem) {
} else {
// We could modify the windowed value (and the processContext) to
// avoid repeated allocations, but this is more straightforward.
for (BoundedWindow window : elem.getWindows()) {
invokeProcessElement(WindowedValue.of(
elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
invokeProcessElement(windowedValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -171,6 +172,18 @@ public T getValue() {
*/
public abstract Collection<? extends BoundedWindow> getWindows();

/**
* Returns a collection of {@link WindowedValue WindowedValues} identical to this one, except each
* is in exactly one of the windows that this {@link WindowedValue} is in.
*/
public Iterable<WindowedValue<T>> explodeWindows() {
ImmutableList.Builder<WindowedValue<T>> windowedValues = ImmutableList.builder();
for (BoundedWindow w : getWindows()) {
windowedValues.add(of(getValue(), getTimestamp(), w, getPane()));
}
return windowedValues.build();
}

/**
* Returns the pane of this {@code WindowedValue} in its window.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@

package com.google.cloud.dataflow.sdk.util;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import org.joda.time.Instant;
import org.junit.Assert;
Expand Down Expand Up @@ -54,4 +63,47 @@ public void testWindowedValueCoder() throws CoderException {
Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp());
Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray());
}

@Test
public void testExplodeWindowsInNoWindowsEmptyIterable() {
WindowedValue<String> value =
WindowedValue.of(
"foo", Instant.now(), ImmutableList.<BoundedWindow>of(), PaneInfo.NO_FIRING);

assertThat(value.explodeWindows(), emptyIterable());
}

@Test
public void testExplodeWindowsInOneWindowEquals() {
Instant now = Instant.now();
BoundedWindow window = new IntervalWindow(now.minus(1000L), now.plus(1000L));
WindowedValue<String> value =
WindowedValue.of("foo", now, window, PaneInfo.ON_TIME_AND_ONLY_FIRING);

assertThat(Iterables.getOnlyElement(value.explodeWindows()), equalTo(value));
}

@Test
public void testExplodeWindowsManyWindowsMultipleWindowedValues() {
Instant now = Instant.now();
BoundedWindow centerWindow = new IntervalWindow(now.minus(1000L), now.plus(1000L));
BoundedWindow pastWindow = new IntervalWindow(now.minus(1500L), now.plus(500L));
BoundedWindow futureWindow = new IntervalWindow(now.minus(500L), now.plus(1500L));
BoundedWindow futureFutureWindow = new IntervalWindow(now, now.plus(2000L));
PaneInfo pane = PaneInfo.createPane(false, false, Timing.ON_TIME, 3L, 0L);
WindowedValue<String> value =
WindowedValue.of(
"foo",
now,
ImmutableList.of(pastWindow, centerWindow, futureWindow, futureFutureWindow),
pane);

assertThat(
value.explodeWindows(),
containsInAnyOrder(
WindowedValue.of("foo", now, futureFutureWindow, pane),
WindowedValue.of("foo", now, futureWindow, pane),
WindowedValue.of("foo", now, centerWindow, pane),
WindowedValue.of("foo", now, pastWindow, pane)));
}
}

0 comments on commit 3279684

Please sign in to comment.