In this notebook we sketch the implementation of [Artemis](https://arxiv.org/pdf/2006.14591.pdf). In Artemis, every device keeps a memory variable $h_{i}$ that tracks its gradient in order to ensure convergence, and exchanges with the server a compressed version of the difference between its gradient and this memory.

In [1]:
BATCH_SIZE = 64  # batch size used by every worker's DataLoader
N_WORKERS = 10  # number of workers: one loader and one learner are created per worker
LR_ = 1e-3  # initial learning rate passed to each learner's optimizer
N_ROUNDS = 100  # number of Artemis rounds run by the training loop

# Compression 


We first need to implement a compression scheme; in this example we use quantization.

In [2]:
import math

import numpy as np
import torch
from torch.distributions.bernoulli import Bernoulli

class QuantizationCompressor(object):
    """
    Stochastic quantization of tensors (QSGD style).

    Taken from https://github.com/epfml/ChocoSGD/

    Every coordinate `|x_i| / ||x||_2` is mapped onto a grid of `s` levels and
    rounded up with probability equal to its fractional part (down otherwise),
    so the expected quantized level equals the exact one.

    Methods
    ------
    get_qsgd: quantize a torch tensor, optionally with the biased rescaling
    qsgd_quantize_numpy: same as `get_qsgd` for numpy arrays
    compress: quantize a torch tensor with `s` levels (`s == 0` disables it)
    compress_orignal: quantize with `2 ** quantize_level - 1` levels
    uncompress: identity, quantized values are already dense tensors
    """

    def get_qsgd(self, x, s, is_biased=False):
        """
        quantize the torch tensor `x` on `s` levels
        :param x: tensor to quantize
        :type x: torch.Tensor (floating point)
        :param s: number of quantization levels
        :type s: int
        :param is_biased: if True, rescale the output by
            1 / (min(d / s^2, sqrt(d) / s) + 1), `d` being the number of entries of `x`
        :type is_biased: bool
        :return: torch.Tensor with the same shape as `x`
        """
        norm = x.norm(p=2)
        if norm == 0:
            # Nothing to quantize; returning early avoids the 0 / 0 below, which
            # would fill the result with NaNs.
            return torch.zeros_like(x)

        level_float = s * x.abs() / norm
        previous_level = torch.floor(level_float)
        # Round up with probability equal to the fractional part of the level.
        is_next_level = (torch.rand_like(x) < (level_float - previous_level)).float()
        new_level = previous_level + is_next_level

        scale = 1
        if is_biased:
            d = x.nelement()
            scale = 1.0 / (min(d / (s ** 2), math.sqrt(d) / s) + 1.0)
        return scale * torch.sign(x) * norm * new_level / s

    def qsgd_quantize_numpy(self, x, s, is_biased=False):
        """
        quantize the numpy array `x` on `s` levels, coefficient-wise on the absolute value
        :param x: array to quantize
        :type x: numpy.ndarray
        :param s: number of quantization levels
        :type s: int
        :param is_biased: if True, rescale the output by
            1 / (min(d / s^2, sqrt(d) / s) + 1), `d` being the number of entries of `x`
        :type is_biased: bool
        :return: numpy.ndarray with the same shape as `x`
        """
        norm = np.sqrt(np.sum(np.square(x)))
        if norm == 0:
            # Same guard as in `get_qsgd`: avoid dividing by a zero norm.
            return np.zeros_like(x)

        level_float = s * np.abs(x) / norm
        previous_level = np.floor(level_float)
        is_next_level = np.random.rand(*x.shape) < (level_float - previous_level)
        new_level = previous_level + is_next_level

        scale = 1
        if is_biased:
            # Total number of entries (not the length of the first axis), to
            # agree with `x.nelement()` in the torch implementation.
            d = x.size
            scale = 1.0 / (np.minimum(d / s ** 2, np.sqrt(d) / s) + 1.0)
        return scale * np.sign(x) * norm * new_level / s

    def compress(self, x: torch.FloatTensor, s: int) -> torch.FloatTensor:
        """
        quantize `x` on `s` levels
        :param x: tensor to quantize
        :param s: number of quantization levels; 0 means "no compression"
        :return: the quantized tensor; `x` itself when `s == 0` or when `x` has zero norm
        """
        if s == 0:
            return x
        norm_x = torch.norm(x, p=2)
        if norm_x == 0:
            return x

        ratio = torch.abs(x) / norm_x
        lower_level = torch.floor(ratio * s)
        # Probability of rounding up = fractional part of the exact level.
        prob_round_up = ratio * s - lower_level
        sampled = Bernoulli(prob_round_up).sample()
        return torch.sign(x) * norm_x * (lower_level + sampled) / s

    def compress_orignal(self, arr, quantize_level=8, is_biased=False):
        """
        quantize `arr` with `2 ** quantize_level - 1` levels using `get_qsgd`
        :param arr: tensor to quantize
        :param quantize_level: number of bits; 32 returns `arr` unchanged
        :param is_biased: forwarded to `get_qsgd`
        :return: torch.Tensor
        """
        if quantize_level == 32:
            return arr
        s = 2 ** quantize_level - 1
        return self.get_qsgd(arr, s, is_biased)

    def uncompress(self, arr):
        """return `arr` unchanged"""
        return arr


# Module-level compressor instance; `Artemis.step` calls `compressor.compress` on it.
compressor = QuantizationCompressor()


# Dataset

We simply use iid MNIST here; it can be replaced with any dataset later. Note that you only need to specify a list of `torch.utils.data.DataLoader` objects, one per client, each of them loading the dataset of that client.


In [3]:
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import MNIST
import torchvision.transforms as transforms

# Convert images to tensors, then subtract 0.5 and divide by 1.0.
trans = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (1.0,)),
])

