<a href="https://colab.research.google.com/github/Josepholaidepetro/tensorflow_job/blob/main/tfjob.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
JOB_FILE = "tfjob.py"
TFJOB_YAML_FILE = "tfjob.yaml"

In [None]:
!pip install tensorflow_datasets --user

In [None]:
!pip install --upgrade --user nbconvert

**PYTHON SCRIPT**

In [2]:
%%writefile $JOB_FILE
import argparse
import logging
import json
import os
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import SGD, Adam, RMSprop

logging.getLogger().setLevel(logging.INFO)




def make_datasets_unbatched():
  BUFFER_SIZE = 10000

  datasets, ds_info = tfds.load(name="mnist", download=True, with_info=True, as_supervised=True)
  mnist_train, mnist_test = datasets["train"], datasets["test"]

  def scale(image, label):
      image = tf.cast(image, tf.float32) / 255
      return image, label

  train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).repeat()
  test_dataset = mnist_test.map(scale)

  return train_dataset, test_dataset


def model(args):
  model = models.Sequential()
  model.add(
      layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28, 28, 1)))
  model.add(layers.MaxPooling2D((2, 2)))
  model.add(layers.Conv2D(128, (3, 3), activation='relu'))
  model.add(layers.Flatten())
  model.add(layers.Dense(256, activation='relu'))
  model.add(layers.Dense(10, activation='softmax'))

  model.summary()
  opt = args.optimizer
  model.compile(optimizer=opt,
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])
  tf.keras.backend.set_value(model.optimizer.learning_rate, args.learning_rate)
  return model


def main(args):

  # MultiWorkerMirroredStrategy creates copies of all variables in the model's
  # layers on each device across all workers
  # if your GPUs don't support NCCL, replace "communication" with another
  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
      communication=tf.distribute.experimental.CollectiveCommunication.AUTO)
  logging.debug(f"num_replicas_in_sync: {strategy.num_replicas_in_sync}")
  BATCH_SIZE_PER_REPLICA = args.batch_size
  BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

  train_dataset, test_dataset = make_datasets_unbatched()
  train_dataset = train_dataset.batch(batch_size=BATCH_SIZE)
  test_dataset = test_dataset.batch(batch_size=BATCH_SIZE)

  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = \
        tf.data.experimental.AutoShardPolicy.DATA

  train_datasets_sharded  = train_dataset.with_options(options)
  test_dataset_sharded = test_dataset.with_options(options)

  with strategy.scope():
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = model(args)

  # Callback for printing the LR at the end of each epoch.
  class PrintLR(tf.keras.callbacks.Callback):

    def on_epoch_end(self, epoch, logs=None): #pylint: disable=no-self-use
      print('\nLearning rate for epoch {} is {}'.format(
        epoch + 1, multi_worker_model.optimizer.lr.numpy()))

  callbacks = [
      tf.keras.callbacks.TensorBoard(log_dir='./logs'),
      PrintLR()
   ]

  # Keras' `model.fit()` trains the model with specified number of epochs and
  # number of steps per epoch. Note that the numbers here are for demonstration
  # purposes only and may not sufficiently produce a model with good quality.
  multi_worker_model.fit(train_datasets_sharded,
                         epochs=10,
                         steps_per_epoch=10,
                         callbacks=callbacks)
  
  eval_loss, eval_acc = multi_worker_model.evaluate(test_dataset_sharded, 
                                                    verbose=0, steps=10)

  # Log metrics for Katib
  logging.info("loss={:.4f}".format(eval_loss))
  logging.info("accuracy={:.4f}".format(eval_acc))


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument("--batch_size",
                      type=int,
                      default=128,
                      metavar="N",
                      help="Batch size for training (default: 128)")
  parser.add_argument("--learning_rate", 
                      type=float,  
                      default=0.001,
                      metavar="N",
                      help='Initial learning rate')
  parser.add_argument("--optimizer", 
                      type=str, 
                      default='adam',
                      metavar="N",
                      help='optimizer')

  parsed_args, _ = parser.parse_known_args()
  main(parsed_args)

Writing tfjob.py


In [3]:
%run $JOB_FILE --optimizer 'adam'


Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO


INFO:absl:Load pre-computed DatasetInfo (eg: splits, num examples,...) from GCS: mnist/3.0.1
INFO:absl:Load dataset info from /tmp/tmpbn_96efxtfds
INFO:absl:Field info.citation from disk and from code do not match. Keeping the one from code.
INFO:absl:Generating dataset mnist (/root/tensorflow_datasets/mnist/3.0.1)


