<a href="https://colab.research.google.com/github/thangthao22/python-training/blob/main/FL_phi_m%C3%A1y_ch%E1%BB%A7_d%E1%BB%B1_%C4%91o%C3%A1n_giao_th%C3%B4ng.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Phase 1: chuẩn bị**

Cell 1: Import Libraries và Setup

In [1]:
import os
import sys
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import logging
import time
import warnings
warnings.filterwarnings('ignore')

# Select the compute device: CUDA when available, otherwise CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Create the working directories used by later cells (logs, checkpoints, results).
for dir_name in ['logs', 'checkpoints', 'results']:
    os.makedirs(f'/content/{dir_name}', exist_ok=True)

# Record when and by whom the notebook was run.
print("Current time:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print("Current user:", os.getenv('USER'))  # Usually None in Colab

Using device: cpu
Current time: 2025-05-02 10:05:56
Current user: None


Cell 2: Logger Setup



In [2]:
def setup_logger(log_dir):
    """
    Create (or reconfigure) the 'AGCRN' logger used to track training.

    Calling this function again replaces the handlers installed by a previous
    call, so re-running the cell does not duplicate log lines.

    Args:
        log_dir: Directory that receives a timestamped ``training_*.log`` file.
            It is created if it does not exist.

    Returns:
        logging.Logger: the 'AGCRN' logger with exactly one file handler and
        one console handler attached.
    """
    logger = logging.getLogger('AGCRN')
    logger.setLevel(logging.INFO)

    # logging.getLogger returns the same object on every call, so handlers
    # from an earlier run of this cell must be removed before adding new ones.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    # Without this, records also reach the root logger's handler and every
    # message is printed twice (once formatted, once as "INFO:AGCRN:...").
    logger.propagate = False

    os.makedirs(log_dir, exist_ok=True)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # File handler
    log_filename = os.path.join(log_dir, f'training_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
    fh = logging.FileHandler(log_filename)
    fh.setLevel(logging.INFO)
    fh.setFormatter(formatter)

    # Console handler
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)

    return logger

# Build the notebook-wide logger; its log file is written under /content/logs.
logger = setup_logger('/content/logs')
logger.info("Starting AGCRN training process...")

2025-05-02 10:06:01,748 - INFO - Starting AGCRN training process...
INFO:AGCRN:Starting AGCRN training process...


Cell 3: Data Loading và Processing

In [3]:
class DataLoader:
    """
    Loads the METR-LA CSV files and prepares normalized train/val/test arrays.

    NOTE(review): this class shares its name with ``torch.utils.data.DataLoader``;
    a later ``from torch.utils.data import DataLoader`` in this notebook rebinds
    the name, so instantiate this class before that import runs.

    Args:
        data_path: Directory containing ``metr-la.csv`` and ``sensor_locations.csv``.
    """
    def __init__(self, data_path):
        self.data_path = data_path

    def load_metr_la(self):
        """
        Load the METR-LA traffic readings and the sensor location table.

        Returns:
            tuple: ``(df, sensor_df)`` where ``df`` is the traffic DataFrame with
            its index parsed as datetimes and ``sensor_df`` is the raw contents
            of ``sensor_locations.csv``.

        Raises:
            Exception: any error raised while reading or parsing is logged and
            re-raised unchanged.
        """
        try:
            # Load traffic data; the first CSV column is used as the index.
            df = pd.read_csv(os.path.join(self.data_path, 'metr-la.csv'), index_col=0)
            df.index = pd.to_datetime(df.index)

            # Load sensor locations
            sensor_df = pd.read_csv(os.path.join(self.data_path, 'sensor_locations.csv'))

            logger.info("Data loaded successfully!")
            logger.info(f"Traffic data shape: {df.shape}")
            logger.info(f"Number of sensors: {len(sensor_df)}")
            # An empty frame has no first/last timestamp to report.
            if len(df.index) > 0:
                logger.info(f"Time range: from {df.index[0]} to {df.index[-1]}")

            return df, sensor_df

        except Exception as e:
            logger.error(f"Error loading data: {e}")
            raise

    def preprocess_data(self, df):
        """
        Convert the DataFrame to a z-score normalized array with a trailing channel axis.

        A single global mean and standard deviation are computed over every
        value in ``df``. NOTE(review): because this runs before the split, the
        statistics include validation and test rows.

        Args:
            df: DataFrame of numeric readings.

        Returns:
            tuple: ``(data_normalized, scaler)`` where ``data_normalized`` has
            shape ``df.shape + (1,)`` and ``scaler`` is a dict with keys
            ``'mean'`` and ``'std'`` for later de-normalization.
        """
        # Add channel dimension
        data = np.expand_dims(df.values, axis=-1)

        # Normalize data
        mean = np.mean(data)
        std = np.std(data)
        if std == 0:
            # Constant input: dividing by zero would fill the array with NaN/inf.
            logger.warning("Standard deviation is zero; using std=1.0 for normalization")
            std = np.float64(1.0)
        data_normalized = (data - mean) / std

        # Create scaler dictionary for later use
        scaler = {
            'mean': mean,
            'std': std
        }

        logger.info("Data preprocessed:")
        logger.info(f"Mean: {mean:.4f}")
        logger.info(f"Std: {std:.4f}")

        return data_normalized, scaler

    def train_test_split(self, data, val_ratio=0.1, test_ratio=0.2):
        """
        Split the data chronologically into train, validation and test sets.

        Args:
            data: Array whose first axis is time.
            val_ratio: Fraction of samples assigned to the validation set.
            test_ratio: Fraction of samples assigned to the test set.

        Returns:
            tuple: ``(train_data, val_data, test_data)`` as consecutive slices
            of ``data`` along the first axis.

        Raises:
            ValueError: if a ratio is negative or the two ratios leave a
                negative number of training samples.
        """
        num_samples = data.shape[0]
        num_test = int(num_samples * test_ratio)
        num_val = int(num_samples * val_ratio)
        num_train = num_samples - num_test - num_val

        # A negative num_train would be interpreted as "count from the end"
        # by the slices below and silently produce overlapping splits.
        if val_ratio < 0 or test_ratio < 0 or num_train < 0:
            raise ValueError(
                f"Invalid split ratios: val_ratio={val_ratio}, test_ratio={test_ratio}"
            )

        train_data = data[:num_train]
        val_data = data[num_train:num_train+num_val]
        test_data = data[num_train+num_val:]

        logger.info("Data split completed:")
        logger.info(f"Training set: {train_data.shape}")
        logger.info(f"Validation set: {val_data.shape}")
        logger.info(f"Test set: {test_data.shape}")

        return train_data, val_data, test_data

# Instantiate the DataLoader, then load -> normalize -> split the METR-LA data.
# Note: normalization runs on the full dataset before it is split.
data_loader = DataLoader('/content/data')
traffic_df, sensor_df = data_loader.load_metr_la()
data_normalized, scaler = data_loader.preprocess_data(traffic_df)
train_data, val_data, test_data = data_loader.train_test_split(data_normalized)

2025-05-02 10:06:06,134 - INFO - Data loaded successfully!
INFO:AGCRN:Data loaded successfully!
2025-05-02 10:06:06,137 - INFO - Traffic data shape: (34272, 207)
INFO:AGCRN:Traffic data shape: (34272, 207)
2025-05-02 10:06:06,139 - INFO - Number of sensors: 207
INFO:AGCRN:Number of sensors: 207
2025-05-02 10:06:06,142 - INFO - Time range: from 2012-03-01 00:00:00 to 2012-06-27 23:55:00
INFO:AGCRN:Time range: from 2012-03-01 00:00:00 to 2012-06-27 23:55:00
2025-05-02 10:06:06,236 - INFO - Data preprocessed:
INFO:AGCRN:Data preprocessed:
2025-05-02 10:06:06,239 - INFO - Mean: 53.7190
INFO:AGCRN:Mean: 53.7190
2025-05-02 10:06:06,242 - INFO - Std: 20.2614
INFO:AGCRN:Std: 20.2614
2025-05-02 10:06:06,245 - INFO - Data split completed:
INFO:AGCRN:Data split completed:
2025-05-02 10:06:06,247 - INFO - Training set: (23991, 207, 1)
INFO:AGCRN:Training set: (23991, 207, 1)
2025-05-02 10:06:06,250 - INFO - Validation set: (3427, 207, 1)
INFO:AGCRN:Validation set: (3427, 207, 1)
2025-05-02 10:06:0

Cell 4: Dataset và Batch Generator

In [5]:
class TrafficDataset(torch.utils.data.Dataset):
    """Sliding-window dataset yielding ``(history, future)`` tensor pairs."""

    def __init__(self, data, seq_len, horizon, num_nodes):
        """
        Args:
            data: numpy array shape (time_steps, num_nodes, features)
            seq_len: input sequence length
            horizon: prediction horizon
            num_nodes: number of nodes/sensors
        """
        self.data = torch.FloatTensor(data)
        self.seq_len = seq_len
        self.horizon = horizon
        self.num_nodes = num_nodes
        self.samples = self._generate_samples()

    def _generate_samples(self):
        """
        Build every (input window, target window) pair up front.

        Each pair consists of ``seq_len`` consecutive steps followed by the
        next ``horizon`` steps; the list is empty when the data is too short
        for a single window.
        """
        window = self.seq_len + self.horizon
        starts = range(len(self.data) - window + 1)
        return [
            (
                self.data[start:start + self.seq_len],
                self.data[start + self.seq_len:start + window],
            )
            for start in starts
        ]

    def __len__(self):
        """Number of available windows."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return the ``(x, y)`` pair stored at position ``idx``."""
        return self.samples[idx]

def create_data_loaders(train_data, val_data, test_data, batch_size, seq_len, horizon, num_nodes):
    """
    Wrap the three data splits in TrafficDataset objects and batch them.

    Only the training loader shuffles. All loaders run in the main process
    (``num_workers=0``) and pin memory only when CUDA is available.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader)``.
    """
    # Build all datasets first, in train/val/test order.
    datasets = [
        TrafficDataset(split, seq_len, horizon, num_nodes)
        for split in (train_data, val_data, test_data)
    ]

    pin = torch.cuda.is_available()
    shuffle_flags = (True, False, False)  # train, val, test

    loaders = []
    for dataset, shuffle in zip(datasets, shuffle_flags):
        loaders.append(
            torch.utils.data.DataLoader(
                dataset,
                batch_size=batch_size,
                shuffle=shuffle,
                num_workers=0,  # Colab recommendation
                pin_memory=pin,
            )
        )
    train_loader, val_loader, test_loader = loaders

    logger.info("DataLoaders created:")
    logger.info(f"Training batches: {len(train_loader)}")
    logger.info(f"Validation batches: {len(val_loader)}")
    logger.info(f"Test batches: {len(test_loader)}")

    return train_loader, val_loader, test_loader

# Create dataloaders
# NOTE: this dict is bound to the name `config`; a later cell rebinds `config`
# to a ModelConfig instance, so the dict is only valid for the call below.
config = {
    'batch_size': 64,
    'seq_len': 12,
    'horizon': 12,
    'num_nodes': traffic_df.shape[1]  # one node per column of the traffic frame
}

train_loader, val_loader, test_loader = create_data_loaders(
    train_data, val_data, test_data,
    config['batch_size'], config['seq_len'],
    config['horizon'], config['num_nodes']
)

2025-05-02 10:06:17,211 - INFO - DataLoaders created:
INFO:AGCRN:DataLoaders created:
2025-05-02 10:06:17,213 - INFO - Training batches: 375
INFO:AGCRN:Training batches: 375
2025-05-02 10:06:17,215 - INFO - Validation batches: 54
INFO:AGCRN:Validation batches: 54
2025-05-02 10:06:17,216 - INFO - Test batches: 107
INFO:AGCRN:Test batches: 107


Cell 5: Visualization Functions

In [6]:
class Visualizer:
    """
    Saves diagnostic plots (loss curves, prediction comparisons, correlation
    heatmaps) as PNG files.

    Args:
        save_dir: Directory that receives the PNG files; created if missing.
    """
    def __init__(self, save_dir='/content/results'):
        self.save_dir = save_dir
        os.makedirs(save_dir, exist_ok=True)

    @staticmethod
    def _sensor_series(values, sensor_id, num_steps, horizon_step):
        """Return the first ``num_steps`` values of one sensor from a 3-D or 4-D array."""
        if getattr(values, 'ndim', None) == 4:
            # 4-D model output: axis 1 is the horizon and axis 2 the sensor.
            return values[:num_steps, horizon_step, sensor_id, 0]
        return values[:num_steps, sensor_id, 0]

    def plot_training_progress(self, train_losses, val_losses):
        """
        Plot training and validation losses per epoch.

        The figure is written to ``training_progress.png`` in ``save_dir``.
        """
        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            ax.plot(train_losses, label='Training Loss', marker='o')
            ax.plot(val_losses, label='Validation Loss', marker='s')
            ax.set_title('Training Progress')
            ax.set_xlabel('Epoch')
            ax.set_ylabel('Loss')
            ax.legend()
            ax.grid(True)

            # Save plot
            fig.savefig(os.path.join(self.save_dir, 'training_progress.png'))
        finally:
            # Close even when plotting fails so figures do not accumulate.
            plt.close(fig)

    def plot_prediction_comparison(self, predictions, targets, sensor_id=0, num_steps=100,
                                   horizon_step=0):
        """
        Plot predicted against actual values for one sensor.

        Args:
            predictions: Array indexed ``[sample, sensor, feature]`` or, for raw
                model output, ``[sample, horizon, sensor, feature]``.
            targets: Array with the same layout as ``predictions``.
            sensor_id: Index of the sensor to plot.
            num_steps: Maximum number of leading samples to plot.
            horizon_step: Forecast step to plot when the arrays are 4-D;
                ignored for 3-D arrays.

        The figure is written to ``prediction_comparison_sensor_<sensor_id>.png``.
        """
        fig, ax = plt.subplots(figsize=(15, 6))
        try:
            # Plot actual values
            ax.plot(self._sensor_series(targets, sensor_id, num_steps, horizon_step),
                    label='Actual', marker='o', markersize=2)

            # Plot predictions
            ax.plot(self._sensor_series(predictions, sensor_id, num_steps, horizon_step),
                    label='Predicted', marker='s', markersize=2)

            ax.set_title(f'Traffic Flow Predictions vs Actual (Sensor {sensor_id})')
            ax.set_xlabel('Time Steps')
            ax.set_ylabel('Traffic Flow')
            ax.legend()
            ax.grid(True)

            # Save plot
            fig.savefig(os.path.join(self.save_dir, f'prediction_comparison_sensor_{sensor_id}.png'))
        finally:
            plt.close(fig)

    def plot_spatial_heatmap(self, correlation_matrix, sensor_ids):
        """
        Plot a heatmap of the spatial correlations between sensors.

        Args:
            correlation_matrix: Square matrix of pairwise values.
            sensor_ids: Tick labels used on both axes.

        The figure is written to ``spatial_correlations.png`` in ``save_dir``.
        """
        fig, ax = plt.subplots(figsize=(12, 10))
        try:
            sns.heatmap(correlation_matrix,
                        xticklabels=sensor_ids,
                        yticklabels=sensor_ids,
                        cmap='RdYlBu_r',
                        ax=ax)
            ax.set_title('Spatial Correlations between Sensors')

            # Save plot
            fig.savefig(os.path.join(self.save_dir, 'spatial_correlations.png'))
        finally:
            plt.close(fig)

# Initialize the shared visualizer (writes to its default save_dir and creates it if missing).
visualizer = Visualizer()

# Example usage:
# visualizer.plot_training_progress(train_losses, val_losses)
# visualizer.plot_prediction_comparison(predictions, targets, sensor_id=0)
# visualizer.plot_spatial_heatmap(correlation_matrix, sensor_ids)

Cell 6: Metrics

In [7]:
class Metrics:
    """Point-forecast error metrics computed with NumPy."""

    @staticmethod
    def mae(pred, true):
        """Mean Absolute Error between ``pred`` and ``true`` (array-likes)."""
        pred, true = np.asarray(pred), np.asarray(true)
        return np.mean(np.abs(pred - true))

    @staticmethod
    def rmse(pred, true):
        """Root Mean Square Error between ``pred`` and ``true`` (array-likes)."""
        pred, true = np.asarray(pred), np.asarray(true)
        return np.sqrt(np.mean((pred - true) ** 2))

    @staticmethod
    def mape(pred, true):
        """
        Mean Absolute Percentage Error, in percent.

        Entries where ``true`` is exactly zero are excluded to avoid dividing
        by zero. Returns ``nan`` when no non-zero target remains.
        """
        pred, true = np.asarray(pred), np.asarray(true)
        mask = true != 0
        if not mask.any():
            # Explicit result instead of a mean over an empty selection.
            return float('nan')
        return np.mean(np.abs((true[mask] - pred[mask]) / true[mask])) * 100

    @staticmethod
    def evaluate(pred, true):
        """
        Calculate all metrics.

        Returns:
            dict: keys ``'MAE'``, ``'RMSE'`` and ``'MAPE'``.
        """
        return {
            'MAE': Metrics.mae(pred, true),
            'RMSE': Metrics.rmse(pred, true),
            'MAPE': Metrics.mape(pred, true)
        }

# Example usage:
# metrics = Metrics.evaluate(predictions, targets)
# logger.info("Test Results:")
# for metric_name, value in metrics.items():
#     logger.info(f"{metric_name}: {value:.4f}")

Cell 7: AGCRN Model Components

In [8]:
class AVWGCN(nn.Module):
    """
    Graph convolution whose adjacency and per-node weights are derived from
    learned node embeddings.

    Args:
        dim_in: Number of input features per node.
        dim_out: Number of output features per node.
        cheb_k: Number of support matrices (identity, adaptive adjacency and
            higher-order terms built by recurrence). Must be at least 1.
        embed_dim: Size of each node embedding.

    Raises:
        ValueError: if ``cheb_k`` is smaller than 1.
    """
    def __init__(self, dim_in, dim_out, cheb_k, embed_dim):
        super(AVWGCN, self).__init__()
        if cheb_k < 1:
            raise ValueError(f"cheb_k must be >= 1, got {cheb_k}")
        self.cheb_k = cheb_k
        # Pools are projected through the node embeddings to get per-node weights and biases.
        self.weights_pool = nn.Parameter(torch.FloatTensor(embed_dim, cheb_k, dim_in, dim_out))
        self.bias_pool = nn.Parameter(torch.FloatTensor(embed_dim, dim_out))
        nn.init.xavier_uniform_(self.weights_pool)
        nn.init.uniform_(self.bias_pool)
        logger.info(f"AVWGCN initialized with dim_in={dim_in}, dim_out={dim_out}, cheb_k={cheb_k}")

    def forward(self, x, node_embeddings):
        """
        Args:
            x: Tensor of shape (batch, node_num, dim_in).
            node_embeddings: Tensor of shape (node_num, embed_dim).

        Returns:
            Tensor of shape (batch, node_num, dim_out).
        """
        node_num = node_embeddings.shape[0]
        # Adaptive adjacency: row-normalized similarity of the node embeddings.
        supports = F.softmax(F.relu(torch.mm(node_embeddings, node_embeddings.transpose(0, 1))), dim=1)
        # Build the identity on the target device/dtype directly rather than
        # allocating it on CPU and copying it on every forward pass.
        identity = torch.eye(node_num, device=supports.device, dtype=supports.dtype)
        support_set = [identity, supports]

        for k in range(2, self.cheb_k):
            support_set.append(torch.matmul(2 * supports, support_set[-1]) - support_set[-2])
        # Keep exactly cheb_k supports so cheb_k == 1 matches weights_pool's k axis.
        supports = torch.stack(support_set[:self.cheb_k], dim=0)

        weights = torch.einsum('nd,dkio->nkio', node_embeddings, self.weights_pool)
        bias = torch.matmul(node_embeddings, self.bias_pool)
        x_g = torch.einsum("knm,bmc->bknc", supports, x)
        x_g = x_g.permute(0, 2, 1, 3)
        x_gconv = torch.einsum('bnki,nkio->bno', x_g, weights) + bias
        return x_gconv

class AGCRNCell(nn.Module):
    """
    Gated recurrent cell whose gate and candidate transforms are AVWGCN layers.

    Args:
        node_num: Number of graph nodes.
        dim_in: Input features per node.
        dim_out: Hidden-state features per node.
        cheb_k: Passed through to both AVWGCN layers.
        embed_dim: Passed through to both AVWGCN layers.
    """
    def __init__(self, node_num, dim_in, dim_out, cheb_k, embed_dim):
        super().__init__()
        self.node_num = node_num
        self.hidden_dim = dim_out
        # Both graph convolutions consume the input concatenated with the state.
        fused_dim = dim_in + self.hidden_dim
        self.gate = AVWGCN(fused_dim, 2 * dim_out, cheb_k, embed_dim)
        self.update = AVWGCN(fused_dim, dim_out, cheb_k, embed_dim)
        logger.info(f"AGCRNCell initialized with node_num={node_num}, dim_in={dim_in}, dim_out={dim_out}")

    def forward(self, x, state, node_embeddings):
        """
        Advance the hidden state by one time step.

        Args:
            x: Input of shape (batch, node_num, dim_in).
            state: Previous hidden state of shape (batch, node_num, hidden_dim);
                moved to ``x``'s device if necessary.
            node_embeddings: Node embedding matrix forwarded to the AVWGCN layers.

        Returns:
            New hidden state with the same shape as ``state``.
        """
        state = state.to(x.device)
        gates = torch.sigmoid(
            self.gate(torch.cat((x, state), dim=-1), node_embeddings)
        )
        # The first half scales the state inside the candidate; the second
        # half blends the old state with the candidate.
        z, r = torch.split(gates, self.hidden_dim, dim=-1)
        hc = torch.tanh(
            self.update(torch.cat((x, z * state), dim=-1), node_embeddings)
        )
        return r * state + (1 - r) * hc

    def init_hidden_state(self, batch_size):
        """Return an all-zero hidden state of shape (batch_size, node_num, hidden_dim)."""
        shape = (batch_size, self.node_num, self.hidden_dim)
        return torch.zeros(*shape)

Cell 8: Main AGCRN Model

In [9]:
class AGCRN(nn.Module):
    """
    Stacked AGCRNCell encoder followed by a convolution that emits all
    ``horizon`` forecast steps at once.

    Args:
        num_nodes: Number of graph nodes.
        input_dim: Input features per node.
        hidden_dim: Hidden-state features per node in every layer.
        output_dim: Predicted features per node.
        horizon: Number of future steps predicted.
        num_layers: Number of stacked recurrent layers.
        cheb_k: Forwarded to every AGCRNCell.
        embed_dim: Size of each learned node embedding.
    """
    def __init__(self, num_nodes, input_dim, hidden_dim, output_dim, horizon, num_layers, cheb_k, embed_dim):
        super().__init__()
        self.num_nodes = num_nodes
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.horizon = horizon
        self.num_layers = num_layers

        # One embedding matrix shared by all layers.
        self.node_embeddings = nn.Parameter(torch.randn(self.num_nodes, embed_dim), requires_grad=True)

        # The first layer reads the raw input; deeper layers read hidden states.
        layer_inputs = [input_dim] + [hidden_dim] * (num_layers - 1)
        self.encoder = nn.ModuleList()
        for layer_dim in layer_inputs:
            self.encoder.append(AGCRNCell(num_nodes, layer_dim, hidden_dim, cheb_k, embed_dim))

        self.end_conv = nn.Conv2d(1, horizon * output_dim, kernel_size=(1, hidden_dim), bias=True)
        logger.info(f"AGCRN initialized with {num_layers} layers")

    def forward(self, source, target=None):
        """
        Args:
            source: Tensor of shape (batch, seq_len, num_nodes, input_dim).
            target: Unused; accepted for interface compatibility.

        Returns:
            Tensor of shape (batch, horizon, num_nodes, output_dim).
        """
        hidden0 = self.init_hidden(source.shape[0])
        sequence_out, _ = self.encoder_forward(source, hidden0)
        # Only the last time step's hidden state feeds the predictor.
        last_step = sequence_out[:, -1:, :, :]
        projected = self.end_conv(last_step)
        projected = projected.squeeze(-1).reshape(-1, self.horizon, self.output_dim, self.num_nodes)
        return projected.permute(0, 1, 3, 2)

    def init_hidden(self, batch_size):
        """Stack the zero initial states of all layers along a new leading axis."""
        per_layer = [
            self.encoder[layer].init_hidden_state(batch_size)
            for layer in range(self.num_layers)
        ]
        return torch.stack(per_layer, dim=0)

    def encoder_forward(self, input_data, init_state):
        """
        Run the input sequence through every recurrent layer.

        Args:
            input_data: Tensor whose axis 1 is time.
            init_state: Indexable collection with one initial state per layer.

        Returns:
            tuple: ``(outputs, last_states)`` where ``outputs`` stacks the top
            layer's hidden state at every time step along axis 1 and
            ``last_states`` lists each layer's final hidden state.
        """
        num_steps = input_data.shape[1]
        layer_input = input_data
        last_states = []

        for layer in range(self.num_layers):
            cell = self.encoder[layer]
            hidden = init_state[layer]
            step_states = []
            for step in range(num_steps):
                hidden = cell(layer_input[:, step, :, :], hidden, self.node_embeddings)
                step_states.append(hidden)
            last_states.append(hidden)
            # The next layer consumes this layer's hidden-state sequence.
            layer_input = torch.stack(step_states, dim=1)

        return layer_input, last_states

Cell 9: Model Configuration và Training Setup

In [10]:
class ModelConfig:
    """
    Container for model, training and logging hyper-parameters.

    Every setting is stored as an instance attribute and the full
    configuration is logged when the object is created.
    """
    def __init__(self):
        # Model Parameters
        model_params = {
            'num_nodes': 207,  # METR-LA dataset
            'input_dim': 1,
            'output_dim': 1,
            'hidden_dim': 64,
            'embed_dim': 10,
            'num_layers': 2,
            'cheb_k': 2,
            'horizon': 12,
            'seq_len': 12,
        }

        # Training Parameters
        training_params = {
            'batch_size': 64,
            'epochs': 100,
            'learning_rate': 0.001,
            'weight_decay': 0.0001,
            'early_stop_patience': 10,
            'grad_norm': True,
            'max_grad_norm': 5,
            'use_gpu': torch.cuda.is_available(),
        }

        # Logging
        logging_params = {'log_step': 20}

        # Attributes are assigned in declaration order, which is also the
        # order in which log_config reports them.
        for group in (model_params, training_params, logging_params):
            for name, setting in group.items():
                setattr(self, name, setting)

        logger.info("Model configuration initialized")
        self.log_config()

    def log_config(self):
        """Log every attribute currently stored on the instance, one per line."""
        logger.info("\nModel Configuration:")
        for attr, value in self.__dict__.items():
            logger.info(f"{attr}: {value}")

# Rebinds `config`: from here on it is a ModelConfig instance, not the dict from the dataloader cell.
config = ModelConfig()

2025-05-02 10:06:30,744 - INFO - Model configuration initialized
INFO:AGCRN:Model configuration initialized
2025-05-02 10:06:30,746 - INFO - 
Model Configuration:
INFO:AGCRN:
Model Configuration:
2025-05-02 10:06:30,748 - INFO - num_nodes: 207
INFO:AGCRN:num_nodes: 207
2025-05-02 10:06:30,750 - INFO - input_dim: 1
INFO:AGCRN:input_dim: 1
2025-05-02 10:06:30,754 - INFO - output_dim: 1
INFO:AGCRN:output_dim: 1
2025-05-02 10:06:30,756 - INFO - hidden_dim: 64
INFO:AGCRN:hidden_dim: 64
2025-05-02 10:06:30,758 - INFO - embed_dim: 10
INFO:AGCRN:embed_dim: 10
2025-05-02 10:06:30,759 - INFO - num_layers: 2
INFO:AGCRN:num_layers: 2
2025-05-02 10:06:30,762 - INFO - cheb_k: 2
INFO:AGCRN:cheb_k: 2
2025-05-02 10:06:30,763 - INFO - horizon: 12
INFO:AGCRN:horizon: 12
2025-05-02 10:06:30,764 - INFO - seq_len: 12
INFO:AGCRN:seq_len: 12
2025-05-02 10:06:30,765 - INFO - batch_size: 64
INFO:AGCRN:batch_size: 64
2025-05-02 10:06:30,766 - INFO - epochs: 100
INFO:AGCRN:epochs: 100
2025-05-02 10:06:30,767 - IN

Cell 10: Training Functions

In [11]:
class Trainer:
    """
    Runs the optimisation loop with validation, checkpointing and early stopping.

    Args:
        model: ``nn.Module`` to optimise.
        train_loader: Iterable of ``(data, target)`` batches supporting ``len()``.
        val_loader: Iterable of ``(data, target)`` batches supporting ``len()``.
        test_loader: Stored on the instance; not used by any method of this class.
        config: Object exposing ``learning_rate``, ``weight_decay``, ``grad_norm``,
            ``max_grad_norm``, ``log_step``, ``epochs`` and ``early_stop_patience``.
        device: Device the batches are moved to.
        checkpoint_path: Where the best model's ``state_dict`` is written.
            Defaults to ``/content/checkpoints/best_model.pth``.
    """
    DEFAULT_CHECKPOINT_PATH = '/content/checkpoints/best_model.pth'

    def __init__(self, model, train_loader, val_loader, test_loader, config, device,
                 checkpoint_path=None):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.config = config
        self.device = device
        self.checkpoint_path = checkpoint_path or self.DEFAULT_CHECKPOINT_PATH

        self.optimizer = torch.optim.Adam(model.parameters(),
                                        lr=config.learning_rate,
                                        weight_decay=config.weight_decay)
        self.criterion = nn.MSELoss()

        # Mixed precision only applies on CUDA; on CPU the scaler and autocast
        # are disabled so the loop runs in plain float32.
        self.use_amp = torch.device(device).type == 'cuda'
        self.scaler = torch.cuda.amp.GradScaler(enabled=self.use_amp)

        self.best_val_loss = float('inf')
        self.not_improved_count = 0

        logger.info("Trainer initialized")

    def train_epoch(self, epoch):
        """
        Run one pass over the training loader.

        Args:
            epoch: Epoch number, used only in log messages.

        Returns:
            float: mean training loss over all batches.
        """
        self.model.train()
        total_loss = 0
        for batch_idx, (data, target) in enumerate(self.train_loader):
            data, target = data.to(self.device), target.to(self.device)

            self.optimizer.zero_grad()
            with torch.cuda.amp.autocast(enabled=self.use_amp):
                output = self.model(data)
                loss = self.criterion(output, target)

            # Backward and optimize
            self.scaler.scale(loss).backward()
            if self.config.grad_norm:
                # Gradients must be unscaled before clipping so the threshold
                # applies to their true magnitude.
                self.scaler.unscale_(self.optimizer)
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.max_grad_norm)
            self.scaler.step(self.optimizer)
            self.scaler.update()

            total_loss += loss.item()

            if batch_idx % self.config.log_step == 0:
                logger.info(f'Train Epoch: {epoch} [{batch_idx}/{len(self.train_loader)} '
                          f'({100. * batch_idx / len(self.train_loader):.0f}%)]\tLoss: {loss.item():.6f}')

        return total_loss / len(self.train_loader)

    def validate(self, epoch):
        """
        Evaluate the model on the validation loader without gradient tracking.

        Args:
            epoch: Epoch number, used only in the log message.

        Returns:
            float: mean validation loss over all batches.
        """
        self.model.eval()
        total_loss = 0
        with torch.no_grad():
            for batch_idx, (data, target) in enumerate(self.val_loader):
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                loss = self.criterion(output, target)
                total_loss += loss.item()

        val_loss = total_loss / len(self.val_loader)
        logger.info(f'Validation Epoch: {epoch}\tLoss: {val_loss:.6f}')
        return val_loss

    def train(self):
        """
        Train for up to ``config.epochs`` epochs with early stopping.

        The model's ``state_dict`` is saved to ``checkpoint_path`` whenever the
        validation loss improves. Training stops once the loss has failed to
        improve for ``config.early_stop_patience`` consecutive epochs.

        Returns:
            tuple: ``(train_losses, val_losses)``, one entry per completed epoch.
        """
        train_losses = []
        val_losses = []

        # Ensure the checkpoint directory exists before the first save.
        checkpoint_dir = os.path.dirname(self.checkpoint_path)
        if checkpoint_dir:
            os.makedirs(checkpoint_dir, exist_ok=True)

        for epoch in range(1, self.config.epochs + 1):
            train_loss = self.train_epoch(epoch)
            val_loss = self.validate(epoch)

            train_losses.append(train_loss)
            val_losses.append(val_loss)

            # Early stopping
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                torch.save(self.model.state_dict(), self.checkpoint_path)
                self.not_improved_count = 0
            else:
                self.not_improved_count += 1
                # ">=" keeps the stop reachable if the counter already exceeds
                # the patience (e.g. train() called again on the same Trainer).
                if self.not_improved_count >= self.config.early_stop_patience:
                    logger.info(f"Early stopping triggered after {epoch} epochs")
                    break

        return train_losses, val_losses

