forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ReduceFnRunner.java
1084 lines (981 loc) · 47.2 KB
/
ReduceFnRunner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.runners.core.ReduceFnContextFactory.OnTriggerCallbacks;
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
* {@link PCollection} by key.
*
* <p>The {@link #onTrigger} relies on a {@link TriggerStateMachineRunner} to manage the execution
* of the triggering logic. The {@code ReduceFnRunner}'s responsibilities are:
*
* <ul>
* <li>Tracking the windows that are active (have buffered data) as elements arrive and triggers are
* fired.
* <li>Holding the watermark based on the timestamps of elements in a pane and releasing it when the
* trigger fires.
* <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
* firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions such as
* output.
* <li>Scheduling garbage collection of state associated with a specific window, and making that
* happen when the appropriate timer fires.
* </ul>
*
* @param <K> The type of key being processed.
* @param <InputT> The type of values associated with the key.
* @param <OutputT> The output type that will be produced for each key.
* @param <W> The type of windows this operates on.
*/
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
/**
* The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
*
* <ul>
* <li>It runs the trigger from the {@link WindowingStrategy}.</li>
* <li>It merges windows according to the {@link WindowingStrategy}.</li>
* <li>It chooses how to track active windows and clear out expired windows
* according to the {@link WindowingStrategy}, based on the allowed lateness and
* whether windows can merge.</li>
* <li>It decides whether to emit empty final panes according to whether the
* {@link WindowingStrategy} requires it.</li>
* <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li>
* </ul>
*/
private final WindowingStrategy<Object, W> windowingStrategy;
private final OutputWindowedValue<KV<K, OutputT>> outputter;
private final StateInternals stateInternals;
private final Counter droppedDueToClosedWindow;
public static final String DROPPED_DUE_TO_CLOSED_WINDOW = "droppedDueToClosedWindow";
private final K key;
/**
* Track which windows are still active and the 'state address' windows which hold their state.
*
* <ul>
* <li>State: Global map for all active windows for this computation and key.
* <li>Lifetime: Cleared when no active windows need to be tracked. A window lives within
* the active window set until its trigger is closed or the window is garbage collected.
* </ul>
*/
private final ActiveWindowSet<W> activeWindows;
/**
* Always a {@link SystemReduceFn}.
*
* <ul>
* <li>State: A bag of accumulated values, or the intermediate result of a combiner.
* <li>State style: RENAMED
* <li>Merging: Concatenate or otherwise combine the state from each merged window.
* <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. Otherwise cleared
* when trigger is finished or when the window is garbage collected.
* </ul>
*/
private final ReduceFn<K, InputT, OutputT, W> reduceFn;
/**
* Manage the setting and firing of timer events.
*
* <ul>
* <li>Merging: End-of-window and garbage collection timers are cancelled when windows are
* merged away. Timers created by triggers are never garbage collected and are left to
* fire and be ignored.
* <li>Lifetime: Timers automatically disappear after they fire.
* </ul>
*/
private final TimerInternals timerInternals;
/**
* Manage the execution and state for triggers.
*
* <ul>
* <li>State: Tracks which sub-triggers have finished, and any additional state needed to
* determine when the trigger should fire.
* <li>State style: DIRECT
* <li>Merging: Finished bits are explicitly managed. Other state is eagerly merged as
* needed.
* <li>Lifetime: Most trigger state is cleared when the final pane is emitted. However
* the finished bits are left behind and must be cleared when the window is
* garbage collected.
* </ul>
*/
private final TriggerStateMachineRunner<W> triggerRunner;
/**
* Store the output watermark holds for each window.
*
* <ul>
* <li>State: Bag of hold timestamps.
* <li>State style: RENAMED
* <li>Merging: Depending on {@link TimestampCombiner}, may need to be recalculated on merging.
* When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
* hold.
* <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
* </ul>
*/
private final WatermarkHold<W> watermarkHold;
private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
/**
* Store the previously emitted pane (if any) for each window.
*
* <ul>
* <li>State: The previous {@link PaneInfo} passed to the user's {@code DoFn.ProcessElement}
* method, if any.
* <li>State style: DIRECT
* <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
* Cleared when window is merged away.
* <li>Lifetime: Cleared when trigger is closed or window is garbage collected.
* </ul>
*/
private final PaneInfoTracker paneInfoTracker;
/**
* Store whether we've seen any elements for a window since the last pane was emitted.
*
* <ul>
* <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements added so far.
* <li>State style: RENAMED.
* <li>Merging: Counts are summed when windows are merged.
* <li>Lifetime: Cleared when pane fires or window is garbage collected.
* </ul>
*/
private final NonEmptyPanes<K, W> nonEmptyPanes;
/**
 * Creates a runner for a single key.
 *
 * @param key the key whose elements and timers this runner handles
 * @param windowingStrategy the windowing strategy; it is viewed internally as a
 *     {@code WindowingStrategy<Object, W>}
 * @param triggerStateMachine the trigger handed to the {@link TriggerStateMachineRunner}
 * @param stateInternals state access shared by the active window set, the context factory and
 *     the trigger context factory
 * @param timerInternals timer access shared by the pane tracker, the context factory and the
 *     watermark hold
 * @param outputter receiver of the produced {@code KV<K, OutputT>} values
 * @param sideInputReader passed through to the {@link ReduceFnContextFactory}
 * @param reduceFn the reduce function to drive
 * @param options passed through to the {@link ReduceFnContextFactory}
 */
public ReduceFnRunner(
    K key,
    WindowingStrategy<?, W> windowingStrategy,
    ExecutableTriggerStateMachine triggerStateMachine,
    StateInternals stateInternals,
    TimerInternals timerInternals,
    OutputWindowedValue<KV<K, OutputT>> outputter,
    SideInputReader sideInputReader,
    ReduceFn<K, InputT, OutputT, W> reduceFn,
    PipelineOptions options) {
  // Plain hand-offs of constructor arguments first.
  this.key = key;
  this.stateInternals = stateInternals;
  this.timerInternals = timerInternals;
  this.outputter = outputter;
  this.reduceFn = reduceFn;

  this.paneInfoTracker = new PaneInfoTracker(timerInternals);
  this.droppedDueToClosedWindow =
      Metrics.counter(ReduceFnRunner.class, DROPPED_DUE_TO_CLOSED_WINDOW);

  // The input type parameter of the strategy is irrelevant here, so widen it to Object.
  @SuppressWarnings("unchecked")
  WindowingStrategy<Object, W> erasedStrategy = (WindowingStrategy<Object, W>) windowingStrategy;
  this.windowingStrategy = erasedStrategy;

  this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);

  // Note this may incur I/O to load persisted window set data.
  // Must run after windowingStrategy and stateInternals are assigned, since
  // createActiveWindowSet() reads both fields.
  this.activeWindows = createActiveWindowSet();

  this.contextFactory =
      new ReduceFnContextFactory<>(
          key,
          reduceFn,
          this.windowingStrategy,
          stateInternals,
          this.activeWindows,
          timerInternals,
          sideInputReader,
          options);

  this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);

  this.triggerRunner =
      new TriggerStateMachineRunner<>(
          triggerStateMachine,
          new TriggerStateMachineContextFactory<>(
              windowingStrategy.getWindowFn(), stateInternals, activeWindows));
}
/**
 * Builds the active window set matching the window function: a {@link
 * NonMergingActiveWindowSet} when the window function is non-merging, otherwise a {@link
 * MergingActiveWindowSet} backed by {@link #stateInternals}.
 */
