diff --git a/cmd/tf_operator/main.go b/cmd/tf_operator/main.go index e7d573b69a..824cebe0c6 100644 --- a/cmd/tf_operator/main.go +++ b/cmd/tf_operator/main.go @@ -22,10 +22,10 @@ import ( "io/ioutil" + "github.com/jlewi/mlkube.io/pkg/spec" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" - "github.com/jlewi/mlkube.io/pkg/spec" ) var ( @@ -36,6 +36,7 @@ var ( chaosLevel int controllerConfigFile string printVersion bool + grpcServerFile string ) var ( @@ -52,7 +53,6 @@ func init() { flag.BoolVar(&printVersion, "version", false, "Show version and quit") flag.DurationVar(&gcInterval, "gc-interval", 10*time.Minute, "GC interval") flag.StringVar(&controllerConfigFile, "controller_config_file", "", "Path to file containing the controller config.") - flag.Parse() // Workaround for watching TPR resource. @@ -84,6 +84,7 @@ func init() { } else { log.Info("No controller_config_file provided; using empty config.") } + } func main() { diff --git a/examples/tf_job.yaml b/examples/tf_job.yaml index 6955973094..30d26c4be4 100644 --- a/examples/tf_job.yaml +++ b/examples/tf_job.yaml @@ -19,4 +19,6 @@ spec: containers: - image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff name: tensorflow - restartPolicy: OnFailure \ No newline at end of file + restartPolicy: OnFailure + - replicas: 2 + tfReplicaType: PS \ No newline at end of file diff --git a/glide.lock b/glide.lock index 71b5f61481..6cb62595ed 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: b9c34817e80d7914b889e41fb456c8b0595f707cf3d2923da3f7b2ac6a6de44a -updated: 2017-08-16T11:49:07.068368434-07:00 +updated: 2017-09-29T11:30:21.243398665-04:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 diff --git a/grpc_tensorflow_server/grpc_tensorflow_server.py b/grpc_tensorflow_server/grpc_tensorflow_server.py new file mode 100644 index 0000000000..703b64270a --- /dev/null +++ b/grpc_tensorflow_server/grpc_tensorflow_server.py @@ -0,0 +1,163 @@ 
+#!/usr/bin/python +# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +TODO: Once grpc_tensorflow_server.py is included in tensorflow +docker image we should use it instead. + +Python-based TensorFlow GRPC server. + +Takes input arguments cluster_spec, job_name and task_id, and starts a blocking +TensorFlow GRPC server. + +Usage: + grpc_tensorflow_server.py --cluster_spec=SPEC --job_name=NAME --task_id=ID + +Where: + SPEC is <JOB>(,<JOB>)* + JOB is <NAME>|<HOST:PORT>(;<HOST:PORT>)* + NAME is a valid job name ([a-z][0-9a-z]*) + HOST is a hostname or IP address + PORT is a port number +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import sys + +from tensorflow.core.protobuf import config_pb2 +from tensorflow.core.protobuf import tensorflow_server_pb2 +from tensorflow.python.platform import app +from tensorflow.python.training import server_lib + + +def parse_cluster_spec(cluster_spec, cluster, verbose=False): + """Parse content of cluster_spec string and inject info into cluster protobuf. + + Args: + cluster_spec: cluster specification string, e.g., + "local|localhost:2222;localhost:2223" + cluster: cluster protobuf. + verbose: If verbose logging is requested. + + Raises: + ValueError: if the cluster_spec string is invalid. 
+ """ + + job_strings = cluster_spec.split(",") + + if not cluster_spec: + raise ValueError("Empty cluster_spec string") + + for job_string in job_strings: + job_def = cluster.job.add() + + if job_string.count("|") != 1: + raise ValueError("Not exactly one instance of '|' in cluster_spec") + + job_name = job_string.split("|")[0] + + if not job_name: + raise ValueError("Empty job_name in cluster_spec") + + job_def.name = job_name + + if verbose: + print("Added job named \"%s\"" % job_name) + + job_tasks = job_string.split("|")[1].split(";") + for i in range(len(job_tasks)): + if not job_tasks[i]: + raise ValueError("Empty task string at position %d" % i) + + job_def.tasks[i] = job_tasks[i] + + if verbose: + print(" Added task \"%s\" to job \"%s\"" % (job_tasks[i], job_name)) + + +def main(unused_args): + # Create Protobuf ServerDef + server_def = tensorflow_server_pb2.ServerDef(protocol="grpc") + + # Cluster info + parse_cluster_spec(FLAGS.cluster_spec, server_def.cluster, FLAGS.verbose) + + # Job name + if not FLAGS.job_name: + raise ValueError("Empty job_name") + server_def.job_name = FLAGS.job_name + + # Task index + if FLAGS.task_id < 0: + raise ValueError("Invalid task_id: %d" % FLAGS.task_id) + server_def.task_index = FLAGS.task_id + + config = config_pb2.ConfigProto(gpu_options=config_pb2.GPUOptions( + per_process_gpu_memory_fraction=FLAGS.gpu_memory_fraction)) + + # Create GRPC Server instance + server = server_lib.Server(server_def, config=config) + + # join() is blocking, unlike start() + server.join() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.register("type", "bool", lambda v: v.lower() == "true") + parser.add_argument( + "--cluster_spec", + type=str, + default="", + help="""\ + Cluster spec: SPEC. SPEC is (,)*," JOB is + |(;)*," NAME is a valid job name + ([a-z][0-9a-z]*)," HOST is a hostname or IP address," PORT is a + port number." 
E.g., local|localhost:2222;localhost:2223, + ps|ps0:2222;ps1:2222\ + """ + ) + parser.add_argument( + "--job_name", + type=str, + default="", + help="Job name: e.g., local" + ) + parser.add_argument( + "--task_id", + type=int, + default=0, + help="Task index, e.g., 0" + ) + parser.add_argument( + "--gpu_memory_fraction", + type=float, + default=1.0, + help="Fraction of GPU memory allocated",) + parser.add_argument( + "--verbose", + type="bool", + nargs="?", + const=True, + default=False, + help="Verbose mode" + ) + + FLAGS, unparsed = parser.parse_known_args() + app.run(main=main, argv=[sys.argv[0]] + unparsed) \ No newline at end of file diff --git a/images/tf_operator/Dockerfile b/images/tf_operator/Dockerfile index 50a0048b10..52e10fc14d 100644 --- a/images/tf_operator/Dockerfile +++ b/images/tf_operator/Dockerfile @@ -5,6 +5,7 @@ RUN mkdir -p /opt/mlkube RUN mkdir -p /opt/mlkube/test COPY tf_operator /opt/mlkube COPY e2e /opt/mlkube/test +COPY grpc_tensorflow_server.py /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py RUN chmod a+x /opt/mlkube/tf_operator RUN chmod a+x /opt/mlkube/test/e2e diff --git a/images/tf_operator/build_and_push.py b/images/tf_operator/build_and_push.py index c76042947d..0bb0438a53 100755 --- a/images/tf_operator/build_and_push.py +++ b/images/tf_operator/build_and_push.py @@ -13,8 +13,9 @@ def GetGitHash(): # The image tag is based on the githash. 
- git_hash = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]) + git_hash = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]).decode("utf-8") git_hash = git_hash.strip() + modified_files = subprocess.check_output(["git", "ls-files", "--modified"]) untracked_files = subprocess.check_output( ["git", "ls-files", "--others", "--exclude-standard"]) @@ -23,7 +24,8 @@ def GetGitHash(): sha = hashlib.sha256() sha.update(diff) diffhash = sha.hexdigest()[0:7] - git_hash = "{0}-dirty-{1}".format(git_hash, diffhash) + git_hash = "{0}-dirty-{1}".format(git_hash, diffhash) + return git_hash def run(command, cwd=None): @@ -57,6 +59,9 @@ def run(command, cwd=None): help="Use Google Container Builder to build the image.") parser.add_argument("--no-gcb", dest="use_gcb", action="store_false", help="Use Docker to build the image.") + parser.add_argument("--no-push", dest="should_push", action="store_false", + help="Do not push the image once build is finished.") + parser.set_defaults(use_gcb=False) args = parser.parse_args() @@ -90,6 +95,7 @@ def run(command, cwd=None): "images/tf_operator/Dockerfile", os.path.join(go_path, "bin/tf_operator"), os.path.join(go_path, "bin/e2e"), + "grpc_tensorflow_server/grpc_tensorflow_server.py" ] for s in sources: @@ -111,9 +117,11 @@ def run(command, cwd=None): "--tag=" + image, "--project=" + args.project ]) else: run(["docker", "build", "-t", image, context_dir]) - logging.info("Built image: %s", image) - run(["gcloud", "docker", "--", "push", image]) - logging.info("Pushed image: %s", image) + logging.info("Built image: %s", image) + + if args.should_push: + run(["gcloud", "docker", "--", "push", image]) + logging.info("Pushed image: %s", image) if args.output: logging.info("Writing build information to %s", args.output) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 0a4d0842ea..65ca498f36 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,24 +6,25 @@ 
import ( "errors" "fmt" "io" - "k8s.io/client-go/kubernetes" - "github.com/jlewi/mlkube.io/pkg/spec" - "github.com/jlewi/mlkube.io/pkg/trainer" - "github.com/jlewi/mlkube.io/pkg/util/k8sutil" "net/http" "reflect" "sync" "time" - apierrors "k8s.io/apimachinery/pkg/api/errors" - apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - v1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "github.com/jlewi/mlkube.io/pkg/spec" + "github.com/jlewi/mlkube.io/pkg/trainer" + "github.com/jlewi/mlkube.io/pkg/util/k8sutil" + "k8s.io/client-go/kubernetes" + log "github.com/golang/glog" + "github.com/jlewi/mlkube.io/pkg/util" + v1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kwatch "k8s.io/apimachinery/pkg/watch" k8sErrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" - "github.com/jlewi/mlkube.io/pkg/util" + kwatch "k8s.io/apimachinery/pkg/watch" ) var ( @@ -66,7 +67,7 @@ func New(kubeCli kubernetes.Interface, apiCli apiextensionsclient.Interface, tfJ return &Controller{ Namespace: ns, KubeCli: kubeCli, - ApiCli: apiCli, + ApiCli: apiCli, TfJobClient: tfJobClient, // TODO(jlewi)): What to do about cluster.Cluster? 
jobs: make(map[string]*trainer.TrainingJob), @@ -147,7 +148,7 @@ func (c *Controller) handleTfJobEvent(event *Event) error { //NewJob(kubeCli kubernetes.Interface, job spec.TfJob, stopC <-chan struct{}, wg *sync.WaitGroup) c.stopChMap[clus.Metadata.Name] = stopC - c.jobs[clus.Metadata.Namespace + "-" + clus.Metadata.Name] = nc + c.jobs[clus.Metadata.Namespace+"-"+clus.Metadata.Name] = nc c.jobRVs[clus.Metadata.Name] = clus.Metadata.ResourceVersion //case kwatch.Modified: @@ -158,10 +159,10 @@ func (c *Controller) handleTfJobEvent(event *Event) error { // c.jobRVs[clus.Metadata.Name] = clus.Metadata.ResourceVersion // case kwatch.Deleted: - if _, ok := c.jobs[clus.Metadata.Namespace + "-" + clus.Metadata.Name]; !ok { + if _, ok := c.jobs[clus.Metadata.Namespace+"-"+clus.Metadata.Name]; !ok { return fmt.Errorf("unsafe state. TfJob was never created but we received event (%s)", event.Type) } - c.jobs[clus.Metadata.Namespace + "-" + clus.Metadata.Name].Delete() + c.jobs[clus.Metadata.Namespace+"-"+clus.Metadata.Name].Delete() delete(c.jobs, clus.Metadata.Name) delete(c.jobRVs, clus.Metadata.Name) } @@ -193,7 +194,7 @@ func (c *Controller) findAllTfJobs() (string, error) { continue } c.stopChMap[clus.Metadata.Name] = stopC - c.jobs[clus.Metadata.Namespace + "-" + clus.Metadata.Name] = nc + c.jobs[clus.Metadata.Namespace+"-"+clus.Metadata.Name] = nc c.jobRVs[clus.Metadata.Name] = clus.Metadata.ResourceVersion } @@ -237,16 +238,16 @@ func (c *Controller) createCRD() error { Name: spec.CRDName(), }, Spec: v1beta1.CustomResourceDefinitionSpec{ - Group: spec.CRDGroup, + Group: spec.CRDGroup, Version: spec.CRDVersion, - Scope: v1beta1.NamespaceScoped, - Names: v1beta1.CustomResourceDefinitionNames{ - Plural: spec.CRDKindPlural, - // TODO(jlewi): Do we want to set the singular name? - // Kind is the serialized kind of the resource. It is normally CamelCase and singular. 
- Kind: reflect.TypeOf(spec.TfJob{}).Name(), - }, + Scope: v1beta1.NamespaceScoped, + Names: v1beta1.CustomResourceDefinitionNames{ + Plural: spec.CRDKindPlural, + // TODO(jlewi): Do we want to set the singular name? + // Kind is the serialized kind of the resource. It is normally CamelCase and singular. + Kind: reflect.TypeOf(spec.TfJob{}).Name(), }, + }, } _, err := c.ApiCli.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) diff --git a/pkg/spec/controller.go b/pkg/spec/controller.go index d08e054a0b..446a989f85 100644 --- a/pkg/spec/controller.go +++ b/pkg/spec/controller.go @@ -5,6 +5,9 @@ type ControllerConfig struct { // This should match the value specified as a container limit. // e.g. alpha.kubernetes.io/nvidia-gpu Accelerators map[string]AcceleratorConfig + + // Path to the file containing the grpc server source + GrpcServerFilePath string } // AcceleratorVolume represents a host path that must be mounted into diff --git a/pkg/spec/tf_job.go b/pkg/spec/tf_job.go index 79950f131a..63ce7ffd7e 100644 --- a/pkg/spec/tf_job.go +++ b/pkg/spec/tf_job.go @@ -6,23 +6,23 @@ import ( "fmt" "time" + "github.com/golang/protobuf/proto" + "github.com/jlewi/mlkube.io/pkg/util" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" - "github.com/jlewi/mlkube.io/pkg/util" - "github.com/golang/protobuf/proto" ) const ( - CRDKind = "TfJob" - CRDKindPlural = "tfjobs" - CRDGroup = "mlkube.io" - CRDVersion = "v1beta1" + CRDKind = "TfJob" + CRDKindPlural = "tfjobs" + CRDGroup = "mlkube.io" + CRDVersion = "v1beta1" // Value of the APP label that gets applied to a lot of entities. AppLabel = "tensorflow-job" // Defaults for the Spec - TfPort = 2222 + TfPort = 2222 Replicas = 1 ) @@ -60,6 +60,10 @@ type TfJobSpec struct { // ReplicaSpecs specifies the TF replicas to run. 
ReplicaSpecs []*TfReplicaSpec `json:"replicaSpecs"` + + // TfImage defines the tensorflow docker image that should be used for Tensorboard + // and the default parameter server + TfImage string `json:"tfImage,omitempty"` } // TfReplicaType determines how a set of TF processes are handled. @@ -75,7 +79,8 @@ const ( type ContainerName string const ( - TENSORFLOW ContainerName = "tensorflow" + TENSORFLOW ContainerName = "tensorflow" + DefaultTFImage = "tensorflow/tensorflow:1.3.0" ) // TODO(jlewi): We probably want to add a name field. This would allow us to have more than 1 type of each worker. @@ -91,6 +96,8 @@ type TfReplicaSpec struct { // TfPort is the port to use for TF services. TfPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"` TfReplicaType `json:"tfReplicaType"` + // IsDefaultPS denotes if the parameter server should use the default grpc_tensorflow_server + IsDefaultPS bool } type TensorBoardSpec struct { @@ -106,7 +113,7 @@ func (c *TfJobSpec) Validate() error { // Check that each replica has a TensorFlow container. for _, r := range c.ReplicaSpecs { found := false - if r.Template == nil { + if r.Template == nil && r.TfReplicaType != PS { return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) } @@ -205,10 +212,14 @@ func (c *TfJobSpec) ConfigureAccelerators(accelerators map[string]AcceleratorCon // SetDefaults sets any unspecified values to defaults func (c *TfJobSpec) SetDefaults() error { + if c.TfImage == "" { + c.TfImage = DefaultTFImage + } + // Check that each replica has a TensorFlow container. 
for _, r := range c.ReplicaSpecs { - if r.Template == nil { - return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) + if r.Template == nil && r.TfReplicaType != PS { + return fmt.Errorf("ReplicaType: %v, Replica is missing Template; %v", r.TfReplicaType, util.Pformat(r)) } if r.TfPort == nil { @@ -222,6 +233,11 @@ func (c *TfJobSpec) SetDefaults() error { if r.Replicas == nil { r.Replicas = proto.Int32(Replicas) } + + //Set the default configuration for a PS server if the user didn't specify a PodTemplateSpec + if r.Template == nil && r.TfReplicaType == PS { + r.setDefaultPSPodTemplateSpec(c.TfImage) + } } return nil } @@ -233,6 +249,27 @@ func (c *TfJobSpec) Cleanup() { // We should have default container images so user doesn't have to provide these. } +func (r *TfReplicaSpec) setDefaultPSPodTemplateSpec(tfImage string) { + r.IsDefaultPS = true + r.Template = &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Image: tfImage, + Name: "tensorflow", + VolumeMounts: []v1.VolumeMount{ + v1.VolumeMount{ + Name: "ps-config-volume", + MountPath: "/ps-server", + }, + }, + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + } +} + type TfJobPhase string const ( diff --git a/pkg/spec/tf_job_test.go b/pkg/spec/tf_job_test.go index c8db5566a3..cb563362b4 100644 --- a/pkg/spec/tf_job_test.go +++ b/pkg/spec/tf_job_test.go @@ -5,9 +5,9 @@ import ( "testing" "github.com/gogo/protobuf/proto" + "github.com/jlewi/mlkube.io/pkg/util" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/client-go/pkg/api/v1" - "github.com/jlewi/mlkube.io/pkg/util" ) func TestAddAccelertor(t *testing.T) { @@ -254,6 +254,7 @@ func TestSetDefaults(t *testing.T) { }, }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, expected: &TfJobSpec{ ReplicaSpecs: []*TfReplicaSpec{ @@ -272,6 +273,45 @@ func TestSetDefaults(t *testing.T) { TfReplicaType: MASTER, }, }, + TfImage: "tensorflow/tensorflow:1.3.0", + }, + }, + { + in: &TfJobSpec{ + ReplicaSpecs: 
[]*TfReplicaSpec{ + { + TfReplicaType: PS, + }, + }, + TfImage: "tensorflow/tensorflow:1.3.0", + }, + expected: &TfJobSpec{ + ReplicaSpecs: []*TfReplicaSpec{ + { + Replicas: proto.Int32(1), + TfPort: proto.Int32(2222), + Template: &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Image: "tensorflow/tensorflow:1.3.0", + Name: "tensorflow", + VolumeMounts: []v1.VolumeMount{ + v1.VolumeMount{ + Name: "ps-config-volume", + MountPath: "/ps-server", + }, + }, + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + }, + TfReplicaType: PS, + IsDefaultPS: true, + }, + }, + TfImage: "tensorflow/tensorflow:1.3.0", }, }, } @@ -284,4 +324,4 @@ func TestSetDefaults(t *testing.T) { t.Errorf("Want\n%v; Got\n %v", util.Pformat(c.expected), util.Pformat(c.in)) } } -} \ No newline at end of file +} diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index 0d9803d78f..20b93e4db7 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -4,20 +4,24 @@ import ( "encoding/json" "errors" "fmt" + "io/ioutil" + "sort" "strings" + "github.com/jlewi/mlkube.io/pkg/util/k8sutil" + "github.com/jlewi/mlkube.io/pkg/spec" log "github.com/golang/glog" "github.com/golang/protobuf/proto" // TOOO(jlewi): Rename to apiErrors + "github.com/jlewi/mlkube.io/pkg/util" k8s_errors "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sErrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" - k8sErrors "k8s.io/apimachinery/pkg/util/errors" batch "k8s.io/client-go/pkg/apis/batch/v1" - "github.com/jlewi/mlkube.io/pkg/util" ) // TFReplicaSet is a set of TF processes all acting as the same role (e.g. 
worker @@ -53,8 +57,8 @@ func NewTFReplicaSet(clientSet kubernetes.Interface, tfReplicaSpec spec.TfReplic return nil, errors.New("tfReplicaSpec.TfPort can't be nil.") } - if tfReplicaSpec.Template == nil { - return nil, errors.New("tfReplicaSpec.Template can't be nil.") + if tfReplicaSpec.Template == nil && tfReplicaSpec.TfReplicaType != spec.PS { + return nil, fmt.Errorf("tfReplicaSpec.Template can't be nil for replica type %v.", tfReplicaSpec.TfReplicaType) } // Make sure the replica type is valid. @@ -71,6 +75,7 @@ func NewTFReplicaSet(clientSet kubernetes.Interface, tfReplicaSpec spec.TfReplic if !isValidReplicaType { return nil, fmt.Errorf("tfReplicaSpec.TfReplicaType is %v but must be one of %v", tfReplicaSpec.TfReplicaType, validReplicaTypes) } + return &TFReplicaSet{ ClientSet: clientSet, Job: job, @@ -88,7 +93,56 @@ func (s *TFReplicaSet) Labels() KubernetesLabels { "runtime_id": s.Job.job.Spec.RuntimeId}) } -func (s *TFReplicaSet) Create() error { +// Transforms the tfconfig to work with grpc_tensorflow_server +func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { + + // sort by keys to make unit testing easier + keys := []string{} + for k := range clusterSpec { + keys = append(keys, k) + } + sort.Strings(keys) + + jobs := []string{} + for _, jobType := range keys { + hosts := []string{} + for _, h := range clusterSpec[jobType] { + hosts = append(hosts, h) + } + s := jobType + "|" + strings.Join(hosts, ";") + jobs = append(jobs, s) + } + + return strings.Join(jobs, ",") +} + +func (s *TFReplicaSet) Create(config *spec.ControllerConfig) error { + if s.Spec.IsDefaultPS { + // Create the ConfigMap containing the sources for the default Parameter Server + err, cm := s.getDefaultPSConfigMap(config) + if err != nil { + log.Errorf("Error building PS ConfigMap: %v", err) + return err + } + _, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.Metadata.Namespace).Create(cm) + if err != nil { + log.Errorf("Error creating PS ConfigMap: %v, %v", 
cm.ObjectMeta.Name, err) + return err + } + + // Update Volumes to include the ConfigMap containing grpc_tensorflow_server.py + s.Spec.Template.Spec.Volumes = append(s.Spec.Template.Spec.Volumes, v1.Volume{ + Name: "ps-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: s.defaultPSConfigMapName(), + }, + }, + }, + }) + } + for index := int32(0); index < *s.Spec.Replicas; index++ { taskLabels := s.Labels() taskLabels["task_index"] = fmt.Sprintf("%v", index) @@ -140,6 +194,11 @@ func (s *TFReplicaSet) Create() error { return err } + if s.Spec.IsDefaultPS { + cs := transformClusterSpecForDefaultPS(s.Job.ClusterSpec()) + s.Spec.Template.Spec.Containers[0].Command = []string{"python", "/ps-server/grpc_tensorflow_server.py", "--cluster_spec", cs, "--job_name", "ps", "--task_id", fmt.Sprintf("%v", index)} + } + // Make a copy of the template because we will modify it below. // TODO(jlewi): I don't fully understand why this works but setting Template: *s.Spec.Template // leads to TF_CONFIG being added multiples as an environment variable. 
@@ -199,6 +258,31 @@ func (s *TFReplicaSet) Create() error { return nil } +// Create a ConfigMap containing the source for a simple grpc server (pkg/controller/grpc_tensorflow_server.py) +// that will be used as default PS +func (s *TFReplicaSet) getDefaultPSConfigMap(config *spec.ControllerConfig) (error, *v1.ConfigMap) { + cm := &v1.ConfigMap{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: s.defaultPSConfigMapName(), + }, + Data: make(map[string]string), + } + + //grab server sources from files + filePaths := map[string]string{ + "grpc_tensorflow_server.py": config.GrpcServerFilePath, + } + for n, fp := range filePaths { + data, err := ioutil.ReadFile(fp) + if err != nil { + return err, nil + } + cm.Data[n] = string(data) + } + + return nil, cm +} + // Delete deletes the replicas func (s *TFReplicaSet) Delete() error { selector, err := s.Labels().ToSelector() @@ -237,6 +321,16 @@ func (s *TFReplicaSet) Delete() error { } } + // If the ConfigMap for the default parameter server exists, we delete it + _, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.Metadata.Namespace).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{}) + if err != nil { + if !k8sutil.IsKubernetesResourceNotFoundError(err) { + log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err) + } + } else { + s.ClientSet.CoreV1().ConfigMaps(s.Job.job.Metadata.Namespace).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{}) + } + if failures { return errors.New("Some of the replicas resources could not be deleted") } @@ -278,7 +372,6 @@ func replicaStatusFromPodList(l v1.PodList, name spec.ContainerName) spec.Replic } } - if tfState.Running != nil || tfState.Waiting != nil { return spec.ReplicaStateRunning } @@ -288,7 +381,6 @@ func replicaStatusFromPodList(l v1.PodList, name spec.ContainerName) spec.Replic return spec.ReplicaStateSucceeded } - if isRetryableTerminationState(tfState.Terminated) { // Since its a retryable error just return RUNNING. 
// We can just let Kubernetes restart the container to retry. @@ -384,3 +476,7 @@ func (s *TFReplicaSet) GetStatus() (spec.TfReplicaStatus, error) { func (s *TFReplicaSet) jobName(index int32) string { return fmt.Sprintf("%v-%v-%v", strings.ToLower(string(s.Spec.TfReplicaType)), s.Job.job.Spec.RuntimeId, index) } + +func (s *TFReplicaSet) defaultPSConfigMapName() string { + return fmt.Sprintf("cm-ps-%v", s.Job.job.Spec.RuntimeId) +} diff --git a/pkg/trainer/replicas_test.go b/pkg/trainer/replicas_test.go index e45382e964..9a31646f80 100644 --- a/pkg/trainer/replicas_test.go +++ b/pkg/trainer/replicas_test.go @@ -10,11 +10,11 @@ import ( "sync" "time" + "github.com/jlewi/mlkube.io/pkg/spec" + tfJobFake "github.com/jlewi/mlkube.io/pkg/util/k8sutil/fake" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/pkg/api/v1" - "github.com/jlewi/mlkube.io/pkg/spec" - tfJobFake "github.com/jlewi/mlkube.io/pkg/util/k8sutil/fake" ) func TestTFReplicaSet(t *testing.T) { @@ -49,7 +49,7 @@ func TestTFReplicaSet(t *testing.T) { t.Fatalf("NewTFReplicaSet failed: %v", err) } - if err := replica.Create(); err != nil { + if err := replica.Create(&spec.ControllerConfig{}); err != nil { t.Fatalf("replica.Create() error; %v", err) } @@ -270,3 +270,19 @@ func TestTFReplicaSetStatusFromPodList(t *testing.T) { } } } + +func TestTransformClusterSpecForDefaultPS(t *testing.T) { + + cs := ClusterSpec{ + "master": {"master-0:2222"}, + "worker": {"worker-0:2222", "worker-1:2222"}, + "ps": {"localhost:2222", "ps-1:2222"}, + } + expected := "master|master-0:2222,ps|localhost:2222;ps-1:2222,worker|worker-0:2222;worker-1:2222" + + tx := transformClusterSpecForDefaultPS(cs) + + if tx != expected { + t.Errorf("transformClusterSpecForDefaultPS() expected: %v, received: %v", expected, tx) + } +} diff --git a/pkg/trainer/tensorboard.go b/pkg/trainer/tensorboard.go index cc1a749cd3..8b1998b9d2 100644 --- a/pkg/trainer/tensorboard.go +++ 
b/pkg/trainer/tensorboard.go @@ -88,7 +88,7 @@ func (s *TBReplicaSet) Create() error { MatchLabels: s.Labels(), }, Replicas: proto.Int32(1), - Template: s.getDeploymentSpecTemplate(), + Template: s.getDeploymentSpecTemplate(s.Job.job.Spec.TfImage), }, } @@ -129,11 +129,11 @@ func (s *TBReplicaSet) Delete() error { return nil } -func (s *TBReplicaSet) getDeploymentSpecTemplate() v1.PodTemplateSpec { +func (s *TBReplicaSet) getDeploymentSpecTemplate(image string) v1.PodTemplateSpec { // TODO: make the TensorFlow image a parameter of the job operator. c := &v1.Container{ Name: s.jobName(), - Image: "tensorflow/tensorflow", + Image: image, Command: []string{ "tensorboard", "--logdir", s.Spec.LogDir, "--host", "0.0.0.0", }, diff --git a/pkg/trainer/training.go b/pkg/trainer/training.go index 2e9b87b8ef..d40009480e 100644 --- a/pkg/trainer/training.go +++ b/pkg/trainer/training.go @@ -24,10 +24,6 @@ import ( "k8s.io/client-go/pkg/api/v1" ) -const ( - NAMESPACE string = "default" -) - var ( reconcileInterval = 8 * time.Second ) @@ -123,7 +119,7 @@ func NewJob(kubeCli kubernetes.Interface, tfJobClient k8sutil.TfJobClient, job * } return } - j.run(stopC) + j.run(config, stopC) }() return j, nil @@ -146,9 +142,9 @@ func (j *TrainingJob) ClusterSpec() ClusterSpec { } // createResources creates all the replicas and TensorBoard if requested -func (j *TrainingJob) createResources() error { +func (j *TrainingJob) createResources(config *spec.ControllerConfig) error { for _, r := range j.Replicas { - if err := r.Create(); err != nil { + if err := r.Create(config); err != nil { return err } } @@ -408,7 +404,7 @@ func (j *TrainingJob) updateTPRStatus() error { return nil } -func (j *TrainingJob) run(stopC <-chan struct{}) { +func (j *TrainingJob) run(config *spec.ControllerConfig, stopC <-chan struct{}) { // TODO(jlewi): What does the run function do? clusterFailed := false @@ -468,7 +464,7 @@ func (j *TrainingJob) run(stopC <-chan struct{}) { // now we always call Create. 
if j.job.Status.Phase == spec.TfJobPhaseRunning { // We call Create to make sure all the resources exist and are running. - if cErr := j.createResources(); cErr != nil { + if cErr := j.createResources(config); cErr != nil { log.Errorf("trainingJobCreateReplicas() error; %v", cErr) } diff --git a/test/e2e/main.go b/test/e2e/main.go index 65389cd57f..ac0685b535 100644 --- a/test/e2e/main.go +++ b/test/e2e/main.go @@ -73,17 +73,6 @@ func run() error { Replicas: proto.Int32(1), TfPort: proto.Int32(2222), TfReplicaType: spec.PS, - Template: &v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "tensorflow", - Image: *image, - }, - }, - RestartPolicy: v1.RestartPolicyOnFailure, - }, - }, }, { Replicas: proto.Int32(1), diff --git a/tf-job-operator-chart/templates/deployment.yaml b/tf-job-operator-chart/templates/deployment.yaml index 6de0fefa81..9320afe0df 100644 --- a/tf-job-operator-chart/templates/deployment.yaml +++ b/tf-job-operator-chart/templates/deployment.yaml @@ -5,6 +5,7 @@ metadata: namespace: default data: controller_config_file.yaml: | + grpcServerFilePath: /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py accelerators: alpha.kubernetes.io/nvidia-gpu: volumes: