# Federated PyTorch COVID INPUT

In [150]:
#!pip install -r requirements.txt

In [151]:
import os
import glob

from PIL import Image

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from openfl.interface.interactive_api.federation import Federation
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment
from copy import deepcopy
import torchvision
from torchvision import transforms as T
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import tqdm

myseed = 0
torch.manual_seed(myseed)
np.random.seed(myseed)

## Connect to the Federation

In [152]:
client_id = 'api'
cert_dir = 'cert'
director_node_fqdn = 'localhost'
federation = Federation(client_id=client_id, director_node_fqdn=director_node_fqdn, director_port='50051', tls=False)

In [153]:
federation.target_shape

['1', '256', '256']

In [154]:
shard_registry = federation.get_shard_registry()
shard_registry

{'env_one': {'shard_info': node_info {
    name: "env_one"
  }
  shard_description: "Images-tabular dataset, shard number 1 out of 1"
  sample_shape: "1"
  sample_shape: "256"
  sample_shape: "256"
  target_shape: "1"
  target_shape: "256"
  target_shape: "256",
  'is_online': False,
  'is_experiment_running': True,
  'last_updated': '2024-12-20 12:54:37',
  'current_time': '2024-12-20 12:58:49',
  'valid_duration': seconds: 120,
  'experiment_name': 'ExperimentName Mock'},
 'env_three': {'shard_info': node_info {
    name: "env_three"
  }
  shard_description: "Images-tabular dataset, shard number 1 out of 1"
  sample_shape: "1"
  sample_shape: "256"
  sample_shape: "256"
  target_shape: "1"
  target_shape: "256"
  target_shape: "256",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2024-12-20 12:58:21',
  'current_time': '2024-12-20 12:58:49',
  'valid_duration': seconds: 120,
  'experiment_name': 'ExperimentName Mock'}}

In [155]:
# First, request a dummy_shard_desc that holds information about the federated dataset 
dummy_shard_desc = federation.get_dummy_shard_descriptor(size=10)
dummy_shard_dataset = dummy_shard_desc.get_dataset('train')
sample, target = dummy_shard_dataset[0]
print(sample.shape)
print(target.shape)

(1, 256, 256)
(1, 256, 256)


## Creating a FL experiment using Interactive API

In [156]:
class TransformedDataset(Dataset):
    """Image Person ReID Dataset."""

    def __init__(self, dataset, transform=None, target_transform=None):
        """Initialize Dataset."""
        self.dataset = dataset
        #self.transform = transform
        #self.target_transform = target_transform

    def __len__(self):
        """Length of dataset."""
        return len(self.dataset)

    def __getitem__(self, index):
        img, tab, label = self.dataset[index]
        #label = self.target_transform(label) if self.target_transform else label
        #img = self.transform(img) if self.transform else img
        #tab = self.transform(tab) if self.transform else tab
        return img, tab, label


