# EE 599 HW 6: Distributed ML

Your task in this Colab notebook is to fill out the sections that are specified by **TODO** (please search the keyword `TODO` to make sure you do not miss any).

Prerequisites: set the runtime type to GPU. (Runtime -> Change Runtime Type)

## Initial Setup

Prepare utilization functions and save as local files.

* `set_random_seed`: Set random seed for reproducibility.
* `load_data`: Load CIFAR-10 dataset and apply transformations.
* `create_dataloader`: Create data loaders for training and testing.
* `Net`: Define a simple convolution neural network architecture.
* `data_split`: Split the dataset into multiple subsets for each client.
* `plot_data_split`: Plot the data distribution for each client.

In [1]:
%%writefile utils.py

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import matplotlib


def set_random_seed(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)


def load_data():
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
    )

    train_data = torchvision.datasets.CIFAR10(
        root="./", train=True, download=True, transform=transform
    )

    test_data = torchvision.datasets.CIFAR10(
        root="./", train=False, download=True, transform=transform
    )

    return train_data, test_data


def create_dataloader(train_data, test_data, batch_size):
    train_loader = torch.utils.data.DataLoader(
        dataset=train_data, batch_size=batch_size, shuffle=True
    )
    test_loader = torch.utils.data.DataLoader(
        dataset=test_data, batch_size=batch_size, shuffle=False
    )
    return train_loader, test_loader


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


def data_split(dataset, num_clients, split_method, alpha=0.1):
    if split_method == "iid":
        data_split = torch.utils.data.random_split(
            dataset, [len(dataset) // num_clients] * num_clients
        )

    elif split_method == "non-iid":
        min_size = 0
        num_classes = len(dataset.classes)

        while min_size < 10:
            idx_batch = [[] for _ in range(num_clients)]
            # for each class in the dataset
            for k in range(num_classes):
                idx_k = np.where(np.array(dataset.targets) == k)[0]
                np.random.shuffle(idx_k)
                proportions = np.random.dirichlet(np.repeat(alpha, num_clients))
                ## Balance
                proportions = np.array(
                    [
                        p * (len(idx_j) < len(dataset) / num_clients)
                        for p, idx_j in zip(proportions, idx_batch)
                    ]
                )

                proportions = proportions / proportions.sum()
                proportions = (np.cumsum(proportions) * len(idx_k)).astype(int)[:-1]
                idx_batch = [
                    idx_j + idx.tolist()
                    for idx_j, idx in zip(idx_batch, np.split(idx_k, proportions))
                ]
                min_size = min([len(idx_j) for idx_j in idx_batch])

        data_split = [
            torch.utils.data.Subset(dataset, idx_batch[i]) for i in range(num_clients)
        ]

    return data_split


def plot_data_split(data_split, num_clients, num_classes, file_name):
    data_per_client = []
    for i in range(num_clients):
        data_per_class = [0] * num_classes
        for idx in data_split[i].indices:
            label = data_split[i].dataset.targets[idx]
            data_per_class[label] += 1
        data_per_client.append(data_per_class)

    x = [f"client {i}" for i in range(1, num_clients + 1)]

    y = np.array(data_per_client)
    y_t = y.transpose()

    cmap = matplotlib.colormaps["tab10"]
    colors = cmap(np.arange(10))

    for i in range(10):
        plt.bar(
            x,
            y_t[i],
            bottom=np.sum(y_t[:i], axis=0),
            color=colors[i],
            label=f"Class {i}",
        )

    # put legent outside
    plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left", borderaxespad=0.0)
    plt.subplots_adjust(right=0.8)

    plt.ylabel("Number of samples")
    plt.title("Data distribution")
    plt.savefig(f"{file_name}")

Writing utils.py


## Centralized SGD Training

Recall that the function `loss.backward()` computes gradients for each of the parameters in the model based on the forward path, `optimizer.step()` applies the gradients to update the parameters, and `optimizer.zero_grad()` clears the gradients.

In [2]:
import torch
import logging

from utils import load_data, create_dataloader, set_random_seed, Net


if __name__ == "__main__":
    # fix randomness
    set_random_seed(42)

    logging.basicConfig(level=logging.INFO)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device {device}")

    # define model and move to device
    model = Net()
    model.to(device)

    # define optimizer and loss function
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    num_epochs = 3
    batch_size = 100

    # load cifar10 dataset
    train_data, test_data = load_data()
    train_loader, test_loader = create_dataloader(
        train_data, test_data, batch_size=batch_size
    )

    for epoch_idx in range(num_epochs):
        model.train()
        running_loss = 0

        for batch_idx, (data, target) in enumerate(train_loader):
            data = data.to(device)
            target = target.to(device)

            # compute loss
            output = model(data)
            loss = criterion(output, target)

            # compute gradients
            loss.backward()

            # update parameters
            optimizer.step()

            # clear gradients
            optimizer.zero_grad()

            # print statistics
            running_loss += loss.item()
            if (batch_idx + 1) % 200 == 0:
                print(
                    f"Epoch: {epoch_idx} Batch: {batch_idx+1}/{len(train_loader)} | Loss: {running_loss/200:.4f}"
                )
                running_loss = 0

    # evaluate model on testset after training
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)

            output = model(data)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

        accuracy = correct / total
        print(f"Test Accuracy: {accuracy:.4f}") # modified "logging info" to "print" so that the message can appear

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:02<00:00, 65404312.06it/s]


