# BERT fine-tuning with Vertex AI

Building example from [this notebook example](https://github.com/RajeshThallam/vertex-ai-labs/blob/main/03-distributed-training-text/03-distributed-training-vertex-ai-bert-finetuning.ipynb)

In [1]:
# !pip install --user --upgrade google-cloud-aiplatform -q
# !pip install --user --upgrade kfp -q
# !pip install --user --upgrade google-cloud-pipeline-components -q
# !pip install --user --upgrade google-cloud-bigquery-datatransfer -q
# !pip install --user tf-models-official==2.11.0 tensorflow-text==2.11.0 -q
# pip install tensorflow_io

In [2]:
# import IPython
# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

In [4]:
# !pwd

In [6]:
import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import shutil
import sys
import pprint
import pandas as pd
import tensorflow as tf
import time
import csv
from datetime import datetime

import matplotlib.pyplot as plt

import google.auth
from google.protobuf.json_format import MessageToDict
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value


from google.cloud import aiplatform as vertex_ai
from google.cloud.aiplatform import hyperparameter_tuning as hpt
from google.cloud.aiplatform_v1beta1 import types
from google.cloud import bigquery
from google.cloud import exceptions

from google.cloud.aiplatform.utils import JobClientWithOverride

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing

from tensorflow_io import bigquery as tfio_bq

import logging
logging.disable(logging.WARNING)

In [7]:
PREFIX = 'jtv7'

In [8]:
creds, PROJECT = google.auth.default()
REGION = 'us-central1'

STAGING_BUCKET = f'gs://{PREFIX}-{PROJECT}-bucket'
BUCKET_NAME = STAGING_BUCKET
VERTEX_SA = '934903580331-compute@developer.gserviceaccount.com'

print(f"PROJECT = {PROJECT}")
print(f"STAGING_BUCKET = {STAGING_BUCKET}")
print(f"VERTEX_SA = {VERTEX_SA}")

PROJECT = hybrid-vertex
STAGING_BUCKET = gs://jtv7-hybrid-vertex-bucket
VERTEX_SA = 934903580331-compute@developer.gserviceaccount.com


In [9]:
! gsutil mb -l $REGION $STAGING_BUCKET

Creating gs://jtv7-hybrid-vertex-bucket/...


In [10]:
# TENSORBOARD_NAME = f"tb-bert-tune-{PREFIX}"  # @param {type:"string"}

# if (
#     TENSORBOARD_NAME == ""
#     or TENSORBOARD_NAME is None
#     or TENSORBOARD_NAME == "[your-tensorboard-name]"
# ):
#     TENSORBOARD_NAME = PROJECT + "-tb-" #+ UUID

# tensorboard = vertex_ai.Tensorboard.create(
#     display_name=TENSORBOARD_NAME, project=PROJECT, location=REGION
# )
# TENSORBOARD = tensorboard.gca_resource.name
# print("TENSORBOARD:", TENSORBOARD)

## Data prep

In [11]:
local_dir = os.path.expanduser('~')
local_dir = f'{local_dir}/distributed-training/datasets'

if tf.io.gfile.exists(local_dir):
    tf.io.gfile.rmtree(local_dir)
tf.io.gfile.makedirs(local_dir)

url = 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'
local_path = f'{local_dir}/aclImdb_v1.tar.gz'
print(f'local_path: {local_path}')

dataset = tf.keras.utils.get_file(
    local_path
    , url
    , untar=True
    , cache_dir=local_dir
    , cache_subdir='.'
)
dataset_dir = os.path.join(os.path.dirname(dataset), 'aclImdb')
print(f'dataset_dir: {dataset_dir}')

train_dir = os.path.join(dataset_dir, 'train')
print(f'train_dir: {train_dir}')

# remove unused folders to make it easier to load the data
remove_dir = os.path.join(train_dir, 'unsup')
shutil.rmtree(remove_dir)

local_path: /home/jupyter/distributed-training/datasets/aclImdb_v1.tar.gz
Downloading data from https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
dataset_dir: /home/jupyter/distributed-training/datasets/aclImdb
train_dir: /home/jupyter/distributed-training/datasets/aclImdb/train


In [12]:
def create_splits(train_dir, test_dir, val_split, seed):
    
    train_ds = tf.keras.preprocessing.text_dataset_from_directory(
        train_dir,
        validation_split=val_split,
        subset='training',
        seed=seed)

    class_names = train_ds.class_names
    
    train_ds = train_ds.unbatch()

    val_ds = tf.keras.preprocessing.text_dataset_from_directory(
        train_dir,
        validation_split=val_split,
        subset='validation',
        seed=seed).unbatch()

    test_ds = tf.keras.preprocessing.text_dataset_from_directory(
        test_dir).unbatch()

    return train_ds, val_ds, test_ds, class_names

In [13]:
seed = 42
val_split = 0.2
test_dir = f'{dataset_dir}/test'

train_ds, val_ds, test_ds, class_names = (
    create_splits(train_dir, test_dir, val_split, seed)
)

Found 25000 files belonging to 2 classes.
Using 20000 files for training.
Found 25000 files belonging to 2 classes.
Using 5000 files for validation.
Found 25000 files belonging to 2 classes.


In [14]:
for text, label in train_ds.take(2):
    print(f'Review: {text.numpy()}')
    label = label.numpy()
    print(f'Label : {label} ({class_names[label]})')

Review: b'"Pandemonium" is a horror movie spoof that comes off more stupid than funny. Believe me when I tell you, I love comedies. Especially comedy spoofs. "Airplane", "The Naked Gun" trilogy, "Blazing Saddles", "High Anxiety", and "Spaceballs" are some of my favorite comedies that spoof a particular genre. "Pandemonium" is not up there with those films. Most of the scenes in this movie had me sitting there in stunned silence because the movie wasn\'t all that funny. There are a few laughs in the film, but when you watch a comedy, you expect to laugh a lot more than a few times and that\'s all this film has going for it. Geez, "Scream" had more laughs than this film and that was more of a horror film. How bizarre is that?<br /><br />*1/2 (out of four)'
Label : 0 (neg)
Review: b"David Mamet is a very interesting and a very un-equal director. His first movie 'House of Games' was the one I liked best, and it set a series of films with characters whose perspective of life changes as they

In [15]:
def serialize_example(text_fragment, label):
    """Serializes text fragment and label in tf.Example."""
    
    def _bytes_feature(value):
        """Returns a bytes_list from a string / byte."""
        if isinstance(value, type(tf.constant(0))):
            value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

    def _int64_feature(value):
        """Returns an int64_list from a bool / enum / int / uint."""
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
    
    feature = {
        'text_fragment': _bytes_feature(text_fragment),
        'label': _int64_feature(label)
    }
    
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()
    
def tf_serialize_example(text_fragment, label):
  tf_string = tf.py_function(
    serialize_example,
    (text_fragment, label), 
    tf.string)      
  return tf.reshape(tf_string, ()) 

In [16]:
tfrecords_folder = '{}/tfrecords'.format(os.path.expanduser('~'))
if tf.io.gfile.exists(tfrecords_folder):
    tf.io.gfile.rmtree(tfrecords_folder)
tf.io.gfile.makedirs(tfrecords_folder)

filenames = ['train.tfrecords', 'valid.tfrecords', 'test.tfrecords']
for file_name, dataset in zip(filenames, [train_ds, val_ds, test_ds]):
    writer = tf.data.experimental.TFRecordWriter(os.path.join(tfrecords_folder, file_name))
    writer.write(dataset.map(tf_serialize_example))

In [17]:
for record in tf.data.TFRecordDataset([os.path.join(tfrecords_folder, file_name)]).take(2):
    print(record)

tf.Tensor(b"\n\xd1\x07\n\xbe\x07\n\rtext_fragment\x12\xac\x07\n\xa9\x07\n\xa6\x07Need I say more? The reason the GOOD Australian version of Kath and Kim was, as mentioned, good, was because of it's hilariously funny originality. The reason this new American-ised version is so terrible is because a lot of it is taken straight from the original. Not to mention the unfaithfulness to the characters. Kath is meant to be a dag. Kim is meant to be fat. Kel (or Phil as they have dubbed him) is meant to be pathetic. Brett (or Craig) is meant to be a loser, not a person who acts like he's on heroin and finishes every sentence with 'dude'. Thank God Szubanski didn't sell her rights to Sharon, she'd probably end up being a tall thin blonde who Kim likes.<br /><br />Kath and Kim are MOTHER AND DAUGHTER. They are not meant to look 2 years apart. And they are not meant to giggle like school girls. This show is a disgrace to even share the same title as the Australian version. America: get your own te

In [18]:
gcs_paths = [f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/train',
             f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/valid',
             f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/test']

for filename, gcs_path in zip(filenames, gcs_paths):
    local_file_path = os.path.join(tfrecords_folder, filename)
    gcs_file_path = f'{gcs_path}/{filename}'
    !gsutil cp {local_file_path} {gcs_file_path}

Copying file:///home/jupyter/tfrecords/train.tfrecords [Content-Type=application/octet-stream]...
/ [1 files][ 26.5 MiB/ 26.5 MiB]                                                
Operation completed over 1 objects/26.5 MiB.                                     
Copying file:///home/jupyter/tfrecords/valid.tfrecords [Content-Type=application/octet-stream]...
/ [1 files][  6.6 MiB/  6.6 MiB]                                                
Operation completed over 1 objects/6.6 MiB.                                      
Copying file:///home/jupyter/tfrecords/test.tfrecords [Content-Type=application/octet-stream]...
/ [1 files][ 32.3 MiB/ 32.3 MiB]                                                
Operation completed over 1 objects/32.3 MiB.                                     


In [19]:
# TRAIN_BASE_IMAGE = 'gcr.io/deeplearning-platform-release/tf2-gpu.2-4'
TRAIN_BASE_IMAGE = 'us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-11:latest'
# TRAIN_BASE_IMAGE = 'us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-11:latest'

In [21]:
! rm -rf scripts/trainer
! mkdir -p scripts/trainer
! touch scripts/trainer/__init__.py

In [22]:
%%writefile scripts/trainer/task.py


# Copyright 2021 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#            http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

import os
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text

import time

from absl import app
from absl import flags
from absl import logging
from official.nlp import optimization 

import random
import string

from google.cloud import aiplatform as vertex_ai


TFHUB_HANDLE_ENCODER = 'https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/3'
TFHUB_HANDLE_PREPROCESS = 'https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3'
LOCAL_TB_FOLDER = '/tmp/logs'
LOCAL_SAVED_MODEL_DIR = '/tmp/saved_model'

FLAGS = flags.FLAGS
flags.DEFINE_integer('steps_per_epoch', 625, 'Steps per training epoch')
flags.DEFINE_integer('eval_steps', 150, 'Evaluation steps')
flags.DEFINE_integer('epochs', 2, 'Nubmer of epochs')
flags.DEFINE_integer('per_replica_batch_size', 32, 'Per replica batch size')
flags.DEFINE_integer('TRAIN_NGPU', 1, '')
flags.DEFINE_integer('replica_count', 1, '')
flags.DEFINE_integer('reduction_cnt', 0, '')

flags.DEFINE_string('training_data_path', f'/bert-finetuning/imdb/tfrecords/train', 'Training data GCS path')
flags.DEFINE_string('validation_data_path', f'/bert-finetuning/imdb/tfrecords/valid', 'Validation data GCS path')
flags.DEFINE_string('testing_data_path', f'/bert-finetuning/imdb/tfrecords/test', 'Testing data GCS path')

flags.DEFINE_string('job_dir', f'/jobs', 'A base GCS path for jobs')
flags.DEFINE_string('job_id', 'default', 'unique_id for experiment runs')
flags.DEFINE_string('TRAIN_GPU', 'NA', '')
flags.DEFINE_string('experiment_run', 'NA', '')
flags.DEFINE_string('experiment_name', 'NA', '')


flags.DEFINE_enum('strategy', 'multiworker', ['single', 'mirrored', 'multiworker'], 'Distribution strategy')
flags.DEFINE_enum('auto_shard_policy', 'auto', ['auto', 'data', 'file', 'off'], 'Dataset sharing strategy')

auto_shard_policy = {
    'auto': tf.data.experimental.AutoShardPolicy.AUTO,
    'data': tf.data.experimental.AutoShardPolicy.DATA,
    'file': tf.data.experimental.AutoShardPolicy.FILE,
    'off': tf.data.experimental.AutoShardPolicy.OFF,
}


def create_unbatched_dataset(tfrecords_folder):
    """Creates an unbatched dataset in the format required by the 
       sentiment analysis model from the folder with TFrecords files."""
    
    feature_description = {
        'text_fragment': tf.io.FixedLenFeature([], tf.string, default_value=''),
        'label': tf.io.FixedLenFeature([], tf.int64, default_value=0),
    }

    def _parse_function(example_proto):
        parsed_example = tf.io.parse_single_example(example_proto, feature_description)
        return parsed_example['text_fragment'], parsed_example['label']
  
    file_paths = [f'{tfrecords_folder}/{file_path}' for file_path in tf.io.gfile.listdir(tfrecords_folder)]
    dataset = tf.data.TFRecordDataset(file_paths)
    dataset = dataset.map(_parse_function)
    
    return dataset


def configure_dataset(ds, auto_shard_policy):
    """
    Optimizes the performance of a dataset.
    """
    
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = (
        auto_shard_policy
    )
    
    ds = ds.repeat(-1).cache()
    ds = ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    ds = ds.with_options(options)
    return ds


def create_input_pipelines(train_dir, valid_dir, test_dir, batch_size, auto_shard_policy):
    """Creates input pipelines from Imdb dataset."""
    
    train_ds = create_unbatched_dataset(train_dir)
    train_ds = train_ds.batch(batch_size)
    train_ds = configure_dataset(train_ds, auto_shard_policy)
    
    valid_ds = create_unbatched_dataset(valid_dir)
    valid_ds = valid_ds.batch(batch_size)
    valid_ds = configure_dataset(valid_ds, auto_shard_policy)
    
    test_ds = create_unbatched_dataset(test_dir)
    test_ds = test_ds.batch(batch_size)
    test_ds = configure_dataset(test_ds, auto_shard_policy)

    return train_ds, valid_ds, test_ds


def build_classifier_model(tfhub_handle_preprocess, tfhub_handle_encoder):
    """Builds a simple binary classification model with BERT trunk."""
    
    text_input = tf.keras.layers.Input(shape=(), dtype=tf.string, name='text')
    preprocessing_layer = hub.KerasLayer(tfhub_handle_preprocess, name='preprocessing')
    encoder_inputs = preprocessing_layer(text_input)
    encoder = hub.KerasLayer(tfhub_handle_encoder, trainable=True, name='BERT_encoder')
    outputs = encoder(encoder_inputs)
    net = outputs['pooled_output']
    net = tf.keras.layers.Dropout(0.1)(net)
    net = tf.keras.layers.Dense(1, activation=None, name='classifier')(net)
    
    return tf.keras.Model(text_input, net)


def copy_tensorboard_logs(local_path: str, gcs_path: str):
    """Copies Tensorboard logs from a local dir to a GCS location.
    
    After training, batch copy Tensorboard logs locally to a GCS location. This can result
    in faster pipeline runtimes over streaming logs per batch to GCS that can get bottlenecked
    when streaming large volumes.
    
    Args:
      local_path: local filesystem directory uri.
      gcs_path: cloud filesystem directory uri.
    Returns:
      None.
    """
    pattern = '{}/*/events.out.tfevents.*'.format(local_path)
    local_files = tf.io.gfile.glob(pattern)
    gcs_log_files = [local_file.replace(local_path, gcs_path) for local_file in local_files]
    for local_file, gcs_file in zip(local_files, gcs_log_files):
        tf.io.gfile.copy(local_file, gcs_file)


def main(argv):
    del argv
    
    def _is_chief(task_type, task_id):
        return ((task_type == 'chief' or task_type == 'worker') and task_id == 0) or task_type is None
        
    
    logging.info('Setting up training.')
    logging.info('   epochs: {}'.format(FLAGS.epochs))
    logging.info('   steps_per_epoch: {}'.format(FLAGS.steps_per_epoch))
    logging.info('   eval_steps: {}'.format(FLAGS.eval_steps))
    logging.info('   strategy: {}'.format(FLAGS.strategy))
    logging.info('   job_id: {}'.format(FLAGS.job_id))
    logging.info('   TRAIN_GPU: {}'.format(FLAGS.TRAIN_GPU))
    logging.info('   TRAIN_NGPU: {}'.format(FLAGS.TRAIN_NGPU))
    logging.info('   replica_count: {}'.format(FLAGS.replica_count))
    logging.info('   reduction_cnt: {}'.format(FLAGS.reduction_cnt))
    logging.info('   experiment_name: {}'.format(FLAGS.experiment_name))
    logging.info('   experiment_run: {}'.format(FLAGS.experiment_run))
    
    tb_dir = os.getenv('AIP_TENSORBOARD_LOG_DIR', LOCAL_TB_FOLDER)
    model_dir = os.getenv('AIP_MODEL_DIR', LOCAL_SAVED_MODEL_DIR)
    logging.info(f'AIP_TENSORBOARD_LOG_DIR = {tb_dir}')
    logging.info(f'AIP_MODEL_DIR = {model_dir}')
    
    project_number = os.environ["CLOUD_ML_PROJECT_ID"]
    
    vertex_ai.init(
        project=project_number,
        location='us-central1',
        experiment=FLAGS.experiment_name
    )

    # Single Machine, single compute device
    if FLAGS.strategy == 'single':
        if tf.test.is_gpu_available():
            strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
        else:
            strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
        logging.info("Single device training")
    
    # Single Machine, multiple compute device
    elif FLAGS.strategy == 'mirrored':
        strategy = tf.distribute.MirroredStrategy()
        logging.info("Mirrored Strategy distributed training")
    
    # Multi Machine, multiple compute device
    elif FLAGS.strategy == 'multiworker':
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
        logging.info("Multi-worker Strategy distributed training")
        logging.info('TF_CONFIG = {}'.format(os.environ.get('TF_CONFIG', 'Not found')))
   
    # Single Machine, multiple TPU devices
    elif FLAGS.strategy == 'tpu':
        cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
        tf.config.experimental_connect_to_cluster(cluster_resolver)
        tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
        strategy = tf.distribute.TPUStrategy(cluster_resolver)
        print("All devices: ", tf.config.list_logical_devices('TPU'))

    logging.info('num_replicas_in_sync = {}'.format(strategy.num_replicas_in_sync))
    
    if strategy.cluster_resolver:    
        task_type, task_id = (strategy.cluster_resolver.task_type,
                              strategy.cluster_resolver.task_id)
    else:
        task_type, task_id =(None, None)
        
    logging.info('task_type = {}'.format(task_type))
    logging.info('task_id = {}'.format(task_id))
    
    global_batch_size = (
        strategy.num_replicas_in_sync *
        FLAGS.per_replica_batch_size
    )
    
    train_ds, valid_ds, test_ds = create_input_pipelines(
        FLAGS.training_data_path,
        FLAGS.validation_data_path,
        FLAGS.testing_data_path,
        global_batch_size,
        auto_shard_policy[FLAGS.auto_shard_policy])
        
    num_train_steps = FLAGS.steps_per_epoch * FLAGS.epochs
    num_warmup_steps = int(0.1*num_train_steps)
    init_lr = 3e-5
    
    with strategy.scope():
        
        model = build_classifier_model(TFHUB_HANDLE_PREPROCESS, TFHUB_HANDLE_ENCODER)
        
        loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)
        
        metrics = tf.metrics.BinaryAccuracy()
        
        optimizer = optimization.create_optimizer(
            init_lr=init_lr
            , num_train_steps=num_train_steps
            , num_warmup_steps=num_warmup_steps
            , optimizer_type='adamw'
        )

        model.compile(
            optimizer=optimizer
            , loss=loss
            , metrics=metrics
        )
        
    # Configure BackupAndRestore callback
    if FLAGS.strategy == 'single':
        callbacks = []
        logging.info("No backup and restore")
    else:
        backup_dir = '{}/backupandrestore'.format(FLAGS.job_dir)
        callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir)]
        logging.info(f"saved backup and restore t0: {backup_dir}")
    
    # Configure TensorBoard callback on Chief
    if _is_chief(task_type, task_id):
        callbacks.append(
            tf.keras.callbacks.TensorBoard(
                log_dir=tb_dir
                , update_freq='batch'
                , histogram_freq=1
            )
        )
    
    logging.info('Starting training ...')
    
    if _is_chief(task_type, task_id):
        start_time = time.time()
    
    history = model.fit(
        x=train_ds
        , validation_data=valid_ds
        , steps_per_epoch=FLAGS.steps_per_epoch
        , validation_steps=FLAGS.eval_steps
        , epochs=FLAGS.epochs
        , callbacks=callbacks
    )
    
    # ====================================================
    # log Vertex Experiments
    # ====================================================
    SESSION_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=3))
    
    if _is_chief(task_type, task_id):
        end_time = time.time()
        # val metrics
        val_keys = [v for v in history.history.keys()]
        total_train_time = int((end_time - start_time) / 60)

        metrics_dict = {"total_train_time": total_train_time}
        logging.info(f"total_train_time: {total_train_time}")
        _ = [metrics_dict.update({key: history.history[key][-1]}) for key in val_keys]
    
        logging.info(f" task_type logging experiments: {task_type}")
        logging.info(f" task_id logging experiments: {task_id}")
        logging.info(f" logging data to experiment run: {FLAGS.experiment_run}-{SESSION_id}")
        
        with vertex_ai.start_run(
            f'{FLAGS.experiment_run}-{SESSION_id}', 
        ) as my_run:
            
            logging.info(f"logging metrics...")
            my_run.log_metrics(metrics_dict)

            logging.info(f"logging metaparams...")
            my_run.log_params(
                {
                    "epochs": FLAGS.epochs,
                    "strategy": FLAGS.strategy,
                    "per_replica_batch_size": FLAGS.per_replica_batch_size,
                    "TRAIN_GPU": FLAGS.TRAIN_GPU,
                    "TRAIN_NGPU": FLAGS.TRAIN_NGPU,
                    "replica_count": FLAGS.replica_count,
                    "reduction_cnt": FLAGS.reduction_cnt,
                    "global_batch_size": global_batch_size,
                }
            )

            vertex_ai.end_run()
            logging.info(f"EXPERIMENT RUN: '{FLAGS.experiment_run}-{SESSION_id}' has ended")

    if _is_chief(task_type, task_id):
        # Copy tensorboard logs to GCS
        # tb_logs = '{}/tb_logs'.format(FLAGS.job_dir)
        # logging.info('Copying TensorBoard logs to: {}'.format(tb_logs))
        # copy_tensorboard_logs(LOCAL_TB_FOLDER, tb_logs)
        # saved_model_dir = '{}/saved_model'.format(model_dir)
        logging.info('Training completed. Saving the trained model to: {}'.format(model_dir))
        model.save(model_dir)
    # else:
    #     # saved_model_dir = model_dir
    #     logging.info('Training completed. Saving the trained model to: {}'.format(model_dir))
    #     model.save(model_dir)
        
    # Save trained model
    # saved_model_dir = '{}/saved_model'.format(model_dir)
    # logging.info('Training completed. Saving the trained model to: {}'.format(saved_model_dir))
    # model.save(saved_model_dir)
    #tf.saved_model.save(model, saved_model_dir)
    
    
