Skip to content

Commit

Permalink
fix NullPointerException when CheckpointingOperation.executeCheckpoin…
Browse files Browse the repository at this point in the history
…ting
  • Loading branch information
chenkai committed May 9, 2020
1 parent 4764c8c commit e5122d9
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,8 @@ private boolean performCheckpoint(
if (isRunning) {
actionExecutor.runThrowing(() -> {

final CheckpointMetaData tmpCheckpointMetaData =
new CheckpointMetaData(checkpointMetaData.getCheckpointId(),checkpointMetaData.getTimestamp());
if (checkpointOptions.getCheckpointType().isSynchronous()) {
setSynchronousSavepointId(checkpointId);

Expand All @@ -862,12 +864,12 @@ private boolean performCheckpoint(
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(
checkpointId,
checkpointMetaData.getTimestamp(),
tmpCheckpointMetaData.getTimestamp(),
checkpointOptions);

// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
checkpointState(tmpCheckpointMetaData, checkpointOptions, checkpointMetrics);

});

Expand Down

0 comments on commit e5122d9

Please sign in to comment.