<img src="../img/saturn_logo.png" width="300" />

We don't need to run all of Notebook 5 again. Instead, we'll call `setup2.py` in the next chunk to get ourselves back to the right state. The script also includes the reindexing work from Notebook 5 and a couple of visualization functions that we'll talk about later.

***
**Note: This notebook assumes you have an S3 bucket where you can store your model performance statistics.**  
If you don't have access to an S3 bucket, but would still like to train your model and review results, please visit [Notebook 6b](06b-transfer-training-local.ipynb) and [Notebook 7](07-learning-results.ipynb) to see detailed examples of how you can do that.
***

## Connect to Cluster

In [1]:
%run -i ../tools/setup2.py

display(HTML(gpu_links))

INFO:dask-saturn:Cluster is ready
INFO:dask-saturn:Registering default plugins
INFO:dask-saturn:{'tcp://10.0.15.16:38589': {'status': 'repeat'}, 'tcp://10.0.3.145:39377': {'status': 'repeat'}, 'tcp://10.0.8.9:36789': {'status': 'repeat'}}


In [2]:
import torch
from tensorboardX import SummaryWriter

from torch import nn, optim
from torch.nn.parallel import DistributedDataParallel as DDP

from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler

import torch.distributed as dist

In [3]:
client

Client: Scheduler: tcp://d-steph-pytorch-training-90e6119a500640599ff558a22c25098d.main-namespace:8786  Dashboard: https://d-steph-pytorch-training-90e6119a500640599ff558a22c25098d.internal.saturnenterprise.io
Cluster: Workers: 3  Cores: 96  Memory: 382.50 GB


We're ready to do some learning! 

## Model Parameters

Aside from the special elements noted below, we can write this section essentially the same way we write any other PyTorch training loop:
* Cross Entropy Loss for our loss function
* SGD (Stochastic Gradient Descent) for our optimizer

The process also has two stages: training and evaluation. In each epoch we run through the whole training set in batches of 100, then move to the evaluation step, where we run through the whole evaluation set, also in batches of 100.

Most of the training function shown below will look very familiar to PyTorch users. However, three elements are different.

### 1. DaskResultsHandler
To record our model's output, we need to initialize the `DaskResultsHandler` class from `dask-pytorch-ddp` for our experiment.
This object has a few important methods, including one that lets us automatically record the model's performance at each iteration.

In [None]:
import uuid
key = uuid.uuid4().hex

rh = results.DaskResultsHandler(key)
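
As a small illustration of the key itself (not of `DaskResultsHandler`), the sketch below shows what `uuid.uuid4().hex` produces: a random 32-character hexadecimal string. Our reading, which is an assumption rather than something the library documents here, is that a fresh key per experiment helps keep one run's results from being mixed up with another's. The names `key_a` and `key_b` are only for this example.

```python
import uuid

# Two independent calls give two different random identifiers.
key_a = uuid.uuid4().hex
key_b = uuid.uuid4().hex

print(key_a)           # a 32-character lowercase hex string
print(key_a != key_b)  # collisions are astronomically unlikely
```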

### 2. Model to GPU Resources

```
device = torch.device("cuda")
net = models.resnet50(pretrained=True)
model = net.to(device)
```

We need to make sure our model is assigned to a GPU resource. Here we do it once, before the training loops begin. We will also assign each batch of images and labels to a GPU resource within the training and evaluation loops.


### 3. DDP Wrapper
```
model = DDP(model)
```

Finally, we need to enable the DistributedDataParallel framework. To do this, we wrap the model in `DDP()`, which is the alias we imported for the PyTorch class `torch.nn.parallel.DistributedDataParallel`. There is a lot to know about this, but for our purposes the important thing to understand is that it allows the model training to run in parallel on our cluster. See the [PyTorch DDP notes](https://pytorch.org/docs/stable/notes/ddp.html) for more.



> **Discussing DDP**   
> If you are curious about what DDP is really doing under the hood, our blog has a detailed discussion and more tips about this same workflow: [https://www.saturncloud.io/s/combining-dask-and-pytorch-for-better-faster-transfer-learning/](https://www.saturncloud.io/s/combining-dask-and-pytorch-for-better-faster-transfer-learning/)


***


# Training time!
Our whole training process is going to be contained in one function, here named `run_transfer_learning`.



## Modeling Functions

Wrapping these fairly basic steps in a function helps us ensure that the training and evaluation steps handle each batch in exactly the same way.

In [4]:
def iterate_model(inputs, labels, model, device):
    # Pass items to GPU
    inputs = inputs.to(device)
    labels = labels.to(device)

    # Run model iteration
    outputs = model(inputs)

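    # Format results: predicted class index for each image, plus the
    # softmax probability the model assigned to that predicted class
    _, preds = torch.max(outputs, 1)
    perct = [torch.nn.functional.softmax(el, dim=0)[i].item() for i, el in zip(preds, outputs)]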
    
    return inputs, labels, outputs, preds, perct
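
To make the "Format results" step concrete, here is a pure-Python sketch of the same arithmetic for a single image. It does not use PyTorch. The logits are made-up numbers and the `softmax` helper is written just for this example. `pred` plays the role of one entry of `preds`, and `confidence` plays the role of one entry of `perct`.

```python
import math

def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical raw model outputs (logits) for one image over three classes.
logits = [2.0, 1.0, 0.1]

# Index of the largest logit: the predicted class.
pred = max(range(len(logits)), key=lambda i: logits[i])

# Probability the model assigns to that predicted class.
confidence = softmax(logits)[pred]

print(pred, round(confidence, 3))  # prints: 0 0.659
```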

In [6]:
def run_transfer_learning(bucket, prefix, train_pct, batch_size, 
                          n_epochs, base_lr, imagenetclasses, 
                          n_workers = 1, subset = False):
    '''Load basic Resnet50, run transfer learning over given epochs.
    Uses dataset from the path given as the pool from which to take the 
    training and evaluation samples.'''
    
    worker_rank = int(dist.get_rank())
    
    # Set results writer (replace this S3 path with a bucket you can write to)
    writer = SummaryWriter(f's3://pytorchtraining/pytorch_bigbatch/learning_worker{worker_rank}')
    executor = ThreadPoolExecutor(max_workers=64)
    
    # --------- Format model and params --------- #
    device = torch.device("cuda")
    net = models.resnet50(pretrained=True) # True means we start with the imagenet version
    model = net.to(device)
    model = DDP(model)
    
    criterion = nn.CrossEntropyLoss().cuda()    
    optimizer = optim.SGD(model.parameters(), lr=base_lr, momentum=0.9)

    # --------- Retrieve data for training and eval --------- #
    whole_dataset = prepro_batches(bucket, prefix)
    new_class_to_idx = {x: int(replace_label(x, imagenetclasses)[1]) for x in whole_dataset.classes}
    whole_dataset.class_to_idx = new_class_to_idx
    
    train, val = get_splits_parallel(train_pct, whole_dataset, batch_size=batch_size, subset = subset, workers = n_workers)
    dataloaders = {'train' : train, 'val': val}

    # --------- Start iterations --------- #
    count = 0
    t_count = 0
    
    for epoch in range(n_epochs):
        agg_loss = []
        agg_loss_t = []
        
        agg_cor = []
        agg_cor_t = []
        # --------- Training section --------- #
        model.train()  # Set model to training mode
        for inputs, labels in dataloaders["train"]:
            dt = datetime.datetime.now().isoformat()

            inputs, labels, outputs, preds, perct = iterate_model(inputs, labels, model, device)
            
            loss = criterion(outputs, labels)
            correct = (preds == labels).sum().item()
            
            # Zero the parameter gradients, backpropagate, and update the weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            count += 1
            
            # Track statistics
            for param_group in optimizer.param_groups:
                current_lr = param_group['lr']
                
            # Record the results of this model iteration (training sample) for later review.
            rh.submit_result(
                f"worker/{worker_rank}/data-{dt}.json", 
                json.dumps({
                    'loss': loss.item(),
                    'learning_rate':current_lr, 
                    'correct':correct, 
                    'epoch': epoch, 
                    'count': count, 
                    'worker': worker_rank, 
                    'sample': 'train'
                })
            )
                
        # --------- Evaluation section --------- #
        with torch.no_grad():
            model.eval()  # Set model to evaluation mode
            for inputs_t, labels_t in dataloaders["val"]:
                dt = datetime.datetime.now().isoformat()
                
                inputs_t, labels_t, outputs_t, pred_t, perct_t = iterate_model(inputs_t, labels_t, model, device)

                loss_t = criterion(outputs_t, labels_t)
                correct_t = (pred_t == labels_t).sum().item()
            
                t_count += 1

                # Track statistics
                for param_group in optimizer.param_groups:
                    current_lr = param_group['lr']
                    
                # Record the results of this model iteration (evaluation sample) for later review.
                rh.submit_result(
                    f"worker/{worker_rank}/data-{dt}.json", 
                    json.dumps({
                        'loss': loss_t.item(),
                        'learning_rate':current_lr, 
                        'correct':correct_t, 
                        'epoch': epoch, 
                        'count': t_count, 
                        'worker': worker_rank, 
                        'sample': 'eval'
                    })
                )
        if worker_rank == 0:
            rh.submit_result(f"checkpoint-{dt}.pkl", pickle.dumps(model.state_dict()))
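
As a rough picture of what each `rh.submit_result` call in the loops sends, the sketch below builds one path and one JSON payload using the same field names as the training function, then parses the payload back. It runs without a cluster. The rank, loss, and correct count are made-up values for illustration only.

```python
import datetime
import json

worker_rank = 0  # hypothetical rank; the real value comes from dist.get_rank()
dt = datetime.datetime.now().isoformat()

# Same path pattern as in the training loop.
path = f"worker/{worker_rank}/data-{dt}.json"

payload = json.dumps({
    "loss": 1.25,          # made-up number for illustration
    "learning_rate": 0.01,
    "correct": 87,         # made-up count of correct predictions in one batch
    "epoch": 0,
    "count": 1,
    "worker": worker_rank,
    "sample": "train",
})

# Anyone reading the results later can recover the fields with json.loads.
record = json.loads(payload)
print(path)
print(record["sample"], record["correct"])
```

With a batch size of 100, a `correct` value of 87 would correspond to 87% accuracy on that batch.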

Now that we've done all the hard work, we just need to run our function! Using `dispatch.run` from `dask-pytorch-ddp`, we pass in the transfer learning function so that it gets distributed correctly across our cluster. This creates futures and starts computing them.


In [7]:
import math
import numpy as np
import multiprocessing as mp
import datetime
import json 
import pickle
import s3fs  # used below; may already be imported by setup2.py
from concurrent.futures import ThreadPoolExecutor

num_workers = 64

s3 = s3fs.S3FileSystem()
with s3.open('s3://saturn-public-data/dogs/imagenet1000_clsidx_to_labels.txt') as f:
    imagenetclasses = [line.strip() for line in f.readlines()]
    
client.restart() # Clears memory on the cluster. Optional but recommended.

Client: Scheduler: tcp://d-steph-pytorch-training-90e6119a500640599ff558a22c25098d.main-namespace:8786  Dashboard: https://d-steph-pytorch-training-90e6119a500640599ff558a22c25098d.internal.saturnenterprise.io
Cluster: Workers: 3  Cores: 96  Memory: 382.50 GB


In [8]:
startparams = {'n_epochs': 6,
               'batch_size': 100,
               'train_pct': .8,
               'base_lr': 0.01,
               'imagenetclasses': imagenetclasses,
               'subset': True,
               'n_workers': 3}  # n_workers is only necessary if you select subset
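
The keys of this dictionary have to match the parameter names of `run_transfer_learning`, because the dictionary is later unpacked with `**startparams`. The sketch below shows that unpacking on a stand-in function. `run_stub` is hypothetical and only copies the signature, and the label list is a placeholder rather than the real ImageNet classes.

```python
def run_stub(bucket, prefix, train_pct, batch_size, n_epochs, base_lr,
             imagenetclasses, n_workers=1, subset=False):
    """Hypothetical stand-in with the same signature as run_transfer_learning."""
    return {
        "bucket": bucket,
        "batch_size": batch_size,
        "n_workers": n_workers,
        "subset": subset,
    }

startparams = {
    "n_epochs": 6,
    "batch_size": 100,
    "train_pct": 0.8,
    "base_lr": 0.01,
    "imagenetclasses": ["placeholder"],  # stand-in for the real label list
    "subset": True,
    "n_workers": 3,
}

# ** expands the dictionary into keyword arguments, one per key.
received = run_stub(bucket="saturn-public-data", prefix="dogs/Images", **startparams)
print(received)
```

A misspelled key would raise a `TypeError` about an unexpected keyword argument, which is a quick way to catch typos before a long training run.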

## Kick Off Job

### Send Tasks to Workers
 
We talked in Notebook 2 about how we distribute tasks to the workers in our cluster, and now you get to see it firsthand. Inside the `dispatch.run()` function in `dask-pytorch-ddp`, we are actually using the `client.submit()` method to pass tasks to our workers and collecting the results as futures in a list. We can confirm this by looking at the return value, here named `futures`: it is a list of pending futures, one for each of the workers in our cluster.

> *Why don't we use `.map()` in this function?*   
> Recall that `.map` lets the cluster decide where tasks are completed: the scheduler chooses which worker is assigned each task. That means we don't have the control we need to ensure one and only one job per GPU, which could be a problem for our methodology because of the use of DDP.    
> Instead, we use `.submit` and manually assign each task to a worker by number. This way, each worker is attacking the same problem (our transfer learning problem) and pursuing a solution simultaneously. We'll have one and only one job per worker.
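
The submit-and-collect pattern can be sketched with Python's standard library. This is an analogy only: it uses a local thread pool rather than Dask, and it does not pin tasks to particular machines the way Dask's `client.submit` can with its `workers=` argument. `train_on_rank` is a hypothetical stand-in for the training function.

```python
from concurrent.futures import ThreadPoolExecutor

def train_on_rank(rank, n_epochs):
    """Hypothetical stand-in for the training function; returns a short summary."""
    return f"rank {rank} finished {n_epochs} epochs"

n_workers = 3

with ThreadPoolExecutor(max_workers=n_workers) as pool:
    # One submit call per rank, collected in a list, instead of a single map call.
    futures = [pool.submit(train_on_rank, rank, 6) for rank in range(n_workers)]
    # result() blocks until that particular task has finished.
    results = [f.result() for f in futures]

print(results)
```

The list of futures has one entry per rank, which mirrors the one-future-per-worker list that `dispatch.run` returns.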

In [9]:
%%time    
futures = dispatch.run(client, run_transfer_learning, bucket = "saturn-public-data", prefix = "dogs/Images", **startparams)
futures

CPU times: user 39.6 ms, sys: 435 µs, total: 40.1 ms
Wall time: 40.2 ms


[<Future: pending, key: dispatch_with_ddp-98a39b1aca636983fb4acf1769934ef9>,
 <Future: pending, key: dispatch_with_ddp-923c7c9f12e5aa7ae181477a758e0a82>,
 <Future: pending, key: dispatch_with_ddp-9bc94b3cc8a7a75b8bf629084f1490fe>]

In [10]:
futures

[<Future: pending, key: dispatch_with_ddp-98a39b1aca636983fb4acf1769934ef9>,
 <Future: pending, key: dispatch_with_ddp-923c7c9f12e5aa7ae181477a758e0a82>,
 <Future: pending, key: dispatch_with_ddp-9bc94b3cc8a7a75b8bf629084f1490fe>]

In [None]:
futures[0].result()

<img src="https://media.giphy.com/media/VFDeGtRSHswfe/giphy.gif" alt="parallel" style="width: 200px;"/>

Now we let our workers run for a while. This step will take time, so you may not be able to see the full results during our workshop. Check the dashboards to watch the GPUs at work as the job runs.


### Retrieve Results

This step is where we gather up and save the results. While the cluster is working away at the computation, we can run the `process_results()` method on the `DaskResultsHandler`, which requests the results from each future as they are produced. To see partial results coming in, look for the `workshop_results` folder in the folder menu a few moments after you run the next two chunks. Inside it you will find the results each worker is returning to us.

In [None]:
!rm -rf /home/jovyan/project/workshop-dask-pytorch/workshop_results

In [None]:
%%time

rh.process_results("/home/jovyan/project/workshop-dask-pytorch/workshop_results", futures, raise_errors=False)

This call blocks your Jupyter notebook until it has collected all the results.
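
Once files start to arrive, they can be summarized with a few lines of standard-library Python. The sketch below assumes the files land under the results folder using the same relative paths passed to `submit_result` (`worker/<rank>/data-<timestamp>.json`). That layout is an assumption to check against your own `workshop_results` folder. To stay runnable anywhere, it builds a tiny fake folder with made-up values first. `summarize` is a helper written for this example.

```python
import json
import tempfile
from collections import defaultdict
from pathlib import Path

def summarize(results_dir):
    """Average loss per (sample, epoch) across all worker JSON files."""
    losses = defaultdict(list)
    for path in Path(results_dir).glob("worker/*/data-*.json"):
        record = json.loads(path.read_text())
        losses[(record["sample"], record["epoch"])].append(record["loss"])
    return {key: sum(vals) / len(vals) for key, vals in losses.items()}

# Build a tiny fake results folder (made-up values, assumed layout).
with tempfile.TemporaryDirectory() as tmp:
    fake = [
        (0, "a", {"loss": 2.0, "epoch": 0, "sample": "train"}),
        (1, "b", {"loss": 1.0, "epoch": 0, "sample": "train"}),
        (0, "c", {"loss": 1.5, "epoch": 0, "sample": "eval"}),
    ]
    for rank, tag, record in fake:
        folder = Path(tmp) / "worker" / str(rank)
        folder.mkdir(parents=True, exist_ok=True)
        (folder / f"data-{tag}.json").write_text(json.dumps(record))

    summary = summarize(tmp)

print(summary)
```

To try it on real output, pass the path of your `workshop_results` folder to `summarize` instead of the temporary directory.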

## Proof of Results

We don't have the time today to run an assortment of different cluster sizes to see what works best, but I happen to have the results of those runs saved and visualized, to demonstrate how well it works! [Follow me to Notebook 7!](07-learning-results.ipynb)