Skip to content

Commit

Permalink
tensorflow: Support old versions of estimator (#809)
Browse files Browse the repository at this point in the history
* tensorflow: Support old versions of estimator

Support Master spec similar to Chief

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* validation: Fix

Signed-off-by: Ce Gao <gaoce@caicloud.io>

* status: Add comments

Signed-off-by: Ce Gao <gaoce@caicloud.io>
  • Loading branch information
gaocegege authored and k8s-ci-robot committed Aug 30, 2018
1 parent b10551d commit 6815986
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/apis/tensorflow/v1alpha2/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func setTypeNamesToCamelCase(tfJob *TFJob) {
setTypeNameToCamelCase(tfJob, TFReplicaTypePS)
setTypeNameToCamelCase(tfJob, TFReplicaTypeWorker)
setTypeNameToCamelCase(tfJob, TFReplicaTypeChief)
setTypeNameToCamelCase(tfJob, TFReplicaTypeMaster)
setTypeNameToCamelCase(tfJob, TFReplicaTypeEval)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/tensorflow/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ const (
// Else, worker:0 is the chief worker.
TFReplicaTypeChief TFReplicaType = "Chief"

// TFReplicaTypeMaster is the type for master worker of distributed TensorFlow.
// This is similar to chief, and kept just for backwards compatibility.
TFReplicaTypeMaster TFReplicaType = "Master"

// TFReplicaTypeEval is the type for evaluation replica in TensorFlow.
TFReplicaTypeEval TFReplicaType = "Evaluator"
)
Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/tensorflow/v1alpha2/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha2

// IsChieforMaster returns true if the type is Master or Chief.
func IsChieforMaster(typ TFReplicaType) bool {
return typ == TFReplicaTypeChief || typ == TFReplicaTypeMaster
}
44 changes: 44 additions & 0 deletions pkg/apis/tensorflow/v1alpha2/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha2

import "testing"

func TestIsChieforMaster(t *testing.T) {
tc := []struct {
Type TFReplicaType
Expected bool
}{
{
Type: TFReplicaTypeChief,
Expected: true,
},
{
Type: TFReplicaTypeMaster,
Expected: true,
},
{
Type: TFReplicaTypeWorker,
Expected: false,
},
}

for _, c := range tc {
actual := IsChieforMaster(c.Type)
if actual != c.Expected {
t.Errorf("Expected %v; Got %v", c.Expected, actual)
}
}
}
19 changes: 15 additions & 4 deletions pkg/apis/tensorflow/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,22 @@ import (

// ValidateAlphaTwoTFJobSpec checks that the v1alpha2.TFJobSpec is valid.
func ValidateAlphaTwoTFJobSpec(c *tfv2.TFJobSpec) error {
if c.TFReplicaSpecs == nil {
return validateAlphaTwoReplicaSpecs(c.TFReplicaSpecs)
}

func validateAlphaTwoReplicaSpecs(specs map[tfv2.TFReplicaType]*tfv2.TFReplicaSpec) error {
if specs == nil {
return fmt.Errorf("TFJobSpec is not valid")
}
for rType, value := range c.TFReplicaSpecs {
foundChief := 0
for rType, value := range specs {
if value == nil || len(value.Template.Spec.Containers) == 0 {
return fmt.Errorf("TFJobSpec is not valid")
}
//Make sure the image is defined in the container
if tfv2.IsChieforMaster(rType) {
foundChief++
}
// Make sure the image is defined in the container.
numNamedTensorflow := 0
for _, container := range value.Template.Spec.Containers {
if container.Image == "" {
Expand All @@ -45,12 +53,15 @@ func ValidateAlphaTwoTFJobSpec(c *tfv2.TFJobSpec) error {
numNamedTensorflow++
}
}
//Make sure there has at least one container named "tensorflow"
// Make sure there has at least one container named "tensorflow".
if numNamedTensorflow == 0 {
log.Warnf("There is no container named tensorflow in %v", rType)
return fmt.Errorf("TFJobSpec is not valid")
}
}
if foundChief > 1 {
return fmt.Errorf("More than 1 chief/master found")
}
return nil
}

Expand Down
22 changes: 20 additions & 2 deletions pkg/apis/tensorflow/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,29 @@ func TestValidateAlphaTwoTFJobSpec(t *testing.T) {
},
},
},
{
TFReplicaSpecs: map[tfv2.TFReplicaType]*tfv2.TFReplicaSpec{
tfv2.TFReplicaTypeChief: &tfv2.TFReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{},
},
},
},
tfv2.TFReplicaTypeMaster: &tfv2.TFReplicaSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{},
},
},
},
},
},
}
for _, c := range testCases {
err := ValidateAlphaTwoTFJobSpec(&c)
if err.Error() != "TFJobSpec is not valid" {
t.Error("Failed validate the alpha2.TFJobSpec")
if err == nil {
t.Error("Expected error got nil")
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller.v2/tfcontroller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ func updateStatusSingle(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType,
tfjob.Status.StartTime = &now
}

if ContainChiefSpec(tfjob) {
if rtype == tfv1alpha2.TFReplicaTypeChief {
// If the TFJob contains Chief or Master spec, then we will update the status
// according to the Chief/Master spec.
if ContainChieforMasterSpec(tfjob) {
if tfv1alpha2.IsChieforMaster(rtype) {
if running > 0 {
msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
Expand Down
1 change: 1 addition & 0 deletions pkg/controller.v2/tfcontroller/tfcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1alpha2.TFJob) error {
initializeTFReplicaStatuses(tfjob, tfv1alpha2.TFReplicaTypeWorker)
initializeTFReplicaStatuses(tfjob, tfv1alpha2.TFReplicaTypePS)
initializeTFReplicaStatuses(tfjob, tfv1alpha2.TFReplicaTypeChief)
initializeTFReplicaStatuses(tfjob, tfv1alpha2.TFReplicaTypeMaster)
return tc.updateStatusHandler(tfjob)
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/controller.v2/tfcontroller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ func GetPortFromTFJob(tfJob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType) (
return -1, errPortNotFound
}

func ContainChiefSpec(tfJob *tfv1alpha2.TFJob) bool {
// ContainChieforMasterSpec returns true if the tfjob contains chief or master spec.
func ContainChieforMasterSpec(tfJob *tfv1alpha2.TFJob) bool {
if _, ok := tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeChief]; ok {
return true
} else if _, ok := tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeMaster]; ok {
return true
}
return false
}

0 comments on commit 6815986

Please sign in to comment.