private ActiveWindowSet<W> createActiveWindowSet() {
  if (windowingStrategy.getWindowFn().isNonMerging()) {
    return new NonMergingActiveWindowSet<W>();
  }
  return new MergingActiveWindowSet<W>(windowingStrategy.getWindowFn(), stateInternals);
}
/** Returns whether the trigger runner reports the trigger for {@code window} as closed. */
@VisibleForTesting
boolean isFinished(W window) {
  ReduceFn<K, InputT, OutputT, W>.Context directContext =
      contextFactory.base(window, StateStyle.DIRECT);
  return triggerRunner.isClosed(directContext.state());
}
/** Returns true when the active window set tracks neither ACTIVE nor NEW windows. */
@VisibleForTesting
boolean hasNoActiveWindows() {
  boolean tracksAnyWindow = !activeWindows.getActiveAndNewWindows().isEmpty();
  return !tracksAnyWindow;
}
/**
 * Returns the members of {@code windows} whose trigger is not closed, according to the trigger
 * runner.
 */
private Set<W> windowsThatAreOpen(Collection<W> windows) {
  Set<W> open = new HashSet<>();
  for (W candidate : windows) {
    boolean closed =
        triggerRunner.isClosed(contextFactory.base(candidate, StateStyle.DIRECT).state());
    if (closed) {
      continue;
    }
    open.add(candidate);
  }
  return open;
}
/**
 * Returns the members of {@code windows} for which the trigger runner says a pane should fire
 * now.
 */