Cell 11: Model Testing và Initial Run

In [12]:
def test_model():
    """
    Build an AGCRN from the global ``config``, train it and plot the results.

    Training runs through ``Trainer.train`` (up to ``config.epochs`` epochs
    with early stopping). The loss curves and a prediction-vs-target plot for
    the first test batch are then saved through the global ``visualizer``.

    Returns:
        tuple: ``(model, train_losses, val_losses)``.
    """
    logger.info("Initializing model test...")

    # Initialize model from the matching attributes of the global config
    model_fields = (
        'num_nodes', 'input_dim', 'hidden_dim', 'output_dim',
        'horizon', 'num_layers', 'cheb_k', 'embed_dim',
    )
    model_kwargs = {field: getattr(config, field) for field in model_fields}
    model = AGCRN(**model_kwargs).to(device)

    # Initialize trainer
    trainer = Trainer(model, train_loader, val_loader, test_loader, config, device)

    logger.info("Starting test training...")
    train_losses, val_losses = trainer.train()

    # Plot results
    visualizer.plot_training_progress(train_losses, val_losses)

    # Test prediction on a single batch from the test loader
    model.eval()
    with torch.no_grad():
        first_batch = next(iter(test_loader))
        sample_data, sample_target = first_batch
        predictions = model(sample_data.to(device)).cpu().numpy()
        sample_target = sample_target.numpy()

    # Plot sample predictions
    visualizer.plot_prediction_comparison(predictions, sample_target)

    return model, train_losses, val_losses

