# Hybrid ML with AzureClusterlessHPC

In this example, we show how to use AzureClusterlessHPC for hybrid machine learning. You can run this Jupyter notebook on your laptop or on a CPU machine in the cloud to develop and test your model. Once you are ready for training, you can remotely execute the training function on one or multiple GPU instances and fetch the trained network upon completion. There is no need to manually spin up a GPU instance or to rerun the full notebook.

## Setup

First, we set the required environment variables, load the package and then create an (empty) pool:

In [1]:
# Set environment variable to parameter file
ENV["PARAMETERS"] = joinpath(pwd(), "parameters.json")

# Load package
using AzureClusterlessHPC, PyPlot
batch_clear()

# Create pool of GPUs
create_pool();

Created pool 1 of 1 in southcentralus with 3 nodes.
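
Because the package reads its settings from the file named in `ENV["PARAMETERS"]`, it can help to confirm that the file exists before loading the package. The following is a minimal sketch using only Julia's standard library; the warning text is illustrative, and how the package behaves when the file is missing is not covered here.

```julia
# Path that ENV["PARAMETERS"] is expected to point to
params_path = joinpath(pwd(), "parameters.json")

if isfile(params_path)
    # Only set the variable when the file is actually there
    ENV["PARAMETERS"] = params_path
else
    @warn "No parameter file found at $params_path"
end
```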


## VGG-16 example for CIFAR10

The remainder of this notebook is taken from the Flux model zoo and contains the VGG16 model, which we train on the CIFAR10 dataset. You can find the original example [here](https://github.com/FluxML/model-zoo/blob/master/vision/vgg_cifar10/vgg_cifar10.jl) (MIT Expat license).

The only modification with respect to the original code is the addition of the `@batchdef` macro to all function definitions and package loading statements:

In [2]:
@batchdef begin
    using Flux
    using Flux: onehotbatch, onecold, flatten
    using Flux.Losses: logitcrossentropy
    using Flux.Data: DataLoader
    using Parameters: @with_kw
    using Statistics: mean
    using CUDA
    using MLDatasets: CIFAR10
    using MLDataPattern: splitobs
    ENV["DATADEPS_ALWAYS_ACCEPT"] = "true"
end;

Next, we define the data loader function for the training data, which downloads the CIFAR10 dataset if it is not available locally:

In [3]:
@batchdef function get_processed_data(args)
    x, y = CIFAR10.traindata()

    (train_x, train_y), (val_x, val_y) = splitobs((x, y), at=1-args.valsplit)

    train_x = float(train_x)
    train_y = onehotbatch(train_y, 0:9)
    val_x = float(val_x)
    val_y = onehotbatch(val_y, 0:9)
    
    return (train_x, train_y), (val_x, val_y)
end;
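
To make the two preprocessing steps concrete, the following plain-Julia sketch mimics what the split and the one-hot encoding produce. It is an illustration only and does not call `splitobs` or `onehotbatch`; the names `n_total`, `n_train`, and `labels` are hypothetical, and the rounding rule used for the split point is an assumption.

```julia
# Split sizes for the CIFAR10 training set (50,000 images) with valsplit = 0.1
n_total  = 50_000
valsplit = 0.1
n_train  = round(Int, (1 - valsplit) * n_total)   # assumed rounding rule
n_val    = n_total - n_train

# One-hot encode three example labels against the classes 0:9.
# Each column holds a single `true` in the row of its class.
labels = [0, 3, 9]
onehot = (0:9) .== labels'

# Decode again: row index of the `true` entry, shifted back to 0-based labels
decoded = [findfirst(col) - 1 for col in eachcol(onehot)]
```

With the default `valsplit` of 0.1, this gives 45,000 training and 5,000 validation images, and the encoded labels form a 10×3 matrix.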

Similarly, we supply an equivalent function for the testing data:

In [4]:
@batchdef function get_test_data()
    test_x, test_y = CIFAR10.testdata()
   
    test_x = float(test_x)
    test_y = onehotbatch(test_y, 0:9)
    
    return test_x, test_y
end;

Next, we implement a function that creates an instance of the VGG16 model:

In [5]:
# VGG16 model
@batchdef function vgg16()
    Chain(
        Conv((3, 3), 3 => 64, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(64),
        Conv((3, 3), 64 => 64, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(64),
        MaxPool((2,2)),
        Conv((3, 3), 64 => 128, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(128),
        Conv((3, 3), 128 => 128, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(128),
        MaxPool((2,2)),
        Conv((3, 3), 128 => 256, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(256),
        Conv((3, 3), 256 => 256, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(256),
        Conv((3, 3), 256 => 256, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(256),
        MaxPool((2,2)),
        Conv((3, 3), 256 => 512, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(512),
        Conv((3, 3), 512 => 512, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(512),
        Conv((3, 3), 512 => 512, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(512),
        MaxPool((2,2)),
        Conv((3, 3), 512 => 512, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(512),
        Conv((3, 3), 512 => 512, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(512),
        Conv((3, 3), 512 => 512, relu, pad=(1, 1), stride=(1, 1)),
        BatchNorm(512),
        MaxPool((2,2)),
        flatten,
        Dense(512, 4096, relu),
        Dropout(0.5),
        Dense(4096, 4096, relu),
        Dropout(0.5),
        Dense(4096, 10)
    )
end;
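
The first dense layer expects 512 input features. The sketch below shows where that number comes from, assuming 32×32 CIFAR10 images: the 3×3 convolutions with padding 1 and stride 1 preserve the spatial size, while each of the five 2×2 max-pooling layers halves it. The helper name `flat_features` is hypothetical and not part of the notebook.

```julia
# Number of features after `flatten`, given the input height and width.
# Convolutions with pad=1, stride=1 keep the size; each 2x2 max-pool halves it.
function flat_features(h, w; npools=5, channels=512)
    for _ in 1:npools
        h, w = h ÷ 2, w ÷ 2
    end
    return h * w * channels
end

features = flat_features(32, 32)   # 32 -> 16 -> 8 -> 4 -> 2 -> 1
```

The result is 1 × 1 × 512 = 512, which matches `Dense(512, 4096, relu)`. Larger input images would require a different first dense layer.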

Additionally, we create a structure of default arguments that we pass to the training function. The arguments are hyper-parameters such as the batch size and learning rate:

In [6]:
@batchdef @with_kw mutable struct Args
    batchsize::Int = 128
    lr::Float64 = 3e-4
    epochs::Int = 50
    valsplit::Float64 = 0.1
end;
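
To illustrate how such a keyword-default struct behaves, here is a small sketch that uses `Base.@kwdef` from Julia's standard library as a stand-in for `@with_kw`, so that it runs without extra packages. The names `DemoArgs` and `make_args` are hypothetical.

```julia
# Stand-in for the notebook's `Args`, using Base.@kwdef instead of @with_kw
Base.@kwdef mutable struct DemoArgs
    batchsize::Int = 128
    lr::Float64 = 3e-4
    epochs::Int = 50
    valsplit::Float64 = 0.1
end

# Hypothetical helper that forwards keyword arguments to the constructor
make_args(; kws...) = DemoArgs(; kws...)

defaults = make_args()                  # all fields keep their defaults
tuned = make_args(lr=1e-3, epochs=5)    # override only selected fields
```

Fields that are not passed keep their default values, so `tuned.batchsize` is still 128.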

Finally, we implement our training function. This function takes the optional keyword arguments defined above and trains the VGG16 model for the specified number of epochs. The function returns the trained network upon completion. Training is performed on a GPU if one is available on the machine that executes the function and falls back to the CPU otherwise:

In [7]:
@batchdef function train(; kws...)
    # Initialize the hyperparameters
    args = Args(; kws...)
    if CUDA.has_cuda()
        @info "Training on GPU"
    else
        @info "Training on CPU"
    end
    
    # Load the training and validation data
    train_data, val_data = get_processed_data(args)
    
    train_loader = DataLoader(train_data, batchsize=args.batchsize, shuffle=true)
    val_loader = DataLoader(val_data, batchsize=args.batchsize)

    # Construct the model and move it to the GPU if available
    @info("Constructing Model")
    m = vgg16() |> gpu

    loss(x, y) = logitcrossentropy(m(x), y)

    ## Training
    # Defining the optimizer
    opt = ADAM(args.lr)
    ps = Flux.params(m)

    @info("Training....")
    # Starting to train models
    for epoch in 1:args.epochs
        @info "Epoch $epoch"

        for (x, y) in train_loader
            x, y = x |> gpu, y |> gpu
            gs = Flux.gradient(() -> loss(x,y), ps)
            Flux.update!(opt, ps, gs)
        end

        validation_loss = 0f0
        for (x, y) in val_loader
            x, y = x |> gpu, y |> gpu
            validation_loss += loss(x, y)
        end
        validation_loss /= length(val_loader)
        @show validation_loss
    end

    m = m |> cpu
    return m
end;
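
The validation loss reported per epoch is the mean of the per-batch losses, since the accumulated value is divided by `length(val_loader)`. The sketch below reproduces that arithmetic in plain Julia, assuming the loader keeps the final, smaller batch and that the default `valsplit` of 0.1 is used. The helper `num_batches` and the loss values are hypothetical.

```julia
# Number of mini-batches a loader yields when the last, smaller batch is kept
num_batches(n_samples, batchsize) = cld(n_samples, batchsize)

train_batches = num_batches(45_000, 128)   # batches per training epoch
val_batches   = num_batches(5_000, 128)    # batches per validation pass

# Average per-batch losses, mirroring `validation_loss /= length(val_loader)`
batch_losses = Float32[2.0, 1.5, 1.0]      # hypothetical values
validation_loss = sum(batch_losses) / length(batch_losses)
```

Under these assumptions, one epoch consists of 352 parameter updates followed by 40 validation batches.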

Similar to the training function, we define a test function, which takes the trained model as an input argument and returns the test accuracy:

In [8]:
@batchdef function test(m; kws...)
    args = Args(; kws...)

    test_data = get_test_data()
    test_loader = DataLoader(test_data, batchsize=args.batchsize)

    # Move the model to the same device as the data (no-op without a GPU)
    m = m |> gpu

    correct, total = 0, 0
    for (x, y) in test_loader
        x, y = x |> gpu, y |> gpu
        correct += sum(onecold(cpu(m(x))) .== onecold(cpu(y)))
        total += size(y, 2)
    end
    test_accuracy = correct / total

    # Print the final accuracy
    @show test_accuracy
end;
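
The accuracy computation compares, for every sample, the index of the largest network output with the index of the true class. The following plain-Julia sketch shows the same idea on a tiny example without using `onecold`; the score matrix and labels are made up for illustration.

```julia
using Statistics: mean

# Hypothetical scores for 3 classes (rows) and 4 samples (columns)
scores = [0.1 0.7 0.2 0.3;
          0.8 0.2 0.3 0.3;
          0.1 0.1 0.5 0.4]
truth = [2, 1, 3, 1]   # hypothetical 1-based class indices

# Predicted class = row index of the largest score in each column
predicted = [argmax(col) for col in eachcol(scores)]

# Fraction of samples whose predicted class matches the true class
accuracy = mean(predicted .== truth)
```

Here three of the four predictions match, so the accuracy is 0.75.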

## Train network and hyperparameter tuning

After implementing the data loaders and the network, we test the network locally and check that the output has the right dimensions:

In [None]:
# Get data
test_x, test_y = get_test_data()

# Create network and test forward pass
network = vgg16()
ȳ = network(test_x[:,:,:,1:4])

# Check output has correct dimensions
if size(ȳ) == size(test_y[:,1:4])
    @info "Output dimensions match label dimensions."
end

# Plot images
for j=1:4
    subplot(1,4,j); imshow(permutedims(test_x[:,:,:,j], (2,1,3)))
end
tight_layout()

Once we have implemented (and tested) the network and data loader, we can perform the network training. We can either:

- Call the `train()` function directly, which runs the training on our local machine/laptop

- Call `@batchexec train()`, which will execute the training on a remote GPU instance in the cloud

To execute our training function on a remote Azure GPU instance, we use the latter command:

In [None]:
# Train on GPU remotely
bctrl = @batchexec train();

We can wait for training to complete and fetch the trained model to our notebook:

In [None]:
# Fetch trained model
m = fetch(bctrl)
y_pred = m(test_x);

In addition to running the training on a single GPU, we can run multiple instances of our function in parallel. This is useful, for example, for hyperparameter tuning. Here, we supply a range of learning rates and train three models in parallel:

In [None]:
# Hyperparameter tuning in parallel
steps = [1e-2, 1e-3, 1e-4]
bctrl = @batchexec pmap(α -> train(; lr=α), steps);

Once all training runs have finished and we have selected the best model (e.g. by monitoring the validation loss), we can copy it back to our notebook (here, from worker 2):

In [None]:
# Fetch best model
m = fetch(bctrl, 2);
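
Instead of hard-coding the index, the best run could be selected programmatically once the final validation loss of each run is known. The sketch below assumes these losses have already been collected into a vector; the values are hypothetical placeholders and not results of this notebook, and how to retrieve them from the remote logs is not shown.

```julia
steps = [1e-2, 1e-3, 1e-4]

# Hypothetical final validation losses, one per learning rate (not real results)
final_val_loss = [0.92, 0.61, 0.75]

# Index of the run with the smallest validation loss and its learning rate
best_task = argmin(final_val_loss)
best_lr = steps[best_task]
```

The resulting `best_task` could then be used in place of the literal `2` when fetching the model.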

After fetching the best trained network, we evaluate it on our testing data. Again, we can either perform testing on our local machine or remotely on our GPU VM:

In [None]:
# Test model locally
test(m);

## Clean up

The final step is to delete the pool and all consumed Azure resources:

In [None]:
destroy!(bctrl);