private Collection<W> windowsThatShouldFire(Set<W> windows) throws Exception {
  Collection<W> firing = new ArrayList<>();
  for (W candidate : windows) {
    ReduceFn<K, InputT, OutputT, W>.Context context =
        contextFactory.base(candidate, StateStyle.DIRECT);
    boolean ready =
        triggerRunner.shouldFire(context.window(), context.timers(), context.state());
    // Keep only the windows whose trigger is ready.
    if (ready) {
      firing.add(candidate);
    }
  }
  return firing;
}
/**
 * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
 * triggers, and window merging.
 *
 * <p>The general strategy is:
 * <ol>
 *   <li>Use {@link WindowedValue#getWindows} (itself determined using
 *       {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some
 *       of those windows will already have state associated with them. The rest are considered
 *       NEW.
 *   <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows.
 *       Each NEW window will become either ACTIVE or be discarded.
 *       (See {@link ActiveWindowSet} for definitions of these terms.)
 *   <li>If at all possible, eagerly substitute NEW windows with their ACTIVE state address
 *       windows before any state is associated with the NEW window. In the common case that
 *       windows for new elements are merged into existing ACTIVE windows then no additional
 *       storage or merging overhead will be incurred.
 *   <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their
 *       states can be merged on-demand when a pane fires.
 *   <li>Process the element for each of the windows its windows have been merged into according
 *       to {@link ActiveWindowSet}. Processing may require running triggers, setting timers,
 *       setting holds, and invoking {@link ReduceFn#onTrigger}.
 * </ol>
 *
 * <p>Does nothing when {@code values} is empty.
 */
public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
  if (!values.iterator().hasNext()) {
    return;
  }

  // Every window mentioned by any incoming element.
  Set<W> touchedWindows = collectWindows(values);

  // Eagerly merge windows introduced by the incoming elements into existing ones.
  Map<W, W> mergedInto = mergeWindows(touchedWindows);
  if (!mergedInto.isEmpty()) {
    // Swap merged-away windows for their merge results. All removals happen before any
    // addition so that a window which is both a source and a result is not lost.
    touchedWindows.removeAll(mergedInto.keySet());
    touchedWindows.addAll(mergedInto.values());
  }

  prefetchWindowsForValues(touchedWindows);

  // Windows that are open *before* the elements are processed are the firing candidates.
  Set<W> candidates = windowsThatAreOpen(touchedWindows);

  // Feed each element through, redirected to its merge result windows.
  for (WindowedValue<InputT> element : values) {
    processElement(mergedInto, element);
  }

  // With the elements incorporated, start loading whatever is needed to evaluate the triggers.
  for (W candidate : candidates) {
    ReduceFn<K, InputT, OutputT, W>.Context direct =
        contextFactory.base(candidate, StateStyle.DIRECT);
    triggerRunner.prefetchShouldFire(candidate, direct.state());
  }

  // Narrow the candidates down to the windows whose trigger is ready.
  Collection<W> firing = windowsThatShouldFire(candidates);

  // First pass: prefetch for every firing window before emitting anything.
  for (W window : firing) {
    ReduceFn<K, InputT, OutputT, W>.Context direct =
        contextFactory.base(window, StateStyle.DIRECT);
    ReduceFn<K, InputT, OutputT, W>.Context renamed =
        contextFactory.base(window, StateStyle.RENAMED);
    prefetchEmit(direct, renamed);
  }

  // Second pass: actually produce the panes.
  for (W window : firing) {
    ReduceFn<K, InputT, OutputT, W>.Context direct =
        contextFactory.base(window, StateStyle.DIRECT);
    ReduceFn<K, InputT, OutputT, W>.Context renamed =
        contextFactory.base(window, StateStyle.RENAMED);
    emit(direct, renamed);
  }

  // We're all done with merging and emitting elements so can compress the activeWindow state.
  // Any windows which are still NEW must have come in on a new element which was then discarded
  // due to the window's trigger being closed. We can thus delete them.
  activeWindows.cleanupTemporaryWindows();
}
/** Asks the active window set to persist itself. */
public void persist() {
  this.activeWindows.persist();
}
/**
 * Gathers, without duplicates, every window that any of {@code values} is assigned to.
 *
 * @param values the elements to inspect
 * @return a fresh mutable set holding the union of the elements' windows
 */
private Set<W> collectWindows(Iterable<WindowedValue<InputT>> values) throws Exception {
  Set<W> seen = new HashSet<>();
  for (WindowedValue<?> element : values) {
    for (BoundedWindow rawWindow : element.getWindows()) {
      // WindowedValue only exposes BoundedWindow; narrow it to this runner's window type.
      @SuppressWarnings("unchecked")
      W typedWindow = (W) rawWindow;
      seen.add(typedWindow);
    }
  }
  return seen;
}
/**
 * Registers {@code windows} with the active window set, runs the merge, and reports which
 * windows were merged.
 *
 * @param windows the windows of the incoming elements
 * @return a map from each merged window to its merge result window; windows that were not
 *     merged are absent. Always empty for a non-merging window function.
 */