# Runs when the cell is executed (the guard is true in the notebook kernel):
# trains the model and keeps the results in notebook-level variables.
if __name__ == "__main__":
    logger.info("Starting model test...")
    model, train_losses, val_losses = test_model()
    logger.info("Model test completed!")

2025-05-02 10:06:50,579 - INFO - Starting model test...
INFO:AGCRN:Starting model test...
2025-05-02 10:06:50,581 - INFO - Initializing model test...
INFO:AGCRN:Initializing model test...
2025-05-02 10:06:50,597 - INFO - AVWGCN initialized with dim_in=65, dim_out=128, cheb_k=2
INFO:AGCRN:AVWGCN initialized with dim_in=65, dim_out=128, cheb_k=2
2025-05-02 10:06:50,601 - INFO - AVWGCN initialized with dim_in=65, dim_out=64, cheb_k=2
INFO:AGCRN:AVWGCN initialized with dim_in=65, dim_out=64, cheb_k=2
2025-05-02 10:06:50,603 - INFO - AGCRNCell initialized with node_num=207, dim_in=1, dim_out=64
INFO:AGCRN:AGCRNCell initialized with node_num=207, dim_in=1, dim_out=64
2025-05-02 10:06:50,609 - INFO - AVWGCN initialized with dim_in=128, dim_out=128, cheb_k=2
INFO:AGCRN:AVWGCN initialized with dim_in=128, dim_out=128, cheb_k=2
2025-05-02 10:06:50,613 - INFO - AVWGCN initialized with dim_in=128, dim_out=64, cheb_k=2
INFO:AGCRN:AVWGCN initialized with dim_in=128, dim_out=64, cheb_k=2
2025-05-02 1

KeyboardInterrupt: 

Hoàn thành phần 1
|
|
|
|
|
|
v
Phần 2: Data Pipeline


Cell 12: Enhanced Data Processing

In [13]:
class DataProcessor:
    """
    Loads METR-LA and applies missing-value handling, outlier clipping and
    calendar feature extraction.

    Args:
        data_path: Directory containing ``metr-la.csv`` and ``sensor_locations.csv``.
    """
    def __init__(self, data_path):
        self.data_path = data_path
        logger.info(f"Initializing DataProcessor with path: {data_path}")

    def load_and_preprocess(self):
        """
        Load the METR-LA data and run the full cleaning pipeline.

        Steps, in order: fill missing values, clip outliers, append time
        features.

        Returns:
            tuple: ``(df, sensor_df)`` where ``df`` is the processed traffic
            frame (sensor columns plus four time-feature columns) and
            ``sensor_df`` is the raw sensor location table.

        Raises:
            Exception: any error is logged and re-raised unchanged.
        """
        try:
            # Load data
            df = pd.read_csv(os.path.join(self.data_path, 'metr-la.csv'), index_col=0)
            df.index = pd.to_datetime(df.index)
            sensor_df = pd.read_csv(os.path.join(self.data_path, 'sensor_locations.csv'))

            # Fill missing values
            df = self._handle_missing_values(df)

            # Remove outliers
            df = self._remove_outliers(df)

            # Add time features
            df = self._add_time_features(df)

            logger.info("Data processed successfully:")
            logger.info(f"Shape: {df.shape}")
            if len(df.index) > 0:
                logger.info(f"Time range: {df.index[0]} to {df.index[-1]}")

            return df, sensor_df

        except Exception as e:
            logger.error(f"Error in data processing: {e}")
            raise

    def _handle_missing_values(self, df):
        """
        Fill NaN values in three passes and return a new DataFrame.

        1. Linear interpolation, filling at most 6 consecutive NaNs.
        2. Forward fill, at most 24 consecutive NaNs.
        3. Backward fill for whatever remains (e.g. leading NaNs).
        """
        # Calculate missing value statistics
        missing_stats = df.isnull().sum()
        logger.info(f"Missing values before processing:\n{missing_stats}")

        # Linear interpolation for small gaps
        df = df.interpolate(method='linear', limit=6)

        # Forward fill for remaining gaps. ``fillna(method=...)`` is deprecated
        # in pandas 2.x; ``ffill``/``bfill`` are the supported equivalents.
        df = df.ffill(limit=24)

        # Backward fill for any remaining values
        df = df.bfill()

        logger.info("Missing values handled")
        return df

    def _remove_outliers(self, df):
        """
        Clip each column to ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]`` (IQR method).

        Values outside the bounds are replaced by the nearest bound rather
        than dropped, so the shape of the frame is preserved.
        """
        Q1 = df.quantile(0.25)
        Q3 = df.quantile(0.75)
        IQR = Q3 - Q1

        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        # Replace outliers with boundary values (bounds are aligned per column)
        df = df.clip(lower=lower_bound, upper=upper_bound, axis=1)

        logger.info("Outliers removed")
        return df

    def _add_time_features(self, df):
        """
        Return a copy of ``df`` with calendar columns derived from its datetime index.

        Added columns: ``hour``, ``day_of_week``, ``month`` and ``is_weekend``
        (1 for Saturday/Sunday, else 0). The input frame is left unmodified.
        """
        # Work on a copy so the caller's DataFrame does not gain columns.
        df = df.copy()
        df['hour'] = df.index.hour
        df['day_of_week'] = df.index.dayofweek
        df['month'] = df.index.month
        df['is_weekend'] = df.index.dayofweek.isin([5, 6]).astype(int)

        logger.info("Time features added")
        return df

Cell 13: Enhanced DataLoader với Data Augmentation

In [14]:
from sklearn.preprocessing import StandardScaler
import numpy as np
import torch
from torch.utils.data import DataLoader
import pandas as pd
import matplotlib.pyplot as plt

class EnhancedTrafficDataset(torch.utils.data.Dataset):
    """Sliding-window dataset over a scaled DataFrame, with optional noise augmentation."""

    def __init__(self, data, seq_len, horizon, num_nodes, scaler, augment=False):
        """
        Args:
            data: pandas DataFrame
            seq_len: input sequence length
            horizon: prediction horizon
            num_nodes: number of nodes/sensors
            scaler: fitted object exposing ``transform`` (e.g. StandardScaler)
            augment: whether to apply data augmentation
        """
        self.data = data
        self.seq_len = seq_len
        self.horizon = horizon
        self.num_nodes = num_nodes
        self.scaler = scaler
        self.augment = augment
        self.samples = self._generate_samples()

    def _generate_samples(self):
        """
        Build every ``(x, y)`` window pair as float tensors.

        The whole frame is scaled once and windows are sliced from the result,
        instead of calling ``scaler.transform`` twice per window.

        Returns:
            list: ``(x, y)`` tuples where ``x`` holds ``seq_len`` rows and ``y``
            the following ``horizon`` rows; empty when the data is too short.
        """
        window = self.seq_len + self.horizon
        num_samples = len(self.data) - window + 1
        if num_samples <= 0:
            # Nothing to scale; also avoids transforming an empty array.
            return []

        # Apply scaling once for all rows
        scaled = np.asarray(self.scaler.transform(self.data.values))

        samples = []
        for i in range(num_samples):
            x = scaled[i:i + self.seq_len]
            y = scaled[i + self.seq_len:i + window]

            # Apply augmentation if enabled
            if self.augment:
                x, y = self._augment_data(x, y)

            # Convert to tensors (copies, so samples never alias `scaled`)
            samples.append((torch.FloatTensor(x), torch.FloatTensor(y)))
        return samples

    def _augment_data(self, x, y):
        """
        Return noisy copies of ``x`` and ``y``.

        Gaussian noise with standard deviation 0.05 is added to both arrays.
        The inputs are not modified, because they may be views into the shared
        scaled array.
        """
        noise_factor = 0.05
        x = x + np.random.normal(scale=noise_factor, size=x.shape)
        y = y + np.random.normal(scale=noise_factor, size=y.shape)
        return x, y

    def __len__(self):
        """Number of available windows."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return the ``(x, y)`` pair stored at position ``idx``."""
        return self.samples[idx]


class DataPipeline:
    """
    End-to-end data preparation: load, clean, split, scale and batch.

    Args:
        config: Object exposing ``data_path``, ``seq_len``, ``horizon``,
            ``num_nodes`` and ``batch_size``.
    """
    def __init__(self, config):
        self.config = config
        self.processor = DataProcessor(config.data_path)

    def create_data_loaders(self):
        """
        Create enhanced data loaders with augmentation on the training split.

        The scaler is fitted on the training split only, so validation and
        test statistics do not leak into the normalization.

        Returns:
            tuple: ``(train_loader, val_loader, test_loader, scaler)``.
        """
        # Load and process data
        df, sensor_df = self.processor.load_and_preprocess()

        # Split data first so the scaler only ever sees training rows
        train_data, val_data, test_data = self._split_data(df)

        # Create scaler
        scaler = StandardScaler()
        scaler.fit(train_data.values)

        # Create datasets; only the training set is augmented
        train_dataset = self._make_dataset(train_data, scaler, augment=True)
        val_dataset = self._make_dataset(val_data, scaler, augment=False)
        test_dataset = self._make_dataset(test_data, scaler, augment=False)

        # Create data loaders
        train_loader = DataLoader(
            train_dataset,
            batch_size=self.config.batch_size,
            shuffle=True,
            num_workers=2,
            # Pinning host memory only helps (and only works silently) with CUDA.
            pin_memory=torch.cuda.is_available()
        )

        val_loader = DataLoader(
            val_dataset,
            batch_size=self.config.batch_size,
            shuffle=False,
            num_workers=2
        )

        test_loader = DataLoader(
            test_dataset,
            batch_size=self.config.batch_size,
            shuffle=False,
            num_workers=2
        )

        return train_loader, val_loader, test_loader, scaler

    def _make_dataset(self, frame, scaler, augment):
        """Build an EnhancedTrafficDataset for one split using the pipeline's config."""
        return EnhancedTrafficDataset(
            frame,
            self.config.seq_len,
            self.config.horizon,
            self.config.num_nodes,
            scaler=scaler,
            augment=augment
        )

    def _split_data(self, df):
        """
        Split data chronologically into train (70%), validation (10%) and
        test (remaining rows) sets.
        """
        num_samples = len(df)
        train_size = int(num_samples * 0.7)
        val_size = int(num_samples * 0.1)

        train_data = df.iloc[:train_size]
        val_data = df.iloc[train_size:train_size + val_size]
        test_data = df.iloc[train_size + val_size:]

        return train_data, val_data, test_data

Cell 14: Run Enhanced Data Pipeline

In [15]:
# Update config with data path (adds a new attribute to the existing ModelConfig instance)
config.data_path = '/content/data'

# Initialize and run pipeline
# NOTE: this rebinds train_loader/val_loader/test_loader from the earlier cell,
# and `scaler` (previously the mean/std dict) to the fitted scaler object.
pipeline = DataPipeline(config)
train_loader, val_loader, test_loader, scaler = pipeline.create_data_loaders()

# Print dataset statistics
logger.info("\nDataset Statistics:")
logger.info(f"Training batches: {len(train_loader)}")
logger.info(f"Validation batches: {len(val_loader)}")
logger.info(f"Test batches: {len(test_loader)}")

# Pull one training batch to inspect the augmented data
sample_batch, sample_target = next(iter(train_loader))
logger.info(f"\nSample batch shape: {sample_batch.shape}")
logger.info(f"Sample target shape: {sample_target.shape}")

# Visualize augmented data
def plot_augmented_samples(batch_data, num_samples=3,
                           save_path='/content/results/augmented_samples.png'):
    """
    Plot the first few samples of a batch side by side and save the figure.

    For each plotted sample ``i`` the series ``batch_data[i, :, 0]`` is drawn.
    NOTE(review): presumably the batch is (batch, time, node/feature), so this
    is the first node's series over time — confirm against the dataset class.

    Args:
        batch_data: Batch to visualise; a torch tensor or NumPy array that
            supports ``batch_data[i, :, 0]`` indexing.
        num_samples: Maximum number of samples to plot. Clamped to the batch
            size so a short (e.g. final) batch does not raise IndexError.
        save_path: Where the PNG is written.

    Raises:
        ValueError: If there is nothing to plot (empty batch or
            ``num_samples`` <= 0).
    """
    num_samples = min(num_samples, len(batch_data))
    if num_samples <= 0:
        raise ValueError("No samples available to plot")

    # squeeze=False keeps `axes` two-dimensional even for a single subplot
    fig, axes = plt.subplots(1, num_samples, figsize=(15, 5), squeeze=False)
    try:
        for i, ax in enumerate(axes[0]):
            series = batch_data[i, :, 0]
            # Tensors may live on the GPU or track gradients; detach and move
            # to the CPU before converting. Plain arrays are plotted as-is.
            if hasattr(series, 'detach'):
                series = series.detach().cpu().numpy()
            ax.plot(series)
            ax.set_title(f'Augmented Sample {i+1}')
            ax.set_xlabel('Time Steps')
            ax.set_ylabel('Traffic Flow')
        fig.tight_layout()
        fig.savefig(save_path)
    finally:
        # Always release the figure, even when plotting or saving fails
        plt.close(fig)

