# Imports


In [None]:
import os
import numpy as np
import pandas as pd
import time
import argparse
import sys

import math
from math import ceil
from random import Random

import torch
import torch.distributed as dist
import torch.utils.data.distributed
import torch.nn as nn
import torch.nn.functional as F
from torch.multiprocessing import Process
import torchvision
from torchvision import datasets, transforms
import torch.backends.cudnn as cudnn
import torchvision.models as models

from distoptim import FedProx, FedNova
import util_v4 as util
import logging
import time

# import matplotlib.pyplot as plt\
import plotly.express as px

from collections import namedtuple
from typing import List, Tuple



: 

In [None]:

# Command-line style configuration for the experiment.  The notebook never
# reads sys.argv: the parser is evaluated against an empty argument list
# below, so every option takes its default unless the defaults are edited here.
parser = argparse.ArgumentParser(description='CIFAR-10 baseline')
parser.add_argument('--name', '-n',
                    default="default",
                    type=str,
                    help='experiment name, used for saving results')
parser.add_argument('--backend',
                    default="nccl",
                    type=str,
                    help='backend name')
parser.add_argument('--model',
                    default="VGG",  # default: res
                    type=str,
                    help='neural network model')
parser.add_argument('--alpha',
                    default=0.2,
                    type=float,
                    help='control the non-iidness of dataset')
parser.add_argument('--gmf',
                    default=0,
                    type=float,
                    help='global (server) momentum factor')
parser.add_argument('--lr',
                    default=0.1,
                    type=float,
                    help='client learning rate')
parser.add_argument('--momentum',
                    default=0.0,
                    type=float,
                    help='local (client) momentum factor')
parser.add_argument('--bs',
                    default=32,
                    type=int,
                    help='batch size on each worker/client')
parser.add_argument('--rounds',
                    default=4,
                    type=int,
                    help='total communication rounds')
parser.add_argument('--localE',
                    default=1,
                    type=int,
                    help='number of local epochs')
parser.add_argument('--print_freq',
                    default=100,
                    type=int,
                    help='print info frequency')
parser.add_argument('--size',
                    default=1,
                    type=int,
                    help='number of local workers')
parser.add_argument('--rank',
                    default=0,
                    type=int,
                    help='the rank of worker')
parser.add_argument('--seed',
                    default=1,
                    type=int,
                    help='random seed')
# --save defaults to True, so on its own the store_true flag could never turn
# saving off; --no-save writes False into the same destination.
parser.add_argument('--save', '-s',
                    default=True,
                    action='store_true',
                    help='whether save the training results')
parser.add_argument('--no-save',
                    dest='save',
                    action='store_false',
                    help='do not save the training results')
parser.add_argument('--p', '-p',
                    action='store_true',
                    help='whether the dataset is partitioned or not')
parser.add_argument('--NIID',
                    action='store_true',
                    help='whether the dataset is non-iid or not')
parser.add_argument('--pattern',
                    default='constant',
                    type=str,
                    help='pattern of local steps')
parser.add_argument('--optimizer',
                    default='fednova',  # default is 'local'
                    type=str,
                    help='optimizer name')
# parser.add_argument('--initmethod',
#                     default='tcp://h0:22000',
#                     type=str,
#                     help='init method')
parser.add_argument('--mu',
                    default=0,
                    type=float,
                    help='mu parameter in fedprox')
parser.add_argument('--savepath',
                    default='./results/',
                    type=str,
                    help='directory to save exp results')
parser.add_argument('--datapath',
                    default='./data/',
                    type=str,
                    help='directory to load data')


logging.basicConfig(format='%(levelname)s - %(message)s', level=logging.INFO)
# NOTE: with the level set to INFO above, this DEBUG record is filtered out.
logging.debug('This message should appear on the console')

# Parse an explicit empty argument list so the notebook kernel's own argv is
# never interpreted as experiment options.
args = parser.parse_args([])

# Snapshot of every option's default value, keyed by destination name.
arg_defaults = {key: parser.get_default(key) for key in vars(args)}
arg_defaults


