forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
WatermarkHold.java
546 lines (512 loc) · 24.6 KB
/
WatermarkHold.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* Implements the logic to hold the output watermark for a computation back
* until it has seen all the elements it needs based on the input watermark for the
* computation.
*
* <p>The backend ensures the output watermark can never progress beyond the
* input watermark for a computation. GroupAlsoByWindows computations may add a 'hold'
* to the output watermark in order to prevent it progressing beyond a time within a window.
* The hold will be 'cleared' when the associated pane is emitted.
*
* <p>This class is only intended for use by {@link ReduceFnRunner}. The two evolve together and
* will likely break any other uses.
*
* @param <W> The kind of {@link BoundedWindow} the hold is for.
*/
class WatermarkHold<W extends BoundedWindow> implements Serializable {
/**
 * Builds the system state tag under which the output watermark hold for elements is stored,
 * for the given {@link TimestampCombiner}.
 *
 * @param timestampCombiner the combiner passed to the underlying {@code "hold"} watermark state
 * @return the system tag wrapping that watermark state
 */
public static <W extends BoundedWindow>
    StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner(
        TimestampCombiner timestampCombiner) {
  StateTag<WatermarkHoldState> holdTag =
      StateTags.<W>watermarkStateInternal("hold", timestampCombiner);
  return StateTags.<WatermarkHoldState>makeSystemTagInternal(holdTag);
}
/**
 * Tag for state containing end-of-window and garbage collection output watermark holds.
 * (We can't piggy-back on the data hold state since the timestampCombiner may be
 * {@link TimestampCombiner#EARLIEST}, in which case every pane
 * would take the end-of-window time as its element time.)
 *
 * <p>NOTE(review): the reasoning above reads as if it describes a combiner that keeps the
 * latest timestamp; confirm whether {@code EARLIEST} is the intended reference here.
 */
@VisibleForTesting
public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG =
StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal(
"extra", TimestampCombiner.EARLIEST));
/** Source of the current input and output watermark times. */
private final TimerInternals timerInternals;

/** Windowing strategy consulted for the timestamp combiner, window fn and allowed lateness. */
private final WindowingStrategy<?, W> windowingStrategy;

/** State tag under which element holds are stored; derived from the strategy's combiner. */
private final StateTag<WatermarkHoldState> elementHoldTag;

/**
 * Creates a hold manager bound to the given timers and windowing strategy.
 *
 * @param timerInternals used to read the current input and output watermarks
 * @param windowingStrategy supplies the {@link TimestampCombiner} for the element hold tag
 */
