Skip to content

Commit

Permalink
Enable `allowNonRestoredState` in the Flink CRD (#43)
Browse files (browse the repository at this point in the history)
* Propagated `allowNonRestoredState` to the state machine. Updated tests.
  • Loading branch information
YuvalItzchakov authored and anandswaminathan committed Jul 12, 2019
1 parent 1a62f03 commit bb7c8dd
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 21 deletions.
9 changes: 5 additions & 4 deletions pkg/apis/app/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ type FlinkJobStatus struct {
Health HealthStatus `json:"health,omitEmpty"`
State JobState `json:"state,omitEmpty"`

JarName string `json:"jarName"`
Parallelism int32 `json:"parallelism"`
EntryClass string `json:"entryClass,omitempty"`
ProgramArgs string `json:"programArgs,omitempty"`
JarName string `json:"jarName"`
Parallelism int32 `json:"parallelism"`
EntryClass string `json:"entryClass,omitempty"`
ProgramArgs string `json:"programArgs,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`

StartTime *metav1.Time `json:"startTime,omitEmpty"`
JobRestartCount int32 `json:"jobRestartCount,omitEmpty"`
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/flink/flink.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type ControllerInterface interface {

// Starts the Job in the Flink Cluster
StartFlinkJob(ctx context.Context, application *v1alpha1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string) (string, error)
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error)

// Savepoint creation is asynchronous.
// Polls the status of the Savepoint, using the triggerID
Expand Down Expand Up @@ -229,7 +229,7 @@ func (f *Controller) CreateCluster(ctx context.Context, application *v1alpha1.Fl
}

func (f *Controller) StartFlinkJob(ctx context.Context, application *v1alpha1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string) (string, error) {
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) {
response, err := f.flinkClient.SubmitJob(
ctx,
getURLFromApp(application, hash),
Expand All @@ -239,7 +239,7 @@ func (f *Controller) StartFlinkJob(ctx context.Context, application *v1alpha1.Fl
SavepointPath: application.Spec.SavepointInfo.SavepointLocation,
EntryClass: entryClass,
ProgramArgs: programArgs,
AllowNonRestoredState: application.Spec.AllowNonRestoredState,
AllowNonRestoredState: allowNonRestoredState,
})
if err != nil {
return "", err
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func TestStartFlinkJob(t *testing.T) {
}, nil
}
jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash",
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs)
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState)
assert.Nil(t, err)
assert.Equal(t, jobID, testJobID)
}
Expand All @@ -463,7 +463,7 @@ func TestStartFlinkJobAllowNonRestoredState(t *testing.T) {
}, nil
}
jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash",
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs)
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState)
assert.Nil(t, err)
assert.Equal(t, jobID, testJobID)
}
Expand All @@ -478,7 +478,7 @@ func TestStartFlinkJobEmptyJobID(t *testing.T) {
return &client.SubmitJobResponse{}, nil
}
jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash",
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs)
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState)
assert.EqualError(t, err, "unable to submit job: invalid job id")
assert.Empty(t, jobID)
}
Expand All @@ -492,7 +492,7 @@ func TestStartFlinkJobErr(t *testing.T) {
return nil, errors.New("submit error")
}
jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash",
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs)
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState)
assert.EqualError(t, err, "submit error")
assert.Empty(t, jobID)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/flink/mock/mock_flink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type DeleteOldResourcesForApp func(ctx context.Context, application *v1alpha1.Fl
type CancelWithSavepointFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error)
type ForceCancelFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) error
type StartFlinkJobFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string) (string, error)
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error)
type GetSavepointStatusFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (*client.SavepointResponse, error)
type IsClusterReadyFunc func(ctx context.Context, application *v1alpha1.FlinkApplication) (bool, error)
type IsServiceReadyFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (bool, error)
Expand Down Expand Up @@ -77,9 +77,9 @@ func (m *FlinkController) ForceCancel(ctx context.Context, application *v1alpha1
}

