Skip to content

Commit

Permalink
Use local copy of RunPolicy by MPI-operator (#513)
Browse files Browse the repository at this point in the history
* Use local copy of RunPolicy by MPI-operator

Steps performed:
- copy the `RunPolicy` from common to `types.go`
- fix compilation errors by using the local RunPolicy definition
- run `make generate`
- run `make all`
- regenerate openapi_generated.go by `./hack/python-sdk/gen-sdk.sh` (with commented out rollback)

* Copy SchedulingPolicy and CleanPodPolicy for RunPolicy
  • Loading branch information
mimowo committed Jan 31, 2023
1 parent 382da78 commit 0b32af3
Show file tree
Hide file tree
Showing 22 changed files with 1,010 additions and 92 deletions.
6 changes: 3 additions & 3 deletions pkg/apis/kubeflow/v2beta1/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func setDefaultsTypeWorker(spec *common.ReplicaSpec) {
}
}

func setDefaultsRunPolicy(policy *common.RunPolicy) {
func setDefaultsRunPolicy(policy *RunPolicy) {
if policy.CleanPodPolicy == nil {
policy.CleanPodPolicy = newCleanPodPolicy(common.CleanPodPolicyNone)
policy.CleanPodPolicy = newCleanPodPolicy(CleanPodPolicyNone)
}
// The remaining fields are passed as-is to the k8s Job API, which does its
// own defaulting.
Expand Down Expand Up @@ -80,6 +80,6 @@ func newInt32(v int32) *int32 {
return &v
}

func newCleanPodPolicy(policy common.CleanPodPolicy) *common.CleanPodPolicy {
func newCleanPodPolicy(policy CleanPodPolicy) *CleanPodPolicy {
return &policy
}
20 changes: 10 additions & 10 deletions pkg/apis/kubeflow/v2beta1/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func TestSetDefaults_MPIJob(t *testing.T) {
want: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(1),
RunPolicy: common.RunPolicy{
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
RunPolicy: RunPolicy{
CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
Expand All @@ -42,8 +42,8 @@ func TestSetDefaults_MPIJob(t *testing.T) {
job: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(10),
RunPolicy: common.RunPolicy{
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
RunPolicy: RunPolicy{
CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyRunning),
TTLSecondsAfterFinished: newInt32(2),
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
Expand All @@ -55,8 +55,8 @@ func TestSetDefaults_MPIJob(t *testing.T) {
want: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(10),
RunPolicy: common.RunPolicy{
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning),
RunPolicy: RunPolicy{
CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyRunning),
TTLSecondsAfterFinished: newInt32(2),
ActiveDeadlineSeconds: newInt64(3),
BackoffLimit: newInt32(4),
Expand All @@ -77,8 +77,8 @@ func TestSetDefaults_MPIJob(t *testing.T) {
want: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(1),
RunPolicy: common.RunPolicy{
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
RunPolicy: RunPolicy{
CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
Expand All @@ -102,8 +102,8 @@ func TestSetDefaults_MPIJob(t *testing.T) {
want: MPIJob{
Spec: MPIJobSpec{
SlotsPerWorker: newInt32(1),
RunPolicy: common.RunPolicy{
CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone),
RunPolicy: RunPolicy{
CleanPodPolicy: newCleanPodPolicy(CleanPodPolicyNone),
},
SSHAuthMountPath: "/root/.ssh",
MPIImplementation: MPIImplementationOpenMPI,
Expand Down
126 changes: 115 additions & 11 deletions pkg/apis/kubeflow/v2beta1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 57 additions & 1 deletion pkg/apis/kubeflow/v2beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@
"runPolicy": {
"description": "RunPolicy encapsulates various runtime policies of the job.",
"default": {},
"$ref": "#/definitions/v1.RunPolicy"
"$ref": "#/definitions/v2beta1.RunPolicy"
},
"slotsPerWorker": {
"description": "Specifies the number of slots per worker used in hostfile. Defaults to 1.",
Expand All @@ -272,6 +272,62 @@
"type": "string"
}
}
},
"v2beta1.RunPolicy": {
"description": "RunPolicy encapsulates various runtime policies of the distributed training job, for example how to clean up resources and how long the job can stay active.",
"type": "object",
"properties": {
"activeDeadlineSeconds": {
"description": "Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer.",
"type": "integer",
"format": "int64"
},
"backoffLimit": {
"description": "Optional number of retries before marking this job failed.",
"type": "integer",
"format": "int32"
},
"cleanPodPolicy": {
"description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.",
"type": "string"
},
"schedulingPolicy": {
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
"$ref": "#/definitions/v2beta1.SchedulingPolicy"
},
"ttlSecondsAfterFinished": {
"description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.",
"type": "integer",
"format": "int32"
}
}
},
"v2beta1.SchedulingPolicy": {
"description": "SchedulingPolicy encapsulates various scheduling policies of the distributed training job, for example `minAvailable` for gang-scheduling.",
"type": "object",
"properties": {
"minAvailable": {
"type": "integer",
"format": "int32"
},
"minResources": {
"type": "object",
"additionalProperties": {
"default": {},
"$ref": "#/definitions/resource.Quantity"
}
},
"priorityClass": {
"type": "string"
},
"queue": {
"type": "string"
},
"scheduleTimeoutSeconds": {
"type": "integer",
"format": "int32"
}
}
}
}
}
51 changes: 50 additions & 1 deletion pkg/apis/kubeflow/v2beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v2beta1

import (
common "github.com/kubeflow/common/pkg/apis/common/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -37,6 +38,54 @@ type MPIJobList struct {
Items []MPIJob `json:"items"`
}

// CleanPodPolicy describes how to deal with pods when the job is finished.
type CleanPodPolicy string

const (
CleanPodPolicyUndefined CleanPodPolicy = ""
CleanPodPolicyAll CleanPodPolicy = "All"
CleanPodPolicyRunning CleanPodPolicy = "Running"
CleanPodPolicyNone CleanPodPolicy = "None"
)

// SchedulingPolicy encapsulates various scheduling policies of the distributed training
// job, for example `minAvailable` for gang-scheduling.
type SchedulingPolicy struct {
MinAvailable *int32 `json:"minAvailable,omitempty"`
Queue string `json:"queue,omitempty"`
MinResources *v1.ResourceList `json:"minResources,omitempty"`
PriorityClass string `json:"priorityClass,omitempty"`
ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"`
}

// RunPolicy encapsulates various runtime policies of the distributed training
// job, for example how to clean up resources and how long the job can stay
// active.
type RunPolicy struct {
// CleanPodPolicy defines the policy to kill pods after the job completes.
// Default to Running.
CleanPodPolicy *CleanPodPolicy `json:"cleanPodPolicy,omitempty"`

// TTLSecondsAfterFinished is the TTL to clean up jobs.
// It may take extra ReconcilePeriod seconds for the cleanup, since
// reconcile gets called periodically.
// Default to infinite.
TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`

// Specifies the duration in seconds relative to the startTime that the job may be active
// before the system tries to terminate it; value must be positive integer.
// +optional
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`

// Optional number of retries before marking this job failed.
// +optional
BackoffLimit *int32 `json:"backoffLimit,omitempty"`

// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
// +optional
SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`
}

type MPIJobSpec struct {

// Specifies the number of slots per worker used in hostfile.
Expand All @@ -46,7 +95,7 @@ type MPIJobSpec struct {
SlotsPerWorker *int32 `json:"slotsPerWorker,omitempty"`

// RunPolicy encapsulates various runtime policies of the job.
RunPolicy common.RunPolicy `json:"runPolicy,omitempty"`
RunPolicy RunPolicy `json:"runPolicy,omitempty"`

// MPIReplicaSpecs contains maps from `MPIReplicaType` to `ReplicaSpec` that
// specify the MPI replicas to run.
Expand Down
Loading

0 comments on commit 0b32af3

Please sign in to comment.