# Render the sample batch drawn above with the default number of samples
plot_augmented_samples(sample_batch)
logger.info("Augmented samples visualization saved to /content/results/augmented_samples.png")

2025-05-02 10:07:27,212 - INFO - Initializing DataProcessor with path: /content/data
INFO:AGCRN:Initializing DataProcessor with path: /content/data
2025-05-02 10:07:28,457 - INFO - Missing values before processing:
773869    0
767541    0
767542    0
717447    0
717446    0
         ..
717592    0
717595    0
772168    0
718141    0
769373    0
Length: 207, dtype: int64
INFO:AGCRN:Missing values before processing:
773869    0
767541    0
767542    0
717447    0
717446    0
         ..
717592    0
717595    0
772168    0
718141    0
769373    0
Length: 207, dtype: int64
2025-05-02 10:07:28,662 - INFO - Missing values handled
INFO:AGCRN:Missing values handled
2025-05-02 10:07:29,997 - INFO - Outliers removed
INFO:AGCRN:Outliers removed
2025-05-02 10:07:30,041 - INFO - Time features added
INFO:AGCRN:Time features added
2025-05-02 10:07:30,047 - INFO - Data processed successfully:
INFO:AGCRN:Data processed successfully:
2025-05-02 10:07:30,051 - INFO - Shape: (34272, 211)
INFO:AGCRN:Shape:

## **Phase 2: 2.1 Phân cụm và Fog**
- HDBSCAN
- Triển khai thuật toán phân cụm
- Tối ưu hyperparameters
- Visualization kết quả phân cụm


Cell 1: Import và Utilities

In [16]:
import os
import pandas as pd
import numpy as np
import hdbscan
import math
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from tqdm import tqdm
import logging

# Rebinds the notebook-level `logger` to a child of the 'AGCRN' logger, so
# records propagate to the handlers configured on 'AGCRN' by setup_logger().
logger = logging.getLogger('AGCRN.clustering')

Cell 2: Distance Calculator

In [17]:
class DistanceCalculator:
    """Great-circle (Haversine) distances between latitude/longitude points given in degrees."""

    def __init__(self, radius_km=6371):
        """
        Args:
            radius_km: Sphere radius used to scale angular distance; defaults
                to the Earth radius in kilometres.
        """
        self.R_e = radius_km  # Earth radius in km

    def haversine_distance(self, lat1, lon1, lat2, lon2):
        """
        Calculate Haversine distance between two points.

        Args:
            lat1, lon1: Latitude and longitude of the first point, in degrees.
            lat2, lon2: Latitude and longitude of the second point, in degrees.

        Returns:
            float: Distance in the unit of ``self.R_e`` (km by default).
        """
        lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
        # Floating-point rounding can push `a` marginally outside [0, 1] for
        # (near-)antipodal points, which would make asin() raise a domain error.
        a = min(1.0, max(0.0, a))
        c = 2 * math.asin(math.sqrt(a))
        return self.R_e * c

    def create_distance_matrix(self, coords):
        """
        Create the symmetric pairwise distance matrix for all points.

        Args:
            coords: Sequence of ``(latitude, longitude)`` pairs in degrees.

        Returns:
            numpy.ndarray: ``(n, n)`` array with zeros on the diagonal; an
            empty ``(0, 0)`` array when ``coords`` is empty.
        """
        n = len(coords)
        distances = np.zeros((n, n))
        for i in range(n):
            lat_i, lon_i = coords[i][0], coords[i][1]
            # Only the upper triangle is computed; the result is mirrored
            for j in range(i + 1, n):
                dist = self.haversine_distance(lat_i, lon_i, coords[j][0], coords[j][1])
                distances[i, j] = dist
                distances[j, i] = dist
        return distances

Cell 3: HDBSCAN Clustering

In [18]:
class TrafficClusterer:
    """
    Cluster sensor locations with HDBSCAN on Haversine distances and derive
    one Fog node (the station nearest the cluster centroid) per cluster.
    """

    def __init__(self, min_cluster_size=3, min_samples=2):
        """
        Args:
            min_cluster_size: Passed to ``hdbscan.HDBSCAN``.
            min_samples: Passed to ``hdbscan.HDBSCAN``.
        """
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples
        self.distance_calculator = DistanceCalculator()
        self.clusterer = None
        # Raw labels from the last fit() call (noise is -1). They are NOT
        # rewritten when noise points are reassigned to a cluster.
        self.labels = None

    def fit(self, location_df):
        """
        Perform HDBSCAN clustering on sensor locations.

        Args:
            location_df: DataFrame with ``latitude`` and ``longitude`` columns
                (and ``sensor_id``, which is logged when noise points are
                reassigned).

        Returns:
            pandas.DataFrame: A copy of ``location_df`` with an added
            ``cluster`` column. The caller's frame is left untouched.
        """
        # Work on a copy so the caller's DataFrame is not mutated as a side effect
        location_df = location_df.copy()

        # Extract coordinates
        coords = location_df[['latitude', 'longitude']].values

        # Calculate distance matrix
        logger.info("Calculating Haversine distance matrix...")
        distance_matrix = self.distance_calculator.create_distance_matrix(coords)

        # Perform clustering
        logger.info("Performing HDBSCAN clustering...")
        self.clusterer = hdbscan.HDBSCAN(
            min_cluster_size=self.min_cluster_size,
            min_samples=self.min_samples,
            metric='precomputed'
        )
        self.labels = self.clusterer.fit_predict(distance_matrix)

        # Add cluster labels to DataFrame
        location_df['cluster'] = self.labels

        # Assign noise points to nearest clusters
        logger.info("Assigning noise points to nearest clusters...")
        location_df = self._assign_noise_points(location_df, coords)

        return location_df

    def _assign_noise_points(self, location_df, coords):
        """
        Assign noise points (labeled as -1) to their nearest cluster.

        "Nearest" is measured from the point to each cluster's centroid (mean
        latitude/longitude of its non-noise members, computed before any
        reassignment). ``location_df`` is modified in place and returned.

        Args:
            location_df: DataFrame with ``latitude``, ``longitude``,
                ``sensor_id`` and ``cluster`` columns.
            coords: Unused; kept for signature compatibility.
        """
        noise_index = location_df.index[location_df['cluster'] == -1]
        if len(noise_index) == 0:
            return location_df

        logger.info(f"Found {len(noise_index)} noise points to assign")

        # Cluster ids are read from the frame itself so this also works on a
        # frame that was not produced by this instance's fit()
        cluster_ids = sorted(
            cid for cid in location_df['cluster'].unique().tolist() if cid != -1
        )
        if not cluster_ids:
            logger.warning("No clusters found; noise points keep label -1")
            return location_df

        # Centroids are fixed up front so reassignments do not shift them
        cluster_centers = {}
        for cluster_id in cluster_ids:
            cluster_points = location_df[location_df['cluster'] == cluster_id]
            cluster_centers[cluster_id] = (
                cluster_points['latitude'].mean(),
                cluster_points['longitude'].mean()
            )

        # Assign each noise point to nearest cluster
        for idx in noise_index:
            min_distance = float('inf')
            nearest_cluster = None
            point_lat = location_df.loc[idx, 'latitude']
            point_lon = location_df.loc[idx, 'longitude']

            for cluster_id, center in cluster_centers.items():
                dist = self.distance_calculator.haversine_distance(
                    point_lat, point_lon,
                    center[0], center[1]
                )
                if dist < min_distance:
                    min_distance = dist
                    nearest_cluster = cluster_id

            if nearest_cluster is not None:
                location_df.loc[idx, 'cluster'] = nearest_cluster
                logger.info(f"Assigned noise point {location_df.loc[idx, 'sensor_id']} "
                          f"to cluster {nearest_cluster} (distance: {min_distance:.2f} km)")

        return location_df

    def calculate_fog_nodes(self, location_df):
        """
        Calculate Fog nodes for each cluster.

        The Fog node of a cluster is the member station closest to the
        cluster centroid. Cluster ids are taken from ``location_df['cluster']``
        (ascending, ignoring -1), so this does not require a prior fit().

        Args:
            location_df: DataFrame with ``latitude``, ``longitude``,
                ``sensor_id`` and ``cluster`` columns.

        Returns:
            pandas.DataFrame: One row per cluster with ``cluster_id``,
            ``latitude``, ``longitude``, ``num_stations``, ``station_ids`` and
            ``center_distance`` (distance from the chosen station to the centroid).
        """
        cluster_ids = sorted(
            cid for cid in location_df['cluster'].unique().tolist() if cid != -1
        )
        fog_nodes = []

        logger.info("Calculating Fog nodes...")
        for cluster_id in cluster_ids:
            cluster_data = location_df[location_df['cluster'] == cluster_id]

            # Calculate cluster center
            lat_center = cluster_data['latitude'].mean()
            lon_center = cluster_data['longitude'].mean()

            # Find nearest station to center (first one wins on ties)
            min_distance = float('inf')
            nearest_coords = None
            for lat, lon in zip(cluster_data['latitude'], cluster_data['longitude']):
                dist = self.distance_calculator.haversine_distance(
                    lat, lon, lat_center, lon_center
                )
                if dist < min_distance:
                    min_distance = dist
                    nearest_coords = (lat, lon)

            if nearest_coords is None:
                # Only reachable when no distance compares below infinity
                # (e.g. NaN coordinates)
                logger.warning(f"Cluster {cluster_id} has no usable coordinates; skipped")
                continue

            fog_nodes.append({
                'cluster_id': cluster_id,
                'latitude': nearest_coords[0],
                'longitude': nearest_coords[1],
                'num_stations': len(cluster_data),
                'station_ids': cluster_data['sensor_id'].tolist(),
                'center_distance': min_distance  # distance from the centroid
            })

        return pd.DataFrame(fog_nodes)

    def get_cluster_statistics(self, location_df):
        """
        Get statistics about the clusters and log them.

        Args:
            location_df: DataFrame with a ``cluster`` column.

        Returns:
            dict: ``total_points``, ``num_clusters``, ``cluster_sizes``,
            ``avg_cluster_size``, ``min_cluster_size`` and ``max_cluster_size``.
        """
        # Count once and reuse instead of recomputing for every statistic
        sizes = location_df['cluster'].value_counts()
        stats = {
            'total_points': len(location_df),
            'num_clusters': location_df['cluster'].nunique(dropna=False),
            'cluster_sizes': sizes.to_dict(),
            'avg_cluster_size': sizes.mean(),
            'min_cluster_size': sizes.min(),
            'max_cluster_size': sizes.max()
        }

        logger.info("Cluster Statistics:")
        for key, value in stats.items():
            logger.info(f"{key}: {value}")

        return stats

Cell 4: Visualization

In [19]:
class ClusterVisualizer:
    """Render the sensor clusters and their Fog nodes to a PNG file."""

    def __init__(self, save_dir='/content/results/clusters'):
        """
        Args:
            save_dir: Directory that receives the plot; created if missing.
        """
        self.save_dir = save_dir
        os.makedirs(save_dir, exist_ok=True)

    def plot_clusters(self, location_df, fog_df):
        """
        Plot clustering results with Fog nodes and save them as
        ``traffic_clusters.png`` inside ``self.save_dir``.

        Args:
            location_df: DataFrame with ``latitude``, ``longitude`` and
                ``cluster`` columns (one row per station).
            fog_df: DataFrame with ``cluster_id``, ``latitude`` and
                ``longitude`` columns (one row per Fog node).
        """
        fig, ax = plt.subplots(figsize=(12, 8))
        try:
            # Sorted labels give a colour assignment that is stable across runs
            unique_labels = sorted(set(location_df['cluster']))
            palette = plt.cm.Set1(np.linspace(0, 1, len(unique_labels)))

            # Plot clusters
            for color, label in zip(palette, unique_labels):
                cluster_data = location_df[location_df['cluster'] == label]
                ax.scatter(
                    cluster_data['longitude'],
                    cluster_data['latitude'],
                    c=[color],
                    s=50,
                    alpha=0.7,
                    label=f'Cluster {label}'
                )

            # Build the cluster -> Fog node lookup once instead of filtering
            # fog_df for every station; the first row per cluster wins.
            fog_lookup = {}
            for cluster_id, fog_lon, fog_lat in zip(
                fog_df['cluster_id'], fog_df['longitude'], fog_df['latitude']
            ):
                fog_lookup.setdefault(cluster_id, (fog_lon, fog_lat))

            # Plot connections to Fog nodes (noise stations, labelled -1, get none)
            for cluster_id, lon, lat in zip(
                location_df['cluster'], location_df['longitude'], location_df['latitude']
            ):
                if cluster_id == -1 or cluster_id not in fog_lookup:
                    continue
                fog_lon, fog_lat = fog_lookup[cluster_id]
                ax.plot(
                    [lon, fog_lon],
                    [lat, fog_lat],
                    color='black',
                    linestyle='--',
                    linewidth=0.5,
                    alpha=0.3
                )

            # Plot Fog nodes
            ax.scatter(
                fog_df['longitude'],
                fog_df['latitude'],
                c='red',
                marker='o',
                s=60,
                edgecolors='black',
                label='Fog Nodes'
            )

            ax.set_title("Traffic Sensor Clustering (HDBSCAN)")
            ax.set_xlabel("Longitude")
            ax.set_ylabel("Latitude")
            ax.grid(True)
            ax.legend(loc='best')

            # Save plot
            save_path = os.path.join(self.save_dir, 'traffic_clusters.png')
            fig.savefig(save_path, dpi=300, bbox_inches='tight')
        finally:
            # Release the figure even when plotting or saving fails
            plt.close(fig)

        logger.info(f"Clustering visualization saved to {save_path}")

Cell 5: Main Execution

In [20]:
def main(locations_path='/content/data/sensor_locations.csv',
         output_dir='/content/results/clusters'):
    """
    Cluster the sensors, derive Fog nodes, and save the plot and CSV results.

    Args:
        locations_path: CSV with the sensor locations (the clusterer reads
            its ``latitude``, ``longitude`` and ``sensor_id`` columns).
        output_dir: Directory that receives ``traffic_clusters.png``,
            ``station_clusters.csv`` and ``fog_nodes.csv``.
    """
    # Load sensor locations
    location_df = pd.read_csv(locations_path)

    # Initialize clusterer
    clusterer = TrafficClusterer(min_cluster_size=3, min_samples=2)

    # Perform clustering
    location_df = clusterer.fit(location_df)

    # Calculate Fog nodes
    fog_df = clusterer.calculate_fog_nodes(location_df)

    # Create the output directory explicitly rather than relying on the
    # visualizer's constructor having done it before the CSVs are written
    os.makedirs(output_dir, exist_ok=True)

    # Visualize results
    visualizer = ClusterVisualizer(save_dir=output_dir)
    visualizer.plot_clusters(location_df, fog_df)

    # Save results
    location_df.to_csv(os.path.join(output_dir, 'station_clusters.csv'), index=False)
    fog_df.to_csv(os.path.join(output_dir, 'fog_nodes.csv'), index=False)

    # Log summary; clusterer.labels holds the raw labels, where -1 marks noise
    n_clusters = len(set(clusterer.labels)) - (1 if -1 in clusterer.labels else 0)
    logger.info(f"\nClustering completed:")
    logger.info(f"- Number of sensors: {len(location_df)}")
    logger.info(f"- Number of clusters: {n_clusters}")
    logger.info(f"- Number of Fog nodes: {len(fog_df)}")

if __name__ == "__main__":
    main()

