# Dependencies and Imports

In this section, we install and import the required libraries.
The Flower library (flwr) is used to simulate federated training.
We also use PyTorch to define and train the Machine Learning model.


In [2]:
!pip install -q "flwr[simulation]" "flwr-datasets[vision]" torch torchvision matplotlib pandas scikit-learn

In [3]:
# Import the dependencies and set up the device.
# Here we determine whether training runs on CPU or GPU,
# then print the library versions as a sanity check.

from collections import OrderedDict
from typing import List, Tuple, Optional, Union
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pandas as pd
import flwr
from flwr.client import NumPyClient, ClientApp
from flwr.common import Context, Metrics, Parameters, parameters_to_ndarrays, ndarrays_to_parameters
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.server.strategy import FedAvg
from flwr.simulation import run_simulation

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Training on {DEVICE}")
print(f"Flower {flwr.__version__} / PyTorch {torch.__version__}")

# Directory containing the per-device log data
DATA_DIR = "devices_logs/"
BATCH_SIZE = 32

Training on cpu
Flower 1.13.1 / PyTorch 2.5.1+cu121


# Exploring the training data

In this section, we load and explore the data produced by the simulator. The goal is to understand its distribution before setting up federated training.
Each device (client) has its own subset of data, which simulates a federated scenario.
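
To make the notion of class distribution concrete, here is a minimal, library-free sketch of the share computation that the analysis performs per device. The helper name `class_shares` is hypothetical; the counts (2 True, 136 False) are the ones reported for device 4 in this notebook's output.

```python
from collections import Counter


def class_shares(labels):
    """Return {label: (count, percent)} for a list of boolean labels."""
    counts = Counter(labels)
    total = len(labels)
    return {label: (n, 100.0 * n / total) for label, n in counts.items()}


# Counts reported for device 4: 2 True cases and 136 False cases.
labels = [True] * 2 + [False] * 136
shares = class_shares(labels)

for label, (n, pct) in sorted(shares.items(), reverse=True):
    print(f"ShouldMigrate={label}: {n} ({pct:.2f}%)")
# ShouldMigrate=True: 2 (1.45%)
# ShouldMigrate=False: 136 (98.55%)
```

A share this skewed means a model that always predicts False would already be about 98.5% accurate on this device, which is worth keeping in mind when reading accuracy figures.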


In [4]:
# Functions to load and analyze the data.
# They help us understand the distribution of 'ShouldMigrate=True' and 'ShouldMigrate=False' cases
# before setting up the federated training scheme.

def load_data(data_dir):
    data = {}
    for filename in os.listdir(data_dir):
        if filename.endswith(".csv"):
            device_id = int(filename.split("_")[1])
            filepath = os.path.join(data_dir, filename)
            df = pd.read_csv(filepath, delimiter=";")

            if device_id not in data:
                data[device_id] = []

            data[device_id].append(df)

    for device_id in data:
        combined_df = pd.concat(data[device_id], ignore_index=True)
        data[device_id] = combined_df.drop_duplicates(
            subset=["Time", "DeviceId", "Speed", "DistanceToSourceAp",
                    "DistanceToLocalCloudlet", "ShouldMigrate", "IsMigPoint", "IsMigZone"]
        )

    return data

def analyze_data(data):
    for device_id, df in data.items():
        true_cases = df[df['ShouldMigrate'] == True]
        false_cases = df[df['ShouldMigrate'] == False]

        true_count = len(true_cases)
        false_count = len(false_cases)
        total_count = len(df)
        print(f"Device {device_id}:")
        print(f"  Total cases: {total_count}")
        print(f"  True (shouldMigrate = True): {true_count} ({(true_count / total_count) * 100:.2f}%)")
        print(f"  False (shouldMigrate = False): {false_count} ({(false_count / total_count) * 100:.2f}%)")

        print("\n  Examples where ShouldMigrate = True:")
        print(true_cases[['Time', 'DeviceId', 'Speed', 'DistanceToSourceAp',
                          'DistanceToLocalCloudlet', 'ShouldMigrate', 'IsMigPoint', 'IsMigZone']].head(3))
        print("\n  Examples where ShouldMigrate = False:")
        print(false_cases[['Time', 'DeviceId', 'Speed', 'DistanceToSourceAp',
                           'DistanceToLocalCloudlet', 'ShouldMigrate', 'IsMigPoint', 'IsMigZone']].head(3))
        print("\n" + "-"*50)

data = load_data(DATA_DIR)
analyze_data(data)

Device 4:
  Total cases: 138
  True (shouldMigrate = True): 2 (1.45%)
  False (shouldMigrate = False): 136 (98.55%)

  Examples where ShouldMigrate = True:
         Time  DeviceId  Speed  DistanceToSourceAp  DistanceToLocalCloudlet  \
109  150000.0         4     13          953.495674               953.495674   
179  110000.0         4     13          592.190848               592.190848   

     ShouldMigrate  IsMigPoint  IsMigZone  
109           True        True       True  
179           True        True       True  

  Examples where ShouldMigrate = False:
      Time  DeviceId  Speed  DistanceToSourceAp  DistanceToLocalCloudlet  \
0  41000.0         4      0          529.555474               529.555474   
1  42000.0         4      2          529.306150               529.306150   
2  43000.0         4      4          529.395882               529.395882   

   ShouldMigrate  IsMigPoint  IsMigZone  
0          False       False      False  
1          False       False      False  
2 

# Preparing the feature set for training

In this step, we create a simplified synthetic dataset in which the four IsMigPoint/IsMigZone combinations are equally represented. The goal is to demonstrate the feasibility of federated training in a setting where, in a real scenario, each device's (client's) own data would be used directly. To keep the model simple for this study, we generate combinations of values (IsMigPoint/IsMigZone) with fixed labels: the label is 1 only when both flags are 1. Because only one of the four combinations is positive, the labels themselves are split 1:3 (True:False) rather than evenly.

This approach lets us focus on the main goal of the project: making it possible to run Machine Learning models in the **MobFogSim** simulator, with studies aimed at federated training. Using synthetic data does not compromise the evaluation of the federated training mechanics or the goal of plugging the model into the simulator. In future projects, the model can be adapted to use real data and solve specific problems while keeping the federated structure and the simulator integration.
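
The construction described above can be sketched without any libraries. This is only an illustration of the arithmetic, assuming 300 samples per combination and an 80/20 stratified split as in the cell that follows; the constant and variable names are chosen for this sketch.

```python
# The four (IsMigPoint, IsMigZone) combinations, each replicated equally often.
SAMPLES_PER_COMBINATION = 300
combinations = [(0, 0), (1, 0), (0, 1), (1, 1)]

features, labels = [], []
for is_mig_point, is_mig_zone in combinations:
    label = is_mig_point & is_mig_zone  # logical AND of the two flags
    features += [(is_mig_point, is_mig_zone)] * SAMPLES_PER_COMBINATION
    labels += [label] * SAMPLES_PER_COMBINATION

positives = sum(labels)
negatives = len(labels) - positives
print(len(labels), positives, negatives)  # 1200 300 900

# A stratified 80/20 split keeps the same 1:3 label ratio in both parts.
val_pos = positives * 20 // 100
val_neg = negatives * 20 // 100
print(positives - val_pos, negatives - val_neg)  # 240 720
print(val_pos, val_neg)  # 60 180
```

These counts match the per-device figures printed by the preprocessing cell (240/720 for training, 60/180 for validation).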


In [5]:
def preprocess_data(data, test_size=0.2, random_state=42):
    processed_data = {}

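    # Generate synthetic samples with every IsMigPoint/IsMigZone combination equally represented.
    # Each device therefore gets the same distribution of combinations for federated training.
    # The loaded logs are only used here to obtain the device ids.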
    samples_per_combination = 300
    combinations = [
        ([0, 0], 0),
        ([1, 0], 0),
        ([0, 1], 0),
        ([1, 1], 1)
    ]

    for device_id in data.keys():
        X_list = []
        y_list = []

        for (feat, label) in combinations:
            arr = np.tile(feat, (samples_per_combination, 1))
            X_list.append(arr)
            y_list.append(np.full(samples_per_combination, label))

        X_all = np.vstack(X_list)
        y_all = np.concatenate(y_list)

        X_train, X_val, y_train, y_val = train_test_split(
            X_all, y_all, test_size=test_size, random_state=random_state, stratify=y_all
        )

        train_size = len(y_train)
        val_size = len(y_val)
        total_size = train_size + val_size

        print(f"Device {device_id}:")
        print(f"  Total: {total_size} instâncias")
        print(f"  Treino: {train_size} ({(train_size/total_size)*100:.2f}%)")
        print(f"    True: {(y_train==1).sum()} - False: {(y_train==0).sum()}")
        print(f"  Val: {val_size} ({(val_size/total_size)*100:.2f}%)")
        print(f"    True: {(y_val==1).sum()} - False: {(y_val==0).sum()}")
        print("-"*50)

        processed_data[device_id] = {
            "train": (X_train, y_train),
            "val": (X_val, y_val)
        }

    return processed_data

processed_data = preprocess_data(data)

Device 4:
  Total: 1200 instâncias
  Treino: 960 (80.00%)
    True: 240 - False: 720
  Val: 240 (20.00%)
    True: 60 - False: 180
--------------------------------------------------
Device 7:
  Total: 1200 instâncias
  Treino: 960 (80.00%)
    True: 240 - False: 720
  Val: 240 (20.00%)
    True: 60 - False: 180
--------------------------------------------------
Device 6:
  Total: 1200 instâncias
  Treino: 960 (80.00%)
    True: 240 - False: 720
  Val: 240 (20.00%)
    True: 60 - False: 180
--------------------------------------------------
Device 5:
  Total: 1200 instâncias
  Treino: 960 (80.00%)
    True: 240 - False: 720
  Val: 240 (20.00%)
    True: 60 - False: 180
--------------------------------------------------
Device 0:
  Total: 1200 instâncias
  Treino: 960 (80.00%)
    True: 240 - False: 720
  Val: 240 (20.00%)
    True: 60 - False: 180
--------------------------------------------------
Device 3:
  Total: 1200 instâncias
  Treino: 960 (80.00%)
    True: 240 - False: 720
  Val

# Model used for training

We define a simple PyTorch model (two fully connected layers), which is enough to demonstrate the concept of federated training.
In federated training, each client trains this model locally and the server aggregates the parameters.
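
The aggregation step can be illustrated with a minimal, library-free sketch of federated averaging (FedAvg), the strategy this notebook configures on the server. The function name `fedavg` and the flat-list representation of parameters are assumptions made for illustration; Flower's own implementation works on lists of NumPy arrays.

```python
def fedavg(client_updates):
    """Average client parameters, weighted by each client's example count.

    client_updates: list of (parameters, num_examples) pairs, where
    parameters is a flat list of floats of the same length for every client.
    """
    total_examples = sum(n for _, n in client_updates)
    num_params = len(client_updates[0][0])
    return [
        sum(params[i] * n for params, n in client_updates) / total_examples
        for i in range(num_params)
    ]


# Equal dataset sizes (as in this notebook): the result is a plain mean.
equal = fedavg([([1.0, 2.0], 960), ([3.0, 4.0], 960), ([5.0, 6.0], 960)])
print(equal)  # [3.0, 4.0]

# Unequal sizes: the larger client pulls the average toward its parameters.
unequal = fedavg([([0.0], 100), ([1.0], 300)])
print(unequal)  # [0.75]
```

Since every device here contributes 960 training examples, the weighting has no visible effect in this notebook; it matters once clients hold datasets of different sizes.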


In [6]:
class MigrationModel(nn.Module):
    def __init__(self, input_dim=2):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 4)
        self.fc2 = nn.Linear(4, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Flower Client and Server

Below we define the Flower client and server:
- The **federated client** (FederatedClient) trains the model locally and evaluates it.
- The **server** coordinates the training rounds. It sends the global parameters, receives the clients' local parameters, and aggregates them.

This is the core logic of federated training: no data leaves the device (client), only model parameters.
At the end of each round, we save a checkpoint of the global model.
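
The round structure can be shown with a toy, library-free sketch. It is not the Flower API: the "model" is a single scalar weight, and `local_fit`, `aggregate`, and the client data are hypothetical names and values chosen only to show that the server sees parameters and example counts, never the data.

```python
def local_fit(global_w, private_data, lr=0.1, epochs=10):
    """Client side: start from the global weight and train on local data only."""
    w = global_w
    for _ in range(epochs):
        for x in private_data:
            w -= lr * 2 * (w - x)  # gradient step on the squared error (w - x)^2
    return w, len(private_data)


def aggregate(results):
    """Server side: receives only (weight, num_examples) pairs."""
    total = sum(n for _, n in results)
    return sum(w * n for w, n in results) / total


# Private datasets, one per client; they are never passed to aggregate().
clients = {0: [1.0, 1.2, 0.8], 1: [3.0, 3.1], 2: [2.0]}

global_w = 0.0
for server_round in range(1, 4):
    results = [local_fit(global_w, data) for data in clients.values()]
    global_w = aggregate(results)
    print(f"round {server_round}: global_w = {global_w:.3f}")
```

In the notebook itself, the same roles are played by `FederatedClient.fit` (local training) and the strategy's `aggregate_fit` (server-side aggregation).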


In [7]:
### Federated Client ###
# The client receives the model and its local data (train_loader, test_loader), performs local training (fit),
# and evaluates the model (evaluate). This simulates a device that holds its own data.
class FederatedClient(NumPyClient):
    def __init__(self, model, train_loader, test_loader, device):
        self.model = model
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.device = device

    def get_parameters(self, config):
        return [val.cpu().numpy() for _, val in self.model.state_dict().items()]

    def fit(self, parameters, config):
        # Load the global parameters into the local model and train.
        params_dict = zip(self.model.state_dict().keys(), parameters)
        state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
        self.model.load_state_dict(state_dict, strict=True)
        self.train(self.train_loader)
        return [val.cpu().numpy() for _, val in self.model.state_dict().items()], len(self.train_loader.dataset), {}

    def evaluate(self, parameters, config):
        # Load the global parameters and evaluate on the local validation set.
        params_dict = zip(self.model.state_dict().keys(), parameters)
        state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
        self.model.load_state_dict(state_dict, strict=True)
        loss, accuracy = self.test(self.test_loader)
        return float(loss), len(self.test_loader.dataset), {"accuracy": float(accuracy)}

    def train(self, train_loader):
        # Local training of the model on the device
        self.model.train()
        criterion = nn.BCEWithLogitsLoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)

        # Several local epochs per round for better convergence
        for epoch in range(10):
            for features, labels in train_loader:
                features, labels = features.to(self.device), labels.to(self.device).float()
                optimizer.zero_grad()
                # squeeze(1) keeps the batch dimension even if a batch holds a single sample
                outputs = self.model(features).squeeze(1)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

    def test(self, test_loader):
        # Local evaluation
        self.model.eval()
        criterion = nn.BCEWithLogitsLoss()
        correct = 0
        total = 0
        test_loss = 0.0

        with torch.no_grad():
            for features, labels in test_loader:
                features, labels = features.to(self.device), labels.to(self.device).float()
                outputs = self.model(features).squeeze(1)
                test_loss += criterion(outputs, labels).item()
                preds = (torch.sigmoid(outputs) > 0.5).float()
                correct += (preds == labels).sum().item()
                total += labels.size(0)

        accuracy = correct / total
        return test_loss / len(test_loader), accuracy

### Client configuration ###
# Each client receives its partition id, looks up its local data, creates a local model, and returns a FederatedClient.
def client_fn(context: Context) -> NumPyClient:
    # The simulation engine exposes the 0-based partition index under the "partition-id" key.
    partition_id = int(context.node_config["partition-id"])
    # Map the partition index to a device id so that non-contiguous device ids also work.
    device_id = sorted(processed_data.keys())[partition_id]
    train_data = processed_data[device_id]["train"]
    val_data = processed_data[device_id]["val"]

    input_dim = train_data[0].shape[1]
    local_model = MigrationModel(input_dim=input_dim).to(DEVICE)

    X_train, y_train = train_data
    X_val, y_val = val_data

    # Build the DataLoaders for local training and validation
    train_dataset = list(zip(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32)))
    val_dataset = list(zip(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.float32)))

    train_loader = DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True
    )

    test_loader = DataLoader(
        val_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False
    )

    return FederatedClient(local_model, train_loader, test_loader, DEVICE)

### Strategy with checkpointing ###
# The FedAvg strategy aggregates the clients' parameters in every round.
# At the end of each round, we save the global model to a checkpoint.
class SaveModelStrategy(FedAvg):
    def __init__(self, model, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model = model

    def aggregate_fit(
        self,
        server_round: int,
        results: List[Tuple[flwr.server.client_proxy.ClientProxy, flwr.common.FitRes]],
        failures: List[Union[Tuple[flwr.server.client_proxy.ClientProxy, flwr.common.FitRes], BaseException]],
    ) -> Tuple[Optional[Parameters], dict]:
        aggregated_parameters, aggregated_metrics = super().aggregate_fit(server_round, results, failures)

        if aggregated_parameters is not None:
            # Load the aggregated parameters into the global model
            aggregated_ndarrays = parameters_to_ndarrays(aggregated_parameters)
            params_dict = zip(self.model.state_dict().keys(), aggregated_ndarrays)
            state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict})
            self.model.load_state_dict(state_dict, strict=True)

            # Save the global model for this round
            checkpoint_filename = f"model_round_{server_round}.pth"
            torch.save({
                'input_dim': self.model.fc1.in_features,
                'model_state_dict': self.model.state_dict()
            }, checkpoint_filename)
            print(f"Model checkpoint saved: {checkpoint_filename}")

        return aggregated_parameters, aggregated_metrics

### Server configuration ###
# The server orchestrates the federated training.
# It creates the global model and configures the strategy.
def server_fn(context: Context) -> ServerAppComponents:
    first_device = list(processed_data.keys())[0]
    input_dim = processed_data[first_device]["train"][0].shape[1]

    model = MigrationModel(input_dim=input_dim).to(DEVICE)
    num_devices = len(processed_data)

    strategy = SaveModelStrategy(
        model=model,
        fraction_fit=1.0,
        fraction_evaluate=1.0,
        min_fit_clients=num_devices,
        min_evaluate_clients=num_devices,
        min_available_clients=num_devices,
    )
    server_config = ServerConfig(num_rounds=10)
    return ServerAppComponents(strategy=strategy, config=server_config)


# Running the federated training

Here we call `run_simulation` to simulate federated training with as many devices as there are entries in `processed_data`.
The server and the client are defined by the `server_fn` and `client_fn` functions.

At the end, we have a global model trained in a federated way, with weights aggregated from the clients' local updates.


In [8]:
run_simulation(
    server_app=ServerApp(server_fn=server_fn),
    client_app=ClientApp(client_fn=client_fn),
    num_supernodes=len(processed_data),
    backend_config={"client_resources": {"num_cpus": 1, "num_gpus": 0.0}},
)

DEBUG:flwr:Asyncio event loop already running.
INFO :      Starting Flower ServerApp, config: num_rounds=10, no round_timeout
INFO :
INFO :      [INIT]
INFO :      Requesting initial parameters from one random client
(pid=2052) 2024-12-08 19:13:06.715555: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
(pid=2052) 2024-12-08 19:13:06.755932: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
(pid=2052) 2024-12-08 19:13:06.768640: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
INFO :      Received initial paramete

Model checkpoint saved: model_round_1.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 2]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_2.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 3]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_3.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 4]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_4.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 5]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_5.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 6]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_6.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 7]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_7.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 8]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_8.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 9]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_9.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [ROUND 10]
INFO :      configure_fit: strategy sampled 9 clients (out of 9)
INFO :      aggregate_fit: received 9 results and 0 failures
INFO :      configure_evaluate: strategy sampled 9 clients (out of 9)

Model checkpoint saved: model_round_10.pth

INFO :      aggregate_evaluate: received 9 results and 0 failures
INFO :
INFO :      [SUMMARY]
INFO :      Run finished 10 round(s) in 49.98s
INFO :          History (loss, distributed):
INFO :              round 1: 0.5248469896614552
INFO :              round 2: 0.40321817621588707
INFO :              round 3: 0.3104447778314352
INFO :              round 4: 0.24423078447580338
INFO :              round 5: 0.19025187194347382
INFO :              round 6: 0.14682694617658854
INFO :              round 7: 0.11237869411706924
INFO :              round 8: 0.08583956956863403
INFO :              round 9: 0.06537487404420972
INFO :              round 10: 0.04976273118518293
INFO :


# Validating the saved model

After training, we load the model saved in `model_round_10.pth` and test it on a few cases.
This shows that the federated global model learned the intended rule: predict True only when both inputs are 1.
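
The model outputs a raw score (logit), so turning it into a True/False decision takes two steps: a sigmoid to obtain a probability, then a threshold. The following is a minimal, library-free sketch of that step; the helper names `sigmoid` and `decide` and the example logits are assumptions for illustration, not values taken from the trained model.

```python
import math


def sigmoid(logit):
    """Map a raw model output (logit) to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-logit))


def decide(logit, threshold=0.5):
    """Return True when the predicted probability exceeds the threshold."""
    return sigmoid(logit) > threshold


for logit in (-4.0, 0.0, 2.0):
    prob = sigmoid(logit)
    print(f"logit={logit:+.1f} -> prob={prob:.4f} -> {decide(logit)}")
```

The threshold is a design choice: the validation cell uses 0.6, which is slightly stricter than the 0.5 used in the client's `test` method, so a borderline probability such as 0.55 would count as True during evaluation but as False here.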


In [11]:
import torch

# weights_only=True restricts unpickling to tensors and plain Python containers
checkpoint = torch.load("model_round_10.pth", map_location=DEVICE, weights_only=True)
input_dim = checkpoint['input_dim']
model = MigrationModel(input_dim=input_dim).to(DEVICE)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

test_cases = [
    [1, 1], # Should be True
    [0, 0], # Should be False
    [1, 0], # Should be False
    [0, 1], # Should be False
]

test_tensor = torch.tensor(test_cases, dtype=torch.float32).to(DEVICE)

with torch.no_grad():
    outputs = model(test_tensor).squeeze()
    probabilities = torch.sigmoid(outputs)
    preds = (probabilities > 0.6).float()

for i, (case, pred, prob) in enumerate(zip(test_cases, preds.cpu().numpy(), probabilities.cpu().numpy())):
    print(f"Caso {i+1}: Inputs={case}")
    print(f"  Predição: {'True' if pred == 1.0 else 'False'} (prob: {prob:.4f})\n")

Caso 1: Inputs=[1, 1]
  Predição: True (prob: 0.8301)

Caso 2: Inputs=[0, 0]
  Predição: False (prob: 0.0000)

Caso 3: Inputs=[1, 0]
  Predição: False (prob: 0.0142)

Caso 4: Inputs=[0, 1]
  Predição: False (prob: 0.0067)



