# Train Image Classifier on GPU Cluster

This tutorial uses Saturn Cloud to access a GPU cluster. The free tier includes up to three hours of GPU usage per month. For more information about setup, visit https://www.saturncloud.io/docs/.

I'm also using Weights and Biases, a model performance monitoring tool, to show training speed and model performance. To learn more about using Weights and Biases with a Saturn Cloud cluster, see the tutorial at https://github.com/saturncloud/weights-and-biases/.

* https://www.saturncloud.io/
* https://wandb.ai/

### Specific libraries for distributed training

In [1]:
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
from torch.nn.parallel import DistributedDataParallel as DDP
from dask_pytorch_ddp import data, dispatch
import torch.distributed as dist
from dask.distributed import Client, progress

### Import helper functions and some additional libraries
* Label formatting
* Data preprocessing
* Plotting results


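The data loaders used for training are built with an explicit `fork` multiprocessing context, along these lines:

```python
import multiprocessing as mp

import torch

# Excerpt: `train_data` (the training split), `train_sampler` (which chooses the
# sample indices this loader draws), `batch_size`, and `num_workers` are all
# defined by the surrounding helper code.
train_loader = torch.utils.data.DataLoader(
    train_data,
    sampler=train_sampler,
    batch_size=batch_size,
    num_workers=num_workers,  # number of parallel loader processes
    # Explicitly start the loader processes with the 'fork' method
    multiprocessing_context=mp.get_context('fork'),
)
```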
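
The `sampler` argument is what decides which images each training process sees. As an illustration (not code from this tutorial), here is a pure-Python sketch of the strided sharding that PyTorch's `DistributedSampler` performs with `shuffle=False` and `drop_last=False`: it pads the index list by repeating indices from the start so it divides evenly, then gives rank `r` every `world_size`-th index starting at `r`. The function name `shard_indices` is hypothetical.

```python
import math


def shard_indices(n_samples, world_size, rank):
    """Mimic DistributedSampler(shuffle=False, drop_last=False) index assignment."""
    indices = list(range(n_samples))
    # Pad by repeating indices from the start so every rank gets the same count.
    total_size = math.ceil(n_samples / world_size) * world_size
    padding = total_size - len(indices)
    indices += (indices * math.ceil(padding / max(len(indices), 1)))[:padding]
    # Rank r takes positions r, r + world_size, r + 2 * world_size, ...
    return indices[rank:total_size:world_size]


for rank in range(3):
    print(rank, shard_indices(10, world_size=3, rank=rank))
# 0 [0, 3, 6, 9]
# 1 [1, 4, 7, 0]
# 2 [2, 5, 8, 1]
```

With `shuffle=True` (the default), the real sampler first permutes the indices using its seed plus the current epoch, which is why distributed training loops usually call `sampler.set_epoch(epoch)` at the start of each epoch.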

In [2]:
%run -i fns.py

## Setup
Select the GPU as the compute device when one is available, and set the model hyperparameters, the location of the training data, and your [Saturn Cloud project ID for accessing the GPU cluster](https://www.saturncloud.io/docs/getting-started/external_connect/).

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
### ============== Constants ============== ###
# Fill in your preferred values, including your Saturn Cloud project ID
model_params = {
    'n_epochs': 6,
    'batch_size': 100,
    'base_lr': .01,
    'train_pct': .7,  # Fraction of images used for training; the rest are for validation
    'downsample_to': 1,
    'subset': True,  # Whether to break data into N pieces for training
    'worker_ct': 6,  # N of pieces to break into
    'bucket': "saturn-public-data",
    'prefix': "dogs/Images",
    'pretrained_classes': imagenetclasses,
}

wbargs = {**model_params,
    'classes':120,
    'dataset':"StanfordDogs",
    'architecture':"ResNet"}

project_id = 'a2ae799b6f234f09bd0341aa9769971f'
num_workers = 40 # For lazy dataloader multiprocessing
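
To get a feel for how these settings interact, here is some back-of-the-envelope arithmetic. It is a hypothetical sketch, not the logic in `fns.py`: it assumes `train_pct` of the images go to training, that `subset=True` splits the training images evenly into `worker_ct` pieces, and that each piece is read in batches of `batch_size`. The function names and the image count are illustrative only.

```python
import math


# Hypothetical helpers; the assumptions are described in the text above.
def batches_per_worker(n_images, train_pct, worker_ct, batch_size):
    """Estimate how many training batches one worker processes per epoch."""
    n_train = round(n_images * train_pct)        # images used for training
    per_worker = math.ceil(n_train / worker_ct)  # images in the largest piece
    return math.ceil(per_worker / batch_size)    # the last batch may be partial


def global_batch_size(batch_size, n_workers):
    """Samples behind each synchronized optimizer step when n_workers DDP processes train."""
    return batch_size * n_workers


# Illustrative image count only, not the size of the bucket used above.
print(batches_per_worker(20_000, 0.7, 6, 100))  # 24
print(global_batch_size(100, 6))                # 600
```

Because DDP synchronizes gradients on every step, each optimizer update with the six-worker cluster started later in this notebook reflects 600 images, 100 from each worker.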

## Training Function

In [10]:
def cluster_transfer_learn(bucket, prefix, train_pct, batch_size, downsample_to,
                          n_epochs, base_lr, pretrained_classes, subset, worker_ct):

    worker_rank = int(dist.get_rank())
    
    # --------- Format model and params --------- #
    device = torch.device("cuda")
    net = models.resnet50(pretrained=True) # True means we start with the imagenet version
    model = net.to(device)
    model = DDP(model)
    
    # Set up monitoring
    if worker_rank == 0:
        wandb.init(config=wbargs, reinit=True, project = 'cdl-demo')
        wandb.watch(model)
    
    criterion = nn.CrossEntropyLoss().cuda()    
    optimizer = optim.SGD(model.parameters(), lr=base_lr, momentum=0.9)
    
    # --------- Retrieve data for training and eval --------- #
    # Creates lazy-loading, multiprocessing DataLoader objects
    # for training and evaluation
    
    whole_dataset = preprocess(bucket, prefix, pretrained_classes)
    
    train, val = train_test_split(
        train_pct,
        whole_dataset, 
        batch_size=batch_size,
        downsample_to=downsample_to,
        subset = subset, 
        workers = worker_ct
    )
    
    dataloaders = {'train' : train, 'val': val}

    # --------- Start iterations --------- #
    for epoch in range(n_epochs):
        count = 0
        t_count = 0
        
        # --------- Training section --------- #
        model.train()  # Set model to training mode
        for inputs, labels in dataloaders["train"]:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            perct = [torch.nn.functional.softmax(el, dim=0)[i].item() for i, el in zip(preds, outputs)]

            loss = criterion(outputs, labels)
            correct = (preds == labels).sum().item()
            
            # zero the parameter gradients
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            count += 1
                
            # Record the results of this model iteration (training sample) for later review.
            if worker_rank == 0:
                wandb.log({
                        'loss': loss.item(),
                        'learning_rate':base_lr, 
                        'correct':correct, 
                        'epoch': epoch, 
                        'count': count
                    })
            if worker_rank == 0 and count % 5 == 0:
                wandb.log({f'predictions vs. actuals, training, epoch {epoch}, count {count}': plot_model_performance(
                    model, inputs, labels, preds, perct, pretrained_classes)})
                
        # --------- Evaluation section --------- #
        with torch.no_grad():
            model.eval()  # Set model to evaluation mode
            for inputs_t, labels_t in dataloaders["val"]:
                inputs_t, labels_t = inputs_t.to(device), labels_t.to(device)
                outputs_t = model(inputs_t)
                _, pred_t = torch.max(outputs_t, 1)
                perct_t = [torch.nn.functional.softmax(el, dim=0)[i].item() for i, el in zip(pred_t, outputs_t)]

                loss_t = criterion(outputs_t, labels_t)
                correct_t = (pred_t == labels_t).sum().item()
            
                t_count += 1

                # Record the results of this model iteration (evaluation sample) for later review.
                if worker_rank == 0:
                    wandb.log({
                        'val_loss': loss_t.item(),
                        'val_correct':correct_t, 
                        'epoch': epoch, 
                        'count': t_count
                    })
                if worker_rank == 0 and t_count % 5 == 0:
                    wandb.log({f'predictions vs. actuals, eval, epoch {epoch}, count {t_count}': plot_model_performance(
                        model, inputs_t, labels_t, pred_t, perct_t, pretrained_classes)})
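
Inside the loops, `perct` holds, for each image, the softmax probability that the model assigns to its predicted class; this is what `plot_model_performance` receives alongside the predictions. For illustration, here is the same computation for a single row of logits in plain Python (the function name `predicted_confidence` is hypothetical):

```python
import math


def predicted_confidence(logits):
    """Return (predicted class index, softmax probability of that class)."""
    # Subtract the max logit before exponentiating for numerical stability.
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    pred = max(range(len(logits)), key=lambda i: logits[i])  # argmax of the logits
    return pred, exps[pred] / sum(exps)


print(predicted_confidence([5.0, 5.0]))  # (0, 0.5)
```

Since softmax preserves the ordering of the logits, the argmax of the probabilities matches the argmax of the raw outputs. In batched PyTorch code, `torch.softmax(outputs, dim=1).max(dim=1)` returns these probabilities together with the predicted indices in one call.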


## Train Model

If you're using Weights and Biases to monitor training, make sure your instance is logged in. [As the instructions show](https://github.com/saturncloud/weights-and-biases/), to monitor model training on the cluster, the login code must also be in the cluster's Start Script.

In [11]:
wandb.login()

True

### Saturn Connection Setup

Load your user token, [as described in the documentation](https://www.saturncloud.io/docs/getting-started/external_connect/), and create an `ExternalConnection` to your project, which you need in order to create the cluster.

In [12]:
with open('config.json') as f:
    tokens = json.load(f)

conn = ExternalConnection(
    project_id=project_id,
    base_url='https://app.internal.saturnenterprise.io',
    saturn_token=tokens['api_token']
)
conn

<dask_saturn.external.ExternalConnection at 0x7f8238de0650>
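
The cell above expects `config.json` to be a JSON object with an `api_token` key, for example `{"api_token": "<your token>"}`. If you want a clearer error message when the file or the key is missing, a small loader along these lines is one option; the function name `load_api_token` is hypothetical.

```python
import json
from pathlib import Path


def load_api_token(path="config.json"):
    """Read the Saturn Cloud user token from a JSON config file."""
    config_path = Path(path)
    if not config_path.exists():
        raise FileNotFoundError(f"{config_path} not found; save your user token there first")
    with config_path.open() as f:
        tokens = json.load(f)
    if "api_token" not in tokens:
        raise KeyError(f"{config_path} has no 'api_token' entry")
    return tokens["api_token"]
```

The returned string can then be passed as `saturn_token=load_api_token()` when creating the connection.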

### Start GPU Cluster

The free tier of Saturn Cloud allows only three GPU workers per cluster. This example uses six workers, which requires a paid plan.

In [13]:
cluster = SaturnCluster(
    external_connection=conn,
    n_workers=6,
    worker_size='g4dn4xlarge',
    scheduler_size='2xlarge',
    nthreads=16)

client = Client(cluster)
client.wait_for_workers(6)
client

INFO:dask-saturn:Cluster is ready
INFO:dask-saturn:Registering default plugins
INFO:dask-saturn:{'tcp://192.168.13.3:40541': {'status': 'repeat'}, 'tcp://192.168.206.131:38741': {'status': 'repeat'}, 'tcp://192.168.211.131:45811': {'status': 'repeat'}, 'tcp://192.168.3.195:37211': {'status': 'repeat'}, 'tcp://192.168.47.195:45463': {'status': 'repeat'}, 'tcp://192.168.5.3:42831': {'status': 'repeat'}}


Client: Scheduler: tls://d-steph-cdl-demo-fa90a721acb8498caea5f7a29a297b25.internal.saturnenterprise.io:8786, Dashboard: https://d-steph-cdl-demo-fa90a721acb8498caea5f7a29a297b25.internal.saturnenterprise.io

Cluster: Workers: 6, Cores: 96, Memory: 381.00 GB


### Run Training Function on Cluster

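`dispatch.run` sends the training function and its arguments to every worker in the cluster and runs it there in parallel, with the PyTorch distributed process group already set up so that `dist.get_rank()` and `DDP` work inside the function. While training runs, you can watch model metrics and system resource usage on Weights and Biases.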

In [14]:
futures = dispatch.run(
    client, 
    cluster_transfer_learn, 
    **model_params
    )

futures
# futures[0].result()  # Uncomment to block until the first worker finishes (re-raises its error, if any)

[<Future: pending, key: dispatch_with_ddp-a7b6d76009a97050cfb671a7fb0ca369>,
 <Future: pending, key: dispatch_with_ddp-deb2bfe07879ea5b9c4988f0f57524d1>,
 <Future: pending, key: dispatch_with_ddp-dc1b3ba2d9deeebdc1b76ff86e04d027>,
 <Future: pending, key: dispatch_with_ddp-402f833334be2d0ca2c690926934789c>,
 <Future: pending, key: dispatch_with_ddp-3afc20d308e8b6d62e61fa3b5f1574ae>,
 <Future: pending, key: dispatch_with_ddp-ac456693a53226931e33e85e95874720>]
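
Each of the six futures above is one training process, one per GPU worker. Within each process, `DistributedDataParallel` averages gradients across all workers during `loss.backward()`, so every worker applies the same update. The toy sketch below (plain Python, a one-parameter linear model with mean squared error, not code from this tutorial) shows why that works: when every worker has the same batch size, averaging the per-worker gradients gives exactly the gradient of the loss over the combined batch. The function names are hypothetical.

```python
def mse_grad(w, xs, ys):
    """Gradient of mean((w * x - y) ** 2) with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def ddp_style_grad(w, shards):
    """Average the per-worker gradients, as DDP's all-reduce does."""
    grads = [mse_grad(w, xs, ys) for xs, ys in shards]
    return sum(grads) / len(grads)


# Two "workers", each holding an equal-sized batch of (inputs, targets).
shards = [([1.0, 2.0], [2.1, 3.9]), ([3.0, 4.0], [6.2, 7.8])]
xs = [x for shard_x, _ in shards for x in shard_x]
ys = [y for _, shard_y in shards for y in shard_y]
print(ddp_style_grad(0.5, shards), mse_grad(0.5, xs, ys))  # equal up to float rounding
```

If the workers' batches differ in size (for example, a smaller final batch), the average weights each worker equally rather than each sample, so it only approximates the full-batch gradient.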

***

# Conclusions

### Distributed training on a Dask cluster can speed up deep learning without sacrificing model performance

### Cluster access is easier and more affordable than you might think

### Pay attention to data loading speed, as this can be a bottleneck
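
To check whether data loading is the bottleneck, you can time how long each iteration waits for the next batch versus how long it spends on the training step. The sketch below is generic and not part of the tutorial's code; the helper name `time_loader` is hypothetical. Because CUDA runs asynchronously, a GPU training step should call `torch.cuda.synchronize()` before returning so that its measured time includes the GPU work.

```python
import time


def time_loader(batches, step):
    """Split wall-clock time into waiting for data vs. running `step` on each batch."""
    load_time = compute_time = 0.0
    n = 0
    it = iter(batches)
    while True:
        t0 = time.perf_counter()
        try:
            batch = next(it)  # time spent here is data loading
        except StopIteration:
            break
        t1 = time.perf_counter()
        step(batch)  # time spent here is the training step
        t2 = time.perf_counter()
        load_time += t1 - t0
        compute_time += t2 - t1
        n += 1
    return n, load_time, compute_time
```

For example, `time_loader(dataloaders["train"], train_step)` with a hypothetical `train_step` function would report both totals for one epoch. If load time dominates, the GPUs are sitting idle waiting for data.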

### Ensure that GPUs are used to their full potential to avoid excess cost