2025-05-02 10:08:22,332 - INFO - Calculating Haversine distance matrix...
INFO:AGCRN.clustering:Calculating Haversine distance matrix...
2025-05-02 10:08:22,387 - INFO - Performing HDBSCAN clustering...
INFO:AGCRN.clustering:Performing HDBSCAN clustering...
2025-05-02 10:08:22,406 - INFO - Assigning noise points to nearest clusters...
INFO:AGCRN.clustering:Assigning noise points to nearest clusters...
2025-05-02 10:08:22,409 - INFO - Found 34 noise points to assign
INFO:AGCRN.clustering:Found 34 noise points to assign
2025-05-02 10:08:22,428 - INFO - Assigned noise point 717804 to cluster 18 (distance: 7.51 km)
INFO:AGCRN.clustering:Assigned noise point 717804 to cluster 18 (distance: 7.51 km)
2025-05-02 10:08:22,431 - INFO - Assigned noise point 767572 to cluster 33 (distance: 0.73 km)
INFO:AGCRN.clustering:Assigned noise point 767572 to cluster 33 (distance: 0.73 km)
2025-05-02 10:08:22,433 - INFO - Assigned noise point 767573 to cluster 33 (distance: 0.74 km)
INFO:AGCRN.clustering:A

# **2.2 Fog computing**
- Xây dựng class FogNode
- Triển khai fog alliance formation
- Quản lý trạng thái nút

Cell 1: Fog Node Class

In [21]:
import os
import pandas as pd
import numpy as np
import math
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import logging

# Rebinds the notebook-level `logger` to the 'AGCRN.fog' child logger
logger = logging.getLogger('AGCRN.fog')

# Make sure the results directory exists
os.makedirs('/content/results/fog', exist_ok=True)

In [22]:
class FogNode:
    """
    A located fog node with a sensor capacity, a load counter, a load-derived
    status and alliance bookkeeping fields.
    """

    def __init__(self, node_id, latitude, longitude, capacity=100):
        # Identity and position
        self.node_id = node_id
        self.latitude, self.longitude = latitude, longitude

        # Load accounting
        self.capacity = capacity
        self.current_load = 0
        self.sensors = []  # ids of the sensors attached to this node
        self.status = 'active'  # one of: active, overloaded, inactive

        # Alliance bookkeeping
        self.alliance_id = None
        self.is_aggregator = False
        self.neighbors = []  # other fog nodes in the same alliance

        logger.info(f"FogNode {node_id} initialized at ({latitude}, {longitude})")

    def add_sensor(self, sensor_id):
        """Attach a sensor; returns False (and changes nothing) when the node is full."""
        if self.current_load >= self.capacity:
            return False

        self.sensors.append(sensor_id)
        self.current_load += 1
        self._update_status()
        return True

    def remove_sensor(self, sensor_id):
        """Detach a sensor; returns False when the sensor is not attached."""
        try:
            self.sensors.remove(sensor_id)
        except ValueError:
            # list.remove raises when the id is absent
            return False

        self.current_load -= 1
        self._update_status()
        return True

    def _update_status(self):
        """Recompute the status from the load ratio: >= 0.9 overloaded, <= 0.1 inactive, else active."""
        ratio = self.current_load / self.capacity
        self.status = (
            'overloaded' if ratio >= 0.9
            else 'inactive' if ratio <= 0.1
            else 'active'
        )

    def get_state(self):
        """Return a snapshot dictionary describing the node."""
        state = {'node_id': self.node_id}
        state['location'] = (self.latitude, self.longitude)
        state['capacity'] = self.capacity
        state['current_load'] = self.current_load
        state['status'] = self.status
        state['num_sensors'] = len(self.sensors)
        state['alliance_id'] = self.alliance_id
        state['is_aggregator'] = self.is_aggregator
        return state

Cell 2: Fog Alliance Manager

