In [None]:
from collections import OrderedDict
from functools import reduce
from typing import Dict, List, Optional, Tuple, Union
import os
import flwr as fl
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from flwr.common import (EvaluateRes, FitRes, NDArrays, Parameters, Scalar,
                         ndarrays_to_parameters, parameters_to_ndarrays)
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy.aggregate import weighted_loss_avg
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

# Module-level compute device: CUDA when PyTorch reports it as available, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

Define the model and its training and evaluation functions

In [None]:
class MultipleLinearRegressionModel(nn.Module):
    """Feed-forward regressor: two ReLU hidden layers, each followed by dropout, then one output unit.

    The defaults reproduce the original fixed architecture (100 -> 50 -> 10 -> 1,
    dropout 0.2). The layer attribute names are unchanged, so ``state_dict`` keys
    stay the same.

    Args:
        in_features: Width of each input vector.
        hidden1_features: Width of the first hidden layer.
        hidden2_features: Width of the second hidden layer.
        dropout: Drop probability used after each hidden layer.
    """

    def __init__(self, in_features=100, hidden1_features=50, hidden2_features=10, dropout=0.2):
        super().__init__()
        self.hidden1 = nn.Linear(in_features, hidden1_features)
        self.dropout1 = nn.Dropout(p=dropout)
        self.hidden2 = nn.Linear(hidden1_features, hidden2_features)
        self.dropout2 = nn.Dropout(p=dropout)
        self.output = nn.Linear(hidden2_features, 1)

    def forward(self, x):
        """Map a batch of feature vectors to one prediction per row."""
        x = self.dropout1(F.relu(self.hidden1(x)))
        x = self.dropout2(F.relu(self.hidden2(x)))
        return self.output(x)

def train(model,train_loader,criterion,optimizer,epochs):
    """Run ``epochs`` passes of gradient-descent training over ``train_loader``.

    Args:
        model: Module to optimise; it is switched to training mode.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss callable applied as ``criterion(outputs, targets)``.
        optimizer: Optimiser already bound to ``model``'s parameters.
        epochs: Number of full passes over ``train_loader``.

    Returns:
        None. ``model`` is updated in place through ``optimizer``.
    """
    model.train()
    # Move batches to wherever the model actually lives, so a model that was
    # left on the CPU still trains on a CUDA machine. Only a parameterless
    # model falls back to the module-level ``device``.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device
    for _ in range(epochs):
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(target_device), targets.to(target_device)
            loss = criterion(model(inputs), targets)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

def test(model,val_loader,criterion):
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_val_tensor,y_val_tensor in val_loader:
            X_val_tensor,y_val_tensor = X_val_tensor.to(device),y_val_tensor.to(device)
            val_outputs = model(X_val_tensor)
            val_loss += criterion(val_outputs, y_val_tensor).item()
    average_val_loss = val_loss / len(val_loader)
    return average_val_loss

Define a function to generate pairwise difference data

In [None]:
def make_difference_data(file_path):
    df = pd.read_csv(file_path)
    class_counts = df["grade"].value_counts()
    valid_classes = class_counts[class_counts > 1].index
    df_filtered = df[df["grade"].isin(valid_classes)]

    a_df, b_df = train_test_split(df_filtered, test_size=0.2, stratify=df_filtered["grade"], random_state=42)
    
    df2 = pd.DataFrame()
    df3 = pd.DataFrame()
    
    numeric_a_df = a_df.select_dtypes(include=[int, float])
    for i in range(len(numeric_a_df)):
        for j in range(len(numeric_a_df)):
            if i != j:
                diff = numeric_a_df.iloc[j] - numeric_a_df.iloc[i]
                diff = pd.concat([diff, pd.Series([a_df["userid"].iloc[i], a_df["userid"].iloc[j]], index=['user_1', 'user_2'])])
                df2 = pd.concat([df2, diff.to_frame().T], ignore_index=True)

    numeric_b_df = b_df.select_dtypes(include=[int, float])
    for i in range(len(numeric_b_df)):
        for j in range(len(numeric_b_df)):
            if i != j:
                diff2 = numeric_b_df.iloc[j] - numeric_b_df.iloc[i]
                diff2 = pd.concat([diff2, pd.Series([b_df["userid"].iloc[i], b_df["userid"].iloc[j]], index=['user_1', 'user_2'])])
                df3 = pd.concat([df3, diff2.to_frame().T], ignore_index=True)
    return df2, df3

