<a href="https://colab.research.google.com/github/zhaolotelli/FedLearn/blob/main/Fed_CIFAR10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Pre-training

import packages

In [1]:
import numpy as np
import pandas as pd
import collections
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch import nn, optim
from torch.utils.data import Dataset, TensorDataset, DataLoader, BatchSampler, RandomSampler
from PIL import Image

download CIFAR10 dataset

In [2]:
# ToTensor maps uint8 pixels to floats in [0, 1]; normalising every channel
# with mean 0.5 and std 0.5 then rescales them to [-1, 1].
_to_tensor = transforms.ToTensor()
_normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
transform = transforms.Compose([_to_tensor, _normalize])

# options shared by the train and test splits
_cifar_kwargs = dict(root='./data', download=True, transform=transform)
trainset = torchvision.datasets.CIFAR10(train=True, **_cifar_kwargs)
testset = torchvision.datasets.CIFAR10(train=False, **_cifar_kwargs)

# human-readable class names, indexed by integer label
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


HBox(children=(FloatProgress(value=0.0, max=170498071.0), HTML(value='')))


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


generate non-iid clients

In [3]:
# generate non-iid clients from Dirichlet prior distribution
# the idea of this part comes from https://github.com/IBM/probabilistic-federated-neural-matching
K = 10                # number of classes (labels 0 .. K-1)
alpha = 0.5           # concentration of the symmetric Dirichlet prior over clients
num_clients = 100
MIN_CLIENT_SIZE = 10  # resample the whole partition until every client holds at least this many samples
SEED = 0              # fixed seed so that re-running the notebook rebuilds the same partition
min_size = 0
y_train = np.array(trainset.targets)

# dedicated generator: the partition is reproducible without touching
# NumPy's global random state
partition_rng = np.random.default_rng(SEED)

while min_size < MIN_CLIENT_SIZE:
  client_idxs = [[] for _ in range(num_clients)]
  for k in range(K):
    # share of class k that each client receives
    proportions = partition_rng.dirichlet(np.repeat(alpha, num_clients))
    idx_k = np.where(y_train == k)[0]
    partition_rng.shuffle(idx_k)
    # cumulative shares -> split positions inside the shuffled class indices
    cut_points = (np.cumsum(proportions)*len(idx_k)).astype(int)[:-1]
    client_idx_k = np.split(idx_k, cut_points)
    client_idxs = [client_idx + idx.tolist() for client_idx, idx in zip(client_idxs, client_idx_k)]
  # only the size after all classes were distributed decides whether to resample
  min_size = min(len(client_idx) for client_idx in client_idxs)

define client data object

In [4]:
class Client_Data(object):
  """Raw samples and labels owned by one client.

  Args:
    dataset: object exposing ``data`` (indexable by a list of row indices)
      and ``targets``
    id: position of the client in the notebook-level ``client_idxs`` list

  Attributes:
    X: rows of ``dataset.data`` assigned to this client
    y: the matching entries of ``dataset.targets`` as a numpy array
  """
  def __init__(self, dataset, id):
    rows = client_idxs[id]
    labels = np.array(dataset.targets)
    self.X = dataset.data[rows]
    self.y = labels[rows]

  def __len__(self):
    # the number of labels is the number of samples held by the client
    return len(self.y)

  def __getitem__(self, i):
    sample, label = self.X[i], self.y[i]
    return sample, label

define custom Dataset object for CIFAR10

In [5]:
class Client_Dataset(Dataset):
  """torch Dataset over raw image arrays and their labels.

  Args:
    X: indexable collection of image arrays accepted by ``Image.fromarray``
    y: indexable collection of labels, aligned with ``X``
    transform: callable applied to each PIL image; ``None`` disables it.
      Defaults to the notebook-level ``transform``.
  """
  def __init__(self, X, y, transform = transform):
    self.X, self.y, self.transform = X, y, transform

  def __len__(self):
    # required by the Dataset protocol: number of samples
    return len(self.y)

  def __getitem__(self, idx):
    # required by the Dataset protocol: one (image, label) pair
    picture = Image.fromarray(self.X[idx])
    picture = picture if self.transform is None else self.transform(picture)
    label = torch.tensor(self.y[idx])
    return picture, label

## Define client and server

In [6]:
class Client(object):
  """One participant of the federation: local data plus a local model.

  Args:
    id: identifier of the client
    client_data (:obj:'Client_Data'): data stored in client

  Attributes:
    id: identifier of the client
    client_data: data stored in client
    model: local model, available once create_model has been called
  """
  def __init__(self, id, client_data):
    self.client_data = client_data
    self.id = id

  def create_model(self, Learner, initial_params, learning_rate):
    """build the local model

    Args:
      Learner (:obj:'nn.Module'): model class, called as Learner(initial_params, learning_rate)
      initial_params (:obj:'list' of :obj:'np.array): a list contains shape(-1) arrays of parameters of model
      learning_rate: learning rate
    """
    local_model = Learner(initial_params, learning_rate)
    self.model = local_model

  def update_model(self, params):
    """overwrite the local model's parameters

    Args:
      params (:obj:'list' of :obj:'np.array): a list contains shape(-1) arrays of parameters
    """
    self.model.assign_params(params)

  def train(self, epoch, batch_size):
    """run local training, then evaluate the loss on the local data

    Args:
      epoch: epochs for local training
      batch_size: batch size for local training

    Returns:
      num_example: number of data contained in this client
      loss: training loss
    """
    data = self.client_data
    self.model.gd(data, epoch, batch_size)
    loss = self.model.solve_loss(data)
    return len(data), loss

  def sgd(self, batch_size):
    """ask the local model for its loss and gradients on one random mini-batch

    Args:
      batch_size: batch size for mini-batch

    Returns:
      num_example: number of data contained in this client
      loss: training loss of model
      grads: gradients of model on mini-batch data
    """
    data = self.client_data
    loss, grads = self.model.sgd(data, batch_size)
    return len(data), loss, grads

In [7]:
class Server(object):
  """Server for training control

  Args:
    train_data: dataset passed to Client_Data to build every client's local data
    ids (list): list of client id numbers
    Learner: name of model object
    initial_params (:obj:'list' of :obj:'np.array): a list contains shape(-1) arrays of parameters of model
    learning_rate: learning rate for local model

  Attributes:
    ids: list of client id numbers
    learner: model name
    initial_params: parameters every model is created with
    learning_rate: learning rate every model is created with
    clients: list of Client objects, in the same order as ids
    model: server model
    num_clients: number of clients picked by the latest select_client call
  """
  def __init__(self, train_data, ids, Learner, initial_params, learning_rate):
    self.ids = ids
    self.learner = Learner
    # keep the constructor arguments on the instance so that set_clients uses
    # them instead of whatever globals of the same name happen to exist
    self.initial_params = initial_params
    self.learning_rate = learning_rate
    self.clients = self.set_clients(train_data)
    self.model = self.learner(initial_params, learning_rate)

  def set_clients(self, train_data):
    """set clients with training data

    Args:
      train_data: dataset passed to Client_Data for every id in self.ids

    Returns:
      list of Client objects, one per id, each with its own local model
    """
    clients = []
    for id in self.ids:
      client_data = Client_Data(train_data, id)
      c = Client(id, client_data)
      c.create_model(self.learner, self.initial_params, self.learning_rate)
      clients.append(c)
    return clients

  def send_model(self):
    """send newest model to all clients
    """
    params = self.model.print_params()
    for c in self.clients:
      c.update_model(params)

  def select_client(self, select_rate):
    """select clients for each iteration

    Args:
      select_rate: fraction of clients to pick; at least one client is always picked

    Returns:
      list of the selected Client objects, drawn without replacement
    """
    # builtin int: the np.int alias no longer exists in NumPy >= 1.24
    self.num_clients = max(1, int(np.floor(len(self.ids) * select_rate)))
    # self.clients is ordered like self.ids, so sampling positions directly
    # replaces the per-id search for the matching position
    positions = np.random.choice(len(self.ids), self.num_clients, replace=False)
    return [self.clients[pos] for pos in positions]

## Define training model

model

In [8]:
# use the GPU when one is visible to torch, otherwise fall back to the CPU
dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dev

device(type='cuda')

In [9]:
class cifar10_CNN(nn.Module):
  """Small CNN for CIFAR10 that exchanges its weights as flat numpy arrays.

  Parameters are exported (print_params) and imported (assign_params) as a
  list of 1-D arrays ordered like self.parameters(): conv1, conv2, fc1, fc2,
  fc3, each as weight followed by bias.
  """
  def __init__(self, initial_params, learning_rate):
    """ CNN model for CIFAR10 dataset
    the CNN model structure is from https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html

    Args:
      initial_params (:obj:'list' of :obj:'np.array): a list contains shape(-1) arrays of parameters of model. 
        if initial_params is None, it will generate random initial parameters. 
      learning_rate: learning rate

    Attributes:
      loss_func: loss function
      lr: learning rate
    """
    super().__init__()
    self.conv1 = nn.Conv2d(3, 6, 5)
    self.pool = nn.MaxPool2d(2, 2)
    self.conv2 = nn.Conv2d(6, 16, 5)
    self.fc1 = nn.Linear(16 * 5 * 5, 120)
    self.fc2 = nn.Linear(120, 84)
    self.fc3 = nn.Linear(84, 10)
    self.lr = learning_rate
    self.loss_func = F.cross_entropy
    if initial_params is not None:
      self.assign_params(initial_params)

  def forward(self, xb):
    """compute class scores for a batch of images

    Args:
      xb: image batch; the flattening below expects 16 x 5 x 5 features per
        image after the two conv/pool stages

    Returns:
      tensor with 10 scores per image
    """
    xb = self.pool(F.relu(self.conv1(xb)))
    xb = self.pool(F.relu(self.conv2(xb)))
    xb = xb.view(-1, 16 * 5 * 5)
    xb = F.relu(self.fc1(xb))
    xb = F.relu(self.fc2(xb))
    xb = self.fc3(xb)
    return xb

  def gd(self, client_data, epoch, batch_size):
    """gradient descent on client data

    Args:
      client_data (:obj:'Client_Data'): data to train model on
      epoch: epochs for training
      batch_size: batch size for training
    """
    # .to(dev) is a no-op when the model already lives on dev
    self.to(dev)

    train_ds = Client_Dataset(client_data.X, client_data.y)
    # shuffle every epoch: the clients built in this notebook store their
    # samples class by class, so unshuffled mini-batches would contain a
    # single class each
    train_dl = DataLoader(train_ds, batch_size = batch_size, shuffle = True)
    opt = optim.SGD(self.parameters(), lr=self.lr, momentum=0.9)

    for _ in range(epoch):
      for xb, yb in train_dl:
        xb, yb = xb.to(dev), yb.to(dev)
        # clear first so gradients left by an earlier sgd() call cannot leak in
        opt.zero_grad()
        loss = self.loss_func(self(xb), yb)
        loss.backward()
        opt.step()

  def sgd(self, client_data, batch_size):
    """gradients on one random mini-batch (no parameter update is applied)

    Args:
      client_data (:obj:'Client_Data'): data to draw the mini-batch from
      batch_size: size of the mini-batch

    Returns:
      loss_value: loss on the whole client_data, as returned by solve_loss
      grads (:obj:'list' of :obj:'np.array'): stochastic gradients on mini-batch data,
        one flat array per parameter tensor
    """
    self.to(dev)

    train_ds = Client_Dataset(client_data.X, client_data.y)
    # batch_size + shuffle makes the loader fetch samples one index at a time,
    # which is what Client_Dataset.__getitem__ supports; handing a BatchSampler
    # to `sampler=` would pass it a whole list of indices instead
    train_dl = DataLoader(train_ds, batch_size = batch_size, shuffle = True)

    xb, yb = next(iter(train_dl))
    xb, yb = xb.to(dev), yb.to(dev)

    # drop gradients from previous calls; backward() accumulates otherwise
    self.zero_grad()
    loss = self.loss_func(self(xb), yb)
    loss.backward()

    # copy so the returned arrays do not alias the live .grad buffers
    grads = [p.grad.detach().cpu().numpy().reshape(-1).copy() for p in self.parameters()]
    loss_value = self.solve_loss(client_data)

    return loss_value, grads

  def assign_params(self, params):
    """overwrite the model parameters in place

    Values are copied into the existing Parameter objects, so the parameters
    keep their identity and stay on their current device.

    Args:
      params (:obj:'list' of :obj:'np.array'): one array per parameter tensor,
        ordered like self.parameters(); each array is reshaped to the shape of
        its target

    Raises:
      ValueError: if the number of arrays differs from the number of parameter tensors
    """
    targets = list(self.parameters())
    if len(params) != len(targets):
      raise ValueError('expected {:d} parameter arrays, got {:d}'.format(len(targets), len(params)))
    with torch.no_grad():
      for target, values in zip(targets, params):
        target.copy_(torch.tensor(values, dtype=torch.float32).reshape(target.shape))

  def print_params(self):
    """print model parameters

    Returns:
      list of flat numpy arrays, one per parameter tensor; the arrays are
      copies and can be modified without touching the model
    """
    return [p.detach().cpu().numpy().reshape(-1).copy() for p in self.parameters()]

  def solve_loss(self, client_data, batch_size = 256):
    """return the loss value on given data

    Args:
      client_data (:obj:'Client_Data'): data to compute loss value on
      batch_size: evaluation batch size; affects speed only

    Returns:
      sum of the per-sample losses over the given data
    """
    self.to(dev)
    eval_ds = Client_Dataset(client_data.X, client_data.y)
    eval_dl = DataLoader(eval_ds, batch_size = batch_size)

    total_loss = 0
    with torch.no_grad():
      for xb, yb in eval_dl:
        xb, yb = xb.to(dev), yb.to(dev)
        # loss_func averages over the batch; scale back to a per-sample sum
        total_loss += self.loss_func(self(xb), yb).item() * yb.size(0)

    return total_loss

  def predict_accu(self, client_data, batch_size = 256):
    """return the prediction accuracy on given data

    Args:
      client_data (:obj:'Client_Data'): data to compute accuracy on
      batch_size: evaluation batch size; affects speed only

    Returns:
      fraction of samples whose highest-scoring class equals the label
    """
    self.to(dev)
    eval_ds = Client_Dataset(client_data.X, client_data.y)
    eval_dl = DataLoader(eval_ds, batch_size = batch_size)

    correct = 0
    total = 0

    with torch.no_grad():
      for xb, yb in eval_dl:
        xb, yb = xb.to(dev), yb.to(dev)
        # softmax is monotonic, so the argmax of the raw scores is the same
        y_pred = self(xb).argmax(dim = 1)
        total += yb.size(0)
        correct += (y_pred == yb).sum().item()

    return correct / total

## Aggregation

FedAvg

In [10]:
class WAVGM(Server):
  """child class of Server to define certain aggregation method
  WAVGM denotes weighted average model which is the same as FedAvg

  Attributes:
    select_clients: clients picked in the latest call to train
    client_nums: number of samples of each picked client, in the same order
  """
  def __init__(self, train_data, ids, Learner, initial_params, learning_rate):
    super().__init__(train_data, ids, Learner, initial_params, learning_rate)

  def train(self, epoch, batch_size, select_rate=1):
    """apply one iteration of local training

    The server model is sent to all clients, the selected clients train
    locally, and their models are aggregated back into the server model.

    Args:
      epoch: local training epochs
      batch_size: local training batch size
      select_rate: the rate of clients to be trained per iteration

    Returns:
      sum of local losses
    """
    self.send_model()
    self.select_clients = self.select_client(select_rate)
    losses = []
    self.client_nums = []
    for client in self.select_clients:
      client_num, client_loss = client.train(epoch, batch_size)
      losses.append(client_loss)
      self.client_nums.append(client_num)
    self.aggregate()
    return np.sum(losses)

  def aggregate(self):
    """custom aggregation method
    FedAvg: average of the selected clients' parameters, each client weighted
    by its share of the training samples; the result is assigned to the
    server model

    Returns:
      averaged parameters
    """
    total_params = [np.zeros(len(param)) for param in self.model.print_params()]
    total_num = sum(self.client_nums)
    for c, client_num in zip(self.select_clients, self.client_nums):
      weight = client_num / total_num
      # export the client's parameters once, not once per parameter tensor
      client_params = c.model.print_params()
      for i in range(len(total_params)):
        total_params[i] = total_params[i] + weight * client_params[i]
    self.model.assign_params(total_params)
    return total_params

## Save and load pre-trained parameters

In [11]:
# Colab-only: mount Google Drive at /content/gdrive so the parameter files
# saved and loaded below are reachable
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [12]:
!ls "/content/gdrive/MyDrive"

 AFL_EMNIST_model_saving_file0	 CIFAR10_model_saving_file6
 AFL_EMNIST_model_saving_file1	 CIFAR10_model_saving_file7
 AFL_EMNIST_model_saving_file2	 CIFAR10_model_saving_file8
 AFL_EMNIST_model_saving_file3	 CIFAR10_model_saving_file9
 AFL_EMNIST_model_saving_file4	'Colab Notebooks'
 AFL_EMNIST_model_saving_file5	 EMNIST_model_saving_file0
 AFL_EMNIST_model_saving_file6	 EMNIST_model_saving_file1
 CIFAR10_model_saving_file0	 EMNIST_model_saving_file2
 CIFAR10_model_saving_file1	 EMNIST_model_saving_file3
 CIFAR10_model_saving_file2	 EMNIST_model_saving_file4
 CIFAR10_model_saving_file3	 EMNIST_model_saving_file5
 CIFAR10_model_saving_file4	'Getting started.pdf'
 CIFAR10_model_saving_file5


load parameters

In [13]:
# one raw float32 file per parameter tensor, numbered 0-9 (written by the
# saving step of the training loop)
param_paths = ['/content/gdrive/My Drive/CIFAR10_model_saving_file'+str(i) for i in range(10)]
read_params = [np.fromfile(path, dtype = np.float32) for path in param_paths]

## Training

In [14]:
# client ids 0 .. num_clients-1; each id is also the client's row in client_idxs
IDs = np.arange(0, num_clients)

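training with random initial params (alternative to the pre-trained cell below — run only one of the two; whichever runs last sets `initial_params`)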

In [13]:
# hyper-parameters for a run that starts from freshly initialised weights
EPOCH = 10             # local epochs per round
BATCH_SIZE = 10        # local mini-batch size
learning_rate = 0.001
initial_params = None  # None: cifar10_CNN keeps its random initial parameters

training with pre-trained params (requires `read_params` from the "load parameters" cell above)

In [15]:
# hyper-parameters for a run that resumes from the parameters loaded from Drive
EPOCH = 10             # local epochs per round
BATCH_SIZE = 10        # local mini-batch size
learning_rate = 0.001
initial_params = read_params

FedAvg

In [16]:
# FedAvg server: builds one client (data + local model) per id in IDs
CNN_AVGM_fit = WAVGM(
    train_data=trainset,
    ids=IDs,
    Learner=cifar10_CNN,
    initial_params=initial_params,
    learning_rate=learning_rate,
)

In [None]:
ITER = 20
for i in range(ITER):
  round_no = i + 1
  loss = CNN_AVGM_fit.train(EPOCH, BATCH_SIZE)
  print('----------iter: {:d}/{:d}, loss: {:f}----------'.format(round_no, ITER, loss))
  if round_no % 5 != 0:
    continue
  # checkpoint every fifth round: one raw file per parameter tensor
  final_model = CNN_AVGM_fit.model
  for k, param in enumerate(final_model.print_params()):
    param.tofile('/content/gdrive/My Drive/CIFAR10_model_saving_file'+str(k))
  print('iter {:d}: model parameters have been saved'.format(round_no))

In [23]:
# server-side model, i.e. the result of the most recent aggregation
final_model = CNN_AVGM_fit.model

In [18]:
# Client_Data can only be built from a dataset and a client id, so create the
# container for client 0 and then swap in the full test set as its contents
sample_data = Client_Data(trainset, 0)
sample_data.y = testset.targets
sample_data.X = testset.data

In [24]:
# Module.to returns the module itself, so the move and the evaluation chain;
# the cell displays the accuracy on the test set held by sample_data
final_model.to(dev).predict_accu(sample_data)

0.4857