: 

: 

: 

In [None]:

# Per-client training summary built by client.run() from the two values that
# client.train() returns.  NOTE: `sample_size` is filled with len(loader)
# (the number of batches in the train loader), and `loss_value` with the
# running-average training loss of the last local epoch.
LossResult = namedtuple("LossResult", ["sample_size", "loss_value"])

class client:
    """One simulated federated-learning participant.

    Each instance owns its model, loss criterion, optimizer and data
    loaders.  `run` performs one communication round: local training for a
    number of epochs followed by an evaluation on the test loader.  All
    hyper-parameters are read from the module-level `args` namespace.
    """

    # Maps the --optimizer choice to the optimizer class to instantiate.
    algorithms = {
        'fedavg': FedProx,  # mu = 0
        'fedprox': FedProx,
        # 'scaffold': Scaffold,
        'fednova': FedNova,
        # 'fednova_vr':FedNovaVR,
    }

    def __init__(self):
        self.optimizer = None
        self.model = None
        self.criterion = None
        # Not populated by any method of this class.
        self.comm_rounds_x = []
        self.acc_y = []
        # Highest test accuracy seen across all calls to run().
        self.best_test_accuracy = 0

    def run(self, init_run: bool, rank: int, size: int, round: int, opt: int):
        """Execute one communication round for this client.

        Args:
            init_run: when True, (re)build the data loaders, model, criterion
                and optimizer before training.
            rank: index of this client, used for data partitioning and logs.
            size: number of clients passed to the data partitioner.
            round: index of the current communication round.
            opt: unused; kept so existing callers keep working.

        Returns:
            Tuple of (optimizer, LossResult) where the LossResult carries the
            values returned by the last local epoch of `train`.

        Raises:
            RuntimeError: if called with ``init_run=False`` before any
                initialising call.
            ValueError: if the number of local epochs is smaller than one.
        """
        # The log file below is always written, so its directory must always
        # exist; exist_ok avoids the check-then-create race.
        folder_name = os.path.join(args.savepath, args.name)
        os.makedirs(folder_name, exist_ok=True)

        # initiate log files
        tag = '{}/lr{:.3f}_bs{:d}_cp{:d}_a{:.2f}_e{}_r{}_n{}.csv'
        args.out_fname = tag.format(folder_name, args.lr, args.bs, args.localE,
                                    args.alpha, args.seed, round, size)
        with open(args.out_fname, 'w') as f:
            print(
                'TRAINING\n'
                'World-Size,{ws}\n'
                'Batch-Size,{bs}\n'
                'Round {rnd}\n'
                'Epoch,itr,'
                'Loss,avg:Loss, Prec@1, avg:Prec@1, val, time'.format(
                    ws=args.size,
                    bs=args.bs, rnd=round),
                file=f)

        # seed for reproducibility
        torch.manual_seed(args.seed)

        if init_run:
            # load datasets
            self.train_loader, self.test_loader, self.DataRatios = \
                util.partition_dataset(rank, size, args)
            logging.debug("Worker rank {} local sample ratio {} "
                          "local epoch length {}"
                          .format(rank, self.DataRatios[rank],
                                  len(self.train_loader)))
            # define neural nets model, criterion, and optimizer
            self.model = util.select_model(10, args).cpu()
            self.criterion = nn.CrossEntropyLoss().cpu()

            # select optimizer according to algorithm
            selected_opt = self.algorithms[args.optimizer]
            self.optimizer = selected_opt(self.model.parameters(),
                                          lr=args.lr,
                                          gmf=args.gmf,
                                          mu=args.mu,
                                          ratio=self.DataRatios[rank],
                                          momentum=args.momentum,
                                          nesterov=False,
                                          weight_decay=1e-4)

        if self.optimizer is None:
            raise RuntimeError(
                "client.run() called with init_run=False before the client "
                "was initialised")

        # Decide number of local updates per client
        local_epochs = self.update_local_epochs(args.pattern, rank, round)
        print('local_epochs: ' + str(local_epochs))
        if local_epochs < 1:
            # Without at least one epoch there is no loss to report.
            raise ValueError(
                "local_epochs must be >= 1, got {}".format(local_epochs))

        tau_i = local_epochs * len(self.train_loader)
        logging.info("local epochs {} iterations {}"
                     .format(local_epochs, tau_i))

        # Decay learning rate according to round index
        self.update_learning_rate(self.optimizer, round, args.lr)

        # Clients locally train for several local epochs; only the values of
        # the last epoch are reported back.
        for t in range(local_epochs):
            sample_size, loss_value = self.train(
                self.model, self.criterion, self.optimizer,
                self.train_loader, t, rank)
        loss_result = LossResult(sample_size, loss_value)

        # evaluate test accuracy
        test_acc = self.evaluate(self.model, self.test_loader)
        self.best_test_accuracy = max(self.best_test_accuracy, test_acc)

        #### record metrics ####
        logging.info("Round {} test accuracy {:.3f}".format(round, test_acc))
        with open(args.out_fname, 'a') as f:
            print(' ayo metrics: {ep},{itr},{filler},{filler},'
                  '{filler},{filler},'
                  '{val:.4f}'
                  .format(ep=round, itr=-1,
                          filler=-1, val=test_acc), file=f)

        logging.info("Worker {} best test accuracy {:.3f}"
                     .format(rank, self.best_test_accuracy))

        return self.optimizer, loss_result

    def evaluate(self, model, test_loader):
        """Return the running average of `util.comp_accuracy` over `test_loader`."""
        model.eval()
        top1 = util.Meter(ptag='Loss')
        with torch.no_grad():
            for data, target in test_loader:
                data = data.cpu()
                target = target.cpu()
                outputs = model(data)
                accuracy = util.comp_accuracy(outputs, target)
                top1.update(accuracy[0].item(), data.size(0))

        return top1.avg

    def train(self, model, criterion, optimizer, loader, epoch, rank):
        """Train `model` for one pass over `loader`.

        Every parameter whose name does not contain "classifier.6" is frozen
        before training.  One CSV row per batch is appended to
        `args.out_fname`; a debug log line is additionally emitted every
        `args.print_freq` batches when `args.save` is set.

        Returns:
            Tuple of (len(loader), running-average loss of this epoch).
        """
        for name, param in model.named_parameters():
            if "classifier.6" not in name:
                param.requires_grad = False

        model.train()

        losses = util.Meter(ptag='Loss')
        top1 = util.Meter(ptag='Prec@1')

        # Open the log once per epoch instead of once (or twice) per batch,
        # and write each batch row exactly once.
        with open(args.out_fname, 'a') as log_file:
            for batch_idx, (data, target) in enumerate(loader):
                # data loading
                data = data.cpu()
                target = target.cpu()

                # forward pass
                output = model(data)
                loss = criterion(output, target)

                # backward pass
                loss.backward()

                # gradient step
                optimizer.step()
                optimizer.zero_grad()

                # update running statistics
                train_acc = util.comp_accuracy(output, target)
                losses.update(loss.item(), data.size(0))
                top1.update(train_acc[0].item(), data.size(0))

                if batch_idx % args.print_freq == 0 and args.save:
                    logging.debug('epoch {} itr {}, '
                                  'rank {}, loss value {:.4f}, train accuracy {:.3f}'
                                  .format(epoch, batch_idx, rank,
                                          losses.avg, top1.avg))

                print('{ep},{itr},'
                      '{loss.val:.4f},{loss.avg:.4f},'
                      '{top1.val:.3f},{top1.avg:.3f},-1'
                      .format(ep=epoch, itr=batch_idx,
                              loss=losses, top1=top1), file=log_file)

        print('TEST LOSSES')
        print(losses.avg)

        return len(loader), losses.avg

    def update_local_epochs(self, pattern, rank: int, rnd: int):
        """Return the number of local epochs for this client and round.

        "constant" returns `args.localE`.  "uniform_random" reseeds NumPy's
        global generator from rank, round and `args.seed` and draws an
        integer in [2, args.localE).

        Raises:
            ValueError: if `pattern` is not one of the supported names
                (previously this silently returned None).
        """
        if pattern == "constant":
            return args.localE  # basic case is what you input

        if pattern == "uniform_random":
            np.random.seed(2020 + rank + rnd + args.seed)
            return np.random.randint(low=2, high=args.localE, size=1)[0]

        raise ValueError(
            "unknown local-step pattern {!r}; expected 'constant' or "
            "'uniform_random'".format(pattern))

    def update_learning_rate(self, optimizer, epoch, target_lr):
        """Apply step-wise learning-rate decay.

        When `epoch` equals int(args.rounds / 2) the learning rate of every
        parameter group is set to target_lr / 10; when it equals
        int(args.rounds * 0.75) it is set to target_lr / 100.  If both
        milestones coincide, both updates run and the latter wins.

        ** note: target_lr is the reference learning rate from which to scale down
        """
        schedule = (
            (int(args.rounds / 2), 10),
            (int(args.rounds * 0.75), 100),
        )
        for milestone, divisor in schedule:
            if epoch == milestone:
                lr = target_lr / divisor
                logging.info('Updating learning rate to {}'.format(lr))
                for param_group in optimizer.param_groups:
                    param_group['lr'] = lr
    
   