Define a function to prepare training and validation datasets

In [None]:
def _frame_to_loader(frame, feature_columns, batch_size, shuffle):
    """Turn a pairwise-difference frame into a DataLoader of (features, grade) float32 tensors."""
    # Non-numeric entries become NaN (errors='coerce'); they are not filtered out here.
    features = frame[feature_columns].apply(pd.to_numeric, errors='coerce').values
    targets = pd.to_numeric(frame['grade'], errors='coerce').values.reshape(-1, 1)
    dataset = TensorDataset(
        torch.tensor(features, dtype=torch.float32),
        torch.tensor(targets, dtype=torch.float32),
    )
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)


def prepare_dataset(file_paths=None, batch_size=32):
    """Build one training and one validation DataLoader per course file.

    Each file is treated as one federated client. Its pairwise-difference data
    comes from ``make_difference_data``; columns ``"0"`` .. ``"99"`` are the
    features and ``grade`` is the target.

    Args:
        file_paths: CSV files to process. ``None`` (default) uses the twelve
            course files under ``./learnfd_data``.
        batch_size: Batch size of every DataLoader (default 32).

    Returns:
        ``(train_loaders, val_loaders, num_client)`` where both lists have one
        entry per file and ``num_client`` is the number of files processed.
    """
    if file_paths is None:
        file_paths = [
            "./learnfd_data/CourseA-2019_100dim.csv",
            "./learnfd_data/CourseA-2020_100dim.csv",
            "./learnfd_data/CourseA-2021_100dim.csv",
            "./learnfd_data/CourseB-2019_100dim.csv",
            "./learnfd_data/CourseC-2021-1_100dim.csv",
            "./learnfd_data/CourseC-2021-2_100dim.csv",
            "./learnfd_data/CourseD-2020_100dim.csv",
            "./learnfd_data/CourseD-2021_100dim.csv",
            "./learnfd_data/CourseE-2020-1_100dim.csv",
            "./learnfd_data/CourseE-2020-2_100dim.csv",
            "./learnfd_data/CourseF-2021_100dim.csv",
            "./learnfd_data/CourseG-2021_100dim.csv"
        ]

    # The feature column names do not depend on the file, so build them once.
    feature_columns = [str(i) for i in range(100)]
    train_loaders = []
    val_loaders = []

    for file_path in file_paths:
        print(f"Processing file: {file_path}")
        train_df, val_df = make_difference_data(file_path)
        # Only the training data is shuffled; validation order stays fixed.
        train_loaders.append(_frame_to_loader(train_df, feature_columns, batch_size, shuffle=True))
        val_loaders.append(_frame_to_loader(val_df, feature_columns, batch_size, shuffle=False))

    return train_loaders, val_loaders, len(train_loaders)


Define Flower client