In [23]:
class FogAllianceManager:
    """
    Group fog nodes into alliances with K-Means and pick one aggregator per
    alliance (the member closest to the alliance centroid).
    """

    def __init__(self, max_distance=5):
        # Stored for callers; no method of this class reads it
        self.max_distance = max_distance
        self.fog_nodes = {}  # Dictionary of FogNode objects
        self.alliances = {}  # Dictionary of alliance_id: [fog_node_ids]
        self.aggregators = {}  # Dictionary of alliance_id: aggregator_node_id

    def add_fog_node(self, node_id, latitude, longitude):
        """Add a new fog node to the network (replaces any node with the same id)."""
        self.fog_nodes[node_id] = FogNode(node_id, latitude, longitude)

    def form_alliances(self, fog_df, n_clusters=None):
        """
        Form alliances using K-Means clustering on standardised coordinates.

        Any alliance state from a previous call is discarded first, so calling
        this repeatedly does not accumulate duplicate members.

        Args:
            fog_df: DataFrame with ``cluster_id``, ``latitude`` and
                ``longitude`` columns. Every ``cluster_id`` must already have
                been registered through ``add_fog_node``.
            n_clusters: Number of alliances. Defaults to half the number of
                fog nodes (at least 1) and is capped at the number of fog
                nodes, because K-Means cannot form more clusters than samples.

        Returns:
            networkx.Graph: Star-shaped alliance graph (see
            ``_create_alliance_graph``).

        Raises:
            KeyError: If ``fog_df`` references an unregistered node id.
        """
        # Reset state left over from an earlier run
        self.alliances.clear()
        self.aggregators.clear()
        for fog_node in self.fog_nodes.values():
            fog_node.alliance_id = None
            fog_node.is_aggregator = False
            fog_node.neighbors = []

        if len(fog_df) == 0:
            return self._create_alliance_graph()

        # Prepare coordinates for clustering
        coords = fog_df[['latitude', 'longitude']].values
        scaler = StandardScaler()
        coords_scaled = scaler.fit_transform(coords)

        # Determine number of clusters if not specified
        if n_clusters is None:
            n_clusters = max(1, len(fog_df) // 2)
        n_clusters = max(1, min(n_clusters, len(fog_df)))

        # Perform K-Means clustering
        kmeans = KMeans(n_clusters=n_clusters, random_state=0)
        alliance_labels = kmeans.fit_predict(coords_scaled)
        cluster_centers = scaler.inverse_transform(kmeans.cluster_centers_)

        # Assign alliance IDs and find aggregators.
        # `alliance_labels` is positional, so rows are matched by position and
        # not by index label (which may not be 0..n-1 after filtering).
        best_distance = {}
        for position, (_, row) in enumerate(fog_df.iterrows()):
            node_id = row['cluster_id']
            alliance_id = int(alliance_labels[position])

            # Add node to alliance
            self.alliances.setdefault(alliance_id, []).append(node_id)
            self.fog_nodes[node_id].alliance_id = alliance_id

            # The member closest to the alliance centroid becomes aggregator
            center_lat, center_lon = cluster_centers[alliance_id]
            dist = self._haversine_distance(
                row['latitude'], row['longitude'],
                center_lat, center_lon
            )
            if alliance_id not in best_distance or dist < best_distance[alliance_id]:
                best_distance[alliance_id] = dist
                self.aggregators[alliance_id] = node_id

        # Set aggregator flags and update neighbors
        for alliance_id, members in self.alliances.items():
            self.fog_nodes[self.aggregators[alliance_id]].is_aggregator = True

            # Set neighbors for each node in alliance
            for node_id in members:
                self.fog_nodes[node_id].neighbors = [
                    m for m in members if m != node_id
                ]

        return self._create_alliance_graph()

    def _haversine_distance(self, lat1, lon1, lat2, lon2):
        """Calculate Haversine distance in km between two points given in degrees."""
        R_e = 6371
        lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
        # Guard asin() against rounding that pushes `a` marginally outside [0, 1]
        a = min(1.0, max(0.0, a))
        c = 2 * math.asin(math.sqrt(a))
        return R_e * c

    def _create_alliance_graph(self):
        """
        Create a NetworkX graph of the alliance structure.

        Every registered fog node becomes a graph node carrying ``pos``
        (longitude, latitude), ``alliance_id`` and ``is_aggregator``. Within
        each alliance, every member is connected to the aggregator.
        """
        G = nx.Graph()

        # Add nodes
        for node_id, fog_node in self.fog_nodes.items():
            G.add_node(node_id,
                      pos=(fog_node.longitude, fog_node.latitude),
                      alliance_id=fog_node.alliance_id,
                      is_aggregator=fog_node.is_aggregator)

        # Add edges within alliances
        for alliance_id, alliance_members in self.alliances.items():
            aggregator = self.aggregators.get(alliance_id)
            if aggregator is None:
                continue
            for member in alliance_members:
                if member != aggregator:
                    G.add_edge(member, aggregator)

        return G

Cell 3: Visualization and State Management

In [24]:
class FogVisualizer:
    """Render the fog alliance graph to a PNG file."""

    def __init__(self, save_dir='/content/results/fog'):
        """
        Args:
            save_dir: Directory that receives the plot; created if missing.
        """
        self.save_dir = save_dir
        os.makedirs(save_dir, exist_ok=True)

    def plot_alliance_graph(self, G, title="Fog Alliances"):
        """
        Plot the alliance graph and save it as ``fog_alliances.png`` inside
        ``self.save_dir``.

        Args:
            G: NetworkX graph whose nodes carry ``pos``, ``alliance_id`` and
                ``is_aggregator`` attributes.
            title: Figure title.
        """
        fig, ax = plt.subplots(figsize=(12, 8))
        try:
            # Get node positions
            pos = nx.get_node_attributes(G, 'pos')

            # Color nodes by alliance. Colours are looked up through the rank
            # of the alliance id, so ids need not be contiguous 0..k-1; nodes
            # without an alliance (None) sort last and get their own colour.
            alliance_ids = [G.nodes[n]['alliance_id'] for n in G.nodes()]
            unique_alliances = sorted(
                set(alliance_ids), key=lambda aid: (aid is None, aid)
            )
            palette = plt.cm.Set1(np.linspace(0, 1, len(unique_alliances)))
            color_rank = {aid: rank for rank, aid in enumerate(unique_alliances)}

            # Draw regular nodes
            nx.draw_networkx_nodes(G, pos,
                                 node_color=[palette[color_rank[aid]] for aid in alliance_ids],
                                 node_size=100,
                                 alpha=0.8,
                                 ax=ax)

            # Draw aggregator nodes with different style
            aggregators = [n for n in G.nodes() if G.nodes[n]['is_aggregator']]
            nx.draw_networkx_nodes(G, pos,
                                 nodelist=aggregators,
                                 node_color='red',
                                 node_size=200,
                                 node_shape='s',
                                 ax=ax)

            # Draw edges
            nx.draw_networkx_edges(G, pos, alpha=0.5, ax=ax)

            # Add labels
            nx.draw_networkx_labels(G, pos, font_size=8, ax=ax)

            ax.set_title(title)
            ax.set_xlabel("Longitude")
            ax.set_ylabel("Latitude")
            ax.grid(True)

            # Save plot
            save_path = os.path.join(self.save_dir, 'fog_alliances.png')
            fig.savefig(save_path, dpi=300, bbox_inches='tight')
        finally:
            # Release the figure even when drawing or saving fails
            plt.close(fig)

        logger.info(f"Alliance graph saved to {save_path}")

Cell 4: Main Execution

In [25]:
def main(fog_nodes_path='/content/results/clusters/fog_nodes.csv',
         output_dir='/content/results/fog',
         n_clusters=5):
    """
    Build fog alliances from the saved Fog nodes and write the plot and a
    text summary.

    Args:
        fog_nodes_path: CSV with ``cluster_id``, ``latitude`` and
            ``longitude`` columns (the Fog nodes of the clustering step).
        output_dir: Directory that receives ``fog_alliances.png`` and
            ``alliance_info.txt``.
        n_clusters: Requested number of alliances; capped at the number of
            fog nodes because K-Means needs at least one sample per cluster.
    """
    # Load fog nodes from previous clustering
    fog_df = pd.read_csv(fog_nodes_path)

    # Initialize alliance manager
    alliance_manager = FogAllianceManager(max_distance=5)

    # Add fog nodes
    for _, row in fog_df.iterrows():
        alliance_manager.add_fog_node(
            row['cluster_id'],
            row['latitude'],
            row['longitude']
        )

    # Form alliances
    effective_clusters = max(1, min(n_clusters, len(fog_df)))
    G = alliance_manager.form_alliances(fog_df, n_clusters=effective_clusters)

    # Visualize results
    os.makedirs(output_dir, exist_ok=True)
    visualizer = FogVisualizer(save_dir=output_dir)
    visualizer.plot_alliance_graph(G)

    # Save alliance information
    info_path = os.path.join(output_dir, 'alliance_info.txt')
    with open(info_path, 'w') as f:
        f.write("Alliance Information:\n")
        for alliance_id, members in alliance_manager.alliances.items():
            f.write(f"\nAlliance {alliance_id}:\n")
            f.write(f"Aggregator: {alliance_manager.aggregators[alliance_id]}\n")
            f.write(f"Members: {members}\n")

    logger.info("Fog computing setup completed successfully")

if __name__ == "__main__":
    main()

2025-05-02 10:08:44,779 - INFO - FogNode 0 initialized at (34.20164, -118.40366)
INFO:AGCRN.fog:FogNode 0 initialized at (34.20164, -118.40366)
2025-05-02 10:08:44,781 - INFO - FogNode 1 initialized at (34.20663, -118.20101)
INFO:AGCRN.fog:FogNode 1 initialized at (34.20663, -118.20101)
2025-05-02 10:08:44,783 - INFO - FogNode 2 initialized at (34.21356, -118.23113)
INFO:AGCRN.fog:FogNode 2 initialized at (34.21356, -118.23113)
2025-05-02 10:08:44,784 - INFO - FogNode 3 initialized at (34.05767, -118.21435)
INFO:AGCRN.fog:FogNode 3 initialized at (34.05767, -118.21435)
2025-05-02 10:08:44,786 - INFO - FogNode 4 initialized at (34.21216, -118.47341)
INFO:AGCRN.fog:FogNode 4 initialized at (34.21216, -118.47341)
2025-05-02 10:08:44,788 - INFO - FogNode 5 initialized at (34.15367, -118.3484)
INFO:AGCRN.fog:FogNode 5 initialized at (34.15367, -118.3484)
2025-05-02 10:08:44,789 - INFO - FogNode 6 initialized at (34.17109, -118.50495)
INFO:AGCRN.fog:FogNode 6 initialized at (34.17109, -118.5

# ***Phase 3: Decentralized Learning***
3.1 Parameter sharing
- Triển khai cơ chế chia sẻ
- Bảo mật và mã hoá
- xử lý lỗi và recovery

Cell 1: Import Libraries và Setup



In [26]:
import os
import numpy as np
import torch
import torch.nn as nn
from cryptography.fernet import Fernet
from base64 import b64encode, b64decode
import json
import hashlib
import threading
import queue
import time
from typing import Dict, List, Optional
import logging

# Rebinds the notebook-level `logger` to the 'AGCRN.parameter_sharing' child logger
logger = logging.getLogger('AGCRN.parameter_sharing')

class ParameterSharingConfig:
    """Settings for encrypted parameter sharing between nodes."""

    def __init__(self, encryption_key=None,
                 checkpoint_dir='/content/checkpoints/parameter_sharing'):
        """
        Args:
            encryption_key: Fernet key to use. When omitted, a fresh key is
                generated, which means only managers built from this same
                config object can decrypt each other's payloads; pass a
                shared key to let separately created configs interoperate.
            checkpoint_dir: Directory for sharing checkpoints; created if
                missing.
        """
        self.encryption_key = (
            encryption_key if encryption_key is not None else Fernet.generate_key()
        )
        self.sharing_interval = 10  # seconds
        self.retry_attempts = 3
        self.timeout = 5  # seconds
        self.min_participants = 2
        self.recovery_mode = 'latest'  # 'latest' or 'average'

        # Paths
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(self.checkpoint_dir, exist_ok=True)

        logger.info("Parameter sharing configuration initialized")

Cell 2: Security Manager

In [27]:
class SecurityManager:
    """Serialize, checksum and Fernet-encrypt model parameters (and the reverse)."""

    def __init__(self, config: "ParameterSharingConfig"):
        """
        Args:
            config: Configuration providing ``encryption_key``.
        """
        self.config = config
        self.fernet = Fernet(config.encryption_key)

    def encrypt_parameters(self, parameters: Dict[str, torch.Tensor]) -> bytes:
        """
        Encrypt model parameters.

        The payload is ``sha256(serialized) + serialized``, encrypted with Fernet.

        Args:
            parameters: Mapping of parameter name to tensor.

        Returns:
            The encrypted token.

        Raises:
            Exception: Any serialization or encryption error is logged and re-raised.
        """
        try:
            # Convert parameters to bytes
            param_bytes = self._serialize_parameters(parameters)

            # Add checksum
            checksum = hashlib.sha256(param_bytes).digest()
            data = checksum + param_bytes

            # Encrypt
            encrypted_data = self.fernet.encrypt(data)
            logger.debug("Parameters encrypted successfully")
            return encrypted_data

        except Exception as e:
            logger.error(f"Encryption failed: {e}")
            raise

    def decrypt_parameters(self, encrypted_data: bytes) -> Dict[str, torch.Tensor]:
        """
        Decrypt and verify parameters.

        Args:
            encrypted_data: Token produced by ``encrypt_parameters``.

        Returns:
            Mapping of parameter name to tensor.

        Raises:
            ValueError: If the embedded SHA-256 checksum does not match.
            Exception: Any other decryption or parsing error is logged and re-raised.
        """
        try:
            # Decrypt
            decrypted_data = self.fernet.decrypt(encrypted_data)

            # Verify checksum (a SHA-256 digest is 32 bytes)
            checksum = decrypted_data[:32]
            param_bytes = decrypted_data[32:]
            if hashlib.sha256(param_bytes).digest() != checksum:
                raise ValueError("Checksum verification failed")

            # Deserialize parameters
            parameters = self._deserialize_parameters(param_bytes)
            logger.debug("Parameters decrypted and verified successfully")
            return parameters

        except Exception as e:
            logger.error(f"Decryption failed: {e}")
            raise

    def _serialize_parameters(self, parameters: Dict[str, torch.Tensor]) -> bytes:
        """
        Serialize parameters to UTF-8 encoded JSON bytes.

        Each tensor is stored as base64 raw bytes plus its shape and its
        NumPy dtype name.
        """
        serialized = {}
        for name, param in parameters.items():
            # detach() allows tensors that track gradients (e.g. nn.Parameter),
            # which cannot be converted with .numpy() directly
            array = param.detach().cpu().numpy()
            serialized[name] = {
                'data': b64encode(array.tobytes()).decode('utf-8'),
                'shape': list(array.shape),
                # Store the NumPy dtype name ('float32'); the torch name
                # ('torch.float32') is not understood by np.dtype on the way back
                'dtype': str(array.dtype)
            }
        return json.dumps(serialized).encode('utf-8')

    def _deserialize_parameters(self, param_bytes: bytes) -> Dict[str, torch.Tensor]:
        """
        Deserialize parameters from bytes produced by ``_serialize_parameters``.

        Payloads that carry a torch-style dtype name (``torch.float32``) are
        accepted as well.
        """
        serialized = json.loads(param_bytes.decode('utf-8'))
        parameters = {}
        for name, param_data in serialized.items():
            dtype_name = param_data['dtype']
            if dtype_name.startswith('torch.'):
                dtype_name = dtype_name[len('torch.'):]
            data = np.frombuffer(
                b64decode(param_data['data']),
                dtype=np.dtype(dtype_name)
            ).reshape(param_data['shape'])
            # frombuffer returns a read-only view; copy so the tensor owns
            # writable memory
            parameters[name] = torch.from_numpy(data.copy())
        return parameters

Cell 3: Parameter Sharing Manager

In [28]:
class ParameterSharingManager:
    """
    Queue encrypted parameter updates from nodes, decrypt them on a background
    thread, keep the latest parameters per node, and checkpoint that state.
    """

    def __init__(self, config: "ParameterSharingConfig", security_manager: "SecurityManager"):
        """
        Args:
            config: Provides ``sharing_interval``, ``retry_attempts``,
                ``recovery_mode`` and ``checkpoint_dir``.
            security_manager: Provides ``encrypt_parameters`` and
                ``decrypt_parameters``.
        """
        self.config = config
        self.security_manager = security_manager
        self.parameter_queue = queue.Queue()
        self.shared_parameters = {}
        self.sharing_lock = threading.Lock()  # guards shared_parameters
        self.is_sharing = False
        self.failed_attempts = {}
        self.sharing_thread = None
        # Lets stop_sharing() wake the worker instead of waiting out a sleep
        self._stop_event = threading.Event()

    def start_sharing(self):
        """
        Start parameter sharing process (a daemon worker thread).

        Calling this while the worker is already running is a no-op.
        """
        if self.sharing_thread is not None and self.sharing_thread.is_alive():
            logger.warning("Parameter sharing already running")
            return

        self._stop_event.clear()
        self.is_sharing = True
        self.sharing_thread = threading.Thread(target=self._sharing_loop)
        self.sharing_thread.daemon = True
        self.sharing_thread.start()
        logger.info("Parameter sharing started")

    def stop_sharing(self):
        """
        Stop parameter sharing process.

        Wakes the worker immediately, waits for it to flush the queue and
        exit, and is safe to call when sharing was never started.
        """
        self.is_sharing = False
        self._stop_event.set()
        if self.sharing_thread is not None:
            self.sharing_thread.join()
            self.sharing_thread = None
        logger.info("Parameter sharing stopped")

    def share_parameters(self, node_id: str, parameters: Dict[str, torch.Tensor]):
        """
        Share parameters from a node: encrypt them and queue them for the worker.

        Failures are logged and counted per node; they are not raised.
        """
        try:
            # Encrypt parameters
            encrypted_params = self.security_manager.encrypt_parameters(parameters)

            # Add to queue
            self.parameter_queue.put((node_id, encrypted_params))
            logger.debug(f"Parameters from node {node_id} queued for sharing")

            # Reset failed attempts
            self.failed_attempts[node_id] = 0

        except Exception as e:
            logger.error(f"Failed to share parameters from node {node_id}: {e}")
            self._handle_failure(node_id)

    def get_shared_parameters(self, node_id: str) -> Optional[Dict[str, torch.Tensor]]:
        """
        Get latest shared parameters for ``node_id``.

        Returns None until the worker has processed an update from that node.
        """
        with self.sharing_lock:
            if node_id in self.shared_parameters:
                return self.shared_parameters[node_id]
        return None

    def _sharing_loop(self):
        """
        Main parameter sharing loop, run on the worker thread.

        Each cycle drains the queue, writes a checkpoint, then waits up to
        ``config.sharing_interval`` seconds or until a stop is requested.
        """
        while self.is_sharing and not self._stop_event.is_set():
            try:
                # Process queued parameters
                self._drain_queue()

                # Checkpoint current state
                self._save_checkpoint()

            except Exception as e:
                logger.error(f"Error in sharing loop: {e}")

            # Interruptible wait; kept outside the try block so a failing
            # cycle still pauses instead of spinning
            self._stop_event.wait(self.config.sharing_interval)

        # Final flush so updates queued shortly before stop_sharing() are kept
        try:
            self._drain_queue()
            self._save_checkpoint()
        except Exception as e:
            logger.error(f"Error in sharing loop: {e}")

    def _drain_queue(self):
        """Process every update currently waiting in the queue."""
        while True:
            try:
                node_id, encrypted_params = self.parameter_queue.get_nowait()
            except queue.Empty:
                return
            self._process_parameters(node_id, encrypted_params)

    def _process_parameters(self, node_id: str, encrypted_params: bytes):
        """
        Process received parameters: decrypt and store them as the node's latest.
        """
        try:
            # Decrypt parameters
            parameters = self.security_manager.decrypt_parameters(encrypted_params)

            # Update shared parameters
            with self.sharing_lock:
                self.shared_parameters[node_id] = parameters

            logger.debug(f"Processed parameters from node {node_id}")

        except Exception as e:
            logger.error(f"Failed to process parameters from node {node_id}: {e}")
            self._handle_failure(node_id)

    def _handle_failure(self, node_id: str):
        """
        Handle parameter sharing failures: count them and start recovery once
        ``config.retry_attempts`` is reached.
        """
        self.failed_attempts[node_id] = self.failed_attempts.get(node_id, 0) + 1

        if self.failed_attempts[node_id] >= self.config.retry_attempts:
            logger.warning(f"Node {node_id} exceeded maximum retry attempts")
            self._initiate_recovery(node_id)

    def _initiate_recovery(self, node_id: str):
        """
        Initiate recovery process for failed node, following
        ``config.recovery_mode``. Recovery errors are logged, not raised.
        """
        try:
            if self.config.recovery_mode == 'latest':
                # Use latest successful parameters
                self._recover_from_checkpoint(node_id)
            else:
                # Use average of other nodes' parameters
                self._recover_from_average(node_id)

            logger.info(f"Recovery completed for node {node_id}")

        except Exception as e:
            logger.error(f"Recovery failed for node {node_id}: {e}")

    def _recover_from_checkpoint(self, node_id: str):
        """
        Restore ``node_id``'s parameters from the newest checkpoint that
        contains them and reset its failure counter.

        Raises:
            RuntimeError: If no checkpoint holds parameters for the node.
        """
        checkpoint_dir = self.config.checkpoint_dir
        candidates = [
            os.path.join(checkpoint_dir, name)
            for name in os.listdir(checkpoint_dir)
            if name.startswith('sharing_checkpoint_') and name.endswith('.pt')
        ]
        # Newest first
        candidates.sort(key=os.path.getmtime, reverse=True)

        for path in candidates:
            # NOTE: torch.load unpickles the file; only checkpoints written by
            # _save_checkpoint into this directory should ever be loaded here.
            checkpoint = torch.load(path, map_location='cpu')
            restored = checkpoint.get('shared_parameters', {}).get(node_id)
            if restored is not None:
                with self.sharing_lock:
                    self.shared_parameters[node_id] = restored
                self.failed_attempts[node_id] = 0
                return

        raise RuntimeError(f"No checkpoint contains parameters for node {node_id}")

    def _recover_from_average(self, node_id: str):
        """
        Replace ``node_id``'s parameters with the element-wise mean of the
        other nodes' parameters and reset its failure counter.

        Parameter names and shapes follow the first peer; peers that lack a
        name or have a different shape for it are left out of that mean.

        Raises:
            RuntimeError: If no other node has shared parameters.
        """
        with self.sharing_lock:
            peers = [
                params for peer_id, params in self.shared_parameters.items()
                if peer_id != node_id
            ]
        if not peers:
            raise RuntimeError(f"No peer parameters available to recover node {node_id}")

        averaged = {}
        for name, reference in peers[0].items():
            matching = [
                params[name] for params in peers
                if name in params and params[name].shape == reference.shape
            ]
            # Average in float64 so integer tensors can be averaged too, then
            # cast back to the reference dtype
            mean = torch.stack([t.to(torch.float64) for t in matching]).mean(dim=0)
            averaged[name] = mean.to(reference.dtype)

        with self.sharing_lock:
            self.shared_parameters[node_id] = averaged
        self.failed_attempts[node_id] = 0

    def _save_checkpoint(self):
        """
        Save current state to checkpoint.

        The file name carries the current Unix time in whole seconds, so two
        saves within the same second overwrite each other. Errors are logged,
        not raised.
        """
        checkpoint_path = os.path.join(
            self.config.checkpoint_dir,
            f'sharing_checkpoint_{int(time.time())}.pt'
        )

        try:
            # Snapshot under the lock, write outside it, so readers are not
            # blocked by disk I/O and the dicts cannot change mid-write
            with self.sharing_lock:
                snapshot = {
                    'shared_parameters': dict(self.shared_parameters),
                    'failed_attempts': dict(self.failed_attempts),
                    'timestamp': time.time()
                }
            torch.save(snapshot, checkpoint_path)

            logger.debug(f"Checkpoint saved: {checkpoint_path}")

        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")

Cell 4: Usage Example

In [29]:
def main():
    """
    Demonstrate the parameter-sharing workflow with three simulated nodes.

    Each simulated node publishes a random weight/bias pair, reads it back,
    and then waits two seconds. Sharing is always stopped on exit, even when
    an error occurs part-way through.
    """
    sharing_config = ParameterSharingConfig()
    manager = ParameterSharingManager(sharing_config, SecurityManager(sharing_config))

    manager.start_sharing()
    try:
        for node_id in (f'node_{idx}' for idx in range(3)):
            # Random tensors stand in for a node's trained parameters.
            simulated = dict(
                weight=torch.randn(64, 64),
                bias=torch.randn(64),
            )
            manager.share_parameters(node_id, simulated)

            if manager.get_shared_parameters(node_id) is not None:
                logger.info(f"Retrieved parameters from {node_id}")

            time.sleep(2)
    finally:
        manager.stop_sharing()

if __name__ == "__main__":
    main()

2025-05-02 10:09:02,961 - INFO - Parameter sharing configuration initialized
INFO:AGCRN.parameter_sharing:Parameter sharing configuration initialized
2025-05-02 10:09:02,966 - INFO - Parameter sharing started
INFO:AGCRN.parameter_sharing:Parameter sharing started
2025-05-02 10:09:12,965 - INFO - Parameter sharing stopped
INFO:AGCRN.parameter_sharing:Parameter sharing stopped


# **3.2 DFL Integration**
- Tích hợp DFL với AGCRN
- Cơ chế aggregation
- Đồng bộ hoá mô hình

Cell 1: Imports và Cấu hình

In [30]:
import os
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from typing import Dict, List, Optional
from collections import OrderedDict
import threading
import time
import copy
import logging
from torch.utils.data import DataLoader

# Select the compute device: CUDA when PyTorch reports it as available, otherwise CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Configure root logging (INFO level, timestamped format).
# Note: logging.basicConfig does nothing if the root logger already has handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Rebinds the module-level `logger`; cells below this point log under 'AGCRN.dfl'.
logger = logging.getLogger('AGCRN.dfl')

class DFLConfig:
    """
    Settings for the decentralized federated learning (DFL) run.

    Holds the federation hyperparameters, the AGCRN model dimensions and the
    directory used for model checkpoints. The checkpoint directory is created
    on construction.
    """

    def __init__(self):
        # Federation hyperparameters.
        federation = dict(
            num_local_epochs=5,
            num_rounds=100,
            local_batch_size=32,
            learning_rate=0.001,
            min_clients=2,
            aggregation_method='fedavg',
            sync_interval=10,
            timeout=30,
        )

        # Model dimensions.
        architecture = dict(
            seq_len=12,
            horizon=12,
            input_dim=1,
            hidden_dim=64,
            output_dim=1,
            num_layers=2,
            cheb_k=2,
            embed_dim=10,
        )

        # Attributes are set in the same order as they are listed above.
        for attr_name, attr_value in {**federation, **architecture}.items():
            setattr(self, attr_name, attr_value)

        # Checkpoint location.
        self.model_dir = '/content/models/dfl'
        os.makedirs(self.model_dir, exist_ok=True)

        logger.info("DFL configuration initialized")

Cell 2: Dataset và Model State Management

In [31]:
class TrafficDataset(torch.utils.data.Dataset):
    """
    Sliding-window dataset over a ``[time_steps, num_nodes]`` series.

    Every sample is an ``(x, y)`` pair where ``x`` holds ``seq_len``
    consecutive steps and ``y`` the ``horizon`` steps that follow; both get a
    trailing feature axis of size 1. All windows are built once in the
    constructor and kept in ``self.samples``.
    """

    def __init__(self, data, seq_len, horizon, num_nodes):
        self.data = torch.FloatTensor(data)  # Shape: [time_steps, num_nodes]
        self.seq_len = seq_len
        self.horizon = horizon
        self.num_nodes = num_nodes
        self.samples = self._generate_samples()

    def _window_at(self, start):
        """Return the (input, target) pair whose input begins at ``start``."""
        split = start + self.seq_len
        past = self.data[start:split].unsqueeze(-1)                   # [seq_len, num_nodes, 1]
        future = self.data[split:split + self.horizon].unsqueeze(-1)  # [horizon, num_nodes, 1]
        return past, future

    def _generate_samples(self):
        """Build the list of every full window that fits in the series."""
        window_span = self.seq_len + self.horizon
        first_positions = range(len(self.data) - window_span + 1)
        return [self._window_at(pos) for pos in first_positions]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]

class ModelState:
    """
    Detached CPU snapshot of a model's ``state_dict``.

    ``state_dict`` maps parameter/buffer names to cloned tensors, so later
    training steps on the source model do not alter the snapshot.
    """

    def __init__(self, model: Optional[nn.Module] = None):
        """
        Args:
            model: Module to snapshot. ``None`` creates an empty state, which
                is what ``aggregate_states`` needs as a container for its
                result (the previous code crashed on ``ModelState(None)``).
        """
        if model is None:
            self.state_dict = OrderedDict()
        else:
            self.state_dict = OrderedDict(
                (k, v.clone().detach().cpu())
                for k, v in model.state_dict().items()
            )

    def apply_to_model(self, model: nn.Module):
        """Load this snapshot into ``model`` via ``load_state_dict``."""
        model.load_state_dict(self.state_dict)

    @staticmethod
    def aggregate_states(states: List['ModelState'], weights: Optional[List[float]] = None) -> 'ModelState':
        """
        Compute the weighted average of several model states.

        Args:
            states: States to combine; the keys of the first state define the
                result, and every other state must provide the same keys.
            weights: One weight per state. Defaults to a uniform average.
                Weights that do not sum to 1 are normalized.

        Returns:
            A new ``ModelState`` holding the weighted sum for each key.

        Raises:
            ValueError: If ``states`` is empty, the number of weights differs
                from the number of states, the weights sum to zero, or two
                states hold differently shaped values for the same key.
        """
        if not states:
            raise ValueError("No states to aggregate")

        if weights is None:
            weights = [1.0 / len(states)] * len(states)

        if len(weights) != len(states):
            raise ValueError("Number of weights must match number of states")

        total_weight = sum(weights)
        if total_weight == 0:
            raise ValueError("Sum of weights must be non-zero")
        if not np.isclose(total_weight, 1.0):
            weights = [w / total_weight for w in weights]

        aggregated_state = OrderedDict()
        for key in states[0].state_dict.keys():
            reference = states[0].state_dict[key]
            # Multiplication creates a new object, so inputs are never modified.
            accumulated = reference * weights[0]

            for state, weight in zip(states[1:], weights[1:]):
                value = state.state_dict[key]
                # Refuse silent broadcasting between differently sized models.
                if getattr(value, 'shape', None) != getattr(reference, 'shape', None):
                    raise ValueError(
                        f"Shape mismatch for '{key}': "
                        f"{getattr(reference, 'shape', None)} vs {getattr(value, 'shape', None)}"
                    )
                accumulated = accumulated + value * weight

            aggregated_state[key] = accumulated

        result = ModelState()
        result.state_dict = aggregated_state
        return result

Cell 3: Local Training Manager

In [32]:
class LocalTrainingManager:
    def __init__(self, model, optimizer, train_loader, config):
        self.model = model
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.config = config
        self.criterion = nn.MSELoss()
        self.current_state = ModelState(model)

    def train_local_epoch(self) -> float:
        """
        Train model for one local epoch
        """
        self.model.train()
        total_loss = 0
        num_batches = 0

        for batch_idx, (data, target) in enumerate(self.train_loader):
            # data shape: [batch_size, seq_len, num_nodes, 1]
            # target shape: [batch_size, horizon, num_nodes, 1]
            data = data.to(device)
            target = target.to(device)

            self.optimizer.zero_grad()
            output = self.model(data)  # shape: [batch_size, horizon, num_nodes, 1]

            loss = self.criterion(output, target)
            loss.backward()
            self.optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        avg_loss = total_loss / num_batches
        # Update current state after training
        self.current_state = ModelState(self.model)
        logger.debug(f"Local epoch completed with average loss: {avg_loss:.4f}")
        return avg_loss

    def update_model(self, new_state: ModelState):
        """
        Update local model with new state
        """
        new_state.apply_to_model(self.model)
        self.current_state = ModelState(self.model)

Cell 4: DFL Coordinator

In [33]:
class DFLCoordinator:
    """
    Runs federated training rounds over a set of registered clients.

    Each round trains every client locally, averages the states of the
    clients that succeeded, pushes the averaged state back to all clients and
    periodically checkpoints it.

    NOTE(review): aggregation assumes every client's model has the same
    state-dict keys and tensor shapes — confirm this holds when clients are
    built with different ``num_nodes``.
    """

    def __init__(self, config: DFLConfig):
        self.config = config
        self.clients: Dict[str, LocalTrainingManager] = {}
        self.global_state: Optional[ModelState] = None
        self._round = 0
        self._lock = threading.Lock()

    def add_client(self, client_id: str, client: LocalTrainingManager):
        """Register a client; the first client's model seeds the global state."""
        self.clients[client_id] = client
        if self.global_state is None:
            self.global_state = ModelState(client.model)
        logger.info(f"Added client {client_id} to federation")

    def _train_client(self, client_id: str, client: LocalTrainingManager):
        """
        Train a single client for ``config.num_local_epochs`` epochs.

        Returns:
            True when training finished, False when it raised or when no
            epoch was run (``num_local_epochs <= 0``).
        """
        try:
            loss = None
            for _ in range(self.config.num_local_epochs):
                loss = client.train_local_epoch()

            if loss is None:
                # Previously this surfaced as an unbound-variable error.
                logger.error(
                    f"Client {client_id} ran no local epochs "
                    f"(num_local_epochs={self.config.num_local_epochs})"
                )
                return False

            logger.info(f"Client {client_id} completed training with final loss: {loss:.4f}")
            return True
        except Exception as e:
            logger.error(f"Client {client_id} failed in round {self._round}: {str(e)}")
            return False

    def _aggregate_models(self, successful_clients: Dict[str, LocalTrainingManager]):
        """Uniformly average the current states of the given clients (None if empty)."""
        if not successful_clients:
            return None

        states = [client.current_state for client in successful_clients.values()]
        weights = [1.0 / len(successful_clients)] * len(successful_clients)

        return ModelState.aggregate_states(states, weights)

    def start_training(self):
        """
        Run ``config.num_rounds`` federated rounds.

        Rounds with fewer than ``config.min_clients`` successful clients are
        skipped. The global state is checkpointed every
        ``config.sync_interval`` rounds, and the most recent aggregated state
        is also saved at the end if the periodic schedule did not cover it.
        """
        logger.info("Started federated learning")

        # Round whose aggregated state has not been written to disk yet.
        unsaved_round = None

        for round_num in range(self.config.num_rounds):
            self._round = round_num
            logger.info(f"Starting round {round_num}")

            # Train clients one after another (no parallelism here).
            successful_clients = {}
            for client_id, client in self.clients.items():
                if self._train_client(client_id, client):
                    successful_clients[client_id] = client

            # Check if we have enough successful clients
            if len(successful_clients) < self.config.min_clients:
                logger.warning(f"Insufficient clients for round {round_num}")
                continue

            # Aggregate models
            new_global_state = self._aggregate_models(successful_clients)
            if new_global_state is None:
                continue

            # Update global state
            self.global_state = new_global_state
            unsaved_round = round_num

            # Synchronize all clients, including those that failed this round
            for client in self.clients.values():
                client.update_model(self.global_state)

            # Save periodic checkpoint
            if round_num % self.config.sync_interval == 0:
                self._save_checkpoint(round_num)
                unsaved_round = None

        # Without this, the final model is lost whenever the last aggregated
        # round is not a multiple of sync_interval.
        if unsaved_round is not None:
            self._save_checkpoint(unsaved_round)

        logger.info("Federated learning completed")

    def _save_checkpoint(self, round_num: int):
        """Write the global state dict to ``config.model_dir`` for ``round_num``."""
        checkpoint_path = os.path.join(self.config.model_dir, f'checkpoint_round_{round_num}.pt')
        torch.save(self.global_state.state_dict, checkpoint_path)
        logger.info(f"Saved checkpoint for round {round_num}")

Cell 5: Data Loading Functions

In [34]:
def load_fog_nodes(fog_nodes_path: str = '/content/results/clusters/fog_nodes.csv'):
    """
    Load fog-node definitions from a CSV file.

    Args:
        fog_nodes_path: CSV file to read. Defaults to the location used by
            the clustering step of this notebook. The file must provide the
            columns ``cluster_id``, ``latitude``, ``longitude``,
            ``num_stations`` and ``station_ids``.

    Returns:
        Dict keyed by ``str(cluster_id)``; each value holds ``latitude``,
        ``longitude``, ``num_stations`` and ``station_ids``.

    Raises:
        Exception: Any error while reading or parsing is logged and re-raised.
    """
    try:
        fog_df = pd.read_csv(fog_nodes_path)

        fog_nodes = {}
        for _, row in fog_df.iterrows():
            raw_ids = row['station_ids']
            # NOTE(security): eval() executes arbitrary code found in the CSV.
            # Only load files produced by this pipeline; if the column always
            # holds a plain list literal, ast.literal_eval is the safer choice.
            station_ids = eval(raw_ids) if isinstance(raw_ids, str) else raw_ids

            fog_nodes[str(row['cluster_id'])] = {
                'latitude': row['latitude'],
                'longitude': row['longitude'],
                'num_stations': row['num_stations'],
                'station_ids': station_ids
            }

        logger.info(f"Loaded {len(fog_nodes)} fog nodes")
        return fog_nodes

    except Exception as e:
        logger.error(f"Failed to load fog nodes: {e}")
        raise

def prepare_data_loaders(fog_nodes, config, data_path: str = '/content/data/metr-la.csv'):
    """
    Build one shuffled training DataLoader per fog node.

    Args:
        fog_nodes: Mapping of node id to node info; each info dict must
            contain ``station_ids``.
        config: Object providing ``seq_len``, ``horizon`` and
            ``local_batch_size``.
        data_path: CSV with one column per station (first column is the
            index). Defaults to the METR-LA file used by this notebook.

    Returns:
        Dict mapping node id to its DataLoader. Nodes whose stations are all
        missing from the CSV are skipped with a warning.

    Raises:
        ValueError: If no loader could be created for any node.
        Exception: Any other error is logged with diagnostics and re-raised.
    """
    df = None  # lets the error handler tell whether the CSV was loaded
    try:
        df = pd.read_csv(data_path, index_col=0)

        # Station ids are compared as strings on both sides.
        df.columns = df.columns.astype(str)
        logger.info(f"Data loaded with shape: {df.shape}")
        logger.info(f"Sample columns: {list(df.columns[:5])}")

        train_loaders = {}
        for node_id, node_info in fog_nodes.items():
            station_ids = [str(sid) for sid in node_info['station_ids']]
            valid_stations = [sid for sid in station_ids if sid in df.columns]

            if not valid_stations:
                logger.warning(f"No valid stations found for node {node_id}")
                continue

            node_data = df[valid_stations]

            dataset = TrafficDataset(
                data=node_data.values,
                seq_len=config.seq_len,
                horizon=config.horizon,
                num_nodes=len(valid_stations)
            )

            train_loaders[node_id] = DataLoader(
                dataset,
                batch_size=config.local_batch_size,
                shuffle=True,
                num_workers=2,
                pin_memory=True if torch.cuda.is_available() else False
            )

            logger.info(f"Created dataloader for node {node_id} with {len(valid_stations)} stations")

        if not train_loaders:
            raise ValueError("No valid data loaders could be created")

        logger.info(f"Prepared data loaders for {len(train_loaders)} fog nodes")
        return train_loaders

    except Exception as e:
        logger.error(f"Failed to prepare data loaders: {e}")
        logger.error(f"Data info: {df.shape if df is not None else 'Not loaded'}")
        # Diagnostics must never replace the original exception, so guard the
        # lookup against empty or malformed fog_nodes.
        try:
            sample_stations = next(iter(fog_nodes.values()))['station_ids'][:5]
        except (StopIteration, AttributeError, KeyError, TypeError, IndexError):
            sample_stations = 'unavailable'
        logger.error(f"Sample stations: {sample_stations}")
        raise

Cell 6: Model Setup and Main

In [None]:
def setup_dfl_with_agcrn(fog_nodes, train_loaders, config):
    """
    Create a DFL coordinator with one AGCRN client per fog node.

    Args:
        fog_nodes: Mapping of node id to node info (must contain
            ``station_ids``).
        train_loaders: Mapping of node id to its training DataLoader. Nodes
            without a loader are skipped.
        config: Object providing ``hidden_dim``, ``horizon``, ``num_layers``,
            ``cheb_k``, ``embed_dim`` and ``learning_rate``.

    Returns:
        The coordinator with all clients registered.
    """
    dfl_coordinator = DFLCoordinator(config)

    for node_id, fog_node in fog_nodes.items():
        if node_id not in train_loaders:
            continue

        loader = train_loaders[node_id]

        # Size the model from the dataset actually being trained on: stations
        # missing from the data are dropped when the loader is built, so
        # len(station_ids) can be larger than the real node dimension.
        num_nodes = getattr(
            getattr(loader, 'dataset', None),
            'num_nodes',
            len(fog_node['station_ids'])
        )

        model = AGCRN(
            num_nodes=num_nodes,
            input_dim=1,
            hidden_dim=config.hidden_dim,
            output_dim=1,
            horizon=config.horizon,
            num_layers=config.num_layers,
            cheb_k=config.cheb_k,
            embed_dim=config.embed_dim
        ).to(device)

        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=config.learning_rate
        )

        client = LocalTrainingManager(
            model=model,
            optimizer=optimizer,
            train_loader=loader,
            config=config
        )

        dfl_coordinator.add_client(node_id, client)
        logger.info(f"Added client {node_id} with {num_nodes} nodes")

    return dfl_coordinator

