Skip to content

Commit

Permalink
Merge pull request apache#171 from dhalperi/backport-30
Browse files Browse the repository at this point in the history
Backport apache#30
  • Loading branch information
davorbonaci committed Mar 25, 2016
2 parents 4690c4a + c2e002a commit 339eb0e
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -994,7 +995,7 @@ private TransformWatermarks(
* Returns the input watermark of the {@link AppliedPTransform}.
*/
public Instant getInputWatermark() {
return inputWatermark.get();
return Preconditions.checkNotNull(inputWatermark.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public Instant currentSynchronizedProcessingTime() {
}

@Override
@Nullable
public Instant currentInputWatermarkTime() {
return watermarks.getInputWatermark();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ public boolean apply(WindowedValue<InputT> input) {
/** Is {@code window} expired w.r.t. the garbage collection watermark? */
private boolean canDropDueToExpiredWindow(BoundedWindow window) {
Instant inputWM = timerInternals.currentInputWatermarkTime();
return inputWM != null
&& window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private <W> PaneInfo describePane(
boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;

// True if the input watermark hasn't passed the window's max timestamp.
boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);

Timing timing;
if (isLateForOutput || !onlyEarlyPanesSoFar) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ public Instant currentSynchronizedProcessingTime() {
}

@Override
@Nullable
public Instant currentEventTime() {
return timerInternals.currentInputWatermarkTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ private Collection<W> processElement(WindowedValue<InputT> value) throws Excepti
directContext.timestamp(),
directContext.timers(),
directContext.state());

// At this point, if triggerRunner.shouldFire before the processValue then
// triggerRunner.shouldFire after the processValue. In other words adding values
// cannot take a trigger state from firing to non-firing.
// (We don't actually assert this since it is too slow.)
}

return windows;
Expand Down Expand Up @@ -532,6 +537,10 @@ public void onTimer(TimerData timer) throws Exception {
"ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
}

// If this is an end-of-window timer, then we need to set a GC timer.
boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());

// If this is a garbage collection timer then we should trigger and garbage collect the window.
Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
boolean isGarbageCollection =
Expand All @@ -548,7 +557,7 @@ public void onTimer(TimerData timer) throws Exception {
// We need to call onTrigger to emit the final pane if required.
// The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
// and the watermark has passed the end of the window.
onTrigger(directContext, renamedContext, true/* isFinished */);
onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
}

// Cleanup flavor B: Clear all the remaining state for this window since we'll never
Expand All @@ -564,10 +573,12 @@ public void onTimer(TimerData timer) throws Exception {
emitIfAppropriate(directContext, renamedContext);
}

// If this is an end-of-window timer, then we need to set a GC timer.
boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
&& timer.getTimestamp().equals(window.maxTimestamp());
if (isEndOfWindow) {
// If the window strategy trigger includes a watermark trigger then at this point
// there should be no data holds, either because we'd already cleared them on an
// earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
// We could assert this but it is very expensive.

// Since we are processing an on-time firing we should schedule the garbage collection
// timer. (If getAllowedLateness is zero then the timer event will be considered a
// cleanup event and handled by the above).
Expand Down Expand Up @@ -666,7 +677,7 @@ private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directCon
// Run onTrigger to produce the actual pane contents.
// As a side effect it will clear all element holds, but not necessarily any
// end-of-window or garbage collection holds.
onTrigger(directContext, renamedContext, isFinished);
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);

// Now that we've triggered, the pane is empty.
nonEmptyPanes.clearPane(renamedContext.state());
Expand Down Expand Up @@ -713,10 +724,12 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing
private void onTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
boolean isFinished)
boolean isFinished, boolean isEndOfWindow)
throws Exception {
Instant inputWM = timerInternals.currentInputWatermarkTime();

// Prefetch necessary states
ReadableState<Instant> outputTimestampFuture =
ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
ReadableState<PaneInfo> paneFuture =
paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
Expand All @@ -729,7 +742,41 @@ private void onTrigger(
// Calculate the pane info.
final PaneInfo pane = paneFuture.read();
// Extract the window hold, and as a side effect clear it.
final Instant outputTimestamp = outputTimestampFuture.read();

WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;

if (newHold != null) {
// We can't be finished yet.
Preconditions.checkState(
!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
// The hold cannot be behind the input watermark.
Preconditions.checkState(
!newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
if (newHold.isAfter(directContext.window().maxTimestamp())) {
// The hold must be for garbage collection, which can't have happened yet.
Preconditions.checkState(
newHold.isEqual(
directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())),
"new hold %s should be at garbage collection for window %s plus %s",
newHold,
directContext.window(),
windowingStrategy.getAllowedLateness());
} else {
// The hold must be for the end-of-window, which can't have happened yet.
Preconditions.checkState(
newHold.isEqual(directContext.window().maxTimestamp()),
"new hold %s should be at end of window %s",
newHold,
directContext.window());
Preconditions.checkState(
!isEndOfWindow,
"new hold at %s for %s but this is the watermark trigger",
newHold,
directContext.window());
}
}

// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
Expand Down Expand Up @@ -778,7 +825,7 @@ private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
Instant endOfWindow = directContext.window().maxTimestamp();
Instant fireTime;
String which;
if (inputWM != null && endOfWindow.isBefore(inputWM)) {
if (endOfWindow.isBefore(inputWM)) {
fireTime = endOfWindow.plus(windowingStrategy.getAllowedLateness());
which = "garbage collection";
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ public interface TimerInternals {

/**
* Return the current, local input watermark timestamp for this computation
* in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
* in the {@link TimeDomain#EVENT_TIME} time domain.
*
* <p>This value:
* <ol>
* <li>Is never {@literal null}, but may be {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
* <li>Is monotonically increasing.
* <li>May differ between workers due to network and other delays.
* <li>Will never be ahead of the global input watermark for this computation. But it
Expand All @@ -95,7 +96,6 @@ public interface TimerInternals {
* it is possible for an element to be considered locally on-time even though it is
* globally late.
*/
@Nullable
Instant currentInputWatermarkTime();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public interface Timers {
@Nullable
public abstract Instant currentSynchronizedProcessingTime();

/** Returns the current event time or {@code null} if unknown. */
@Nullable
/** Returns the current event time. */
public abstract Instant currentEventTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ public Instant currentSynchronizedProcessingTime() {
}

@Override
@Nullable
public Instant currentEventTime() {
return timers.currentEventTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throw
}

public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
// shouldFire should be false.
// However it is too expensive to assert.
FinishedTriggersBitSet finishedSet =
readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
Trigger<W>.TriggerContext context = contextFactory.base(window, timers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context)
if (outputWM != null && elementHold.isBefore(outputWM)) {
which = "too late to effect output watermark";
tooLate = true;
} else if (inputWM != null && context.window().maxTimestamp().isBefore(inputWM)) {
} else if (context.window().maxTimestamp().isBefore(inputWM)) {
which = "too late for end-of-window timer";
tooLate = true;
} else {
Expand Down Expand Up @@ -287,10 +287,11 @@ private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context) {
// by the end of window (ie the end of window is at or ahead of the input watermark).
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();

String which;
boolean tooLate;
Instant eowHold = context.window().maxTimestamp();
if (inputWM != null && eowHold.isBefore(inputWM)) {
if (eowHold.isBefore(inputWM)) {
which = "too late for end-of-window timer";
tooLate = true;
} else {
Expand Down Expand Up @@ -329,11 +330,12 @@ private Instant addGarbageCollectionHold(ReduceFn<?, ?, ?, W>.Context context) {
Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant outputWM = timerInternals.currentOutputWatermarkTime();
Instant inputWM = timerInternals.currentInputWatermarkTime();

WindowTracing.trace(
"WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for "
+ "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
gcHold, context.key(), context.window(), inputWM, outputWM);
Preconditions.checkState(inputWM == null || !gcHold.isBefore(inputWM),
Preconditions.checkState(!gcHold.isBefore(inputWM),
"Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM);
context.state().access(EXTRA_HOLD_TAG).add(gcHold);
return gcHold;
Expand Down Expand Up @@ -368,6 +370,19 @@ public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
addEndOfWindowOrGarbageCollectionHolds(context);
}

/**
 * Value produced by {@link #extractAndRelease}: the hold that was read before the holds were
 * cleared, paired with any hold that was put in place afterwards.
 */
public static class OldAndNewHolds {
  /** Hold that was in effect before it was released. */
  public final Instant oldHold;

  /** Hold left behind after the release, or {@code null} if none was added. */
  @Nullable
  public final Instant newHold;

  /**
   * Bundles the two holds together.
   *
   * @param oldHold the hold in effect before the release
   * @param newHold the hold added after the release, or {@code null} if there is none
   */
  public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
    this.newHold = newHold;
    this.oldHold = oldHold;
  }
}

/**
* Return (a future for) the earliest hold for {@code context}. Clear all the holds after
* reading, but add/restore an end-of-window or garbage collection hold if required.
Expand All @@ -377,46 +392,46 @@ public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
* elements in the current pane. If there is no such value the timestamp is the end
* of the window.
*/
public ReadableState<Instant> extractAndRelease(
public ReadableState<OldAndNewHolds> extractAndRelease(
final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
WindowTracing.debug(
"extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
context.key(), context.window(), timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
return new ReadableState<Instant>() {
return new ReadableState<OldAndNewHolds>() {
@Override
public ReadableState<Instant> readLater() {
public ReadableState<OldAndNewHolds> readLater() {
elementHoldState.readLater();
extraHoldState.readLater();
return this;
}

@Override
public Instant read() {
public OldAndNewHolds read() {
// Read both the element and extra holds.
Instant elementHold = elementHoldState.read();
Instant extraHold = extraHoldState.read();
Instant hold;
Instant oldHold;
// Find the minimum, accounting for null.
if (elementHold == null) {
hold = extraHold;
oldHold = extraHold;
} else if (extraHold == null) {
hold = elementHold;
oldHold = elementHold;
} else if (elementHold.isBefore(extraHold)) {
hold = elementHold;
oldHold = elementHold;
} else {
hold = extraHold;
oldHold = extraHold;
}
if (hold == null || hold.isAfter(context.window().maxTimestamp())) {
if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) {
// If no hold (eg because all elements came in behind the output watermark), or
// the hold was for garbage collection, take the end of window as the result.
WindowTracing.debug(
"WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
+ "for key:{}; window:{}",
hold, context.key(), context.window());
hold = context.window().maxTimestamp();
oldHold, context.key(), context.window());
oldHold = context.window().maxTimestamp();
}
WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
context.key(), context.window());
Expand All @@ -425,13 +440,14 @@ public Instant read() {
elementHoldState.clear();
extraHoldState.clear();

@Nullable Instant newHold = null;
if (!isFinished) {
// Only need to leave behind an end-of-window or garbage collection hold
// if future elements will be processed.
addEndOfWindowOrGarbageCollectionHolds(context);
newHold = addEndOfWindowOrGarbageCollectionHolds(context);
}

return hold;
return new OldAndNewHolds(oldHold, newHold);
}
};
}
Expand All @@ -447,4 +463,12 @@ public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
context.state().access(elementHoldTag).clear();
context.state().access(EXTRA_HOLD_TAG).clear();
}

/**
 * Peek at the data hold currently stored for {@code context} without clearing it.
 * Intended for debugging only.
 *
 * @param context the context whose state is inspected
 * @return the stored data hold, or {@code null} if there is none
 */
@Nullable
public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
  // Look up the element-hold state first, then read it; nothing is modified.
  final WatermarkHoldState<W> dataHoldState = context.state().access(elementHoldTag);
  return dataHoldState.read();
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

/*
* Copyright (C) 2016 Google Inc.
*
Expand Down
Loading

0 comments on commit 339eb0e

Please sign in to comment.