diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index a76d66ad91..721b137576 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -13,11 +13,9 @@ import ( tfv1alpha1 "github.com/tensorflow/k8s/pkg/apis/tensorflow/v1alpha1" log "github.com/golang/glog" - "github.com/golang/protobuf/proto" // TOOO(jlewi): Rename to apiErrors "github.com/tensorflow/k8s/pkg/apis/tensorflow/helper" "github.com/tensorflow/k8s/pkg/util" - batch "k8s.io/api/batch/v1" "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -177,7 +175,7 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error { // Create the service. service := &v1.Service{ ObjectMeta: meta_v1.ObjectMeta{ - Name: s.jobName(index), + Name: s.genName(index), Labels: taskLabels, OwnerReferences: []meta_v1.OwnerReference{ helper.AsOwner(s.Job.job), @@ -200,7 +198,7 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error { // If the job already exists do nothing. if err != nil { if k8s_errors.IsAlreadyExists(err) { - log.Infof("Service %v already exists.", s.jobName(index)) + log.Infof("Service %v already exists.", s.genName(index)) } else { s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err) return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating service %v returned error.", createdService.ObjectMeta.Name), err}) @@ -231,38 +229,22 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error { s.Spec.Template.Spec.Containers[0].Command = []string{"python", "/ps-server/grpc_tensorflow_server.py", "--cluster_spec", cs, "--job_name", "ps", "--task_id", fmt.Sprintf("%v", index)} } - // Make a copy of the template because we will modify it below. . - newPodSpecTemplate := s.Spec.Template.DeepCopy() - - newJ := &batch.Job{ + newP := &v1.Pod{ ObjectMeta: meta_v1.ObjectMeta{ - Name: s.jobName(index), + Name: s.genName(index), Labels: taskLabels, OwnerReferences: []meta_v1.OwnerReference{ helper.AsOwner(s.Job.job), }, }, - Spec: batch.JobSpec{ - Completions: proto.Int32(1), - Parallelism: proto.Int32(1), - Template: *newPodSpecTemplate, - }, - } - - if newJ.Spec.Template.ObjectMeta.Labels == nil { - newJ.Spec.Template.ObjectMeta.Labels = make(map[string]string) - } - - // Pods need to be tagged with the labels. - for k, v := range taskLabels { - newJ.Spec.Template.ObjectMeta.Labels[k] = v + Spec: *s.Spec.Template.Spec.DeepCopy(), } // Add TF_CONFIG environment variable. - for i, _ := range newJ.Spec.Template.Spec.Containers { + for i, _ := range newP.Spec.Containers { // We can't get c in the loop variable because that would be by value so our modifications // wouldn't have any effect. - c := &newJ.Spec.Template.Spec.Containers[i] + c := &newP.Spec.Containers[i] if tfv1alpha1.ContainerName(c.Name) != tfv1alpha1.TENSORFLOW { continue } @@ -275,20 +257,20 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error { }) } - log.Infof("Creating Job: %v", newJ.ObjectMeta.Name) - createdJob, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Create(newJ) + log.Infof("Creating Pod: %v", newP.ObjectMeta.Name) + createdPod, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Create(newP) - // If the job already exists do nothing. + // If the Pod already exists do nothing. if err != nil { if k8s_errors.IsAlreadyExists(err) { - log.Infof("%v already exists.", s.jobName(index)) + log.Infof("%v already exists.", s.genName(index)) } else { s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err) - return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating Job %v returned error.", createdJob.ObjectMeta.Name), err}) + return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating Pod %v returned error.", createdPod.ObjectMeta.Name), err}) } } else { - s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created job: %v", createdJob.Name) + s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created Pod: %v", createdPod.Name) } } return nil @@ -352,11 +334,11 @@ func (s *TFReplicaSet) Delete() error { // Services doesn't support DeleteCollection so we delete them individually. // TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases. for index := int32(0); index < *s.Spec.Replicas; index++ { - log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.jobName((index))) - err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.jobName(index), &meta_v1.DeleteOptions{}) + log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index))) + err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(index), &meta_v1.DeleteOptions{}) if err != nil { - log.Errorf("Error deleting service %v; %v", s.jobName(index), err) + log.Errorf("Error deleting service %v; %v", s.genName(index), err) failures = true } } @@ -441,7 +423,7 @@ func replicaStatusFromPodList(l v1.PodList, name tfv1alpha1.ContainerName) tfv1a } func (s *TFReplicaSet) GetSingleReplicaStatus(index int32) tfv1alpha1.ReplicaState { - j, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Get(s.jobName(index), meta_v1.GetOptions{}) + j, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{}) if err != nil { return tfv1alpha1.ReplicaStateUnknown @@ -518,10 +500,10 @@ func (s *TFReplicaSet) GetStatus() (tfv1alpha1.TFReplicaStatus, error) { return status, nil } -func (s *TFReplicaSet) jobName(index int32) string { +func (s *TFReplicaSet) genName(index int32) string { // Truncate tfjob name to 40 characters // The whole job name should be compliant with the DNS_LABEL spec, up to a max length of 63 characters - // Thus jobname(40 chars)-replicaType(6 chars)-runtimeId(4 chars)-index(4 chars), also leaving some spaces + // Thus genName(40 chars)-replicaType(6 chars)-runtimeId(4 chars)-index(4 chars), also leaving some spaces // See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/architecture/identifiers.md return fmt.Sprintf("%v-%v-%v-%v", fmt.Sprintf("%.40s", s.Job.job.ObjectMeta.Name), strings.ToLower(string(s.Spec.TFReplicaType)), s.Job.job.Spec.RuntimeId, index) } diff --git a/pkg/trainer/tensorboard.go b/pkg/trainer/tensorboard.go index 817ab48a67..cd76b7a63d 100644 --- a/pkg/trainer/tensorboard.go +++ b/pkg/trainer/tensorboard.go @@ -49,7 +49,7 @@ func (s *TBReplicaSet) Create() error { // create the service exposing TensorBoard service := &v1.Service{ ObjectMeta: meta_v1.ObjectMeta{ - Name: s.jobName(), + Name: s.genName(), Labels: s.Labels(), OwnerReferences: []meta_v1.OwnerReference{ helper.AsOwner(s.Job.job), @@ -76,7 +76,7 @@ func (s *TBReplicaSet) Create() error { // If the job already exists do nothing. if err != nil { if k8s_errors.IsAlreadyExists(err) { - log.Infof("Service %v already exists.", s.jobName()) + log.Infof("Service %v already exists.", s.genName()) } else { return err } @@ -84,7 +84,7 @@ func (s *TBReplicaSet) Create() error { newD := &v1beta1.Deployment{ ObjectMeta: meta_v1.ObjectMeta{ - Name: s.jobName(), + Name: s.genName(), Labels: s.Labels(), OwnerReferences: []meta_v1.OwnerReference{ helper.AsOwner(s.Job.job), @@ -104,7 +104,7 @@ func (s *TBReplicaSet) Create() error { if err != nil { if k8s_errors.IsAlreadyExists(err) { - log.Infof("%v already exists.", s.jobName()) + log.Infof("%v already exists.", s.genName()) } else { return err } @@ -116,19 +116,19 @@ func (s *TBReplicaSet) Delete() error { failures := false delProp := meta_v1.DeletePropagationForeground - log.V(1).Infof("Deleting deployment %v:%v", s.Job.job.ObjectMeta.Namespace, s.jobName()) - err := s.ClientSet.ExtensionsV1beta1().Deployments(s.Job.job.ObjectMeta.Namespace).Delete(s.jobName(), &meta_v1.DeleteOptions{ + log.V(1).Infof("Deleting deployment %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName()) + err := s.ClientSet.ExtensionsV1beta1().Deployments(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(), &meta_v1.DeleteOptions{ PropagationPolicy: &delProp, }) if err != nil { - log.Errorf("There was a problem deleting TensorBoard's deployment %v; %v", s.jobName(), err) + log.Errorf("There was a problem deleting TensorBoard's deployment %v; %v", s.genName(), err) failures = true } - log.V(1).Infof("Deleting service %v:%v", s.Job.job.ObjectMeta.Namespace, s.jobName()) - err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.jobName(), &meta_v1.DeleteOptions{}) + log.V(1).Infof("Deleting service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName()) + err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(), &meta_v1.DeleteOptions{}) if err != nil { - log.Errorf("Error deleting service: %v; %v", s.jobName(), err) + log.Errorf("Error deleting service: %v; %v", s.genName(), err) failures = true } @@ -141,7 +141,7 @@ func (s *TBReplicaSet) Delete() error { func (s *TBReplicaSet) getDeploymentSpecTemplate(image string) v1.PodTemplateSpec { // TODO: make the TensorFlow image a parameter of the job operator. c := &v1.Container{ - Name: s.jobName(), + Name: s.genName(), Image: image, Command: []string{ "tensorboard", "--logdir", s.Spec.LogDir, "--host", "0.0.0.0", @@ -165,7 +165,7 @@ func (s *TBReplicaSet) getDeploymentSpecTemplate(image string) v1.PodTemplateSpe return v1.PodTemplateSpec{ ObjectMeta: meta_v1.ObjectMeta{ - Name: s.jobName(), + Name: s.genName(), Labels: s.Labels(), }, Spec: *ps, @@ -182,10 +182,10 @@ func (s *TBReplicaSet) Labels() KubernetesLabels { }) } -func (s *TBReplicaSet) jobName() string { +func (s *TBReplicaSet) genName() string { // Truncate tfjob name to 40 characters // The whole job name should be compliant with the DNS_LABEL spec, up to a max length of 63 characters - // Thus jobname(40 chars)-tensorboard(11 chars)-runtimeId(4 chars), also leaving some spaces + // Thus genName(40 chars)-tensorboard(11 chars)-runtimeId(4 chars), also leaving some spaces // See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/architecture/identifiers.md return fmt.Sprintf("%v-tensorboard-%v", fmt.Sprintf("%.40s", s.Job.job.ObjectMeta.Name), strings.ToLower(s.Job.job.Spec.RuntimeId)) } diff --git a/pkg/trainer/training.go b/pkg/trainer/training.go index 5bc38ddcfd..dc2115515d 100644 --- a/pkg/trainer/training.go +++ b/pkg/trainer/training.go @@ -97,7 +97,7 @@ func (j *TrainingJob) ClusterSpec() ClusterSpec { replicaNames := make([]string, 0, *p.Spec.Replicas) for i := int32(0); i < *p.Spec.Replicas; i++ { - replicaNames = append(replicaNames, fmt.Sprintf("%v:%v", p.jobName(i), *p.Spec.TFPort)) + replicaNames = append(replicaNames, fmt.Sprintf("%v:%v", p.genName(i), *p.Spec.TFPort)) } clusterSpec[strings.ToLower(string(p.Spec.TFReplicaType))] = replicaNames