# Workflow Interface 201: Using Ray to request exclusive GPUs
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/intel/openfl/blob/develop/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb)

In this OpenFL Workflow Interface tutorial, we will demonstrate how to leverage multiple GPUs concurrent model training using the LocalRuntime's Ray Backend. 

# Getting Started

First we start by installing the necessary dependencies for the workflow interface

In [None]:
!pip install git+https://github.com/intel/openfl.git
!pip install -r requirements_workflow_interface.txt

# Uncomment this if running in Google Colab
#!pip install -r https://raw.githubusercontent.com/intel/openfl/develop/openfl-tutorials/experimental/requirements_workflow_interface.txt
#import os
#os.environ["USERNAME"] = "colab"

We begin with the quintessential example of a small pytorch CNN model trained on the MNIST dataset. Let's start define our dataloaders, model, optimizer, and some helper functions like we would for any other deep learning experiment. Notice the `inference` functions makes consistent references to `cuda:0`. This will become important later

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch
import torchvision
import numpy as np

n_epochs = 3
batch_size_train = 64
batch_size_test = 1000
learning_rate = 0.01
momentum = 0.5
log_interval = 10

random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

mnist_train = torchvision.datasets.MNIST('files/', train=True, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ]))

mnist_test = torchvision.datasets.MNIST('files/', train=False, download=True,
                             transform=torchvision.transforms.Compose([
                               torchvision.transforms.ToTensor(),
                               torchvision.transforms.Normalize(
                                 (0.1307,), (0.3081,))
                             ]))

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)
    
def inference(network,test_loader):
    if torch.cuda.is_available():
        network = network.to('cuda:0')
    network.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
      for data, target in test_loader:
        if torch.cuda.is_available():
          data = data.to('cuda:0')
          target = target.to('cuda:0')
        output = network(data)
        test_loss += F.nll_loss(output, target, size_average=False).item()
        pred = output.data.max(1, keepdim=True)[1]
        correct += pred.eq(target.data.view_as(pred)).sum()
    test_loss /= len(test_loader.dataset)
    print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
      test_loss, correct, len(test_loader.dataset),
      100. * correct / len(test_loader.dataset)))
    accuracy = float(correct / len(test_loader.dataset))
    return accuracy

Next we import the `FLSpec`, `LocalRuntime`, and placement decorators.

- `FLSpec` – Defines the flow specification. User defined flows are subclasses of this.
- `Runtime` – Defines where the flow runs, infrastructure for task transitions (how information gets sent). The `LocalRuntime` runs the flow on a single node.
- `aggregator/collaborator` - placement decorators that define where the task will be assigned

In [None]:
from copy import deepcopy

from openfl.experimental.interface import FLSpec, Aggregator, Collaborator
from openfl.experimental.runtime import LocalRuntime
from openfl.experimental.placement import aggregator, collaborator


def FedAvg(models):
    models = [model.to('cpu') for model in models]
    new_model = models[0]
    state_dicts = [model.state_dict() for model in models]
    state_dict = new_model.state_dict()
    for key in models[1].state_dict():
        state_dict[key] = np.sum([state[key]
                                 for state in state_dicts], axis=0) / len(models)
    new_model.load_state_dict(state_dict)
    return new_model



Now we come to the updated flow definition. Here we request `@collaborator(num_gpus=1)` as the placement decorator, which will require a dedicated GPU for each collaborator task. Tune this based on your use case, but because this uses Ray internally, you can also pass through a [fraction of a GPU](https://docs.ray.io/en/latest/ray-core/tasks/using-ray-with-gpus.html#fractional-gpus), which will allow more than one task to run on each GPU (i.e. `@collaborator(num_gpus=0.5)` would result in two tasks per GPU). 

In [None]:
class CollaboratorGPUFlow(FLSpec):

    def __init__(self, model = None, optimizer = None, rounds=3, **kwargs):
        super().__init__(**kwargs)
        if model is not None:
            self.model = model
            self.optimizer = optimizer
        else:
            self.model = Net()
            self.optimizer = optim.SGD(self.model.parameters(), lr=learning_rate,
                                   momentum=momentum)
        self.rounds = rounds

    @aggregator
    def start(self):
        print(f'Performing initialization for model')
        self.collaborators = self.runtime.collaborators
        self.private = 10
        self.current_round = 0
        self.next(self.aggregated_model_validation,foreach='collaborators',exclude=['private'])

    @collaborator(num_gpus=1)
    def aggregated_model_validation(self):
        print(f'Performing aggregated model validation for collaborator {self.input}')
        self.agg_validation_score = inference(self.model,self.test_loader)
        print(f'{self.input} value of {self.agg_validation_score}')
        self.next(self.train)

    @collaborator(num_gpus=1)
    def train(self):
        """
        Train the model.
        """
        self.model.train()
        self.optimizer = optim.SGD(self.model.parameters(), lr=learning_rate,
                              momentum=momentum)
        train_losses = []
        for batch_idx, (data, target) in enumerate(self.train_loader):
            if torch.cuda.is_available():
                data = data.to("cuda:0")
                target = target.to("cuda:0")
            self.optimizer.zero_grad()
            output = self.model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            self.optimizer.step()
            if batch_idx % log_interval == 0:
                print('Train Epoch: 1 [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    batch_idx * len(data), len(self.train_loader.dataset),
                    100. * batch_idx / len(self.train_loader), loss.item()))
                self.loss = loss.item()
        self.next(self.local_model_validation)

    @collaborator(num_gpus=1)
    def local_model_validation(self):
        self.local_validation_score = inference(self.model,self.test_loader)
        print(f'Doing local model validation for collaborator {self.input}: {self.local_validation_score}')
        self.next(self.join)

    @aggregator
    def join(self,inputs):
        self.average_loss = sum(input.loss for input in inputs)/len(inputs)
        self.aggregated_model_accuracy = sum(input.agg_validation_score for input in inputs)/len(inputs)
        self.local_model_accuracy = sum(input.local_validation_score for input in inputs)/len(inputs)
        print(f'Average aggregated model validation values = {self.aggregated_model_accuracy}')
        print(f'Average training loss = {self.average_loss}')
        print(f'Average local model validation values = {self.local_model_accuracy}')
        self.model = FedAvg([input.model for input in inputs])
        self.optimizer = [input.optimizer for input in inputs][0]
        self.current_round += 1
        if self.current_round < self.rounds:
            self.next(self.aggregated_model_validation, foreach='collaborators', exclude=['private'])
        else:
            self.next(self.end)
        
    @aggregator
    def end(self):
        print(f'This is the end of the flow')  

