# Federated PyTorch MNIST Tutorial

In [None]:
#Install dependencies if not already installed
!pip install torch torchvision

In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
import openfl.native as fx
from openfl.federated import FederatedModel,FederatedDataSet


  from .autonotebook import tqdm as notebook_tqdm


After importing the required packages, the next step is setting up our openfl workspace. To do this, simply run the `fx.init()` command as follows:

In [None]:
#Setup default workspace, logging, etc.
# fx.init('torch_cnn_mnist')
fx.init('torch_cnn_mnist', log_level='METRIC', log_file='./spam_metric.log')

Now we are ready to define our dataset and model to perform federated learning on. The dataset should be composed of a numpy arrayWe start with a simple fully connected model that is trained on the MNIST dataset. 

In [170]:
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

In [171]:
from torch.utils.data import DataLoader

train_set = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
# trainloader = DataLoader(train_set, shuffle=True, num_workers=8)

print(len(train_set))

Files already downloaded and verified
50000


In [172]:
# train_data = [] #np.empty(len(train_set))
# train_labels = [] #np.empty(len(train_set))
# for i, data in enumerate(trainloader, 0):
#     inputs, labels = data
# #     print(len(inputs), len(labels))
# #     train_data[i] = inputs
# #     train_labels[i] = labels
#     train_data.append(inputs)
#     train_labels.append(labels)
train_data =  train_set.data # next(iter(trainloader))[0].numpy()
train_labels = np.array(train_set.targets)

train_data = torch.from_numpy(np.expand_dims(train_data, axis=1)).float()
# train_data = torch.from_numpy(train_data).float()
# train_labels = np.array(train_set.targets)

print(len(train_data), len(train_labels))
# print(train_set.data[0])
print(type(train_data))

50000 50000
<class 'torch.Tensor'>


In [173]:
train_data