def main():
    """
    Run the full DFL pipeline: configuration, fog nodes, data loaders,
    coordinator setup and federated training.
    """
    dfl_config = DFLConfig()

    # Data side: fog-node definitions and their per-node loaders.
    nodes = load_fog_nodes()
    loaders = prepare_data_loaders(nodes, dfl_config)

    # Build the federation and train it.
    setup_dfl_with_agcrn(nodes, loaders, dfl_config).start_training()

if __name__ == "__main__":
    main()

2025-05-02 10:09:38,466 - INFO - DFL configuration initialized
INFO:AGCRN.dfl:DFL configuration initialized
2025-05-02 10:09:38,476 - INFO - Loaded 34 fog nodes
INFO:AGCRN.dfl:Loaded 34 fog nodes
2025-05-02 10:09:39,730 - INFO - Data loaded with shape: (34272, 207)
INFO:AGCRN.dfl:Data loaded with shape: (34272, 207)
2025-05-02 10:09:39,732 - INFO - Sample columns: ['773869', '767541', '767542', '717447', '717446']
INFO:AGCRN.dfl:Sample columns: ['773869', '767541', '767542', '717447', '717446']
2025-05-02 10:09:40,142 - INFO - Created dataloader for node 0 with 3 stations
INFO:AGCRN.dfl:Created dataloader for node 0 with 3 stations
2025-05-02 10:09:41,015 - INFO - Created dataloader for node 1 with 7 stations
INFO:AGCRN.dfl:Created dataloader for node 1 with 7 stations
2025-05-02 10:09:41,645 - INFO - Created dataloader for node 2 with 3 stations
INFO:AGCRN.dfl:Created dataloader for node 2 with 3 stations
2025-05-02 10:09:42,750 - INFO - Created dataloader for node 3 with 4 stations
I

