# Training on a single node

## Prepare a training script

In [None]:
import os

# Folder that receives the generated training script (train.py).
script_folder = './train'
# exist_ok lets the cell be re-run without failing on an existing folder.
os.makedirs(name=script_folder, exist_ok=True)

In [None]:
%%writefile $script_folder/train.py

from absl import flags
from absl import app

import os
import tensorflow as tf
import numpy as np

from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Dropout, Dense, add

import horovod.tensorflow.keras as hvd

#tf.enable_eager_execution()


# Shape passed to the model's Input layer (three-element tuple).
IMAGE_SHAPE = (32, 32, 3)
# Depth used when one-hot encoding the labels.
NUM_CLASSES = 10

def toy_resnet_model():
    """Build a small ResNet-style Keras model with two residual blocks.

    The network takes images of shape ``IMAGE_SHAPE`` and ends in a softmax
    layer with ``NUM_CLASSES`` units.

    Returns:
        An uncompiled ``tf.keras.Model`` named ``'toy_resnet'``.
    """
    inputs = Input(shape=IMAGE_SHAPE, name='image')

    # Stem: two unpadded convolutions followed by pooling.
    x = Conv2D(32, 3, activation='relu')(inputs)
    x = Conv2D(64, 3, activation='relu')(x)
    block_1_output = MaxPooling2D(3)(x)

    # Residual block 1: 'same' padding keeps the spatial size so the skip
    # connection can be added element-wise.
    x = Conv2D(64, 3, activation='relu', padding='same')(block_1_output)
    x = Conv2D(64, 3, activation='relu', padding='same')(x)
    block_2_output = add([x, block_1_output])

    # Residual block 2: must start from the previous block's *summed* output;
    # feeding the pre-add tensor would bypass the first skip connection.
    x = Conv2D(64, 3, activation='relu', padding='same')(block_2_output)
    x = Conv2D(64, 3, activation='relu', padding='same')(x)
    block_3_output = add([x, block_2_output])

    # Classification head.
    x = Conv2D(64, 3, activation='relu')(block_3_output)
    x = GlobalAveragePooling2D()(x)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.5)(x)
    # Keep the output width in sync with the one-hot label depth.
    outputs = Dense(NUM_CLASSES, activation='softmax')(x)

    return Model(inputs, outputs, name='toy_resnet')


def prepare_datasets():
    """Create the training and evaluation ``tf.data`` pipelines.

    TFRecord files are read from ``FLAGS.train_files`` and
    ``FLAGS.eval_files``. Each record is parsed into an ``(image, label)``
    pair: the PNG-encoded image is decoded to three channels, cast to
    float32 and divided by 255; the integer label is one-hot encoded with
    depth ``NUM_CLASSES``.

    Returns:
        A ``(train_dataset, eval_dataset)`` tuple. Both are batched with
        ``FLAGS.batch_size`` and repeat indefinitely; only the training
        dataset is shuffled (buffer of 4096 records).
    """
    feature_spec = {
        'image': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64, default_value=0)
    }

    def _decode_example(serialized_example):
        # Turn one serialized tf.Example into (float image, one-hot label).
        parsed = tf.parse_single_example(serialized_example, feature_spec)

        pixels = tf.image.decode_png(parsed['image'], channels=3)
        pixels = tf.cast(pixels, tf.float32) / 255

        one_hot_label = tf.one_hot(parsed['label'], NUM_CLASSES)

        return pixels, one_hot_label

    training_records = tf.data.TFRecordDataset(FLAGS.train_files)
    evaluation_records = tf.data.TFRecordDataset(FLAGS.eval_files)

    training_pipeline = (
        training_records
        .map(_decode_example)
        .shuffle(4096)
        .batch(FLAGS.batch_size)
        .repeat()
    )
    evaluation_pipeline = (
        evaluation_records
        .map(_decode_example)
        .batch(FLAGS.batch_size)
        .repeat()
    )

    return training_pipeline, evaluation_pipeline