public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
  TimestampCombiner combiner = windowingStrategy.getTimestampCombiner();
  this.timerInternals = timerInternals;
  this.windowingStrategy = windowingStrategy;
  this.elementHoldTag = watermarkHoldTagForTimestampCombiner(combiner);
}
/**
 * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp
 * of the element in {@code context}. We allow the actual hold time to be shifted later by the
 * {@link TimestampCombiner}, but no further than the end of the window. The hold will
 * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold
 * was placed, or {@literal null} if no hold was placed.
 *
 * <p>An element hold is attempted first; only if that is not placed do we fall back to an
 * end-of-window hold and then a garbage collection hold.
 *
 * <p>In the following we'll write {@code E} to represent an element's timestamp after passing
 * through the window strategy's output time function, {@code IWM} for the local input watermark,
 * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection
 * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right,
 * and we write {@code [ ... ]} to denote a bounded window with implied lower bound.
 *
 * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness}
 * is {@code ZERO}.
 *
 * <p>Here are the cases we need to handle. They are conceptually considered in the
 * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM.
 * <ol>
 * <li>(Normal)
 * <pre>
 *          |
 *      [   | E        ]
 *          |
 *         IWM
 * </pre>
 * This is, hopefully, the common and happy case. The element is locally on-time and can
 * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer
 * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's
 * timestamp (depending on the output time function). Thus the OWM will not proceed past E
 * until the next pane fires.
 *
 * <li>(Discard - no target window)
 * <pre>
 *                       |                            |
 *      [     E        ] |                            |
 *                       |                            |
 *                     GCWM  <-getAllowedLateness->  IWM
 * </pre>
 * The element is very locally late. The window has been garbage collected, thus there
 * is no target pane E could be assigned to. We discard E.
 *
 * <li>(Unobservably late)
 * <pre>
 *          |    |
 *      [   | E  |     ]
 *          |    |
 *         OWM  IWM
 * </pre>
 * The element is locally late, however we can still treat this case as for 'Normal' above
 * since the IWM has not yet passed the end of the window and the element is ahead of the
 * OWM. In effect, we get to 'launder' the locally late element and consider it as locally
 * on-time because no downstream computation can observe the difference.
 *
 * <li>(Maybe late 1)
 * <pre>
 *          |            |
 *      [   | E        ] |
 *          |            |
 *         OWM          IWM
 * </pre>
 * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME}
 * pane may have already been emitted. However, if timer firings have been delayed then it
 * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element
 * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find
 * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's
 * timestamp. We may however set a garbage collection hold if required.
 *
 * <li>(Maybe late 2)
 * <pre>
 *               |   |
 *      [     E  |   | ]
 *               |   |
 *              OWM IWM
 * </pre>
 * The end-of-window timer has not yet fired, so this element may still appear in an
 * {@code ON_TIME} pane. However the element is too late to contribute to the output
 * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an
 * end-of-window hold.
 *
 * <li>(Maybe late 3)
 * <pre>
 *               |       |
 *      [     E  |     ] |
 *               |       |
 *              OWM     IWM
 * </pre>
 * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer
 * has already fired, or it is about to fire. We can place only the garbage collection hold,
 * if required.
 *
 * <li>(Definitely late)
 * <pre>
 *                       |   |
 *      [     E        ] |   |
 *                       |   |
 *                      OWM IWM
 * </pre>
 * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to
 * place an end-of-window hold. We can still place a garbage collection hold if required.
 *
 * </ol>
 */
@Nullable
public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
  Instant elementHold = addElementHold(context);
  if (elementHold != null) {
    return elementHold;
  }
  // No element hold could be placed: fall back to the coarser holds. The pane is known to be
  // non-empty because we are processing an element for it.
  return addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
}
/**
 * Return {@code timestamp}, possibly moved later in time by the window function's output time
 * and the windowing strategy's {@link TimestampCombiner}.
 *
 * <p>The result is validated with {@code checkState}: it must not be earlier than
 * {@code timestamp}, and a timestamp which was not already after the end of {@code window}
 * must not be moved beyond the end of {@code window}.
 */
private Instant shift(Instant timestamp, W window) {
  TimestampCombiner combiner = windowingStrategy.getTimestampCombiner();
  Instant outputTime = windowingStrategy.getWindowFn().getOutputTime(timestamp, window);
  Instant shifted = combiner.assign(window, outputTime);

  boolean movedEarlier = shifted.isBefore(timestamp);
  checkState(!movedEarlier,
      "TimestampCombiner moved element from %s to earlier time %s for window %s",
      BoundedWindow.formatTimestamp(timestamp),
      BoundedWindow.formatTimestamp(shifted),
      window);

  // A timestamp that already lies after the end of the window is exempt from this check.
  boolean movedPastWindowEnd =
      !timestamp.isAfter(window.maxTimestamp()) && shifted.isAfter(window.maxTimestamp());
  checkState(!movedPastWindowEnd,
      "TimestampCombiner moved element from %s to %s which is beyond end of "
          + "window %s",
      timestamp, shifted, window);
  return shifted;
}
/**
 * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was
 * added (ie the element timestamp plus any forward shift requested by the
 * {@link WindowingStrategy#getTimestampCombiner}), or {@literal null} if no hold was added.
 * The hold is only added if both:
 * <ol>
 * <li>The backend will be able to respect it. In other words the output watermark cannot
 * be ahead of the proposed hold time.
 * <li>A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end of the
 * window. In other words the input watermark cannot be ahead of the end of the window.
 * </ol>
 * The hold ensures the pane which incorporates the element will not be considered late by
 * any downstream computation when it is eventually emitted.
 */