In [None]:
class FlowerClient(fl.client.NumPyClient):
    """Flower NumPy client that trains and evaluates the regressor on one client's data.

    Args:
        train_loader: DataLoader with this client's training batches.
        val_loader: DataLoader with this client's validation batches.
    """

    def __init__(self, train_loader, val_loader) -> None:
        super().__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.model = MultipleLinearRegressionModel().to(self.device)

    @staticmethod
    def _num_examples(loader) -> int:
        """Return the number of samples behind ``loader``.

        Falls back to the batch count when the loader exposes no sized
        ``dataset`` attribute.
        """
        try:
            return len(getattr(loader, "dataset", None))
        except TypeError:
            return len(loader)

    def set_parameters(self, parameters):
        """Load a list of NumPy arrays (in ``state_dict`` order) into the local model."""
        keys = self.model.state_dict().keys()
        # torch.tensor keeps each array's dtype and shape; the legacy
        # torch.Tensor constructor does not, and the server side of this file
        # already uses torch.tensor.
        state_dict = OrderedDict((k, torch.tensor(v)) for k, v in zip(keys, parameters))
        self.model.load_state_dict(state_dict, strict=True)

    def get_parameters(self, config: Dict[str, Scalar]):
        """Return the local model's ``state_dict`` values as NumPy arrays."""
        return [val.cpu().numpy() for _, val in self.model.state_dict().items()]

    def fit(self, parameters, config):
        """Train one local epoch from the given global parameters.

        Returns:
            ``(updated_parameters, num_examples, metrics)``. ``num_examples``
            is the number of training samples, not batches, because the server
            uses it as the aggregation weight.
        """
        self.set_parameters(parameters)

        criterion = torch.nn.MSELoss()
        optimizer = torch.optim.SGD(self.model.parameters(), lr=0.0005)

        train(self.model, self.train_loader, criterion, optimizer, epochs=1)

        return self.get_parameters({}), self._num_examples(self.train_loader), {}

    def evaluate(self, parameters, config):
        """Evaluate the given global parameters on the local validation data.

        Returns:
            ``(loss, num_examples, metrics)``. ``num_examples`` is the number
            of validation samples, not batches.
        """
        self.set_parameters(parameters)
        criterion = torch.nn.MSELoss()
        loss = test(self.model, self.val_loader, criterion=criterion)

        return float(loss), self._num_examples(self.val_loader), {}


Define the strategy for Federated Learning

In [None]:
class AggregateCustomMetricStrategy(fl.server.strategy.FedAvg):
    """FedAvg strategy that also keeps a copy of the best global model seen so far.

    "Best" means the lowest aggregated evaluation loss. The weights are copied
    into ``best_model`` when one is passed, otherwise into the module-level
    ``best_net``.

    Args:
        best_model: Optional module that receives the best weights. ``None``
            (default) keeps the original behaviour of writing into the global
            ``best_net``.
        *args, **kwargs: Forwarded unchanged to ``FedAvg``.
    """

    def __init__(self, *args, best_model: Optional[nn.Module] = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.loss=float("inf")       # lowest aggregated evaluation loss so far
        self.path_bool = False       # True when the latest evaluation improved on self.loss
        self.best_model = best_model
        # Weights produced by the most recent successful aggregate_fit. They are
        # the weights that the next aggregate_evaluate call is scoring.
        self._latest_ndarrays: Optional[NDArrays] = None

    def _store_best(self, ndarrays: NDArrays) -> None:
        """Copy ``ndarrays`` (in ``state_dict`` order) into the best-model holder."""
        target = self.best_model if self.best_model is not None else best_net
        keys = target.state_dict().keys()
        state_dict = OrderedDict((k, torch.tensor(v)) for k, v in zip(keys, ndarrays))
        target.load_state_dict(state_dict, strict=True)

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[fl.server.client_proxy.ClientProxy, fl.common.FitRes]],
        failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
        """Average the client weights, weighted by each client's ``num_examples``.

        Returns ``(None, {})`` when there is nothing to aggregate: no results,
        failures that are not accepted, or a total example count of zero.
        """
        if not results:
            return None, {}
        if not self.accept_failures and failures:
            return None, {}

        weights_results = [
            (parameters_to_ndarrays(fit_res.parameters), fit_res.num_examples)
            for _, fit_res in results
        ]

        num_examples_total = sum(num_examples for (_, num_examples) in weights_results)
        if num_examples_total == 0:
            # Avoid dividing by zero when every client reports no examples.
            return None, {}

        weighted_weights = [
            [layer * num_examples for layer in weights] for weights, num_examples in weights_results
        ]

        weights_prime: NDArrays = [
            reduce(np.add, layer_updates) / num_examples_total
            for layer_updates in zip(*weighted_weights)
        ]

        # Keep the weights until they have been evaluated. Whether they are the
        # best so far is only known in aggregate_evaluate.
        self._latest_ndarrays = weights_prime

        return ndarrays_to_parameters(weights_prime), {}

    def aggregate_evaluate(
        self,
        server_round: int,
        results: List[Tuple[ClientProxy, EvaluateRes]],
        failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
    ) -> Tuple[Optional[float], Dict[str, Scalar]]:
        """Aggregate client losses and snapshot the evaluated weights if they are the best so far."""
        if not results:
            return None, {}
        if not self.accept_failures and failures:
            return None, {}

        loss_aggregated = weighted_loss_avg(
            [
                (evaluate_res.num_examples, evaluate_res.loss)
                for _, evaluate_res in results
            ]
        )

        if loss_aggregated < self.loss:
            self.loss = loss_aggregated
            self.path_bool = True
            # Flower's server loop runs fit before evaluate in each round, so
            # this loss belongs to the weights from the most recent
            # aggregate_fit. Saving them here, rather than in the next round's
            # aggregate_fit, stores the weights that earned the loss and also
            # covers an improvement in the final round.
            if self._latest_ndarrays is not None:
                self._store_best(self._latest_ndarrays)
        else:
            self.path_bool = False

        metrics_aggregated = {}
        if self.evaluate_metrics_aggregation_fn:
            eval_metrics = [(res.num_examples, res.metrics) for _, res in results]
            metrics_aggregated = self.evaluate_metrics_aggregation_fn(eval_metrics)

        return loss_aggregated, metrics_aggregated
    