# Keep only the samples with indices 0..299 so that a round runs quickly.
dataset = Subset(MNIST(root="./", download=True, transform=trans), range(300))

# One shuffled loader per worker; every loader reads the same `dataset`.
loaders = [
    DataLoader(dataset, shuffle=True, batch_size=BATCH_SIZE)
    for _ in range(N_WORKERS)
]


# Learner

We also implement a `Learner` class that will be used to train and evaluate a deep-learning model.

In [4]:
import torch


class Learner:
    """
    Responsible of training and evaluating a (deep-)learning model

    Attributes
    ----------
    model (nn.Module): the model trained by the learner
    criterion (torch.nn.modules.loss): loss function used to train the `model`
    metric (fn): function to compute the metric, should accept as input two vectors and return a scalar
    device (str or torch.device): device on which `model`, `criterion` and the batches are placed
    optimizer (torch.optim.Optimizer): optimizer stepped by `optimizer_step`;
        expected to hold the parameters of `model`

    Methods
    ------
    compute_batch_gradients: forward-backward pass over one batch, without optimizer step
    optimizer_step: perform one optimizer step with the gradients currently stored
    fit_batch: perform an optimizer step over one batch
    fit_batches: perform successive optimizer steps over successive batches
    evaluate_iterator: evaluate `model` on an iterator
    get_param_tensor: get `model` parameters as a unique flattened tensor
    set_param_tensor: set `model` parameters from a flattened tensor
    get_grad_tensor: get `model` gradients as a unique flattened tensor
    set_grad_tensor: set `model` gradients from a flattened tensor
    """

    # TODO: decorate getters and setters
    def __init__(self, model, criterion, metric, device, optimizer):
        self.model = model.to(device)
        self.criterion = criterion.to(device)
        self.metric = metric
        self.device = device
        self.optimizer = optimizer

    def compute_batch_gradients(self, iterator):
        """
        perform one forward-backward propagation on one batch drawn from `iterator`
        :param iterator:
        :type iterator: torch.utils.data.DataLoader
        return:
            loss.item()
            metric.item()
        """
        self.model.train()

        # A fresh iterator is created at every call, so only its first batch is used.
        x, y = next(iter(iterator))
        x = x.to(self.device).type(torch.float32)
        y = y.to(self.device).type(torch.int64)

        self.optimizer.zero_grad()

        y_pred = self.model(x).squeeze()
        loss = self.criterion(y_pred, y)
        metric = self.metric(y_pred, y)

        loss.backward()

        return loss.item(), metric.item()

    def optimizer_step(self):
        """
        performs one optimizer step after gradients are computed
        return:
            None
        """
        # TODO: add a flag + assertion to determine if gredients are computed or not
        self.optimizer.step()

    def fit_batch(self, iterator):
        """
        perform an optimizer step over one batch drawn from `iterator`
        :param iterator:
        :type iterator: torch.utils.data.DataLoader
        return:
            loss.item()
            metric.item()
        """
        loss, metric = self.compute_batch_gradients(iterator)

        self.optimizer_step()

        return loss, metric

    def evaluate_iterator(self, iterator):
        """
        evaluate learner on `iterator`
        :param iterator:
        :type iterator: torch.utils.data.DataLoader
        :return
            global_loss and  global_metric accumulated over the iterator,
            i.e. the sums of the per-batch values (they are not averaged)
        """
        self.model.eval()

        global_loss = 0
        global_metric = 0

        for x, y in iterator:
            x = x.to(self.device).type(torch.float32)
            y = y.to(self.device).type(torch.int64)

            with torch.no_grad():
                y_pred = self.model(x).squeeze()
                global_loss += self.criterion(y_pred, y).item()
                global_metric += self.metric(y_pred, y).item()

        return global_loss, global_metric

    def fit_batches(self, iterator, n_steps):
        """
        perform successive optimizer steps over successive batches drawn from iterator
        :param iterator:
        :type iterator: torch.utils.data.DataLoader
        :param n_steps: number of successive batches
        :type n_steps: int
        :return:
            average loss and metric over the `n_steps`
        """
        global_loss = 0
        global_acc = 0

        for _ in range(n_steps):
            batch_loss, batch_acc = self.fit_batch(iterator)
            global_loss += batch_loss
            global_acc += batch_acc

        return global_loss / n_steps, global_acc / n_steps

    def _flatten_checked(self, x, what):
        """
        flatten `x` and check that it holds exactly one entry per model parameter entry
        :raises ValueError: if the number of entries of `x` does not match the model size
        """
        flat = x.detach().reshape(-1)
        expected = sum(param.numel() for param in self.model.parameters())
        if flat.shape[0] != expected:
            raise ValueError(
                "Expected a {} tensor with {} entries, got {}".format(what, expected, flat.shape[0])
            )
        return flat

    def get_param_tensor(self):
        """
        get `model` parameters as a unique flattened tensor
        :return: torch.tensor
        """
        return torch.cat([param.data.reshape(-1) for param in self.model.parameters()])

    def set_param_tensor(self, x):
        """
        set `model` parameters from a flattened tensor, in the order of `model.parameters()`

        The values are copied, so the parameters never share memory with `x`
        (nor with another learner that was given the same `x`).
        :param x: tensor with as many entries as the model has parameters
        :type x: torch.tensor
        :raises ValueError: if `x` does not have the right number of entries
        :return:
            None
        """
        flat = self._flatten_checked(x, "parameter")

        idx = 0
        with torch.no_grad():
            for param in self.model.parameters():
                n_entries = param.numel()
                param.copy_(flat[idx:idx + n_entries].reshape(param.shape))
                idx += n_entries

    def get_grad_tensor(self):
        """
        get `model` gradients as a unique flattened tensor

        Parameters whose gradient has not been computed yet (`grad is None`)
        contribute zeros, so the layout always matches `get_param_tensor`.
        :return: torch.tensor
        """
        grad_list = []

        for param in self.model.parameters():
            if param.grad is None:
                grad_list.append(torch.zeros_like(param.data).reshape(-1))
            else:
                grad_list.append(param.grad.data.reshape(-1))

        return torch.cat(grad_list)

    def set_grad_tensor(self, x):
        """
        set the gradients from a flattened tensor, in the order of `model.parameters()`

        The values are copied, so the gradients never share memory with `x`;
        gradients that do not exist yet are created.
        :param x: tensor with as many entries as the model has parameters
        :type x: torch.tensor
        :raises ValueError: if `x` does not have the right number of entries
        :return:
            None
        """
        flat = self._flatten_checked(x, "gradient")

        idx = 0
        with torch.no_grad():
            for param in self.model.parameters():
                n_entries = param.numel()
                if param.grad is None:
                    param.grad = torch.zeros_like(param.data)
                param.grad.copy_(flat[idx:idx + n_entries].reshape(param.shape))
                idx += n_entries