if __name__ == '__main__':
    logging.set_verbosity(logging.INFO)
    app.run(main)

Writing scripts/trainer/task.py


In [23]:
TRAIN_IMAGE = f'gcr.io/{PROJECT}/imdb_bert'

In [24]:
dockerfile = f'''
FROM {TRAIN_BASE_IMAGE}

RUN pip install pip install tf-models-official==2.11.0 tensorflow-text==2.11.0

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

RUN apt update && apt -y install nvtop

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]
'''

with open('scripts/Dockerfile', 'w') as f:
    f.write(dockerfile)

In [25]:
! docker build -t {TRAIN_IMAGE} scripts/

Sending build context to Docker daemon  19.46kB
Step 1/6 : FROM us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-11:latest
 ---> 1fc7736bd93e
Step 2/6 : RUN pip install pip install tf-models-official==2.11.0 tensorflow-text==2.11.0
 ---> Using cache
 ---> 21a04e0b97f8
Step 3/6 : WORKDIR /
 ---> Using cache
 ---> 9e7e18cb76dc
Step 4/6 : COPY trainer /trainer
 ---> 8c4df82a0924
Step 5/6 : RUN apt update && apt -y install nvtop
 ---> Running in f52dc9f3f7c9
[91m

[0mGet:1 http://packages.cloud.google.com/apt cloud-sdk InRelease [6361 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1581 B]
Get:3 http://packages.cloud.google.com/apt cloud-sdk/main amd64 Packages [464 kB]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [361 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy InRelease [270 kB]
Get:7 http://security.ubuntu.com/ubuntu 

In [26]:
! docker push {TRAIN_IMAGE}

Using default tag: latest
The push refers to repository [gcr.io/hybrid-vertex/imdb_bert]

[1B547609c0: Preparing 
[1B43e55906: Preparing 
[1B65c47b32: Preparing 
[1B95c7b436: Preparing 
[1Bf663075e: Preparing 
[1B7fbea69f: Preparing 
[1Bdcb6c992: Preparing 
[1B57d3600a: Preparing 
[1B98aef500: Preparing 
[1B937e6451: Preparing 
[1Bf16690a3: Preparing 
[1B0ba8f8e0: Preparing 
[1B061c8df0: Preparing 
[1B20bdc5a8: Preparing 
[1B9623cc67: Preparing 
[1Bf5f47ef7: Preparing 
[1Be3a670db: Preparing 
[1Bab0a5210: Preparing 
[1B27b973c2: Preparing 
[1B003a8778: Preparing 
[1B6eb0eac1: Preparing 
[1Ba842d5cf: Preparing 
[1B7c820400: Preparing 
[1B2eabba29: Preparing 
[1B9e83e652: Preparing 
[1B8f4121e3: Preparing 
[20B7d3600a: Waiting g 
[20B8aef500: Waiting g 
[1Bd7cd1026: Preparing 
[21B37e6451: Waiting g 
[1B06a133b8: Preparing 
[22B16690a3: Waiting g 
[1Bd1f80fca: Preparing 
[1Bf0edb23d: Preparing 
[35B47609c0: Pushed   44.08MB/44.04MB[30A[2K[27A[2K[24

## Submitting training jobs

In [27]:
vertex_ai.init(
    project=PROJECT,
    location=REGION,
    staging_bucket=STAGING_BUCKET
)

## 1 Replica, 1 GPU

### set Experiment

In [28]:
import time

EXPERIMENT_PREFIX = 'bert'
EXPERIMENT_NAME=f'{PREFIX}-bert-tune'
RUN_NAME = f'run-{time.strftime("%Y%m%d-%H%M%S")}'

print(f"EXPERIMENT_NAME: {EXPERIMENT_NAME}")
print(f"RUN_NAME: {RUN_NAME}")

EXPERIMENT_NAME: jtv7-bert-tune
RUN_NAME: run-20230620-220732


In [29]:
MACHINE_TYPE = 'n1-standard-16'
TRAIN_GPU, TRAIN_NGPU = ('NVIDIA_TESLA_T4', 1) # NVIDIA_TESLA_T4 NVIDIA_TESLA_V100

REPLICA_COUNT = 1
DISTRIBUTION_STRATEGY = "single" # single, mirrored, multiworker, tpu

print(f"MACHINE_TYPE          : {MACHINE_TYPE}")
print(f"TRAIN_GPU             : {TRAIN_GPU}")
print(f"TRAIN_NGPU            : {TRAIN_NGPU}")
print(f"REPLICA_COUNT         : {REPLICA_COUNT}")
print(f"DISTRIBUTION_STRATEGY : {DISTRIBUTION_STRATEGY}")

MACHINE_TYPE          : n1-standard-16
TRAIN_GPU             : NVIDIA_TESLA_T4
TRAIN_NGPU            : 1
REPLICA_COUNT         : 1
DISTRIBUTION_STRATEGY : single


In [30]:
epochs = 10
steps_per_epoch = 200
eval_steps = 50
PER_REPLICA_BATCH_SIZE = 32

REDUCTION_SERVER_COUNT = 0
REDUCTION_SERVER_MACHINE_TYPE = "n1-highcpu-16"

training_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/train'
validation_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/valid'
testing_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/test'

job_id = f'job-{datetime.now().strftime("%Y%m%d%H%M%S")}'.format()
job_dir = f'{STAGING_BUCKET}/jobs/{job_id}'

WORKER_ARGS = [
    "--epochs=" + str(epochs)
    , "--steps_per_epoch=" + str(steps_per_epoch)
    , "--eval_steps=" + str(eval_steps)
    , "--per_replica_batch_size=" + str(PER_REPLICA_BATCH_SIZE)
    , "--training_data_path=" + training_data_path
    , "--validation_data_path=" + validation_data_path
    , "--testing_data_path=" + testing_data_path
    , "--job_dir=" + job_dir
    , f"--strategy={DISTRIBUTION_STRATEGY}"
    , "--auto_shard_policy=auto" #data
    , f"--job_id={job_id}"
    , f"--TRAIN_GPU={TRAIN_GPU}"
    , f"--TRAIN_NGPU={TRAIN_NGPU}"
    , f"--reduction_cnt={REDUCTION_SERVER_COUNT}"
    , f"--replica_count={REPLICA_COUNT}"
    , f"--experiment_name={EXPERIMENT_NAME}"
    , f"--experiment_run={RUN_NAME}"
]

from utils import workerpool_specs

WORKER_POOL_SPECS = workerpool_specs.prepare_worker_pool_specs(
    image_uri=TRAIN_IMAGE,
    args=WORKER_ARGS,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_count=TRAIN_NGPU,
    accelerator_type=TRAIN_GPU,
    reduction_server_count=REDUCTION_SERVER_COUNT,
    reduction_server_machine_type=REDUCTION_SERVER_MACHINE_TYPE,
)

from pprint import pprint
pprint(WORKER_POOL_SPECS)

[{'container_spec': {'args': ['--epochs=10',
                              '--steps_per_epoch=200',
                              '--eval_steps=50',
                              '--per_replica_batch_size=32',
                              '--training_data_path=gs://jtv7-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/train',
                              '--validation_data_path=gs://jtv7-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/valid',
                              '--testing_data_path=gs://jtv7-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/test',
                              '--job_dir=gs://jtv7-hybrid-vertex-bucket/jobs/job-20230620220736',
                              '--strategy=single',
                              '--auto_shard_policy=auto',
                              '--job_id=job-20230620220736',
                              '--TRAIN_GPU=NVIDIA_TESLA_T4',
                              '--TRAIN_NGPU=1',
                              '--reduction_cnt=

In [31]:
vertex_ai_tb = vertex_ai.Tensorboard.create()
TENSORBOARD = vertex_ai_tb.gca_resource.name
print(TENSORBOARD)

projects/934903580331/locations/us-central1/tensorboards/949934065933352960


In [32]:
vertex_ai.init(
    experiment=EXPERIMENT_NAME
    # , experiment_tensorboard=vertex_ai_tb
)

In [33]:
custom_job = vertex_ai.CustomJob(
    display_name=f'imdb-bert-{RUN_NAME}-{DISTRIBUTION_STRATEGY}-{REPLICA_COUNT}-{TRAIN_NGPU}',
    worker_pool_specs=WORKER_POOL_SPECS,
    staging_bucket=f'{STAGING_BUCKET}/{EXPERIMENT_NAME}/{RUN_NAME}'
)

In [34]:
custom_job.run(
    sync=False
    , service_account=VERTEX_SA
    , tensorboard=TENSORBOARD
    , restart_job_on_worker_restart=False
    , enable_web_access=True
)

In [35]:
print(f"Job Name: {custom_job.display_name}")
print(f"Job Resource Name: {custom_job.resource_name}\n")
# print(f"Check training progress at {custom_job._dashboard_uri()}")

Job Name: imdb-bert-run-20230620-220732-single-1-1
Job Resource Name: projects/934903580331/locations/us-central1/customJobs/12692624792092672



## 1 Replica, 2 GPUs

### set Experiment Run

In [36]:
RUN_NAME = f'run-{time.strftime("%Y%m%d-%H%M%S")}'

print(f"EXPERIMENT_NAME: {EXPERIMENT_NAME}")
print(f"RUN_NAME: {RUN_NAME}")

EXPERIMENT_NAME: jtv7-bert-tune
RUN_NAME: run-20230620-220830


In [37]:
MACHINE_TYPE = 'n1-standard-32'
TRAIN_GPU, TRAIN_NGPU = ('NVIDIA_TESLA_T4', 2) # NVIDIA_TESLA_T4 NVIDIA_TESLA_V100

REPLICA_COUNT = 1
DISTRIBUTION_STRATEGY = "mirrored" # single, mirrored, multiworker, tpu

print(f"MACHINE_TYPE          : {MACHINE_TYPE}")
print(f"TRAIN_GPU             : {TRAIN_GPU}")
print(f"TRAIN_NGPU            : {TRAIN_NGPU}")
print(f"REPLICA_COUNT         : {REPLICA_COUNT}")
print(f"DISTRIBUTION_STRATEGY : {DISTRIBUTION_STRATEGY}")

MACHINE_TYPE          : n1-standard-32
TRAIN_GPU             : NVIDIA_TESLA_T4
TRAIN_NGPU            : 2
REPLICA_COUNT         : 1
DISTRIBUTION_STRATEGY : mirrored


In [38]:
epochs = 10
steps_per_epoch = 200
eval_steps = 50
# reduction_serv='False'
PER_REPLICA_BATCH_SIZE = 32

REDUCTION_SERVER_COUNT = 0
REDUCTION_SERVER_MACHINE_TYPE = "n1-highcpu-16"

training_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/train'
validation_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/valid'
testing_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/test'

job_id = f'job-{datetime.now().strftime("%Y%m%d%H%M%S")}'.format()
job_dir = f'{STAGING_BUCKET}/jobs/{job_id}'

WORKER_ARGS = [
    "--epochs=" + str(epochs)
    , "--steps_per_epoch=" + str(steps_per_epoch)
    , "--eval_steps=" + str(eval_steps)
    , "--per_replica_batch_size=" + str(PER_REPLICA_BATCH_SIZE)
    , "--training_data_path=" + training_data_path
    , "--validation_data_path=" + validation_data_path
    , "--testing_data_path=" + testing_data_path
    , "--job_dir=" + job_dir
    , f"--strategy={DISTRIBUTION_STRATEGY}"
    , "--auto_shard_policy=auto" #data
    , f"--job_id={job_id}"
    , f"--TRAIN_GPU={TRAIN_GPU}"
    , f"--TRAIN_NGPU={TRAIN_NGPU}"
    , f"--reduction_cnt={REDUCTION_SERVER_COUNT}"
    , f"--replica_count={REPLICA_COUNT}"
    , f"--experiment_name={EXPERIMENT_NAME}"
    , f"--experiment_run={RUN_NAME}"
]

from utils import workerpool_specs

WORKER_POOL_SPECS = workerpool_specs.prepare_worker_pool_specs(
    image_uri=TRAIN_IMAGE,
    args=WORKER_ARGS,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_count=TRAIN_NGPU,
    accelerator_type=TRAIN_GPU,
    reduction_server_count=REDUCTION_SERVER_COUNT,
    reduction_server_machine_type=REDUCTION_SERVER_MACHINE_TYPE,
)

from pprint import pprint
pprint(WORKER_POOL_SPECS)

[{'container_spec': {'args': ['--epochs=10',
                              '--steps_per_epoch=200',
                              '--eval_steps=50',
                              '--per_replica_batch_size=32',
                              '--training_data_path=gs://jtv7-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/train',
                              '--validation_data_path=gs://jtv7-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/valid',
                              '--testing_data_path=gs://jtv7-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/test',
                              '--job_dir=gs://jtv7-hybrid-vertex-bucket/jobs/job-20230620220833',
                              '--strategy=mirrored',
                              '--auto_shard_policy=auto',
                              '--job_id=job-20230620220833',
                              '--TRAIN_GPU=NVIDIA_TESLA_T4',
                              '--TRAIN_NGPU=2',
                              '--reduction_cn

In [39]:
vertex_ai_tb = vertex_ai.Tensorboard.create()
TENSORBOARD = vertex_ai_tb.gca_resource.name
print(TENSORBOARD)

projects/934903580331/locations/us-central1/tensorboards/6822627980024479744


In [40]:
vertex_ai.init(
    experiment=EXPERIMENT_NAME
    # , experiment_tensorboard=vertex_ai_tb
)

In [41]:
custom_m_job = vertex_ai.CustomJob(
    display_name=f'imdb-bert-{RUN_NAME}-{DISTRIBUTION_STRATEGY}-{REPLICA_COUNT}-{TRAIN_NGPU}',
    worker_pool_specs=WORKER_POOL_SPECS,
    staging_bucket=f'{STAGING_BUCKET}/{EXPERIMENT_NAME}/{RUN_NAME}'
)

In [42]:
custom_m_job.run(
    sync=False
    , service_account=VERTEX_SA
    , tensorboard=TENSORBOARD
    , restart_job_on_worker_restart=False
    , enable_web_access=True
)

In [43]:
print(f"Job Name: {custom_m_job.display_name}")
print(f"Job Resource Name: {custom_m_job.resource_name}\n")
# print(f"Check training progress at {custom_job._dashboard_uri()}")

Job Name: imdb-bert-run-20230620-220830-mirrored-1-2
Job Resource Name: projects/934903580331/locations/us-central1/customJobs/1140281381494980608



## 2 Replicas, 1 GPU each

> Now increase `replica_count` from 1 to 2

### set Experiment Run

In [65]:
RUN_NAME = f'run-{time.strftime("%Y%m%d-%H%M%S")}'

print(f"EXPERIMENT_NAME: {EXPERIMENT_NAME}")
print(f"RUN_NAME: {RUN_NAME}")

EXPERIMENT_NAME: jtv6-bert-tune
RUN_NAME: run-20230620-175546


In [66]:
MACHINE_TYPE = 'n1-standard-16'
TRAIN_GPU, TRAIN_NGPU = ('NVIDIA_TESLA_T4', 1) # NVIDIA_TESLA_T4 NVIDIA_TESLA_V100

REPLICA_COUNT = 2
DISTRIBUTION_STRATEGY = "multiworker" # single, mirrored, multiworker, tpu

print(f"MACHINE_TYPE          : {MACHINE_TYPE}")
print(f"TRAIN_GPU             : {TRAIN_GPU}")
print(f"TRAIN_NGPU            : {TRAIN_NGPU}")
print(f"REPLICA_COUNT         : {REPLICA_COUNT}")
print(f"DISTRIBUTION_STRATEGY : {DISTRIBUTION_STRATEGY}")

MACHINE_TYPE          : n1-standard-8
TRAIN_GPU             : NVIDIA_TESLA_V100
TRAIN_NGPU            : 1
REPLICA_COUNT         : 2
DISTRIBUTION_STRATEGY : multiworker


In [67]:
epochs = 10
steps_per_epoch = 200
eval_steps = 50
# reduction_serv='False'
PER_REPLICA_BATCH_SIZE = 32

REDUCTION_SERVER_COUNT = 0
REDUCTION_SERVER_MACHINE_TYPE = "n1-highcpu-16"

training_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/train'
validation_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/valid'
testing_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/test'
job_id = 'job-{}'.format(datetime.now().strftime("%Y%m%d%H%M%S"))
job_dir = f'{STAGING_BUCKET}/jobs/{job_id}'

WORKER_ARGS = [
    "--epochs=" + str(epochs)
    , "--steps_per_epoch=" + str(steps_per_epoch)
    , "--eval_steps=" + str(eval_steps)
    , "--per_replica_batch_size=" + str(PER_REPLICA_BATCH_SIZE)
    , "--training_data_path=" + training_data_path
    , "--validation_data_path=" + validation_data_path
    , "--testing_data_path=" + testing_data_path
    , "--job_dir=" + job_dir
    , f"--strategy={DISTRIBUTION_STRATEGY}"
    , "--auto_shard_policy=auto" # data
    , f"--job_id={job_id}"
    , f"--TRAIN_GPU={TRAIN_GPU}"
    , f"--TRAIN_NGPU={TRAIN_NGPU}"
    , f"--reduction_cnt={REDUCTION_SERVER_COUNT}"
    , f"--replica_count={REPLICA_COUNT}"
    , f"--experiment_name={EXPERIMENT_NAME}"
    , f"--experiment_run={RUN_NAME}"
]

from utils import workerpool_specs

WORKER_POOL_SPECS = workerpool_specs.prepare_worker_pool_specs(
    image_uri=TRAIN_IMAGE,
    args=WORKER_ARGS,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_count=TRAIN_NGPU,
    accelerator_type=TRAIN_GPU,
    reduction_server_count=REDUCTION_SERVER_COUNT,
    reduction_server_machine_type=REDUCTION_SERVER_MACHINE_TYPE,
)

from pprint import pprint
pprint(WORKER_POOL_SPECS)

[{'container_spec': {'args': ['--epochs=25',
                              '--steps_per_epoch=200',
                              '--eval_steps=50',
                              '--per_replica_batch_size=32',
                              '--training_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/train',
                              '--validation_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/valid',
                              '--testing_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/test',
                              '--job_dir=gs://jtv6-hybrid-vertex-bucket/jobs/job-20230620175548',
                              '--strategy=multiworker',
                              '--auto_shard_policy=data',
                              '--job_id=job-20230620175548',
                              '--TRAIN_GPU=NVIDIA_TESLA_V100',
                              '--TRAIN_NGPU=1',
                              '--reducti

In [68]:
vertex_ai_tb = vertex_ai.Tensorboard.create()
TENSORBOARD = vertex_ai_tb.gca_resource.name
print(TENSORBOARD)

projects/934903580331/locations/us-central1/tensorboards/6689771791017050112


In [69]:
vertex_ai.init(
    experiment=EXPERIMENT_NAME
    # , experiment_tensorboard=vertex_ai_tb
)

In [70]:
custom_mm_job = vertex_ai.CustomJob(
    display_name=f'imdb-bert-{RUN_NAME}-{DISTRIBUTION_STRATEGY}-{REPLICA_COUNT}-{TRAIN_NGPU}',
    worker_pool_specs=WORKER_POOL_SPECS,
    staging_bucket=f'{STAGING_BUCKET}/{EXPERIMENT_NAME}/{RUN_NAME}'
)

In [71]:
custom_mm_job.run(
    sync=False
    , service_account=VERTEX_SA
    , tensorboard=TENSORBOARD
    , restart_job_on_worker_restart=False
    , enable_web_access=True
)

In [72]:
print(f"Job Name: {custom_mm_job.display_name}")
print(f"Job Resource Name: {custom_mm_job.resource_name}\n")
# print(f"Check training progress at {custom_m_job._dashboard_uri()}")

Job Name: imdb-bert-run-20230620-175546-multiworker-2-1
Job Resource Name: projects/934903580331/locations/us-central1/customJobs/241760479276433408



## 2 Replicas, 1 GPU each + Reduction Server

### set Experiment Run

In [73]:
RUN_NAME = f'run-{time.strftime("%Y%m%d-%H%M%S")}'

print(f"EXPERIMENT_NAME: {EXPERIMENT_NAME}")
print(f"RUN_NAME: {RUN_NAME}")

EXPERIMENT_NAME: jtv6-bert-tune
RUN_NAME: run-20230620-175824


In [74]:
MACHINE_TYPE = 'n1-standard-16'
TRAIN_GPU, TRAIN_NGPU = ('NVIDIA_TESLA_T4', 1) # NVIDIA_TESLA_T4 NVIDIA_TESLA_V100

REPLICA_COUNT = 2
DISTRIBUTION_STRATEGY = "multiworker" # single, mirrored, multiworker, tpu

print(f"MACHINE_TYPE          : {MACHINE_TYPE}")
print(f"TRAIN_GPU             : {TRAIN_GPU}")
print(f"TRAIN_NGPU            : {TRAIN_NGPU}")
print(f"REPLICA_COUNT         : {REPLICA_COUNT}")
print(f"DISTRIBUTION_STRATEGY : {DISTRIBUTION_STRATEGY}")

MACHINE_TYPE          : n1-standard-8
TRAIN_GPU             : NVIDIA_TESLA_V100
TRAIN_NGPU            : 1
REPLICA_COUNT         : 2
DISTRIBUTION_STRATEGY : multiworker


In [75]:
epochs = 10
steps_per_epoch = 200
eval_steps = 50
# reduction_serv='False'
PER_REPLICA_BATCH_SIZE = 32

DISTRIBUTION_STRATEGY = "multiworker" # single, mirrored, multiworker, tpu

REDUCTION_SERVER_COUNT = 4
REDUCTION_SERVER_MACHINE_TYPE = "n1-highcpu-16"

training_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/train'
validation_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/valid'
testing_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/test'
job_id = 'job-{}'.format(datetime.now().strftime("%Y%m%d%H%M%S"))
job_dir = f'{STAGING_BUCKET}/jobs/{job_id}'

WORKER_ARGS = [
    "--epochs=" + str(epochs)
    , "--steps_per_epoch=" + str(steps_per_epoch)
    , "--eval_steps=" + str(eval_steps)
    , "--per_replica_batch_size=" + str(PER_REPLICA_BATCH_SIZE)
    , "--training_data_path=" + training_data_path
    , "--validation_data_path=" + validation_data_path
    , "--testing_data_path=" + testing_data_path
    , "--job_dir=" + job_dir
    , f"--strategy={DISTRIBUTION_STRATEGY}"
    , "--auto_shard_policy=auto" # data
    , f"--job_id={job_id}"
    , f"--TRAIN_GPU={TRAIN_GPU}"
    , f"--TRAIN_NGPU={TRAIN_NGPU}"
    , f"--reduction_cnt={REDUCTION_SERVER_COUNT}"
    , f"--replica_count={REPLICA_COUNT}"
    , f"--experiment_name={EXPERIMENT_NAME}"
    , f"--experiment_run={RUN_NAME}"
]

from utils import workerpool_specs

WORKER_POOL_SPECS = workerpool_specs.prepare_worker_pool_specs(
    image_uri=TRAIN_IMAGE,
    args=WORKER_ARGS,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_count=TRAIN_NGPU,
    accelerator_type=TRAIN_GPU,
    reduction_server_count=REDUCTION_SERVER_COUNT,
    reduction_server_machine_type=REDUCTION_SERVER_MACHINE_TYPE,
)

from pprint import pprint
pprint(WORKER_POOL_SPECS)

[{'container_spec': {'args': ['--epochs=25',
                              '--steps_per_epoch=200',
                              '--eval_steps=50',
                              '--per_replica_batch_size=32',
                              '--training_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/train',
                              '--validation_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/valid',
                              '--testing_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/test',
                              '--job_dir=gs://jtv6-hybrid-vertex-bucket/jobs/job-20230620175832',
                              '--strategy=multiworker',
                              '--auto_shard_policy=data',
                              '--job_id=job-20230620175832',
                              '--TRAIN_GPU=NVIDIA_TESLA_V100',
                              '--TRAIN_NGPU=1',
                              '--reducti

In [76]:
vertex_ai_tb = vertex_ai.Tensorboard.create()
TENSORBOARD = vertex_ai_tb.gca_resource.name
print(TENSORBOARD)

projects/934903580331/locations/us-central1/tensorboards/2853830808404230144


In [77]:
vertex_ai.init(
    experiment=EXPERIMENT_NAME
    # , experiment_tensorboard=vertex_ai_tb
)

In [78]:
custom_mmr_job = vertex_ai.CustomJob(
    display_name=f'imdb-bert-{RUN_NAME}-{DISTRIBUTION_STRATEGY}-{REPLICA_COUNT}-{TRAIN_NGPU}-r',
    worker_pool_specs=WORKER_POOL_SPECS,
    staging_bucket=f'{STAGING_BUCKET}/{EXPERIMENT_NAME}/{RUN_NAME}'
)

In [79]:
custom_mmr_job.run(
    sync=False
    , service_account=VERTEX_SA
    , tensorboard=TENSORBOARD
    , restart_job_on_worker_restart=False
    , enable_web_access=True
)

In [80]:
print(f"Job Name: {custom_mmr_job.display_name}")
print(f"Job Resource Name: {custom_mmr_job.resource_name}\n")
# print(f"Check training progress at {custom_m_job._dashboard_uri()}")

Job Name: imdb-bert-run-20230620-175824-multiworker-2-1-r
Job Resource Name: projects/934903580331/locations/us-central1/customJobs/961210519748870144



## 2 Replicas, 2 GPUs each + Reduction Server

### set Experiment Run

In [81]:
RUN_NAME = f'run-{time.strftime("%Y%m%d-%H%M%S")}'

print(f"EXPERIMENT_NAME: {EXPERIMENT_NAME}")
print(f"RUN_NAME: {RUN_NAME}")

EXPERIMENT_NAME: jtv6-bert-tune
RUN_NAME: run-20230620-180730


In [82]:
# MACHINE_TYPE = 'a2-highgpu-2g'
# TRAIN_GPU, TRAIN_NGPU = ('NVIDIA_TESLA_A100', 2)

MACHINE_TYPE = 'n1-standard-16'
TRAIN_GPU, TRAIN_NGPU = ('NVIDIA_TESLA_T4', 2)

REPLICA_COUNT = 2
DISTRIBUTION_STRATEGY = "multiworker" # single, mirrored, multiworker, tpu

print(f"MACHINE_TYPE          : {MACHINE_TYPE}")
print(f"TRAIN_GPU             : {TRAIN_GPU}")
print(f"TRAIN_NGPU            : {TRAIN_NGPU}")
print(f"REPLICA_COUNT         : {REPLICA_COUNT}")
print(f"DISTRIBUTION_STRATEGY : {DISTRIBUTION_STRATEGY}")

MACHINE_TYPE          : n1-standard-16
TRAIN_GPU             : NVIDIA_TESLA_T4
TRAIN_NGPU            : 2
REPLICA_COUNT         : 2
DISTRIBUTION_STRATEGY : multiworker


In [83]:
epochs = 10
steps_per_epoch = 200
eval_steps = 50
PER_REPLICA_BATCH_SIZE = 32

REDUCTION_SERVER_COUNT = 4
REDUCTION_SERVER_MACHINE_TYPE = "n1-highcpu-16"

training_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/train'
validation_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/valid'
testing_data_path = f'{STAGING_BUCKET}/bert-finetuning/imdb/tfrecords/test'
job_id = 'job-{}'.format(datetime.now().strftime("%Y%m%d%H%M%S"))
job_dir = f'{STAGING_BUCKET}/jobs/{job_id}'

WORKER_ARGS = [
    "--epochs=" + str(epochs)
    , "--steps_per_epoch=" + str(steps_per_epoch)
    , "--eval_steps=" + str(eval_steps)
    , "--per_replica_batch_size=" + str(PER_REPLICA_BATCH_SIZE)
    , "--training_data_path=" + training_data_path
    , "--validation_data_path=" + validation_data_path
    , "--testing_data_path=" + testing_data_path
    , "--job_dir=" + job_dir
    , f"--strategy={DISTRIBUTION_STRATEGY}"
    , "--auto_shard_policy=auto" # data
    , f"--job_id={job_id}"
    , f"--TRAIN_GPU={TRAIN_GPU}"
    , f"--TRAIN_NGPU={TRAIN_NGPU}"
    , f"--reduction_cnt={REDUCTION_SERVER_COUNT}"
    , f"--replica_count={REPLICA_COUNT}"
    , f"--experiment_name={EXPERIMENT_NAME}"
    , f"--experiment_run={RUN_NAME}"
]

from utils import workerpool_specs

WORKER_POOL_SPECS = workerpool_specs.prepare_worker_pool_specs(
    image_uri=TRAIN_IMAGE,
    args=WORKER_ARGS,
    replica_count=REPLICA_COUNT,
    machine_type=MACHINE_TYPE,
    accelerator_count=TRAIN_NGPU,
    accelerator_type=TRAIN_GPU,
    reduction_server_count=REDUCTION_SERVER_COUNT,
    reduction_server_machine_type=REDUCTION_SERVER_MACHINE_TYPE,
)

from pprint import pprint
pprint(WORKER_POOL_SPECS)

[{'container_spec': {'args': ['--epochs=25',
                              '--steps_per_epoch=200',
                              '--eval_steps=50',
                              '--per_replica_batch_size=32',
                              '--training_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/train',
                              '--validation_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/valid',
                              '--testing_data_path=gs://jtv6-hybrid-vertex-bucket/bert-finetuning/imdb/tfrecords/test',
                              '--job_dir=gs://jtv6-hybrid-vertex-bucket/jobs/job-20230620180732',
                              '--strategy=multiworker',
                              '--auto_shard_policy=data',
                              '--job_id=job-20230620180732',
                              '--TRAIN_GPU=NVIDIA_TESLA_T4',
                              '--TRAIN_NGPU=2',
                              '--reduction

In [84]:
vertex_ai_tb = vertex_ai.Tensorboard.create()
TENSORBOARD = vertex_ai_tb.gca_resource.name
print(TENSORBOARD)

projects/934903580331/locations/us-central1/tensorboards/1523017118516248576


In [85]:
vertex_ai.init(
    experiment=EXPERIMENT_NAME
    # , experiment_tensorboard=vertex_ai_tb
)

In [86]:
custom_r4_job = vertex_ai.CustomJob(
    display_name=f'imdb-bert-{RUN_NAME}-{DISTRIBUTION_STRATEGY}-{REPLICA_COUNT}-{TRAIN_NGPU}-r',
    worker_pool_specs=WORKER_POOL_SPECS,
    staging_bucket=f'{STAGING_BUCKET}/{EXPERIMENT_NAME}/{RUN_NAME}'
)

In [87]:
custom_r4_job.run(
    sync=False
    , service_account=VERTEX_SA
    , tensorboard=TENSORBOARD
    , restart_job_on_worker_restart=False
    , enable_web_access=True
)

In [88]:
print(f"Job Name: {custom_r4_job.display_name}")
print(f"Job Resource Name: {custom_r4_job.resource_name}\n")
# print(f"Check training progress at {custom_m_job._dashboard_uri()}")

Job Name: imdb-bert-run-20230620-180730-multiworker-2-2-r
Job Resource Name: projects/934903580331/locations/us-central1/customJobs/8357247007798067200

