Skip to content

Commit

Permalink
[STRMCMP-844] Fix rollback when SubmittingJob times out (#172)
Browse files Browse the repository at this point in the history
If we time out after the new job has been submitted (generally because the tasks are never all running, possibly due to parallelism being misconfigured), we will enter the rollback phase. However, because the new job was submitted, the jobID is set in our status, and so [this check](https://github.com/lyft/flinkk8soperator/blob/4142437353666b8692e62acf075a9b2c70514dd9/pkg/controller/flinkapplication/flink_state_machine.go#L396) prevents us from submitting the job to the old cluster.

This PR fixes that issue by clearing the jobID before moving to the RollingBack phase.
  • Loading branch information
mwylde committed Feb 28, 2020
1 parent ec67117 commit be521f6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/controller/flinkapplication/flink_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta
// Something's gone wrong; roll back
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobSubmissionFailed",
fmt.Sprintf("Failed to submit job: %s", reason))
app.Status.JobStatus.JobID = ""
s.updateApplicationPhase(app, v1beta1.FlinkApplicationRollingBackJob)
return statusChanged, nil
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/controller/flinkapplication/flink_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,39 @@ func TestRollbackWithFailFastError(t *testing.T) {
assert.Nil(t, app.Status.LastSeenError)
}

// TestRollbackAfterJobSubmission drives one Handle call on an application that
// is in the SubmittingJob phase with a JobID already recorded in its status and
// ForceRollback set. It expects the phase to become RollingBackJob and the
// recorded JobID to be reset to the empty string.
func TestRollbackAfterJobSubmission(t *testing.T) {
	// Status as it looks once a job ID has been stored during submission.
	status := v1beta1.FlinkApplicationStatus{
		Phase:      v1beta1.FlinkApplicationSubmittingJob,
		DeployHash: "old-hash-retry-err",
		JobStatus:  v1beta1.FlinkJobStatus{JobID: "jobid"},
	}

	spec := v1beta1.FlinkApplicationSpec{
		JarName:     "job.jar",
		Parallelism: 5,
		EntryClass:  "com.my.Class",
		ProgramArgs: "--test",

		// force a rollback
		ForceRollback: true,
	}

	app := &v1beta1.FlinkApplication{
		ObjectMeta: metav1.ObjectMeta{Name: "test-app", Namespace: "flink"},
		Spec:       spec,
		Status:     status,
	}

	sm := getTestStateMachine()

	err := sm.Handle(context.Background(), app)
	assert.Nil(t, err)

	// The rollback phase must be entered with no job ID left behind.
	assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, app.Status.Phase)
	assert.Equal(t, "", app.Status.JobStatus.JobID)
}

func TestErrorHandlingInRunningPhase(t *testing.T) {
app := v1beta1.FlinkApplication{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit be521f6

Please sign in to comment.