# Part 4 - Parallel Training
In this section of the lab we will train a small fully connected neural network using the bottleneck features created in the previous part of the lab.


![Transfer Learning](../images/TLArch.png)

We will use TensorFlow's Keras API to define the network and run training. 

We will run training on an Azure Batch AI GPU cluster. After the model is trained, it will be registered in the AML Model Registry.

![AML Arch](../images/amlarch.png)



### Create training script

The script trains a simple network with one hidden layer. The input to the network is a vector of 2048 floating-point numbers: the bottleneck features created in the previous step of the lab. The output layer consists of 6 units, one for each of the six land type classes. To control overfitting, the network uses a Dropout layer between the hidden layer and the output layer, and L1 and L2 regularization in the output layer.
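
To get a feel for the size of such a network, the sketch below counts its trainable parameters. The input and output sizes come from the description above; the hidden-layer width of 256 is an assumed example value, since the lab treats it as a hyperparameter.

```python
# Count trainable parameters of a network with one hidden Dense layer.
# INPUT_DIM and NUM_CLASSES come from the lab text; the hidden width passed
# to count_params is a hypothetical example value, not the lab's setting.
INPUT_DIM = 2048   # length of one bottleneck feature vector
NUM_CLASSES = 6    # land type classes


def dense_params(n_inputs, n_units):
    """Weights plus one bias per unit for a fully connected layer."""
    return n_inputs * n_units + n_units


def count_params(hidden_units):
    hidden = dense_params(INPUT_DIM, hidden_units)
    output = dense_params(hidden_units, NUM_CLASSES)
    return hidden + output  # the Dropout layer has no trainable parameters


print(count_params(256))  # prints 526086
```

Almost all of the parameters sit in the hidden layer, which is why its width is the hyperparameter with the largest effect on model size.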

The number of units in the hidden layer, the L1 and L2 values, and the batch size are all tunable hyperparameters. The Dropout ratio is fixed at 0.5.
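
As a rough illustration of what tuning these hyperparameters involves, the sketch below enumerates every combination in a small search grid. The candidate values are hypothetical; the lab text names the hyperparameters but not the values to try.

```python
import itertools

# Hypothetical candidate values; the lab text only names the hyperparameters.
search_space = {
    "hidden_units": [128, 256, 512],
    "l1": [0.0, 0.001],
    "l2": [0.0, 0.001],
    "batch_size": [32, 64],
}
DROPOUT_RATE = 0.5  # fixed, per the description above


def enumerate_configs(space):
    """Return one dict per combination of hyperparameter values."""
    names = list(space)
    return [dict(zip(names, values))
            for values in itertools.product(*(space[n] for n in names))]


configs = enumerate_configs(search_space)
print(len(configs), configs[0])
```

Each dictionary in `configs` could be passed to one training run, so the size of the grid determines how many runs a full sweep would need.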

Since the bottleneck feature files are small (as compared to original image datasets) they can be loaded into memory all at once. 

The trained model will be saved into the `./outputs` folder. This is one of the special folders in AML. The other one is the `./logs` folder. The content in these folders is automatically uploaded to the run history.

The script uses the AML `Run` object to track two performance measures: **training accuracy** and **validation accuracy**. The metrics are captured at the end of each epoch.
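
The per-epoch logging pattern can be sketched without an Azure workspace. `RecordingRun` below is a hypothetical stand-in that only mimics the `log(name, value)` call of the AML `Run` object, and the accuracy values are made up for illustration.

```python
# RecordingRun is a hypothetical stand-in for the AML Run object so the sketch
# runs without an Azure workspace; it only mimics log(name, value).
class RecordingRun:
    def __init__(self):
        self.metrics = {}

    def log(self, name, value):
        self.metrics.setdefault(name, []).append(value)


def log_epoch(run, logs):
    """Record the two accuracy measures from a Keras-style logs dict."""
    for metric_name, key in (("training_acc", "acc"), ("validation_acc", "val_acc")):
        if key in logs:
            run.log(name=metric_name, value=float(logs[key]))


run = RecordingRun()
# Made-up accuracy values, for illustration only.
for epoch_logs in ({"acc": 0.71, "val_acc": 0.68}, {"acc": 0.80, "val_acc": 0.75}):
    log_epoch(run, epoch_logs)
print(run.metrics)
```

Logging the same metric name once per epoch builds up a series, which is what lets the run history plot accuracy over time.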

#### Create a folder to hold the script

In [4]:
from azureml.core import Workspace

ws = Workspace.from_config()

Found the config file in: /home/demouser/repos/AML/aml_config/config.json


In [7]:
from azureml.core.model import Model
import os


model=Model(ws, 'aerial_keras')
model.download(target_dir = './aerial_keras')

'./aerial_keras/aerial_keras.h5'

In [9]:
model.get_model_path('aerial_keras')

'aerial_keras'

In [2]:
import os
script_folder = './script'
os.makedirs(script_folder, exist_ok=True)

#### Use Jupyter `%%writefile` magic to write the script


In [3]:
%%writefile $script_folder/fine-tune.py

import os
import tensorflow as tf
from tensorflow.keras.applications import resnet50
from tensorflow.keras.preprocessing import image
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, Flatten, Input
from tensorflow.keras.regularizers import l1_l2

from azureml.core import Run

import numpy as np
import random
import h5py

import azureml.core as aml



# Create custom callback to track accuracy measures in AML Experiment
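class RunCallback(tf.keras.callbacks.Callback):
    def __init__(self, run):
        super().__init__()
        self.run = run

    def on_epoch_end(self, epoch, logs=None):
        # Keras passes the epoch index and a dict with that epoch's metrics
        logs = logs or {}
        if 'acc' in logs:
            self.run.log(name="training_acc", value=float(logs['acc']))
        if 'val_acc' in logs:
            self.run.log(name="validation_acc", value=float(logs['val_acc']))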


# Returns a dataset based on a list of TFRecords files passed as a parameter.
def create_dataset(files, batch_size=32, prefetch_buffer_size=1, train=True, buffer_size=10000, num_parallel_calls=4):
    IMAGE_SHAPE = (224, 224, 3,)
    NUM_CLASSES = 6
          
    # Extract image and label from proto Example
    def _parse(example_proto):
        features = {'label': tf.FixedLenFeature((), tf.int64, default_value=0),
                    'image': tf.FixedLenFeature((), tf.string, default_value="")}
        parsed_features = tf.parse_single_example(example_proto, features)
        label = parsed_features['label']
        label = tf.one_hot(label, NUM_CLASSES)
        image = tf.decode_raw(parsed_features['image'], tf.uint8)
        image = tf.cast(image, tf.float32)
        image = tf.reshape(image, IMAGE_SHAPE)
                                                                  
        # Pre-process image data for VGG16
        #   RGB -> BGR
        image = image[..., ::-1]
        #   Subtract the ImageNet mean for each channel
        imagenet_mean=tf.constant(-np.array([103.939, 116.779, 123.68]), dtype=tf.float32)
        image = tf.nn.bias_add(image, imagenet_mean)
        #image = tf.subtract(image, imagenet_mean)
        #image = resnet50.preprocess_input(image)
        
        return image, label

    dataset = tf.data.TFRecordDataset(files)
    dataset = dataset.map(_parse, num_parallel_calls=num_parallel_calls)
    #dataset = dataset.map(_parse)
    if train:
        dataset = dataset.shuffle(buffer_size)
    dataset = dataset.batch(batch_size=batch_size)
    dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)
    dataset = dataset.repeat()

    return dataset


def get_num_of_records(files):
    count = 0
    for file in files:
        for record in tf.python_io.tf_record_iterator(file):
            count += 1
    return count


# Training regime
def train_evaluate(run):
   
    
    # Creating datasets
    training_files = [os.path.join(FLAGS.training_dir, file) for file in os.listdir(FLAGS.training_dir)]
    validation_files = [os.path.join(FLAGS.validation_dir, file) for file in os.listdir(FLAGS.validation_dir)]
                                                                          
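    # Use the same batch size that the steps-per-epoch calculation below relies on
    train_dataset = create_dataset(training_files, batch_size=FLAGS.batch_size, train=True)
    valid_dataset = create_dataset(validation_files, batch_size=FLAGS.batch_size, train=False)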
     
    
    # Create a model
    # Download the registered top model from the workspace of the current run
    model = aml.Model(run.experiment.workspace, FLAGS.model_name)
    model_path = model.download(target_dir='.')
    top_model = tf.keras.models.load_model(model_path)

    # Create a VGG16 trunk
    trunk = tf.keras.applications.VGG16(
                weights = 'imagenet', 
                input_shape=(224,224,3), 
                include_top = False,
                pooling = 'avg')

    # Stack the models
    full_model = Model(inputs=trunk.input, outputs=top_model(trunk.output))
    full_model.summary()
    
    # Freeze the bottom layers. Up to the last conv block
    for layer in trunk.layers[:15]:
        layer.trainable = False
    
    for layer in trunk.layers[15:]:
        layer.trainable = True
    
    # Compile the model with an SGD/momentum optimizer
    # and a very low learning rate
    optimizer = tf.keras.optimizers.SGD(lr=0.00001, momentum=0.9)
    loss = 'categorical_crossentropy'
    metrics = ['accuracy']
    full_model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

    # Create a callback
    run_callback = RunCallback(run)
    
    # Set up training
    steps_per_epoch = get_num_of_records(training_files)//FLAGS.batch_size
    validation_steps = get_num_of_records(validation_files)//FLAGS.batch_size
    
    # Start training
    print("Starting training")
    full_model.fit(train_dataset,
        epochs = FLAGS.epochs,
        steps_per_epoch = steps_per_epoch,
        validation_data = valid_dataset,
        validation_steps = validation_steps,
        callbacks = [run_callback])
          
    # Save the trained model to outputs, which is a standard folder expected by AML
    print("Training completed.")
    os.makedirs('outputs', exist_ok=True)
    model_file = os.path.join('outputs', 'aerial_keras_full.h5')
    print("Saving model to: {0}".format(model_file))
    full_model.save(model_file)
    

FLAGS = tf.app.flags.FLAGS

# Default global parameters
tf.app.flags.DEFINE_integer('batch_size', 32, "Number of images per batch")
tf.app.flags.DEFINE_integer('epochs', 10, "Number of epochs to train")
tf.app.flags.DEFINE_string('training_dir', 'tfrecords', "Directory with training files")
tf.app.flags.DEFINE_string('validation_dir', 'tfrecords', "Directory with validation files")
tf.app.flags.DEFINE_string('model_name', 'aerial_keras.h5', "Model name")

def main(argv=None):
    # get hold of the current run
    run = Run.get_submitted_run()
    train_evaluate(run)
  

if __name__ == '__main__':
    tf.app.run()

Writing ./script/fine-tune.py
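
The image preprocessing inside `_parse` can be hard to follow in tensor form. The sketch below applies the same two steps, channel reversal and mean subtraction, to a single pixel in plain Python. The helper name `preprocess_pixel` is hypothetical; the mean values are the ones used in the script.

```python
# Per-channel ImageNet means in BGR order, as used in the script above.
IMAGENET_MEAN_BGR = (103.939, 116.779, 123.68)


def preprocess_pixel(rgb):
    """Reverse an (R, G, B) pixel to (B, G, R) and subtract the channel means."""
    bgr = tuple(reversed(rgb))
    return tuple(value - mean for value, mean in zip(bgr, IMAGENET_MEAN_BGR))


# A pixel equal to the mean colour maps to (0.0, 0.0, 0.0).
print(preprocess_pixel((123.68, 116.779, 103.939)))
```

The script does the same thing for every pixel at once: `image[..., ::-1]` reverses the channel axis and `tf.nn.bias_add` adds the negated means.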


### Connect to AML workspace


In [2]:
import azureml.core
from azureml.core import Workspace

ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\n')

Found the config file in: /home/demouser/repos/HighPerformanceTensorFlowOnAzure/aml_config/config.json
jkaml
jkaml
eastus2
952a710c-8d9c-40c1-9fec-f752138cc0b3


### Create Experiment

We will create a new experiment to manage the training logs.

In [None]:
experiment_name = 'aerial-fine-tune'

from azureml.core import Experiment
exp = Experiment(workspace=ws, name=experiment_name)

### Create compute target

We will reuse the GPU VM created in the previous step of the lab. 

In [3]:
from azureml.core.compute import DsvmCompute
from azureml.core.compute_target import ComputeTargetException

compute_target_name = 'gpudsvm'
compute_target_type = 'Standard_NC6'

try:
    dsvm_compute = DsvmCompute(workspace=ws, name=compute_target_name)
    print('Found existing DSVM:', dsvm_compute.name)
except ComputeTargetException:
    dsvm_config = DsvmCompute.provisioning_configuration(vm_size=compute_target_type)
    dsvm_compute = DsvmCompute.create(ws, name=compute_target_name, provisioning_configuration=dsvm_config)
    dsvm_compute.wait_for_completion(show_output=True)

Found existing DSVM: gpudsvm


### Configure datastore

The bottleneck files were uploaded to the workspace's default datastore during the previous step. We will make the data available on the compute target by downloading it from the datastore.

In [None]:
from azureml.core import Datastore

ds = ws.get_default_datastore()
print("Using the default datastore for training data: ")
print(ds.name, ds.datastore_type, ds.account_name, ds.container_name)


### Create an estimator

As in the previous step of the lab, we will use **Estimator** to submit a run.


In [None]:
from azureml.train.estimator import Estimator

# Keys are written as command-line flags so the script's tf.app.flags can parse them
script_params = {
    '--training_dir': ds.path('training').as_download(),
    '--validation_dir': ds.path('validation').as_download(),
    '--batch_size': 32,
    '--epochs': 50,
    '--model_name': 'aerial_keras'
}

pip_packages = ['h5py','pillow','tensorflow-gpu']

est = Estimator(source_directory=script_folder,
                script_params=script_params,
                compute_target=dsvm_compute,
                entry_script='fine-tune.py',
                node_count=1,
                process_count_per_node=1,
                use_gpu=True,
                pip_packages=pip_packages
                )
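
Script parameters reach the training script as command-line arguments, where the flag definitions in the script pick them up. The sketch below shows one plausible way a parameter dictionary is flattened into an argument list. It is an assumption about the general mechanism rather than the Estimator's actual implementation, and `to_argv` and the paths are hypothetical.

```python
def to_argv(script_params):
    """Flatten {'--flag': value} pairs into a list of command-line tokens."""
    argv = []
    for flag, value in script_params.items():
        argv.extend([flag, str(value)])
    return argv


# Placeholder values; real runs would pass datastore references for the folders.
example_params = {
    "--training_dir": "/data/training",
    "--validation_dir": "/data/validation",
    "--batch_size": 32,
    "--epochs": 50,
}
print(to_argv(example_params))
```

Because every value arrives as a string, the training script relies on its flag definitions (for example `DEFINE_integer`) to convert them back to the right type.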

### Run the job

Run the experiment by submitting the estimator object.

In [None]:
run = exp.submit(config=est)
run

Since the call is asynchronous, it returns a **Preparing** or **Running** state as soon as the job is started.

### Monitor a remote run

The first run takes **approximately 10 minutes** in total. For subsequent runs, as long as the script dependencies don't change, the same image is reused, so container startup is much faster.

Here is what's happening while you wait:

- **Image creation**: A Docker image is created matching the Python environment specified by the estimator. The image is uploaded to the workspace. This stage happens once for each Python environment since the container is cached for subsequent runs.  During image creation, logs are streamed to the run history. You can monitor the image creation progress using these logs.

- **Scaling**: If the remote cluster requires more nodes to execute the run than currently available, additional nodes are added automatically. 

- **Running**: In this stage, the necessary scripts and files are sent to the compute target, datastores are mounted or copied, and then the `entry_script` is run. While the job is running, stdout and the `./logs` directory are streamed to the run history. You can monitor the run's progress using these logs.

- **Post-Processing**: The ./outputs directory of the run is copied over to the run history in your workspace so you can access these results.


You can check the progress of a running job in multiple ways. This tutorial uses a Jupyter widget as well as a `wait_for_completion` method. 
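
A third option is to poll the run status yourself. The sketch below is a minimal polling loop. It takes any zero-argument callable that returns a status string, so with a real run you could pass `run.get_status`, assuming the SDK version in use exposes that method. The set of terminal states is also an assumption, and a fixed sequence of statuses stands in for AML here.

```python
import time

# Assumed terminal states; check the SDK documentation for the full list.
TERMINAL_STATES = {"Completed", "Failed", "Canceled"}


def poll_until_done(get_status, interval_seconds=10.0, max_polls=100):
    """Call get_status() until it returns a terminal state or polls run out."""
    status = get_status()
    for _ in range(max_polls - 1):
        if status in TERMINAL_STATES:
            break
        time.sleep(interval_seconds)
        status = get_status()
    return status


# Stand-in status source that replays a fixed sequence instead of calling AML.
fake_statuses = iter(["Preparing", "Running", "Running", "Completed"])
print(poll_until_done(lambda: next(fake_statuses), interval_seconds=0))
```

The `max_polls` cap keeps the loop from running forever if the job never reaches a terminal state.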

### Jupyter widget

Watch the progress of the run with a Jupyter widget. Like the run submission, the widget is asynchronous and provides live updates every 10-15 seconds until the job completes.

Note: There is currently a problem with the `RunDetails` widget on the DSVM.

In [None]:
from azureml.train.widgets import RunDetails
RunDetails(run).show()

### Get log results upon completion

Model training and monitoring happen in the background. Wait until the model has completed training before running more code. 

In [None]:
run.wait_for_completion(show_output=False) # specify True for a verbose log

### Display run results

The training has completed. You can list the files generated during the run, including the logs, by calling the `Run.get_file_names()` method.

In [None]:
print(run.get_file_names())
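
The returned list mixes log files with model artifacts. A small helper like the one sketched below could group the names by their top-level folder. The helper and the example file names are hypothetical and are not actual output from this run.

```python
def group_run_files(file_names):
    """Split run file names into outputs, logs, and everything else."""
    groups = {"outputs": [], "logs": [], "other": []}
    for name in file_names:
        top_level = name.split("/", 1)[0]
        groups[top_level if top_level in ("outputs", "logs") else "other"].append(name)
    return groups


# Hypothetical file names, not actual output from this run.
example_files = ["outputs/aerial_keras_full.h5", "logs/train.log", "driver_log.txt"]
print(group_run_files(example_files))
```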

## Next Step


## Clean up resources

Before you move to the next step, you can delete the GPU VM. We will not need it anymore.

In [4]:
dsvm_compute.delete()