In [86]:
import os
import torch
# Hide every CUDA device from this process and pin the device ordering.
os.environ.update(
    {
        "CUDA_VISIBLE_DEVICES": "",
        "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    }
)

# New tensors default to float32 on the CPU.
torch.set_default_device("cpu")
torch.set_default_dtype(torch.float32)


def _cuda_unavailable():
    """Stand-in for ``torch.cuda.is_available`` that always answers False."""
    return False


# Force PyTorch to use CPU-only mode
if hasattr(torch.cuda, 'is_available'):
    torch.cuda.is_available = _cuda_unavailable

In [87]:
import numpy as np



#%load_ext autoreload

import time
import json
import os
import numpy as np
import flwr as fl
import pickle
from math import floor



from logging import INFO, DEBUG
from flwr.common.logger import log



from src.data.dataset_info import datasets
#data_root = "./datasets/partitions/"

In [88]:
# Experiment configuration: number of simulated clients and mini-batch size.
NUM_CLIENTS = 10
BATCH_SIZE = 16

# One parquet partition per client: client_0.parquet ... client_9.parquet.
folder_path = "datasets/unified/"
clients_paths = [
    f"{folder_path}client_{client_id}.parquet" for client_id in range(NUM_CLIENTS)
]

# True -> multi-class labels, False -> binary benign/attack labels.
multi_class = True
# Dataset descriptor taken from the project's dataset registry.
dataset = datasets[0]


In [89]:
# Label space used by the experiment.
# Binary defaults; replaced by the contents of label_mappings.json when
# `multi_class` is enabled.
classes_set = {"benign", "attack"}
labels_names = {0: "benign", 1: "attack"}
num_classes = 2

if multi_class:
    # Decode explicitly as UTF-8 so the class names do not depend on the
    # platform's locale encoding; close the file before post-processing.
    with open("label_mappings.json", "r", encoding="utf-8") as f:
        data = json.load(f)
    # JSON object keys are always strings; the rest of the notebook indexes
    # label names by integer class id.
    labels_names = {int(k): v for k, v in data["labels_names"].items()}
    classes_set = set(data["classes"])
    num_classes = len(classes_set)

print(f"==>> classes_set: {classes_set}")
print(f"==>> num_classes: {num_classes}")
print(f"==>> labels_names: {labels_names}")

==>> classes_set: {'DoS slowloris', 'Heartbleed', 'PortScan', 'Infiltration', 'FTP-Patator', 'DoS Hulk', 'DDoS', 'Web Attack � Sql Injection', 'Web Attack � XSS', 'Bot', 'SSH-Patator', 'Web Attack � Brute Force', 'DoS Slowhttptest', 'BENIGN', 'DoS GoldenEye'}
==>> num_classes: 15
==>> labels_names: {0: 'BENIGN', 1: 'Bot', 2: 'DDoS', 3: 'DoS GoldenEye', 4: 'DoS Hulk', 5: 'DoS Slowhttptest', 6: 'DoS slowloris', 7: 'FTP-Patator', 8: 'Heartbleed', 9: 'Infiltration', 10: 'PortScan', 11: 'SSH-Patator', 12: 'Web Attack � Brute Force', 13: 'Web Attack � Sql Injection', 14: 'Web Attack � XSS'}


In [90]:
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np



def _features_and_labels(frame, label_col):
    """Split ``frame`` into a feature matrix and a label vector (NumPy arrays)."""
    labels = frame[label_col].to_numpy()
    features = frame.drop(columns=[label_col]).to_numpy()
    return features, labels


def _subsample(features, labels, frac, rng):
    """Randomly keep a fraction of the aligned ``features``/``labels`` rows.

    At least one row is kept whenever the input is non-empty, so small classes
    or small client splits never collapse to empty arrays. ``frac=None``
    disables subsampling and returns the inputs unchanged.
    """
    n_rows = len(features)
    if frac is None or n_rows == 0:
        return features, labels
    n_keep = min(n_rows, max(1, int(frac * n_rows)))
    idx = rng.choice(n_rows, n_keep, replace=False)
    return features[idx], labels[idx]


def read_clients(clients_paths, label_col, class_col, class_num_col, drop_columns, weak_columns,
                 test_path="datasets/unified/test.parquet", sample_frac=0.05, seed=None):
    """Load the shared test set and every client partition as NumPy arrays.

    Reads the module-level ``multi_class`` flag: when it is truthy, the label
    column is overwritten with ``class_num_col`` before any column is dropped.

    Args:
        clients_paths: Iterable of parquet paths, one per client.
        label_col: Name of the target column.
        class_col: Column used to group the test set per class.
        class_num_col: Column holding the numeric class id (multi-class target).
        drop_columns: Columns removed from every frame.
        weak_columns: Further columns removed from every frame.
        test_path: Parquet file holding the server-side test set.
        sample_frac: Fraction of rows kept in every split (``None`` keeps all).
        seed: Optional integer seed. ``None`` keeps using NumPy's global random
            state, so results are only reproducible if that state was seeded.

    Returns:
        ``(client_data, test, test_labels, test_by_class, input_dim)`` where
        ``client_data[i]`` is ``(x_train, y_train, x_val, y_val, x_test, y_test)``,
        ``test_by_class`` maps each class value to ``(features, labels)`` and
        ``input_dim`` is the number of feature columns.
    """
    # A dedicated RandomState makes the run reproducible when a seed is given;
    # otherwise fall back to the global NumPy RNG (the historical behaviour).
    rng = np.random if seed is None else np.random.RandomState(seed)
    split_state = None if seed is None else rng

    def _prepare(frame):
        # Non-inplace drops: the caller's frame is never mutated.
        return (
            frame.drop(columns=drop_columns)
            .drop(columns=weak_columns)
            .reset_index(drop=True)
        )

    test_frame = pd.read_parquet(test_path)
    if multi_class:
        test_frame[label_col] = test_frame[class_num_col]

    # Per-class views are selected on the raw frame because `class_col` may be
    # one of the columns removed by `_prepare`.
    per_class_full = {}
    for class_value in test_frame[class_col].unique():
        class_frame = _prepare(test_frame[test_frame[class_col] == class_value])
        per_class_full[class_value] = _features_and_labels(class_frame, label_col)

    test, test_labels = _features_and_labels(_prepare(test_frame), label_col)
    test = test.copy()
    test_labels = test_labels.copy()
    input_dim = test.shape[1]

    # Subsample the global test set first, then each per-class test set.
    test, test_labels = _subsample(test, test_labels, sample_frac, rng)
    test_by_class = {
        class_value: _subsample(class_features, class_labels, sample_frac, rng)
        for class_value, (class_features, class_labels) in per_class_full.items()
    }

    client_data = []
    for client_path in clients_paths:
        # Load and process one client at a time instead of holding every raw
        # frame in memory simultaneously.
        cdata = pd.read_parquet(client_path)
        if multi_class:
            cdata[label_col] = cdata[class_num_col]
        cdata = _prepare(cdata)

        # 90/10 train/test split, then 80/20 train/validation split.
        c_train, c_test = train_test_split(cdata, test_size=0.1, random_state=split_state)
        c_train, c_val = train_test_split(c_train, test_size=0.2, random_state=split_state)

        x_train, y_train = _subsample(*_features_and_labels(c_train, label_col), sample_frac, rng)
        x_val, y_val = _subsample(*_features_and_labels(c_val, label_col), sample_frac, rng)
        x_test, y_test = _subsample(*_features_and_labels(c_test, label_col), sample_frac, rng)

        client_data.append((x_train, y_train, x_val, y_val, x_test, y_test))

    return client_data, test, test_labels, test_by_class, input_dim

# Model (remember to use class weighting, because the dataset was not balanced)

In [91]:
import torch.nn as nn
import torch

class NIDS_CNN(nn.Module):
    """Small 1-D CNN classifier over flat feature vectors.

    Two ``Conv1d`` + ``ReLU`` layers (kernel 3, padding 1, so the length is
    preserved) followed by a two-layer fully connected head that outputs
    ``num_classes`` logits.

    Args:
        input_dim: Number of features per sample.
        num_classes: Number of output logits.
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.conv_block = nn.Sequential(
            nn.Conv1d(1, 8, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(8, 16, kernel_size=3, padding=1),
            nn.ReLU(),
        )
        # Size of the flattened convolutional output, probed with a dummy pass.
        self.flattened_size = self._calculate_flattened_size(input_dim)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self.flattened_size, 16),
            nn.ReLU(),
            nn.Linear(16, num_classes)
        )

    def _calculate_flattened_size(self, input_dim):
        """Return the number of elements ``conv_block`` emits for one sample."""
        # Shape probing only: no autograd graph is needed for the dummy pass.
        with torch.no_grad():
            dummy_input = torch.zeros(1, 1, input_dim)
            dummy_out = self.conv_block(dummy_input)
        return dummy_out.numel()

    def forward(self, x):
        """Return class logits for ``x``.

        A 2-D input gets a channel axis inserted at position 1 before the
        convolutions; inputs of any other rank are passed through unchanged.
        """
        if x.dim() == 2:
            x = x.unsqueeze(1)
        x = self.conv_block(x)
        x = self.classifier(x)
        return x


Compute class weights (Client-Level Balancing)

In [92]:
import numpy as np
import pandas as pd
from sklearn.utils.class_weight import compute_class_weight

def compute_client_weights_multiclass(y_local):
    """Compute 'balanced' class weights for one client's labels and print a report.

    Only the classes that actually occur in ``y_local`` receive a weight.

    Args:
        y_local: Array of class labels held by the client.

    Returns:
        Dict mapping each class present in ``y_local`` to its balanced weight.
    """
    present_classes = np.unique(y_local)
    balanced = compute_class_weight('balanced', classes=present_classes, y=y_local)
    weight_dict = {cls: weight for cls, weight in zip(present_classes, balanced)}

    # Report how many samples each class has alongside its weight.
    n_samples = len(y_local)
    print("\nClass Distribution:")
    for cls in present_classes:
        count = np.sum(y_local == cls)
        print(f"  Class {cls}: {count} samples ({count/n_samples:.2%}) | Weight: {weight_dict[cls]:.4f}")

    # Raw mapping, printed for reference.
    print("\nWeight Dictionary:", weight_dict)

    return weight_dict

In [93]:
# Final per-experiment score containers: one accuracy dict and one F1 dict
# for each of the two experiments run in this notebook.
results_final = {
    experiment: {"accuracy": {}, "f1s": {}}
    for experiment in ("baseline", "centralities")
}
results_final



{'baseline': {'accuracy': {}, 'f1s': {}},
 'centralities': {'accuracy': {}, 'f1s': {}}}

FL Process

In [94]:
import numpy as np
from typing import List
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple



In [95]:
def get_weights(model):
    """Return the model's ``state_dict`` values as a list of NumPy arrays.

    Each array is an independent copy, so modifying it does not touch the
    model's own tensors.
    """
    arrays = []
    for tensor in model.state_dict().values():
        arrays.append(tensor.cpu().numpy().copy())
    return arrays


def set_weights(model, parameters):
    """Load a list of NumPy arrays into ``model`` in ``state_dict`` key order.

    Args:
        model: Module whose ``state_dict`` is overwritten.
        parameters: Sequence of arrays, one per ``state_dict`` entry, in order.

    Raises:
        ValueError: If more arrays are supplied than the model has entries
            (``zip`` would otherwise drop the surplus silently).
        RuntimeError: Raised by ``load_state_dict(strict=True)`` when entries
            are missing or shapes do not match.
    """
    keys = list(model.state_dict().keys())
    parameters = list(parameters)
    if len(parameters) > len(keys):
        raise ValueError(
            f"Received {len(parameters)} arrays but the model has only "
            f"{len(keys)} state_dict entries"
        )
    # Copy each array so the tensors never alias caller-owned memory.
    state_dict = OrderedDict(
        (key, torch.from_numpy(np.copy(array))) for key, array in zip(keys, parameters)
    )
    model.load_state_dict(state_dict, strict=True)

In [96]:
# Sanity check: build a throwaway model, extract its weights, and confirm that
# every extracted array is writeable.
model = NIDS_CNN(num_classes=15, input_dim=46)
weights = get_weights(model)
print("Number of parameters:", len(weights))
for w in weights:
    shape, writeable = w.shape, w.flags.writeable
    print(shape, writeable)

Number of parameters: 8
(8, 1, 3) True
(8,) True
(16, 8, 3) True
(16,) True
(16, 736) True
(16,) True
(15, 16) True
(15,) True


In [97]:
import flwr as fl

class FLClient(fl.client.NumPyClient):
    """Flower NumPy client that trains and evaluates a local ``NIDS_CNN`` on CPU.

    Args:
        x_train, y_train: Local training features / integer labels (NumPy arrays).
        x_val, y_val: Local validation split used by ``evaluate``.
        x_test, y_test: Local test split (stored as tensors; not used by
            ``fit`` or ``evaluate``).
        input_dim: Number of features per sample.
        num_classes: Number of output classes of the model.
    """

    def __init__(self, x_train, y_train, x_val, y_val, x_test, y_test, input_dim, num_classes):
        # Sample counts reported back to the server for weighted aggregation.
        self.train_size = len(x_train)
        self.val_size = len(x_val)

        self.device = torch.device("cpu")

        self.model = NIDS_CNN(input_dim, num_classes)
        self.model.to(self.device)

        # Convert data to CPU tensors with explicit dtype
        self.x_train = torch.tensor(x_train.astype(np.float32), dtype=torch.float32, device=self.device)
        self.y_train = torch.tensor(y_train.astype(np.int64), dtype=torch.long, device=self.device)
        self.x_val = torch.tensor(x_val.astype(np.float32), dtype=torch.float32, device=self.device)
        self.y_val = torch.tensor(y_val.astype(np.int64), dtype=torch.long, device=self.device)
        self.x_test = torch.tensor(x_test.astype(np.float32), dtype=torch.float32, device=self.device)
        self.y_test = torch.tensor(y_test.astype(np.int64), dtype=torch.long, device=self.device)

        # Balanced class weights over the full label space, so the vector
        # always has `num_classes` entries as CrossEntropyLoss requires.
        self.class_weights = self._balanced_class_weights(self.y_train, num_classes)

        # Training uses the class-weighted loss to compensate for local class
        # imbalance; evaluation keeps the plain loss so reported validation
        # losses are not rescaled by client-specific weights.
        self.criterion = nn.CrossEntropyLoss(weight=self.class_weights)
        self.eval_criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)

    @staticmethod
    def _balanced_class_weights(labels, num_classes):
        """Return a ``(num_classes,)`` float tensor of 'balanced' class weights.

        A class present in ``labels`` gets ``n_samples / (n_present * count)``.
        Classes absent from ``labels`` keep weight 1.0; they never occur as a
        local training target, so the value does not influence the loss.
        """
        counts = torch.bincount(labels, minlength=num_classes).to(torch.float32)
        if counts.numel() > num_classes:
            raise ValueError(
                f"Found label {counts.numel() - 1} but num_classes is {num_classes}"
            )
        weights = torch.ones(num_classes, dtype=torch.float32, device=labels.device)
        present = counts > 0
        if present.any():
            weights[present] = counts.sum() / (present.sum() * counts[present])
        return weights

    def get_parameters(self, config=None):
        """Return the current model weights as a list of NumPy arrays."""
        # `config` defaults to None so the method can also be called without
        # arguments, as `fit` does.
        return get_weights(self.model)

    def set_parameters(self, parameters):
        """Overwrite the local model weights with ``parameters``."""
        return set_weights(self.model, parameters)

    def fit(self, parameters, config):
        """Train the model locally.

        Reads ``local_epochs`` (default 1), ``batch_size`` (default 16) and,
        when present, ``lr`` from ``config``.

        Returns:
            ``(updated_weights, number_of_training_samples, {})``.
        """
        self.set_parameters(parameters)

        local_epochs = config.get("local_epochs", 1)
        batch_size = config.get("batch_size", 16)

        # Apply the server-provided learning rate, if any, to the optimizer.
        lr = config.get("lr")
        if lr is not None:
            for group in self.optimizer.param_groups:
                group["lr"] = lr

        self.model.train()

        for _ in range(local_epochs):
            # Sequential mini-batches over the (unshuffled) training tensors;
            # slicing past the end simply yields a shorter final batch.
            for start in range(0, len(self.x_train), batch_size):
                batch_x = self.x_train[start:start + batch_size]
                batch_y = self.y_train[start:start + batch_size]

                self.optimizer.zero_grad()
                loss = self.criterion(self.model(batch_x), batch_y)
                loss.backward()
                self.optimizer.step()

        return self.get_parameters(), self.train_size, {}

    def evaluate(self, parameters, config):
        """Evaluate the model on the local validation split.

        Returns:
            ``(mean_loss_per_sample, number_of_validation_samples,
            {"accuracy": accuracy})``. Loss and accuracy are 0.0 when the
            validation split is empty.
        """
        self.set_parameters(parameters)
        self.model.eval()

        batch_size = config.get("batch_size", 16)
        loss_sum = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            # Evaluate in batches to prevent memory issues
            for start in range(0, len(self.x_val), batch_size):
                batch_x = self.x_val[start:start + batch_size]
                batch_y = self.y_val[start:start + batch_size]
                n_batch = batch_y.size(0)

                outputs = self.model(batch_x)
                # The criterion returns a per-batch mean; weight it by the
                # batch size so a shorter last batch is averaged correctly.
                loss_sum += self.eval_criterion(outputs, batch_y).item() * n_batch
                correct += (outputs.argmax(dim=1) == batch_y).sum().item()
                total += n_batch

        accuracy = correct / total if total > 0 else 0.0
        avg_loss = loss_sum / total if total > 0 else 0.0

        return avg_loss, self.val_size, {"accuracy": accuracy}

In [98]:
from sklearn.metrics import accuracy_score, f1_score

def get_evaluate_fn(x_test_server, y_test_server, input_dim, num_classes, results, test_by_class):
    """Build the server-side evaluation callback used by the strategy.

    The returned function rebuilds a ``NIDS_CNN``, loads the global
    parameters, scores the whole server test set in one forward pass and
    records accuracy and weighted F1 under ``results["scores"]`` keyed by the
    round number. ``test_by_class`` is accepted but not used here.
    """
    cpu = torch.device("cpu")

    def evaluate_fn(server_round: int, parameters, config):
        # Fresh model carrying the current global weights.
        global_model = NIDS_CNN(input_dim, num_classes).to(cpu)
        set_weights(global_model, parameters)
        global_model.eval()

        with torch.no_grad():
            features = torch.FloatTensor(x_test_server.copy()).to(cpu)
            targets = torch.LongTensor(y_test_server.copy()).to(cpu)

            logits = global_model(features)
            loss = nn.CrossEntropyLoss()(logits, targets)
            predictions = logits.argmax(dim=1)

            accuracy = (predictions == targets).float().mean()
            f1 = f1_score(y_test_server, predictions.cpu().numpy(), average='weighted')

        # Keep the per-round history in the shared results dictionary.
        scores = results["scores"]
        scores["accuracy"][server_round] = accuracy.item()
        scores["f1s"][server_round] = f1

        return loss.item(), {"accuracy": accuracy.item(), "f1": f1}

    return evaluate_fn

In [99]:
from flwr.common import Context

def generate_client_fn(client_data, input_dim, num_classes):
    """Return a ``client_fn`` that builds the ``FLClient`` for a given node.

    The node's ``partition-id`` (read from ``context.node_config``) selects
    the ``(x_train, y_train, x_val, y_val, x_test, y_test)`` tuple in
    ``client_data``.
    """
    def client_fn(context):
        node_partition = context.node_config["partition-id"]
        x_tr, y_tr, x_va, y_va, x_te, y_te = client_data[node_partition]
        numpy_client = FLClient(x_tr, y_tr, x_va, y_va, x_te, y_te, input_dim, num_classes)
        return numpy_client.to_client()

    return client_fn

In [100]:
def get_on_fit_config():
    """Return the callback that supplies each round's client training config."""

    def fit_config_fn(server_round: int):
        # The same hyper-parameters are sent every round; `server_round` is unused.
        fit_config = dict(lr=0.001, local_epochs=3, batch_size=16)
        return fit_config

    return fit_config_fn

# BASELINE

In [101]:
# Load every client partition plus the server-side test set, using the column
# names described by the selected dataset.
client_data, test, test_labels, test_by_class, input_dim = read_clients(
    clients_paths,
    dataset.label_col,
    dataset.class_col,
    dataset.class_num_col,
    dataset.drop_columns,
    dataset.weak_columns,
)

In [102]:
# Dictionary holding the options of this run together with the scores it
# produces, so results can later be traced back to their configuration.
# (dtime and learning_rate are not recorded yet.)
results = {
    "configuration": "2dt - baseline",
    "multi_class": multi_class,
    "dataset_name": dataset.name,
    "num_classes": num_classes,
    "labels_names": labels_names,
    "input_dim": input_dim,
    "scores": {
        "server": {},
        "clients": {},
        "accuracy": {},
        "f1s": {},
    },
}

if not multi_class:
    # Per-class bookkeeping for the binary setting. "length" is stored per
    # class; a single shared slot would only retain the last class's size.
    results["scores"]["test_by_class"] = {
        "accuracy": {k: {} for k in test_by_class},
        "f1s": {k: {} for k in test_by_class},
        "length": {k: len(test_by_class[k][0]) for k in test_by_class},
    }

results

{'configuration': '2dt - baseline',
 'multi_class': True,
 'dataset_name': 'client_0',
 'num_classes': 15,
 'labels_names': {0: 'BENIGN',
  1: 'Bot',
  2: 'DDoS',
  3: 'DoS GoldenEye',
  4: 'DoS Hulk',
  5: 'DoS Slowhttptest',
  6: 'DoS slowloris',
  7: 'FTP-Patator',
  8: 'Heartbleed',
  9: 'Infiltration',
  10: 'PortScan',
  11: 'SSH-Patator',
  12: 'Web Attack � Brute Force',
  13: 'Web Attack � Sql Injection',
  14: 'Web Attack � XSS'},
 'input_dim': 46,
 'scores': {'server': {}, 'clients': {}, 'accuracy': {}, 'f1s': {}}}

In [103]:
from flwr.server import ServerApp, ServerAppComponents, ServerConfig
from flwr.common import ndarrays_to_parameters


# Model whose freshly initialised weights seed the global parameters.
# Use the feature count computed while loading the data rather than a
# hard-coded value, so the model always matches the dataset.
model = NIDS_CNN(input_dim=input_dim, num_classes=num_classes)

def generate_server_fn():
    """Return the ``server_fn`` that configures the FedAvg server.

    The strategy starts from the weights of the module-level ``model`` and is
    evaluated centrally through ``get_evaluate_fn``; the run lasts 5 rounds.
    """
    def server_fn(context: Context):  # `context` is required by the API but unused
        # Initial global parameters come from the module-level model.
        initial_parameters = ndarrays_to_parameters(get_weights(model))

        fedavg = fl.server.strategy.FedAvg(
            fraction_fit=0.1,
            min_fit_clients=1,
            fraction_evaluate=0.1,  # client-side evaluation enabled
            min_evaluate_clients=1,
            min_available_clients=1,
            evaluate_fn=get_evaluate_fn(test, test_labels, input_dim, num_classes, results, test_by_class),
            on_fit_config_fn=get_on_fit_config(),
            initial_parameters=initial_parameters,
        )
        server_config = ServerConfig(num_rounds=5)

        return ServerAppComponents(strategy=fedavg, config=server_config)

    return server_fn


server_app = ServerApp(server_fn=generate_server_fn())



In [104]:
from flwr.client import ClientApp
# Client application: builds one FLClient per simulated node on demand.
client_app = ClientApp(
    client_fn=generate_client_fn(client_data, input_dim, num_classes),
)

In [105]:
import torch
from math import floor
import multiprocessing

DEVICE = torch.device("cpu")

# Resources reserved for each simulated client: one CPU core and no GPU.
_client_resources = {
    "num_cpus": 1,
    "num_gpus": 0.0,
}

# Environment passed to the Ray workers: hide GPUs and keep the math
# libraries single-threaded.
_worker_env_vars = {
    "CUDA_VISIBLE_DEVICES": "",
    "OMP_NUM_THREADS": "1",
    "MKL_NUM_THREADS": "1",
    "TORCH_USE_CUDA_DSA": "0",
}

# NOTE(review): Flower's `run_simulation` documentation names `init_args`,
# `client_resources` and `actor` as the supported top-level keys — confirm
# whether `concurrent_workers` and `ray_init_args` are actually honoured.
backend_config = {
    "client_resources": _client_resources,
    "concurrent_workers": 1,  # Start with 1 for debugging
    "ray_init_args": {
        "ignore_reinit_error": True,
        "include_dashboard": False,
        "local_mode": True,
        "runtime_env": {"env_vars": _worker_env_vars},
    },
}

# When running on GPU, assign an entire GPU for each client
#if DEVICE == "cuda":
#    backend_config = {"client_resources": {"num_cpus": 1, "num_gpus": 1.0}}
    # Refer to our Flower framework documentation for more details about Flower simulations
    # and how to set up the `backend_config`

In [106]:
# Ray memory-monitor settings. Ray reads these from environment variables
# when it starts; plain Python variables have no effect on it. They are
# therefore exported here, before the simulation launches Ray.
# A refresh interval of 0 disables the memory monitor.
RAY_memory_monitor_refresh_ms = 0
RAY_memory_usage_threshold = 0.99
os.environ["RAY_memory_monitor_refresh_ms"] = str(RAY_memory_monitor_refresh_ms)
os.environ["RAY_memory_usage_threshold"] = str(RAY_memory_usage_threshold)

In [107]:
from flwr.simulation import run_simulation

# Run simulation
# Launch the simulation with one supernode per client.
simulation_kwargs = dict(
    server_app=server_app,
    client_app=client_app,
    num_supernodes=NUM_CLIENTS,
    backend_config=backend_config,
)
run_simulation(**simulation_kwargs)

[92mINFO [0m:      Starting Flower ServerApp, config: num_rounds=5, no round_timeout
[92mINFO [0m:      
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Using initial global parameters provided by strategy
[92mINFO [0m:      Starting evaluation of initial global parameters
[92mINFO [0m:      initial parameters (loss, other metrics): 140019.953125, {'accuracy': 0.04199232533574104, 'f1': 0.010957695269191346}
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 1]
[92mINFO [0m:      configure_fit: strategy sampled 1 clients (out of 10)
[36m(ClientAppActor pid=319357)[0m free(): double free detected in tcache 2
[36m(ClientAppActor pid=319357)[0m *** SIGABRT received at time=1756827036 on cpu 6 ***
[36m(ClientAppActor pid=319357)[0m PC: @     0x7312e849eb2c  (unknown)  pthread_kill
[36m(ClientAppActor pid=319357)[0m     @     0x7312e8445330  1518263456  (unknown)
[36m(ClientAppActor pid=319357)[0m     @     0x7312e844527e         32  raise
[36m(ClientAppActor pid=319357

[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffffe18464c9d4dbfba81a2bc1d601000000 Worker ID: d8c48c25d2fcb16efb7c263c471deee9ccfd602111a56ad032dea21a Node ID: cfd81749b1d90ee93f16b0cd2a173d934c67c6d5de63af49fe2e37c5 Worker IP address: 172.19.148.253 Worker port: 46355 Worker PID: 319357 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.


[92mINFO [0m:      aggregate_evaluate: received 0 results and 1 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 2]
[92mINFO [0m:      configure_fit: strategy sampled 1 clients (out of 10)
[91mERROR [0m:     An exception was raised when processing a message by RayBackend
[91mERROR [0m:     The actor died unexpectedly before finishing this task.
	class_name: ClientAppActor
	actor_id: e18464c9d4dbfba81a2bc1d601000000
	pid: 319357
	namespace: 08b6f1cd-1a53-4662-b668-69fdbb3c42a5
	ip: 172.19.148.253
The actor is dead because its worker process has died. Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
[91mERROR [0m:     Traceback (most recent call last):
  File "/mnt/c/User

# FL-with-centralities