private Map<W, W> mergeWindows(Set<W> windows) throws Exception {
  if (windowingStrategy.getWindowFn().isNonMerging()) {
    // Nothing can merge, so report "no window was merged".
    return Collections.emptyMap();
  }

  // Make sure every incoming window is either already in the active window set or is added
  // to it as NEW.
  for (W window : windows) {
    // For backwards compat with pre 1.4 only.
    // An ACTIVE window may still carry multiple state addresses, meaning its state has not
    // yet been eagerly merged. Merge that state right away so that no other code path has
    // to deal with this legacy case.
    boolean hasUnmergedLegacyState =
        activeWindows.isActive(window) && activeWindows.readStateAddresses(window).size() > 1;
    if (hasUnmergedLegacyState) {
      ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
          contextFactory.forPremerge(window);
      reduceFn.onMerge(premergeContext);
      watermarkHold.onMerge(premergeContext);
      activeWindows.merged(window);
    }

    // Add this window as NEW if it is not currently ACTIVE.
    // A window that was seen before and whose trigger has closed is not ACTIVE any more; it
    // is added as NEW here and goes through the merging logic as usual.
    activeWindows.ensureWindowExists(window);
  }

  // Merge all of the active windows, recording source window -> result window as we go.
  Map<W, W> windowToMergeResult = new HashMap<>();
  activeWindows.merge(new OnMergeCallback(windowToMergeResult));
  return windowToMergeResult;
}
/**
* Merge callback handed to {@code activeWindows.merge(...)}. For every merge it records which
* windows were merged into which result window, and merges (or prefetches for merging) the
* reduce function state, watermark holds, non-empty pane state and trigger state.
*/
private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
// Caller-supplied map, filled in by onMerge: merged window -> merge result window.
private final Map<W, W> windowToMergeResult;
/**
* @param windowToMergeResult map that {@link #onMerge} populates with one entry per merged
* window
*/
OnMergeCallback(Map<W, W> windowToMergeResult) {
this.windowToMergeResult = windowToMergeResult;
}
/**
* Return the subset of {@code windows} which are currently ACTIVE. We only need to worry
* about merging state from ACTIVE windows. NEW windows by definition have no existing state.
*/
private List<W> activeWindows(Iterable<W> windows) {
List<W> active = new ArrayList<>();
for (W window : windows) {
// Inside this method body, 'activeWindows' resolves to the enclosing runner's field.
if (activeWindows.isActive(window)) {
active.add(window);
}
}
return active;
}
/**
* Called from the active window set to indicate {@code toBeMerged} (of which only
* {@code activeToBeMerged} are ACTIVE and thus have state associated with them) will later
* be merged into {@code mergeResult}.
*
* <p>Only issues prefetches on the trigger runner, reduce function, watermark hold and
* non-empty pane tracker.
*/
@Override
public void prefetchOnMerge(
Collection<W> toBeMerged, W mergeResult) throws Exception {
List<W> activeToBeMerged = activeWindows(toBeMerged);
// Trigger state is addressed with the DIRECT style, everything else with RENAMED.
ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
// Prefetch various state.
triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, directMergeContext.state());
reduceFn.prefetchOnMerge(renamedMergeContext.state());
watermarkHold.prefetchOnMerge(renamedMergeContext.state());
nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state());
}
/**
* Called from the active window set to indicate {@code toBeMerged} (of which only
* {@code activeToBeMerged} are ACTIVE and thus have state associated with them) are about
* to be merged into {@code mergeResult}.
*
* <p>Records every window of {@code toBeMerged} in the result map, merges reduce function,
* watermark hold, non-empty pane and trigger state, and then cancels timers and clears pane
* info for each ACTIVE window that is merged away.
*/
@Override
public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
// Remember we have merged these windows.
// All of toBeMerged is recorded, not only the ACTIVE subset.
for (W window : toBeMerged) {
windowToMergeResult.put(window, mergeResult);
}
// At this point activeWindows has NOT incorporated the results of the merge.
List<W> activeToBeMerged = activeWindows(toBeMerged);
ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
// Run the reduceFn to perform any needed merging.
reduceFn.onMerge(renamedMergeContext);
// Merge the watermark holds.
watermarkHold.onMerge(renamedMergeContext);
// Merge non-empty pane state.
nonEmptyPanes.onMerge(renamedMergeContext.state());
// Have the trigger merge state as needed.
triggerRunner.onMerge(
directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());
for (W active : activeToBeMerged) {
if (active.equals(mergeResult)) {
// Not merged away.
continue;
}
// Cleanup flavor A: Currently ACTIVE window is about to be merged away.
// Clear any state not already cleared by the onMerge calls above.
WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult);
ReduceFn<K, InputT, OutputT, W>.Context directClearContext =
contextFactory.base(active, StateStyle.DIRECT);
// No need for the end-of-window or garbage collection timers.
// We will establish a new end-of-window or garbage collection timer for the mergeResult
// window in processElement below. There must be at least one element for the mergeResult
// window since a new element with a new window must have triggered this onMerge.
cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
// We no longer care about any previous panes of merged away windows. The
// merge result window gets to start fresh if it is new.
paneInfoTracker.clear(directClearContext.state());
}
}
}
/**
 * Maps each of an element's windows to the window it has been merged into, leaving unmerged
 * windows as they are, and returns the distinct results.
 *
 * <p>The compressed representation (value, {window1, window2, ...}) stands for the distinct
 * elements (value, window1), (value, window2), ... so when window1 and window2 merge, the
 * merge result window contains both copies of the value.
 *
 * @param windowToMergeResult map from merged windows to their merge result; windows that are
 *     absent are unmerged
 * @param windows the windows of one element
 */