@Nullable
private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context) {
  // Give the window function a chance to move the hold timestamp forward to encourage progress.
  // (A later hold implies less impediment to the output watermark making progress, which in
  // turn encourages end-of-window triggers to fire earlier in following computations.)
  Instant elementHold = shift(context.timestamp(), context.window());

  Instant outputWM = timerInternals.currentOutputWatermarkTime();
  Instant inputWM = timerInternals.currentInputWatermarkTime();

  // Stays null unless the hold is actually written to state below.
  Instant placedHold = null;
  String verdict;
  // TODO: These case labels could be tightened.
  // See the case analysis in addHolds above for the motivation.
  if (outputWM != null && elementHold.isBefore(outputWM)) {
    verdict = "too late to effect output watermark";
  } else if (context.window().maxTimestamp().isBefore(inputWM)) {
    verdict = "too late for end-of-window timer";
  } else {
    checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
        "Element hold %s is beyond end-of-time", elementHold);
    context.state().access(elementHoldTag).add(elementHold);
    verdict = "on time";
    placedHold = elementHold;
  }

  // The outcome is traced whether or not a hold was placed.
  WindowTracing.trace(
      "WatermarkHold.addHolds: element hold at {} is {} for "
          + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
      elementHold, verdict, context.key(), context.window(), inputWM,
      outputWM);
  return placedHold;
}
/**
 * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required).
 * Return the {@link Instant} at which the hold was added, or {@literal null} if no hold was
 * added.
 */
@Nullable
private Instant addEndOfWindowOrGarbageCollectionHolds(
    ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
  Instant endOfWindowHold = addEndOfWindowHold(context, paneIsEmpty);
  if (endOfWindowHold != null) {
    return endOfWindowHold;
  }
  // Too late for an end-of-window hold; a garbage collection hold is the last resort.
  return addGarbageCollectionHold(context, paneIsEmpty);
}
/**
 * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added
 * (ie the end of window time), or {@literal null} if no end of window hold is possible and we
 * should fallback to a garbage collection hold.
 *
 * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner})
 * to clear it. In other words, the input watermark cannot be ahead of the end of window time.
 *
 * <p>An end-of-window hold is added in two situations:
 * <ol>
 * <li>An incoming element came in behind the output watermark (so we are too late for placing
 * the usual element hold), but it may still be possible to include the element in an
 * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will
 * not be considered late by any downstream computation.
 * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at
 * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in
 * a pane are processed due to a fired trigger we must set both an end of window timer and an end
 * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered
 * late by any downstream computation.
 * </ol>
 *
 * <p>{@code paneIsEmpty} is accepted for symmetry with the garbage collection hold but is not
 * consulted by this method.
 */
@Nullable
private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
  Instant outputWatermark = timerInternals.currentOutputWatermarkTime();
  Instant inputWatermark = timerInternals.currentInputWatermarkTime();
  Instant windowEnd = context.window().maxTimestamp();

  boolean inputWatermarkPastWindowEnd = windowEnd.isBefore(inputWatermark);
  if (inputWatermarkPastWindowEnd) {
    WindowTracing.trace(
        "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for "
            + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
        windowEnd, context.key(), context.window(), inputWatermark, outputWatermark);
    return null;
  }

  checkState(outputWatermark == null || !windowEnd.isBefore(outputWatermark),
      "End-of-window hold %s cannot be before output watermark %s",
      windowEnd, outputWatermark);
  checkState(!windowEnd.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
      "End-of-window hold %s is beyond end-of-time", windowEnd);

  // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep
  // the hold away from the combining function in elementHoldTag.
  // However if !paneIsEmpty then it could make sense to use the elementHoldTag here.
  // Alas, onMerge is forced to add an end of window or garbage collection hold without
  // knowing whether an element hold is already in place (stopping to check is too expensive).
  // Thus it would end up adding an element hold at the end of the window which could
  // upset the elementHoldTag combining function.
  context.state().access(EXTRA_HOLD_TAG).add(windowEnd);
  WindowTracing.trace(
      "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for "
          + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
      windowEnd, context.key(), context.window(), inputWatermark, outputWatermark);
  return windowEnd;
}
/**
 * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at
 * which the hold was added (ie the end of window time plus allowed lateness),
 * or {@literal null} if no hold was added.
 *
 * <p>We only add the hold if it is distinct from what would be added by
 * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness}
 * must be non-zero.
 *
 * <p>A garbage collection hold is added in two situations:
 * <ol>
 * <li>An incoming element came in behind the output watermark, and was too late for placing
 * the usual element hold or an end of window hold. Place the garbage collection hold so that
 * we can guarantee when the pane is finally triggered its output will not be dropped due to
 * excessive lateness by any downstream computation.
 * <li>The {@link WindowingStrategy#getClosingBehavior()} is
 * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted
 * for all windows which saw at least one element. Again, the garbage collection hold guarantees
 * that any empty final pane can be given a timestamp which will not be considered beyond
 * allowed lateness by any downstream computation.
 * </ol>
 *
 * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2.
 */
