diff --git a/CONTROLLER_VERSION b/CONTROLLER_VERSION index b615a11c5..971333c39 100644 --- a/CONTROLLER_VERSION +++ b/CONTROLLER_VERSION @@ -1 +1 @@ -1.29.50 +1.29.51 diff --git a/config/crd/bases/mcad.ibm.com_appwrappers.yaml b/config/crd/bases/mcad.ibm.com_appwrappers.yaml index 6bdf3053a..8fc1e52d3 100644 --- a/config/crd/bases/mcad.ibm.com_appwrappers.yaml +++ b/config/crd/bases/mcad.ibm.com_appwrappers.yaml @@ -257,6 +257,28 @@ spec: properties: minAvailable: type: integer + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/config/crd/bases/mcad.ibm.com_queuejobs.yaml b/config/crd/bases/mcad.ibm.com_queuejobs.yaml index 6962ff118..b32bd425c 100644 --- a/config/crd/bases/mcad.ibm.com_queuejobs.yaml +++ b/config/crd/bases/mcad.ibm.com_queuejobs.yaml @@ -34,7 +34,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable + the minAvailable and the requeuing strategy properties: schedulerName: type: string @@ -43,6 +43,28 @@ spec: properties: minAvailable: type: integer + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml 
b/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml index 0087f6af5..73999d042 100644 --- a/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml +++ b/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml @@ -36,6 +36,28 @@ spec: properties: minAvailable: type: integer + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/deployment/mcad-controller/templates/deployment.yaml b/deployment/mcad-controller/templates/deployment.yaml index e8a6dfc96..4a6409a1a 100644 --- a/deployment/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-controller/templates/deployment.yaml @@ -136,6 +136,28 @@ spec: properties: minAvailable: type: integer + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string @@ -181,7 +203,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable + the minAvailable and the requeuing strategy properties: schedulerName: type: string @@ -190,6 +212,28 @@ spec: properties: minAvailable: type: integer + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + 
default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string @@ -7225,6 +7269,28 @@ spec: properties: minAvailable: type: integer + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml index 12e06317d..aca07f6e2 100644 --- a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml @@ -118,6 +118,28 @@ spec: properties: minAvailable: type: integer + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string @@ -163,7 +185,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable + the minAvailable and the requeuing strategy properties: schedulerName: type: string @@ -172,6 +194,28 @@ spec: properties: minAvailable: type: integer + 
requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string @@ -7198,6 +7242,28 @@ spec: properties: minAvailable: type: integer + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/pkg/apis/controller/v1beta1/schedulingspec.go b/pkg/apis/controller/v1beta1/schedulingspec.go index db59f8f71..fa83b223e 100644 --- a/pkg/apis/controller/v1beta1/schedulingspec.go +++ b/pkg/apis/controller/v1beta1/schedulingspec.go @@ -48,6 +48,16 @@ type SchedulingSpec struct { type SchedulingSpecTemplate struct { NodeSelector map[string]string `json:"nodeSelector,omitempty" protobuf:"bytes,1,rep,name=nodeSelector"` MinAvailable int `json:"minAvailable,omitempty" protobuf:"bytes,2,rep,name=minAvailable"` + Requeuing RequeuingTemplate `json:"requeuing,omitempty" protobuf:"bytes,3,rep,name=requeuing"` +} + +type RequeuingTemplate struct { + InitialTimeInSeconds int `json:"initialTimeInSeconds,omitempty" protobuf:"bytes,1,rep,name=initialTimeInSeconds"` + TimeInSeconds int `json:"timeInSeconds,omitempty" protobuf:"bytes,2,rep,name=timeInSeconds"` + MaxTimeInSeconds int `json:"maxTimeInSeconds,omitempty" protobuf:"bytes,3,rep,name=maxTimeInSeconds"` + GrowthType string `json:"growthType,omitempty" 
protobuf:"bytes,4,rep,name=growthType"` + NumRequeuings int `json:"numRequeuings,omitempty" protobuf:"bytes,5,rep,name=numRequeuings"` + MaxNumRequeuings int `json:"maxNumRequeuings,omitempty" protobuf:"bytes,6,rep,name=maxNumRequeuings"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index fc73a18c3..5654e5d9d 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -424,6 +424,10 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * func (qjm *XController) PreemptQueueJobs() { qjobs := qjm.GetQueueJobsEligibleForPreemption() for _, aw := range qjobs { + if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed { + continue + } + var updateNewJob *arbv1.AppWrapper var message string newjob, e := qjm.queueJobLister.AppWrappers(aw.Namespace).Get(aw.Name) @@ -431,15 +435,35 @@ func (qjm *XController) PreemptQueueJobs() { continue } newjob.Status.CanRun = false + cleanAppWrapper := false if (aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable) { message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running, aw.Status.Succeeded) cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message) newjob.Status.Conditions = append(newjob.Status.Conditions, cond) - updateNewJob = newjob.DeepCopy() - //If pods failed scheduling generate new preempt condition + if aw.Spec.SchedSpec.Requeuing.GrowthType == "exponential" { + newjob.Spec.SchedSpec.Requeuing.TimeInSeconds += aw.Spec.SchedSpec.Requeuing.TimeInSeconds + } else if aw.Spec.SchedSpec.Requeuing.GrowthType == 
"linear" { + newjob.Spec.SchedSpec.Requeuing.TimeInSeconds += aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds + } + + if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds > 0 { + if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds <= newjob.Spec.SchedSpec.Requeuing.TimeInSeconds { + newjob.Spec.SchedSpec.Requeuing.TimeInSeconds = aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds + } + } + + if newjob.Spec.SchedSpec.Requeuing.MaxNumRequeuings > 0 && newjob.Spec.SchedSpec.Requeuing.NumRequeuings == newjob.Spec.SchedSpec.Requeuing.MaxNumRequeuings { + newjob.Status.State = arbv1.AppWrapperStateDeleted + cleanAppWrapper = true + } else { + newjob.Spec.SchedSpec.Requeuing.NumRequeuings += 1 + } + + updateNewJob = newjob.DeepCopy() } else { + //If pods failed scheduling generate new preempt condition message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(aw.Status.PendingPodConditions), aw.Status.Running) index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling") //ignore co-scheduler failed scheduling events. 
This is a temp @@ -457,12 +481,16 @@ func (qjm *XController) PreemptQueueJobs() { if err := qjm.updateEtcd(updateNewJob, "PreemptQueueJobs - CanRun: false"); err != nil { klog.Errorf("Failed to update status of AppWrapper %v/%v: %v", aw.Namespace, aw.Name, err) } - klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to backoff queue.", - aw.Name, aw.Namespace) - go qjm.backoff(aw, "PreemptionTriggered", string(message)) - + if cleanAppWrapper { + klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of requeuings exceeded.", aw.Name, aw.Namespace) + go qjm.Cleanup(aw) + } else { + klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to backoff queue.", aw.Name, aw.Namespace) + go qjm.backoff(aw, "PreemptionTriggered", string(message)) + } } } + func (qjm *XController) preemptAWJobs(preemptAWs []*arbv1.AppWrapper) { if preemptAWs == nil { return @@ -503,43 +531,59 @@ func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas { - //Check to see if if this AW job has been dispatched for a time window before preempting - conditionsLen := len(value.Status.Conditions) - var dispatchConditionExists bool - dispatchConditionExists = false - var condition arbv1.AppWrapperCondition - // Get the last time the AppWrapper was dispatched - for i := (conditionsLen - 1); i > 0; i-- { - condition = value.Status.Conditions[i] - if condition.Type != arbv1.AppWrapperCondDispatched { + // Find the dispatched condition if there is any + numConditions := len(value.Status.Conditions) + var dispatchedCondition arbv1.AppWrapperCondition + dispatchedConditionExists := false + + for i := numConditions - 1; i > 0; i-- { + dispatchedCondition = value.Status.Conditions[i] + if dispatchedCondition.Type != arbv1.AppWrapperCondDispatched { continue } - dispatchConditionExists = true + dispatchedConditionExists = true break } - // Now check for 
0 running pods and for the minimum age and then - // skip preempt if current time is not beyond minimum age ie 10 mins - minAge := condition.LastTransitionMicroTime.Add(600 * time.Second) - if (value.Status.Running <= 0) && (dispatchConditionExists && (time.Now().Before(minAge))) { + // Check for the minimum age and then skip preempt if current time is not beyond minimum age + // The minimum age is controlled by the requeuing.TimeInSeconds stanza + // For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later + lastCondition := value.Status.Conditions[numConditions - 1] + var condition arbv1.AppWrapperCondition + + if dispatchedConditionExists && dispatchedCondition.LastTransitionMicroTime.After(lastCondition.LastTransitionMicroTime.Time) { + condition = dispatchedCondition + } else { + condition = lastCondition + } + + requeuingTimeInSeconds := value.Spec.SchedSpec.Requeuing.TimeInSeconds + minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeInSeconds) * time.Second) + currentTime := time.Now() + + if currentTime.Before(minAge) { continue } + if value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 { + value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds = value.Spec.SchedSpec.Requeuing.TimeInSeconds + } + if replicas > 0 { klog.V(3).Infof("AppWrapper %s is eligible for preemption Running: %v - minAvailable: %v , Succeeded: %v !!! \n", value.Name, value.Status.Running, replicas, value.Status.Succeeded) qjobs = append(qjobs, value) } } else { - //Preempt when schedulingSpec stanza is not set but pods fails scheduling. - //ignore co-scheduler pods + // Preempt when schedulingSpec stanza is not set but pods fails scheduling. + // ignore co-scheduler pods if len(value.Status.PendingPodConditions) > 0 { klog.V(3).Infof("AppWrapper %s is eligible for preemption Running: %v , Succeeded: %v due to failed scheduling !!! 
\n", value.Name, value.Status.Running, value.Status.Succeeded) qjobs = append(qjobs, value) } - } } } + return qjobs } diff --git a/test/e2e/queue.go b/test/e2e/queue.go index a1db58aa6..7699e427d 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -36,7 +36,6 @@ package e2e import ( "fmt" "os" - "time" . "github.com/onsi/ginkgo" @@ -106,11 +105,11 @@ var _ = Describe("AppWrapper E2E Test", func() { // Using quite mode due to creating of pods in earlier step. err = waitAWReadyQuiet(context, aw2) Expect(err).NotTo(HaveOccurred()) - }) It("MCAD CPU Preemption Test", func() { fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Preemption Test - Started.\n") + context := initTestContext() var appwrappers []*arbv1.AppWrapper appwrappersPtr := &appwrappers @@ -140,6 +139,44 @@ var _ = Describe("AppWrapper E2E Test", func() { Expect(err).NotTo(HaveOccurred()) }) + It("MCAD CPU Requeuing - Completion After Enough Requeuing Times Test", func() { + fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing Test - Started.\n") + + context := initTestContext() + var appwrappers []*arbv1.AppWrapper + appwrappersPtr := &appwrappers + defer cleanupTestObjectsPtr(context, appwrappersPtr) + + // Create a job with init containers that need 200 seconds to be ready before the container starts. + // The requeuing mechanism is set to start at 1 minute, which is not enough time for the PODs to be completed. + // The job should be requeued 3 times before it finishes since the wait time is doubled each time the job is requeued (i.e., initially it waits + // for 1 minutes before requeuing, then 2 minutes, and then 4 minutes). 
Since the init containers take 3 minutes + // and 20 seconds to finish, a 4 minute wait should be long enough to finish the job successfully + aw := createJobAWWithInitContainer(context, "aw-job-3-init-container", 60, "exponential", 0) + appwrappers = append(appwrappers, aw) + + err := waitAWPodsCompleted(context, aw, 720 * time.Second) // This test waits for 10 minutes to make sure all PODs complete + Expect(err).NotTo(HaveOccurred()) + }) + + It("MCAD CPU Requeuing - Deletion After Maximum Requeuing Times Test", func() { + fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing Test - Started.\n") + + context := initTestContext() + var appwrappers []*arbv1.AppWrapper + appwrappersPtr := &appwrappers + defer cleanupTestObjectsPtr(context, appwrappersPtr) + + // Create a job with init containers that need 200 seconds to be ready before the container starts. + // The requeuing mechanism is set to fire after 1 second (plus the 60 seconds time interval of the background thread) + // Within 5 minutes, the AppWrapper will be requeued up to 3 times at which point it will be deleted + aw := createJobAWWithInitContainer(context, "aw-job-3-init-container", 1, "none", 3) + appwrappers = append(appwrappers, aw) + + err := waitAWPodsCompleted(context, aw, 300 * time.Second) + Expect(err).To(HaveOccurred()) + }) + It("Create AppWrapper - StatefulSet Only - 2 Pods", func() { fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - StatefulSet Only - 2 Pods - Started.\n") @@ -351,7 +388,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster with customPodResources matching deployment resource demands so AW pods are created aw := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-550-vs-550-cpu", "550m", "550m", 2) + context, "aw-deployment-2-550-vs-550-cpu", "550m", "550m", 2, 60) appwrappers = append(appwrappers, aw) @@ -364,6 +401,7 @@ var _ = Describe("AppWrapper E2E Test", func() { It("MCAD Scheduling Fail Fast Preemption Test", 
func() { fmt.Fprintf(os.Stdout, "[e2e] MCAD Scheduling Fail Fast Preemption Test - Started.\n") + context := initTestContext() var appwrappers []*arbv1.AppWrapper appwrappersPtr := &appwrappers @@ -378,7 +416,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should not fit on any node but should dispatch because there is enough aggregated resources. aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-ff-deployment-1-700-cpu", "700m", "700m", 1) + context, "aw-ff-deployment-1-700-cpu", "700m", "700m", 1, 60) appwrappers = append(appwrappers, aw2) @@ -391,7 +429,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster after AW aw-deployment-1-700-cpu above is automatically preempted on // scheduling failure aw3 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-ff-deployment-2-340-cpu", "340m", "340m", 2) + context, "aw-ff-deployment-2-340-cpu", "340m", "340m", 2, 60) appwrappers = append(appwrappers, aw3) @@ -406,7 +444,6 @@ var _ = Describe("AppWrapper E2E Test", func() { // Make sure pods from AW aw-deployment-1-700-cpu above do not exist proving preemption err = waitAWAnyPodsExists(context, aw2) Expect(err).To(HaveOccurred()) - }) It("MCAD Bad Custom Pod Resources vs. 
Deployment Pod Resource Not Queuing Test", func() { @@ -425,7 +462,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should not fit on cluster but customPodResources is incorrect so AW pods are created aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-425-vs-426-cpu", "425m", "426m", 2) + context, "aw-deployment-2-425-vs-426-cpu", "425m", "426m", 2, 60) appwrappers = append(appwrappers, aw2) @@ -452,7 +489,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster but customPodResources is incorrect so AW pods are not created aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-426-vs-425-cpu", "426m", "425m", 2) + context, "aw-deployment-2-426-vs-425-cpu", "426m", "425m", 2, 60) appwrappers = append(appwrappers, aw2) @@ -497,7 +534,7 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createGenericJobAWWithScheduleSpec(context, "aw-test-job-with-scheduling-spec") err1 := waitAWPodsReady(context, aw) Expect(err1).NotTo(HaveOccurred()) - err2 := waitAWPodsCompleted(context, aw) + err2 := waitAWPodsCompleted(context, aw, 90 * time.Second) Expect(err2).NotTo(HaveOccurred()) // Once pods are completed, we wait for them to see if they change their status to anything BUT "Completed" diff --git a/test/e2e/util.go b/test/e2e/util.go index a88979214..afa94bf63 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -65,6 +65,7 @@ import ( var ninetySeconds = 90 * time.Second var threeMinutes = 180 * time.Second +var tenMinutes = 600 * time.Second var threeHundredSeconds = 300 * time.Second var oneCPU = v1.ResourceList{"cpu": resource.MustParse("1000m")} @@ -611,8 +612,8 @@ func waitAWPodsReady(ctx *context, aw *arbv1.AppWrapper) error { return waitAWPodsReadyEx(ctx, aw, int(aw.Spec.SchedSpec.MinAvailable), false) } -func waitAWPodsCompleted(ctx *context, aw *arbv1.AppWrapper) error { - return waitAWPodsCompletedEx(ctx, aw, int(aw.Spec.SchedSpec.MinAvailable), 
false) +func waitAWPodsCompleted(ctx *context, aw *arbv1.AppWrapper, timeout time.Duration) error { + return waitAWPodsCompletedEx(ctx, aw, int(aw.Spec.SchedSpec.MinAvailable), false, timeout) } func waitAWPodsNotCompleted(ctx *context, aw *arbv1.AppWrapper) error { @@ -653,8 +654,8 @@ func waitAWPodsReadyEx(ctx *context, aw *arbv1.AppWrapper, taskNum int, quite bo []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}, taskNum, quite)) } -func waitAWPodsCompletedEx(ctx *context, aw *arbv1.AppWrapper, taskNum int, quite bool) error { - return wait.Poll(100*time.Millisecond, ninetySeconds, awPodPhase(ctx, aw, +func waitAWPodsCompletedEx(ctx *context, aw *arbv1.AppWrapper, taskNum int, quite bool, timeout time.Duration ) error { + return wait.Poll(100*time.Millisecond, timeout, awPodPhase(ctx, aw, []v1.PodPhase{v1.PodSucceeded}, taskNum, quite)) } @@ -744,6 +745,98 @@ func createReplicaSet(context *context, name string, rep int32, img string, req return deployment } +func createJobAWWithInitContainer(context *context, name string, requeuingTimeInSeconds int, requeuingGrowthType string, requeuingMaxNumRequeuings int ) *arbv1.AppWrapper { + rb := []byte(`{"apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": "aw-job-3-init-container", + "namespace": "test", + "labels": { + "app": "aw-job-3-init-container" + } + }, + "spec": { + "parallelism": 3, + "template": { + "metadata": { + "labels": { + "app": "aw-job-3-init-container" + }, + "annotations": { + "appwrapper.mcad.ibm.com/appwrapper-name": "aw-job-3-init-container" + } + }, + "spec": { + "terminationGracePeriodSeconds": 1, + "restartPolicy": "Never", + "initContainers": [ + { + "name": "job-init-container", + "image": "k8s.gcr.io/busybox:latest", + "command": ["sleep", "200"], + "resources": { + "requests": { + "cpu": "500m" + } + } + } + ], + "containers": [ + { + "name": "job-container", + "image": "k8s.gcr.io/busybox:latest", + "command": ["sleep", "10"], + "resources": { + "requests": { + "cpu": "500m" + } 
+ } + } + ] + } + } + }} `) + + var minAvailable int = 3 + + aw := &arbv1.AppWrapper{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: context.namespace, + }, + Spec: arbv1.AppWrapperSpec{ + SchedSpec: arbv1.SchedulingSpecTemplate{ + MinAvailable: minAvailable, + Requeuing: arbv1.RequeuingTemplate{ + TimeInSeconds: requeuingTimeInSeconds, + GrowthType: requeuingGrowthType, + MaxNumRequeuings: requeuingMaxNumRequeuings, + }, + }, + AggrResources: arbv1.AppWrapperResourceList{ + GenericItems: []arbv1.AppWrapperGenericResource{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: context.namespace, + }, + DesiredAvailable: 1, + GenericTemplate: runtime.RawExtension{ + Raw: rb, + }, + CompletionStatus: "Complete", + }, + }, + }, + }, + } + + appwrapper, err := context.karclient.ArbV1().AppWrappers(context.namespace).Create(aw) + Expect(err).NotTo(HaveOccurred()) + + return appwrapper +} + func createDeploymentAW(context *context, name string) *arbv1.AppWrapper { rb := []byte(`{"apiVersion": "apps/v1", "kind": "Deployment", @@ -2201,7 +2294,7 @@ func createGenericDeploymentWithCPUAW(context *context, name string, cpuDemand s return appwrapper } -func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name string, customPodCpuDemand string, cpuDemand string, replicas int) *arbv1.AppWrapper { +func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name string, customPodCpuDemand string, cpuDemand string, replicas int, requeuingTimeInSeconds int) *arbv1.AppWrapper { rb := []byte(fmt.Sprintf(`{ "apiVersion": "apps/v1", "kind": "Deployment", @@ -2260,6 +2353,9 @@ func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name s Spec: arbv1.AppWrapperSpec{ SchedSpec: arbv1.SchedulingSpecTemplate{ MinAvailable: schedSpecMin, + Requeuing: arbv1.RequeuingTemplate{ + TimeInSeconds: requeuingTimeInSeconds, + }, }, AggrResources: arbv1.AppWrapperResourceList{ GenericItems: []arbv1.AppWrapperGenericResource{