diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java index 325491699209e..1e51bca6e0780 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java @@ -20,6 +20,7 @@ package org.apache.flink.api.connector.sink; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.eventtime.Watermark; import java.io.IOException; import java.util.List; @@ -46,6 +47,23 @@ public interface SinkWriter extends AutoCloseable { */ void write(InputT element, Context context) throws IOException; + /** + * Add a watermark to the writer. + * + * @param watermark The watermark. + * @throws IOException if fail to add a watermark. + */ + default void writeWatermark(Watermark watermark) throws IOException {} + + /** + * Marks the writer as idle. This function is called when all sink inputs are idle. + * + *

A writer becomes active again as soon as a record or watermark is received. + * + * @throws Exception if fail to mark idle. + */ + default void markIdle() throws Exception {} + /** * Prepare for a commit. * diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java index 0f78c0f4c83b0..b51570805ff6d 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java @@ -89,6 +89,11 @@ public void processWatermark(Watermark mark) throws Exception { operator.processWatermark(mark); } + @Override + public void processStreamStatus(StreamStatus status) throws Exception { + operator.processStreamStatus(status); + } + @Override public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception { operator.processLatencyMarker(latencyMarker); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java index 11786540aad82..6c22c4178e967 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java @@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; + import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -145,5 +147,10 @@ public Object getCurrentKey() { ACTUAL_ORDER_TRACKING.add("getCurrentKey"); return null; } + + @Override + public void processStreamStatus(StreamStatus status) throws Exception { + ACTUAL_ORDER_TRACKING.add("processStreamStatus"); + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java index 1a8dced629725..e78812af9b34e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.functions.Function; import java.io.Serializable; @@ -49,6 +50,25 @@ default void invoke(IN value, Context context) throws Exception { invoke(value); } + /** + * Writes the given watermark to the sink. This function is called for every watermark. + * + * @param watermark The watermark. + * @throws Exception This method may throw exceptions. Throwing an exception will cause the + * operation to fail and may trigger recovery. + */ + default void writeWatermark(Watermark watermark) throws Exception {} + + /** + * Marks the sink as idle. This function is called when all sink inputs are idle. + * + *

A sink becomes active again as soon as a record or watermark is received. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the + * operation to fail and may trigger recovery. + */ + default void markIdle() throws Exception {} + /** * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about an * input record. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index e4b491b31d2e6..c8d0d1b8424b6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -639,7 +639,7 @@ public void processWatermark2(Watermark mark) throws Exception { } public final void emitStreamStatus(StreamStatus streamStatus) throws Exception { - output.emitStreamStatus(streamStatus); + processStreamStatus(streamStatus); } private void emitStreamStatus(StreamStatus streamStatus, int index) throws Exception { @@ -648,7 +648,7 @@ private void emitStreamStatus(StreamStatus streamStatus, int index) throws Excep processWatermark(new Watermark(combinedWatermark.getCombinedWatermark())); } if (wasIdle != combinedWatermark.isIdle()) { - output.emitStreamStatus(streamStatus); + processStreamStatus(streamStatus); } } @@ -660,6 +660,11 @@ public final void emitStreamStatus2(StreamStatus streamStatus) throws Exception emitStreamStatus(streamStatus, 1); } + @Override + public void processStreamStatus(StreamStatus status) throws Exception { + output.emitStreamStatus(status); + } + @Override public OperatorID getOperatorID() { return config.getOperatorID(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 7862a527fa3ae..5c9678722591c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -536,10 +536,15 @@ public final void emitStreamStatus(StreamStatus streamStatus, int inputId) throw processWatermark(new Watermark(combinedWatermark.getCombinedWatermark())); } if (wasIdle != combinedWatermark.isIdle()) { - output.emitStreamStatus(streamStatus); + processStreamStatus(streamStatus); } } + @Override + public void processStreamStatus(StreamStatus status) throws Exception { + output.emitStreamStatus(status); + } + @Override public OperatorID getOperatorID() { return config.getOperatorID(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index c4a90be549479..666273178e89c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.util.Disposable; import java.io.Serializable; @@ -133,6 +134,20 @@ OperatorSnapshotFutures snapshotState( /** Provides a context to initialize all state in the operator. */ void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception; + + // ------------------------------------------------------------------------ + // Watermark handling + // ------------------------------------------------------------------------ + + /** + * This method is called when a stream status change on the operator input(s) causes + * an overall status change to the operator, e.g. when all inputs transition to idle. + * + * @throws Exception Throwing an exception here causes the operator to fail and go into + * recovery. + */ + void processStreamStatus(StreamStatus status) throws Exception; + // ------------------------------------------------------------------------ // miscellaneous // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index c714a46856d1b..19cc146e92ea9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; /** A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}. */ @@ -66,6 +67,16 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) { public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); this.currentWatermark = mark.getTimestamp(); + userFunction.writeWatermark( + new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp())); + } + + @Override + public void processStreamStatus(StreamStatus status) throws Exception { + super.processStreamStatus(status); + if (status.isIdle()) { + userFunction.markIdle(); + } } private class SimpleContext implements SinkFunction.Context { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator.java index 065c564492460..915791aa0a659 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractSinkWriterOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import java.util.List; @@ -90,6 +91,16 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { public void processWatermark(Watermark mark) throws Exception { super.processWatermark(mark); this.currentWatermark = mark.getTimestamp(); + sinkWriter.writeWatermark( + new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp())); + } + + @Override + public void processStreamStatus(StreamStatus status) throws Exception { + super.processStreamStatus(status); + if (status.isIdle()) { + sinkWriter.markIdle(); + } } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java index a9b279f8ab95a..497d9d3fd28b9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java @@ -79,6 +79,15 @@ public void testTimeQuerying() throws Exception { new Tuple4<>(42L, 15L, 13L, "Ciao"), new Tuple4<>(42L, 15L, null, "Ciao"))); + assertThat(bufferingSink.watermarks.size(), is(3)); + + assertThat( + bufferingSink.watermarks, + contains( + new org.apache.flink.api.common.eventtime.Watermark(17L), + new org.apache.flink.api.common.eventtime.Watermark(42L), + new org.apache.flink.api.common.eventtime.Watermark(42L))); + testHarness.close(); } @@ -87,8 +96,11 @@ private static class BufferingQueryingSink implements SinkFunction { // watermark, processing-time, timestamp, event private final List> data; + private final List watermarks; + public BufferingQueryingSink() { data = new ArrayList<>(); + watermarks = new ArrayList<>(); } @Override @@ -110,5 +122,11 @@ public void invoke(T value, Context context) throws Exception { value)); } } + + @Override + public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) + throws Exception { + watermarks.add(watermark); + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java index d2ac008a9539e..2b3b74308405a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java @@ -180,6 +180,29 @@ public void timeBasedBufferingSinkWriter() throws Exception { Tuple3.of(2, initialTime + 2, Long.MIN_VALUE).toString()))); } + @Test + public void watermarkPropagatedToSinkWriter() throws Exception { + final long initialTime = 0; + + final TestSink.DefaultSinkWriter writer = new TestSink.DefaultSinkWriter(); + final OneInputStreamOperatorTestHarness testHarness = + createTestHarness( + TestSink.newBuilder().setWriter(writer).withWriterState().build()); + testHarness.open(); + + testHarness.processWatermark(initialTime); + testHarness.processWatermark(initialTime + 1); + + assertThat( + testHarness.getOutput(), + contains(new Watermark(initialTime), new Watermark(initialTime + 1))); + assertThat( + writer.watermarks, + contains( + new org.apache.flink.api.common.eventtime.Watermark(initialTime), + new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1))); + } + /** * A {@link SinkWriter} that only returns committables from {@link #prepareCommit(boolean)} when * {@code flush} is {@code true}. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java index 32296cb69f586..cb27a192ddd88 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.operators.sink; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.connector.sink.Committer; import org.apache.flink.api.connector.sink.GlobalCommitter; import org.apache.flink.api.connector.sink.Sink; @@ -199,10 +200,13 @@ static class DefaultSinkWriter implements SinkWriter, S protected List elements; + protected List watermarks; + protected ProcessingTimeService processingTimerService; DefaultSinkWriter() { this.elements = new ArrayList<>(); + this.watermarks = new ArrayList<>(); } @Override @@ -211,6 +215,11 @@ public void write(Integer element, Context context) { Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString()); } + @Override + public void writeWatermark(Watermark watermark) throws IOException { + watermarks.add(watermark); + } + @Override public List prepareCommit(boolean flush) { List result = elements; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index 92c63f6e279b1..e2a23a5eb45b2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -606,6 +606,9 @@ public void processElement(StreamRecord element) throws Exception {} @Override public void processWatermark(Watermark mark) throws Exception {} + @Override + public void processStreamStatus(StreamStatus status) throws Exception {} + @Override public void processLatencyMarker(LatencyMarker latencyMarker) {} diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java index cc4825d7e9df1..c574464934e61 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/FileReadingWatermarkITCase.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -104,9 +105,12 @@ public void close() throws Exception { } @Override - public void invoke(String value, SinkFunction.Context context) { - if (context.currentWatermark() != lastWatermark) { - lastWatermark = context.currentWatermark(); + public void invoke(String value, SinkFunction.Context context) {} + + @Override + public void writeWatermark(Watermark watermark) throws Exception { + if (watermark.getTimestamp() != lastWatermark) { + lastWatermark = watermark.getTimestamp(); numWatermarks.add(1); } }