@Nullable
private Instant addGarbageCollectionHold(
    ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
  Instant outputWatermark = timerInternals.currentOutputWatermarkTime();
  Instant inputWatermark = timerInternals.currentInputWatermarkTime();
  Instant gcTime = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy);

  boolean hasAllowedLateness =
      windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO);
  if (!hasAllowedLateness) {
    WindowTracing.trace(
        "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
            + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; "
            + "outputWatermark:{}",
        gcTime, context.key(), context.window(), inputWatermark, outputWatermark);
    return null;
  }

  boolean emptyPaneNeedsNoFinalFiring =
      paneIsEmpty
          && context.windowingStrategy().getClosingBehavior()
              == ClosingBehavior.FIRE_IF_NON_EMPTY;
  if (emptyPaneNeedsNoFinalFiring) {
    WindowTracing.trace(
        "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary "
            + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; "
            + "outputWatermark:{}",
        gcTime, context.key(), context.window(), inputWatermark, outputWatermark);
    return null;
  }

  // If the garbage collection hold is past the timestamp we can represent, instead truncate
  // to the maximum timestamp that is not positive infinity. This ensures all windows will
  // eventually be garbage collected.
  Instant gcHold =
      gcTime.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)
          ? gcTime
          : BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L));

  checkState(!gcHold.isBefore(inputWatermark),
      "Garbage collection hold %s cannot be before input watermark %s",
      gcHold, inputWatermark);
  checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
      "Garbage collection hold %s is beyond end-of-time", gcHold);

  // As with the end-of-window hold, this goes into EXTRA_HOLD_TAG rather than elementHoldTag
  // so that it stays away from the elementHoldTag combining function.
  context.state().access(EXTRA_HOLD_TAG).add(gcHold);
  WindowTracing.trace(
      "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for "
          + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
      gcHold, context.key(), context.window(), inputWatermark, outputWatermark);
  return gcHold;
}
/**
 * Prefetch watermark holds in preparation for merging.
 *
 * <p>Only the element hold state (under {@code elementHoldTag}) is prefetched here; the
 * prefetch itself is delegated to {@code StateMerging.prefetchWatermarks}.
 *
 * @param state accessor over the state of the windows about to be merged
 */
public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
StateMerging.prefetchWatermarks(state, elementHoldTag);
}
/**
 * Updates the watermark hold when windows merge if it is possible the merged value does
 * not equal all of the existing holds. For example, if the new window implies a later
 * watermark hold, then earlier holds may be released.
 *
 * <p>The element holds are merged into the result window, the extra (end-of-window / garbage
 * collection) holds are cleared, and a fresh end-of-window or garbage collection hold is then
 * added for the merged window.
 */
public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
  Instant inputWatermark = timerInternals.currentInputWatermarkTime();
  Instant outputWatermark = timerInternals.currentOutputWatermarkTime();
  WindowTracing.debug(
      "WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; "
          + "outputWatermark:{}",
      context.key(), context.window(), inputWatermark, outputWatermark);

  StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window());

  // If we had a cheap way to determine if we have an element hold then we could
  // avoid adding an unnecessary end-of-window or garbage collection hold.
  // Simply reading the above merged watermark would impose an additional read for the
  // common case that the active window has just one underlying state address window and
  // the hold depends on the min of the element timestamps.
  StateMerging.clear(context.state(), EXTRA_HOLD_TAG);

  // At least one merged window must be non-empty for the merge to have been triggered,
  // hence paneIsEmpty is false.
  addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/);
}
/**
 * Result of {@link #extractAndRelease}: the hold that was in effect before the holds were
 * cleared, paired with the hold (if any) that was put in place afterwards.
 */