def get_params(net) -> List[np.ndarray]:
    """Return every ``state_dict`` entry of ``net`` as a NumPy array, in ``state_dict`` order."""
    arrays = []
    for tensor in net.state_dict().values():
        arrays.append(tensor.cpu().numpy())
    return arrays


Define a function to generate client instances

In [None]:
def generate_client_fn(train_loader,val_loader):
    """Return a factory that builds the Flower client for a given client id.

    Args:
        train_loader: Sequence of training DataLoaders, indexed by client id.
        val_loader: Sequence of validation DataLoaders, indexed by client id.

    Returns:
        A ``client_fn(cid)`` callable; ``cid`` is converted with ``int`` and
        used as the index into both sequences.
    """
    def client_fn(cid: str):
        index = int(cid)
        client = FlowerClient(
            train_loader=train_loader[index], val_loader=val_loader[index]
        )
        return client.to_client()

    return client_fn

Execution

Prepare datasets

In [None]:
# One (training, validation) DataLoader pair per course file; num_client is the number of pairs.
train_loaders, val_loaders, num_client = prepare_dataset()

Run federated learning simulation and save the best model

In [None]:
# Loop-invariant setup: output directory, client factory and per-client resources.
save_dir = r"./pth_register"
os.makedirs(save_dir, exist_ok=True)

client_fn_callback = generate_client_fn(train_loaders,val_loaders)
# Only request a GPU share when CUDA is actually available. On a CPU-only
# machine a non-zero "num_gpus" asks the simulation backend for a resource it
# does not have.
my_client_resources = {
    "num_cpus": 2,
    "num_gpus": 0.16 if torch.cuda.is_available() else 0.0,
}

# Ten independent repetitions; each starts from a freshly initialised model and
# saves the best global model found by the strategy.
for times in range(1,11):
    # The strategy writes the best weights into this module-level model.
    best_net = MultipleLinearRegressionModel()
    params=get_params(best_net)

    strategy = AggregateCustomMetricStrategy(
        initial_parameters=fl.common.ndarrays_to_parameters(params),
        fraction_fit=1,                      # every client trains in every round
        fraction_evaluate=1,                 # every client evaluates in every round
        min_available_clients=num_client,    # wait until all clients are available
    )

    history = fl.simulation.start_simulation(
        client_fn=client_fn_callback,
        num_clients=num_client,
        config=fl.server.ServerConfig(num_rounds=250),
        strategy=strategy,
        client_resources = my_client_resources
    )

    torch.save(best_net.state_dict(), os.path.join(save_dir, f"test2_best_model_proposed_method_{times}.pth"))