In [157]:
class COVIDDataset(DataInterface):
    def __init__(self, **kwargs):
        self.kwargs = kwargs
    
    @property
    def shard_descriptor(self):
        return self._shard_descriptor
        
    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor  will be set by Envoy.
        """
        self._shard_descriptor = shard_descriptor
        
        self.train_set = TransformedDataset(
            self._shard_descriptor.get_dataset('train'),
            #transform=training_transform
        )
        self.valid_set = TransformedDataset(
            self._shard_descriptor.get_dataset('val'),
            #transform=valid_transform
        )
        
    def get_train_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        generator=torch.Generator()
        generator.manual_seed(myseed)
        return DataLoader(
            self.train_set, batch_size=self.kwargs['train_bs'], shuffle=True, generator=generator
            )

    def get_valid_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        return DataLoader(self.valid_set, batch_size=self.kwargs['valid_bs'])

    def get_train_data_size(self):
        """
        Information for aggregation
        """
        return len(self.train_set)

    def get_valid_data_size(self):
        """
        Information for aggregation
        """
        return len(self.valid_set)
    

In [158]:
fed_dataset = COVIDDataset(train_bs=8, valid_bs=8)

### Describe the model and optimizer

# Only Image

In [159]:

# class BasicBlock(nn.Module):
#     expansion = 1

#     def __init__(self, in_planes, planes, stride=1):
#         super(BasicBlock, self).__init__()
#         self.conv1 = nn.Conv2d(
#             in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
#         self.bn1 = nn.BatchNorm2d(planes)
        
#         self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
#                                stride=1, padding=1, bias=False)
#         self.bn2 = nn.BatchNorm2d(planes)

#         self.shortcut = nn.Sequential()
#         if stride != 1 or in_planes != self.expansion*planes:
#             self.shortcut = nn.Sequential(
#                 nn.Conv2d(in_planes, self.expansion*planes,
#                           kernel_size=1, stride=stride, bias=False),
#                 nn.BatchNorm2d(self.expansion*planes)
               
#             )

#     def forward(self, x):
#         out = F.relu(self.bn1(self.conv1(x)))
#         out = self.bn2(self.conv2(out))
#         out += self.shortcut(x)
#         out = F.relu(out)
#         return out


# class ResNet(nn.Module):
#     def __init__(self, block, num_blocks, in_channels=1, num_classes=2):
#         super(ResNet, self).__init__()
#         torch.manual_seed(myseed)
#         self.in_planes = 64

#         self.conv1 = nn.Conv2d(in_channels, 64, kernel_size=3,
#                                stride=1, padding=1, bias=False)
#         self.bn1 = nn.BatchNorm2d(64)
#         self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
#         self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
#         self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
#         self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
#         self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
#         self.linear = nn.Linear(512 * block.expansion, num_classes)

#     def _make_layer(self, block, planes, num_blocks, stride):
#         strides = [stride] + [1]*(num_blocks-1)
#         layers = []
#         for stride in strides:
#             layers.append(block(self.in_planes, planes, stride))
#             self.in_planes = planes * block.expansion
#         return nn.Sequential(*layers)

#     def forward(self, x):
#         out = F.relu(self.bn1(self.conv1(x)))
#         out = self.layer1(out)
#         out = self.layer2(out)
#         out = self.layer3(out)
#         out = self.layer4(out)
#         out = self.avgpool(out)
#         out = out.view(out.size(0), -1)
#         out = self.linear(out)
#         return out


# def ResNet18(in_channels, num_classes):
#     return ResNet(BasicBlock, [2, 2, 2, 2], in_channels=in_channels, num_classes=num_classes)

# model_img = ResNet18(1,1)
# print(model_img)


# Only Test(table)

In [160]:

class TextNN(nn.Module):

    #Constructor
    def __init__(self):
    # Call parent contructor
        super().__init__()
        torch.manual_seed(myseed)
        self.relu = nn.ReLU()
        self.ln1 = nn.Linear(23, 50) #23 sono le colonne in input
        self.ln2 = nn.Linear(50, 50)
        self.ln3 = nn.Linear(50, 10)
        self.ln4 = nn.Linear(10, 1)
    
    def forward(self, tab):
        tab = self.ln1(tab)
        tab = self.relu(tab)
        tab = self.ln2(tab)
        tab = self.relu(tab)
        tab = self.ln3(tab)
        tab = self.relu(tab)
        tab = self.ln4(tab)

        return tab

model_text = TextNN()
print(model_text)


TextNN(
  (relu): ReLU()
  (ln1): Linear(in_features=23, out_features=50, bias=True)
  (ln2): Linear(in_features=50, out_features=50, bias=True)
  (ln3): Linear(in_features=50, out_features=10, bias=True)
  (ln4): Linear(in_features=10, out_features=1, bias=True)
)


# MULTI INPUT

In [161]:

# class BasicBlock(nn.Module):
#     expansion = 1

#     def __init__(self, in_planes, planes, stride=1):
#         super(BasicBlock, self).__init__()
#         self.conv1 = nn.Conv2d(
#             in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
#         self.bn1 = nn.BatchNorm2d(planes)
        
#         self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
#                                stride=1, padding=1, bias=False)
#         self.bn2 = nn.BatchNorm2d(planes)

#         self.shortcut = nn.Sequential()
#         if stride != 1 or in_planes != self.expansion*planes:
#             self.shortcut = nn.Sequential(
#                 nn.Conv2d(in_planes, self.expansion*planes,
#                           kernel_size=1, stride=stride, bias=False),
#                 nn.BatchNorm2d(self.expansion*planes)
               
#             )

#     def forward(self, x):
#         out = F.relu(self.bn1(self.conv1(x)))
#         out = self.bn2(self.conv2(out))
#         out += self.shortcut(x)
#         out = F.relu(out)
#         return out


# class ResNet(nn.Module):
#     def __init__(self, block, num_blocks, in_channels=1, num_classes=2):
#         super(ResNet, self).__init__()
#         torch.manual_seed(myseed)
#         self.in_planes = 64

#         self.conv1 = nn.Conv2d(in_channels, 64, kernel_size=3,
#                                stride=1, padding=1, bias=False)
#         self.bn1 = nn.BatchNorm2d(64)
#         self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
#         self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
#         self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
#         self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
#         self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
#         self.linear = nn.Linear(512 * block.expansion, 10) #qui ho cambiato da num_classes a 10, per far cat con testo
#         #TESTO
#         self.relu = nn.ReLU()
#         self.ln1 = nn.Linear(23, 50) #23 sono le colonne in input
#         self.ln2 = nn.Linear(50, 50)
#         self.ln3 = nn.Linear(50, 10)
#         self.ln4 = nn.Linear(20, 1) #20 perchè 10 derivano da img e 10 da tab

#     def _make_layer(self, block, planes, num_blocks, stride):
#         strides = [stride] + [1]*(num_blocks-1)
#         layers = []
#         for stride in strides:
#             layers.append(block(self.in_planes, planes, stride))
#             self.in_planes = planes * block.expansion
#         return nn.Sequential(*layers)

#     def forward(self, x, tab):
#         out = F.relu(self.bn1(self.conv1(x)))
#         out = self.layer1(out)
#         out = self.layer2(out)
#         out = self.layer3(out)
#         out = self.layer4(out)
#         out = self.avgpool(out)
#         out = out.view(out.size(0), -1)
#         out = self.linear(out)
#         out = self.relu(out)
        
#         tab = self.ln1(tab)
#         tab = self.relu(tab)
#         tab = self.ln2(tab)
#         tab = self.relu(tab)
#         tab = self.ln3(tab)
#         tab = self.relu(tab)
        
#         out = torch.cat((out, tab), dim=1)
#         out = self.relu(out)
        
#         out = self.ln4(out)
        
#         return out
    

# def ResNet18(in_channels, num_classes):
#     return ResNet(BasicBlock, [2, 2, 2, 2], in_channels=in_channels, num_classes=num_classes)

# model_multi = ResNet18(1,1)
# print(model_multi)


In [162]:
# model_net = model_img
model_net = model_text
#model_net = model_multi

In [163]:
model_net

TextNN(
  (relu): ReLU()
  (ln1): Linear(in_features=23, out_features=50, bias=True)
  (ln2): Linear(in_features=50, out_features=50, bias=True)
  (ln3): Linear(in_features=50, out_features=10, bias=True)
  (ln4): Linear(in_features=10, out_features=1, bias=True)
)

In [164]:
list(model_net.parameters())

[Parameter containing:
 tensor([[-0.0016,  0.1119, -0.1716,  ...,  0.0757,  0.1731, -0.0429],
         [ 0.1560, -0.0336,  0.0221,  ..., -0.1947, -0.1507, -0.1075],
         [ 0.1316,  0.1223, -0.0925,  ...,  0.0080,  0.0483,  0.1294],
         ...,
         [-0.1528,  0.0760, -0.0233,  ..., -0.0417,  0.1644,  0.1539],
         [-0.1413,  0.0969, -0.1636,  ..., -0.1966, -0.0293, -0.1346],
         [-0.1445,  0.0820, -0.1605,  ..., -0.1614,  0.1036, -0.1783]],
        requires_grad=True),
 Parameter containing:
 tensor([ 0.1282, -0.1974, -0.1753,  0.0031,  0.1643, -0.1786, -0.0930,  0.1831,
         -0.0350,  0.0864, -0.2006, -0.0656, -0.1059,  0.1758,  0.0865, -0.1521,
          0.0336, -0.0024, -0.0857, -0.1191, -0.0692,  0.1644, -0.1495, -0.1485,
         -0.0643,  0.0036,  0.0568,  0.0063, -0.0502, -0.1264, -0.0374, -0.0839,
          0.1685, -0.2030, -0.1330, -0.1696, -0.0381,  0.1625,  0.1828, -0.1815,
         -0.0090,  0.1224,  0.0919,  0.1708, -0.0058, -0.1401,  0.1379, -0.0715

In [165]:
params_to_update = []
for param in model_net.parameters():
    if param.requires_grad == True:
        params_to_update.append(param)
'''
FEDPROX
'''        
#from openfl.utilities.optimizers.torch import FedProxAdam        
#optimizer = FedProxAdam(params_to_update, lr=1e-4, mu=0.01)

'''
ORIGINALE
'''
optimizer = optim.Adam(params_to_update, lr = 0.0001)
#optimizer = optim.SGD(params_to_update, lr=1e-4)

#Define a criterion
criterion = nn.BCEWithLogitsLoss()

def cross_entropy(output, target):
    """Cross-entropy metric
    """
    #return F.cross_entropy(input=output,target=target)
    #return F.binary_cross_entropy_with_logits(input=output,target=target)
    criterion = nn.BCEWithLogitsLoss()
    loss = criterion(output, target)
    return loss

### Register model

In [166]:
framework_adapter = 'openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin'
model_interface = ModelInterface(model=model_net, optimizer=optimizer, framework_plugin=framework_adapter)

# Save the initial model state
initial_model = deepcopy(model_net)

## Define and register FL tasks

In [167]:
task_interface = TaskInterface()


# The Interactive API supports registering functions definied in main module or imported.
def function_defined_in_notebook(some_parameter):
    print(f'Also I accept a parameter and it is {some_parameter}')

# Task interface currently supports only standalone functions.
@task_interface.add_kwargs(**{'some_parameter': 42})
@task_interface.register_fl_task(model='net_model', data_loader='train_loader', \
                     device='device', optimizer='optimizer') 
#@task_interface.set_aggregation_function(FedCurvWeightedAverage())


def train(net_model, train_loader, optimizer, device, loss_fn=cross_entropy, some_parameter=None):
    torch.manual_seed(myseed)
    #fedcurv.on_train_begin(net_model)
    device='cpu'
    function_defined_in_notebook(some_parameter)
    
    train_loader = tqdm.tqdm(train_loader, desc="train")
    net_model.train()
    net_model.to(device)

    losses = []
    epochs = 5
    
    for epoch in range(epochs):
        for img, tabular, target in train_loader:
            img, tabular, target = torch.tensor(img).to(device), torch.tensor(tabular).to(device), torch.tensor(target).to(device, dtype=torch.int64)
            optimizer.zero_grad()
            #output = torch.flatten(net_model(img, tabular)) #multi input
            output = torch.flatten(net_model(img)) #solo immagini
            #output = torch.flatten(net_model(tabular)) #solo tabular
            output = output.float()
            loss = criterion(output, target.float())
            loss.backward()
            optimizer.step()
            losses.append(loss.detach().cpu().numpy())
    return {'train_loss': np.mean(losses),}


@task_interface.register_fl_task(model='net_model', data_loader='val_loader', device='device')   
  
def validate(net_model, val_loader, device):
    torch.manual_seed(myseed)
    device = torch.device('cpu')
    net_model.eval()
    net_model.to(device)
    losses = []
    
    val_loader = tqdm.tqdm(val_loader, desc="validate")
    val_score = 0
    total_samples = 0

    with torch.no_grad():
        for img, tabular, target in val_loader:
            samples = target.shape[0]
            total_samples += samples
            img, tabular, target = torch.tensor(img).to(device), torch.tensor(tabular).to(device), torch.tensor(target).to(device, dtype=torch.int64)
            #output = torch.flatten(net_model(img, tabular)) #multi input
            output = torch.flatten(net_model(img)) #solo immagini
            #output = torch.flatten(net_model(tabular)) #solo tabular
            output = (output >= 0.0).float() #binarize predictions
            loss = criterion(output, target.float())
            losses.append(loss.detach().cpu().numpy())
            #da wine
            #_, preds = torch.max(outputs, dim=1)
            #return torch.tensor(torch.sum(preds == labels).item() / len(preds))
            
            #originale
            #pred = output.argmax(dim=1,keepdim=True)
            
            #tentativo
            #_, pred = torch.max(output, dim=1)
            #val_score += pred.eq(target).sum().cpu().numpy()
            val_score += output.eq(target).sum().cpu().numpy()
            
    return {'acc': val_score / total_samples,}

SyntaxError: invalid syntax (3009034632.py, line 64)

## Time to start a federated learning experiment

In [20]:
# create an experimnet in federation
experiment_name = 'federated_covid_Multi_Input_6hospitals'
fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name)

In [21]:
# The following command zips the workspace and python requirements to be transfered to collaborator nodes
fl_experiment.start(
    model_provider=model_interface, 
    task_keeper=task_interface,
    data_loader=fed_dataset,
    rounds_to_train=5,
    opt_treatment='CONTINUE_GLOBAL'
)



In [None]:
# If user want to stop IPython session, then reconnect and check how experiment is going
# fl_experiment.restore_experiment_state(model_interface)

fl_experiment.stream_metrics(tensorboard_logs=True)