Skip to content

Commit 14cbe41

Browse files
fix: ensure that first anchored punctuation happens at a combination of startTime + interval
1 parent fdfd8a3 commit 14cbe41

File tree

2 files changed

+17
-10
lines changed

2 files changed

+17
-10
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,15 +1150,15 @@ public Cancellable schedule(final Instant startTime, final long interval, final
11501150
switch (type) {
11511151
case STREAM_TIME:
11521152
// align punctuation to 0L, punctuate as soon as we have data
1153-
return schedule(0L, interval, type, punctuator);
1153+
return schedule(0L, interval, type, punctuator, false);
11541154
case WALL_CLOCK_TIME:
11551155
// align punctuation to now, punctuate after interval has elapsed
1156-
return schedule(time.milliseconds() + interval, interval, type, punctuator);
1156+
return schedule(time.milliseconds() + interval, interval, type, punctuator, false);
11571157
default:
11581158
throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
11591159
}
11601160
}
1161-
return schedule(startTime.toEpochMilli(), interval, type, punctuator);
1161+
return schedule(startTime.toEpochMilli(), interval, type, punctuator, true);
11621162
}
11631163

11641164
/**
@@ -1169,12 +1169,12 @@ public Cancellable schedule(final Instant startTime, final long interval, final
11691169
* @param type the punctuation type
11701170
* @throws IllegalStateException if the current node is not null
11711171
*/
1172-
private Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator) {
1172+
private Cancellable schedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator, final boolean anchored) {
11731173
if (processorContext.currentNode() == null) {
11741174
throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
11751175
}
11761176

1177-
final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator);
1177+
final PunctuationSchedule schedule = getInitialSchedule(startTime, interval, type, punctuator, anchored);
11781178

11791179
switch (type) {
11801180
case STREAM_TIME:
@@ -1189,6 +1189,15 @@ private Cancellable schedule(final long startTime, final long interval, final Pu
11891189
}
11901190
}
11911191

1192+
// For anchored schedule, we want to have all punctuations to happen only on times based on combinations of startTime and interval
1193+
// This method ensures that the first anchored punctuation is not fired prematurely due to startTime < now
1194+
private PunctuationSchedule getInitialSchedule(final long startTime, final long interval, final PunctuationType type, final Punctuator punctuator, final boolean anchored) {
1195+
final PunctuationSchedule originalSchedule = new PunctuationSchedule(processorContext.currentNode(), startTime, interval, punctuator);
1196+
final long now = (type == PunctuationType.WALL_CLOCK_TIME) ? time.milliseconds() : streamTime();
1197+
1198+
return (anchored && startTime < now) ? originalSchedule.next(now) : originalSchedule;
1199+
}
1200+
11921201
/**
11931202
* Possibly trigger registered stream-time punctuation functions if
11941203
* current partition group timestamp has reached the defined stamp

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1760,10 +1760,8 @@ public void shouldPunctuateUsingAnchoredSystemStartTimeWithStartTimeBeforeNow()
17601760
task.initializeIfNeeded();
17611761
task.completeRestoration(noOpResetter -> { });
17621762

1763-
// now is after startTime -> initial punctuation
1764-
assertTrue(task.canPunctuateSystemTime());
1765-
assertTrue(task.maybePunctuateSystemTime());
1766-
1763+
// Expects the punctuations to happen at a time satisfying startTime + n * interval
1764+
// where n is a positive number or 0
17671765
time.sleep(9);
17681766
assertFalse(task.canPunctuateSystemTime());
17691767
assertFalse(task.maybePunctuateSystemTime());
@@ -1773,7 +1771,7 @@ public void shouldPunctuateUsingAnchoredSystemStartTimeWithStartTimeBeforeNow()
17731771
time.sleep(10);
17741772
assertTrue(task.canPunctuateSystemTime());
17751773
assertTrue(task.maybePunctuateSystemTime());
1776-
anchoredProcessorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, testStartTime, testStartTime + 10, testStartTime + 20);
1774+
anchoredProcessorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, testStartTime + 10, testStartTime + 20);
17771775
}
17781776

17791777
@Test

0 commit comments

Comments
 (0)