private ImmutableSet<W> toMergedWindows(final Map<W, W> windowToMergeResult,
    final Collection<? extends BoundedWindow> windows) {
  Function<BoundedWindow, W> redirectToMergeResult =
      new Function<BoundedWindow, W>() {
        @Override
        public W apply(BoundedWindow untypedWindow) {
          @SuppressWarnings("unchecked")
          W original = (W) untypedWindow;
          W replacement = windowToMergeResult.get(original);
          if (replacement != null) {
            return replacement;
          }
          // No entry in the map means the window was not merged.
          return original;
        }
      };
  return ImmutableSet.copyOf(FluentIterable.from(windows).transform(redirectToMergeResult));
}
/**
 * Asks the trigger runner to prefetch, for each of {@code windows}, what it needs to process
 * a value in that window.
 */
private void prefetchWindowsForValues(Collection<W> windows) {
  for (W target : windows) {
    triggerRunner.prefetchForValue(
        target, contextFactory.base(target, StateStyle.DIRECT).state());
  }
}
/**
* Process an element.
*
* <p>For each (merge result) window of the element: if the window's trigger is closed the
* element is dropped and counted in {@link #droppedDueToClosedWindow}; otherwise the window is
* marked active, the pane is recorded as non-empty, the end-of-window or garbage collection
* timer is scheduled, watermark holds are added, and the value is handed to the reduce function
* and then to the trigger.
*
* @param windowToMergeResult map of windows to merged windows. If a window is
* not present it is unmerged.
* @param value the value being processed
* @throws IllegalStateException (via {@code checkState}) if a hold was placed and the hold and
* the scheduled timer disagree on whether they lie within the window
*/
private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
throws Exception {
// Redirect the element's windows to the windows they were merged into (deduplicated).
ImmutableSet<W> windows = toMergedWindows(windowToMergeResult, value.getWindows());
// Process the element for each (mergeResultWindow, not closed) window it belongs to.
for (W window : windows) {
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
if (triggerRunner.isClosed(directContext.state())) {
// This window has already been closed.
// Count the drop, trace it, and move on to the element's next window.
droppedDueToClosedWindow.inc();
WindowTracing.debug(
"ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+ "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
continue;
}
activeWindows.ensureWindowIsActive(window);
// Built only after ensureWindowIsActive; NOTE(review): presumably because RENAMED state
// addressing depends on the window being active -- confirm against ReduceFnContextFactory.
ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
nonEmptyPanes.recordContent(renamedContext.state());
// Make sure we've scheduled the end-of-window or garbage collection timer for this window.
Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
// Hold back progress of the output watermark until we have processed the pane this
// element will be included within. If the element is too late for that, place a hold at
// the end-of-window or garbage collection time to allow empty panes to contribute elements
// which won't be dropped due to lateness by a following computation (assuming the following
// computation uses the same allowed lateness value...)
@Nullable Instant hold = watermarkHold.addHolds(renamedContext);
if (hold != null) {
// Assert that holds have a proximate timer.
// "In window" here means at or before the window's maximum timestamp.
boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
checkState(
holdInWindow == timerInWindow,
"set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
hold,
timer,
directContext.window());
}
// Execute the reduceFn, which will buffer the value as appropriate
reduceFn.processValue(renamedContext);
// Run the trigger to update its state
triggerRunner.processValue(
directContext.window(),
directContext.timestamp(),
directContext.timers(),
directContext.state());
// At this point, if triggerRunner.shouldFire before the processValue then
// triggerRunner.shouldFire after the processValue. In other words adding values
// cannot take a trigger state from firing to non-firing.
// (We don't actually assert this since it is too slow.)
}
}
/**
 * A {@link TimerData} bundled with the contexts needed to process it and with pre-computed
 * answers to the common questions asked about a timer.
 */
private class EnrichedTimerData {
  /** The timestamp of the wrapped timer. */
  public final Instant timestamp;

  /** Context for the timer's window using {@code StateStyle.DIRECT}, as supplied by the caller. */
  public final ReduceFn<K, InputT, OutputT, W>.Context directContext;

  /** Context for the timer's window using {@code StateStyle.RENAMED}, as supplied by the caller. */
  public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;

  /**
   * True for an event-time timer set exactly at the window's maximum timestamp. For such a
   * timer a garbage collection timer may still need to be set if allowed lateness is non-zero.
   */
  public final boolean isEndOfWindow;

  /**
   * True when the timer is at or after the window's garbage collection time as computed by
   * {@code LateDataUtils.garbageCollectionTime}; such a timer is a signal to trigger and then
   * garbage collect the window.
   */
  public final boolean isGarbageCollection;

  EnrichedTimerData(
      TimerData timer,
      ReduceFn<K, InputT, OutputT, W>.Context directContext,
      ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
    this.directContext = directContext;
    this.renamedContext = renamedContext;
    this.timestamp = timer.getTimestamp();

    W timerWindow = directContext.window();

    boolean inEventTime = TimeDomain.EVENT_TIME == timer.getDomain();
    this.isEndOfWindow = inEventTime && timestamp.equals(timerWindow.maxTimestamp());

    Instant gcTime = LateDataUtils.garbageCollectionTime(timerWindow, windowingStrategy);
    this.isGarbageCollection = !timestamp.isBefore(gcTime);
  }

  /**
   * Returns whether the timer's window is both ACTIVE in the active window set and has a
   * trigger that is not closed.
   *
   * <p>Both sources are consulted because neither is sufficient alone: a trigger may implement
   * isClosed as constant false, and the active window set may consider every window active.
   */
  public boolean windowIsActiveAndOpen() {
    if (!activeWindows.isActive(directContext.window())) {
      return false;
    }
    return !triggerRunner.isClosed(directContext.state());
  }
}
/**
 * Processes a batch of fired timers.
 *
 * <p>The work is done in three passes so that state reads can be requested before they are
 * needed:
 * <ol>
 *   <li>Build a direct and a renamed context for each timer and prefetch the trigger state
 *       needed to decide whether the window is closed (garbage collection timers) or whether
 *       the trigger should fire (all other timers).
 *   <li>For windows that are active and open, prefetch the state needed to produce a pane.
 *   <li>Handle each timer: a garbage collection timer emits a final pane if the window is still
 *       active and open and then clears all state for the window; any other timer emits if the
 *       trigger should fire and, when it is the end-of-window timer, schedules the garbage
 *       collection timer.
 * </ol>
 *
 * <p>Returns immediately when {@code timers} is empty.
 *
 * @param timers the timers to process; each must be in a {@code WindowNamespace}, otherwise the
 *     {@code checkArgument} precondition fails
 * @throws Exception if emitting a pane or clearing state fails
 */
public void onTimers(Iterable<TimerData> timers) throws Exception {
  if (!timers.iterator().hasNext()) {
    return;
  }

  // Create a reusable context for each timer and begin prefetching necessary
  // state.
  List<EnrichedTimerData> enrichedTimers = new LinkedList<EnrichedTimerData>();
  for (TimerData timer : timers) {
    checkArgument(timer.getNamespace() instanceof WindowNamespace,
        "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
    @SuppressWarnings("unchecked")
    WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
    W window = windowNamespace.getWindow();
    ReduceFn<K, InputT, OutputT, W>.Context directContext =
        contextFactory.base(window, StateStyle.DIRECT);
    ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
        contextFactory.base(window, StateStyle.RENAMED);
    EnrichedTimerData enrichedTimer =
        new EnrichedTimerData(timer, directContext, renamedContext);
    enrichedTimers.add(enrichedTimer);

    // Perform prefetching of state to determine if the trigger should fire.
    if (enrichedTimer.isGarbageCollection) {
      triggerRunner.prefetchIsClosed(directContext.state());
    } else {
      triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
    }
  }

  // For those windows that are active and open, prefetch the triggering or emitting state.
  for (EnrichedTimerData timer : enrichedTimers) {
    if (timer.windowIsActiveAndOpen()) {
      ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
      if (timer.isGarbageCollection) {
        prefetchOnTrigger(directContext, timer.renamedContext);
      } else if (triggerRunner.shouldFire(
          directContext.window(), directContext.timers(), directContext.state())) {
        prefetchEmit(directContext, timer.renamedContext);
      }
    }
  }

  // Perform processing now that everything is prefetched.
  for (EnrichedTimerData timer : enrichedTimers) {
    ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
    ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext;

    if (timer.isGarbageCollection) {
      WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
              + "inputWatermark:{}; outputWatermark:{}",
          key, directContext.window(), timer.timestamp,
          timerInternals.currentInputWatermarkTime(),
          timerInternals.currentOutputWatermarkTime());

      boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
      if (windowIsActiveAndOpen) {
        // We need to call onTrigger to emit the final pane if required.
        // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
        // and the watermark has passed the end of the window.
        @Nullable
        Instant newHold = onTrigger(
            directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow);
        checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
      }

      // Cleanup flavor B: Clear all the remaining state for this window since we'll never
      // see elements for it again.
      clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
    } else {
      WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
              + "inputWatermark:{}; outputWatermark:{}",
          key, directContext.window(), timer.timestamp,
          timerInternals.currentInputWatermarkTime(),
          timerInternals.currentOutputWatermarkTime());
      if (timer.windowIsActiveAndOpen()
          && triggerRunner.shouldFire(
              directContext.window(), directContext.timers(), directContext.state())) {
        emit(directContext, renamedContext);
      }

      if (timer.isEndOfWindow) {
        // If the window strategy trigger includes a watermark trigger then at this point
        // there should be no data holds, either because we'd already cleared them on an
        // earlier onTrigger, or because we just cleared them on the above emit.
        // We could assert this but it is very expensive.

        // Since we are processing an on-time firing we should schedule the garbage collection
        // timer. (If getAllowedLateness is zero then the timer event will be considered a
        // cleanup event and handled by the above).
        // Note we must do this even if the trigger is finished so that we are sure to cleanup
        // any final trigger finished bits.
        checkState(
            windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
            "Unexpected zero getAllowedLateness");
        Instant cleanupTime =
            LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy);
        WindowTracing.debug(
            "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
                + "inputWatermark:{}; outputWatermark:{}",
            key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
            timerInternals.currentOutputWatermarkTime());
        checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
            "Cleanup time %s is beyond end-of-time", cleanupTime);
        directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
      }
    }
  }
}
/**
 * Drop every piece of state still held for the window of {@code directContext}.
 * Callers must only do this once all future elements for the window are known to be
 * beyond allowed lateness.
 * This clears a superset of what {@link #emit} below clears, because:
 * <ol>
 * <li>The trigger finished bits can go too, since nobody will ask again whether the trigger
 * is closed.
 * <li>Any garbage collection hold that remains can be released.
 * </ol>
 *
 * @param directContext context addressing state by the window itself
 * @param renamedContext context addressing state by the window's state address window
 * @param windowIsActiveAndOpen whether the window was in the active window set and its trigger
 *     was not yet closed when the caller checked
 */
