Skip to content

Commit

Permalink
Fixed a bug with applying default values (#898)
Browse files Browse the repository at this point in the history
  • Loading branch information
liyinan926 committed May 3, 2020
1 parent 8ae8575 commit 0cdfd05
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 57 deletions.
5 changes: 2 additions & 3 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,12 @@ type RestartPolicy struct {
// +optional
OnFailureRetries *int32 `json:"onFailureRetries,omitempty"`

// OnSubmissionFailureRetryInterval is the interval between retries on failed submissions.
// Interval to wait between successive retries of a failed application.
// OnSubmissionFailureRetryInterval is the interval in seconds between retries on failed submissions.
// +kubebuilder:validation:Minimum=1
// +optional
OnSubmissionFailureRetryInterval *int64 `json:"onSubmissionFailureRetryInterval,omitempty"`

// OnFailureRetryInterval is the interval between retries on failed runs.
// OnFailureRetryInterval is the interval in seconds between retries on failed runs.
// +kubebuilder:validation:Minimum=1
// +optional
OnFailureRetryInterval *int64 `json:"onFailureRetryInterval,omitempty"`
Expand Down
92 changes: 44 additions & 48 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (c *Controller) getAndUpdateDriverState(app *v1beta2.SparkApplication) erro
}

if driverPod == nil {
app.Status.AppState.ErrorMessage = "Driver Pod not found"
app.Status.AppState.ErrorMessage = "driver pod not found"
app.Status.AppState.State = v1beta2.FailingState
app.Status.TerminationTime = metav1.Now()
return nil
Expand Down Expand Up @@ -491,7 +491,6 @@ func shouldRetry(app *v1beta2.SparkApplication) bool {
//| +-------------------------------+ |
//| |
//+--------------------------------------------------------------------------------------------------------------------+

func (c *Controller) syncSparkApplication(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand All @@ -510,71 +509,72 @@ func (c *Controller) syncSparkApplication(key string) error {
return nil
}

appToUpdate := app.DeepCopy()
appCopy := app.DeepCopy()
// Apply the default values to the copy. Note that the default values applied
// won't be sent to the API server as we only update the /status subresource.
v1beta2.SetSparkApplicationDefaults(appCopy)

// Take action based on application state.
switch appToUpdate.Status.AppState.State {
switch appCopy.Status.AppState.State {
case v1beta2.NewState:
c.recordSparkApplicationEvent(appToUpdate)
if err := c.validateSparkApplication(appToUpdate); err != nil {
appToUpdate.Status.AppState.State = v1beta2.FailedState
appToUpdate.Status.AppState.ErrorMessage = err.Error()
c.recordSparkApplicationEvent(appCopy)
if err := c.validateSparkApplication(appCopy); err != nil {
appCopy.Status.AppState.State = v1beta2.FailedState
appCopy.Status.AppState.ErrorMessage = err.Error()
} else {
appToUpdate = c.submitSparkApplication(appToUpdate)
appCopy = c.submitSparkApplication(appCopy)
}
case v1beta2.SucceedingState:
if !shouldRetry(appToUpdate) {
// Application is not subject to retry. Move to terminal CompletedState.
appToUpdate.Status.AppState.State = v1beta2.CompletedState
c.recordSparkApplicationEvent(appToUpdate)
if !shouldRetry(appCopy) {
appCopy.Status.AppState.State = v1beta2.CompletedState
c.recordSparkApplicationEvent(appCopy)
} else {
if err := c.deleteSparkResources(appToUpdate); err != nil {
if err := c.deleteSparkResources(appCopy); err != nil {
glog.Errorf("failed to delete resources associated with SparkApplication %s/%s: %v",
appToUpdate.Namespace, appToUpdate.Name, err)
appCopy.Namespace, appCopy.Name, err)
return err
}
appToUpdate.Status.AppState.State = v1beta2.PendingRerunState
appCopy.Status.AppState.State = v1beta2.PendingRerunState
}
case v1beta2.FailingState:
if !shouldRetry(appToUpdate) {
// Application is not subject to retry. Move to terminal FailedState.
appToUpdate.Status.AppState.State = v1beta2.FailedState
c.recordSparkApplicationEvent(appToUpdate)
} else if hasRetryIntervalPassed(appToUpdate.Spec.RestartPolicy.OnFailureRetryInterval, appToUpdate.Status.ExecutionAttempts, appToUpdate.Status.TerminationTime) {
if err := c.deleteSparkResources(appToUpdate); err != nil {
if !shouldRetry(appCopy) {
appCopy.Status.AppState.State = v1beta2.FailedState
c.recordSparkApplicationEvent(appCopy)
} else if isNextRetryDue(appCopy.Spec.RestartPolicy.OnFailureRetryInterval, appCopy.Status.ExecutionAttempts, appCopy.Status.TerminationTime) {
if err := c.deleteSparkResources(appCopy); err != nil {
glog.Errorf("failed to delete resources associated with SparkApplication %s/%s: %v",
appToUpdate.Namespace, appToUpdate.Name, err)
appCopy.Namespace, appCopy.Name, err)
return err
}
appToUpdate.Status.AppState.State = v1beta2.PendingRerunState
appCopy.Status.AppState.State = v1beta2.PendingRerunState
}
case v1beta2.FailedSubmissionState:
if !shouldRetry(appToUpdate) {
if !shouldRetry(appCopy) {
// App will never be retried. Move to terminal FailedState.
appToUpdate.Status.AppState.State = v1beta2.FailedState
c.recordSparkApplicationEvent(appToUpdate)
} else if hasRetryIntervalPassed(appToUpdate.Spec.RestartPolicy.OnSubmissionFailureRetryInterval, appToUpdate.Status.SubmissionAttempts, appToUpdate.Status.LastSubmissionAttemptTime) {
appToUpdate = c.submitSparkApplication(appToUpdate)
appCopy.Status.AppState.State = v1beta2.FailedState
c.recordSparkApplicationEvent(appCopy)
} else if isNextRetryDue(appCopy.Spec.RestartPolicy.OnSubmissionFailureRetryInterval, appCopy.Status.SubmissionAttempts, appCopy.Status.LastSubmissionAttemptTime) {
appCopy = c.submitSparkApplication(appCopy)
}
case v1beta2.InvalidatingState:
// Invalidate the current run and enqueue the SparkApplication for re-execution.
if err := c.deleteSparkResources(appToUpdate); err != nil {
if err := c.deleteSparkResources(appCopy); err != nil {
glog.Errorf("failed to delete resources associated with SparkApplication %s/%s: %v",
appToUpdate.Namespace, appToUpdate.Name, err)
appCopy.Namespace, appCopy.Name, err)
return err
}
c.clearStatus(&appToUpdate.Status)
appToUpdate.Status.AppState.State = v1beta2.PendingRerunState
c.clearStatus(&appCopy.Status)
appCopy.Status.AppState.State = v1beta2.PendingRerunState
case v1beta2.PendingRerunState:
glog.V(2).Infof("SparkApplication %s/%s pending rerun", appToUpdate.Namespace, appToUpdate.Name)
if c.validateSparkResourceDeletion(appToUpdate) {
glog.V(2).Infof("Resources for SparkApplication %s/%s successfully deleted", appToUpdate.Namespace, appToUpdate.Name)
c.recordSparkApplicationEvent(appToUpdate)
c.clearStatus(&appToUpdate.Status)
appToUpdate = c.submitSparkApplication(appToUpdate)
glog.V(2).Infof("SparkApplication %s/%s is pending rerun", appCopy.Namespace, appCopy.Name)
if c.validateSparkResourceDeletion(appCopy) {
glog.V(2).Infof("Resources for SparkApplication %s/%s successfully deleted", appCopy.Namespace, appCopy.Name)
c.recordSparkApplicationEvent(appCopy)
c.clearStatus(&appCopy.Status)
appCopy = c.submitSparkApplication(appCopy)
}
case v1beta2.SubmittedState, v1beta2.RunningState, v1beta2.UnknownState:
if err := c.getAndUpdateAppState(appToUpdate); err != nil {
if err := c.getAndUpdateAppState(appCopy); err != nil {
return err
}
case v1beta2.CompletedState, v1beta2.FailedState:
Expand All @@ -588,8 +588,8 @@ func (c *Controller) syncSparkApplication(key string) error {
}
}

if appToUpdate != nil {
err = c.updateStatusAndExportMetrics(app, appToUpdate)
if appCopy != nil {
err = c.updateStatusAndExportMetrics(app, appCopy)
if err != nil {
glog.Errorf("failed to update SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
return err
Expand All @@ -599,9 +599,8 @@ func (c *Controller) syncSparkApplication(key string) error {
return nil
}

// Helper func to determine if we have waited enough to retry the SparkApplication.
func hasRetryIntervalPassed(retryInterval *int64, attemptsDone int32, lastEventTime metav1.Time) bool {
glog.V(3).Infof("retryInterval: %d , lastEventTime: %v, attempsDone: %d", retryInterval, lastEventTime, attemptsDone)
// Helper func to determine if the next retry the SparkApplication is due now.
func isNextRetryDue(retryInterval *int64, attemptsDone int32, lastEventTime metav1.Time) bool {
if retryInterval == nil || lastEventTime.IsZero() || attemptsDone <= 0 {
return false
}
Expand All @@ -618,9 +617,6 @@ func hasRetryIntervalPassed(retryInterval *int64, attemptsDone int32, lastEventT

// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1beta2.SparkApplication {
// Apply default values before submitting the application to run.
v1beta2.SetSparkApplicationDefaults(app)

if app.PrometheusMonitoringEnabled() {
if err := configPrometheusMonitoring(app, c.kubeClient); err != nil {
glog.Error(err)
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,14 +1496,14 @@ func TestSyncSparkApplication_ApplicationExpired(t *testing.T) {
assert.True(t, errors.IsNotFound(err))
}

func TestHasRetryIntervalPassed(t *testing.T) {
func TestIsNextRetryDue(t *testing.T) {
// Failure cases.
assert.False(t, hasRetryIntervalPassed(nil, 3, metav1.Time{Time: metav1.Now().Add(-100 * time.Second)}))
assert.False(t, hasRetryIntervalPassed(int64ptr(5), 0, metav1.Time{Time: metav1.Now().Add(-100 * time.Second)}))
assert.False(t, hasRetryIntervalPassed(int64ptr(5), 3, metav1.Time{}))
assert.False(t, isNextRetryDue(nil, 3, metav1.Time{Time: metav1.Now().Add(-100 * time.Second)}))
assert.False(t, isNextRetryDue(int64ptr(5), 0, metav1.Time{Time: metav1.Now().Add(-100 * time.Second)}))
assert.False(t, isNextRetryDue(int64ptr(5), 3, metav1.Time{}))
// Not enough time passed.
assert.False(t, hasRetryIntervalPassed(int64ptr(50), 3, metav1.Time{Time: metav1.Now().Add(-100 * time.Second)}))
assert.True(t, hasRetryIntervalPassed(int64ptr(50), 3, metav1.Time{Time: metav1.Now().Add(-151 * time.Second)}))
assert.False(t, isNextRetryDue(int64ptr(50), 3, metav1.Time{Time: metav1.Now().Add(-100 * time.Second)}))
assert.True(t, isNextRetryDue(int64ptr(50), 3, metav1.Time{Time: metav1.Now().Add(-151 * time.Second)}))
}

func stringptr(s string) *string {
Expand Down

0 comments on commit 0cdfd05

Please sign in to comment.