Skip to content

Commit

Permalink
[FLINK-21132] [runtime] Don't end input on stop with savepoint
Browse files Browse the repository at this point in the history
This code was copied from apache#14815 with
formatting changes to accomdate git am.
  • Loading branch information
Ufuk Celebi committed Feb 3, 2021
1 parent e8cfe67 commit 98790cd
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea

private final OperatorEventDispatcherImpl operatorEventDispatcher;

private boolean isStoppingBySyncSavepoint;

/**
* Current status of the input stream of the operator chain.
* Watermarks explicitly generated by operators in the chain (i.e. timestamp
Expand Down Expand Up @@ -274,7 +276,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
* @param inputId the input ID starts from 1 which indicates the first input.
*/
public void endHeadOperatorInput(int inputId) throws Exception {
if (headOperatorWrapper != null) {
if (headOperatorWrapper != null && !isStoppingBySyncSavepoint) {
headOperatorWrapper.endOperatorInput(inputId);
}
}
Expand All @@ -299,7 +301,7 @@ protected void initializeStateAndOpenOperators(StreamTaskStateInitializer stream
*/
protected void closeOperators(StreamTaskActionExecutor actionExecutor) throws Exception {
if (headOperatorWrapper != null) {
headOperatorWrapper.close(actionExecutor);
headOperatorWrapper.close(actionExecutor, isStoppingBySyncSavepoint);
}
}

Expand Down Expand Up @@ -547,6 +549,10 @@ StreamOperator<?> getTailOperator() {
return (tailOperatorWrapper == null) ? null : tailOperatorWrapper.getStreamOperator();
}

public void setIsStoppingBySyncSavepoint(boolean stoppingBySyncSavepoint) {
this.isStoppingBySyncSavepoint = stoppingBySyncSavepoint;
}

// ------------------------------------------------------------------------
// Collectors for output chaining
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
* Indicates whether this Task was purposefully finished (by finishTask()), in this case we
* want to ignore exceptions thrown after finishing, to ensure shutdown works smoothly.
*/
private volatile boolean isFinished = false;
private volatile boolean wasStoppedExternally = false;

public SourceStreamTask(Environment env) throws Exception {
this(env, new Object());
Expand Down Expand Up @@ -136,9 +136,16 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {
if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {
mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));
} else if (!isFinished && sourceThreadThrowable != null) {
} else if (!wasStoppedExternally && sourceThreadThrowable != null) {
mailboxProcessor.reportThrowable(sourceThreadThrowable);
} else if (sourceThreadThrowable != null || isCanceled() || wasStoppedExternally) {
mailboxProcessor.allActionsCompleted();
} else {
// this is a "true" end of input regardless of whether
// stop-with-savepoint was issued or not
synchronized (lock) {
operatorChain.setIsStoppingBySyncSavepoint(false);
}
mailboxProcessor.allActionsCompleted();
}
});
Expand All @@ -163,7 +170,7 @@ protected void cancelTask() {

@Override
protected void finishTask() throws Exception {
isFinished = true;
wasStoppedExternally = true;
cancelTask();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
* This class handles the close, endInput and other related logic of a {@link StreamOperator}.
* It also automatically propagates the close operation to the next wrapper that the {@link #next}
* points to, so we can use {@link #next} to link all operator wrappers in the operator chain and
* close all operators only by calling the {@link #close(StreamTaskActionExecutor)} method of the
* header operator wrapper.
* close all operators only by calling the {@link #close(StreamTaskActionExecutor, boolean, boolean)}
* method of the header operator wrapper.
*/
@Internal
public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
Expand Down Expand Up @@ -76,8 +76,8 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
* {@link MailboxExecutor#yield()} to take the mails of closing operator and running timers and
* run them.
*/
public void close(StreamTaskActionExecutor actionExecutor) throws Exception {
close(actionExecutor, false);
public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint) throws Exception {
close(actionExecutor, false, isStoppingBySyncSavepoint);
}

/**
Expand Down Expand Up @@ -120,8 +120,8 @@ void setNext(StreamOperatorWrapper next) {
this.next = next;
}

private void close(StreamTaskActionExecutor actionExecutor, boolean invokingEndInput) throws Exception {
if (invokingEndInput) {
private void close(StreamTaskActionExecutor actionExecutor, boolean invokingEndInput, boolean isStoppingBySyncSavepoint) throws Exception {
if (invokingEndInput && !isStoppingBySyncSavepoint) {
// NOTE: This only do for the case where the operator is one-input operator. At present,
// any non-head operator on the operator chain is one-input operator.
actionExecutor.runThrowing(() -> endOperatorInput(1));
Expand All @@ -131,7 +131,7 @@ private void close(StreamTaskActionExecutor actionExecutor, boolean invokingEndI

// propagate the close operation to the next wrapper
if (next != null) {
next.close(actionExecutor, true);
next.close(actionExecutor, true, isStoppingBySyncSavepoint);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final ExecutorService channelIOExecutor;

private Long syncSavepointId = null;
private Long activeSyncSavepointId = null;

private long latestAsyncCheckpointStartDelayNanos;

Expand Down Expand Up @@ -378,14 +379,21 @@ CompletableFuture<?> getInputOutputJointFuture(InputStatus status) {
}
}

private void resetSynchronousSavepointId() {
private void resetSynchronousSavepointId(long id, boolean succeeded) {
if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
// allow to process further EndOfPartition events
activeSyncSavepointId = null;
operatorChain.setIsStoppingBySyncSavepoint(false);
}
syncSavepointId = null;
}

private void setSynchronousSavepointId(long checkpointId) {
Preconditions.checkState(
syncSavepointId == null, "at most one stop-with-savepoint checkpoint at a time is allowed");
syncSavepointId = checkpointId;
activeSyncSavepointId = checkpointId;
operatorChain.setIsStoppingBySyncSavepoint(true);
}

@VisibleForTesting
Expand Down Expand Up @@ -898,6 +906,7 @@ public void triggerCheckpointOnBarrier(

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
resetSynchronousSavepointId(checkpointId, false);
subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
}

Expand All @@ -919,6 +928,10 @@ private boolean performCheckpoint(
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
} else if (activeSyncSavepointId != null
&& activeSyncSavepointId < checkpointMetaData.getCheckpointId()) {
activeSyncSavepointId = null;
operatorChain.setIsStoppingBySyncSavepoint(false);
}

subtaskCheckpointCoordinator.checkpointState(
Expand Down Expand Up @@ -965,7 +978,13 @@ public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
@Override
public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
return notifyCheckpointOperation(
() -> subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning),
() -> {
resetSynchronousSavepointId(checkpointId, false);
subtaskCheckpointCoordinator.notifyCheckpointAborted(
checkpointId,
operatorChain,
this::isRunning);
},
String.format("checkpoint %d aborted", checkpointId));
}

Expand All @@ -991,7 +1010,7 @@ private void notifyCheckpointComplete(long checkpointId) throws Exception {
if (isRunning && isSynchronousSavepointId(checkpointId)) {
finishTask();
// Reset to "notify" the internal synchronous savepoint mailbox loop.
resetSynchronousSavepointId();
resetSynchronousSavepointId(checkpointId, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void teardown() throws Exception {
@Test
public void testClose() throws Exception {
output.clear();
operatorWrappers.get(0).close(containingTask.getActionExecutor());
operatorWrappers.get(0).close(containingTask.getActionExecutor(), false);

List<Object> expected = new ArrayList<>();
for (int i = 0; i < operatorWrappers.size(); i++) {
Expand Down Expand Up @@ -160,7 +160,7 @@ public void close() throws Exception {
containingTask.getMailboxExecutorFactory().createExecutor(Integer.MAX_VALUE - 1));

try {
operatorWrapper.close(containingTask.getActionExecutor());
operatorWrapper.close(containingTask.getActionExecutor(), false);
fail("should throw an exception");
} catch (Throwable t) {
Optional<Throwable> optional = ExceptionUtils.findThrowableWithMessage(t, "test exception at closing");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException;

Expand Down Expand Up @@ -152,7 +153,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
import static org.apache.flink.runtime.checkpoint.CheckpointType.SYNC_SAVEPOINT;
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -187,12 +192,94 @@ public class StreamTaskTest extends TestLogger {
@Rule
public final Timeout timeoutPerTest = Timeout.seconds(30);

@Test
public void testSyncSavepointCompleted() throws Exception {
testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
}

@Test
public void testSyncSavepointAborted() throws Exception {
testSyncSavepointWithEndInput(
(task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()), true);
}

@Test
public void testSyncSavepointAbortedAsync() throws Exception {
testSyncSavepointWithEndInput(StreamTask::notifyCheckpointAbortAsync, true);
}

/**
* Test for SyncSavepoint and EndInput interactions. Targets following scenarios scenarios:
*
* <ol>
* <li>Thread1: notify sync savepoint
* <li>Thread2: endInput
* <li>Thread1: confirm/abort/abortAsync
* <li>assert inputEnded: confirmed - no, abort/abortAsync - yes
* </ol>
*/
private void testSyncSavepointWithEndInput(
BiConsumerWithException<StreamTask<?, ?>, Long, IOException> savepointResult,
boolean expectEndInput)
throws Exception {
StreamTaskMailboxTestHarness<String> harness =
new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
.addInput(STRING_TYPE_INFO)
.setupOutputForSingletonOperatorChain(
new TestBoundedOneInputStreamOperator())
.build();

final long checkpointId = 1L;
CountDownLatch savepointTriggeredLatch = new CountDownLatch(1);
CountDownLatch inputEndedLatch = new CountDownLatch(1);

MailboxExecutor executor =
harness.streamTask.getMailboxExecutorFactory().createExecutor(MAX_PRIORITY);
executor.execute(
() -> {
try {
harness.streamTask.triggerCheckpointOnBarrier(
new CheckpointMetaData(checkpointId, checkpointId),
new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
new CheckpointMetrics());
} catch (IOException e) {
fail(e.getMessage());
}
},
"triggerCheckpointOnBarrier");
new Thread(
() -> {
try {
savepointTriggeredLatch.await();
harness.endInput();
inputEndedLatch.countDown();
} catch (InterruptedException e) {
fail(e.getMessage());
}
})
.start();
// this mails should be executed from the one above (from triggerCheckpointOnBarrier)
executor.execute(savepointTriggeredLatch::countDown, "savepointTriggeredLatch");
executor.execute(
() -> {
inputEndedLatch.await();
savepointResult.accept(harness.streamTask, checkpointId);
},
"savepointResult");

while (harness.streamTask.isMailboxLoopRunning()) {
harness.streamTask.runMailboxStep();
}

Assert.assertEquals(expectEndInput, TestBoundedOneInputStreamOperator.isInputEnded());
}

@Test
public void testCleanUpExceptionSuppressing() throws Exception {
OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
OneInputStreamTask::new,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
STRING_TYPE_INFO,
STRING_TYPE_INFO);

testHarness.setupOutputForSingletonOperatorChain();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ public class TestBoundedOneInputStreamOperator extends AbstractStreamOperator<St
private static final long serialVersionUID = 1L;

private final String name;
private static volatile boolean inputEnded = false;

public TestBoundedOneInputStreamOperator() {
this("test");
}

public TestBoundedOneInputStreamOperator(String name) {
this.name = name;
inputEnded = false;
}

@Override
Expand All @@ -44,6 +50,7 @@ public void processElement(StreamRecord<String> element) {

@Override
public void endInput() {
inputEnded = true;
output("[" + name + "]: End of input");
}

Expand All @@ -59,4 +66,8 @@ public void close() throws Exception {
private void output(String record) {
output.collect(new StreamRecord<>(record));
}

public static boolean isInputEnded() {
return inputEnded;
}
}
Loading

0 comments on commit 98790cd

Please sign in to comment.