[STRMCMP-558] Event improvements (#44)
* Make events unique across deploys

* Use unique reason for different events
Micah Wylde committed Jul 11, 2019
1 parent 8d847d7 commit 1a62f03
Showing 3 changed files with 44 additions and 38 deletions.
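
The gist of the change: LogEvent previously inferred a generic Create/Update/Delete reason from the application's state, so events emitted across deploys looked alike and collapsed together. Callers now pass an explicit, event-specific reason and embed the deploy hash in the message, along the lines of the calls in the diff below:

// Before (reason inferred inside LogEvent):
f.LogEvent(ctx, application, corev1.EventTypeWarning,
    fmt.Sprintf("Failed to create job managers: %v", err))

// After (explicit reason, deploy hash in the message):
f.LogEvent(ctx, application, corev1.EventTypeWarning, "CreateClusterFailed",
    fmt.Sprintf("Failed to create job managers for deploy %s: %v",
        HashForApplication(application), err))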
29 changes: 12 additions & 17 deletions pkg/controller/flink/flink.go
@@ -82,7 +82,7 @@ type ControllerInterface interface {
FindExternalizedCheckpoint(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error)

// Logs an event to the FlinkApplication resource and to the operator log
LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string)
LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, reason string, message string)

// Compares and updates new cluster status with current cluster status
// Returns true if there is a change in ClusterStatus
@@ -206,21 +206,24 @@ func (f *Controller) CreateCluster(ctx context.Context, application *v1alpha1.Fl
newlyCreatedJm, err := f.jobManager.CreateIfNotExist(ctx, application)
if err != nil {
logger.Errorf(ctx, "Job manager cluster creation did not succeed %v", err)
f.LogEvent(ctx, application, corev1.EventTypeWarning,
fmt.Sprintf("Failed to create job managers: %v", err))
f.LogEvent(ctx, application, corev1.EventTypeWarning, "CreateClusterFailed",
fmt.Sprintf("Failed to create job managers for deploy %s: %v",
HashForApplication(application), err))

return err
}
newlyCreatedTm, err := f.taskManager.CreateIfNotExist(ctx, application)
if err != nil {
logger.Errorf(ctx, "Task manager cluster creation did not succeed %v", err)
f.LogEvent(ctx, application, corev1.EventTypeWarning,
fmt.Sprintf("Failed to create task managers: %v", err))
f.LogEvent(ctx, application, corev1.EventTypeWarning, "CreateClusterFailed",
fmt.Sprintf("Failed to create task managers for deploy %s: %v",
HashForApplication(application), err))
return err
}

if newlyCreatedJm || newlyCreatedTm {
f.LogEvent(ctx, application, corev1.EventTypeNormal, "Flink cluster created")
f.LogEvent(ctx, application, corev1.EventTypeNormal, "CreatingCluster",
fmt.Sprintf("Creating Flink cluster for deploy %s", HashForApplication(application)))
}
return nil
}
@@ -388,7 +391,8 @@ func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1alpha1
}

for k := range deletedHashes {
f.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Deleted old cluster with hash %s", k))
f.LogEvent(ctx, app, corev1.EventTypeNormal, "ToreDownCluster",
fmt.Sprintf("Deleted old cluster with hash %s", k))
}

return nil
@@ -411,16 +415,7 @@ func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application
return checkpoint.ExternalPath, nil
}

func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string) {
reason := "Create"
if app.Status.DeployHash != "" {
// this is not the first deploy
reason = "Update"
}
if app.DeletionTimestamp != nil {
reason = "Delete"
}

func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, reason string, message string) {
f.eventRecorder.Event(app, eventType, reason, message)
logger.Infof(ctx, "Logged %s event: %s: %s", eventType, reason, message)
}
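
For context, LogEvent forwards to a client-go EventRecorder, which writes the reason into the Kubernetes Event object (it is what shows up in the REASON column of kubectl get events and kubectl describe). The construction of f.eventRecorder is not part of this commit; a typical client-go setup, assumed here purely for illustration, looks roughly like:

import (
    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/record"
)

// newEventRecorder is a sketch, not code from this repository: it wires a
// broadcaster to the core/v1 Events API and returns a recorder whose events
// carry the given component name as their source.
func newEventRecorder(events typedcorev1.EventInterface, component string) record.EventRecorder {
    broadcaster := record.NewBroadcaster()
    broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: events})
    return broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component})
}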
4 changes: 2 additions & 2 deletions pkg/controller/flink/mock/mock_flink.go
@@ -119,15 +119,15 @@ func (m *FlinkController) FindExternalizedCheckpoint(ctx context.Context, applic
return "", nil
}

func (m *FlinkController) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string) {
func (m *FlinkController) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, reason string, message string) {
m.Events = append(m.Events, corev1.Event{
InvolvedObject: corev1.ObjectReference{
Kind: app.Kind,
Name: app.Name,
Namespace: app.Namespace,
},
Type: eventType,
Reason: "Test",
Reason: reason,
Message: message,
})
}
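
Because the mock now stores the caller's reason instead of the hard-coded "Test", tests can assert on the specific reason an operation emitted. A minimal sketch of such an assertion (test name, imports, and setup are assumed, not part of this commit):

func TestLogEventRecordsReason(t *testing.T) {
    m := &mock.FlinkController{}
    app := &v1alpha1.FlinkApplication{}

    m.LogEvent(context.Background(), app, corev1.EventTypeWarning, "CreateClusterFailed", "boom")

    // The recorded event should carry the reason passed by the caller.
    assert.Equal(t, 1, len(m.Events))
    assert.Equal(t, "CreateClusterFailed", m.Events[0].Reason)
    assert.Equal(t, corev1.EventTypeWarning, m.Events[0].Type)
}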
49 changes: 30 additions & 19 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -89,7 +89,8 @@ func (s *FlinkStateMachine) shouldRollback(ctx context.Context, application *v1a
// Check if the error is retryable
if application.Status.LastSeenError != nil && s.retryHandler.IsErrorRetryable(application.Status.LastSeenError) {
if s.retryHandler.IsRetryRemaining(application.Status.LastSeenError, application.Status.RetryCount) {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, fmt.Sprintf("Application in phase %v retrying with error %v", application.Status.Phase, application.Status.LastSeenError))
logger.Warnf(ctx, "Application in phase %v retrying with error %v",
application.Status.Phase, application.Status.LastSeenError)
return false
}
// Retryable error with retries exhausted
@@ -99,16 +100,13 @@

// For non-retryable errors, always fail fast
if application.Status.LastSeenError != nil {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, fmt.Sprintf("Application failed to progress in the %v phase with error %v", application.Status.Phase, application.Status.LastSeenError))
application.Status.LastSeenError = nil
return true
}

// As a default, use a time based wait to determine whether to rollback or not.
if application.Status.LastUpdatedAt != nil {
if elapsedTime, ok := s.retryHandler.WaitOnError(s.clock, application.Status.LastUpdatedAt.Time); !ok {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, fmt.Sprintf("Application failed to progress for %v in the %v phase",
elapsedTime, application.Status.Phase))
if _, ok := s.retryHandler.WaitOnError(s.clock, application.Status.LastUpdatedAt.Time); !ok {
application.Status.LastSeenError = nil
return true
}
@@ -212,8 +210,10 @@ func (s *FlinkStateMachine) handleNewOrUpdating(ctx context.Context, application
}

func (s *FlinkStateMachine) deployFailed(ctx context.Context, app *v1alpha1.FlinkApplication) error {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "Deployment failed, rolled back successfully")
app.Status.FailedDeployHash = flink.HashForApplication(app)
hash := flink.HashForApplication(app)
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "RolledBackDeploy",
fmt.Sprintf("Successfull rolled back deploy %s", hash))
app.Status.FailedDeployHash = hash

// Reset error and retry count
app.Status.LastSeenError = nil
@@ -266,7 +266,8 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
return err
}

s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, fmt.Sprintf("Cancelling job %s with a final savepoint", application.Status.JobStatus.JobID))
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob",
fmt.Sprintf("Cancelling job %s with a final savepoint", application.Status.JobStatus.JobID))

application.Spec.SavepointInfo.TriggerID = triggerID
return s.k8Cluster.UpdateK8Object(ctx, application)
@@ -283,8 +284,9 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
savepointStatusResponse.SavepointStatus.Status != client.SavePointInProgress {
// Savepointing failed
// TODO: we should probably retry this a few times before failing
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, fmt.Sprintf("Failed to take savepoint: %v",
savepointStatusResponse.Operation.FailureCause))
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed",
fmt.Sprintf("Failed to take savepoint for job %s: %v",
application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause))

// try to find an externalized checkpoint
path, err := s.flinkController.FindExternalizedCheckpoint(ctx, application, application.Status.DeployHash)
@@ -296,12 +298,15 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
return s.deployFailed(ctx, application)
}

s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, fmt.Sprintf("Restoring from externalized checkpoint %s", path))
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "RestoringExternalizedCheckpoint",
fmt.Sprintf("Restoring from externalized checkpoint %s for deploy %s",
path, flink.HashForApplication(application)))

restorePath = path
} else if savepointStatusResponse.SavepointStatus.Status == client.SavePointCompleted {
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, fmt.Sprintf("Canceled job with savepoint %s",
savepointStatusResponse.Operation.Location))
s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CanceledJob",
fmt.Sprintf("Canceled job with savepoint %s",
savepointStatusResponse.Operation.Location))
restorePath = savepointStatusResponse.Operation.Location
}

@@ -339,13 +344,15 @@ func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1alpha1
jobID, err := s.flinkController.StartFlinkJob(ctx, app, hash,
jarName, parallelism, entryClass, programArgs)
if err != nil {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, fmt.Sprintf("Failed to submit job to cluster: %v", err))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobSubmissionFailed",
fmt.Sprintf("Failed to submit job to cluster for deploy %s: %v", hash, err))

// TODO: we probably want some kind of back-off here
return nil, err
}

s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Flink job submitted to cluster with id %s", jobID))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "JobSubmitted",
fmt.Sprintf("Flink job submitted to cluster with id %s", jobID))
app.Status.JobStatus.JobID = jobID
activeJob = flink.GetActiveFlinkJob(jobs)
} else {
@@ -429,7 +436,8 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1alpha1
return s.deployFailed(ctx, app)
}

s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "Deployment failed, rolling back")
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "DeployFailed",
fmt.Sprintf("Deployment %s failed, rolling back", flink.HashForApplication(app)))

// TODO: handle single mode

@@ -605,7 +613,8 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app *
if err != nil {
return err
}
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Cancelling job with savepoint %v", triggerID))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CancellingJob",
fmt.Sprintf("Cancelling job with savepoint %v", triggerID))
app.Spec.SavepointInfo.TriggerID = triggerID
} else {
// we've already started savepointing; check the status
Expand All @@ -616,12 +625,14 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app *

if status.Operation.Location == "" && status.SavepointStatus.Status != client.SavePointInProgress {
// savepointing failed
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, fmt.Sprintf("Failed to take savepoint %v", status.Operation.FailureCause))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "SavepointFailed",
fmt.Sprintf("Failed to take savepoint %v", status.Operation.FailureCause))
// clear the trigger id so that we can try again
app.Spec.SavepointInfo.TriggerID = ""
} else if status.SavepointStatus.Status == client.SavePointCompleted {
// we're done, clean up
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Cancelled job with savepoint '%s'", status.Operation.Location))
s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CanceledJob",
fmt.Sprintf("Cancelled job with savepoint '%s'", status.Operation.Location))
app.Spec.SavepointInfo.SavepointLocation = status.Operation.Location
app.Spec.SavepointInfo.TriggerID = ""
}