: 

: 

: 

In [None]:

#server level

def average_opt(opts_list, weight=0, tau_eff=0):
    """Apply the normalised server-side update to every client optimizer.

    Args:
        opts_list: client optimizers.  Each must expose `ratio`, `mu`, `gmf`,
            `local_steps`, `local_normalizing_vec`, `local_counter`,
            `param_groups` and `state`.
        weight: scaling weight applied to each client's cumulative gradient.
            0 (default) means "use each optimizer's own `ratio`".
        tau_eff: global effective number of local steps.  0 (default) means
            "compute it as the sum over all optimizers" of
            ``local_steps * ratio`` (when ``mu != 0``) or
            ``local_normalizing_vec * ratio`` (otherwise).

    Returns:
        A list with one entry per optimizer; each entry is the list of that
        optimizer's `cum_grad` tensors.

    NOTE(review): the returned tensors are the optimizers' own `cum_grad`
    buffers, which have been zeroed in place by the time this function
    returns, and the scaled gradients are never summed across clients --
    each optimizer is updated from its own cumulative gradient only.
    Confirm whether cross-client aggregation is meant to happen here.
    """
    # Decide once, before the loop: testing `tau_eff == 0` per iteration made
    # every optimizer after the first skip its contribution.
    if tau_eff == 0:
        for opt in opts_list:
            if opt.mu != 0:
                contribution = opt.local_steps * opt.ratio
            else:
                contribution = opt.local_normalizing_vec * opt.ratio
            tau_eff += torch.tensor(contribution).item()

    full_param_list = []  # cum_grad tensors of every optimizer, in order

    for opt in opts_list:
        # Per-client weight unless the caller fixed one for everybody.
        client_weight = opt.ratio if weight == 0 else weight

        param_list = []
        for group in opt.param_groups:
            for p in group['params']:
                param_state = opt.state[p]
                # Parameters that never received a gradient have no cum_grad.
                if 'cum_grad' in param_state:
                    scale = tau_eff / opt.local_normalizing_vec
                    param_state['cum_grad'].mul_(client_weight * scale)
                    param_list.append(param_state['cum_grad'])

        for group in opt.param_groups:
            lr = group['lr']
            for p in group['params']:
                param_state = opt.state[p]

                if 'cum_grad' in param_state:
                    cum_grad = param_state['cum_grad']
                    has_init = 'old_init' in param_state

                    if opt.gmf != 0:
                        # optional global (server) momentum
                        if 'global_momentum_buffer' not in param_state:
                            buf = param_state['global_momentum_buffer'] = \
                                torch.clone(cum_grad).detach()
                            buf.div_(lr)
                        else:
                            buf = param_state['global_momentum_buffer']
                            buf.mul_(opt.gmf).add_(cum_grad, alpha=1 / lr)
                        if has_init:
                            param_state['old_init'].sub_(buf, alpha=lr)
                    elif has_init:
                        param_state['old_init'].sub_(cum_grad)

                    # Both entries are required: the parameter is reset to the
                    # updated starting point and the accumulator is cleared.
                    if has_init:
                        p.data.copy_(param_state['old_init'])
                        cum_grad.zero_()

                # Reinitialize momentum buffer
                if 'momentum_buffer' in param_state:
                    param_state['momentum_buffer'].zero_()

        opt.local_counter = 0
        opt.local_normalizing_vec = 0
        opt.local_steps = 0

        full_param_list.append(param_list)

    return full_param_list