You'll notice in the `FederatedFlow` definition above that there were certain attributes that the flow was not initialized with, namely the `train_loader` and `test_loader` for each of the collaborators. These are **private_attributes** that are exposed only throught he runtime. Each participant has it's own set of private attributes: a dictionary where the key is the attribute name, and the value is the object that will be made accessible through that participant's task. 

Below, we segment shards of the MNIST dataset for **four collaborators**: Portland, Seattle, Chandler, and Portland. Each has their own slice of the dataset that's accessible via the `train_loader` or `test_loader` attribute. Note that the private attributes are flexible, and you can choose to pass in a completely different type of object to any of the collaborators or aggregator (with an arbitrary name). These private attributes will always be filtered out of the current state when transfering from collaborator to aggregator, or vice versa. 

The LocalRuntime is now initialized **without** the backend argument. Now the LocalRuntime will default to `backend='ray'`, which allows passing through the `num_gpus` argument in the placement decorator

In [None]:
# Setup participants
aggregator = Aggregator()
aggregator.private_attributes = {}

# Setup collaborators with private attributes
collaborator_names = ['Portland', 'Seattle', 'Chandler','Bangalore']
collaborators = [Collaborator(name=name) for name in collaborator_names]
for idx, collaborator in enumerate(collaborators):
    local_train = deepcopy(mnist_train)
    local_test = deepcopy(mnist_test)
    local_train.data = mnist_train.data[idx::len(collaborators)]
    local_train.targets = mnist_train.targets[idx::len(collaborators)]
    local_test.data = mnist_test.data[idx::len(collaborators)]
    local_test.targets = mnist_test.targets[idx::len(collaborators)]
    collaborator.private_attributes = {
            'train_loader': torch.utils.data.DataLoader(local_train,batch_size=batch_size_train, shuffle=True),
            'test_loader': torch.utils.data.DataLoader(local_test,batch_size=batch_size_train, shuffle=True)
    }

# The following is equivalent to
# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, **backend='ray'**)
local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
print(f'Local runtime collaborators = {local_runtime.collaborators}')

Now that we have our flow and runtime defined, let's run the experiment! 

(If you run this example on Google Colab with the GPU Runtime, you should see one task executing at a time.)

In [None]:
model = None
best_model = None
optimizer = None
flflow = CollaboratorGPUFlow(model,optimizer,checkpoint=True)
flflow.runtime = local_runtime
flflow.run()

Now that the flow has completed, let's get the final model and accuracy:

In [None]:
print(f'Sample of the final model weights: {flflow.model.state_dict()["conv1.weight"][0]}')

print(f'\nFinal aggregated model accuracy for {flflow.rounds} rounds of training: {flflow.aggregated_model_accuracy}')

Now that the flow is complete, let's dig into some of the information captured along the way

In [None]:
run_id = flflow._run_id

In [None]:
import metaflow

In [None]:
from metaflow import Metaflow, Flow, Task, Step

In [None]:
m = Metaflow()
list(m)

For existing users of Metaflow, you'll notice this is the same way you would examine a flow after completion. Let's look at the latest run that generated some results:

In [None]:
f = Flow('CollaboratorGPUFlow').latest_run

In [None]:
f

And its list of steps

In [None]:
list(f)

This matches the list of steps executed in the flow, so far so good...

In [None]:
s = Step(f'CollaboratorGPUFlow/{run_id}/train')

In [None]:
s

In [None]:
list(s)

Now we see **12** steps: **4** collaborators each performed **3** rounds of model training  

In [None]:
t = Task(f'FederatedFlow/{run_id}/train/9')

In [None]:
t

Now let's look at the data artifacts this task generated

In [None]:
t.data

In [None]:
t.data.input

Now let's look at its log output (stdout)

In [None]:
print(t.stdout)

And any error logs? (stderr)

In [None]:
print(t.stderr)

# Congratulations!
Now that you've completed your **workflow interface 201 tutorial**, see some of the more advanced things you can do in our [other tutorials](broken_link), including:

- Vertical Federated Learning
- Model Watermarking
- Differential Privacy
- And More!