# Models

We use a two-layer neural network; it can be replaced with whatever is needed.

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim


class TwoLayersModel(nn.Module):
    """Fully connected network: 784 inputs -> 128 hidden units (ReLU) -> 10 outputs."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(in_features=784, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        """
        compute the 10 raw output scores of every sample in `x`
        :param x: input whose entries can be viewed as rows of 784 values
        :return: torch.tensor with one row of 10 scores per sample
        """
        flat = x.view(-1, 784)
        hidden = F.relu(self.fc1(flat))
        scores = self.fc2(hidden)
        return scores


def accuracy(y_pred, y):
    """
    fraction of samples whose highest score (along dimension 1 of `y_pred`) is at index `y`
    :param y_pred: scores, one row per sample
    :param y: target index of every sample
    :return: torch.tensor holding a single float
    """
    predicted_labels = torch.max(y_pred, dim=1).indices
    hits = torch.eq(predicted_labels, y).float()
    return hits.sum() / hits.shape[0]


def get_optimizer(optimizer_name, model, lr_initial):
    """
    Build a torch.optim.Optimizer over the trainable parameters of `model`

    :param optimizer_name: name of the optimizer; possible are `adam` and `sgd`
    :type optimizer_name: str
    :param model: model to be optimized; parameters with `requires_grad=False` are left out
    :type model: nn.Module
    :param lr_initial: initial learning rate used to build the optimizer
    :type lr_initial: float
    :return: torch.optim.Optimizer
    :raises NotImplementedError: for any other optimizer name
    """
    if optimizer_name == "adam":
        optimizer_class = optim.Adam
    elif optimizer_name == "sgd":
        optimizer_class = optim.SGD
    else:
        raise NotImplementedError("Other optimizer are not implemented")

    trainable_params = [param for param in model.parameters() if param.requires_grad]
    return optimizer_class(trainable_params, lr=lr_initial)

def get_learner(device, optimizer_name, initial_lr, seed=1234):
    """
    Build a `Learner` wrapping a freshly initialised `TwoLayersModel`

    The torch random generator is seeded before the model is created, so two
    calls with the same `seed` start from the same initial weights.

    :param device: used device; possible `cpu` and `cuda`
    :param optimizer_name: passed as argument to get_optimizer
    :param initial_lr: initial value of the learning rate
    :param seed: seed given to `torch.manual_seed`
    :return: Learner trained with cross-entropy loss and evaluated with `accuracy`
    """
    torch.manual_seed(seed)

    model = TwoLayersModel()
    optimizer = get_optimizer(
        optimizer_name=optimizer_name,
        model=model,
        lr_initial=initial_lr,
    )

    return Learner(
        model=model,
        criterion=nn.CrossEntropyLoss(),
        metric=accuracy,
        device=device,
        optimizer=optimizer,
    )





## Artemis


This is the core class of this project.

In [8]:
class Artemis(object):
    """
    Artemis class responsible of running federated learning with double gradient compression scheme

    Attributes
    ----------
    learners (List of Learner): Each entry is responsible of training and evaluating a (deep-)learning model
    loaders (List of torch.utils.data.DataLoader): loader for each client dataset
    alpha (float): parameter to control memory depth
    variant (int): determine which variant of artemis to use:
        0 (no compression), 1 (uni-compression) or 2 (bi-compression)
    device (str): device holding the memory terms, `cpu` or `cuda`
    model_size (int): number of entries of the flattened parameters of the first learner
    n_workers (int): number of learners
    memory_terms (torch.tensor): one memory vector per worker, shape (n_workers, model_size)
    global_memory_term (torch.tensor): memory vector kept on the server side, shape (model_size,)

    Methods
    ------
    step: run one round (local gradients, compression, memory update, optimizer step)
    print_logs: print the train loss and metric
    """

    def __init__(self, learners, loaders, device, alpha=0.1, variant=1):

        assert len(learners) > 0, "Make sure learners is not empty"
        assert len(learners) == len(loaders), 'Make sure you have the same number of learners and loaders'
        assert variant in [0, 1, 2], "Variant must be 0 (no compression), 1 (uni-compression) or 2 (bi-compression)."
        assert device in ["cpu", "cuda"], "Device must be either 'cpu'; either 'cuda'."

        self.learners = learners
        self.loaders = loaders
        self.alpha = alpha
        self.variant = variant
        self.device = device

        self.model_size = learners[0].get_param_tensor().shape[0]
        self.n_workers = len(learners)

        self.memory_terms = torch.zeros(self.n_workers, self.model_size).to(self.device)
        self.global_memory_term = torch.zeros(self.model_size).to(self.device)

    def step(self):
        """
        run one round of Artemis

        Every worker computes a gradient on one batch and sends the (possibly
        compressed) difference with its memory term. The server averages these
        differences, adds its own memory term to rebuild a gradient estimate,
        compresses it when `variant == 2`, and every learner performs an
        optimizer step with that estimate.
        return:
            None
        """
        # TODO: should be initialized with proper shape in future for efficiency
        average_compressed_delta = torch.zeros(self.model_size).to(self.device)  # tracks the average compressed delta

        # Compute and compress local gradients
        for worker_id, (learner, loader) in enumerate(zip(self.learners, self.loaders)):
            learner.compute_batch_gradients(loader)

            delta = learner.get_grad_tensor() - self.memory_terms[worker_id, :]

            if self.variant == 0:
                compressed_delta = delta
            else:
                compressed_delta = compressor.compress(delta, s=1)

            average_compressed_delta += compressed_delta

            # Update memory term
            self.memory_terms[worker_id, :] += self.alpha * compressed_delta

        # Update global memory term and global gradient
        average_compressed_delta = (1 / self.n_workers) * average_compressed_delta
        average_gradient = self.global_memory_term + average_compressed_delta
        self.global_memory_term += self.alpha * average_compressed_delta

        # Only the bi-compression variant compresses what the server sends back.
        if self.variant in [0, 1]:
            omega = average_gradient
        else:
            omega = compressor.compress(average_gradient, s=1)

        # Gradient update: every learner steps with the same estimate
        for learner in self.learners:
            learner.set_grad_tensor(omega)
            learner.optimizer_step()

    def print_logs(self):
        """
        print the train loss and train metric of the first learner's model

        Only `learners[0]` is evaluated, on the loader of every worker. The
        printed values are the per-loader sums returned by
        `evaluate_iterator`, averaged over the workers.
        """
        global_train_loss = 0
        global_train_metric = 0

        for iterator in self.loaders:
            train_loss, train_metric = self.learners[0].evaluate_iterator(iterator)

            global_train_loss += train_loss
            global_train_metric += train_metric

        global_train_metric /= self.n_workers
        global_train_loss /= self.n_workers

        print("Train/Loss", global_train_loss)
        print("Train/Metric", global_train_metric)





In [9]:
%%time
# Use the GPU when one is available and fall back to the CPU otherwise, so that
# the notebook also runs on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

# One learner per worker; the bi-compression variant of Artemis is used.
learners = [get_learner(device=device, optimizer_name="sgd", initial_lr=LR_)
             for _ in range(N_WORKERS)]
artemis = Artemis(learners, loaders, device, variant=2)
for round_idx in range(N_ROUNDS):
    print('Round:{}...'.format(round_idx))
    # Logs are printed before the step, so round 0 shows the untrained model.
    artemis.print_logs()
    artemis.step()

Round:0...
Train/Loss 11.554882526397705
Train/Metric 0.4992897756397724
Round:1...
Train/Loss 11.554593157768249
Train/Metric 0.49857954755425454
Round:2...
Train/Loss 11.552077698707581
Train/Metric 0.4936079569160938
Round:3...
Train/Loss 11.548051834106445
Train/Metric 0.50142045840621
Round:4...
Train/Loss 11.546336460113526
Train/Metric 0.5021306842565536
Round:5...
Train/Loss 11.5432950258255
Train/Metric 0.5028409108519554
Round:6...
Train/Loss 11.540909242630004
Train/Metric 0.5205965928733349
Round:7...
Train/Loss 11.539944767951965
Train/Metric 0.5177556850016117
Round:8...
Train/Loss 11.542425155639648
Train/Metric 0.527698865532875
Round:9...
Train/Loss 11.538238573074342
Train/Metric 0.5348011411726474
Round:10...
Train/Loss 11.538372731208801
Train/Metric 0.5497159108519554


KeyboardInterrupt: ignored

# TODO tasks

* Complete the #TODO tasks left inside the code
* Use only one learner instead of a list of learners for memory efficiency
* Check if the compression class is working (I copied it directly from choco-SGD repo without any tests)
* Test the evolution of the parameters on a "toy example", for example linear regression.