# **Phase 4: Evaluation**
- Accuracy metrics
- Communication cost
- Convergence analysis

Cell 1: Import và Setup

In [None]:
import json
import logging
import os
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Rebinds the module-level `logger`; cells below this point log under 'AGCRN.evaluation'.
logger = logging.getLogger('AGCRN.evaluation')

class EvaluationConfig:
    """
    Evaluation settings: logging interval and output directories.

    Both the report directory and its ``plots`` sub-directory are created
    when the object is constructed.
    """

    def __init__(self):
        self.metrics_log_interval = 100  # batches
        self.save_dir = '/content/results/evaluation'
        self.plot_dir = self.save_dir + '/plots'

        # Parent first, then the plots sub-directory.
        for directory in (self.save_dir, self.plot_dir):
            os.makedirs(directory, exist_ok=True)

        logger.info("Evaluation configuration initialized")

Cell 2: Accuracy Metrics

In [None]:
class AccuracyMetrics:
    """
    Computes MAE, RMSE, MAPE and R2 for prediction batches and keeps a
    per-call history of each metric.
    """

    def __init__(self, scaler=None):
        """
        Args:
            scaler: Optional object with ``inverse_transform``; when given,
                targets and predictions are rescaled before scoring.
        """
        self.scaler = scaler
        self.metrics_history = {
            'mae': [],
            'rmse': [],
            'mape': [],
            'r2': []
        }

    def calculate_metrics(self, y_true: torch.Tensor, y_pred: torch.Tensor) -> Dict[str, float]:
        """
        Calculate all accuracy metrics for one batch and record them.

        Inputs with more than two dimensions are reshaped to
        ``(first_dim, -1)`` because the scikit-learn metrics used here only
        accept 1-D or 2-D arrays.

        Returns:
            Dict with ``mae``, ``rmse``, ``mape`` (percent, ``nan`` when every
            target is zero) and ``r2`` as plain Python floats.
        """
        # detach() allows tensors that still require grad (e.g. model outputs).
        true_np = y_true.detach().cpu().numpy()
        pred_np = y_pred.detach().cpu().numpy()

        if self.scaler:
            true_np = np.asarray(self.scaler.inverse_transform(true_np))
            pred_np = np.asarray(self.scaler.inverse_transform(pred_np))

        if true_np.ndim > 2:
            true_np = true_np.reshape(true_np.shape[0], -1)
            pred_np = pred_np.reshape(pred_np.shape[0], -1)

        mae = float(mean_absolute_error(true_np, pred_np))
        rmse = float(np.sqrt(mean_squared_error(true_np, pred_np)))

        # MAPE is only defined where the target is non-zero.
        mask = true_np != 0
        if mask.any():
            mape = float(np.mean(np.abs((true_np[mask] - pred_np[mask]) / true_np[mask])) * 100)
        else:
            mape = float('nan')

        r2 = float(r2_score(true_np, pred_np))

        # Plain floats keep the history JSON-serializable (float32 is not).
        metrics = {
            'mae': mae,
            'rmse': rmse,
            'mape': mape,
            'r2': r2
        }

        for metric, value in metrics.items():
            self.metrics_history[metric].append(value)

        return metrics

    def plot_metrics_history(self):
        """
        Plot each metric's history on a 2x2 grid and save it as
        ``metrics_history.png`` in the evaluation plot directory.
        """
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('Training Metrics History')

        for ax, (metric, values) in zip(axes.flat, self.metrics_history.items()):
            ax.plot(values)
            ax.set_title(metric.upper())
            ax.set_xlabel('Batch')
            ax.grid(True)

        plt.tight_layout()
        # A fresh EvaluationConfig is built only to resolve the plot directory.
        plt.savefig(f'{EvaluationConfig().plot_dir}/metrics_history.png')
        plt.close()

Cell 3: Communication Cost Analysis

In [None]:
class CommunicationAnalyzer:
    """
    Records per-message communication statistics and summarizes or plots them.

    Every history entry is a ``(seconds_since_start, value)`` pair.
    """

    def __init__(self):
        self.comm_history = {
            'bytes_sent': [],
            'bytes_received': [],
            'latency': [],
            'bandwidth_usage': []
        }
        self.start_time = time.time()

    @staticmethod
    def _as_series(records):
        """Return ``records`` as an ``(n, 2)`` float array, also when empty."""
        return np.asarray(records, dtype=float).reshape(-1, 2)

    @staticmethod
    def _mean_or_zero(records):
        """Mean of the value column, or 0.0 when nothing was recorded."""
        return float(np.mean([value for _, value in records])) if records else 0.0

    def log_communication(self, message_size: int, direction: str, latency: float):
        """
        Log one message.

        Args:
            message_size: Size in bytes.
            direction: ``'sent'``; any other value is counted as received.
            latency: Transfer time in seconds; bandwidth is recorded as 0
                when it is not positive.
        """
        timestamp = time.time() - self.start_time

        if direction == 'sent':
            self.comm_history['bytes_sent'].append((timestamp, message_size))
        else:
            self.comm_history['bytes_received'].append((timestamp, message_size))

        self.comm_history['latency'].append((timestamp, latency))

        # Calculate bandwidth usage (bytes/second)
        bandwidth = message_size / latency if latency > 0 else 0
        self.comm_history['bandwidth_usage'].append((timestamp, bandwidth))

    def analyze_communication_cost(self) -> Dict[str, float]:
        """
        Summarize the recorded traffic.

        Returns:
            Totals in MiB, mean latency in milliseconds and mean bandwidth in
            Mbit/s. Averages are 0.0 when nothing has been logged.
        """
        total_sent = sum(size for _, size in self.comm_history['bytes_sent'])
        total_received = sum(size for _, size in self.comm_history['bytes_received'])
        avg_latency = self._mean_or_zero(self.comm_history['latency'])
        avg_bandwidth = self._mean_or_zero(self.comm_history['bandwidth_usage'])

        return {
            'total_bytes_sent_mb': total_sent / (1024 * 1024),
            'total_bytes_received_mb': total_received / (1024 * 1024),
            'average_latency_ms': avg_latency * 1000,
            'average_bandwidth_mbps': avg_bandwidth * 8 / (1024 * 1024)
        }

    def plot_communication_metrics(self):
        """
        Plot transfer sizes, latency, bandwidth and cumulative transfer, and
        save the figure as ``communication_metrics.png``.
        """
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('Communication Metrics')

        # (n, 2) arrays keep column indexing valid even when one direction
        # has no records (a plain np.array([]) is 1-D and cannot be sliced).
        sent_data = self._as_series(self.comm_history['bytes_sent'])
        received_data = self._as_series(self.comm_history['bytes_received'])
        latency_data = self._as_series(self.comm_history['latency'])
        bandwidth_data = self._as_series(self.comm_history['bandwidth_usage'])

        # Plot bytes sent/received
        ax = axes[0, 0]
        ax.plot(sent_data[:, 0], sent_data[:, 1], label='Sent')
        ax.plot(received_data[:, 0], received_data[:, 1], label='Received')
        ax.set_title('Data Transfer')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Bytes')
        ax.legend()

        # Plot latency
        ax = axes[0, 1]
        ax.plot(latency_data[:, 0], latency_data[:, 1])
        ax.set_title('Latency')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Seconds')

        # Plot bandwidth usage
        ax = axes[1, 0]
        ax.plot(bandwidth_data[:, 0], bandwidth_data[:, 1])
        ax.set_title('Bandwidth Usage')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Bytes/second')

        # Plot cumulative data transfer
        ax = axes[1, 1]
        ax.plot(sent_data[:, 0], np.cumsum(sent_data[:, 1]), label='Sent')
        ax.plot(received_data[:, 0], np.cumsum(received_data[:, 1]), label='Received')
        ax.set_title('Cumulative Data Transfer')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Total Bytes')
        ax.legend()

        plt.tight_layout()
        plt.savefig(f'{EvaluationConfig().plot_dir}/communication_metrics.png')
        plt.close()

Cell 4: Convergence Analysis

In [None]:
class ConvergenceAnalyzer:
    """
    Tracks loss, gradient norms and parameter changes per training step and
    derives simple convergence statistics from them.
    """

    def __init__(self):
        self.loss_history = []
        self.gradient_norms = []
        self.parameter_changes = []
        self.convergence_metrics = {}

    def log_training_step(self, loss: float, model: torch.nn.Module,
                         prev_params: Optional[Dict[str, torch.Tensor]] = None):
        """
        Record one training step.

        Args:
            loss: Loss value of the step.
            model: Model whose parameter gradients are measured; parameters
                without a gradient are ignored.
            prev_params: Optional mapping of parameter name to its previous
                value. When given, the global L2 norm of the change is
                recorded as well.
        """
        self.loss_history.append(loss)

        # Global L2 norm over all available gradients.
        squared_grad = 0.0
        for param in model.parameters():
            if param.grad is not None:
                squared_grad += param.grad.norm().item() ** 2
        self.gradient_norms.append(float(np.sqrt(squared_grad)))

        if prev_params is not None:
            current_params = dict(model.named_parameters())
            squared_change = 0.0
            # Pure measurement: no autograd graph needed.
            with torch.no_grad():
                for name, prev_param in prev_params.items():
                    current = current_params[name]
                    # Snapshots may live on another device (e.g. CPU copies).
                    delta = current - prev_param.to(current.device)
                    squared_change += torch.norm(delta).item() ** 2
            self.parameter_changes.append(float(np.sqrt(squared_change)))

    def analyze_convergence(self) -> Dict[str, float]:
        """
        Compute convergence statistics from the recorded history.

        Returns:
            Dict with ``convergence_rate`` (mean loss difference between
            consecutive steps), ``oscillation_metric`` (std of those
            differences, needs at least three losses),
            ``gradient_stability`` (std of gradient norms) and
            ``final_loss`` (``None`` when nothing was logged). Statistics
            without enough data are 0.0.
        """
        num_losses = len(self.loss_history)
        loss_diffs = np.diff(self.loss_history) if num_losses > 1 else None

        convergence_rate = float(np.mean(loss_diffs)) if num_losses > 1 else 0.0
        oscillation_metric = float(np.std(loss_diffs)) if num_losses > 2 else 0.0

        # np.std of an empty list would emit a warning and return nan.
        gradient_stability = float(np.std(self.gradient_norms)) if self.gradient_norms else 0.0

        self.convergence_metrics = {
            'convergence_rate': convergence_rate,
            'oscillation_metric': oscillation_metric,
            'gradient_stability': gradient_stability,
            'final_loss': self.loss_history[-1] if self.loss_history else None
        }

        return self.convergence_metrics

    def plot_convergence_analysis(self):
        """
        Plot loss history, gradient norms, parameter changes and the loss
        distribution, and save the figure as ``convergence_analysis.png``.
        """
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('Convergence Analysis')

        # Plot loss history
        ax = axes[0, 0]
        ax.plot(self.loss_history)
        ax.set_title('Loss History')
        ax.set_xlabel('Step')
        ax.set_ylabel('Loss')

        # Plot gradient norms
        ax = axes[0, 1]
        ax.plot(self.gradient_norms)
        ax.set_title('Gradient Norms')
        ax.set_xlabel('Step')
        ax.set_ylabel('L2 Norm')

        # Plot parameter changes (panel stays empty when none were recorded)
        ax = axes[1, 0]
        if self.parameter_changes:
            ax.plot(self.parameter_changes)
            ax.set_title('Parameter Changes')
            ax.set_xlabel('Step')
            ax.set_ylabel('L2 Norm')

        # Plot loss distribution
        ax = axes[1, 1]
        sns.histplot(self.loss_history, ax=ax)
        ax.set_title('Loss Distribution')

        plt.tight_layout()
        plt.savefig(f'{EvaluationConfig().plot_dir}/convergence_analysis.png')
        plt.close()

Cell 5: Evaluation Manager

In [None]:
class EvaluationManager:
    """
    Combines the accuracy, communication and convergence trackers and writes
    a JSON report plus plots from them.
    """

    def __init__(self, config: EvaluationConfig):
        self.config = config
        self.accuracy_metrics = AccuracyMetrics()
        self.comm_analyzer = CommunicationAnalyzer()
        self.convergence_analyzer = ConvergenceAnalyzer()

    def log_batch(self, y_true: torch.Tensor, y_pred: torch.Tensor,
                  loss: float, model: torch.nn.Module,
                  message_size: int, latency: float,
                  prev_params: Optional[Dict[str, torch.Tensor]] = None):
        """
        Log all metrics for one training batch.

        The communication entry is always recorded with direction ``'sent'``.

        Returns:
            The accuracy metrics computed for this batch.
        """
        # Log accuracy metrics
        metrics = self.accuracy_metrics.calculate_metrics(y_true, y_pred)

        # Log communication costs
        self.comm_analyzer.log_communication(message_size, 'sent', latency)

        # Log convergence metrics
        self.convergence_analyzer.log_training_step(loss, model, prev_params)

        return metrics

    def generate_evaluation_report(self):
        """
        Aggregate all trackers into a report, save it as
        ``evaluation_report.json`` in ``config.save_dir`` and render the plots.

        Returns:
            The report dict. Accuracy entries are the mean over all logged
            batches, or ``None`` for a metric with no recorded values.
        """
        # float() turns NumPy scalars (float32 in particular) into values the
        # json module can serialize.
        accuracy_results = {
            metric: (float(np.mean(values)) if values else None)
            for metric, values in self.accuracy_metrics.metrics_history.items()
        }

        comm_results = self.comm_analyzer.analyze_communication_cost()
        convergence_results = self.convergence_analyzer.analyze_convergence()

        # Create report
        report = {
            'timestamp': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
            'accuracy_metrics': accuracy_results,
            'communication_metrics': comm_results,
            'convergence_metrics': convergence_results
        }

        # Save report; default=float covers any remaining NumPy scalar that
        # the analyzers may return.
        report_path = f'{self.config.save_dir}/evaluation_report.json'
        with open(report_path, 'w') as f:
            json.dump(report, f, indent=4, default=float)

        # Generate plots
        self.accuracy_metrics.plot_metrics_history()
        self.comm_analyzer.plot_communication_metrics()
        self.convergence_analyzer.plot_convergence_analysis()

        logger.info(f"Evaluation report saved to {report_path}")
        return report

Cell 6: Usage Example

In [None]:
def main(model=None, prev_params=None):
    """
    Demonstrate the evaluation pipeline on simulated batches.

    Args:
        model: Model whose gradients are inspected. When omitted, a
            notebook-level ``model`` variable is used if one exists;
            otherwise a tiny placeholder module is created so the example
            runs on a fresh kernel (the previous code raised NameError).
        prev_params: Optional previous parameters for the parameter-change
            metric. When omitted, a notebook-level ``prev_params`` is used if
            one exists; otherwise the metric is skipped.
    """
    # Initialize evaluation
    config = EvaluationConfig()
    evaluator = EvaluationManager(config)

    # Keep the old behavior of picking up notebook globals when available.
    if model is None:
        model = globals().get('model')
    if model is None:
        model = torch.nn.Linear(1, 1)  # placeholder; replace with your AGCRN model
    if prev_params is None:
        prev_params = globals().get('prev_params')

    # Simulate training loop
    for batch_idx in range(100):
        # Simulate batch training
        y_true = torch.randn(32, 12, 207, 1)  # Example dimensions
        y_pred = torch.randn(32, 12, 207, 1)
        loss = torch.nn.functional.mse_loss(y_pred, y_true).item()

        # Log metrics
        metrics = evaluator.log_batch(
            y_true=y_true,
            y_pred=y_pred,
            loss=loss,
            model=model,
            message_size=1000000,  # Example size
            latency=0.1,  # Example latency
            prev_params=prev_params
        )

        if batch_idx % config.metrics_log_interval == 0:
            logger.info(f"Batch {batch_idx}: {metrics}")

    # Generate final report
    report = evaluator.generate_evaluation_report()
    logger.info("Evaluation completed")

if __name__ == "__main__":
    main()