private void clearAllState(
    ReduceFn<K, InputT, OutputT, W>.Context directContext,
    ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
    boolean windowIsActiveAndOpen)
    throws Exception {
  W window = directContext.window();

  if (!windowIsActiveAndOpen) {
    // Either the window is not in the active set (1) or the trigger is closed (2).
    // (1) A window missing from the active set must be a merging window that emit removed
    //     explicitly. That only happens after the trigger fired and closed, so (1) is really
    //     just (2).
    // (2) A closed trigger means it fired and then closed. emit has then already cleared
    //     reduceFn, triggerRunner (apart from the finished bits), paneInfoTracker and
    //     activeWindows. nonEmptyPanes was unconditionally cleared by the trigger as well.
    //     The firing cleared the existing watermark holds, and because the trigger closed,
    //     WatermarkHold.extractAndRelease placed no new end of window or garbage collection
    //     hold.
    // So none of the clearing done in the other branch is needed here.
    //
    // One exception, for backwards compatibility: a pipeline may have been updated from an
    // sdk version <= 1.3. Then an end-of-window or garbage collection hold may exist keyed by
    // the current window (reached via directContext) instead of the state address window
    // (reached via renamedContext).
    // That is only possible when:
    // - We have merging windows.
    // - We are DISCARDING_FIRED_PANES.
    // - A pane has fired.
    // - But the trigger is not (yet) closed.
    boolean discarding =
        windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES;
    if (discarding && !windowingStrategy.getWindowFn().isNonMerging()) {
      watermarkHold.clearHolds(directContext);
    }
  } else {
    // The window is in the active window set AND the trigger was not yet closed, so state
    // may well still exist.
    reduceFn.clearState(renamedContext);
    watermarkHold.clearHolds(renamedContext);
    nonEmptyPanes.clearPane(renamedContext.state());
    // The next two calls would also work for an inactive window; they are simply not needed
    // in that case.
    triggerRunner.clearState(window, directContext.timers(), directContext.state());
    paneInfoTracker.clear(directContext.state());
  }

  // The state address windows no longer need tracking.
  activeWindows.remove(window);
  // Nothing will ever test again whether the trigger is closed.
  triggerRunner.clearFinished(directContext.state());
}
/**
 * Decides whether the reduce function state should be cleared after a firing.
 *
 * @param isFinished whether this is the last firing for the trigger
 * @return {@code true} on the last firing, or whenever the accumulation mode is
 *     {@code DISCARDING_FIRED_PANES} (nothing should be accumulated between panes);
 *     {@code false} otherwise
 */
