Skip to content

Commit

Permalink
Merge ed04956 into 4e7bd93
Browse files Browse the repository at this point in the history
  • Loading branch information
gaocegege committed Mar 26, 2018
2 parents 4e7bd93 + ed04956 commit 679964c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
10 changes: 5 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func (tc *TFJobController) syncTFJob(key string) (bool, error) {
// reconcileTFJobs checks and updates replicas for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods/services.
func (tc *TFJobController) reconcileTFJobs(tfjob *tfv1alpha2.TFJob) error {
log.Infof("Reconcile TFJobs %s", tfjob.Name)

pods, err := tc.getPodsForTFJob(tfjob)

Expand Down Expand Up @@ -398,7 +399,8 @@ func (tc *TFJobController) reconcileTFJobs(tfjob *tfv1alpha2.TFJob) error {
}
}

return nil
// TODO(CPH): Add check here, no need to update the tfjob if the status hasn't changed since last time.
return tc.updateStatusHandler(tfjob)
}

func genGeneralName(tfjobKey, rtype, index string) string {
Expand Down Expand Up @@ -445,8 +447,7 @@ func (tc *TFJobController) addTFJob(obj interface{}) {
scheme.Scheme.Default(tfjob)

// Leave a created condition.
newTFJob := tfjob.DeepCopy()
err := tc.updateTFJobConditions(newTFJob, tfv1alpha2.TFJobCreated, tfJobCreatedReason, msg)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobCreated, tfJobCreatedReason, msg)
if err != nil {
log.Infof("Append tfjob condition error: %v", err)
return
Expand All @@ -470,8 +471,7 @@ func (tc *TFJobController) updateTFJobStatus(tfjob *tfv1alpha2.TFJob) error {
// updateTFJobConditions builds a condition from conditionType, reason and
// message via newCondition and records it on tfjob.Status with setCondition.
//
// NOTE(review): this text appears to come from a rendered diff whose +/-
// markers were lost. The updateStatusHandler call followed by `return err`
// looks like the removed side and the trailing `return nil` like the added
// side — confirm against the repository before relying on either form.
func (tc *TFJobController) updateTFJobConditions(tfjob *tfv1alpha2.TFJob, conditionType tfv1alpha2.TFJobConditionType, reason, message string) error {
condition := newCondition(conditionType, reason, message)
setCondition(&tfjob.Status, condition)
err := tc.updateStatusHandler(tfjob)
return err
// Unreachable as written: the function has already returned err above.
return nil
}

// resolveControllerRef returns the tfjob referenced by a ControllerRef,
Expand Down
9 changes: 1 addition & 8 deletions pkg/controller/controller_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ func (tc *TFJobController) reconcilePods(
if len(activePods) == int(*spec.Replicas) && rtype == tfv1alpha2.TFReplicaTypeWorker {
now := metav1.Now()
tfjob.Status.StartTime = &now
err := tc.updateStatusHandler(tfjob)
if err != nil {
log.Infof("Set tfjob start time error: %v", err)
return err
}
}

// Some workers or PSes have failed; leave a failed condition.
Expand Down Expand Up @@ -186,9 +181,7 @@ func (tc *TFJobController) reconcilePods(
tfjob.Status.TFReplicaStatuses[rtype].Active = expected
tfjob.Status.TFReplicaStatuses[rtype].Succeeded = succeeded
tfjob.Status.TFReplicaStatuses[rtype].Failed = failed

// TODO(CPH): Add check here, no need to update the tfjob if the status hasn't changed since last time.
return tc.updateStatusHandler(tfjob)
return nil
}

// getDiffPodIndexes checks and gets diff indexes from desired and current.
Expand Down
33 changes: 24 additions & 9 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ const (
threadCount = 1
)

var alwaysReady = func() bool { return true }
var (
// alwaysReady is a stub that unconditionally reports true.
alwaysReady = func() bool { return true }

// tfJobRunning and tfJobSucceeded hold the condition types in variables so
// that test cases can take their address (e.g. &tfJobRunning) for the
// pointer-typed expectedCondition field.
tfJobRunning = tfv1alpha2.TFJobRunning
tfJobSucceeded = tfv1alpha2.TFJobSucceeded
)

func newTFJobControllerFromClient(kubeClientSet kubeclientset.Interface, tfJobClientSet tfjobclientset.Interface, resyncPeriod ResyncPeriodFunc) (*TFJobController, kubeinformers.SharedInformerFactory, tfjobinformers.SharedInformerFactory) {
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClientSet, resyncPeriod())
Expand Down Expand Up @@ -152,6 +157,9 @@ func TestNormalPath(t *testing.T) {

expectedCondition *tfv1alpha2.TFJobConditionType
expectedConditionReason string

// Some cases must skip the StartTime check because the field is expected to have been set in a previous sync loop.
needCheckStartTime bool
}{
"Local TFJob is created": {
1, 0,
Expand All @@ -162,7 +170,9 @@ func TestNormalPath(t *testing.T) {
1, 0, 1,
1, 0, 0,
0, 0, 0,
// We cannot check whether the created condition is present, since that condition is set in addTFJob.
nil, "",
false,
},
"Distributed TFJob (4 workers, 2 PS) is created": {
4, 2,
Expand All @@ -174,6 +184,7 @@ func TestNormalPath(t *testing.T) {
4, 0, 0,
2, 0, 0,
nil, "",
false,
},
"Distributed TFJob (4 workers, 2 PS) is created and all replicas are pending": {
4, 2,
Expand All @@ -184,7 +195,8 @@ func TestNormalPath(t *testing.T) {
0, 0, 0,
4, 0, 0,
2, 0, 0,
nil, "",
&tfJobRunning, tfJobRunningReason,
false,
},
"Distributed TFJob (4 workers, 2 PS) is created and all replicas are running": {
4, 2,
Expand All @@ -195,7 +207,8 @@ func TestNormalPath(t *testing.T) {
0, 0, 0,
4, 0, 0,
2, 0, 0,
nil, "",
&tfJobRunning, tfJobRunningReason,
true,
},
"Distributed TFJob (4 workers, 2 PS) is created, 2 workers, 1 PS are pending": {
4, 2,
Expand All @@ -207,6 +220,7 @@ func TestNormalPath(t *testing.T) {
4, 0, 0,
2, 0, 0,
nil, "",
false,
},
"Distributed TFJob (4 workers, 2 PS) is created, 2 workers, 1 PS are pending, 1 worker is running": {
4, 2,
Expand All @@ -217,7 +231,8 @@ func TestNormalPath(t *testing.T) {
2, 0, 2,
4, 0, 0,
2, 0, 0,
nil, "",
&tfJobRunning, tfJobRunningReason,
false,
},
"Distributed TFJob (4 workers, 2 PS) is succeeded": {
4, 2,
Expand All @@ -228,7 +243,8 @@ func TestNormalPath(t *testing.T) {
0, 0, 0,
0, 4, 0,
0, 2, 0,
nil, "",
&tfJobSucceeded, tfJobSucceededReason,
false,
},
}

Expand Down Expand Up @@ -341,11 +357,10 @@ func TestNormalPath(t *testing.T) {
t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailedPSPods, actual.Status.TFReplicaStatuses[tfv1alpha2.TFReplicaTypePS].Failed)
}
}
// TODO(gaocegege): Set StartTime for the status.
// Validate StartTime.
// if actual.Status.StartTime == nil {
// t.Errorf("%s: .status.startTime was not set", name)
// }
if tc.needCheckStartTime && actual.Status.StartTime == nil {
t.Errorf("%s: StartTime was not set", name)
}
// Validate conditions.
if tc.expectedCondition != nil && !checkCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
Expand Down

0 comments on commit 679964c

Please sign in to comment.