Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance sinks to support idleness. #2

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
package org.apache.flink.api.connector.sink;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.eventtime.Watermark;

import java.io.IOException;
import java.util.List;
Expand All @@ -46,6 +47,23 @@ public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {
*/
void write(InputT element, Context context) throws IOException;

/**
* Add a watermark to the writer.
*
* @param watermark The watermark.
* @throws IOException if fail to add a watermark.
*/
default void writeWatermark(Watermark watermark) throws IOException {}

/**
* Marks the writer as idle. This function is called when all sink inputs are idle.
*
* <p>A writer becomes active again as soon as a record or watermark is received.
*
* @throws Exception if fail to mark idle.
*/
default void markIdle() throws Exception {}

/**
* Prepare for a commit.
*
Expand Down
Expand Up @@ -89,6 +89,11 @@ public void processWatermark(Watermark mark) throws Exception {
operator.processWatermark(mark);
}

@Override
public void processStreamStatus(StreamStatus status) throws Exception {
operator.processStreamStatus(status);
}

@Override
public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
operator.processLatencyMarker(latencyMarker);
Expand Down
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -145,5 +147,10 @@ public Object getCurrentKey() {
ACTUAL_ORDER_TRACKING.add("getCurrentKey");
return null;
}

@Override
public void processStreamStatus(StreamStatus status) throws Exception {
ACTUAL_ORDER_TRACKING.add("processStreamStatus");
}
}
}
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;
Expand Down Expand Up @@ -49,6 +50,25 @@ default void invoke(IN value, Context context) throws Exception {
invoke(value);
}

/**
* Writes the given watermark to the sink. This function is called for every watermark.
*
* @param watermark The watermark.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
default void writeWatermark(Watermark watermark) throws Exception {}

/**
* Marks the sink as idle. This function is called when all sink inputs are idle.
*
* <p>A sink becomes active again as soon as a record or watermark is received.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
default void markIdle() throws Exception {}

/**
* Context that {@link SinkFunction SinkFunctions } can use for getting additional data about an
* input record.
Expand Down
Expand Up @@ -639,7 +639,7 @@ public void processWatermark2(Watermark mark) throws Exception {
}

public final void emitStreamStatus(StreamStatus streamStatus) throws Exception {
output.emitStreamStatus(streamStatus);
processStreamStatus(streamStatus);
}

private void emitStreamStatus(StreamStatus streamStatus, int index) throws Exception {
Expand All @@ -648,7 +648,7 @@ private void emitStreamStatus(StreamStatus streamStatus, int index) throws Excep
processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
}
if (wasIdle != combinedWatermark.isIdle()) {
output.emitStreamStatus(streamStatus);
processStreamStatus(streamStatus);
}
}

Expand All @@ -660,6 +660,11 @@ public final void emitStreamStatus2(StreamStatus streamStatus) throws Exception
emitStreamStatus(streamStatus, 1);
}

@Override
public void processStreamStatus(StreamStatus status) throws Exception {
output.emitStreamStatus(status);
}

@Override
public OperatorID getOperatorID() {
return config.getOperatorID();
Expand Down
Expand Up @@ -536,10 +536,15 @@ public final void emitStreamStatus(StreamStatus streamStatus, int inputId) throw
processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
}
if (wasIdle != combinedWatermark.isIdle()) {
output.emitStreamStatus(streamStatus);
processStreamStatus(streamStatus);
}
}

@Override
public void processStreamStatus(StreamStatus status) throws Exception {
output.emitStreamStatus(status);
}

@Override
public OperatorID getOperatorID() {
return config.getOperatorID();
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.util.Disposable;

import java.io.Serializable;
Expand Down Expand Up @@ -133,6 +134,20 @@ OperatorSnapshotFutures snapshotState(
/** Provides a context to initialize all state in the operator. */
void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception;


// ------------------------------------------------------------------------
// Watermark handling
// ------------------------------------------------------------------------

/**
* This method is called when a stream status change on the operator input(s) causes
* an overall status change to the operator, e.g. when all inputs transition to idle.
*
* @throws Exception Throwing an exception here causes the operator to fail and go into
* recovery.
*/
void processStreamStatus(StreamStatus status) throws Exception;

// ------------------------------------------------------------------------
// miscellaneous
// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

/** A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}. */
Expand Down Expand Up @@ -66,6 +67,16 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
this.currentWatermark = mark.getTimestamp();
userFunction.writeWatermark(
new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
}

@Override
public void processStreamStatus(StreamStatus status) throws Exception {
super.processStreamStatus(status);
if (status.isIdle()) {
userFunction.markIdle();
}
}

private class SimpleContext<IN> implements SinkFunction.Context {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

import java.util.List;
Expand Down Expand Up @@ -90,6 +91,16 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
this.currentWatermark = mark.getTimestamp();
sinkWriter.writeWatermark(
new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
}

@Override
public void processStreamStatus(StreamStatus status) throws Exception {
super.processStreamStatus(status);
if (status.isIdle()) {
sinkWriter.markIdle();
}
}

@Override
Expand Down
Expand Up @@ -79,6 +79,15 @@ public void testTimeQuerying() throws Exception {
new Tuple4<>(42L, 15L, 13L, "Ciao"),
new Tuple4<>(42L, 15L, null, "Ciao")));

assertThat(bufferingSink.watermarks.size(), is(3));

assertThat(
bufferingSink.watermarks,
contains(
new org.apache.flink.api.common.eventtime.Watermark(17L),
new org.apache.flink.api.common.eventtime.Watermark(42L),
new org.apache.flink.api.common.eventtime.Watermark(42L)));

testHarness.close();
}

