Skip to content

Commit

Permalink
add custom non retry code (#258)
Browse files Browse the repository at this point in the history
Co-authored-by: Filipe Regadas <oss@regadas.email>
Co-authored-by: Yunus Olgun <yunuso@spotify.com>
  • Loading branch information
3 people committed Jan 18, 2023
1 parent cecd1f9 commit 878bae4
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/flink/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
GeneratedNameMaxLength *int `json:"generatedNameMaxLength" pflag:"Specifies the length of TaskExecutionID generated name. default: 50"`
RemoteClusterConfig ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for array jobs"`
NonRetryableExitCodes []int32 `json:"nonRetryableExitCodes" pfFlag:"Defines which job submitter exit codes should not be retried"`
NonRetryableFlyteCode *string `json:"nonRetryableFlyteCode,omitempty" pfFlag:"Defines which code should be returned in case of nonRetryable exit codes"`
}

func GetFlinkConfig() *Config {
Expand Down
3 changes: 3 additions & 0 deletions pkg/flink/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package flink

import (
"github.com/flyteorg/flyteplugins/go/tasks/errors"
"regexp"

pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
Expand Down Expand Up @@ -42,6 +43,7 @@ var (
regexpFlinkClusterName = regexp.MustCompile(`^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`)
generatedNameMaxLength = 50
nonRetryableExitCodes = []int32{}
nonRetryableFlyteCode = errors.DownstreamSystemError
defaultServiceAccount = "default"
defaultResourceRequirements = corev1.ResourceRequirements{
Limits: map[corev1.ResourceName]resource.Quantity{
Expand All @@ -66,6 +68,7 @@ var (
},
GeneratedNameMaxLength: &generatedNameMaxLength,
NonRetryableExitCodes: nonRetryableExitCodes,
NonRetryableFlyteCode: &nonRetryableFlyteCode,
}

flinkConfigSection = pluginsConfig.MustRegisterSubSection("flink", &defaultConfig)
Expand Down
2 changes: 1 addition & 1 deletion pkg/flink/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func flinkClusterJobPhaseInfo(ctx context.Context, jobStatus *flinkOp.JobStatus,
return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info)
}
reason := fmt.Sprintf("Flink jobsubmitter exited with non-zero exit code: %v (non-retryable)", jobStatus.FailureReasons)
return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, reason, info)
return pluginsCore.PhaseInfoFailure(nonRetryableFlyteCode, reason, info)
default:
msg := fmt.Sprintf("job id: %s with unknown state: %s", jobStatus.ID, jobStatus.State)
return pluginsCore.PhaseInfoFailure(errors.DownstreamSystemError, msg, info)
Expand Down

0 comments on commit 878bae4

Please sign in to comment.