Gate statemachine progress on taskmanager availability (#144)
Currently, the only condition for moving from ClusterStarting to Savepointing (at which point the job goes down) is that all JM/TM pods are up according to the deployment. However, various issues with the TM process or configuration can prevent the task managers from actually registering with the JobManager and becoming available to run tasks, which can lead to extended downtime and require manual intervention to fix. This change gates that transition on the task managers having actually registered and contributed enough task slots to run the application. I've also added a check to the SubmittingJob -> Running transition that the tasks are actually running, which gives us a chance to automatically roll back if the job never successfully starts.
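
To make the gate concrete, here is a minimal sketch with simplified, hypothetical types (the authoritative check is the IsServiceReady change in pkg/controller/flink/flink.go in this diff): the state machine now waits until the JobManager reports enough registered task slots to run the application at its requested parallelism.

package main

import "fmt"

// clusterOverview is a simplified, hypothetical stand-in for the JobManager's
// cluster overview response; the operator's real type is ClusterOverviewResponse.
type clusterOverview struct {
	TaskManagers      int32
	NumberOfTaskSlots int32
}

// serviceReady sketches the new gate: pods being up is not enough; the task
// managers must have registered with the JobManager and contributed at least
// as many slots as the application's requested parallelism.
func serviceReady(overview clusterOverview, parallelism int32) bool {
	return overview.NumberOfTaskSlots >= parallelism
}

func main() {
	fmt.Println(serviceReady(clusterOverview{TaskManagers: 2, NumberOfTaskSlots: 4}, 8)) // false: keep waiting in ClusterStarting
	fmt.Println(serviceReady(clusterOverview{TaskManagers: 4, NumberOfTaskSlots: 8}, 8)) // true: safe to proceed to Savepointing
}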

This PR also adds more visibility into task-level status, so that users can tell whether the job is really running (in Flink, a job can be in the Running state even if none of its tasks are running). I've added two new fields to the JobStatus (TotalTasks and RunningTasks) and updated the JobHealth logic to take into account whether tasks are actually running.
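
Restated as a condensed sketch (simplified signature and hypothetical helper; the authoritative logic is the CompareAndUpdateJobStatus change in pkg/controller/flink/flink.go below), the updated health rules look roughly like this:

package main

import "fmt"

type health string

const (
	red    health = "RED"
	yellow health = "YELLOW"
	green  health = "GREEN"
)

// jobHealth is a hypothetical restatement of the rules in the diff below:
// a failing (or recently failed) job, or one with vertices stuck in CREATED,
// is RED; stale checkpoints or tasks that are not all running make it YELLOW;
// otherwise it is GREEN.
func jobHealth(failing bool, verticesInCreated, runningTasks, totalTasks int32, checkpointStale bool) health {
	if failing || verticesInCreated > 0 {
		return red
	}
	if checkpointStale || runningTasks < totalTasks {
		return yellow
	}
	return green
}

func main() {
	fmt.Println(jobHealth(false, 0, 2, 7, false)) // YELLOW: only 2 of 7 tasks are running
}
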
mwylde committed Dec 13, 2019
1 parent 11eec01 commit a42556a
Showing 8 changed files with 94 additions and 66 deletions.
2 changes: 1 addition & 1 deletion integ/utils/utils.go
@@ -141,7 +141,7 @@ func (f *TestUtil) CreateCRD() error {
func (f *TestUtil) CreateOperator() error {
configValue := make(map[string]string)
configValue["development"] = "operator:\n containerNameFormat: \"%s-unknown\"\n resyncPeriod: 5s\n" +
" baseBackoffDuration: 50ms\n maxBackoffDuration: 2s\n maxErrDuration: 40s\n" +
" baseBackoffDuration: 50ms\n maxBackoffDuration: 2s\n maxErrDuration: 90s\n" +
"logger:\n formatter:\n type: text\n"

configMap := v1.ConfigMap{
3 changes: 3 additions & 0 deletions pkg/apis/app/v1beta1/types.go
@@ -155,6 +155,9 @@ type FlinkJobStatus struct {
RestorePath string `json:"restorePath,omitempty"`
RestoreTime *metav1.Time `json:"restoreTime,omitempty"`
LastFailingTime *metav1.Time `json:"lastFailingTime,omitempty"`

RunningTasks int32 `json:"runningTasks,omitempty"`
TotalTasks int32 `json:"totalTasks,omitempty"`
}

type FlinkApplicationStatus struct {
2 changes: 1 addition & 1 deletion pkg/controller/flink/client/api.go
@@ -140,7 +140,7 @@ func (c *FlinkJobManagerClient) GetClusterOverview(ctx context.Context, url stri
}
if response != nil && !response.IsSuccess() {
c.metrics.getClusterFailureCounter.Inc(ctx)
if response.StatusCode() != int(http.StatusNotFound) || response.StatusCode() != int(http.StatusServiceUnavailable) {
if response.StatusCode() != int(http.StatusNotFound) && response.StatusCode() != int(http.StatusServiceUnavailable) {
logger.Errorf(ctx, fmt.Sprintf("Get cluster overview failed with response %v", response))
}
return nil, GetRetryableError(err, v1beta1.GetClusterOverview, response.Status(), DefaultRetries)
21 changes: 17 additions & 4 deletions pkg/controller/flink/client/entities.go
@@ -89,11 +89,24 @@ type FlinkJob struct {
Status JobState `json:"status"`
}

type FlinkJobVertex struct {
ID string `json:"id"`
Name string `json:"name"`
Parallelism int64 `json:"parallelism"`
Status JobState `json:"status"`
StartTime int64 `json:"start-time"`
EndTime int64 `json:"end-time"`
Duration int64 `json:"duration"`
Tasks map[string]int64 `json:"tasks"`
Metrics map[string]interface{} `json:"metrics"`
}

type FlinkJobOverview struct {
JobID string `json:"jid"`
State JobState `json:"state"`
StartTime int64 `json:"start-time"`
EndTime int64 `json:"end-time"`
JobID string `json:"jid"`
State JobState `json:"state"`
StartTime int64 `json:"start-time"`
EndTime int64 `json:"end-time"`
Vertices []FlinkJobVertex `json:"vertices"`
}

type ClusterOverviewResponse struct {
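
For context, the new Vertices/FlinkJobVertex fields are meant to deserialize the per-vertex task counts returned by Flink's job details endpoint (/jobs/:jobid). A minimal decoding sketch against a trimmed, illustrative payload (field names follow Flink's REST API; the values are made up):

package main

import (
	"encoding/json"
	"fmt"
)

// flinkJobVertex is a trimmed copy of the new struct for illustration; the
// real definition lives in pkg/controller/flink/client/entities.go above.
type flinkJobVertex struct {
	ID          string           `json:"id"`
	Name        string           `json:"name"`
	Parallelism int64            `json:"parallelism"`
	Status      string           `json:"status"`
	Tasks       map[string]int64 `json:"tasks"`
}

func main() {
	payload := `{
		"id": "cbc357ccb763df2852fee8c4fc7d55f2",
		"name": "Source: Custom Source",
		"parallelism": 2,
		"status": "RUNNING",
		"tasks": {"CREATED": 0, "SCHEDULED": 0, "DEPLOYING": 0, "RUNNING": 2, "FAILED": 0}
	}`

	var v flinkJobVertex
	if err := json.Unmarshal([]byte(payload), &v); err != nil {
		panic(err)
	}
	fmt.Printf("%s: %d of %d tasks running\n", v.Name, v.Tasks["RUNNING"], v.Parallelism)
}
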
35 changes: 31 additions & 4 deletions pkg/controller/flink/flink.go
@@ -322,11 +322,17 @@ func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.Fl
}

func (f *Controller) IsServiceReady(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) {
_, err := f.flinkClient.GetClusterOverview(ctx, getURLFromApp(application, hash))
resp, err := f.flinkClient.GetClusterOverview(ctx, getURLFromApp(application, hash))
if err != nil {
logger.Infof(ctx, "Error response indicating flink API is not ready to handle request %v", err)
return false, err
}

// check that we have enough task slots to run the application
if resp.NumberOfTaskSlots < application.Spec.Parallelism {
return false, nil
}

return true, nil
}

@@ -560,18 +566,39 @@ func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1
app.Status.JobStatus.RestorePath = checkpoints.Latest.Restored.ExternalPath
restoreTime := metav1.NewTime(time.Unix(checkpoints.Latest.Restored.RestoredTimeStamp/1000, 0))
app.Status.JobStatus.RestoreTime = &restoreTime
}

runningTasks := int32(0)
totalTasks := int32(0)
verticesInCreated := int32(0)

for _, v := range jobResponse.Vertices {
if v.Status == client.Created {
verticesInCreated++
}

for k, v := range v.Tasks {
if k == "RUNNING" {
runningTasks += int32(v)
}
totalTasks += int32(v)
}
}

app.Status.JobStatus.RunningTasks = runningTasks
app.Status.JobStatus.TotalTasks = totalTasks

// Health Status for job
// Job is in FAILING state --> RED
// Time since last successful checkpoint > maxCheckpointTime --> YELLOW
// Else --> Green

if app.Status.JobStatus.State == v1beta1.Failing || time.Since(app.Status.JobStatus.LastFailingTime.Time) <
failingIntervalThreshold {
if app.Status.JobStatus.State == v1beta1.Failing ||
time.Since(app.Status.JobStatus.LastFailingTime.Time) < failingIntervalThreshold ||
verticesInCreated > 0 {
app.Status.JobStatus.Health = v1beta1.Red
} else if time.Since(time.Unix(int64(lastCheckpointAgeSeconds), 0)) < maxCheckpointTime {
} else if time.Since(time.Unix(int64(lastCheckpointAgeSeconds), 0)) < maxCheckpointTime ||
runningTasks < totalTasks {
app.Status.JobStatus.Health = v1beta1.Yellow
} else {
app.Status.JobStatus.Health = v1beta1.Green
26 changes: 24 additions & 2 deletions pkg/controller/flink/flink_test.go
@@ -207,7 +207,9 @@ func TestFlinkIsServiceReady(t *testing.T) {
mockJmClient.GetClusterOverviewFunc = func(ctx context.Context, url string) (*client.ClusterOverviewResponse, error) {
assert.Equal(t, url, "http://app-name-hash.ns:8081")
return &client.ClusterOverviewResponse{
TaskManagerCount: 3,
TaskManagerCount: 3,
NumberOfTaskSlots: flinkApp.Spec.Parallelism + 6,
SlotsAvailable: flinkApp.Spec.Parallelism + 6,
}, nil
}
isReady, err := flinkControllerForTest.IsServiceReady(context.Background(), &flinkApp, "hash")
@@ -763,6 +765,22 @@ func TestJobStatusUpdated(t *testing.T) {
JobID: "abc",
State: client.Running,
StartTime: startTime,
Vertices: []client.FlinkJobVertex{
{
Status: "RUNNING",
Tasks: map[string]int64{
"SCHEDULED": 4,
"FINISHED": 0,
"CANCELED": 0,
"CANCELING": 0,
"DEPLOYING": 1,
"RUNNING": 2,
"RECONCILING": 0,
"FAILED": 0,
"CREATED": 0,
},
},
},
}, nil
}

@@ -794,16 +812,20 @@

assert.Equal(t, v1beta1.Running, flinkApp.Status.JobStatus.State)
assert.Equal(t, &expectedTime, flinkApp.Status.JobStatus.StartTime)
assert.Equal(t, v1beta1.Green, flinkApp.Status.JobStatus.Health)
assert.Equal(t, v1beta1.Yellow, flinkApp.Status.JobStatus.Health)

assert.Equal(t, int32(0), flinkApp.Status.JobStatus.FailedCheckpointCount)
assert.Equal(t, int32(4), flinkApp.Status.JobStatus.CompletedCheckpointCount)
assert.Equal(t, int32(1), flinkApp.Status.JobStatus.JobRestartCount)
assert.Equal(t, &expectedTime, flinkApp.Status.JobStatus.RestoreTime)

assert.Equal(t, "/test/externalpath", flinkApp.Status.JobStatus.RestorePath)
assert.Equal(t, &expectedTime, flinkApp.Status.JobStatus.LastCheckpointTime)
assert.Equal(t, "app-name.lyft.xyz/#/jobs/abc", flinkApp.Status.JobStatus.JobOverviewURL)

assert.Equal(t, int32(2), flinkApp.Status.JobStatus.RunningTasks)
assert.Equal(t, int32(7), flinkApp.Status.JobStatus.TotalTasks)

}

func TestNoJobStatusChange(t *testing.T) {
22 changes: 13 additions & 9 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -270,11 +270,14 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati
}

// Wait for all to be running
ready, err := s.flinkController.IsClusterReady(ctx, application)
if err != nil {
clusterReady, err := s.flinkController.IsClusterReady(ctx, application)
if err != nil || !clusterReady {
return statusUnchanged, err
}
if !ready {

// ignore the error, we just care whether it's ready or not
serviceReady, _ := s.flinkController.IsServiceReady(ctx, application, flink.HashForApplication(application))
if !serviceReady {
return statusUnchanged, nil
}

@@ -381,11 +384,6 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1beta1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool,
savepointPath string) (string, error) {
isReady, _ := s.flinkController.IsServiceReady(ctx, app, hash)
// Ignore errors
if !isReady {
return "", nil
}

// add the job running finalizer if necessary
if err := s.addFinalizerIfMissing(ctx, app, jobFinalizer); err != nil {
@@ -513,7 +511,13 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta
return statusUnchanged, err
}

if job.State == client.Running {
// wait until all vertices have been scheduled and started
allVerticesStarted := true
for _, v := range job.Vertices {
allVerticesStarted = allVerticesStarted && (v.StartTime > 0)
}

if job.State == client.Running && allVerticesStarted {
// Update the application status with the running job info
app.Status.DeployHash = hash
app.Status.SavepointPath = ""
49 changes: 4 additions & 45 deletions pkg/controller/flinkapplication/flink_state_machine_test.go
@@ -89,6 +89,10 @@ func TestHandleStartingDual(t *testing.T) {
return true, nil
}

mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (b bool, e error) {
return true, nil
}

mockFlinkController.GetCurrentDeploymentsForAppFunc = func(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) {
fd := testFlinkDeployment(application)
fd.Taskmanager.Status.AvailableReplicas = 2
@@ -393,51 +397,6 @@ func TestSubmittingToRunning(t *testing.T) {
assert.Equal(t, 2, statusUpdateCount)
}

func TestHandleApplicationNotReady(t *testing.T) {
stateMachineForTest := getTestStateMachine()
mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) {
return false, nil
}
mockFlinkController.GetJobsForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) {
assert.False(t, true)
return nil, nil
}
mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) {
assert.False(t, true)
return "", nil
}

app := v1beta1.FlinkApplication{
Status: v1beta1.FlinkApplicationStatus{
Phase: v1beta1.FlinkApplicationSubmittingJob,
},
}

mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string) (*v1.Service, error) {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "flink",
},
Spec: v1.ServiceSpec{
Selector: map[string]string{
"flink-app-hash": flink.HashForApplication(&app),
},
},
}, nil
}

mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
assert.False(t, true)
return nil
}
err := stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)
}

func TestHandleApplicationRunning(t *testing.T) {
stateMachineForTest := getTestStateMachine()
mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)