Parameter Server: Run TF server by default #36

Merged (18 commits) on Oct 19, 2017
Changes from 5 commits
4 changes: 3 additions & 1 deletion examples/tf_job.yaml
@@ -19,4 +19,6 @@ spec:
containers:
- image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff
name: tensorflow
restartPolicy: OnFailure
restartPolicy: OnFailure
- replicas: 2
tfReplicaType: PS
162 changes: 162 additions & 0 deletions grpc_tensorflow_server/grpc_tensorflow_server.py
@@ -0,0 +1,162 @@
""" TODO: Once grpc_tensorflow_server.py is included in tensorflow
docker image we should use it instead" """

#!/usr/bin/python
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Python-based TensorFlow GRPC server.
Contributor: Add a TODO indicating that we'd eventually like to get rid of this once grpc_tensorflow_server is included in the TF container.

Author: Done.

Takes input arguments cluster_spec, job_name and task_id, and starts a blocking
TensorFlow GRPC server.

Usage:
grpc_tensorflow_server.py --cluster_spec=SPEC --job_name=NAME --task_id=ID

Where:
SPEC is <JOB>(,<JOB>)*
JOB is <NAME>|<HOST:PORT>(;<HOST:PORT>)*
NAME is a valid job name ([a-z][0-9a-z]*)
HOST is a hostname or IP address
PORT is a port number
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import sys

from tensorflow.core.protobuf import config_pb2
from tensorflow.core.protobuf import tensorflow_server_pb2
from tensorflow.python.platform import app
from tensorflow.python.training import server_lib


def parse_cluster_spec(cluster_spec, cluster, verbose=False):
  """Parse content of cluster_spec string and inject info into cluster protobuf.

  Args:
    cluster_spec: cluster specification string, e.g.,
        "local|localhost:2222;localhost:2223"
    cluster: cluster protobuf.
    verbose: If verbose logging is requested.

  Raises:
    ValueError: if the cluster_spec string is invalid.
  """

  job_strings = cluster_spec.split(",")

  if not cluster_spec:
    raise ValueError("Empty cluster_spec string")

  for job_string in job_strings:
    job_def = cluster.job.add()

    if job_string.count("|") != 1:
      raise ValueError("Not exactly one instance of '|' in cluster_spec")

    job_name = job_string.split("|")[0]

    if not job_name:
      raise ValueError("Empty job_name in cluster_spec")

    job_def.name = job_name

    if verbose:
      print("Added job named \"%s\"" % job_name)

    job_tasks = job_string.split("|")[1].split(";")
    for i in range(len(job_tasks)):
      if not job_tasks[i]:
        raise ValueError("Empty task string at position %d" % i)

      job_def.tasks[i] = job_tasks[i]

      if verbose:
        print(" Added task \"%s\" to job \"%s\"" % (job_tasks[i], job_name))


def main(unused_args):
  # Create Protobuf ServerDef
  server_def = tensorflow_server_pb2.ServerDef(protocol="grpc")

  # Cluster info
  parse_cluster_spec(FLAGS.cluster_spec, server_def.cluster, FLAGS.verbose)

  # Job name
  if not FLAGS.job_name:
    raise ValueError("Empty job_name")
  server_def.job_name = FLAGS.job_name

  # Task index
  if FLAGS.task_id < 0:
    raise ValueError("Invalid task_id: %d" % FLAGS.task_id)
  server_def.task_index = FLAGS.task_id

  config = config_pb2.ConfigProto(gpu_options=config_pb2.GPUOptions(
      per_process_gpu_memory_fraction=FLAGS.gpu_memory_fraction))

  # Create GRPC Server instance
  server = server_lib.Server(server_def, config=config)

  # join() is blocking, unlike start()
  server.join()


if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.register("type", "bool", lambda v: v.lower() == "true")
  parser.add_argument(
      "--cluster_spec",
      type=str,
      default="",
      help="""\
      Cluster spec: SPEC. SPEC is <JOB>(,<JOB>)*, JOB is
      <NAME>|<HOST:PORT>(;<HOST:PORT>)*, NAME is a valid job name
      ([a-z][0-9a-z]*), HOST is a hostname or IP address, PORT is a
      port number. E.g., local|localhost:2222;localhost:2223,
      ps|ps0:2222;ps1:2222\
      """
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="Job name: e.g., local"
  )
  parser.add_argument(
      "--task_id",
      type=int,
      default=0,
      help="Task index, e.g., 0"
  )
  parser.add_argument(
      "--gpu_memory_fraction",
      type=float,
      default=1.0,
      help="Fraction of GPU memory allocated"
  )
  parser.add_argument(
      "--verbose",
      type="bool",
      nargs="?",
      const=True,
      default=False,
      help="Verbose mode"
  )

  FLAGS, unparsed = parser.parse_known_args()
  app.run(main=main, argv=[sys.argv[0]] + unparsed)
88 changes: 70 additions & 18 deletions pkg/controller/controller.go
@@ -6,24 +6,27 @@ import (
"errors"
"fmt"
"io"
"k8s.io/client-go/kubernetes"
"github.com/jlewi/mlkube.io/pkg/spec"
"github.com/jlewi/mlkube.io/pkg/trainer"
"github.com/jlewi/mlkube.io/pkg/util/k8sutil"
"io/ioutil"
"net/http"
"reflect"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
v1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"github.com/jlewi/mlkube.io/pkg/spec"
"github.com/jlewi/mlkube.io/pkg/trainer"
"github.com/jlewi/mlkube.io/pkg/util/k8sutil"
"k8s.io/client-go/kubernetes"

log "github.com/golang/glog"
"github.com/jlewi/mlkube.io/pkg/util"
v1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kwatch "k8s.io/apimachinery/pkg/watch"
k8sErrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/jlewi/mlkube.io/pkg/util"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/pkg/api/v1"
)

var (
@@ -66,7 +69,7 @@ func New(kubeCli kubernetes.Interface, apiCli apiextensionsclient.Interface, tfJ
return &Controller{
Namespace: ns,
KubeCli: kubeCli,
ApiCli: apiCli,
ApiCli: apiCli,
TfJobClient: tfJobClient,
// TODO(jlewi)): What to do about cluster.Cluster?
jobs: make(map[string]*trainer.TrainingJob),
@@ -226,25 +229,74 @@ func (c *Controller) initResource() (string, error) {
return "", fmt.Errorf("fail to create CRD: %v", err)
}
}

err = c.createPSConfigMap()
if err != nil {
log.Errorf("createPSConfigMap() returned error: %v", err)
}

return watchVersion, nil
}

//Create a ConfigMap containing the source for a simple grpc server (pkg/controller/grpc_tensorflow_server.py)
Contributor: Space after "//".
//that will be used as default PS
func (c *Controller) createPSConfigMap() error {
//If a ConfigMap with the same name already exists, it was created by an earlier operator
//we delete and recreate it in case the grpc_tensorflow_server.py was updated in the meantime
cm, err := c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Get(spec.PSConfigMapName(), metav1.GetOptions{})
Contributor: Will this cause problems? Suppose a job is happily running using the supplied config map. Now suppose the operator gets restarted. The operator would then try to delete and recreate the ConfigMap, which seems problematic since some jobs could already be mounting it.

Here are some options I can think of for avoiding this:

  • Create a unique ConfigMap for every job.
  • Do not delete the ConfigMap if it already exists; just reuse it.

I favor the first option because the ConfigMap is just a temporary solution until the TensorFlow container has the gRPC server binary built in. I think a ConfigMap per job makes it more robust, and I think this is better than the slight efficiency we get by reusing ConfigMaps.

Author: Good catch. One ConfigMap per job would also mean specifying the volumes configuration for the default PS in the create phase (probably where I specify the Command), because the name of the ConfigMap won't be known in advance anymore and will rely on the RuntimeId. Are you ok with that?

Contributor: SGTM.
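As a rough illustration of the first option, the ConfigMap name could be derived from the job's RuntimeId so every TfJob gets its own copy of the server source. The helper below is hypothetical, not code from this PR:

// Hypothetical sketch (not from this PR): one ConfigMap per job, named after
// the job's RuntimeId, so an operator restart never deletes a ConfigMap that a
// running job is still mounting.
func psConfigMapNameForJob(runtimeId string) string {
	// Assumes the "fmt" import already present in this file.
	return fmt.Sprintf("tf-ps-server-%s", runtimeId)
}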

if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
return err
}
} else {
err = c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Delete(spec.PSConfigMapName(), &metav1.DeleteOptions{})
if err != nil {
return err
}
}

cm = &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: spec.PSConfigMapName(),
},
Data: make(map[string]string),
}

//grab server sources from files
filePaths := map[string]string{
"grpc_tensorflow_server.py": "./grpc_tensorflow_server/grpc_tensorflow_server.py",
Contributor: Can we make the path of grpc_tensorflow_server.py a flag that gets plumbed through?

Do you plan on updating the Dockerfile for the controller to include grpc_tensorflow_server.py?

Author: To make sure I get this right, do you want this to be a flag at the controller level, so users could specify a custom grpc_tensorflow_server.py, or do you want a flag at the job level to customize where this file gets mounted in the pod?

Contributor: I was thinking at the controller level, not the job level. Right now the Docker image and the controller both have to hardcode the same location of grpc_tensorflow_server.py. That seems brittle.

If you make it a flag in main.go that gets plumbed through, then the Docker image can put grpc_tensorflow_server.py anywhere in the image and we can just specify the location as an argument to main.go.
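A minimal sketch of what such a controller-level flag might look like; the flag name, default value, and plumbing described in the trailing comment are assumptions for illustration, not code from this PR:

// Hypothetical plumbing in main.go: let the controller image place
// grpc_tensorflow_server.py anywhere and tell the controller where to find it.
import "flag"

var grpcServerFilePath = flag.String(
	"grpc_tf_server_path",
	"./grpc_tensorflow_server/grpc_tensorflow_server.py",
	"Path to the grpc_tensorflow_server.py file baked into the controller image.")

// After flag.Parse(), the value would be passed down to the controller (and
// ultimately to createPSConfigMap) instead of the hardcoded filePaths entry above.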

}
for n, fp := range filePaths {
data, err := ioutil.ReadFile(fp)
if err != nil {
return err
}
cm.Data[n] = string(data)
}

_, err = c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Create(cm)
if err != nil {
return err
}
return nil
}
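For context on how a ConfigMap like this would reach the parameter-server pod, here is a sketch of building the corresponding volume and mount. The volume name and mount path are illustrative assumptions rather than values taken from this PR; the snippet relies on the v1 package already imported by this file:

// Illustrative only: expose the generated ConfigMap to a PS pod as a volume so
// the container can run the bundled grpc_tensorflow_server.py.
func psConfigVolume(configMapName string) (v1.Volume, v1.VolumeMount) {
	vol := v1.Volume{
		Name: "ps-config-volume", // hypothetical volume name
		VolumeSource: v1.VolumeSource{
			ConfigMap: &v1.ConfigMapVolumeSource{
				LocalObjectReference: v1.LocalObjectReference{Name: configMapName},
			},
		},
	}
	mount := v1.VolumeMount{
		Name:      "ps-config-volume",
		MountPath: "/ps-server", // hypothetical path; the PS command would run /ps-server/grpc_tensorflow_server.py
	}
	return vol, mount
}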

func (c *Controller) createCRD() error {
crd := &v1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: spec.CRDName(),
},
Spec: v1beta1.CustomResourceDefinitionSpec{
Group: spec.CRDGroup,
Group: spec.CRDGroup,
Version: spec.CRDVersion,
Scope: v1beta1.NamespaceScoped,
Names: v1beta1.CustomResourceDefinitionNames{
Plural: spec.CRDKindPlural,
// TODO(jlewi): Do we want to set the singular name?
// Kind is the serialized kind of the resource. It is normally CamelCase and singular.
Kind: reflect.TypeOf(spec.TfJob{}).Name(),
},
Scope: v1beta1.NamespaceScoped,
Names: v1beta1.CustomResourceDefinitionNames{
Plural: spec.CRDKindPlural,
// TODO(jlewi): Do we want to set the singular name?
// Kind is the serialized kind of the resource. It is normally CamelCase and singular.
Kind: reflect.TypeOf(spec.TfJob{}).Name(),
},
},
}

_, err := c.ApiCli.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)