# TF on GKE

This notebook shows how to run the [TensorFlow CIFAR10 sample](https://github.com/tensorflow/models/tree/master/tutorials/image/cifar10_estimator) on GKE using TfJobs

In [71]:
%%javascript
$.getScript('https://kmahelona.github.io/ipython_notebook_goodies/ipython_notebook_toc.js')

<IPython.core.display.Javascript object>

<h2 id="tocheading">Table of Contents</h2>
<div id="toc"></div>
<script type="text/javascript" src="https://raw.github.com/kmahelona/ipython_notebook_goodies/master/ipython_notebook_toc.js">

## Requirements

To run this notebook you must have the following installed
  * gcloud
  * kubectl
  * helm
  * kubernetes python client library
  
There is a Docker image based on Datalab suitable for running this notebook.

You can start that container as follows

```
docker run --name=gke-datalab -p "127.0.0.1:8081:8080" \
    -v "${HOME}:/content/datalab/home" \
    -v /var/run/docker.sock:/var/run/docker.sock -d  -e "PROJECT_ID=" \
    gcr.io/tf-on-k8s-dogfood/gke-datalab:v20171025-28df43b-dirty
```
  * You need to map in docker so that we can build docker images inside the container.

## Preliminaries

In [1]:
# Turn on autoreloading
%load_ext autoreload
%autoreload 2

In [2]:
from __future__ import print_function

import os
import sys

ROOT_DIR = os.path.abspath(os.path.join("../.."))
sys.path.append(ROOT_DIR)

import kubernetes
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubernetes.client.rest import ApiException
import datetime
from googleapiclient import discovery
from googleapiclient import errors
from oauth2client.client import GoogleCredentials
import logging
from pprint import pprint
from py import build_and_push_image
import StringIO
import subprocess
import time
import yaml

logging.getLogger().setLevel(logging.INFO)

TF_JOB_GROUP = "mlkube.io"
TF_JOB_VERSION = "v1beta1"
TF_JOB_PLURAL = "tfjobs"
TF_JOB_KIND = "TfJob"


Change **project** to a project you have access to.
* GKE should be enabled for that project
* Optional change the cluster name

In [3]:
project="cloud-ml-dev"
zone="us-east1-d"
cluster_name="gke-tf-example"
registry = "gcr.io/" + project
data_dir = "gs://cloud-ml-dev_jlewi/cifar10/data"
job_dirs = "gs://cloud-ml-dev_jlewi/cifar10/jobs"
gke = discovery.build("container", "v1")

### Some Utility Functions

In [4]:
def run(command, cwd=None):
  logging.info("Running: %s", " ".join(command))
  subprocess.check_call(command, cwd=cwd)

class TimeoutError(Exception):
  """An error indicating an operation timed out."""

def wait_for_operation(client,
                       project,
                       zone,
                       op_id,
                       timeout=datetime.timedelta(hours=1),
                       polling_interval=datetime.timedelta(seconds=5)):
  """Wait for the specified operation to complete.

  Args:
    client: Client for the API that owns the operation.
    project: project
    zone: Zone. Set to none if its a global operation
    op_id: Operation id.
    timeout: A datetime.timedelta expressing the amount of time to wait before
      giving up.
    polling_interval: A datetime.timedelta to represent the amount of time to
      wait between requests polling for the operation status.

  Returns:
    op: The final operation.

  Raises:
    TimeoutError: if we timeout waiting for the operation to complete.
  """
  endtime = datetime.datetime.now() + timeout
  while True:
    if zone:
      op = client.projects().zones().operations().get(
          projectId=project, zone=zone,
          operationId=op_id).execute()
    else:
      op = client.globalOperations().get(project=project,
                                         operation=op_id).execute()

    status = op.get("status", "")
    # Need to handle other status's
    if status == "DONE":
      return op
    if datetime.datetime.now() > endtime:
      raise TimeoutError("Timed out waiting for op: {0} to complete.".format(
          op_id))
    time.sleep(polling_interval.total_seconds())


## GKE Cluster Setup

* The instructions below create a **CPU** cluster
* To create a GKE cluster with GPUs sign up for the [GKE GPU Alpha](https://goo.gl/forms/ef7eh2x00hV3hahx1)
* TODO(jlewi): Update code once GPUs are in beta.

In [106]:
def create_cluster(gke, name, project, zone):
  """Create the cluster.

  Args:
    gke: Client for GKE.

  """
  cluster_request = {
      "cluster": {
          "name": name,
          "description": "A GKE cluster for TF.",
          "initialNodeCount": 1,
          "nodeConfig": {
              "machineType": "n1-standard-8",
              "oauthScopes": [
                "https://www.googleapis.com/auth/cloud-platform",
              ],
          },
      }
  }
  request = gke.projects().zones().clusters().create(body=cluster_request,
                                                     projectId=project,
                                                     zone=zone)

  try:
    logging.info("Creating cluster; project=%s, zone=%s, name=%s", project,
                 zone, name)
    response = request.execute()
    logging.info("Response %s", response)
    create_op = wait_for_operation(gke, project, zone, response["name"])
    logging.info("Cluster creation done.\n %s", create_op)

  except errors.HttpError as e:
    logging.error("Exception occured creating cluster: %s, status: %s",
                  e, e.resp["status"])
    # Status appears to be a string.
    if e.resp["status"] == '409':      
      pass
    else:
      raise

create_cluster(gke, cluster_name, project, zone)      
logging.info("Configuring kubectl")
run(["gcloud", "--project=" + project, "container",
     "clusters", "--zone=" + zone, "get-credentials", cluster_name])


INFO:googleapiclient.discovery:URL being requested: POST https://container.googleapis.com/v1/projects/cloud-ml-dev/zones/us-east1-d/clusters?alt=json
INFO:root:Creating cluster; project=cloud-ml-dev, zone=us-east1-d, name=gke-tf-example
INFO:root:Response {u'status': u'RUNNING', u'name': u'operation-1509072287071-d4291ce2', u'zone': u'us-east1-d', u'startTime': u'2017-10-27T02:44:47.071137464Z', u'targetLink': u'https://container.googleapis.com/v1/projects/236417448818/zones/us-east1-d/clusters/gke-tf-example', u'operationType': u'CREATE_CLUSTER', u'selfLink': u'https://container.googleapis.com/v1/projects/236417448818/zones/us-east1-d/operations/operation-1509072287071-d4291ce2'}
INFO:googleapiclient.discovery:URL being requested: GET https://container.googleapis.com/v1/projects/cloud-ml-dev/zones/us-east1-d/operations/operation-1509072287071-d4291ce2?alt=json
INFO:googleapiclient.discovery:URL being requested: GET https://container.googleapis.com/v1/projects/cloud-ml-dev/zones/us-eas

### Install the Operator

In [107]:
run(["helm", "init"])

INFO:root:Running: helm init


In [None]:
CHART="https://storage.googleapis.com/tf-on-k8s-dogfood-releases/latest/tf-job-operator-chart-latest.tgz"
run(["helm", "install", CHART, "-n", "tf-job", "--wait", "--replace"])

## Build Docker images

we build two docker images
  * One image based on the CPU version of TensorFlow
  * One image based on the GPU Version of TensorFlow

In [51]:
reload(build_and_push_image)
image = os.path.join(registry, "tf-models")
dockerfile = os.path.join(ROOT_DIR, "examples", "tensorflow-models", "Dockerfile.template")
base_images = {
  "cpu": "gcr.io/tensorflow/tensorflow:1.3.0",
  "gpu": "gcr.io/tensorflow/tensorflow:1.3.0-gpu",
}
modes =[ "cpu"]
images = build_and_push_image.build_and_push(dockerfile, image, modes=modes, base_images=base_images)

INFO:root:context_dir: /tmp/tmpTfJobSampleContentxtEss_cW
INFO:root:Running docker build -t gcr.io/cloud-ml-dev/tf-models-cpu:e3afe3d-dirty-2377933 /tmp/tmpTfJobSampleContentxtEss_cW
Sending build context to Docker daemon  5.12 kB

INFO:root:Step 1 : FROM gcr.io/tensorflow/tensorflow:1.3.0

INFO:root: ---> 1bb38d61d261

INFO:root:Step 2 : RUN apt-get update && apt-get install -y --no-install-recommends     ca-certificates     build-essential     git

INFO:root: ---> Using cache

INFO:root: ---> 02d9bcdd5293

INFO:root:Step 3 : RUN git clone https://github.com/jlewi/models.git /tensorflow_models &&     cd /tensorflow_models &&     git checkout generate_records

INFO:root: ---> Using cache

INFO:root: ---> e1d25a2ebd6c

INFO:root:Successfully built e1d25a2ebd6c

INFO:root:Built image: gcr.io/cloud-ml-dev/tf-models-cpu:e3afe3d-dirty-2377933
INFO:root:Running gcloud docker -- push gcr.io/cloud-ml-dev/tf-models-cpu:e3afe3d-dirty-2377933
INFO:root:The push refers to a repository [gcr.io/clou

## Create the CIFAR10 Datasets

We need to create the cifar10 TFRecord files by running [generate_cifar10_tfrecords.py](https://github.com/tensorflow/models/blob/master/tutorials/image/cifar10_estimator/generate_cifar10_tfrecords.py)
  * We submit a K8s job to run this program
  * You can skip this step if your data is already available in data_dir

In [21]:
k8s_config.load_kube_config()
api_client = k8s_client.ApiClient()
batch_api = k8s_client.BatchV1Api(api_client)

namespace = "default"
job_name = "cifar10-data-"+ datetime.datetime.now().strftime("%y%m%d-%H%M%S")

body = {}
body['apiVersion'] = "batch/v1"
body['kind'] = "Job"
body['metadata'] = {}
body['metadata']['name'] = job_name
body['metadata']['namespace'] = namespace

# Note backoffLimit requires K8s >= 1.8
spec = """
backoffLimit: 4
template:
  spec:
    containers:
    - name: cifar10
      image: {image}
      command: ["python",  "/tensorflow_models/tutorials/image/cifar10_estimator/generate_cifar10_tfrecords.py", "--data-dir={data_dir}"]
    restartPolicy: Never
""".format(data_dir=data_dir, image=images["cpu"])

spec_buffer = StringIO.StringIO(spec)
body['spec'] = yaml.load(spec_buffer)

try: 
    # Create a Resource
    api_response = batch_api.create_namespaced_job(namespace, body)
    pprint(api_response)
except ApiException as e:
    print(
        "Exception when calling DefaultApi->apis_fqdn_v1_namespaces_namespace_resource_post: %s\n" % 
        e)


INFO:requests.packages.urllib3.connectionpool:Starting new HTTPS connection (1): accounts.google.com


{'api_version': 'batch/v1',
 'kind': 'Job',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': datetime.datetime(2017, 10, 27, 5, 20, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'initializers': None,
              'labels': {u'controller-uid': '7af9ff24-bad6-11e7-966d-42010a8e01b5',
                         u'job-name': 'cifar10-data-171027-052000'},
              'name': 'cifar10-data-171027-052000',
              'namespace': 'default',
              'owner_references': None,
              'resource_version': '18443',
              'self_link': '/apis/batch/v1/namespaces/default/jobs/cifar10-data-171027-052000',
              'uid': '7af9ff24-bad6-11e7-966d-42010a8e01b5'},
 'spec': {'active_deadline_seconds': None,
          'completions': 1,


wait for the job to finish

In [8]:
while True:
  results = batch_api.read_namespaced_job(job_name, namespace)
  pprint(results)
  if results.status.succeeded >= 1 or results.status.failed >= 3:
    break
  print("Sleep....")
  time.sleep(5)

if results.status.succeeded >= 1:
  print("Job completed successfully")
else:
  print("Job failed")

{'api_version': 'batch/v1',
 'kind': 'Job',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': datetime.datetime(2017, 10, 27, 4, 3, 41, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'initializers': None,
              'labels': {u'controller-uid': 'd192edb5-bacb-11e7-966d-42010a8e01b5',
                         u'job-name': 'cifar10-data-171027-040341'},
              'name': 'cifar10-data-171027-040341',
              'namespace': 'default',
              'owner_references': None,
              'resource_version': '10866',
              'self_link': '/apis/batch/v1/namespaces/default/jobs/cifar10-data-171027-040341',
              'uid': 'd192edb5-bacb-11e7-966d-42010a8e01b5'},
 'spec': {'active_deadline_seconds': None,
          'completions': 

## Create a TfJob

In [53]:
k8s_config.load_kube_config()
api_client = k8s_client.ApiClient()
crd_api = k8s_client.CustomObjectsApi(api_client)

namespace = "default"
job_name = "cifar10-"+ datetime.datetime.now().strftime("%y%m%d-%H%M%S")
job_dir = os.path.join(job_dirs, job_name)
num_steps = 10
body = {}
body['apiVersion'] = TF_JOB_GROUP + "/" + TF_JOB_VERSION
body['kind'] = TF_JOB_KIND
body['metadata'] = {}
body['metadata']['name'] = job_name
body['metadata']['namespace'] = namespace

spec = """
  replicaSpecs:
    - replicas: 1
      tfReplicaType: MASTER
      template:
        spec:
          containers:
            - image: {image}
              name: tensorflow
              command:
                - python
                - /tensorflow_models/tutorials/image/cifar10_estimator/cifar10_main.py
                - --data-dir={data_dir}
                - --job-dir={job_dir}
                - --train-steps={num_steps}
                - --num-gpus=0
          restartPolicy: OnFailure
  tfImage: {image}      
""".format(image=images["cpu"], data_dir=data_dir, job_dir=job_dir, num_steps=num_steps)

#     - replicas: 1
#       tfReplicaType: PS

spec_buffer = StringIO.StringIO(spec)
body['spec'] = yaml.load(spec_buffer)

try: 
    # Create a Resource
    api_response = crd_api.create_namespaced_custom_object(TF_JOB_GROUP, TF_JOB_VERSION, namespace, TF_JOB_PLURAL, body)
    pprint(api_response)
except ApiException as e:
    print(
        "Exception when calling DefaultApi->apis_fqdn_v1_namespaces_namespace_resource_post: %s\n" % 
        e)

INFO:requests.packages.urllib3.connectionpool:Starting new HTTPS connection (1): accounts.google.com


{u'apiVersion': u'mlkube.io/v1beta1',
 u'kind': u'TfJob',
 u'metadata': {u'clusterName': u'',
               u'creationTimestamp': u'2017-10-27T17:42:24Z',
               u'deletionGracePeriodSeconds': None,
               u'deletionTimestamp': None,
               u'name': u'cifar10-171027-174224',
               u'namespace': u'default',
               u'resourceVersion': u'98470',
               u'selfLink': u'/apis/mlkube.io/v1beta1/namespaces/default/tfjobs/cifar10-171027-174224',
               u'uid': u'316daf0b-bb3e-11e7-966d-42010a8e01b5'},
 u'spec': {u'replicaSpecs': [{u'replicas': 1,
                              u'template': {u'spec': {u'containers': [{u'command': [u'python',
                                                                                    u'/tensorflow_models/tutorials/image/cifar10_estimator/cifar10_main.py',
                                                                                    u'--data-dir=gs://cloud-ml-dev_jlewi/cifar10/data',
          

## Wait for job to finish

In [9]:

from kubernetes.client.models.v1_label_selector import V1LabelSelector
import urllib2
# Get pod logs
config = k8s_config.load_kube_config()

api_client = k8s_client.ApiClient()
api_client.config.debug=True
v1 = k8s_client.CoreV1Api(api_client)
runtime_id = results["spec"]["RuntimeId"]
# TODO(jlewi): V1LabelSelector doesn't seem to help
pods = v1.list_namespaced_pod(namespace=namespace, label_selector="runtime_id={0},job_type=MASTER".format(runtime_id))

pod = pods.items[0]
pod.metadata.name

2017-10-27 22:50:24,168 DEBUG Starting new HTTPS connection (1): 35.196.178.48
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): 35.196.178.48
2017-10-27 22:50:24,514 DEBUG https://35.196.178.48:443 "GET /api/v1/namespaces/default/pods?labelSelector=runtime_id%3Dhrhh%2Cjob_type%3DMASTER HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:https://35.196.178.48:443 "GET /api/v1/namespaces/default/pods?labelSelector=runtime_id%3Dhrhh%2Cjob_type%3DMASTER HTTP/1.1" 200 None


send: 'GET /api/v1/namespaces/default/pods?labelSelector=runtime_id%3Dhrhh%2Cjob_type%3DMASTER HTTP/1.1\r\nHost: 35.196.178.48\r\nAccept-Encoding: identity\r\nContent-Type: application/json\r\nauthorization: Bearer ya29.GlvxBFWg_2SoP-BqOEjsiGrO97EPSj5-ru1D2WOgqcdfc_XA4ErGEsy9yXtYlwYM2jRecj75dg3rot10ObCMQPc4mHetuUGH3AmHD8E6mPVLQY9eUbpaUY-gcv8n\r\nAccept: application/json\r\nUser-Agent: Swagger-Codegen/1.0.0-snapshot/python\r\n\r\n'
reply: 'HTTP/1.1 200 OK\r\n'
header: Content-Type: application/json
header: Date: Fri, 27 Oct 2017 22:50:24 GMT
header: Transfer-Encoding: chunked


'master-hrhh-0-wrh6g'

In [14]:
!pip install --upgrade google-cloud-logging

Collecting google-cloud-logging
  Downloading google_cloud_logging-1.3.0-py2.py3-none-any.whl (43kB)
[K    100% |████████████████████████████████| 51kB 3.3MB/s 
[?25hRequirement already up-to-date: gapic-google-cloud-logging-v2<0.92dev,>=0.91.0 in /usr/local/lib/python2.7/dist-packages (from google-cloud-logging)
Collecting google-cloud-core<0.28dev,>=0.27.0 (from google-cloud-logging)
  Downloading google_cloud_core-0.27.1-py2.py3-none-any.whl (50kB)
[K    100% |████████████████████████████████| 51kB 2.9MB/s 
[?25hCollecting grpcio<2.0dev,>=1.2.0 (from google-cloud-logging)
  Downloading grpcio-1.7.0-cp27-cp27mu-manylinux1_x86_64.whl (5.7MB)
[K    100% |████████████████████████████████| 5.7MB 169kB/s 
[?25hRequirement already up-to-date: google-gax<0.16dev,>=0.15.7 in /usr/local/lib/python2.7/dist-packages (from gapic-google-cloud-logging-v2<0.92dev,>=0.91.0->google-cloud-logging)
Collecting oauth2client<4.0dev,>=2.0.0 (from gapic-google-cloud-logging-v2<0.92dev,>=0.91.0->google

In [1]:
import logging
from google.cloud import logging as gcp_logging

In [3]:
project="cloud-ml-dev"
gcp_logging.__version__

'1.3.0'

In [4]:
import logging
from google.cloud import logging as gcp_logging
#pod_filter = 'resource.type="container" AND resource.labels.pod_id="{name}"'.format(name=pod.metadata.name)
pod_filter = 'resource.type="container" AND resource.labels.pod_id="master-hrhh-0-wrh6g"'
#pod_filter = 'resource.type="container"'.format(name=pod.metadata.name)
#pod_filter = 'logName: "projects/cloud-ml-dev/logs/tensorflow"'
client = gcp_logging.Client(project=project)

# i = client.list_sinks()
# for e in i:
#   print(e)
#for entry in client.list_entries(filter_=pod_filter):
for entry in client.list_entries(filter_=pod_filter):
  print("iterate")
  print(entry)

iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d721a1f10>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d721a14d0>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d721a1f10>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d721a14d0>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d7255ae90>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d721a14d0>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d7255ae90>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d721a14d0>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d7255ae90>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d721a14d0>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d7255ae90>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d721a14d0>
iterate
<google.cloud.logging.entries.TextEntry object at 0x7f6d7255ae90>
iterate
<google.cloud.logging.entries.

In [117]:
project

'cloud-ml-dev'

In [8]:

k8s_config.load_kube_config()
api_client = k8s_client.ApiClient()
crd_api = k8s_client.CustomObjectsApi(api_client)

while True:
  results = crd_api.get_namespaced_custom_object(TF_JOB_GROUP, TF_JOB_VERSION, namespace, TF_JOB_PLURAL, job_name)
  pprint(results)
  
  if results["status"]["phase"] == "Done":
    break
  logging.info("Sleep...")
  time.sleep(5)
  
logging.info("Job %s", result["status"]["state"])

INFO:requests.packages.urllib3.connectionpool:Starting new HTTPS connection (1): accounts.google.com


{u'apiVersion': u'mlkube.io/v1beta1',
 u'kind': u'TfJob',
 u'metadata': {u'clusterName': u'',
               u'creationTimestamp': u'2017-10-27T17:42:24Z',
               u'generation': 0,
               u'name': u'cifar10-171027-174224',
               u'namespace': u'default',
               u'resourceVersion': u'98814',
               u'selfLink': u'/apis/mlkube.io/v1beta1/namespaces/default/tfjobs/cifar10-171027-174224',
               u'uid': u'316daf0b-bb3e-11e7-966d-42010a8e01b5'},
 u'spec': {u'RuntimeId': u'hrhh',
           u'replicaSpecs': [{u'IsDefaultPS': False,
                              u'replicas': 1,
                              u'template': {u'metadata': {u'creationTimestamp': None},
                                            u'spec': {u'containers': [{u'command': [u'python',
                                                                                    u'/tensorflow_models/tutorials/image/cifar10_estimator/cifar10_main.py',
                                  

NameError: name 'logginginfo' is not defined