Expand All @@ -87,8 +96,11 @@ private static class BufferingQueryingSink<T> implements SinkFunction<T> {
// watermark, processing-time, timestamp, event
private final List<Tuple4<Long, Long, Long, T>> data;

private final List<org.apache.flink.api.common.eventtime.Watermark> watermarks;

public BufferingQueryingSink() {
data = new ArrayList<>();
watermarks = new ArrayList<>();
}

@Override
Expand All @@ -110,5 +122,11 @@ public void invoke(T value, Context context) throws Exception {
value));
}
}

@Override
public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark watermark)
throws Exception {
watermarks.add(watermark);
}
}
}
Expand Up @@ -180,6 +180,29 @@ public void timeBasedBufferingSinkWriter() throws Exception {
Tuple3.of(2, initialTime + 2, Long.MIN_VALUE).toString())));
}

@Test
public void watermarkPropagatedToSinkWriter() throws Exception {
final long initialTime = 0;

final TestSink.DefaultSinkWriter writer = new TestSink.DefaultSinkWriter();
final OneInputStreamOperatorTestHarness<Integer, String> testHarness =
createTestHarness(
TestSink.newBuilder().setWriter(writer).withWriterState().build());
testHarness.open();

testHarness.processWatermark(initialTime);
testHarness.processWatermark(initialTime + 1);

assertThat(
testHarness.getOutput(),
contains(new Watermark(initialTime), new Watermark(initialTime + 1)));
assertThat(
writer.watermarks,
contains(
new org.apache.flink.api.common.eventtime.Watermark(initialTime),
new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1)));
}

/**
* A {@link SinkWriter} that only returns committables from {@link #prepareCommit(boolean)} when
* {@code flush} is {@code true}.
Expand Down
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
Expand Down Expand Up @@ -199,10 +200,13 @@ static class DefaultSinkWriter implements SinkWriter<Integer, String, String>, S

protected List<String> elements;

protected List<Watermark> watermarks;

protected ProcessingTimeService processingTimerService;

DefaultSinkWriter() {
this.elements = new ArrayList<>();
this.watermarks = new ArrayList<>();
}

@Override
Expand All @@ -211,6 +215,11 @@ public void write(Integer element, Context context) {
Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString());
}

@Override
public void writeWatermark(Watermark watermark) throws IOException {
watermarks.add(watermark);
}

@Override
public List<String> prepareCommit(boolean flush) {
List<String> result = elements;
Expand Down
Expand Up @@ -606,6 +606,9 @@ public void processElement(StreamRecord<String> element) throws Exception {}
@Override
public void processWatermark(Watermark mark) throws Exception {}

@Override
public void processStreamStatus(StreamStatus status) throws Exception {}

@Override
public void processLatencyMarker(LatencyMarker latencyMarker) {}

Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
Expand Down Expand Up @@ -104,9 +105,12 @@ public void close() throws Exception {
}

@Override
public void invoke(String value, SinkFunction.Context context) {
if (context.currentWatermark() != lastWatermark) {
lastWatermark = context.currentWatermark();
public void invoke(String value, SinkFunction.Context context) {}

@Override
public void writeWatermark(Watermark watermark) throws Exception {
if (watermark.getTimestamp() != lastWatermark) {
lastWatermark = watermark.getTimestamp();
numWatermarks.add(1);
}
}
Expand Down