In [None]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# BERT fine-tuning using AI Platform Training


This tutorial demonstrates how to use AI Platform (Unified) Training to run single-node and multi-worker (distributed) training jobs with GPUs.

This tutorial contains the complete code to fine-tune BERT for sentiment analysis on a dataset of plain-text IMDB movie reviews. It uses the same modeling techniques as the `01_bert_finetuning_local.ipynb` notebook. Refer to that notebook for more information.


## Setting up the environment



### Set up your GCP project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a GCP project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project.](https://cloud.google.com/billing/docs/how-to/modify-project)

3. [Enable the AI Platform APIs, Compute Engine APIs and Container Registry API.](https://console.cloud.google.com/flows/enableapi?apiid=ml.googleapis.com,compute_component,containerregistry.googleapis.com)

4. [Google Cloud SDK](https://cloud.google.com/sdk) is already installed in AI Platform Notebooks.

5. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Project ID

**If you don't know your project ID**, you may be able to retrieve it with the `gcloud` command.
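One way to look up the active project from Python is to call `gcloud config list` through `subprocess`. The sketch below is a minimal, hedged example: the helper name `detect_project_id` is hypothetical, and it returns `None` when the Cloud SDK is not installed or no project is configured.

```python
import subprocess


def detect_project_id():
    """Returns the active gcloud project ID, or None if it cannot be determined."""
    try:
        result = subprocess.run(
            ["gcloud", "config", "list", "--format", "value(core.project)"],
            capture_output=True,
            text=True,
            check=True,
            timeout=30,
        )
    except (FileNotFoundError, subprocess.CalledProcessError,
            subprocess.TimeoutExpired):
        # gcloud is missing, failed, or took too long to answer.
        return None
    project = result.stdout.strip()
    return project or None


print(detect_project_id())
```

If the function returns a value, you can assign it to `PROJECT_ID` instead of typing the ID by hand.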

In [3]:
PROJECT_ID = 'jk-demos'

! gcloud config set project $PROJECT_ID

Updated property [core/project].


#### Region

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. The regions below are supported for AI Platform (Unified). When possible, choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You cannot use a multi-regional Cloud Storage bucket for training with AI Platform. Not all regions support all AI Platform services. For the latest support per region, see [Region support for AI Platform (Unified) services](https://cloud.google.com/ai-platform-unified/docs/general/locations).

In [4]:
REGION = 'us-central1' 

### Install the required Python packages

Follow the instructions in the [repo's README](README.md) to install the latest (preview) version of AI Platform (Unified) SDK.

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

In this tutorial, your training job reads its input data from a Cloud Storage bucket and saves the artifacts it creates,
including the trained model, checkpoints, and TensorBoard logs, to that bucket.

Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets. 

In [5]:
BUCKET_NAME = "jk-demos-bucket" 

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [6]:
! gsutil mb -l $REGION gs://$BUCKET_NAME

Creating gs://jk-demos-bucket/...
ServiceException: 409 A Cloud Storage bucket named 'jk-demos-bucket' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


Finally, validate access to your Cloud Storage bucket by examining its contents:

In [7]:
! gsutil ls -al gs://$BUCKET_NAME

                                 gs://jk-demos-bucket/data/
                                 gs://jk-demos-bucket/jobs/
                                 gs://jk-demos-bucket/tfrecords/


### Set AI Platform (Unified) constants

Let's now set up some constants for AI Platform (Unified):

- `API_ENDPOINT`: The AI Platform (Unified) API service endpoint for dataset, model, job, pipeline and endpoint services.
- `API_PREDICT_ENDPOINT`: The AI Platform (Unified) API service endpoint for prediction.
- `PARENT`: The AI Platform (Unified) location root path for dataset, model and endpoint resources.

In [8]:
# API Endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
API_PREDICT_ENDPOINT = "{}-prediction-aiplatform.googleapis.com".format(REGION)

# AI Platform (Unified) location root path for your dataset, model and endpoint resources
PARENT = "projects/" + PROJECT_ID + "/locations/" + REGION

### Import libraries and define constants

In [41]:
import os
import shutil
import sys
import time

import tensorflow as tf

from datetime import datetime
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value

from google.cloud.aiplatform import gapic as aip

## Preparing data

#### Download IMDB dataset

In [8]:
local_dir = os.path.expanduser('~')
local_dir = f'{local_dir}/datasets'

if tf.io.gfile.exists(local_dir):
    tf.io.gfile.rmtree(local_dir)
tf.io.gfile.makedirs(local_dir)

url = 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'
local_path = f'{local_dir}/aclImdb_v1.tar.gz'

dataset = tf.keras.utils.get_file(local_path, url,
                                  untar=True, 
                                  cache_dir=local_dir,
                                  cache_subdir='.'
                                  )
dataset_dir = os.path.join(os.path.dirname(dataset), 'aclImdb')

train_dir = os.path.join(dataset_dir, 'train')

# remove unused folders to make it easier to load the data
remove_dir = os.path.join(train_dir, 'unsup')
shutil.rmtree(remove_dir)

Downloading data from https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz


#### Convert the IMDB dataset to TFRecords files

##### Create tf.data datasets from IMDB text files

In [9]:
def create_datasets(train_dir, test_dir, val_split, seed):
    
    train_ds = tf.keras.preprocessing.text_dataset_from_directory(
        train_dir,
        validation_split=val_split,
        subset='training',
        seed=seed)

    class_names = train_ds.class_names
    
    train_ds = train_ds.unbatch()

    val_ds = tf.keras.preprocessing.text_dataset_from_directory(
        train_dir,
        validation_split=val_split,
        subset='validation',
        seed=seed).unbatch()

    test_ds = tf.keras.preprocessing.text_dataset_from_directory(
        test_dir).unbatch()

    return train_ds, val_ds, test_ds, class_names

In [12]:
seed = 42
val_split = 0.2
test_dir = f'{dataset_dir}/test'

train_ds, val_ds, test_ds, class_names = (
    create_datasets(train_dir, test_dir, val_split, seed)
)

Found 25000 files belonging to 2 classes.
Using 20000 files for training.
Found 25000 files belonging to 2 classes.
Using 5000 files for validation.
Found 25000 files belonging to 2 classes.


##### Prepare tf.Example serialization routines

In [13]:
def serialize_example(text_fragment, label):
    """Serializes text fragment and label in tf.Example."""
    
    def _bytes_feature(value):
        """Returns a bytes_list from a string / byte."""
        if isinstance(value, type(tf.constant(0))):
            value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

    def _int64_feature(value):
        """Returns an int64_list from a bool / enum / int / uint."""
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
    
    feature = {
        'text_fragment': _bytes_feature(text_fragment),
        'label': _int64_feature(label)
    }
    
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()
    
def tf_serialize_example(text_fragment, label):
  tf_string = tf.py_function(
    serialize_example,
    (text_fragment, label), 
    tf.string)      
  return tf.reshape(tf_string, ()) 

##### Write TFRecords files

In [14]:
tfrecords_folder = '{}/tfrecords'.format(os.path.expanduser('~'))
if tf.io.gfile.exists(tfrecords_folder):
    tf.io.gfile.rmtree(tfrecords_folder)
tf.io.gfile.makedirs(tfrecords_folder)

filenames = ['train.tfrecords', 'valid.tfrecords', 'test.tfrecords']
for file_name, dataset in zip(filenames, [train_ds, val_ds, test_ds]):
    writer = tf.data.experimental.TFRecordWriter(os.path.join(tfrecords_folder, file_name))
    writer.write(dataset.map(tf_serialize_example))

##### Double check that you can read the created format

In [15]:
# `file_name` still holds the last value from the loop above ('test.tfrecords').
for record in tf.data.TFRecordDataset([os.path.join(tfrecords_folder, file_name)]).take(2):
    print(record)

tf.Tensor(b"\n\xfe\t\n\x0e\n\x05label\x12\x05\x1a\x03\n\x01\x01\n\xeb\t\n\rtext_fragment\x12\xd9\t\n\xd6\t\n\xd3\tCaught this movie on TV and I watched it again. I said to myself, it's been a long time since I watched this movie, so why not. And once again this movie thrilled me. It is so easy, so watchable and so human that I don't know why some people dislike it. <br /><br />John Travolta shines as Michael (his dance and every move), angel that can hardly be related to this word. He smokes, drinks and he eats like some savage, but he's got big heart. On his way Michael helps all people he meets. Dorothy Winters (Andie MacDowell) in her singing and finding a right man. Frank Quinlan (William Hurt) in developing himself as a good and decent man. Michael even helped dear old Pansy Milbank (Jean Stapleton) - that last scene is beautiful when they dance on the street.<br /><br />Travolta had great help in other actors. Andie MacDowell is so beautiful and likable, William Hurt is great as 

### Copy the created TFRecord files to GCS

In [16]:
gcs_paths = [f'gs://{BUCKET_NAME}/tfrecords/train',
             f'gs://{BUCKET_NAME}/tfrecords/valid',
             f'gs://{BUCKET_NAME}/tfrecords/test']

for filename, gcs_path in zip(filenames, gcs_paths):
    local_file_path = os.path.join(tfrecords_folder, filename)
    gcs_file_path = f'{gcs_path}/{filename}'
    !gsutil cp {local_file_path} {gcs_file_path}

Copying file:///home/jupyter/tfrecords/train.tfrecords [Content-Type=application/octet-stream]...
/ [1 files][ 26.5 MiB/ 26.5 MiB]                                                
Operation completed over 1 objects/26.5 MiB.                                     
Copying file:///home/jupyter/tfrecords/valid.tfrecords [Content-Type=application/octet-stream]...
/ [1 files][  6.6 MiB/  6.6 MiB]                                                
Operation completed over 1 objects/6.6 MiB.                                      
Copying file:///home/jupyter/tfrecords/test.tfrecords [Content-Type=application/octet-stream]...
- [1 files][ 32.3 MiB/ 32.3 MiB]                                                
Operation completed over 1 objects/32.3 MiB.                                     



## Submitting training jobs


There are three types of AI Platform resources you can create to train custom models on AI Platform:

- [Custom jobs](https://cloud.google.com/ai-platform-unified/docs/training/create-custom-job)
- [Hyperparameter tuning jobs](https://cloud.google.com/ai-platform-unified/docs/training/using-hyperparameter-tuning)
- [Training pipelines](https://cloud.google.com/ai-platform-unified/docs/training/create-training-pipeline)

In this notebook, we will use a Custom job.

When you create a training job, you specify the configuration of a compute node (for single-node training) or a cluster of nodes (for distributed training) and the training code that each node runs. You define the node configuration by creating *worker pool specifications*. The core of a worker pool specification is the choice of machine types and accelerators.

There are two ways of packaging your training code to run on worker pools. 

- **Use a Google Cloud prebuilt container**. If you use a prebuilt container, you will additionally specify a Python package to install into the container image. This Python package contains your code for training a custom model.

- **Use your own custom container image**. If you use your own container, the container needs to contain your code for training a custom model.

In this tutorial, you will use your own custom container.

In the following sections you will create a custom container image that packages your training code followed by defining and submitting both single node and distributed training jobs.


### Creating the training container image


To create a custom training container you need to define a Python training module and package it in a container image together with all the required dependencies.

#### Create the training module

We will use the code snippets from the `01_bert_finetuning_local.ipynb` notebook as the base for the training module. The major difference is that the training module uses the `tfrecords` files you created in the previous step as its input data. The training module also supports [MultiWorkerMirroredStrategy](https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/MultiWorkerMirroredStrategy) in addition to [MirroredStrategy](https://www.tensorflow.org/api_docs/python/tf/distribute/MirroredStrategy) for distributed training.
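The training module below also exposes an `auto_shard_policy` flag, because multi-worker training splits the input dataset across workers. The sketch below imitates the two main policies in plain Python. It does not call `tf.data`, and the function names are illustrative only. `FILE`-style sharding hands whole files to workers, while `DATA`-style sharding lets every worker read all records and keep only every n-th one. Because this tutorial writes a single TFRecords file per split, file-based sharding would leave a second worker without input, which is a plausible reason the job specification later passes `--auto_shard_policy=data`.

```python
def shard_by_file(files, num_workers, worker_index):
    """FILE-style sharding: each worker receives every n-th file."""
    return files[worker_index::num_workers]


def shard_by_data(records, num_workers, worker_index):
    """DATA-style sharding: each worker scans all records and keeps every n-th."""
    return [r for i, r in enumerate(records) if i % num_workers == worker_index]


files = ['train.tfrecords']   # one file per split, as in this tutorial
records = list(range(8))      # stand-in for the records inside that file
num_workers = 2

for worker in range(num_workers):
    print(worker,
          shard_by_file(files, num_workers, worker),
          shard_by_data(records, num_workers, worker))
# 0 ['train.tfrecords'] [0, 2, 4, 6]
# 1 [] [1, 3, 5, 7]
```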

In [17]:
! rm -rf trainer
! mkdir trainer
! touch trainer/__init__.py

In [55]:
%%writefile trainer/task.py


# Copyright 2021 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#            http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text

from absl import app
from absl import flags
from absl import logging
from official.nlp import optimization 


TFHUB_HANDLE_ENCODER = 'https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/3'
TFHUB_HANDLE_PREPROCESS = 'https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3'
LOCAL_TB_FOLDER = '/tmp/logs'
LOCAL_SAVED_MODEL_DIR = '/tmp/saved_model'

FLAGS = flags.FLAGS
flags.DEFINE_integer('steps_per_epoch', 625, 'Steps per training epoch')
flags.DEFINE_integer('eval_steps', 150, 'Evaluation steps')
flags.DEFINE_integer('epochs', 2, 'Number of epochs')
flags.DEFINE_integer('per_replica_batch_size', 32, 'Per replica batch size')
flags.DEFINE_string('training_data_path', 'gs://jk-demos-bucket/tfrecords/train', 'Training data GCS path')
flags.DEFINE_string('validation_data_path', 'gs://jk-demos-bucket/tfrecords/valid', 'Validation data GCS path')
flags.DEFINE_string('testing_data_path', 'gs://jk-demos-bucket/tfrecords/test', 'Testing data GCS path')

flags.DEFINE_string('job_dir', 'gs://jk-demos-bucket/jobs', 'A base GCS path for jobs')
flags.DEFINE_enum('strategy', 'multiworker', ['mirrored', 'multiworker'], 'Distribution strategy')
flags.DEFINE_enum('auto_shard_policy', 'auto', ['auto', 'data', 'file', 'off'], 'Dataset sharding strategy')



auto_shard_policy = {
    'auto': tf.data.experimental.AutoShardPolicy.AUTO,
    'data': tf.data.experimental.AutoShardPolicy.DATA,
    'file': tf.data.experimental.AutoShardPolicy.FILE,
    'off': tf.data.experimental.AutoShardPolicy.OFF,
}


def create_unbatched_dataset(tfrecords_folder):
    """Creates an unbatched dataset in the format required by the 
       sentiment analysis model from the folder with TFrecords files."""
    
    feature_description = {
        'text_fragment': tf.io.FixedLenFeature([], tf.string, default_value=''),
        'label': tf.io.FixedLenFeature([], tf.int64, default_value=0),
    }

    def _parse_function(example_proto):
        parsed_example = tf.io.parse_single_example(example_proto, feature_description)
        return parsed_example['text_fragment'], parsed_example['label']
  
    file_paths = [f'{tfrecords_folder}/{file_path}' for file_path in tf.io.gfile.listdir(tfrecords_folder)]
    dataset = tf.data.TFRecordDataset(file_paths)
    dataset = dataset.map(_parse_function)
    
    return dataset


def configure_dataset(ds, auto_shard_policy):
    """
    Optimizes the performance of a dataset.
    """
    
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = (
        auto_shard_policy
    )
    
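    # Cache a single pass over the data, then repeat it indefinitely.
    # Caching after repeat() would try to cache an infinite dataset.
    ds = ds.cache().repeat()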
    ds = ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    ds = ds.with_options(options)
    return ds


def create_input_pipelines(train_dir, valid_dir, test_dir, batch_size, auto_shard_policy):
    """Creates input pipelines from Imdb dataset."""
    
    train_ds = create_unbatched_dataset(train_dir)
    train_ds = train_ds.batch(batch_size)
    train_ds = configure_dataset(train_ds, auto_shard_policy)
    
    valid_ds = create_unbatched_dataset(valid_dir)
    valid_ds = valid_ds.batch(batch_size)
    valid_ds = configure_dataset(valid_ds, auto_shard_policy)
    
    test_ds = create_unbatched_dataset(test_dir)
    test_ds = test_ds.batch(batch_size)
    test_ds = configure_dataset(test_ds, auto_shard_policy)

    return train_ds, valid_ds, test_ds


def build_classifier_model(tfhub_handle_preprocess, tfhub_handle_encoder):
    """Builds a simple binary classification model with BERT trunk."""
    
    text_input = tf.keras.layers.Input(shape=(), dtype=tf.string, name='text')
    preprocessing_layer = hub.KerasLayer(tfhub_handle_preprocess, name='preprocessing')
    encoder_inputs = preprocessing_layer(text_input)
    encoder = hub.KerasLayer(tfhub_handle_encoder, trainable=True, name='BERT_encoder')
    outputs = encoder(encoder_inputs)
    net = outputs['pooled_output']
    net = tf.keras.layers.Dropout(0.1)(net)
    net = tf.keras.layers.Dense(1, activation=None, name='classifier')(net)
    
    return tf.keras.Model(text_input, net)


def copy_tensorboard_logs(local_path: str, gcs_path: str):
    """Copies Tensorboard logs from a local dir to a GCS location.
    
    After training, batch copy Tensorboard logs locally to a GCS location. This can result
    in faster pipeline runtimes over streaming logs per batch to GCS that can get bottlenecked
    when streaming large volumes.
    
    Args:
      local_path: local filesystem directory uri.
      gcs_path: cloud filesystem directory uri.
    Returns:
      None.
    """
    pattern = '{}/*/events.out.tfevents.*'.format(local_path)
    local_files = tf.io.gfile.glob(pattern)
    gcs_log_files = [local_file.replace(local_path, gcs_path) for local_file in local_files]
    for local_file, gcs_file in zip(local_files, gcs_log_files):
        tf.io.gfile.copy(local_file, gcs_file)


def main(argv):
    del argv
    
    def _is_chief(task_type, task_id):
        return ((task_type == 'chief' or task_type == 'worker') and task_id == 0) or task_type is None
        
    
    logging.info('Setting up training.')
    logging.info('   epochs: {}'.format(FLAGS.epochs))
    logging.info('   steps_per_epoch: {}'.format(FLAGS.steps_per_epoch))
    logging.info('   eval_steps: {}'.format(FLAGS.eval_steps))
    logging.info('   strategy: {}'.format(FLAGS.strategy))
    
    if FLAGS.strategy == 'mirrored':
        strategy = tf.distribute.MirroredStrategy()
    else:
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
        
    if strategy.cluster_resolver:    
        task_type, task_id = (strategy.cluster_resolver.task_type,
                              strategy.cluster_resolver.task_id)
    else:
        task_type, task_id = (None, None)
        
    
    global_batch_size = (strategy.num_replicas_in_sync *
                         FLAGS.per_replica_batch_size)
    
    
    train_ds, valid_ds, test_ds = create_input_pipelines(
        FLAGS.training_data_path,
        FLAGS.validation_data_path,
        FLAGS.testing_data_path,
        global_batch_size,
        auto_shard_policy[FLAGS.auto_shard_policy])
        
    num_train_steps = FLAGS.steps_per_epoch * FLAGS.epochs
    num_warmup_steps = int(0.1*num_train_steps)
    init_lr = 3e-5
    
    with strategy.scope():
        model = build_classifier_model(TFHUB_HANDLE_PREPROCESS, TFHUB_HANDLE_ENCODER)
        loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)
        metrics = tf.metrics.BinaryAccuracy()
        optimizer = optimization.create_optimizer(
            init_lr=init_lr,
            num_train_steps=num_train_steps,
            num_warmup_steps=num_warmup_steps,
            optimizer_type='adamw')

        model.compile(optimizer=optimizer,
                      loss=loss,
                      metrics=metrics)
        
    # Configure BackupAndRestore callback
    backup_dir = '{}/backupandrestore'.format(FLAGS.job_dir)
    callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir)]
    
    # Configure TensorBoard callback on Chief
    if _is_chief(task_type, task_id):
        callbacks.append(tf.keras.callbacks.TensorBoard(
            log_dir=LOCAL_TB_FOLDER, update_freq='batch'))
    
    logging.info('Starting training ...')
    
    history = model.fit(x=train_ds,
                        validation_data=valid_ds,
                        steps_per_epoch=FLAGS.steps_per_epoch,
                        validation_steps=FLAGS.eval_steps,
                        epochs=FLAGS.epochs,
                        callbacks=callbacks)

    if _is_chief(task_type, task_id):
        # Copy tensorboard logs to GCS
        tb_logs = '{}/tb_logs'.format(FLAGS.job_dir)
        logging.info('Copying TensorBoard logs to: {}'.format(tb_logs))
        copy_tensorboard_logs(LOCAL_TB_FOLDER, tb_logs)
        saved_model_dir = '{}/saved_model'.format(FLAGS.job_dir)
    else:
        saved_model_dir = LOCAL_SAVED_MODEL_DIR
        
    # Save the trained model. Under MultiWorkerMirroredStrategy every worker must
    # call model.save(); only the chief writes to the job directory, while the
    # other workers write to a local scratch directory.
    logging.info('Training completed. Saving the trained model to: {}'.format(saved_model_dir))
    model.save(saved_model_dir)
    
    
if __name__ == '__main__':
    logging.set_verbosity(logging.INFO)
    app.run(main)


Overwriting trainer/task.py
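`MultiWorkerMirroredStrategy` learns each replica's role from the `TF_CONFIG` environment variable, which the training service is expected to set on every replica. The sketch below parses a hand-written sample value (the host names are made up) with the standard library and applies the same rule as `_is_chief` in `trainer/task.py`. The helper `task_from_tf_config` is a hypothetical name introduced for illustration.

```python
import json


def is_chief(task_type, task_id):
    """Same rule as _is_chief in trainer/task.py."""
    return (task_type in ('chief', 'worker') and task_id == 0) or task_type is None


def task_from_tf_config(tf_config_json):
    """Extracts (task_type, task_id) from a TF_CONFIG string; (None, None) if empty."""
    if not tf_config_json:
        return None, None
    task = json.loads(tf_config_json).get('task', {})
    return task.get('type'), task.get('index')


# Hand-written sample: one chief and two workers; this replica is worker 1.
sample = json.dumps({
    'cluster': {'chief': ['host-0:2222'],
                'worker': ['host-1:2222', 'host-2:2222']},
    'task': {'type': 'worker', 'index': 1},
})

print(task_from_tf_config(sample))             # ('worker', 1)
print(is_chief(*task_from_tf_config(sample)))  # False
print(is_chief(*task_from_tf_config('')))      # True: no cluster, single process
```

Note that under this rule `worker` task 0 also counts as chief when the cluster defines a separate `chief` task. Whether that matters depends on how the service populates `TF_CONFIG`.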


#### Create a Dockerfile

We will use a standard Deep Learning container image as the base for our custom training container image.

In [56]:
TRAIN_BASE_IMAGE = 'gcr.io/deeplearning-platform-release/tf2-gpu.2-4'
TRAIN_IMAGE = f'gcr.io/{PROJECT_ID}/imdb_bert'

In [57]:
dockerfile = f'''
FROM {TRAIN_BASE_IMAGE}

RUN pip install tf-models-official tensorflow-text

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]
'''

with open('Dockerfile', 'w') as f:
    f.write(dockerfile)

#### Build a container image and upload it to your Container Registry

In [58]:
! docker build -t {TRAIN_IMAGE} .

Sending build context to Docker daemon  97.94MB
Step 1/5 : FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-4
 ---> 4306f1450ab2
Step 2/5 : RUN pip install pip install tf-models-official tensorflow-text
 ---> Using cache
 ---> 7b934d54db7f
Step 3/5 : WORKDIR /
 ---> Using cache
 ---> ab9126acc6e9
Step 4/5 : COPY trainer /trainer
 ---> d5d5041a3d42
Step 5/5 : ENTRYPOINT ["python", "-m", "trainer.task"]
 ---> Running in d9edccf45ff1
Removing intermediate container d9edccf45ff1
 ---> 6925897f2b13
Successfully built 6925897f2b13
Successfully tagged gcr.io/jk-demos/imdb_bert:latest


In [59]:
! docker push {TRAIN_IMAGE}

The push refers to repository [gcr.io/jk-demos/imdb_bert]
latest: digest: sha256:991c294a8fba838bc12d6c701e18f3d79ab0fc85e1367ac9fdf7fb15baf40e01 size: 6408


Alternatively, you can build and push the image with Cloud Build:

!gcloud builds submit --tag {TRAIN_IMAGE} .

#### Testing the image locally

In [108]:
!docker run -it --gpus all {TRAIN_IMAGE} --epochs 2 --steps_per_epoch 10 --eval_steps 5

2021-03-01 07:38:09.451911: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
I0301 07:38:14.804283 140191870428992 task.py:135] Setting up training.
I0301 07:38:14.804424 140191870428992 task.py:136]    epochs: 2
I0301 07:38:14.804513 140191870428992 task.py:137]    steps_per_epoch: 10
I0301 07:38:14.804593 140191870428992 task.py:138]    eval_steps: 5
I0301 07:38:14.804671 140191870428992 task.py:139]    strategy: mirrored
2021-03-01 07:38:14.804945: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-03-01 07:38:14.805950: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-03-01 07:38:14.816648: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:941] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-0

### Assembling a training job specification

A training job specification comprises two parts:
- A worker pool configuration, and
- A scheduling configuration

For single-node training, you define a single worker pool. For multi-node distributed training, you define multiple worker pools.

Within each worker pool specification, you configure:
- The number of replicas in the pool
- The machine type and accelerators
- The custom container configuration, which determines the training code the pool runs. All replicas in a pool use the same configuration.

For our job we will use `n1-standard-4` machines, each with 2 NVIDIA T4 GPUs.


In [39]:
MACHINE_TYPE = 'n1-standard-4'
TRAIN_GPU, TRAIN_NGPU = (aip.AcceleratorType.NVIDIA_TESLA_T4, 2)
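To see how this hardware choice interacts with the training flags, the arithmetic can be worked through in plain Python. The sketch assumes the job runs two single-replica worker pools with 2 GPUs each (as configured in the next cells), the default `per_replica_batch_size` of 32 from `trainer/task.py`, and the 20,000 training examples reported when the datasets were created. The function names are illustrative.

```python
import math


def global_batch_size(num_workers, gpus_per_worker, per_replica_batch_size):
    """Batch size consumed per training step across all replicas in sync."""
    return num_workers * gpus_per_worker * per_replica_batch_size


def steps_for_one_pass(num_examples, batch_size):
    """Number of steps needed to see every example once."""
    return math.ceil(num_examples / batch_size)


num_examples = 20000
single_gpu = global_batch_size(1, 1, 32)
distributed = global_batch_size(2, 2, 32)

print(single_gpu, steps_for_one_pass(num_examples, single_gpu))    # 32 625
print(distributed, steps_for_one_pass(num_examples, distributed))  # 128 157

# With steps_per_epoch=200, each "epoch" of the distributed job consumes:
print(200 * distributed)  # 25600 examples, slightly more than one full pass
```

The default `steps_per_epoch` of 625 in `trainer/task.py` therefore matches one pass on a single GPU, while a job with four replicas needs far fewer steps to cover the same data.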

When configuring a custom container you pass the command line parameters expected by your script through the `args` field of the container specification.

In [40]:
epochs = 3
steps_per_epoch = 200
eval_steps = 50
training_data_path = 'gs://jk-demos-bucket/tfrecords/train'
validation_data_path = 'gs://jk-demos-bucket/tfrecords/valid'
job_id = 'job-{}'.format(datetime.now().strftime("%Y%m%d%H%M%S"))
job_dir = 'gs://jk-demos-bucket/jobs/{}'.format(job_id)

worker_pool_spec = [
    {
        "replica_count": 1,
        "machine_spec": {
            "machine_type": MACHINE_TYPE,
            "accelerator_type": TRAIN_GPU,
            "accelerator_count": TRAIN_NGPU
        },
        "container_spec": {
            "image_uri": TRAIN_IMAGE,
            "args": [
                "--epochs=" + str(epochs),
                "--steps_per_epoch=" + str(steps_per_epoch),
                "--eval_steps=" + str(eval_steps),
                "--training_data_path=" + training_data_path,
                "--validation_data_path=" + validation_data_path,
                "--job_dir=" + job_dir,
                "--strategy=multiworker",
                "--auto_shard_policy=data",
            ]
        },
    },
    {
        "replica_count": 1,
        "machine_spec": {
            "machine_type": MACHINE_TYPE,
            "accelerator_type": TRAIN_GPU,
            "accelerator_count": TRAIN_NGPU
        },
        "container_spec": {
            "image_uri": TRAIN_IMAGE,
            "args": [
                "--epochs=" + str(epochs),
                "--steps_per_epoch=" + str(steps_per_epoch),
                "--eval_steps=" + str(eval_steps),
                "--training_data_path=" + training_data_path,
                "--validation_data_path=" + validation_data_path,
                "--job_dir=" + job_dir,
                "--strategy=multiworker",
                "--auto_shard_policy=data",
            ]
        },
    },
]

custom_job = {
        "display_name": f'imdb-bert-{job_id}',
        "job_spec": {
            "worker_pool_specs": worker_pool_spec
        },
    }

custom_job
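The two worker pool entries above differ in nothing but their position in the list, so a small helper can build them from one set of arguments. This is an optional refactoring sketch, not part of the original notebook: `make_worker_pool` and the placeholder values in the usage lines are illustrative. The dictionary keys are the same ones used in `worker_pool_spec` above.

```python
def make_worker_pool(image_uri, args, machine_type, accelerator_type,
                     accelerator_count, replica_count=1):
    """Builds one worker pool entry with the same keys as worker_pool_spec."""
    return {
        "replica_count": replica_count,
        "machine_spec": {
            "machine_type": machine_type,
            "accelerator_type": accelerator_type,
            "accelerator_count": accelerator_count,
        },
        "container_spec": {
            "image_uri": image_uri,
            # Copy the list so that the pools do not share one args object.
            "args": list(args),
        },
    }


# Placeholder values; in the notebook you would pass TRAIN_IMAGE, MACHINE_TYPE,
# TRAIN_GPU and TRAIN_NGPU instead.
container_args = ["--epochs=3", "--strategy=multiworker", "--auto_shard_policy=data"]
pools = [
    make_worker_pool("gcr.io/my-project/imdb_bert", container_args,
                     "n1-standard-4", "NVIDIA_TESLA_T4", 2)
    for _ in range(2)
]
print(len(pools), pools[0] == pools[1])  # 2 True
```

Building both pools from one argument list keeps the chief and the workers from drifting apart when a flag changes.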


#### Submitting a job

The AI Platform (Unified) SDK follows a client/server model. On your side (the Python script), you create a client that sends requests to and receives responses from the server, AI Platform.

To submit training jobs you need to create a Job Service client.

In [153]:
client_options = {"api_endpoint": API_ENDPOINT}

client = aip.JobServiceClient(client_options=client_options)

Then submit the job.

In [154]:
response = client.create_custom_job(parent=PARENT, custom_job=custom_job)
response

name: "projects/993115309906/locations/us-central1/customJobs/6676595243669782528"
display_name: "imdb-bert-job-20210301141537"
job_spec {
  worker_pool_specs {
    machine_spec {
      machine_type: "n1-standard-4"
      accelerator_type: NVIDIA_TESLA_T4
      accelerator_count: 2
    }
    replica_count: 1
    disk_spec {
      boot_disk_type: "pd-ssd"
      boot_disk_size_gb: 100
    }
    container_spec {
      image_uri: "gcr.io/jk-demos/imdb_bert"
      args: "--epochs=3"
      args: "--steps_per_epoch=200"
      args: "--eval_steps=50"
      args: "--training_data_path=gs://jk-demos-bucket/tfrecords/train"
      args: "--validation_data_path=gs://jk-demos-bucket/tfrecords/valid"
      args: "--job_dir=gs://jk-demos-bucket/jobs/job-20210301141537"
      args: "--strategy=multiworker"
    }
  }
  worker_pool_specs {
    machine_spec {
      machine_type: "n1-standard-4"
      accelerator_type: NVIDIA_TESLA_T4
      accelerator_count: 2
    }
    replica_count: 1
    disk_spec {
  

#### Monitoring the job

You can monitor the job through the GCP Console or programmatically by using the `client.get_custom_job()` method.
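A job usually runs for a while, so a polling loop is a common way to wait for it to finish. The helper below is a minimal sketch: `wait_for_job` and its parameters are hypothetical names, and it accepts any zero-argument callable that returns the current state name, so it can be tried without a cloud connection. With the client from this notebook, that callable would likely be `lambda: client.get_custom_job(name=job_name).state.name`, where `job_name` is the `name` field of the create response, assuming the terminal states are named `JOB_STATE_SUCCEEDED`, `JOB_STATE_FAILED`, and `JOB_STATE_CANCELLED`.

```python
import time

# Assumed names of the terminal job states.
TERMINAL_STATES = {'JOB_STATE_SUCCEEDED', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'}


def wait_for_job(get_state, poll_seconds=60.0, timeout_seconds=3600.0,
                 sleep=time.sleep):
    """Polls get_state() until a terminal state is returned or the timeout expires."""
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError('Job still in state {} after {} seconds'.format(
                state, waited))
        sleep(poll_seconds)
        waited += poll_seconds


# Simulated run: the fake job moves through three states.
states = iter(['JOB_STATE_PENDING', 'JOB_STATE_RUNNING', 'JOB_STATE_SUCCEEDED'])
print(wait_for_job(lambda: next(states), poll_seconds=0))
```

Passing the state lookup as a callable keeps the waiting logic independent of the client library and easy to exercise with fake states.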

In [131]:
job_name = response.name

response = client.get_custom_job(name=job_name)
response

name: "projects/993115309906/locations/us-central1/customJobs/4600435815451983872"
display_name: "imdb-bert-job-20210301080317"
job_spec {
  worker_pool_specs {
    machine_spec {
      machine_type: "n1-standard-4"
      accelerator_type: NVIDIA_TESLA_T4
      accelerator_count: 2
    }
    replica_count: 1
    disk_spec {
      boot_disk_type: "pd-ssd"
      boot_disk_size_gb: 100
    }
    container_spec {
      image_uri: "gcr.io/jk-demos/imdb_bert"
      args: "--epochs=3"
      args: "--steps_per_epoch=200"
      args: "--eval_steps=50"
      args: "--training_data_path=gs://jk-demos-bucket/tfrecords/train"
      args: "--validation_data_path=gs://jk-demos-bucket/tfrecords/valid"
      args: "--job_dir=gs://jk-demos-bucket/jobs/job-20210301080317"
      args: "--strategy=multiworker"
    }
  }
  worker_pool_specs {
    machine_spec {
      machine_type: "n1-standard-4"
      accelerator_type: NVIDIA_TESLA_T4
      accelerator_count: 2
    }
    replica_count: 1
    disk_spec {
  