### 介绍

这一节介绍的是如何用FLGo实现在通信阶段做出修改的算法。这里使用一个仅在通信阶段做了较少修改的方法作为例子，qffl，该算法由Li Tian等人于2019年提出，发表于ICLR 2020（[论文链接](https://arxiv.org/abs/1905.10497)），旨在提升联邦学习的公平性。下面讲解如何用FLGo实现该算法。

## qffl简介 

该算法受网络中负载均衡的启发，提出了一个更加公平的优化目标：
$$\min_w f_q(w)=\sum_{k=1}^m \frac{p_k}{q+1}F_k^{q+1}(w)$$

其中$q$为人为设定的超参数，$F_k(w)$为用户$k$的本地损失，$p_k$为用户$k$的原始目标函数权重。
通过观察上述目标可以发现，只要令$q>0$，则每个用户在该目标中的损失$F'_k=\frac{F_k^{q+1}}{q+1}$都会具备这样一个性质：随着$F_k$的增大，$F'_k$迅速增大（大于$F_k$的增速），使得全局目标函数$f_q$也迅速增大。因此为了防止$f_q$暴涨，优化该目标函数将被迫自动平衡不同用户的损失值大小，防止其中任意较大值的出现，其中$q$决定了$F'_k$的增速，$q$越大，公平性越强。

为了优化该公平目标函数，作者提出了q-FedAVG算法，该算法核心步骤如下：

1. 用户$k$收到全局模型后，使用全局模型$w^t$评估本地训练集损失，得到$F_k(w^t)$;

2. 用户$k$训练全局模型，得到$\bar{w}_k^{t+1}$后，计算以下变量：

$$\Delta w_k^t=L(w^t-\bar{w}_k^{t+1})\approx\frac{1}{\eta}(w^t-\bar{w}_k^{t+1})\\\Delta_k^t=F_k^q(w^t) \Delta w_k^t\\h_k^t=qF_k^{q-1}(w^t)\|\Delta w_k^t\|^2+LF_k^q(w^t)$$

3. 用户上传$h_k^t$和$\Delta_k^t$；

4. 服务器聚合全局模型为：

$$w^{t+1}=w^t-\frac{\sum_{k\in S_t}\Delta_k^t}{\sum_{k\in S_t}h_k^t}$$

下面介绍q-Fedavg在FLGo中的代码实现。

## 实现qffl

相较于fedavg通信的是全局模型，qffl通信的为$h_k^t$和$\Delta_k^t$，因此在Client本地的pack函数中完成对这两项的计算，并修改返回的字典。相对的，Server端接收的包裹中不止有model，因此用关键字（dk和hk）来取出包裹中的结果，并在iterate中直接调整聚合策略为qffl的形式（聚合过于简单且不存在复用，故不使用aggregate方法）。

In [None]:
import flgo
import flgo.algorithm.fedbase as fedbase
import torch
import flgo.utils.fmodule as fmodule
import flgo.algorithm.fedavg as fedavg
import copy
import os

class Client(fedbase.BasicClient):
    def unpack(self, package):
        model = package['model']
        self.global_model = copy.deepcopy(model)
        return model
    
    def pack(self, model):
        Fk = self.test(self.global_model, 'train')['loss']+1e-8
        L = 1.0/self.learning_rate
        delta_wk = L*(self.global_model - model)
        dk = (Fk**self.q)*delta_wk
        hk = self.q*(Fk**(self.q-1))*(delta_wk.norm()**2) + L*(Fk**self.q)
        self.global_model = None
        return {'dk':dk, 'hk':hk}
        
class Server(fedbase.BasicServer):
    def initialize(self, *args, **kwargs):
        self.init_algo_para({'q': 1.0})
    
    def iterate(self):
        self.selected_clients = self.sample()
        res = self.communicate(self.selected_clients)
        self.model = self.model - fmodule._model_sum(res['dk'])/sum(res['hk'])
        return len(self.received_clients)>0

class qffl:
    Server = Server
    Client = Client
    

## 测试qffl

In [None]:
task = './synthetic11_client100'
config = {'benchmark':{'name':'flgo.benchmark.synthetic_regression', 'para':{'alpha':1, 'beta':1, 'num_clients':100}}}
if not os.path.exists(task): flgo.gen_task(config, task_path = task)
option = {'num_rounds':2000, 'num_epochs':1, 'batch_size':10, 'learning_rate':0.1, 'gpu':0, 'proportion':0.1,'lr_scheduler':0}
fedavg_runner = flgo.init(task, fedavg, option=option)
qffl_runner = flgo.init(task, qffl, option=option)
fedavg_runner.run()
qffl_runner.run()

## 结果分析

这里在100个人的synthetic(1,1)上对qffl的性能做一个初步的验证。

###  跟fedavg比较

In [None]:
import flgo.experiment.analyzer
analysis_plan = {
    'Selector':{
        'task': task,
        'header':['fedavg','qffl_q1.0' ]
    },
    'Painter':{
        'Curve':[
            {'args':{'x': 'communication_round', 'y':'test_loss'}, 'fig_option':{'title':'test loss on Synthetic(1,1)'}},
            {'args':{'x': 'communication_round', 'y':'test_accuracy'},  'fig_option':{'title':'test accuracy on Synthetic(1,1)'}},
            {'args':{'x': 'communication_round', 'y':'std_valid_loss'}, 'fig_option':{'title':'std_valid_loss on Synthetic(1,1)'}},
        ]
    }
}
flgo.experiment.analyzer.show(analysis_plan)

In [None]:
import matplotlib.pyplot as plt
import flgo.experiment.analyzer as al

s = al.Selector({'task':task, 'header':['qffl_q1.0','fedavg']})
records = s.records[task]
for rec in records:
    print(rec.data['option']['algorithm'])
    print(max(rec.data['valid_loss_dist'])-min(rec.data['valid_loss_dist']))
    plt.hist(rec.data['valid_loss_dist'], label=rec.data['option']['algorithm'], bins=len(rec.data['valid_loss_dist']))
plt.legend()
plt.show()

### 参数q的影响

In [None]:
qffl_runner_q50 = flgo.init(task, qffl, option={'algo_para':5.0,'num_rounds':2000, 'num_epochs':1, 'batch_size':10, 'learning_rate':0.1, 'gpu':0, 'proportion':0.1,'lr_scheduler':0})
qffl_runner_q05 = flgo.init(task, qffl, option={'algo_para':0.5,'num_rounds':2000, 'num_epochs':1, 'batch_size':10, 'learning_rate':0.1, 'gpu':0, 'proportion':0.1,'lr_scheduler':0})
qffl_runner_q01 = flgo.init(task, qffl, option={'algo_para':0.1,'num_rounds':2000, 'num_epochs':1, 'batch_size':10, 'learning_rate':0.1, 'gpu':0, 'proportion':0.1,'lr_scheduler':0})
runners = [qffl_runner_q50, qffl_runner_q05, qffl_runner_q01]
for r in runners: r.run()

In [None]:
analysis_on_q = {
    'Selector':{
        'task': task,
        'header':['fedavg','qffl' ]
    },
    'Painter':{
        'Curve':[
            {'args':{'x': 'communication_round', 'y':'test_accuracy'},  'fig_option':{'title':'test accuracy on Synthetic(1,1)'}},
            {'args':{'x': 'communication_round', 'y':'std_valid_loss'}, 'fig_option':{'title':'std_valid_loss on Synthetic(1,1)'}},
            {'args':{'x': 'communication_round', 'y':'mean_valid_accuracy'},  'fig_option':{'title':'mean valid accuracy on Synthetic(1,1)'}},
            
            
        ]
    }
}
flgo.experiment.analyzer.show(analysis_on_q)