
Commit

Fixes to ensure Delete phase is always handled and prevent deploys from being stuck in a Savepointing phase (#142)

* [WIP] Fix for deploy getting stuck

* fix lint

* Refine comment

* Add more asserts

* Fix log event

* Fix nil response checks
lrao100 committed Dec 10, 2019
1 parent c2158e7 commit 4edffc5
Showing 3 changed files with 157 additions and 12 deletions.
7 changes: 4 additions & 3 deletions pkg/controller/flink/client/api.go
@@ -37,6 +37,7 @@ const httpPatch = "PATCH"
const retryCount = 3
const httpGetTimeOut = 5 * time.Second
const defaultTimeOut = 1 * time.Minute
+const checkSavepointStatusRetries = 3

// ProgramInvocationException is thrown when the entry class doesn't exist or throws an exception
const programInvocationException = "org.apache.flink.client.program.ProgramInvocationException"
@@ -264,17 +265,17 @@ func (c *FlinkJobManagerClient) CheckSavepointStatus(ctx context.Context, url st
response, err := c.executeRequest(ctx, httpGet, url, nil)
if err != nil {
c.metrics.checkSavepointFailureCounter.Inc(ctx)
-return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, GlobalFailure, DefaultRetries)
+return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, GlobalFailure, checkSavepointStatusRetries)
}
if response != nil && !response.IsSuccess() {
c.metrics.checkSavepointFailureCounter.Inc(ctx)
logger.Errorf(ctx, fmt.Sprintf("Check savepoint status failed with response %v", response))
-return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, response.Status(), DefaultRetries)
+return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, response.Status(), checkSavepointStatusRetries)
}
var savepointResponse SavepointResponse
if err = json.Unmarshal(response.Body(), &savepointResponse); err != nil {
logger.Errorf(ctx, "Unable to Unmarshal savepointResponse %v, err: %v", response, err)
-return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, JSONUnmarshalError, DefaultRetries)
+return nil, GetRetryableError(err, v1beta1.CheckSavepointStatus, JSONUnmarshalError, checkSavepointStatusRetries)
}
c.metrics.cancelJobSuccessCounter.Inc(ctx)
return &savepointResponse, nil
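
For context on the retry-budget change above, here is a minimal, self-contained sketch of how a bounded retry count such as checkSavepointStatusRetries can travel inside a retryable error and be consumed by a retry handler's IsRetryRemaining-style check. The types and helpers are simplified stand-ins under assumed names, not the operator's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// retryableError is a simplified stand-in for the operator's error type:
// it carries the failed operation, a reason, and the maximum retries allowed.
type retryableError struct {
	method     string
	reason     string
	maxRetries int32
}

func (e *retryableError) Error() string {
	return fmt.Sprintf("%s failed: %s", e.method, e.reason)
}

// getRetryableError mirrors the shape of GetRetryableError(err, method, reason, retries).
// (The wrapped err is dropped in this simplified version.)
func getRetryableError(err error, method, reason string, maxRetries int32) error {
	return &retryableError{method: method, reason: reason, maxRetries: maxRetries}
}

// isRetryRemaining shows how a retry handler might consume the budget: once the
// observed retry count reaches the error's maximum, the caller stops retrying
// and falls through to its failure path (here, checkpoint recovery).
func isRetryRemaining(err error, retryCount int32) bool {
	var re *retryableError
	if errors.As(err, &re) {
		return retryCount < re.maxRetries
	}
	return false
}

func main() {
	err := getRetryableError(errors.New("timeout"), "CheckSavepointStatus", "GlobalFailure", 3)
	for count := int32(0); count <= 3; count++ {
		fmt.Printf("retryCount=%d retryRemaining=%v\n", count, isRetryRemaining(err, count))
	}
}
```
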
39 changes: 30 additions & 9 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -160,7 +160,7 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
return statusChanged, nil
}

-if s.IsTimeToHandlePhase(application) {
+if s.IsTimeToHandlePhase(application, appPhase) {
if !v1beta1.IsRunningPhase(application.Status.Phase) {
logger.Infof(ctx, "Handling state for application")
}
@@ -193,7 +193,14 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
return updateApplication || updateLastSeenError, appErr
}

-func (s *FlinkStateMachine) IsTimeToHandlePhase(application *v1beta1.FlinkApplication) bool {
+func (s *FlinkStateMachine) IsTimeToHandlePhase(application *v1beta1.FlinkApplication, phase v1beta1.FlinkApplicationPhase) bool {
+if phase == v1beta1.FlinkApplicationDeleting {
+// reset lastSeenError and retryCount in case the application was failing in its previous phase
+// We always want a Deleting phase to be handled
+application.Status.LastSeenError = nil
+application.Status.RetryCount = 0
+return true
+}
lastSeenError := application.Status.LastSeenError
if application.Status.LastSeenError == nil || !s.retryHandler.IsErrorRetryable(application.Status.LastSeenError) {
return true
@@ -312,18 +319,32 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a

// check the savepoints in progress
savepointStatusResponse, err := s.flinkController.GetSavepointStatus(ctx, application, application.Status.DeployHash)
-if err != nil {
+// Here shouldRollback() is used as a proxy to identify if we have exhausted all retries trying to GetSavepointStatus
+// The application is NOT actually rolled back because we're in a spot where there may be no application running and we don't
+// have information on the last successful savepoint.
+// In the event that rollback evaluates to True, we don't return immediately but allow for the remaining steps to proceed as normal.
+rollback, reason := s.shouldRollback(ctx, application)
+
+if err != nil && !rollback {
return statusUnchanged, err
}

+if err != nil && rollback {
+s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointStatusCheckFailed",
+fmt.Sprintf("Exhausted retries trying to get status for savepoint for job %s: %v",
+application.Status.JobStatus.JobID, reason))
+}
+
var restorePath string
-if savepointStatusResponse.Operation.Location == "" &&
-savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress {
+if rollback || (savepointStatusResponse.Operation.Location == "" &&
+savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress) {
// Savepointing failed
// TODO: we should probably retry this a few times before failing
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed",
fmt.Sprintf("Failed to take savepoint for job %s: %v",
application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause))
if savepointStatusResponse != nil {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed",
fmt.Sprintf("Failed to take savepoint for job %s: %v",
application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause))
}

// try to find an externalized checkpoint
path, err := s.flinkController.FindExternalizedCheckpoint(ctx, application, application.Status.DeployHash)
@@ -340,7 +361,7 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
path, flink.HashForApplication(application)))

restorePath = path
-} else if savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted {
+} else if savepointStatusResponse != nil && savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CanceledJob",
fmt.Sprintf("Canceled job with savepoint %s",
savepointStatusResponse.Operation.Location))
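
The IsTimeToHandlePhase change above is the core of the "Delete phase is always handled" fix: if a retryable error and its retry count survive from a failing phase, the phase gate can keep deferring to the retry handler and the delete never runs. A minimal sketch of that idea, using simplified stand-ins rather than the operator's real types:

```go
package main

import "fmt"

type phase string

const (
	phaseSavepointing phase = "Savepointing"
	phaseDeleting     phase = "Deleting"
)

// appStatus is a stand-in for v1beta1.FlinkApplicationStatus.
type appStatus struct {
	LastSeenError error
	RetryCount    int32
}

// isTimeToHandlePhase mirrors the shape of the new check: Deleting is always
// handled, and any error/retry state left over from the previous phase is
// cleared so the delete logic cannot be starved by a stuck retry loop.
// errorRetryable stands in for the retry handler's IsErrorRetryable decision.
func isTimeToHandlePhase(status *appStatus, p phase, errorRetryable bool) bool {
	if p == phaseDeleting {
		status.LastSeenError = nil
		status.RetryCount = 0
		return true
	}
	if status.LastSeenError == nil || !errorRetryable {
		return true
	}
	// otherwise defer to the retry handler's backoff decision (elided here)
	return false
}

func main() {
	status := &appStatus{LastSeenError: fmt.Errorf("savepoint status check failed"), RetryCount: 2}
	fmt.Println(isTimeToHandlePhase(status, phaseSavepointing, true)) // false: still backing off
	fmt.Println(isTimeToHandlePhase(status, phaseDeleting, true))     // true: delete always proceeds
	fmt.Println(status.RetryCount)                                    // 0: counters were reset
}
```
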
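
The handleApplicationSavepointing change establishes a recovery order that can no longer get stuck waiting on a savepoint whose status cannot be fetched. The sketch below captures that decision order with hypothetical, simplified types (it is not the operator's actual code): a completed savepoint wins, an in-progress savepoint waits, and anything else, including exhausted status-check retries, falls back to an externalized checkpoint.

```go
package main

import (
	"errors"
	"fmt"
)

// savepointResult is a stand-in for the Flink REST savepoint status payload.
type savepointResult struct {
	Location   string
	InProgress bool
}

// chooseRestorePath captures the decision order after the change: a nil
// response (status-check retries exhausted) or a failed/empty savepoint both
// fall back to an externalized checkpoint; an in-progress savepoint waits.
func chooseRestorePath(resp *savepointResult, retriesExhausted bool,
	findCheckpoint func() (string, error)) (string, error) {

	if resp != nil && resp.InProgress && !retriesExhausted {
		return "", nil // keep waiting in the Savepointing phase
	}
	if resp != nil && resp.Location != "" {
		return resp.Location, nil // savepoint completed; restore from it
	}
	// savepoint failed or its status could not be determined: try a checkpoint
	path, err := findCheckpoint()
	if err != nil || path == "" {
		return "", errors.New("no savepoint and no externalized checkpoint found")
	}
	return path, nil
}

func main() {
	find := func() (string, error) { return "/checkpoints/chk-42", nil }

	path, _ := chooseRestorePath(&savepointResult{Location: "/savepoints/sp-1"}, false, find)
	fmt.Println(path) // /savepoints/sp-1

	path, _ = chooseRestorePath(nil, true, find) // status-check retries exhausted
	fmt.Println(path)                            // /checkpoints/chk-42
}
```
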
123 changes: 123 additions & 0 deletions pkg/controller/flinkapplication/flink_state_machine_test.go
@@ -1263,3 +1263,126 @@ func TestLastSeenErrTimeIsNil(t *testing.T) {
assert.Nil(t, err)

}

func TestCheckSavepointStatusFailing(t *testing.T) {
oldHash := "old-hash-fail"
maxRetries := int32(1)
retryableErr := client.GetRetryableError(errors.New("blah"), "CheckSavepointStatus", "FAILED", 1)
app := v1beta1.FlinkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "flink",
},
Spec: v1beta1.FlinkApplicationSpec{
JarName: "job.jar",
Parallelism: 5,
EntryClass: "com.my.Class",
ProgramArgs: "--test",
},
Status: v1beta1.FlinkApplicationStatus{
Phase: v1beta1.FlinkApplicationSavepointing,
DeployHash: oldHash,
LastSeenError: retryableErr.(*v1beta1.FlinkApplicationError),
SavepointTriggerID: "trigger",
},
}
app.Status.LastSeenError.LastErrorUpdateTime = nil

stateMachineForTest := getTestStateMachine()
mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) {
return nil, retryableErr.(*v1beta1.FlinkApplicationError)
}

mockFlinkController.FindExternalizedCheckpointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) {
return "/tmp/checkpoint", nil
}
mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler)
mockRetryHandler.IsErrorRetryableFunc = func(err error) bool {
return true
}
mockRetryHandler.IsTimeToRetryFunc = func(clock clock.Clock, lastUpdatedTime time.Time, retryCount int32) bool {
return true
}
mockRetryHandler.IsRetryRemainingFunc = func(err error, retryCount int32) bool {
return retryCount < maxRetries
}

err := stateMachineForTest.Handle(context.Background(), &app)
// 1 retry left
assert.NotNil(t, err)
assert.Equal(t, v1beta1.FlinkApplicationSavepointing, app.Status.Phase)

// No retries left for CheckSavepointStatus
// The app should hence try to recover from an externalized checkpoint
err = stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)
assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, app.Status.Phase)
assert.Equal(t, "/tmp/checkpoint", app.Status.SavepointPath)
assert.Equal(t, "", app.Status.JobStatus.JobID)
}

func TestDeleteWhenCheckSavepointStatusFailing(t *testing.T) {
retryableErr := client.GetRetryableError(errors.New("blah"), "CheckSavepointStatus", "FAILED", 1)
app := v1beta1.FlinkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "flink",
},
Spec: v1beta1.FlinkApplicationSpec{
JarName: "job.jar",
Parallelism: 5,
EntryClass: "com.my.Class",
ProgramArgs: "--test",
},
Status: v1beta1.FlinkApplicationStatus{
Phase: v1beta1.FlinkApplicationSavepointing,
DeployHash: "appHash",
LastSeenError: retryableErr.(*v1beta1.FlinkApplicationError),
SavepointTriggerID: "trigger",
},
}
app.Status.LastSeenError.LastErrorUpdateTime = nil

stateMachineForTest := getTestStateMachine()
mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) {
return nil, retryableErr.(*v1beta1.FlinkApplicationError)
}
mockFlinkController.CancelWithSavepointFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (s string, e error) {
return "triggerId", nil
}
mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler)
mockRetryHandler.IsErrorRetryableFunc = func(err error) bool {
return true
}
mockRetryHandler.IsRetryRemainingFunc = func(err error, retryCount int32) bool {
return true
}
err := stateMachineForTest.Handle(context.Background(), &app)
assert.NotNil(t, err)
assert.Equal(t, v1beta1.FlinkApplicationSavepointing, app.Status.Phase)
assert.NotNil(t, app.Status.LastSeenError)
// Try to force delete the app while it's in a savepointing state (with errors)
// We should handle the delete here
app.Status.Phase = v1beta1.FlinkApplicationDeleting
app.Spec.DeleteMode = v1beta1.DeleteModeForceCancel

mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
assert.Equal(t, "appHash", hash)
return &client.FlinkJobOverview{
JobID: "jobID",
State: client.Failing,
}, nil
}

mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error {
return nil
}
err = stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)
assert.Nil(t, app.Status.LastSeenError)
assert.Equal(t, int32(0), app.Status.RetryCount)
assert.Nil(t, app.GetFinalizers())

}
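
The new tests drive these paths through function-field mocks (GetSavepointStatusFunc, IsRetryRemainingFunc, and so on), so each test can inject exactly the failure it needs. A minimal sketch of that mocking pattern, with hypothetical names and a narrowed interface for illustration:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// savepointGetter is a hypothetical narrow interface a state machine might depend on.
type savepointGetter interface {
	GetSavepointStatus(ctx context.Context, hash string) (string, error)
}

// mockController satisfies the interface by delegating to a swappable function
// field, the same style of mock the tests above configure: each test assigns
// the behaviour it needs without generating mock code.
type mockController struct {
	GetSavepointStatusFunc func(ctx context.Context, hash string) (string, error)
}

func (m *mockController) GetSavepointStatus(ctx context.Context, hash string) (string, error) {
	if m.GetSavepointStatusFunc != nil {
		return m.GetSavepointStatusFunc(ctx, hash)
	}
	return "", nil
}

func main() {
	mock := &mockController{}
	mock.GetSavepointStatusFunc = func(ctx context.Context, hash string) (string, error) {
		return "", errors.New("simulated CheckSavepointStatus failure")
	}

	var ctrl savepointGetter = mock
	_, err := ctrl.GetSavepointStatus(context.Background(), "old-hash-fail")
	fmt.Println(err) // simulated CheckSavepointStatus failure
}
```
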