public static class OldAndNewHolds {
// The earliest hold read before clearing; extractAndRelease substitutes the end of the
// window when there was no hold or the hold was after the end of the window.
public final Instant oldHold;
// The end-of-window or garbage collection hold added after clearing, or null if none was
// added.
@Nullable
public final Instant newHold;
/**
 * @param oldHold the hold that was extracted
 * @param newHold the replacement hold that was added, or {@code null} if none
 */
public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
this.oldHold = oldHold;
this.newHold = newHold;
}
}
/**
 * Request that both hold states for {@code context} (the element hold and the extra
 * end-of-window / garbage collection hold) be read later, by calling {@code readLater()} on
 * each. These are the same two states that {@link #extractAndRelease} reads.
 */
public void prefetchExtract(final ReduceFn<?, ?, ?, W>.Context context) {
context.state().access(elementHoldTag).readLater();
context.state().access(EXTRA_HOLD_TAG).readLater();
}
/**
 * Return (a future for) the earliest hold for {@code context}. Clear all the holds after
 * reading, but add/restore an end-of-window or garbage collection hold if required.
 *
 * <p>The returned timestamp is the output timestamp according to the {@link TimestampCombiner}
 * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late
 * elements in the current pane. If there is no such value the timestamp is the end
 * of the window.
 *
 * <p>Nothing is read or cleared until {@code read()} is invoked on the returned state.
 *
 * @param isFinished if true, no replacement hold is added after clearing
 */
public ReadableState<OldAndNewHolds> extractAndRelease(
    final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
  Instant inputWatermark = timerInternals.currentInputWatermarkTime();
  Instant outputWatermark = timerInternals.currentOutputWatermarkTime();
  WindowTracing.debug(
      "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; "
          + "outputWatermark:{}",
      context.key(), context.window(), inputWatermark, outputWatermark);

  final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag);
  final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);

  return new ReadableState<OldAndNewHolds>() {
    @Override
    public OldAndNewHolds read() {
      // Read both the element and extra holds.
      Instant elementHold = elementHoldState.read();
      Instant extraHold = extraHoldState.read();

      // Take the earlier of the two, treating null as "no hold". The element hold wins
      // only when it exists and is strictly before the extra hold (or there is no extra hold).
      Instant oldHold = elementHold;
      if (elementHold == null || (extraHold != null && !elementHold.isBefore(extraHold))) {
        oldHold = extraHold;
      }

      Instant windowEnd = context.window().maxTimestamp();
      if (oldHold == null || oldHold.isAfter(windowEnd)) {
        // If no hold (eg because all elements came in behind the output watermark), or
        // the hold was for garbage collection, take the end of window as the result.
        WindowTracing.debug(
            "WatermarkHold.extractAndRelease.read: clipping from {} to end of window "
                + "for key:{}; window:{}",
            oldHold, context.key(), context.window());
        oldHold = windowEnd;
      }

      WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}",
          context.key(), context.window());

      // Clear the underlying state to allow the output watermark to progress.
      elementHoldState.clear();
      extraHoldState.clear();

      // Only need to leave behind an end-of-window or garbage collection hold
      // if future elements will be processed.
      @Nullable Instant newHold =
          isFinished
              ? null
              : addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/);
      return new OldAndNewHolds(oldHold, newHold);
    }

    @Override
    public ReadableState<OldAndNewHolds> readLater() {
      elementHoldState.readLater();
      extraHoldState.readLater();
      return this;
    }
  };
}
/**
 * Clear any remaining holds: both the element hold and the extra (end-of-window / garbage
 * collection) hold for {@code context}. No replacement hold is added.
 */
public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
  Instant inputWatermark = timerInternals.currentInputWatermarkTime();
  Instant outputWatermark = timerInternals.currentOutputWatermarkTime();
  WindowTracing.debug(
      "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
      context.key(), context.window(), inputWatermark, outputWatermark);

  WatermarkHoldState elementHolds = context.state().access(elementHoldTag);
  elementHolds.clear();
  WatermarkHoldState extraHolds = context.state().access(EXTRA_HOLD_TAG);
  extraHolds.clear();
}
/**
 * Return the current data hold, or null if none. Does not clear. For debugging only.
 *
 * <p>Only the element hold state (under {@code elementHoldTag}) is read; the extra
 * end-of-window / garbage collection hold is not consulted.
 */
@Nullable
public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
return context.state().access(elementHoldTag).read();
}
}