tensor([[[[[ 59.,  62.,  63.],
           [ 43.,  46.,  45.],
           [ 50.,  48.,  43.],
           ...,
           [158., 132., 108.],
           [152., 125., 102.],
           [148., 124., 103.]],

          [[ 16.,  20.,  20.],
           [  0.,   0.,   0.],
           [ 18.,   8.,   0.],
           ...,
           [123.,  88.,  55.],
           [119.,  83.,  50.],
           [122.,  87.,  57.]],

          [[ 25.,  24.,  21.],
           [ 16.,   7.,   0.],
           [ 49.,  27.,   8.],
           ...,
           [118.,  84.,  50.],
           [120.,  84.,  50.],
           [109.,  73.,  42.]],

          ...,

          [[208., 170.,  96.],
           [201., 153.,  34.],
           [198., 161.,  26.],
           ...,
           [160., 133.,  70.],
           [ 56.,  31.,   7.],
           [ 53.,  34.,  20.]],

          [[180., 139.,  96.],
           [173., 123.,  42.],
           [186., 144.,  30.],
           ...,
           [184., 148.,  94.],
           [ 97.,  62.,  34.

In [174]:
test_set = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)
# testloader = DataLoader(test_set, shuffle=False, num_workers=8)

print(len(test_set))

Files already downloaded and verified
10000


In [175]:
# test_data = []
# test_labels = []
# for inputs, labels in trainloader:
#     test_data.append(inputs)
#     test_labels.append(labels)
# test_data = torch.from_numpy(np.expand_dims(test_data, axis=1)).float()
# test_labels = np.array(test_labels)

def one_hot(labels, classes):
    return np.eye(classes)[labels]

# test_data = torch.from_numpy(np.expand_dims(test_set.data, axis=1)).float()
# test_labels = one_hot(np.array(test_set.targets), len(test_set.classes))

test_data = test_set.data # next(iter(testloader))[0].numpy() 
test_data = torch.from_numpy(np.expand_dims(test_data, axis=1)).float()
# test_data = torch.from_numpy(test_data).float()
test_labels = np.array(test_set.targets)

print(len(test_data), len(test_labels), len(test_set.classes))
# print(train_set.targets)

10000 10000 10


In [176]:
num_classes = len(train_set.classes)
print(num_classes)

10


In [177]:
batch_size = 32
fl_data = FederatedDataSet(train_data, train_labels, test_data, test_labels, batch_size=batch_size, num_classes=num_classes)

class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    
# optimizer = lambda x: optim.Adam(x, lr=1e-4)
optimizer = lambda x: optim.SGD(x, lr=0.001, momentum=0.9)
# optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

def cross_entropy(output, target):
    """Binary cross-entropy metric
    """
    # criterion = nn.CrossEntropyLoss()
    # loss = criterion(output, target)
    # return loss
    return F.cross_entropy(input=output,target=target)

In [178]:
batch_size = 32
fl_data = FederatedDataSet(train_data, train_labels, test_data, test_labels, batch_size=batch_size, num_classes=num_classes)

class Net(nn.Module):
    def __init__(self) -> None:
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    
# optimizer = lambda x: optim.Adam(x, lr=1e-4)
optimizer = lambda x: optim.SGD(x, lr=0.001, momentum=0.9)
# optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

def cross_entropy(output, target):
    """Binary cross-entropy metric
    """
    # criterion = nn.CrossEntropyLoss()
    # loss = criterion(output, target)
    # return loss
    return F.cross_entropy(input=output,target=target)

Here we can define metric logging function. It should has the following signature described below. You can use it to write metrics to tensorboard or some another specific logging.

In [179]:
# from torch.utils.tensorboard import SummaryWriter
import time

# writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5)
log = open("./logs/cifar_openfl.log", "w")

def write_metric(node_name, task_name, metric_name, metric, round_number):
    log.write("{}/{}/{}: {}, {}, {}\n".format(node_name, task_name, metric_name, metric, round_number, time.time()))
    # writer.add_scalar("{}/{}/{}".format(node_name, task_name, metric_name),
    #                   metric, round_number)

In [180]:

#Create a federated model using the pytorch class, lambda optimizer function, and loss function
fl_model = FederatedModel(build_model=Net,optimizer=optimizer,loss_fn=cross_entropy,data_loader=fl_data)

In [181]:
collaborator_models = fl_model.setup(num_collaborators=8)
collaborators = {'1':collaborator_models[0],
                 '2':collaborator_models[1],
                 '3':collaborator_models[2],
                 '4':collaborator_models[3],
                 '5':collaborator_models[4],
                 '6':collaborator_models[5],
                 '7':collaborator_models[6],
                 '8':collaborator_models[7]}

In [182]:
#Original MNIST dataset
print(f'Original training data size: {len(train_data)}')
print(f'Original validation data size: {len(test_data)}\n')

for i, model in enumerate(collaborator_models):
    print(f'Collaborator {i}\'s training data size: {len(model.data_loader.X_train)}')
    print(f'Collaborator {i}\'s validation data size: {len(model.data_loader.X_valid)}\n')

#Collaborator one's data
# print(f'Collaborator one\'s training data size: {len(collaborator_models[0].data_loader.X_train)}')
# print(f'Collaborator one\'s validation data size: {len(collaborator_models[0].data_loader.X_valid)}\n')

#Collaborator two's data
# print(f'Collaborator two\'s training data size: {len(collaborator_models[1].data_loader.X_train)}')
# print(f'Collaborator two\'s validation data size: {len(collaborator_models[1].data_loader.X_valid)}\n')

#Collaborator three's data
#print(f'Collaborator three\'s training data size: {len(collaborator_models[2].data_loader.X_train)}')
#print(f'Collaborator three\'s validation data size: {len(collaborator_models[2].data_loader.X_valid)}')

Original training data size: 50000
Original validation data size: 10000

Collaborator 0's training data size: 6250
Collaborator 0's validation data size: 1250

Collaborator 1's training data size: 6250
Collaborator 1's validation data size: 1250

Collaborator 2's training data size: 6250
Collaborator 2's validation data size: 1250

Collaborator 3's training data size: 6250
Collaborator 3's validation data size: 1250

Collaborator 4's training data size: 6250
Collaborator 4's validation data size: 1250

Collaborator 5's training data size: 6250
Collaborator 5's validation data size: 1250

Collaborator 6's training data size: 6250
Collaborator 6's validation data size: 1250

Collaborator 7's training data size: 6250
Collaborator 7's validation data size: 1250



We can see the current plan values by running the `fx.get_plan()` function

In [183]:
 #Get the current values of the plan. Each of these can be overridden
print(fx.get_plan())

{
    "aggregator.settings.best_state_path": "save/torch_cnn_mnist_best.pbuf",
    "aggregator.settings.db_store_rounds": 2,
    "aggregator.settings.init_state_path": "save/torch_cnn_mnist_init.pbuf",
    "aggregator.settings.last_state_path": "save/torch_cnn_mnist_last.pbuf",
    "aggregator.settings.log_metric_callback": {
        "template": "src.mnist_utils.write_metric"
    },
    "aggregator.settings.rounds_to_train": 10,
    "aggregator.settings.write_logs": false,
    "aggregator.template": "openfl.component.Aggregator",
    "assigner.settings.task_groups": [
        {
            "name": "train_and_validate",
            "percentage": 1.0,
            "tasks": [
                "aggregated_model_validation",
                "train",
                "locally_tuned_model_validation"
            ]
        }
    ],
    "assigner.template": "openfl.component.RandomGroupedAssigner",
    "collaborator.settings.db_store_rounds": 1,
    "collaborator.settings.delta_updates": false,
  

Now we are ready to run our experiment. If we want to pass in custom plan settings, we can easily do that with the `override_config` parameter

In [184]:
# Run experiment, return trained FederatedModel

final_fl_model = fx.run_experiment(collaborators, override_config={
    'aggregator.settings.rounds_to_train': 100,
    'aggregator.settings.log_metric_callback': write_metric,
})

  data, target = pt.tensor(data).to(self.device), pt.tensor(


RuntimeError: Expected 4-dimensional input for 4-dimensional weight [6, 3, 5, 5], but got 5-dimensional input of size [32, 1, 32, 32, 3] instead

In [11]:
#Save final model
final_fl_model.save_native('final_cifar_model')

In [None]:
log.close()