def train_evaluate():
    """Train and evaluate the toy ResNet with Horovod-distributed Keras.

    Configuration comes from the module-level absl ``FLAGS``:
    ``epochs``, ``steps_per_epoch``, ``validation_steps`` and ``job-dir``
    are read here; the dataset flags are consumed by ``prepare_datasets``.

    On rank 0 a TensorBoard callback writes to ``--job-dir`` when that flag
    is set. Nothing is returned and the trained model is not saved.
    """
    # Initialize Horovod
    hvd.init()

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())
    tf.keras.backend.set_session(tf.Session(config=config))

    train_dataset, eval_dataset = prepare_datasets()

    model = toy_resnet_model()

    # Wrap an optimizer in Horovod. The optimizer is reached through the
    # already-imported `tf` module; a bare `optimizers` name is not defined
    # in this script.
    optimizer = hvd.DistributedOptimizer(tf.keras.optimizers.Adadelta())

    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )

    callbacks = [
        # Horovod: broadcast initial variable states from rank 0 to all other processes.
        # This is necessary to ensure consistent initialization of all workers when
        # training is started with loaded weights.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
        # Horovod: average metrics among workers at the end of every epoch.
        #
        # Note: This callback must be in the list before the ReduceLROnPlateau,
        # TensorBoard, or other metrics-based callbacks.
        hvd.callbacks.MetricAverageCallback()
    ]

    # Horovod: save checkpoints only on worker 0 (master) to prevent other workers from corrupting them.
    # Configure Tensorboard and Azure ML Tracking
    # The `job-dir` flag defaults to None, so TensorBoard logging is only
    # enabled when a directory was actually supplied.
    job_dir = FLAGS['job-dir'].value
    if hvd.rank() == 0 and job_dir:
        #callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
        callbacks.append(tf.keras.callbacks.TensorBoard(log_dir=job_dir, update_freq='epoch'))

    # Step counts come from the command-line flags rather than the literals
    # (1000 / 200) that were previously hard-coded here.
    model.fit(
        train_dataset,
        epochs=FLAGS.epochs,
        steps_per_epoch=FLAGS.steps_per_epoch,
        callbacks=callbacks,
        validation_data=eval_dataset,
        validation_steps=FLAGS.validation_steps,
    )

    # Save the trained model to outputs folder on the master
    #if hvd.rank() == 0:  
    #    print("Training completed.")
    #    os.makedirs('outputs', exist_ok=True)
    #    model_file = os.path.join('outputs', 'aerial_model_fine_tune.h5')
    #    model.save(model_file)

    

# Command-line interface of the training script (absl flags).
FLAGS = flags.FLAGS

# Input data: comma-separated lists of TFRecord file paths.
flags.DEFINE_list("train_files", None, "Training TFRecord files")
flags.DEFINE_list("eval_files", None, "Evaluation TFRecord files")

# Training hyperparameters.
flags.DEFINE_integer("epochs", 5, "Number of epochs to train")
flags.DEFINE_integer("batch_size", 32, "Batch size")
flags.DEFINE_integer("steps_per_epoch", 1000, "Steps per epoch")
flags.DEFINE_integer("validation_steps", 20, "Validation steps per epoch")

# Directory handed to the TensorBoard callback as its log_dir.
flags.DEFINE_string("job-dir", None, "Job dir")

# Required flags
flags.mark_flag_as_required("train_files")
flags.mark_flag_as_required("eval_files")


def main(argv):
    """Entry point handed to ``app.run``; runs training and evaluation."""
    # Positional arguments are not used; all configuration is read from FLAGS.
    del argv

    train_evaluate()


if __name__ == '__main__':
    # Let absl parse the command-line flags before calling main().
    app.run(main)


## Set up training 

### Build the container

In [10]:
# Coordinates of the training image in Google Container Registry.
PROJECT_ID = 'sandbox-235500'
IMAGE_REPO_NAME = 'horovod'
IMAGE_TAG = 'gpu'
# Resulting form: gcr.io/<project>/<repository>:<tag>
IMAGE_URI = 'gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_REPO_NAME, IMAGE_TAG)

In [11]:
# Echo the assembled image URI as the cell result.
IMAGE_URI

'gcr.io/sandbox-235500/horovod:gpu'

In [34]:
# Build the training image from ./Dockerfile and tag it with IMAGE_URI.
!docker build -f Dockerfile -t $IMAGE_URI ./

Sending build context to Docker daemon  59.39kB
Step 1/21 : FROM nvidia/cuda:9.0-devel-ubuntu16.04
 ---> c24bd4961e81