def load_state(opt, updated_params):
    """Overwrite the `cum_grad` entries of `opt` with `updated_params`.

    Parameters are visited group by group in `opt.param_groups` order; the
    k-th parameter whose state already holds a 'cum_grad' entry receives
    `updated_params[k]`.  Parameters without that entry are skipped and do
    not consume an element.
    """
    all_states = (
        opt.state[param]
        for group in opt.param_groups
        for param in group['params']
    )
    tracked_states = (state for state in all_states if 'cum_grad' in state)
    for position, state in enumerate(tracked_states):
        state['cum_grad'] = updated_params[position]
    
def weighted_loss_avg(results: List[LossResult]) -> float:
    """Return the mean of the clients' loss values, weighted by `sample_size`."""
    total_weight = sum(entry.sample_size for entry in results)
    weighted_sum = sum(entry.sample_size * entry.loss_value for entry in results)
    return weighted_sum / total_weight

def plot_metrics(rounds, losses):
    """Display a plotly line chart of `losses` against round numbers 1..rounds."""
    frame = pd.DataFrame({
        'round': np.array(list(range(1, rounds + 1))),
        'loss': np.array(losses),
    })
    chart = px.line(frame, x="round", y="loss", title='Results')
    chart.show()
    

: 

: 

: 

In [None]:

# Driver: simulate two clients in a single process, alternating local
# training with the server-side optimizer update, then plot the weighted
# training loss of every round.
client1 = client()
client2 = client()

# NOTE(review): hard-coded, whereas client.update_learning_rate derives its
# decay milestones from args.rounds -- keep the two values in sync.
num_rounds = 4

# Placeholders for the optimizers; replaced by the objects client.run returns.
opt1 = 0
opt2 = 0

# True only for the first round, so each client builds its data loaders,
# model and optimizer exactly once.
init_run = True


# Weighted-average loss of each completed round, in round order.
loss_list = []

for r in range(num_rounds):
         
    # NOTE(review): both clients are called with the same rank (args.rank)
    # and size=1 -- confirm this is the intended data partitioning.
    opt1, result_1 = client1.run(init_run, rank=args.rank, size=1, round=r, opt=opt1)
    opt2, result_2 = client2.run(init_run, rank=args.rank, size=1, round=r, opt=opt2)
    
    opt_list = [opt1, opt2]
    all_results = [result_1,result_2]

    # Server step: updates both optimizers in place and returns their
    # cum_grad tensors, one list per optimizer.
    updated_params = average_opt(opt_list)
    
    # Store the returned tensors back as each optimizer's cum_grad state.
    load_state(opt1, updated_params[0])
    load_state(opt2, updated_params[1])
    
    # Combine the clients' losses, weighted by the sample_size each reported.
    
    weighted_avg = weighted_loss_avg(all_results)
    loss_list.append(weighted_avg)

    
    print('-----ROUND COMPLETED-----')    
    # Later rounds reuse the loaders, model and optimizer built in round 0.
    init_run= False
    
    

plot_metrics(num_rounds, loss_list)


: 

: 

: 