Deprecated pointer, use ptr instead (#627)
Signed-off-by: kuizhiqing <kuizhiqing@msn.com>
kuizhiqing committed Feb 27, 2024
1 parent a6c2da8 commit f92b9c7
Showing 6 changed files with 57 additions and 59 deletions.
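
The change is mechanical: every call into the deprecated k8s.io/utils/pointer helpers is replaced by the generic helpers in k8s.io/utils/ptr. As a quick reference, here is a minimal sketch (not part of the diff below) showing how the two helpers this commit relies on correspond to the old per-type functions:

```go
package main

import (
	"fmt"

	"k8s.io/utils/ptr"
)

func main() {
	// ptr.To replaces the per-type constructors pointer.Bool, pointer.Int32, ...
	suspend := ptr.To(true)    // *bool, equivalent to pointer.Bool(true)
	slots := ptr.To[int32](10) // *int32, equivalent to pointer.Int32(10)

	// ptr.Deref replaces pointer.BoolDeref, pointer.Int32Deref, ...
	fmt.Println(ptr.Deref(suspend, false)) // true
	var unset *bool
	fmt.Println(ptr.Deref(unset, false), *slots) // false 10
}
```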
9 changes: 4 additions & 5 deletions pkg/controller/mpi_job_controller.go
@@ -53,7 +53,6 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog"
 	"k8s.io/utils/clock"
-	"k8s.io/utils/pointer"
 	"k8s.io/utils/ptr"
 	schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
 	volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
@@ -673,7 +672,7 @@ func (c *MPIJobController) syncHandler(key string) error {
 	if launcher != nil {
 		if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
 			// align the suspension state of launcher with the MPIJob
-			launcher.Spec.Suspend = pointer.Bool(isMPIJobSuspended(mpiJob))
+			launcher.Spec.Suspend = ptr.To(isMPIJobSuspended(mpiJob))
 			if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
 				return err
 			}
@@ -998,11 +997,11 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
 }
 
 func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool {
-	return pointer.BoolDeref(mpiJob.Spec.RunPolicy.Suspend, false)
+	return ptr.Deref(mpiJob.Spec.RunPolicy.Suspend, false)
 }
 
 func isJobSuspended(job *batchv1.Job) bool {
-	return pointer.BoolDeref(job.Spec.Suspend, false)
+	return ptr.Deref(job.Spec.Suspend, false)
 }
 
 func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
@@ -1486,7 +1485,7 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job
 		},
 	}
 	if isMPIJobSuspended(mpiJob) {
-		job.Spec.Suspend = pointer.Bool(true)
+		job.Spec.Suspend = ptr.To(true)
 	}
 	return job
 }
29 changes: 14 additions & 15 deletions pkg/controller/mpi_job_controller_test.go
@@ -37,7 +37,6 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/utils/clock"
 	clocktesting "k8s.io/utils/clock/testing"
-	"k8s.io/utils/pointer"
 	"k8s.io/utils/ptr"
 	schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
 	schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
@@ -804,7 +803,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
 	// create a suspended job
 	var replicas int32 = 8
 	mpiJob := newMPIJob("test", &replicas, nil, nil)
-	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
+	mpiJob.Spec.RunPolicy.Suspend = ptr.To(true)
 	mpiJob.Spec.MPIImplementation = implementation
 	f.setUpMPIJob(mpiJob)
 
@@ -823,7 +822,7 @@ func TestCreateSuspendedMPIJob(t *testing.T) {
 	// expect creating of the launcher
 	fmjc := f.newFakeMPIJobController()
 	launcher := fmjc.newLauncherJob(mpiJob)
-	launcher.Spec.Suspend = pointer.Bool(true)
+	launcher.Spec.Suspend = ptr.To(true)
 	f.expectCreateJobAction(launcher)
 
 	// expect an update to add the conditions
@@ -851,7 +850,7 @@ func TestSuspendedRunningMPIJob(t *testing.T) {
 	var replicas int32 = 8
 	startTime := metav1.Now()
 	mpiJob := newMPIJob("test", &replicas, &startTime, nil)
-	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false)
+	mpiJob.Spec.RunPolicy.Suspend = ptr.To(false)
 	msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
 	updateMPIJobConditions(mpiJob, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
 	msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
@@ -893,18 +892,18 @@ func TestSuspendedRunningMPIJob(t *testing.T) {
 
 	// setup launcher and its pod
 	launcher := fmjc.newLauncherJob(mpiJob)
-	launcher.Spec.Suspend = pointer.Bool(false)
+	launcher.Spec.Suspend = ptr.To(false)
 	launcherPod := mockJobPod(launcher)
 	launcherPod.Status.Phase = corev1.PodRunning
 	f.setUpLauncher(launcher)
 	f.setUpPod(launcherPod)
 
 	// transition the MPIJob into suspended state
-	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
+	mpiJob.Spec.RunPolicy.Suspend = ptr.To(true)
 
 	// expect moving the launcher pod into suspended state
 	launcherCopy := launcher.DeepCopy()
-	launcherCopy.Spec.Suspend = pointer.Bool(true)
+	launcherCopy.Spec.Suspend = ptr.To(true)
 	f.expectUpdateJobAction(launcherCopy)
 
 	// expect removal of the pods
@@ -939,7 +938,7 @@ func TestResumeMPIJob(t *testing.T) {
 	var replicas int32 = 8
 	startTime := metav1.Now()
 	mpiJob := newMPIJob("test", &replicas, &startTime, nil)
-	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true)
+	mpiJob.Spec.RunPolicy.Suspend = ptr.To(true)
 	msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
 	updateMPIJobConditions(mpiJob, kubeflow.JobCreated, corev1.ConditionTrue, mpiJobCreatedReason, msg)
 	updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, corev1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended")
@@ -966,14 +965,14 @@ func TestResumeMPIJob(t *testing.T) {
 	// expect creating of the launcher
 	fmjc := f.newFakeMPIJobController()
 	launcher := fmjc.newLauncherJob(mpiJob)
-	launcher.Spec.Suspend = pointer.Bool(true)
+	launcher.Spec.Suspend = ptr.To(true)
 	f.setUpLauncher(launcher)
 
 	// move the timer by a second so that the StartTime is updated after resume
 	fakeClock.Sleep(time.Second)
 
 	// resume the MPIJob
-	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false)
+	mpiJob.Spec.RunPolicy.Suspend = ptr.To(false)
 
 	// expect creation of the pods
 	for i := 0; i < int(replicas); i++ {
@@ -983,7 +982,7 @@ func TestResumeMPIJob(t *testing.T) {
 
 	// expect the launcher update to resume it
 	launcherCopy := launcher.DeepCopy()
-	launcherCopy.Spec.Suspend = pointer.Bool(false)
+	launcherCopy.Spec.Suspend = ptr.To(false)
 	f.expectUpdateJobAction(launcherCopy)
 
 	// expect an update to add the conditions
@@ -1545,7 +1544,7 @@ func TestNewConfigMap(t *testing.T) {
 			},
 			Spec: kubeflow.MPIJobSpec{
 				MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
-				RunLauncherAsWorker: pointer.Bool(true),
+				RunLauncherAsWorker: ptr.To(true),
 			},
 		},
 		workerReplicas: 2,
@@ -1570,7 +1569,7 @@ func TestNewConfigMap(t *testing.T) {
 			},
 			Spec: kubeflow.MPIJobSpec{
 				MPIImplementation:   kubeflow.MPIImplementationOpenMPI,
-				RunLauncherAsWorker: pointer.Bool(true),
+				RunLauncherAsWorker: ptr.To(true),
 			},
 		},
 		workerReplicas: 0,
@@ -1618,7 +1617,7 @@ func TestNewConfigMap(t *testing.T) {
 				Namespace: "project-x",
 			},
 			Spec: kubeflow.MPIJobSpec{
-				SlotsPerWorker:    pointer.Int32(10),
+				SlotsPerWorker:    ptr.To[int32](10),
 				MPIImplementation: kubeflow.MPIImplementationIntel,
 			},
 		},
@@ -1643,7 +1642,7 @@ func TestNewConfigMap(t *testing.T) {
 				Namespace: "project-x",
 			},
 			Spec: kubeflow.MPIJobSpec{
-				SlotsPerWorker:    pointer.Int32(10),
+				SlotsPerWorker:    ptr.To[int32](10),
 				MPIImplementation: kubeflow.MPIImplementationMPICH,
 			},
 		},
10 changes: 5 additions & 5 deletions pkg/controller/podgroup.go
@@ -25,7 +25,7 @@ import (
 	schedulinglisters "k8s.io/client-go/listers/scheduling/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog"
-	"k8s.io/utils/pointer"
+	"k8s.io/utils/ptr"
 	schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
 	schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
 	schedinformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
@@ -242,7 +242,7 @@ func (s *SchedulerPluginsCtrl) newPodGroup(mpiJob *kubeflow.MPIJob) metav1.Objec
 	if mpiJob == nil {
 		return nil
 	}
-	scheduleTimeoutSec := pointer.Int32(0)
+	scheduleTimeoutSec := ptr.To[int32](0)
 	if schedPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedPolicy != nil && schedPolicy.ScheduleTimeoutSeconds != nil {
 		scheduleTimeoutSec = schedPolicy.ScheduleTimeoutSeconds
 	}
@@ -364,9 +364,9 @@ func calPGMinResource(minMember *int32, mpiJob *kubeflow.MPIJob, pcLister schedu
 			klog.Warningf("Couldn't find the worker replicas")
 			return nil
 		}
-		order[wIndex].Replicas = pointer.Int32(*minMember - 1)
+		order[wIndex].Replicas = ptr.To(*minMember - 1)
 	} else {
-		order[1].Replicas = pointer.Int32(*minMember - 1)
+		order[1].Replicas = ptr.To(*minMember - 1)
 	}
 }
 
@@ -390,7 +390,7 @@ func calculateMinAvailable(mpiJob *kubeflow.MPIJob) *int32 {
 	if schedulingPolicy := mpiJob.Spec.RunPolicy.SchedulingPolicy; schedulingPolicy != nil && schedulingPolicy.MinAvailable != nil {
 		return schedulingPolicy.MinAvailable
 	}
-	return pointer.Int32(workerReplicas(mpiJob) + 1)
+	return ptr.To(workerReplicas(mpiJob) + 1)
 }
 
 // calculatePriorityClassName calculates the priorityClass name needed for podGroup according to the following priorities:
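
A note on the explicit type parameters above (this is standard Go type inference, not something called out in the commit): with an untyped constant argument, ptr.To would infer int, so the diff spells out ptr.To[int32](0) here and ptr.To[int32](10) in the tests, while typed int32 expressions such as *minMember - 1 or workerReplicas(mpiJob) + 1 let inference produce *int32 on its own. A small sketch, assuming nothing beyond the ptr package:

```go
package main

import (
	"fmt"

	"k8s.io/utils/ptr"
)

func main() {
	// Untyped constant: inference would pick int, so the type parameter is explicit.
	timeout := ptr.To[int32](0) // *int32

	// Typed expression: T is inferred as int32 from the argument.
	var minMember int32 = 9
	replicas := ptr.To(minMember - 1) // *int32

	fmt.Printf("%T %T\n", timeout, replicas) // *int32 *int32
}
```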
