Implement the classic federated learning strategy FedDYN in SecretFlow #1235

2 changes: 2 additions & 0 deletions secretflow/ml/nn/fl/backend/torch/strategy/__init__.py
@@ -18,6 +18,7 @@
from .fed_prox import PYUFedProx
from .fed_scr import PYUFedSCR
from .fed_stc import PYUFedSTC
from .fed_dyn import PYUFedDYN

__all__ = [
'PYUFedAvgW',
@@ -26,4 +27,5 @@
'PYUFedProx',
'PYUFedSCR',
'PYUFedSTC',
'PYUFedDYN',
]
174 changes: 174 additions & 0 deletions secretflow/ml/nn/fl/backend/torch/strategy/fed_dyn.py
@@ -0,0 +1,174 @@
import copy
from typing import Tuple

import numpy as np

import torch
from secretflow.ml.nn.fl.backend.torch.fl_base import BaseTorchModel
from secretflow.ml.nn.fl.strategy_dispatcher import register_strategy


class FedDYN(BaseTorchModel):
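    """FedDyn: federated learning with dynamic regularization.

    Each client minimizes a dynamically regularized local objective

        L_k(theta) - <gradL, theta> + (alpha / 2) * ||theta - theta_global||^2

    where theta_global is the model received from the parameter server and
    gradL is a per-client state updated after every local round as
    gradL <- gradL - alpha * (theta - theta_global).
    """
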
def initialize(self, *args, **kwargs):
        # Per-client gradient state of FedDYN, initialized to zero as in the paper
        self.gradL = [torch.zeros_like(p.data) for p in self.model.parameters()]
        self.alpha = 0.1  # FedDYN regularization strength; typical values are 0.1, 0.01, or 0.001

def train_step(
self, weights: np.ndarray, cur_steps: int, train_steps: int, **kwargs
) -> Tuple[np.ndarray, int]:
"""Accept ps model params, then do local train

Args:
weights: global weight from params server
cur_steps: current train step
train_steps: local training steps
kwargs: strategy-specific parameters
Returns:
Parameters after local training
"""
        assert self.model is not None, "Model cannot be none, please provide a model definition"
        # Lazily set up the FedDYN state (gradL, alpha) before the first local round
        if not hasattr(self, "gradL"):
            self.initialize()
        refresh_data = kwargs.get("refresh_data", False)
        if refresh_data:
            self._reset_data_iter()
        if weights is not None:
            # The local model is initialized to the global model
            self.model.update_weights(weights)
        src_model = copy.deepcopy(self.model)  # Record the global model
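        # Freeze the snapshot of the global model: it only serves as a constant
        # reference point in the FedDYN regularizer, so it needs no gradients.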

for p in src_model.parameters():
p.requires_grad = False

self.model.train()
num_sample = 0
dp_strategy = kwargs.get("dp_strategy", None)
logs = {}

for _ in range(train_steps):
self.optimizer.zero_grad()
iter_data = next(self.train_iter)
if len(iter_data) == 2:
x, y = iter_data
s_w = None
elif len(iter_data) == 3:
x, y, s_w = iter_data
x = x.float()
num_sample += x.shape[0]
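            # Normalize labels to class indices: squeeze a trailing singleton
            # dimension, or take the argmax of one-hot encoded labels.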
if len(y.shape) == 1:
y_t = y
else:
if y.shape[-1] == 1:
y_t = torch.squeeze(y, -1).long()
else:
y_t = y.argmax(dim=-1)
if self.use_gpu:
x = x.to(self.exe_device)
y_t = y_t.to(self.exe_device)
if s_w is not None:
s_w = s_w.to(self.exe_device)
y_pred = self.model(x)

# do back propagation
loss = self.loss(y_pred, y_t.long())

            # FedDYN regularized objective:
            #   L_k(theta) - <gradL, theta> + (alpha / 2) * ||theta - theta_global||^2
            l1 = loss  # task loss L_k(theta)
            l2 = 0  # linear term <gradL, theta>
            l3 = 0  # proximal term ||theta - theta_global||^2
            for pgl, pm, ps in zip(
                self.gradL, self.model.parameters(), src_model.parameters()
            ):
                # pgl: client gradient state, pm: local model parameter, ps: global model parameter
                pgl = torch.as_tensor(pgl).to(pm.device)
                l2 += torch.dot(pgl.view(-1), pm.view(-1))
                l3 += torch.sum(torch.pow(pm - ps, 2))
            loss = l1 - l2 + 0.5 * self.alpha * l3

loss.backward()
self.optimizer.step()
for m in self.metrics:
m.update(y_pred.cpu(), y_t.cpu())

            # Update the client gradient state: gradL <- gradL - alpha * (theta - theta_global)
            new_gradL = []
            for pgl, pm, ps in zip(
                self.gradL, self.model.parameters(), src_model.parameters()
            ):
                pgl = torch.as_tensor(pgl).to(pm.device)
                ori_shape = pgl.size()
                pgl_tmp = pgl.view(-1) - self.alpha * (pm.view(-1) - ps.view(-1))
                pgl_tmp = pgl_tmp.view(ori_shape)
                new_gradL.append(pgl_tmp.detach().clone())
self.gradL = new_gradL

loss_value = loss.item()
logs["train-loss"] = loss_value

self.logs = self.transform_metrics(logs)
self.wrapped_metrics.extend(self.wrap_local_metrics())
self.epoch_logs = copy.deepcopy(self.logs)

model_weights = self.model.get_weights(return_numpy=True)

# DP operation
if dp_strategy is not None:
if dp_strategy.model_gdp is not None:
model_weights = dp_strategy.model_gdp(model_weights)

return model_weights, num_sample

def apply_weights(self, weights, **kwargs):
"""Accept ps model params, then update local model

Args:
weights: global weight from params server
"""
if weights is not None:
self.model.update_weights(weights)


@register_strategy(strategy_name="fed_dyn", backend="torch")
class PYUFedDYN(FedDYN):
pass
76 changes: 76 additions & 0 deletions tests/ml/nn/sl/test_fed_dyn_torch.py
@@ -0,0 +1,76 @@
import torch
import torch.optim as optim
from secretflow.ml.nn.fl.backend.torch.strategy.fed_dyn import FedDYN
from secretflow.ml.nn.utils import BaseModule
from torch import nn
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader, TensorDataset
from torchmetrics import Accuracy, Precision, Recall


class MNIST_Model(BaseModule):
def __init__(self):
super(MNIST_Model, self).__init__()
self.encoder = nn.Sequential(
nn.Flatten(1),
nn.Linear(784, 200),
nn.ReLU(),
nn.Linear(200, 100),
nn.ReLU(),
)
self.head = nn.Linear(100, 10)

def forward(self, x):
x = self.encoder(x)
x = self.head(x)
return x


class TestFedDYN:
def test_fed_dyn_local_step(self, sf_simulation_setup_devices):
class ConvNetBuilder:
def __init__(self):
self.metrics = [
lambda: Accuracy(task="multiclass", num_classes=10, average="macro")
]

def model_fn(self):
return MNIST_Model()

def loss_fn(self):
return CrossEntropyLoss()

def optim_fn(self, parameters):
return optim.Adam(parameters)

        # Initialize the FedDYN strategy worker with the MNIST model builder
conv_net_builder = ConvNetBuilder()
fed_dyn_worker = FedDYN(builder_base=conv_net_builder)

# Prepare dataset
x_test = torch.rand(128, 1, 28, 28) # Randomly generated data
y_test = torch.randint(
0, 10, (128,)
) # Randomly generated labels for a 10-class task
test_loader = DataLoader(
TensorDataset(x_test, y_test), batch_size=32, shuffle=True
)
fed_dyn_worker.train_set = iter(test_loader)
fed_dyn_worker.train_iter = iter(fed_dyn_worker.train_set)

# Perform a training step
gradients = None
gradients, num_sample = fed_dyn_worker.train_step(
gradients, cur_steps=0, train_steps=1
)

# Apply weights update
fed_dyn_worker.apply_weights(gradients)

        # Check the sample count and that one weight array is returned per model parameter
        assert num_sample == 32  # batch size
        assert len(gradients) == len(list(fed_dyn_worker.model.parameters()))

# Perform another training step to test cumulative behavior
_, num_sample = fed_dyn_worker.train_step(gradients, cur_steps=1, train_steps=2)
assert num_sample == 64 # Cumulative batch size over two steps
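
For reviewers who want to exercise the new strategy end to end, here is a rough usage sketch via the standard FLModel entry point. The PYU devices (alice, bob, charlie), the model builder model_def, and the federated data fed_x / fed_y are placeholders from a typical simulation setup, and the exact FLModel arguments may vary by SecretFlow version, so treat this as illustrative rather than definitive.

from secretflow.ml.nn import FLModel
from secretflow.security.aggregation import SecureAggregator

# charlie acts as the server, alice and bob as clients (hypothetical simulation devices)
aggregator = SecureAggregator(charlie, [alice, bob])
fl_model = FLModel(
    server=charlie,
    device_list=[alice, bob],
    model=model_def,        # Torch model builder, analogous to ConvNetBuilder in the test
    aggregator=aggregator,
    strategy="fed_dyn",     # name registered by @register_strategy above
    backend="torch",
)
history = fl_model.fit(fed_x, fed_y, epochs=5, batch_size=32)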