Step 2/21 : ENV TENSORFLOW_VERSION=1.12.0
 ---> Using cache
 ---> 69f25775acd2
Step 3/21 : ENV CUDNN_VERSION=7.4.1.5-1+cuda9.0
 ---> Using cache
 ---> 1f6e7a9c6153
Step 4/21 : ENV NCCL_VERSION=2.3.7-1+cuda9.0
 ---> Using cache
 ---> 0006b651299d
Step 5/21 : ARG python=3.5
 ---> Using cache
 ---> 42b54bdb5ef5
Step 6/21 : ENV PYTHON_VERSION=${python}
 ---> Using cache
 ---> e47eeaf95b73
Step 7/21 : RUN apt-get update && apt-get install -y --allow-downgrades --allow-change-held-packages --no-install-recommends         build-essential         cmake         git         curl         vim         wget         jq         ca-certificates         libcudnn7=${CUDNN_VERSION}         libnccl2=${NCCL_VERSION}         libnccl-dev=${NCCL_VERSION}         libjpeg-dev         libpng-dev         python${PYTHON_VERSION}         python${PYTHON_VERSION}-dev
 ---> Using cache
 ---> bea2f239d7a

## Run the container locally

In [2]:
# Comma-separated TFRecord shards used for training.
TRAIN_DATA = ','.join([
    'gs://jkdatasets/cifar10/cifar10-train.tfrecord-00000-of-00010',
    'gs://jkdatasets/cifar10/cifar10-train.tfrecord-00001-of-00010',
])
# Single TFRecord shard used for evaluation.
EVAL_DATA = 'gs://jkdatasets/cifar10/cifar10-test.tfrecord-00000-of-00001'
# Value passed as --job-dir for the local run.
JOB_DIR = '/tmp'

In [None]:
# Run the image locally with the NVIDIA runtime; everything after the image
# name is passed to the container as arguments (the training script's flags).
!docker run --rm --runtime=nvidia $IMAGE_URI \
--train_files=$TRAIN_DATA \
--eval_files=$EVAL_DATA \
--epochs=2 \
--job-dir=$JOB_DIR

In [35]:
# Push the tagged image to the registry named in IMAGE_URI.
!docker push $IMAGE_URI

The push refers to repository [gcr.io/sandbox-235500/horovod]

[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[1B
[21Bgpu: digest: sha256:111fd7f2b084bbb842bdcdc5b96a06b02408b0613242f6c045d329eef8efbd9a size: 4722


## Run the job on CMLE

In [6]:
import datetime

# Region and bucket used for the CMLE job.
REGION = 'us-west1'
BUCKET_NAME = 'gs://jkcmle'
# Timestamped job name: horovod_YYYYmmdd_HHMMSS.
# Note the lowercase %m (month) in the date part; uppercase %M is minutes
# and belongs only in the time part.
JOB_NAME = 'horovod_' + datetime.datetime.today().strftime('%Y%m%d_%H%M%S')
JOB_DIR = BUCKET_NAME + '/jobs/' + JOB_NAME

In [36]:
REGION='us-west1'
BUCKET_NAME='gs://jkcmle'
JOB_NAME = 'horovod_' + datetime.datetime.today().strftime('%Y%M%d_%H%M%S')
JOB_DIR=BUCKET_NAME + '/jobs/' + JOB_NAME

!gcloud beta ml-engine jobs submit training $JOB_NAME \
--region $REGION \
--scale-tier custom \
--master-image-uri $IMAGE_URI \
--master-machine-type standard \
--worker-image-uri $IMAGE_URI \
--worker-machine-type standard \
--worker-server-count 2





Job [horovod_20194702_054716] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe horovod_20194702_054716

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs horovod_20194702_054716
jobId: horovod_20194702_054716
state: QUEUED


In [None]:
# Submit the job on the BASIC_GPU scale tier; arguments after the bare `--`
# are the training script's own flags.
# NOTE(review): reuses JOB_NAME from an earlier cell; presumably job names
# must be unique per project, so regenerate it before re-submitting (confirm).
!gcloud beta ml-engine jobs submit training $JOB_NAME \
--region $REGION \
--master-image-uri $IMAGE_URI \
--scale-tier BASIC_GPU \
-- \
--train_files $TRAIN_DATA \
--eval_files $EVAL_DATA \
--epochs 2