Extracting ./cifar-10-python.tar.gz to ./
Files already downloaded and verified
Epoch: 0 Batch: 200/500 | Loss: 2.2050
Epoch: 0 Batch: 400/500 | Loss: 1.7740
Epoch: 1 Batch: 200/500 | Loss: 1.5161
Epoch: 1 Batch: 400/500 | Loss: 1.4392
Epoch: 2 Batch: 200/500 | Loss: 1.3234
Epoch: 2 Batch: 400/500 | Loss: 1.2929
Test Accuracy: 0.5565


## Install mpi4py package

MPI, or Message Passing Interface, is a standardized and portable message-passing system designed to enable processes to communicate in a parallel computing environment. MPI has become the de facto standard for high-performance parallel computing in a wide range of applications, from simulations in scientific research to large-scale data processing. At its core, MPI provides various communication mechanisms, including point-to-point and collective operations, allowing data to be exchanged between processes irrespective of their physical location—be it on the same machine or across a vast cluster of computers. By abstracting the complexities of inter-process communication, MPI empowers developers to craft scalable parallel software efficiently and effectively.

`mpirun` is a command-line utility in most MPI implementations, used to start parallel jobs in distributed computing environments. It launches a specified number of processes on different nodes, enabling them to work together on MPI-enabled applications. The number of processes and their distribution can be controlled by various command-line options and arguments provided to `mpirun`. For instance, using `-n 4` would initiate four parallel processes.

In [3]:
!pip install mpi4py