private boolean shouldDiscardAfterFiring(boolean isFinished) {
  return isFinished
      || windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES;
}
/**
 * Issues the prefetch requests used ahead of {@link #emit}: the trigger's should-fire, on-fire
 * and is-closed state for the window, followed by everything {@link #prefetchOnTrigger}
 * requests.
 */
private void prefetchEmit(ReduceFn<K, InputT, OutputT, W>.Context directContext,
    ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
  W window = directContext.window();

  // Trigger-side state first, in the same order emit consults it.
  triggerRunner.prefetchShouldFire(window, directContext.state());
  triggerRunner.prefetchOnFire(window, directContext.state());
  triggerRunner.prefetchIsClosed(directContext.state());

  // Then the state needed to build the pane itself.
  prefetchOnTrigger(directContext, renamedContext);
}
/**
 * Fire the trigger for the window of {@code directContext}, produce its pane through
 * {@link #onTrigger}, and clean up the state that the firing makes redundant.
 *
 * <p>The trigger must already report that it should fire; this is enforced with
 * {@code checkState}.
 */
private void emit(
    ReduceFn<K, InputT, OutputT, W>.Context directContext,
    ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
    throws Exception {
  W window = directContext.window();
  checkState(triggerRunner.shouldFire(window, directContext.timers(), directContext.state()));

  // Tell the trigger it has fired, then find out whether that firing closed it.
  triggerRunner.onFire(window, directContext.timers(), directContext.state());
  boolean triggerClosed = triggerRunner.isClosed(directContext.state());

  // Work out now whether the element state may be dropped once the pane is out.
  boolean discardBufferedState = shouldDiscardAfterFiring(triggerClosed);

  // onTrigger builds the actual pane contents. It also clears all element holds as a side
  // effect, though not necessarily any end-of-window or garbage collection holds.
  onTrigger(directContext, renamedContext, triggerClosed, false /*isEndOfWindow*/);

  // Having triggered, the pane is now empty.
  nonEmptyPanes.clearPane(renamedContext.state());

  if (discardBufferedState) {
    // Cleanup flavor C: The user does not want any buffered data to persist between panes.
    reduceFn.clearState(renamedContext);
  }

  if (triggerClosed) {
    // Cleanup flavor D: With the trigger closed, all new incoming elements will be ignored.
    // Clear the state that onTrigger and clearPane above did not already clear.
    // The fact that the trigger is closed is still remembered until the window is garbage
    // collected.
    triggerRunner.clearState(window, directContext.timers(), directContext.state());
    paneInfoTracker.clear(directContext.state());
    activeWindows.remove(window);
  }
}
/**
 * Decides whether a pane has to be output.
 *
 * @param isEmpty whether the pane has no elements
 * @param isFinished whether this is known to be the final pane
 * @param timing the timing of the pane under consideration
 * @return {@code true} if the pane has elements, or it is the unique ON_TIME pane, or it is the
 *     final pane and the closing behavior is {@code FIRE_ALWAYS} (the user has requested it
 *     even when empty); {@code false} otherwise
 */
private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
  boolean hasElements = !isEmpty;
  boolean isOnTimePane = timing == Timing.ON_TIME;
  return hasElements
      || isOnTimePane
      || (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS);
}
/**
 * Issues prefetch requests for the state a later pane-producing step reads: the pane info, the
 * watermark hold to extract, the pane-emptiness flag, and whatever the reduce function asks for.
 *
 * @param directContext context passed to the pane info tracker and used for the reduce
 *     function's prefetch
 * @param renamedContext context passed to the watermark hold and used for the non-empty-pane
 *     flag
 */