func (m *FlinkController) StartFlinkJob(ctx context.Context, application *v1alpha1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string) (string, error) {
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) {
if m.StartFlinkJobFunc != nil {
return m.StartFlinkJobFunc(ctx, application, hash, jarName, parallelism, entryClass, programArgs)
return m.StartFlinkJobFunc(ctx, application, hash, jarName, parallelism, entryClass, programArgs, allowNonRestoredState)
}
return "", nil
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/controller/flinkapplication/flink_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
}

func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1alpha1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string) (*client.FlinkJob, error) {
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (*client.FlinkJob, error) {
isReady, _ := s.flinkController.IsServiceReady(ctx, app, hash)
// Ignore errors
if !isReady {
Expand All @@ -342,7 +342,7 @@ func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1alpha1
if activeJob == nil {
logger.Infof(ctx, "No active job found for the application %v", jobs)
jobID, err := s.flinkController.StartFlinkJob(ctx, app, hash,
jarName, parallelism, entryClass, programArgs)
jarName, parallelism, entryClass, programArgs, allowNonRestoredState)
if err != nil {
s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobSubmissionFailed",
fmt.Sprintf("Failed to submit job to cluster for deploy %s: %v", hash, err))
Expand Down Expand Up @@ -406,7 +406,7 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1alph
}

activeJob, err := s.submitJobIfNeeded(ctx, app, hash,
app.Spec.JarName, app.Spec.Parallelism, app.Spec.EntryClass, app.Spec.ProgramArgs)
app.Spec.JarName, app.Spec.Parallelism, app.Spec.EntryClass, app.Spec.ProgramArgs, app.Spec.AllowNonRestoredState)
if err != nil {
return err
}
Expand All @@ -420,6 +420,7 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1alph
app.Status.JobStatus.Parallelism = app.Spec.Parallelism
app.Status.JobStatus.EntryClass = app.Spec.EntryClass
app.Status.JobStatus.ProgramArgs = app.Spec.ProgramArgs
app.Status.JobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState

return s.updateApplicationPhase(ctx, app, v1alpha1.FlinkApplicationRunning)
}
Expand Down Expand Up @@ -460,7 +461,8 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1alpha1
// submit the old job
activeJob, err := s.submitJobIfNeeded(ctx, app, app.Status.DeployHash,
app.Status.JobStatus.JarName, app.Status.JobStatus.Parallelism,
app.Status.JobStatus.EntryClass, app.Status.JobStatus.ProgramArgs)
app.Status.JobStatus.EntryClass, app.Status.JobStatus.ProgramArgs,
app.Status.JobStatus.AllowNonRestoredState)

if err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/flinkapplication/flink_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,14 @@ func TestSubmittingToRunning(t *testing.T) {

startCount := 0
mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string) (string, error) {
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) {

assert.Equal(t, appHash, hash)
assert.Equal(t, app.Spec.JarName, jarName)
assert.Equal(t, app.Spec.Parallelism, parallelism)
assert.Equal(t, app.Spec.EntryClass, entryClass)
assert.Equal(t, app.Spec.ProgramArgs, programArgs)
assert.Equal(t, app.Spec.AllowNonRestoredState, allowNonRestoredState)

startCount++
return jobID, nil
Expand Down Expand Up @@ -407,7 +408,7 @@ func TestHandleApplicationNotReady(t *testing.T) {
return nil, nil
}
mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string) (string, error) {
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) {
assert.False(t, true)
return "", nil
}
Expand Down Expand Up @@ -522,14 +523,15 @@ func TestRollingBack(t *testing.T) {

startCalled := false
mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string) (string, error) {
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) {

startCalled = true
assert.Equal(t, "old-hash", hash)
assert.Equal(t, app.Status.JobStatus.JarName, jarName)
assert.Equal(t, app.Status.JobStatus.Parallelism, parallelism)
assert.Equal(t, app.Status.JobStatus.EntryClass, entryClass)
assert.Equal(t, app.Status.JobStatus.ProgramArgs, programArgs)
assert.Equal(t, app.Status.JobStatus.AllowNonRestoredState, allowNonRestoredState)
return jobID, nil
}

Expand Down

0 comments on commit bb7c8dd

Please sign in to comment.