[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/intel/openfl/blob/develop/openfl-tutorials/interactive_api/numpy_linear_regression/workspace/SingleNotebook.ipynb)

### Preparations in colab:
We need to clone the repository to run a federation because it contains director and envoy configs to start from.

1. Clone the OpenFL repository
2. Install OpenFL 
3. Go to the PyTorch_MedMNIST_2D workspace

In [1]:
# For right now, install OpenFL from source (the `develop` branch of the
# repository cloned below); later we would migrate to a pinned PyPI install.
# !pip install openfl==1.2.1
import shutil
# Uncomment to delete a previous checkout before re-running this cell.
# shutil.rmtree('./openfl')
!git clone https://github.com/walteriviera/openfl.git
# The `cd` is chained on the same line as the install so that pip runs inside
# the checkout. Besides installing the local package, this also upgrades pip
# itself (`pip` is passed as a second requirement together with --upgrade).
!cd openfl && git checkout develop && pip install . --upgrade pip
# MedMNIST supplies the dataset metadata (`INFO`) imported further down.
!pip install medmnist

Cloning into 'openfl'...
remote: Enumerating objects: 4069, done.[K
remote: Total 4069 (delta 0), reused 0 (delta 0), pack-reused 4069[K
Receiving objects: 100% (4069/4069), 3.18 MiB | 25.08 MiB/s, done.
Resolving deltas: 100% (2363/2363), done.
Branch 'develop' set up to track remote branch 'develop' from 'origin'.
Switched to a new branch 'develop'
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Processing /content/openfl
[33m  DEPRECATION: A future pip version will change local packages to be built in-place without first copying to a temporary directory. We recommend you use --use-feature=in-tree-build to test your packages with this new behavior before it becomes the default.
   pip 21.3 will remove support for this functionality. You can find discussion regarding this at https://github.com/pypa/pip/issues/7555.[0m
Collecting pip
  Downloading pip-22.1.2-py3-none-any.whl (2.1 MB)
[K     |████████████████████████████████| 2.1 M

In [2]:
import os
from time import sleep
# Move into the tutorial workspace inside the cloned repository. The path is
# relative, so it resolves only from the directory that contains the `openfl`
# checkout; re-running this cell after the change of directory is expected to fail.
os.chdir('openfl/openfl-tutorials/interactive_api/PyTorch_MedMNIST_2D/workspace')

In [3]:
# Install dependencies if not already installed
from tqdm import tqdm
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
import torch.nn.functional as F

import medmnist

from medmnist import INFO, Evaluator

# Describing the model and optimizer

In [4]:
# Training hyper-parameters.
num_epochs = 3
TRAIN_BS = 64
VALID_BS = 128

lr = 0.001
gamma = 0.1
# Epoch positions at 50% and 75% of the schedule.
milestones = [fraction * num_epochs for fraction in (0.5, 0.75)]

# MedMNIST subset to use, plus the metadata MedMNIST publishes for it.
data_flag = 'bloodmnist'
info = INFO[data_flag]

task, n_channels = info['task'], info['n_channels']
n_classes = len(info['label'])

In [5]:
# define a simple CNN model

class Net(nn.Module):
    """Simple CNN classifier: five conv stages followed by a 3-layer MLP head.

    Every stage is Conv2d(3x3) -> BatchNorm2d -> ReLU. Stages 2 and 5 end with
    a 2x2 max-pool, and stage 5 is the only convolution that uses padding.
    The head expects a 64 x 4 x 4 feature map, which is what 28 x 28 inputs
    produce after these stages.

    Args:
        in_channels: number of channels of the input images.
        num_classes: size of the output logits vector.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()

        def conv_stage(c_in, c_out, padding=0, pool=False):
            """Build one Conv -> BatchNorm -> ReLU (-> MaxPool) stage."""
            modules = [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=padding),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]
            if pool:
                modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
            return nn.Sequential(*modules)

        # Attribute names (and the order inside each Sequential) define the
        # state-dict keys, so they are kept exactly as layer1..layer5 / fc.
        self.layer1 = conv_stage(in_channels, 16)
        self.layer2 = conv_stage(16, 16, pool=True)
        self.layer3 = conv_stage(16, 64)
        self.layer4 = conv_stage(64, 64)
        self.layer5 = conv_stage(64, 64, padding=1, pool=True)

        flat_features = 64 * 4 * 4
        self.fc = nn.Sequential(
            nn.Linear(flat_features, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Return the class logits for a batch of images."""
        stages = (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5)
        for stage in stages:
            x = stage(x)
        # Flatten everything but the batch dimension before the MLP head.
        flat = x.view(x.shape[0], -1)
        return self.fc(flat)

# Build the network for the selected dataset.
model = Net(num_classes=n_classes, in_channels=n_channels)

# Loss function: logits-based BCE for the multi-label task,
# cross-entropy for every other task type.
criterion = (
    nn.BCEWithLogitsLoss()
    if task == "multi-label, binary-class"
    else nn.CrossEntropyLoss()
)

# Plain SGD with momentum over all model parameters.
optimizer = optim.SGD(model.parameters(), momentum=0.9, lr=lr)

In [6]:
# Show the layer structure of the network defined above.
print(model)

Net(
  (layer1): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer2): Sequential(
    (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer3): Sequential(
    (0): Conv2d(16, 64, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer4): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (layer5): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=

# Now we run the same training on federated data

## 1. Start the Director service and several envoys with generated data

In [7]:
# Here are the main parameters for our Federation
# n_cols: number of envoy config files to generate (one per envoy)
n_cols = 3
# datapath / dataname are written into every envoy's shard descriptor params
datapath = 'data/.' 
dataname = 'bloodmnist'

In [8]:
import os
from pathlib import Path
import yaml
from typing import Dict, List, Union

### Start the Director service

In [9]:
# Resolve the Director workspace and the files it uses.
cwd = Path.cwd()
director_workspace_path = Path('../director/').absolute()
director_config_file = director_workspace_path.joinpath('director_config.yaml')
director_logfile = director_workspace_path.joinpath('director.log')

# Start from a clean log: drop the one left by a previous run, if any.
if director_logfile.is_file():
    director_logfile.unlink()

# Export the locations as environment variables so that the bash cell that
# launches the Director can read them.
os.environ.update({
    'main_folder': str(cwd),
    'director_workspace_path': str(director_workspace_path),
    'director_logfile': str(director_logfile),
    'director_config_file': str(director_config_file),
})

In [10]:
%%script /bin/bash --bg
# Launch the Director (TLS disabled) from its own workspace in the background,
# sending its stdout to the log file, then return to the notebook folder.
# All locations come from environment variables exported by the previous cell.
# The expansions are quoted so that paths containing spaces are not word-split.
cd "$director_workspace_path"
fx director start --disable-tls -c "$director_config_file" > "$director_logfile" &
cd "$main_folder"

Starting job # 0 in a separate thread.


## Start Envoys

#### First, we create several envoy config files 

In [12]:
# Load the template envoy configuration; the per-envoy configs are derived from it.
envoy_template_path = Path('../envoy/envoy_config.yaml')
with envoy_template_path.open("r") as stream:
    orig_config = yaml.safe_load(stream)

def generate_envoy_configs(config: Dict,
                           save_path: Union[str, Path] = '../envoy/',
                           n_cols: int = 2,
                           datapath: str = '',
                           dataname: str = 'bloodmnist') -> List[Path]:
    """Write one envoy config file per envoy and return the file paths.

    Each file is a copy of ``config`` whose ``shard_descriptor.params`` carry
    the given ``datapath`` and ``dataname`` plus a ``rank_worldsize`` string of
    the form ``"<rank>, <n_cols>"`` with ranks starting at 1.

    Args:
        config: template envoy configuration; it is NOT modified.
        save_path: directory the ``<rank>_envoy_config.yaml`` files go to.
        n_cols: number of envoy configs to generate.
        datapath: value stored under ``shard_descriptor.params.datapath``.
        dataname: value stored under ``shard_descriptor.params.dataname``.

    Returns:
        Absolute paths of the written files, ordered by rank.
    """
    from copy import deepcopy

    # Work on a private copy so the caller's template keeps its original
    # values instead of ending up with the last rank written here.
    config = deepcopy(config)
    params = config['shard_descriptor']['params']
    params['datapath'] = datapath
    params['dataname'] = dataname

    save_dir = Path(save_path)
    config_paths = []
    for rank in range(1, n_cols + 1):
        params['rank_worldsize'] = f'{rank}, {n_cols}'
        path = (save_dir / f'{rank}_envoy_config.yaml').absolute()
        with open(path, "w") as stream:
            yaml.safe_dump(config, stream)
        config_paths.append(path)

    return config_paths
            
def remove_configs(config_paths):
    """Delete the generated envoy config files.

    Files that are already gone are skipped, so the cleanup cell can be
    re-run without raising.

    Args:
        config_paths: iterable of ``Path`` objects to remove.
    """
    for path in config_paths:
        try:
            path.unlink()
        except FileNotFoundError:
            # Already removed (e.g. cleanup executed twice): nothing to do.
            pass

In [13]:
# Write one config file per envoy (n_cols of them) from the template and keep
# the resulting paths; they are needed to start the envoys and to clean up.
config_paths = generate_envoy_configs(orig_config,
                                      n_cols=n_cols,
                                      datapath=datapath,
                                      dataname=dataname)

#### Now start Envoy processes in a loop

In [14]:
# envoy_workspace_path = Path('../envoy/').absolute()
def start_envoys(config_paths: List[Path]) -> None:
    """Start one background envoy process per config file.

    The envoys are launched from the directory that holds the config files.
    Envoy ``i`` (1-based) is named ``env_<i>``, connects to the Director at
    localhost:50051 with TLS disabled, and writes its stdout to
    ``env_<i>.log`` in that directory.

    Args:
        config_paths: envoy config files; all are expected to live in the
            directory of the first one. An empty list is a no-op.
    """
    import shlex

    if not config_paths:
        return

    envoy_workspace_path = config_paths[0].parent
    cwd = Path.cwd()
    os.chdir(envoy_workspace_path)
    try:
        for i, path in enumerate(config_paths, start=1):
            # The path is shell-quoted because the command goes through a
            # shell and the path may contain spaces or other special characters.
            os.system(f'fx envoy start -n env_{i} --disable-tls '
                      f'--envoy-config-path {shlex.quote(str(path))} -dh localhost -dp 50051 '
                      f'>env_{i}.log &')
    finally:
        # Always return to the original directory, even if a launch raised.
        os.chdir(cwd)

# Pause before launching the envoys — presumably to give the Director started
# earlier time to come up (TODO confirm the delay is sufficient).
sleep(5)

start_envoys(config_paths)

# Pause again — presumably so the envoys can connect to the Director before
# the notebook continues (TODO confirm).
sleep(25)

## 2. Connect to the Director service of our Federation as a Data scientist

In [15]:
# Create a federation
from openfl.interface.interactive_api.federation import Federation

# Please use the same identifier that was used in the signed certificate
client_id = 'frontend'
# Address of the Director; the port matches the one the envoys were pointed at
director_node_fqdn = 'localhost'
director_port=50051

# Run with TLS disabled (trusted environment).
# Federation can also determine the local FQDN automatically.
federation = Federation(
    client_id=client_id,
    director_node_fqdn=director_node_fqdn,
    director_port=director_port, 
    tls=False)

In [16]:
# The data scientist may request a list of connected envoys
shard_registry = federation.get_shard_registry()

# WARNING!

# Make sure the shard registry contains all the envoys you started!
# Otherwise, try reconnecting to the Director (the cell above).
shard_registry

{'env_1': {'current_time': '2022-07-05 08:28:45',
  'experiment_name': 'ExperimentName Mock',
  'is_experiment_running': False,
  'is_online': True,
  'last_updated': '2022-07-05 08:27:49',
  'shard_info': node_info {
    name: "env_1"
  }
  shard_description: "MedMNIST dataset, shard number 1 out of 3"
  sample_shape: "28"
  sample_shape: "28"
  sample_shape: "3"
  target_shape: "1"
  target_shape: "1",
  'valid_duration': seconds: 120},
 'env_2': {'current_time': '2022-07-05 08:28:45',
  'experiment_name': 'ExperimentName Mock',
  'is_experiment_running': False,
  'is_online': True,
  'last_updated': '2022-07-05 08:28:17',
  'shard_info': node_info {
    name: "env_2"
  }
  shard_description: "MedMNIST dataset, shard number 2 out of 3"
  sample_shape: "28"
  sample_shape: "28"
  sample_shape: "3"
  target_shape: "1"
  target_shape: "1",
  'valid_duration': seconds: 120},
 'env_3': {'current_time': '2022-07-05 08:28:45',
  'experiment_name': 'ExperimentName Mock',
  'is_experiment_run

### Now we will prepare an FL experiment using OpenFL Python API

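*   Data: wrap the envoys' shards in a `DataInterface`
*   Model: register the model and optimizer in a `ModelInterface`
*   Tasks: register the training and validation functions in a `TaskInterface`
*   Run: start the `FLExperiment`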



### Data

In [17]:
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment
from PIL import Image

# Image preprocessing applied to every sample: convert to a tensor, then
# normalise with mean 0.5 and std 0.5 (given as a single value each rather
# than one per channel).
data_transform = T.Compose([T.ToTensor(), 
                            T.Normalize(mean=[.5], std=[.5])]
                 )

In [18]:
class TransformedDataset(Dataset):
    """Wrap an indexable dataset of (image array, label) pairs and apply transforms.

    Args:
        dataset: indexable object whose items are ``(img, label)`` pairs.
        transform: optional callable applied to the PIL image; when absent
            the image is converted with ``T.PILToTensor()``.
        target_transform: optional callable applied to the label; when absent
            the label is cast with ``label.astype(int)``.
    """

    def __init__(self, dataset, transform=None, target_transform=None):
        """Store the wrapped dataset and the optional transforms."""
        self.dataset = dataset
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Number of samples in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Return the transformed ``(img, label)`` pair stored at ``index``."""
        raw_img, raw_label = self.dataset[index]

        # Label: user-supplied transform, or an integer cast as the fallback.
        label = (self.target_transform(raw_label)
                 if self.target_transform
                 else raw_label.astype(int))

        # Image: always goes through PIL first, then through the chosen transform.
        image_op = self.transform if self.transform else T.PILToTensor()
        img = image_op(Image.fromarray(raw_img))

        return img, label


In [19]:
class MedMnistFedDataset(DataInterface):
    """Data interface exposing a shard's 'train' and 'val' splits as DataLoaders.

    Keyword arguments given at construction are kept in ``self.kwargs``;
    ``train_bs`` and ``valid_bs`` are read from there as the batch sizes.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    @property
    def shard_descriptor(self):
        """The shard descriptor currently attached to this interface."""
        return self._shard_descriptor

    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor will be set by Envoy.
        """
        self._shard_descriptor = shard_descriptor

        # Both splits go through the same image transform.
        train_split = shard_descriptor.get_dataset('train')
        self.train_set = TransformedDataset(train_split, transform=data_transform)

        val_split = shard_descriptor.get_dataset('val')
        self.valid_set = TransformedDataset(val_split, transform=data_transform)

    def get_train_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        batch_size = self.kwargs['train_bs']
        return DataLoader(self.train_set, batch_size=batch_size,
                          shuffle=True, num_workers=8)

    def get_valid_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        batch_size = self.kwargs['valid_bs']
        return DataLoader(self.valid_set, batch_size=batch_size, num_workers=8)

    def get_train_data_size(self):
        """
        Information for aggregation
        """
        return len(self.train_set)

    def get_valid_data_size(self):
        """
        Information for aggregation
        """
        return len(self.valid_set)


In [20]:
# Data interface for the experiment; the batch sizes are looked up by the loaders.
fed_dataset = MedMnistFedDataset(train_bs=TRAIN_BS, valid_bs=VALID_BS)

### Model

In [21]:
from copy import deepcopy

# Register the model and its optimizer with OpenFL through the PyTorch framework adapter plugin.
framework_adapter = 'openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin'
MI = ModelInterface(model=model, optimizer=optimizer, framework_plugin=framework_adapter)

# Save the initial model state (an independent copy of the model)
initial_model = deepcopy(model)

### Tasks
The training task uses the SGD optimizer that was registered in the `ModelInterface` above; it is passed to the task through the `optimizer` entry of the task contract.
OpenFL decides which model is the best based on an *increasing* metric; the validation task below reports accuracy (`acc`) alongside the loss.

In [22]:
# Registry that collects the training and validation tasks defined below.
TI = TaskInterface()

import torch
# NOTE: from here on `tqdm` names the module, so progress bars are created with `tqdm.tqdm(...)`.
import tqdm

# Extra keyword arguments that are attached to the training task.
train_custom_params={'criterion':criterion,'task':task}

# Task interface currently supports only standalone functions.
@TI.add_kwargs(**train_custom_params)
@TI.register_fl_task(model='model', data_loader='train_loader',
                     device='device', optimizer='optimizer')
def train(model, train_loader, device, optimizer, criterion, task):
    """Run one pass over ``train_loader`` and report the mean batch loss.

    Args:
        model: network to optimise; switched to train mode and moved to ``device``.
        train_loader: iterable of ``(inputs, targets)`` batches.
        device: device the model and the batches are moved to.
        optimizer: optimizer stepped once per batch.
        criterion: loss function called as ``criterion(outputs, targets)``.
        task: task name; ``'multi-label, binary-class'`` selects float targets,
            anything else squeezes dim 1 of the targets and casts them to long.

    Returns:
        ``{'train_loss': <mean of the per-batch losses>}``.
    """
    multi_label = task == 'multi-label, binary-class'
    batch_losses = []

    model.train()
    model.to(device)

    for inputs, targets in tqdm.tqdm(train_loader, desc="train"):
        optimizer.zero_grad()
        outputs = model(inputs.to(device))

        # Bring the targets into the layout the loss expects.
        if multi_label:
            targets = targets.to(torch.float32).to(device)
        else:
            targets = torch.squeeze(targets, 1).long().to(device)

        loss = criterion(outputs, targets)
        batch_losses.append(loss.item())

        loss.backward()
        optimizer.step()

    return {'train_loss': np.mean(batch_losses),}



# Extra keyword arguments that are attached to the validation task.
val_custom_params={'criterion':criterion, 
                   'run':'model1',
                   'task':task}

@TI.add_kwargs(**val_custom_params)
@TI.register_fl_task(model='model', data_loader='val_loader', device='device')
def validate(model, val_loader, device, criterion, run, task):
    """Evaluate ``model`` on ``val_loader`` and report accuracy and mean loss.

    Args:
        model: network to evaluate; switched to eval mode and moved to ``device``.
        val_loader: iterable of ``(inputs, targets)`` batches.
        device: device the model and the batches are moved to.
        criterion: loss function called as ``criterion(outputs, targets)``.
        run: accepted for interface compatibility; not used by the computation.
        task: task name. For ``'multi-label, binary-class'`` every label is
            thresholded at probability 0.5 and accuracy is the fraction of
            correctly predicted labels. For any other task, dim 1 of the
            targets is squeezed and accuracy is the fraction of samples whose
            arg-max class equals the target.

    Returns:
        ``{'acc': <accuracy in [0, 1]>, 'test_loss': <mean of per-batch losses>}``.
    """
    val_loader = tqdm.tqdm(val_loader, desc="validate")
    model.eval()
    model.to(device)

    multi_label = task == 'multi-label, binary-class'
    correct = 0
    total = 0
    total_loss = []

    with torch.no_grad():
        for inputs, targets in val_loader:
            outputs = model(inputs.to(device))

            if multi_label:
                targets = targets.to(torch.float32).to(device)
                loss = criterion(outputs, targets)
                # One independent decision per label: compare element-wise.
                pred = torch.sigmoid(outputs) > 0.5
                correct += pred.eq(targets > 0.5).sum().item()
                total += targets.numel()
            else:
                targets = torch.squeeze(targets, 1).long().to(device)
                loss = criterion(outputs, targets)
                # Softmax does not change the arg-max, so use the logits directly.
                # `pred` and `targets` are both 1-D here: comparing against a
                # (batch, 1) column would broadcast to a batch x batch matrix
                # and over-count the matches.
                pred = outputs.argmax(dim=1)
                correct += pred.eq(targets).sum().item()
                total += targets.shape[0]

            total_loss.append(loss.item())

    acc = correct / total
    test_loss = sum(total_loss) / len(total_loss)

    return {'acc': acc,
            'test_loss': test_loss,
            }


### Run

In [23]:
# Create the experiment object bound to the federation connected above.
experiment_name = 'medmnist_exp'
fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name)

In [24]:
# Start the experiment with the model, task and data interfaces defined above,
# training for 3 rounds.
fl_experiment.start(model_provider=MI, 
                    task_keeper=TI,
                    data_loader=fed_dataset,
                    rounds_to_train=3,
                    device_assignment_policy='CUDA_PREFERRED')

In [None]:
# This method not only prints messages received from the director,
# but also saves logs in the tensorboard format (by default)
fl_experiment.stream_metrics()

## Cleaning

In [None]:
# To stop all services run
# Stops the services started by this notebook by killing every process named `fx`.
!pkill fx

In [None]:
# Delete the per-envoy config files generated earlier in the notebook.
remove_configs(config_paths)