private void prefetchOnTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
paneInfoTracker.prefetchPaneInfo(directContext);
watermarkHold.prefetchExtract(renamedContext);
// readLater() only requests the emptiness flag; the value is not consumed here.
nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
// NOTE(review): reduceFn.clearState elsewhere in this file is given renamedContext, whereas
// this prefetch uses directContext.state() - confirm the direct state is the intended one.
reduceFn.prefetchOnTrigger(directContext.state());
}
/**
* Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
*
* @return output watermark hold added, or {@literal null} if none.
*/
@Nullable
private Instant onTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
boolean isFinished, boolean isEndOfWindow)
throws Exception {
Instant inputWM = timerInternals.currentInputWatermarkTime();
// Calculate the pane info.
final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
// Extract the window hold, and as a side effect clear it.
final WatermarkHold.OldAndNewHolds pair =
watermarkHold.extractAndRelease(renamedContext, isFinished).read();
// TODO: This isn't accurate if the elements are late. See BEAM-2262
final Instant outputTimestamp = pair.oldHold;
@Nullable Instant newHold = pair.newHold;
final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
if (newHold != null) {
// We can't be finished yet.
checkState(
!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
// The hold cannot be behind the input watermark.
checkState(
!newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
if (newHold.isAfter(directContext.window().maxTimestamp())) {
// The hold must be for garbage collection, which can't have happened yet.
checkState(
newHold.isEqual(
LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy)),
"new hold %s should be at garbage collection for window %s plus %s",
newHold,
directContext.window(),
windowingStrategy.getAllowedLateness());
} else {
// The hold must be for the end-of-window, which can't have happened yet.
checkState(
newHold.isEqual(directContext.window().maxTimestamp()),
"new hold %s should be at end of window %s",
newHold,
directContext.window());
checkState(
!isEndOfWindow,
"new hold at %s for %s but this is the watermark trigger",
newHold,
directContext.window());
}
}
// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
// Run reduceFn.onTrigger method.