# Distributed training with Vertex Reduction Server

In [161]:
import os
import pprint
import sys
import time

from google.cloud import aiplatform
from google.cloud.aiplatform_v1beta1 import types


## Set up environment

In [160]:
# Google Cloud project and region targeted by every later cell.
PROJECT = "jk-mlops-dev"
REGION = "us-central1"

# Regional API endpoint and Docker repository name, both derived from REGION.
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
ARTIFACT_REPO = "jk-docker-repo-{}".format(REGION)

## Prepare a training container

In [None]:
# Import the client inside this cell: the only other import of JobServiceClient
# lives in the "Submit and monitor the job" cell further down, so on a fresh
# kernel (Restart & Run All) this cell would otherwise raise a NameError.
from google.cloud.aiplatform_v1beta1.services.job_service import JobServiceClient

# Point the client at the regional endpoint built from REGION.
options = dict(api_endpoint=API_ENDPOINT)
client = JobServiceClient(client_options=options)

### Create a Dockerfile

In [155]:
# Base image, pinned package versions, and the URI under which the training
# image is tagged (built from REGION, PROJECT and ARTIFACT_REPO).
BASE_IMAGE = 'gcr.io/deeplearning-platform-release/tf2-gpu.2-5'
MODEL_GARDEN_VERSION = '2.5.0'
TF_TEXT='2.5.0'
TRAIN_IMAGE = f'{REGION}-docker.pkg.dev/{PROJECT}/{ARTIFACT_REPO}/model_garden'

# Dockerfile template. The image:
#   * removes google-fast-socket and installs google-reduction-server from the
#     Google apt repository,
#   * pip-installs the pinned tf-models-official and tensorflow-text versions,
#   * copies the local `trainer` directory to /trainer,
#   * uses `python` as the entrypoint with a placeholder default command.
# Because this is a regular (non-raw) string, each backslash-newline pair is
# consumed by Python, so the first RUN instruction is written to the file as a
# single long line.
dockerfile = f'''
FROM {BASE_IMAGE}

RUN apt remove -y google-fast-socket && \
    echo "deb https://packages.cloud.google.com/apt google-fast-socket main" | tee /etc/apt/sources.list.d/google-fast-socket.list && \
    curl -s -L https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
    apt update && apt install -y google-reduction-server

RUN pip install tf-models-official=={MODEL_GARDEN_VERSION} tensorflow-text=={TF_TEXT}

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

ENTRYPOINT ["python"]
CMD ["-c", "print('Hello')"]
'''

# Write the rendered template to ./Dockerfile, overwriting any existing file.
with open('Dockerfile', 'w') as f:
    f.write(dockerfile)

### Build a container image

In [156]:
# Build the image using the current directory as build context and tag it as TRAIN_IMAGE.
! docker build -t {TRAIN_IMAGE} .

Sending build context to Docker daemon  543.2kB
Step 1/7 : FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-5
 ---> b963122c3c2c
Step 2/7 : RUN apt remove -y google-fast-socket &&     echo "deb https://packages.cloud.google.com/apt google-fast-socket main" | tee /etc/apt/sources.list.d/google-fast-socket.list &&     curl -s -L https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - &&     apt update && apt install -y google-reduction-server
 ---> Using cache
 ---> b5c475a6b8e1
Step 3/7 : RUN pip install tf-models-official==2.5.0 tensorflow-text==2.5.0
 ---> Using cache
 ---> 13bdf2be97e5
Step 4/7 : WORKDIR /
 ---> Using cache
 ---> 8c4ebbb80601
Step 5/7 : COPY trainer /trainer
 ---> Using cache
 ---> 247e00d0adc6
Step 6/7 : ENTRYPOINT ["python"]
 ---> Using cache
 ---> 9a74954f3825
Step 7/7 : CMD ["-c", "print('Hello')"]
 ---> Using cache
 ---> 1ba3b2f26eb2
Successfully built 1ba3b2f26eb2
Successfully tagged us-central1-docker.pkg.dev/jk-mlops-dev/jk-docker-repo-us-cen

### Push the container image to Artifact Registry

In [157]:
# Push the image tagged TRAIN_IMAGE to its remote repository.
! docker push {TRAIN_IMAGE}

Using default tag: latest
The push refers to repository [us-central1-docker.pkg.dev/jk-mlops-dev/jk-docker-repo-us-central1/model_garden]