[1mDownloading and preparing dataset mnist/3.0.1 (download: 11.06 MiB, generated: 21.00 MiB, total: 32.06 MiB) to /root/tensorflow_datasets/mnist/3.0.1...[0m


local data directory. If you'd instead prefer to read directly from our public
GCS bucket (recommended if you're running on GCP), you can instead pass
`try_gcs=True` to `tfds.load` or set `data_dir=gs://tfds-data/datasets`.



HBox(children=(FloatProgress(value=0.0, description='Dl Completed...', max=4.0, style=ProgressStyle(descriptio…

INFO:absl:Load dataset info from /root/tensorflow_datasets/mnist/3.0.1.incompleteKLO22H
INFO:absl:Field info.citation from disk and from code do not match. Keeping the one from code.
INFO:absl:Constructing tf.data.Dataset for split None, from /root/tensorflow_datasets/mnist/3.0.1




[1mDataset mnist downloaded and prepared to /root/tensorflow_datasets/mnist/3.0.1. Subsequent calls will reuse this data.[0m
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 26, 26, 64)        640       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 13, 13, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 11, 11, 128)       73856     
_________________________________________________________________
flatten (Flatten)            (None, 15488)             0         
_________________________________________________________________
dense (Dense)                (None, 256)               3965184   
_________________________________________________________________
dense_1 (Dense)              (None, 10)                2570

INFO:root:loss=0.0934
INFO:root:accuracy=0.9688


# **YAML FILE**

In [None]:
%%writefile $TFJOB_YAML_FILE
apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
  name: "fmnist"
  namespace: demo01 # your-user-namespace
spec:
  cleanPodPolicy: None
  tfReplicaSpecs:
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
          - name: tensorflow
            image: mavencodev/tfjob:6.0
            command:
                - "python"
                - "/tfjob.py"
                - "--batch_size=150"
                - "--learning_rate=0.001"
                - "--optimizer=adam"

Overwriting tfjob.yaml


In [None]:
import re

from IPython.utils.capture import CapturedIO


def get_resource(captured_io: CapturedIO) -> str:
    """
    Gets a resource name from `kubectl apply -f <configuration.yaml>`.

    :param str captured_io: Output captured by using `%%capture` cell magic
    :return: Name of the Kubernetes resource
    :rtype: str
    :raises Exception: if the resource could not be created
    """
    out = captured_io.stdout
    matches = re.search(r"^(.+)\s+created", out)
    if matches is not None:
        return matches.group(1)
    else:
        raise Exception(f"Cannot get resource as its creation failed: {out}. It may already exist.")


In [None]:
%%capture tf_output --no-stderr
! kubectl create -f $TFJOB_YAML_FILE

In [None]:
TF_JOB = get_resource(tf_output)

In [None]:
! kubectl describe $TF_JOB

Name:         fmnist
Namespace:    demo01
Labels:       <none>
Annotations:  <none>
API Version:  kubeflow.org/v1
Kind:         TFJob
Metadata:
  Creation Timestamp:  2021-03-20T23:07:51Z
  Generation:          1
  Managed Fields:
    API Version:  kubeflow.org/v1
    Fields Type:  FieldsV1
    fieldsV1:
      f:spec:
        .:
        f:cleanPodPolicy:
        f:tfReplicaSpecs:
          .:
          f:Worker:
            .:
            f:replicas:
            f:restartPolicy:
            f:template:
              .:
              f:metadata:
                .:
                f:annotations:
                  .:
                  f:sidecar.istio.io/inject:
              f:spec:
                .:
                f:containers:
    Manager:         kubectl
    Operation:       Update
    Time:            2021-03-20T23:07:51Z
  Resource Version:  2920908
  Self Link:         /apis/kubeflow.org/v1/namespaces/demo01/tfjobs/fmnist
  UID:               9

In [None]:
! kubectl get pods -l job-name=fmnist

NAME              READY   STATUS        RESTARTS   AGE
fmnist-worker-0   1/1     Terminating   0          10m
fmnist-worker-1   1/1     Terminating   0          10m


In [None]:
!kubectl -n demo01 logs fmnist-worker-1

2021-03-20 22:57:25.128895: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'libcudart.so.10.1'; dlerror: libcudart.so.10.1: cannot open shared object file: No such file or directory
2021-03-20 22:57:25.128932: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2021-03-20 22:57:26.787203: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2021-03-20 22:57:26.787244: W tensorflow/stream_executor/cuda/cuda_driver.cc:312] failed call to cuInit: UNKNOWN ERROR (303)
2021-03-20 22:57:26.787269: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (fmnist-worker-1): /proc/driver/nvidia/version does not exist
2021-03-20 22:57:26.787549: I tensorflow/core/platform/c

In [None]:
!kubectl -n demo01 delete tfjob fmnist

tfjob.kubeflow.org "fmnist" deleted


# Put all these files inside a Github repository