Collecting mpi4py
  Downloading mpi4py-3.1.6.tar.gz (2.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (pyproject.toml) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.6-cp310-cp310-linux_x86_64.whl size=2746318 sha256=4eedb41c76f1071cd06299e95ca9002f70d4b168441dd8a9a879d89a4ee09932
  Stored in directory: /root/.cache/pip/wheels/4c/ca/89/8fc1fb1c620afca13bb41c630b1f948bbf446e0aaa4b762e10
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.6


In Colab, we need to set the following environment variables to allow for the execution of MPI as the root user and enable oversubscription to control resource allocation.

In [4]:
%env OMPI_ALLOW_RUN_AS_ROOT=1
%env OMPI_ALLOW_RUN_AS_ROOT_CONFIRM=1
%env OMPI_MCA_rmaps_base_oversubscribe=true

env: OMPI_ALLOW_RUN_AS_ROOT=1
env: OMPI_ALLOW_RUN_AS_ROOT_CONFIRM=1
env: OMPI_MCA_rmaps_base_oversubscribe=true


## Distributed Training

### **TODO 1:** Data parallel distributed SGD training without parameter server.
In this task, you'll simulate data parallel distributed training without a parameter server using MPI. Each rank will simulate a single GPU within a single machine. During the training process, each rank will handle fetching corresponding data and training the model on its allocated portion. Gradients will be synchronized across ranks using the `all_reduce` function.

To access gradients of the parameters in the model, you can iterate over the model parameters:
```
for param in model.parameters():
    print(param.grad)
```
Note `param.grad` is initially set as None and becomes a Tensor the first time a call to `loss.backward()`.

The training loss at each printing step should be close to the coresponding one from the centralized implmentaion.

In [5]:
%%writefile serverless.py

import torch
import logging

from utils import load_data, create_dataloader, set_random_seed, Net

from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if __name__ == "__main__":
    # fix randomness
    set_random_seed(42)

    logging.basicConfig(level=logging.INFO)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Rank {rank} is using device {device}")

    # define model and move to device
    model = Net()
    model.to(device)

    # define optimizer and loss function
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    num_epochs = 3
    batch_size = 100

    # load cifar10 dataset
    train_data, test_data = load_data()
    train_loader, test_loader = create_dataloader(
        train_data, test_data, batch_size=batch_size
    )

    # compute number of samples for each rank
    num_samples_per_rank = int(batch_size / size)

    for epoch_idx in range(num_epochs):
        model.train()
        running_loss = 0

        for batch_idx, (data_global, target_global) in enumerate(train_loader):
            data_split = torch.split(data_global, num_samples_per_rank, dim=0)
            target_split = torch.split(target_global, num_samples_per_rank, dim=0)

            data_local = data_split[rank].to(device)
            target_local = target_split[rank].to(device)

            # compute loss
            output = model(data_local)
            loss = criterion(output, target_local)

            # compute gradients
            loss.backward()

            # TODO: reduce loss to rank 0, this is not necessary but just for debugging
            # remember to multiply loss by number of local samples before reduce

            loss *= len(data_local)
            loss = comm.reduce(loss, op=MPI.SUM, root=0)

            # TODO: allreduce gradients for each layer
            # remember to multiply gradients by number of local samples before allreduce
            # and divide gradients by number of global samples after allreduce

            for param in model.parameters():
                param.grad *= len(data_local)
                param.grad = comm.allreduce(param.grad, op=MPI.SUM)
                param.grad /= len(data_global)


            # update parameters
            optimizer.step()

            # clear gradients
            optimizer.zero_grad()

            # print statistics
            if rank == 0:
                loss /= data_global.shape[0]
                running_loss += loss.item()

                if (batch_idx + 1) % 200 == 0:
                    logging.info(
                        f"Epoch: {epoch_idx} Batch: {batch_idx+1}/{len(train_loader)} | Loss: {running_loss/200:.4f}"
                    )
                    running_loss = 0

    # evaluate model on testset after training on rank 0
    model.eval()
    if rank == 0:
        with torch.no_grad():
            correct = 0
            total = 0
            for data, target in test_loader:
                data = data.to(device)
                target = target.to(device)

                output = model(data)
                _, predicted = torch.max(output.data, 1)
                total += target.size(0)
                correct += (predicted == target).sum().item()

            accuracy = correct / total
            logging.info(f"Test Accuracy: {accuracy:.4f}")

Writing serverless.py


Launch MPI with 4 processes to run serverless.py

In [6]:
!mpirun -n 4 python serverless.py

INFO:root:Rank 3 is using device cuda
INFO:root:Rank 1 is using device cuda
INFO:root:Rank 0 is using device cuda
INFO:root:Rank 2 is using device cuda
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
INFO:root:Epoch: 0 Batch: 200/500 | Loss: 2.2050
INFO:root:Epoch: 0 Batch: 400/500 | Loss: 1.7737
INFO:root:Epoch: 1 Batch: 200/500 | Loss: 1.5127
INFO:root:Epoch: 1 Batch: 400/500 | Loss: 1.4385
INFO:root:Epoch: 2 Batch: 200/500 | Loss: 1.3218
INFO:root:Epoch: 2 Batch: 400/500 | Loss: 1.2896
INFO:root:Test Accuracy: 0.5577


### **TODO 2:** Data parallel distributed SGD training with parameter server.

In this task, you will implement data parallel distributed training with a parameter server. Assume multiple machines, each equipped with one GPU (simulated by ranks other than 0), cooperate in the training process. Each rank (excluding rank 0) will be responsible for sending gradients to the server (rank 0), which will receive, average these gradients, and send them back for further updates.

The training loss at each printing step should be close to the coresponding one from the centralized implmentaion.

In [None]:
%%writefile parameter_server.py

import torch
import logging

from utils import load_data, create_dataloader, set_random_seed, Net

from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if __name__ == "__main__":
    # fix randomness
    set_random_seed(42)

    logging.basicConfig(level=logging.INFO)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Rank {rank} is using device {device}")

    # define model and move to device
    model = Net()
    model.to(device)

    # define optimizer and loss function
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    num_epochs = 3
    batch_size = 100

    # load cifar10 dataset
    train_data, test_data = load_data()
    train_loader, test_loader = create_dataloader(
        train_data, test_data, batch_size=batch_size
    )

    # compute number of samples for each rank
    # note that rank 0 is the parameter server, and we have size-1 workers
    num_samples_per_rank = int(batch_size / (size - 1))

    for epoch_idx in range(num_epochs):
        running_loss = 0

        for batch_idx, (data_global, target_global) in enumerate(train_loader):
            if rank == 0:
                data_split = torch.split(data_global, num_samples_per_rank, dim=0)
                target_split = torch.split(target_global, num_samples_per_rank, dim=0)

                # TODO: send local data and target to other ranks
                for i in range(1, size):
                    data_local = data_split[i-1]
                    target_local = target_split[i-1]
                    comm.send(data_local, dest=i, tag=0)
                    comm.send(target_local, dest=i, tag=1)


                # TODO: receive loss from other ranks and sum them up
                loss = 0
                for i in range(1, size):
                    loss += comm.recv(source=i, tag=2)


                # TODO: compute averaged loss by dividing number of global samples
                # accumulate averged loss to running loss
                running_loss += loss.item()/len(data_global)


                # TODO: receive gradients from other ranks and sum them up
                # remember to divide gradients by number of global samples
                for param in model.parameters():
                    param.grad = torch.zeros_like(param.data)

                    for i in range(1, size):
                        param.grad += comm.recv(source=i, tag=3)

                    param.grad /= len(data_global)


                # TODO: update parameters and send updated model to other ranks
                optimizer.step()

                for i in range(1, size):
                    comm.send(model, dest=i, tag=10)

                # clear gradients
                optimizer.zero_grad()

                # print statistics
                if (batch_idx + 1) % 200 == 0:
                    logging.info(
                        f"Epoch: {epoch_idx} Batch: {batch_idx+1}/{len(train_loader)} | Loss: {running_loss/200:.4f}"
                    )
                    running_loss = 0

            else:
                model.train()

                # TODO: receive data from rank 0
                data_local = comm.recv(source=0, tag=0)
                target_local = comm.recv(source=0, tag=1)


                # TODO: move data to device
                data_local = data_local.to(device)
                target_local = target_local.to(device)


                # TODO: compute loss
                output = model(data_local)
                loss = criterion(output, target_local)


                # TODO: compute gradients
                loss.backward()


                # TODO: send loss to rank 0, this is not necessary but just for debugging
                # remember to multiply loss by number of local samples
                loss *= len(data_local)
                comm.send(loss, dest=0, tag=2)


                # TODO: send gradients to rank 0, remember to multiply gradients by number of local samples
                for param in model.parameters():
                    param.grad *= len(data_local)
                    comm.send(param.grad, dest=0, tag=3)

                # TODO: receive updated model from rank 0
                model = comm.recv(source=0, tag=10)


                # clear gradients
                optimizer.zero_grad()

    # evaluate model on testset after training on parameter server
    model.eval()
    if rank == 0:
        with torch.no_grad():
            correct = 0
            total = 0
            for data, target in test_loader:
                data = data.to(device)
                target = target.to(device)

                output = model(data)
                _, predicted = torch.max(output.data, 1)
                total += target.size(0)
                correct += (predicted == target).sum().item()

            accuracy = correct / total
            logging.info(f"Test Accuracy: {accuracy:.4f}")

Writing parameter_server.py


In [None]:
!mpirun -n 4 python parameter_server.py

INFO:root:Rank 0 is using device cuda
INFO:root:Rank 2 is using device cuda
INFO:root:Rank 1 is using device cuda
INFO:root:Rank 3 is using device cuda
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
INFO:root:Epoch: 0 Batch: 200/500 | Loss: 2.1845
INFO:root:Epoch: 0 Batch: 400/500 | Loss: 1.7600
INFO:root:Epoch: 1 Batch: 200/500 | Loss: 1.5021
INFO:root:Epoch: 1 Batch: 400/500 | Loss: 1.4261
INFO:root:Epoch: 2 Batch: 200/500 | Loss: 1.3138
INFO:root:Epoch: 2 Batch: 400/500 | Loss: 1.2814
INFO:root:Test Accuracy: 0.5524


## Federated Learning

### **TODO 3:**

Use MPI to simulate federated learning with fedavg algorithm. Similar to distributed training with parameter server, assume multiple clients, each equipped with one GPU (simulated by ranks other than 0), cooperate in the training process. Each rank (excluding rank 0) will be responsible for sending modles (not gradients) to the server (rank 0), which will receive, average these models, and send them back for further updates.

Split the data into independent, identically distributed (IID) by specifying `iid` in the command line.

In [8]:
%%writefile fed_avg.py

import torch
import logging
import sys

from utils import load_data, create_dataloader, set_random_seed, Net, data_split, plot_data_split

from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()


class Aggregator:
    def __init__(self, model):
        self.model = model

    def get_model_params(self):
        return self.model.state_dict()

    def set_model_params(self, model_params):
        self.model.load_state_dict(model_params)

    def aggregate(self, state_dict_list, num_samples_list):
        # TODO: aggregate local model params using federated averaging algorithm
        # the weight of each local model is num of samples on that client / total num of samples
        # return the aggregated model params in the form of a state_dict
        avg_params = state_dict_list[0]
        for k in avg_params.keys():
            for i in range(0, len(state_dict_list)):
                local_model_params = state_dict_list[i]
                w = num_samples_list[i] / sum(num_samples_list)
                if i == 0:
                    avg_params[k] = local_model_params[k] * w
                else:
                    avg_params[k] += local_model_params[k] * w

        return avg_params


if __name__ == "__main__":
    # fix randomness
    set_random_seed(42)

    logging.basicConfig(level=logging.INFO)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Rank {rank} is using device {device}")

    # define model and move to device
    model = Net()
    model.to(device)

    # define optimizer and loss function
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    num_rounds = 5
    num_epochs = 1
    batch_size = 32

    # load cifar10 dataset
    train_data, test_data = load_data()

    # in simulation each client can see the whole dataset but only uses part of it
    # however, in practice, each client will have its distinct dataset and cannot access others' dataset
    split_method = sys.argv[1]  # options: "non-iid" or "iid"
    if rank == 0:
        logging.info(f"Data split method: {split_method}")

    train_data_split = data_split(
        dataset=train_data, num_clients=size - 1, split_method=split_method
    )

    # plot the distribution of each client's dataset
    plot_data_split(
        data_split=train_data_split,
        num_clients=size - 1,
        num_classes=10,
        file_name=f"./{split_method}.png",
    )

    test_data_split = torch.utils.data.random_split(
        test_data,
        [len(test_data) // (size - 1) for _ in range(size - 1)],
    )

    if rank == 0:
        # initialize aggregator in the parameter server
        aggregator = Aggregator(model)

    # start federated learning
    for round_idx in range(num_rounds):
        # rank 0 is the parameter server and rest of the ranks are clients
        if rank == 0:
            # TODO: get global model params from the aggregator
            global_model_params = aggregator.get_model_params()

            # TODO: send global model params to other ranks
            for i in range(1, size):
                comm.send(global_model_params, dest=i, tag=0)

            local_model_params_list = []
            num_samples_list = []
            # TODO: receive local model params from other ranks and append to local_model_params_list
            # and receive number of samples from other ranks and append to num_samples_list
            for i in range(1, size):
                local_model_params_list.append(comm.recv(source=i, tag=1))
                num_samples_list.append(comm.recv(source=i, tag=2))

            # TODO: aggregate local model params
            global_model_params = aggregator.aggregate(local_model_params_list, num_samples_list)

            # TODO: set global model params to the aggregator
            aggregator.set_model_params(global_model_params)


            logging.info(
                f"--------- | Round: {round_idx} | aggregation finished | ---------"
            )
        else:
            # TODO: create data loader on local dataset
            train_loader, test_loader = create_dataloader(
                train_data_split[rank - 1],
                test_data_split[rank - 1],
                batch_size=batch_size,
            )

            # TODO: receive global model params from parameter server
            global_model_params = comm.recv(source=0, tag=0)

            # TODO: set local model params
            model.load_state_dict(global_model_params)

            # start local training
            model.train()
            running_loss = 0

            for epoch_idx in range(num_epochs):
                for batch_idx, (data_local, target_local) in enumerate(train_loader):
                    data_local = data_local.to(device)
                    target_local = target_local.to(device)

                    output = model(data_local)
                    loss = criterion(output, target_local)

                    loss.backward()
                    optimizer.step()
                    optimizer.zero_grad()

                    running_loss += loss.item()

            # evaluate model on testset
            model.eval()
            correct = 0
            total = 0
            accuracy = 0

            with torch.no_grad():
                for data_local, target_local in test_loader:
                    data_local = data_local.to(device)
                    target_local = target_local.to(device)

                    output = model(data_local)
                    _, predicted = torch.max(output.data, 1)
                    total += target_local.size(0)
                    correct += (predicted == target_local).sum().item()

                accuracy = correct / total
                running_loss = running_loss / len(train_loader) / num_epochs
                logging.info(
                    f"Client: {rank} | Round: {round_idx} | Loss: {running_loss:.4f} | Accuracy: {accuracy:.4f}"
                )

            # TODO: send local model params to parameter server
            local_model_params = model.state_dict()
            comm.send(local_model_params, dest=0, tag=1)

            # TODO: send number of samples to parameter server
            num_samples = len(train_data_split[rank - 1])
            comm.send(num_samples, dest=0, tag=2)


Writing fed_avg.py


In [9]:
!mpirun -n 5 python fed_avg.py iid

INFO:root:Rank 0 is using device cuda
INFO:root:Rank 4 is using device cuda
INFO:root:Rank 3 is using device cuda
INFO:root:Rank 2 is using device cuda
INFO:root:Rank 1 is using device cuda
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
INFO:root:Data split method: iid
INFO:root:Client: 4 | Round: 0 | Loss: 2.0618 | Accuracy: 0.3596
INFO:root:Client: 1 | Round: 0 | Loss: 2.0588 | Accuracy: 0.3312
INFO:root:Client: 3 | Round: 0 | Loss: 2.0589 | Accuracy: 0.3628
INFO:root:Client: 2 | Round: 0 | Loss: 2.0660 | Accuracy: 0.3232
INFO:root:--------- | Round: 0 | aggregation finished | ---------
INFO:root:Client: 2 | Round: 1 | Loss: 1.6750 | Accuracy: 0.4268
INFO:r

### **TODO 4:**

Split the data into non-independent, identically distributed (Non-IID) by specifying `non-iid` in the command line and run the code again.

In [10]:
!mpirun -n 5 python fed_avg.py non-iid

INFO:root:Rank 1 is using device cuda
INFO:root:Rank 3 is using device cuda
INFO:root:Rank 2 is using device cuda
INFO:root:Rank 4 is using device cuda
INFO:root:Rank 0 is using device cuda
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
INFO:root:Data split method: non-iid
INFO:root:Client: 3 | Round: 0 | Loss: 0.5874 | Accuracy: 0.1012
INFO:root:Client: 1 | Round: 0 | Loss: 1.0305 | Accuracy: 0.2056
INFO:root:Client: 4 | Round: 0 | Loss: 0.8795 | Accuracy: 0.2452
INFO:root:Client: 2 | Round: 0 | Loss: 0.9865 | Accuracy: 0.2500
INFO:root:--------- | Round: 0 | aggregation finished | ---------
INFO:root:Client: 3 | Round: 1 | Loss: 0.4831 | Accuracy: 0.1032
IN

### **TODO 5:**

1. Check the data distribution plots named "iid.png" and "non-idd.png" by clicking the "Files" folder icon on the left pannel bar. Comment on why non-iid federated learning could lead to worse performance.
1. Assume the model has `P` trainable parameters. For distributed training, we have `N` processes and train `S` steps (a step means one update of the model's parameters using a batch of training data). For federated learning, we have `N` clients and train `R` rounds. Analyze the total amount of data transmission for each paradigm and quantify it in terms of `P`, `N`, `S`, or `R`.

Your answer:

1. In non-iid federated learning, sample data in each client is bias in particular class, leading to weight divergence during the training process.

2. Distributed Training: $2PNS$ because in each step, loss is gathered, calculated and then reduced distributedly. Therefore, for each parameters in one process in one step, there are 2 data transmission.

  Federated Learning: $2NR$ because in each round of the federated learning, the clients receive the model parameters from the server, and send back the trained parameters back to the server after training.