[1Be1fe4344: Preparing 
[1B015e2c00: Preparing 
[1B7f4f633d: Preparing 
[1B961a296c: Preparing 
[1B53abc6c2: Preparing 
[1B3723ef37: Preparing 
[1B0089a9c0: Preparing 
[1B3e41a2c0: Preparing 
[1B25162004: Preparing 
[1B99d982dd: Preparing 
[1B6603d114: Preparing 
[1Bc97a79f1: Preparing 
[1Be02b8502: Preparing 
[1Bd34a65ac: Preparing 
[1Bce22e436: Preparing 
[1B7e013d33: Preparing 
[1Baff4f6ee: Preparing 
[1Be4ccb381: Preparing 
[1B90ceec1e: Preparing 
[1B0ab30137: Preparing 
[1Bed8ae595: Preparing 
[1B855df562: Preparing 
[1Bdb3c5655: Preparing 
[1B0a9a6a11: Preparing 
[1B7e8b38e6: Preparing 
[1B8f196cf4: Preparing 
[1B01dbc7de: Preparing 
[1B31d2d72b: Preparing 
[1Ba966f459: Preparing 
[1Bb9e63cdf: Preparing 
[1B49f5bf51: Preparing 
[1Baa2fa9fe: Preparing 
[1B325cc380: Preparing 
[1Bdd81f9fa: Preparing 
[1B09cad0b

## Submit Vertex Training jobs


### Define helper functions

In [158]:
def prepare_worker_pool_specs(
    image_uri,
    args,
    cmd,
    replica_count=1,
    machine_type='n1-standard-4',
    accelerator_count=0,
    accelerator_type='ACCELERATOR_TYPE_UNSPECIFIED',
    reduction_server_count=0,
    reduction_server_machine_type='n1-highcpu-16',
    reduction_server_image_uri='us-docker.pkg.dev/vertex-ai-restricted/training/reductionserver:latest'
):
    """Build the `worker_pool_specs` list for a custom training job.

    The returned list is positional:
      * index 0 - the chief (always exactly one replica),
      * index 1 - the remaining `replica_count - 1` workers,
      * index 2 - the reduction server replicas.

    Args:
        image_uri: Training container image used by the chief and the workers.
        args: List of arguments passed to the training container.
        cmd: Command (list of strings) executed in the training container.
        replica_count: Total number of training replicas, chief included.
        machine_type: Machine type for the chief and the workers.
        accelerator_count: Accelerators per training replica; when 0, no
            accelerator fields are added to the machine spec.
        accelerator_type: Accelerator type used when `accelerator_count > 0`.
        reduction_server_count: Number of reduction server replicas; when 0,
            no reduction server pool is added.
        reduction_server_machine_type: Machine type for reduction servers.
        reduction_server_image_uri: Container image for reduction servers.

    Returns:
        A list of worker pool spec dicts in the positional order above.
    """
    machine_spec = {'machine_type': machine_type}
    if accelerator_count > 0:
        machine_spec['accelerator_type'] = accelerator_type
        machine_spec['accelerator_count'] = accelerator_count

    container_spec = {
        'image_uri': image_uri,
        'args': args,
        'command': cmd,
    }

    # Pool 0: the chief.
    worker_pool_specs = [
        {
            'replica_count': 1,
            'machine_spec': machine_spec,
            'container_spec': container_spec,
        }
    ]

    # Pool 1: every training replica other than the chief.
    if replica_count > 1:
        worker_pool_specs.append({
            'replica_count': replica_count - 1,
            'machine_spec': machine_spec,
            'container_spec': container_spec,
        })

    # Pool 2: reduction servers. The check is `> 0` so that a request for a
    # single reduction server is honoured rather than silently dropped.
    if reduction_server_count > 0:
        if replica_count <= 1:
            # Pools are identified by position, so an empty placeholder keeps
            # the reduction servers in the third slot when there is no
            # separate worker pool.
            worker_pool_specs.append({})
        worker_pool_specs.append({
            'replica_count': reduction_server_count,
            'machine_spec': {
                'machine_type': reduction_server_machine_type,
            },
            'container_spec': {
                'image_uri': reduction_server_image_uri,
            },
        })

    return worker_pool_specs

### Prepare a worker pool specification

In [186]:
# ---- Task configuration ----
MNLI_TRAIN_SPLIT = 'gs://jk-vertex-demos/datasets/MNLI/mnli_train.tf_record'
MNLI_VALID_SPLIT = 'gs://jk-vertex-demos/datasets/MNLI/mnli_valid.tf_record'
BERT_HUB_URL = 'https://tfhub.dev/tensorflow/bert_en_uncased_L-24_H-1024_A-16/4'

# The timestamp in the job name gives every run its own model directory.
job_name = "JOB_" + time.strftime("%Y%m%d_%H%M%S")
output_dir = 'gs://jk-vertex-demos/jobs'
model_dir = f'{output_dir}/{job_name}/model'
tfhub_cache_dir = f'{output_dir}/tfhub-cache'
config_file = 'trainer/glue_mnli_matched.yaml'
mode = 'train'  # alternative: 'train_and_eval'
experiment = 'bert/sentence_prediction'

# ---- Chief and worker configuration ----
machine_type = 'a2-highgpu-1g'  # alternative: 'n1-standard-8'
accelerator_count = 1
accelerator_type = 'NVIDIA_TESLA_A100'  # alternative: 'NVIDIA_TESLA_T4'
all_reduce_alg = 'nccl'
strategy = 'multi_worker_mirrored'
replica_count = 8

# ---- Reduction server configuration ----
reduction_server_count = 0
reduction_server_machine_type = 'n1-highcpu-16'  # 16 Gbps egress

# ---- Trainer configuration ----
train_steps = 1000
steps_per_loop = 100
summary_interval = 100
validation_interval = 1200
checkpoint_interval = 1200
global_batch_size = 128

# Each (key, value) pair becomes one "key=value" override entry, in this order.
override_pairs = [
    ('task.train_data.input_path', MNLI_TRAIN_SPLIT),
    ('task.validation_data.input_path', MNLI_VALID_SPLIT),
    ('task.train_data.global_batch_size', global_batch_size),
    ('task.validation_data.global_batch_size', global_batch_size),
    ('task.hub_module_url', BERT_HUB_URL),
    ('runtime.num_gpus', accelerator_count),
    ('runtime.distribution_strategy', strategy),
    ('runtime.all_reduce_alg', all_reduce_alg),
    ('trainer.train_steps', train_steps),
    ('trainer.steps_per_loop', steps_per_loop),
    ('trainer.summary_interval', summary_interval),
    ('trainer.validation_interval', validation_interval),
    ('trainer.checkpoint_interval', checkpoint_interval),
]
params_override = [f'{key}={value}' for key, value in override_pairs]

# Container command and arguments; the --tfhub_cache_dir flag is currently not passed.
cmd = ["python", "trainer/train.py"]
args = [
    f'--experiment={experiment}',
    f'--mode={mode}',
    f'--model_dir={model_dir}',
    f'--config_file={config_file}',
    '--params_override=' + ','.join(params_override),
]

worker_pool_specs = prepare_worker_pool_specs(
    image_uri=TRAIN_IMAGE,
    args=args,
    cmd=cmd,
    replica_count=replica_count,
    machine_type=machine_type,
    accelerator_count=accelerator_count,
    accelerator_type=accelerator_type,
    reduction_server_count=reduction_server_count,
    reduction_server_machine_type=reduction_server_machine_type,
)

# Wrap the pools in the job payload and pretty-print it for inspection.
custom_job_spec = dict(
    display_name=job_name,
    job_spec=dict(worker_pool_specs=worker_pool_specs),
)
pp = pprint.PrettyPrinter()
pp.pprint(custom_job_spec)

{'display_name': 'JOB_20210614_194650',
 'job_spec': {'worker_pool_specs': [{'container_spec': {'args': ['--experiment=bert/sentence_prediction',
                                                                 '--mode=train',
                                                                 '--model_dir=gs://jk-vertex-demos/jobs/JOB_20210614_194650/model',
                                                                 '--config_file=trainer/glue_mnli_matched.yaml',
                                                                 '--params_override=task.train_data.input_path=gs://jk-vertex-demos/datasets/MNLI/mnli_train.tf_record,task.validation_data.input_path=gs://jk-vertex-demos/datasets/MNLI/mnli_valid.tf_record,task.train_data.global_batch_size=128,task.validation_data.global_batch_size=128,task.hub_module_url=https://tfhub.dev/tensorflow/bert_en_uncased_L-24_H-1024_A-16/4,runtime.num_gpus=1,runtime.distribution_strategy=multi_worker_mirrored,runtime.all_reduce_alg=nccl,trainer.t

### Submit and monitor the job

In [187]:
# Uses the v1beta1 job service client; the alternative
# `google.cloud.aiplatform.gapic` import is left disabled.
from google.cloud.aiplatform_v1beta1.services.job_service import JobServiceClient

# Client bound to the regional endpoint.
options = {'api_endpoint': API_ENDPOINT}
client = JobServiceClient(client_options=options)

# Resource path under which the custom job is created.
parent = "projects/{}/locations/{}".format(PROJECT, REGION)

response = client.create_custom_job(parent=parent, custom_job=custom_job_spec)

# Display the returned job resource as the cell output.
response

name: "projects/895222332033/locations/us-central1/customJobs/2888711715283795968"
display_name: "JOB_20210614_194650"
job_spec {
  worker_pool_specs {
    machine_spec {
      machine_type: "a2-highgpu-1g"
      accelerator_type: NVIDIA_TESLA_A100
      accelerator_count: 1
    }
    replica_count: 1
    disk_spec {
      boot_disk_type: "pd-ssd"
      boot_disk_size_gb: 100
    }
    container_spec {
      image_uri: "us-central1-docker.pkg.dev/jk-mlops-dev/jk-docker-repo-us-central1/model_garden"
      command: "python"
      command: "trainer/train.py"
      args: "--experiment=bert/sentence_prediction"
      args: "--mode=train"
      args: "--model_dir=gs://jk-vertex-demos/jobs/JOB_20210614_194650/model"
      args: "--config_file=trainer/glue_mnli_matched.yaml"
      args: "--params_override=task.train_data.input_path=gs://jk-vertex-demos/datasets/MNLI/mnli_train.tf_record,task.validation_data.input_path=gs://jk-vertex-demos/datasets/MNLI/mnli_valid.tf_record,task.train_data.glo

## Upload logs to Tensorboard

In [138]:
# Print three shell variable assignments followed by the uploader command that
# consumes them, one per line.
print(
    'TENSORBOARD=projects/895222332033/locations/us-central1/tensorboards/5983067289333792768',
    f'LOGDIR={model_dir}',
    f'EXPERIMENT={job_name}',
    './tb-gcp-uploader --tensorboard_resource_name $TENSORBOARD   --logdir=$LOGDIR   --experiment_name=$EXPERIMENT --one_shot=True',
    sep='\n',
)

TENSORBOARD=projects/895222332033/locations/us-central1/tensorboards/5983067289333792768
LOGDIR=gs://jk-vertex-demos/jobs/JOB_20210613_155939/model
EXPERIMENT=JOB_20210613_155939
./tb-gcp-uploader --tensorboard_resource_name $TENSORBOARD   --logdir=$LOGDIR   --experiment_name=$EXPERIMENT --one_shot=True


### Test the container image locally

In [None]:
# Dataset splits and the BERT hub module used for the local smoke test.
MNLI_TRAIN_SPLIT = 'gs://jk-vertex-demos/datasets/MNLI/mnli_train.tf_record'
MNLI_VALID_SPLIT = 'gs://jk-vertex-demos/datasets/MNLI/mnli_valid.tf_record'
BERT_HUB_URL = 'https://tfhub.dev/tensorflow/bert_en_uncased_L-24_H-1024_A-16/4'

# Local runtime settings.
num_gpus = 2
strategy = 'mirrored'  # alternative: 'multi_worker_mirrored'

# "key=value" overrides, later joined into one comma-separated string.
params_override = [
    f'task.train_data.input_path={MNLI_TRAIN_SPLIT}',
    f'task.validation_data.input_path={MNLI_VALID_SPLIT}',
    f'task.hub_module_url={BERT_HUB_URL}',
    f'runtime.num_gpus={num_gpus}',
    f'runtime.distribution_strategy={strategy}',
]

params = ','.join(params_override)

In [None]:
# Run trainer/train.py inside the training image locally with all GPUs exposed,
# passing the comma-joined overrides from `params`.
# NOTE(review): STAGING_BUCKET is only assigned in a later cell of this
# notebook, so that cell has to be executed before this one.
!docker run -it --rm --gpus all {TRAIN_IMAGE} trainer/train.py \
--experiment=bert/sentence_prediction \
--mode=train_and_eval \
--model_dir={STAGING_BUCKET}/test \
--config_file=trainer/glue_mnli_matched.yaml \
--params_override={params}  


In [None]:
#from google.cloud.aiplatform.gapic import \
#    JobServiceClient

In [150]:
# The staging bucket name is derived from the region.
STAGING_BUCKET = 'gs://jk-vertex-{}'.format(REGION)

# Initialise the SDK with the project, region and staging bucket.
aiplatform.init(project=PROJECT, location=REGION, staging_bucket=STAGING_BUCKET)

# Echo the active region as the cell output.
REGION

'us-central1'

In [151]:
# Reuse the timestamped job name as the display name.
display_name = job_name

# Define the custom job from the worker pool specs prepared earlier.
job = aiplatform.CustomJob(display_name=display_name, worker_pool_specs=worker_pool_specs)

# Launch the job with sync=True.
job.run(sync=True)

INFO:google.cloud.aiplatform.jobs:Creating CustomJob


InvalidArgument: 400 Machine type "a2_highgpu-1g" is not supported.