In [1]:
import warnings
import numpy as np
import pandas as pd
from tqdm import tqdm
from torch_geometric.data import Data
from torch_geometric.utils import to_undirected, train_test_split_edges
from sklearn.preprocessing import LabelEncoder

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Linear
from torch import Tensor
from torch_geometric.utils import to_networkx
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from collections import Counter

In [3]:
# validation
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, matthews_corrcoef

In [4]:
# Reproducibility: fix the NumPy and PyTorch seeds, then mute all warnings.
np.random.seed(42)
torch.manual_seed(42)
warnings.filterwarnings(action='ignore')

In [5]:
def pre_processing(df_graph: pd.DataFrame,
                   df_features: pd.DataFrame,
                   col_features,
                   col_target: str = 'severity'):
    """Turn a node-feature frame and an edge frame into tensors for a GNN.

    Each row of ``df_features`` becomes one node whose integer id is its row
    position. Edges of ``df_graph`` are kept only when both endpoints
    (``pos1`` and ``pos2``) appear in ``df_features['leg_pos']``; the endpoints
    are then translated to node ids.

    Parameters
    ----------
    df_graph : DataFrame with columns ``pos1``, ``pos2``, ``weight`` and
        ``col_target``. It is not modified.
    df_features : DataFrame with a ``leg_pos`` column and the ``col_features``
        columns. Side effect (kept from the original implementation): a
        ``node`` column holding the row position is written to it.
    col_features : list of column names used as node features.
    col_target : name of the edge-label column in ``df_graph``.

    Returns
    -------
    x : float tensor, one row per node, one column per feature.
    edge_labels : long tensor with the label-encoded target of every kept edge.
    _edge_index : long tensor of shape ``(num_edges, 2)`` with ``[src, dst]``
        node ids (callers transpose it before handing it to PyG).
    edge_w : float tensor with the ``weight`` of every kept edge.
    pos : array of the ``leg_pos`` values, in node order.
    """
    # create node ids (row position of each feature row)
    df_features['node'] = np.arange(0, df_features.shape[0])

    # leg_pos -> node id; if a leg_pos is duplicated the last row wins, which
    # matches the outcome of assigning row by row.
    node_of = dict(zip(df_features['leg_pos'], df_features['node']))

    # keep only the edges whose two endpoints both have node features
    known_positions = df_features['leg_pos'].unique()
    keep = df_graph['pos1'].isin(known_positions) & df_graph['pos2'].isin(known_positions)
    # explicit copy: the new columns must land on this frame, not on a
    # temporary (chained assignment is a silent no-op under Copy-on-Write)
    df_graph_subsample = df_graph.loc[keep].copy()

    # sync nodes: one vectorised lookup instead of one query per node
    df_graph_subsample['src'] = df_graph_subsample['pos1'].map(node_of).astype(int)
    df_graph_subsample['dst'] = df_graph_subsample['pos2'].map(node_of).astype(int)

    # .info() prints by itself and returns None, so it is not wrapped in print()
    df_graph_subsample.info()

    # node positions and node feature matrix
    pos = df_features['leg_pos'].values
    x = torch.tensor(df_features[col_features].values, dtype=torch.float)

    # index of graph, one [src, dst] row per edge
    _edge_index = torch.tensor(df_graph_subsample[['src', 'dst']].values, dtype=torch.long)

    # encode the target into consecutive integer class ids
    encoded_target = LabelEncoder().fit_transform(df_graph_subsample[col_target].values)
    edge_labels = torch.tensor(encoded_target, dtype=torch.long)

    # edge weights
    edge_w = torch.tensor(df_graph_subsample['weight'].values, dtype=torch.float)

    return x, edge_labels, _edge_index, edge_w, pos

In [6]:
# Node table: drop the unused columns, expose the stop id as the string column 'leg_pos'.
df_v = (
    pd.read_parquet('df_features.parquet')
    .drop(columns=['vm', 'target'])
    .rename(columns={'ponto': 'leg_pos'})
    .astype({'leg_pos': str})
)

# Edge table: drop the unused 'vm_edge' column.
df_e = pd.read_parquet('df_edges.parquet').drop(columns=['vm_edge'])

df_v.head()

FileNotFoundError: [Errno 2] No such file or directory: 'df_features.parquet'

In [7]:
# Names of the node-feature columns; passed to pre_processing as `col_features`.
f_selected = ['carregamento', 'desce', 'linha', 'sobe', 'trip_id', 'veiculo']

In [None]:
# Give the edge table the column names pre_processing reads (pos1, pos2, weight)
# and store both endpoints as strings.
df_e = (
    df_e
    .rename(columns={'src': 'pos1', 'dst': 'pos2', 'loader': 'weight'})
    .astype({'pos1': str, 'pos2': str})
)
df_e.head()

In [None]:
# Build node features, edge labels, edge index, edge weights and node positions.
xa, edge_labelsa, edge_indexa, edge_wa, posa = pre_processing(
    df_graph=df_e,
    df_features=df_v,
    col_features=f_selected,
    col_target='target',
)

In [None]:
# Ensure the graph is undirected
#edge_index = to_undirected(edge_indexa)

In [None]:
# Both tensors are built from the same edge rows in pre_processing, so their
# leading dimensions are expected to match.
edge_labelsa.shape, edge_indexa.shape

In [None]:
# Shape of the integer-encoded edge labels.
edge_labelsa.shape

In [None]:
# One-hot encode the integer edge labels; the number of classes is inferred from the data.
edge_labels_oh = F.one_hot(edge_labelsa)
edge_labels_oh.shape

In [None]:
# Cast to float32 so the one-hot rows can be passed to the loss as float targets.
edge_labels_oh = edge_labels_oh.to(torch.float32)
edge_labels_oh

In [None]:
# Graph container: node features plus the edge index transposed from (E, 2) to (2, E).
data = Data(x=xa, edge_index=edge_indexa.transpose(0, 1).contiguous())

In [None]:
# Display the Data object to inspect what was just built.
data

In [None]:
# Split edges into train and test sets: no validation share (val_ratio=0.0),
# test share set by test_ratio=0.2.
# `data` is rebound to the result; later cells read `train_pos_edge_index` and
# `test_pos_edge_index` from it.
# NOTE(review): train_test_split_edges is reported as deprecated in recent PyG
# releases in favour of RandomLinkSplit, and it presumably treats the graph as
# undirected — confirm against the installed version.
data = train_test_split_edges(data, val_ratio=0.0, test_ratio=0.2)

In [None]:
# Display the Data object again, now after the edge split.
data

In [None]:
# Check the container type of the node-feature matrix.
type(data.x)

In [None]:
# Extract train and test edge indices
train_edge_index = data.train_pos_edge_index
test_edge_index = data.test_pos_edge_index


def _labels_for_edges(edge_index, ref_edge_index, ref_labels):
    """Return, for every column of `edge_index`, the matching row of `ref_labels`.

    `ref_edge_index` is the (2, E) edge index whose column i is labelled by
    `ref_labels[i]`. Edges are matched on their unordered node pair, because
    the split returns training edges in both directions. If the same node pair
    occurs more than once in the reference, the last occurrence wins.
    Raises KeyError if an edge is not present in the reference.
    """
    if edge_index.numel() == 0:
        return ref_labels[:0]

    # A single integer key per unordered pair: min_id * num_nodes + max_id.
    num_nodes = int(max(ref_edge_index.max(), edge_index.max())) + 1

    def _pair_keys(ei):
        low = torch.min(ei[0], ei[1])
        high = torch.max(ei[0], ei[1])
        return (low * num_nodes + high).tolist()

    row_of = {key: row for row, key in enumerate(_pair_keys(ref_edge_index))}
    rows = torch.tensor([row_of[key] for key in _pair_keys(edge_index)], dtype=torch.long)
    return ref_labels[rows]


# The split shuffles and re-orders the edges, so labels cannot be taken by
# slicing `edge_labels_oh` from the front; look each edge's label up instead.
_all_edge_index = edge_indexa.t()  # (2, E), same edge order as edge_labels_oh
train_edge_labels = _labels_for_edges(train_edge_index, _all_edge_index, edge_labels_oh)
test_edge_labels = _labels_for_edges(test_edge_index, _all_edge_index, edge_labels_oh)

assert train_edge_labels.size(0) == train_edge_index.size(1)
assert test_edge_labels.size(0) == test_edge_index.size(1)

In [None]:
class GCNEdgeClassifier(torch.nn.Module):
    """Edge classifier: a two-layer GCN node encoder plus an MLP edge decoder.

    Node embeddings are computed with two GCNConv layers. An edge is scored by
    concatenating the embeddings of its two endpoints and passing the result
    through a two-layer MLP that outputs one logit per edge class.

    Parameters
    ----------
    in_channels : size of the input node features.
    hidden_channels : output size of the first GCN layer.
    out_channels : output size of the second GCN layer (node embedding size).
    edge_hidden_dim : hidden size of the edge MLP.
    num_edge_classes : number of output logits per edge.
    """

    def __init__(self,
                 in_channels,
                 hidden_channels,
                 out_channels,
                 edge_hidden_dim,
                 num_edge_classes):
        super().__init__()
        # node encoder
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, out_channels)

        # edge MLP; its input is the two endpoint embeddings concatenated
        self.fc1 = torch.nn.Linear(out_channels * 2, edge_hidden_dim)
        self.fc2 = torch.nn.Linear(edge_hidden_dim, num_edge_classes)

    def encoder(self, x, edge_index):
        """Return node embeddings after conv1 -> ReLU -> conv2 over `edge_index`."""
        hidden = F.relu(self.conv1(x, edge_index))
        return self.conv2(hidden, edge_index)

    def decoder(self, node_embeddings, edge_index):
        """Return class logits for every edge (column) of `edge_index`.

        `edge_index` is unpacked into a source row and a target row of node ids.
        """
        src, tgt = edge_index
        edge_features = torch.cat((node_embeddings[src], node_embeddings[tgt]), dim=1)
        hidden = F.relu(self.fc1(edge_features))
        return self.fc2(hidden)

    def forward(self, x, edge_index, edge_label_index=None):
        """Embed the nodes over `edge_index` and classify edges.

        Parameters
        ----------
        x : node feature matrix.
        edge_index : edges used for message passing.
        edge_label_index : edges to classify. Defaults to `edge_index`, which
            reproduces the two-argument behaviour. Pass held-out edges here to
            score them with embeddings computed on a different graph (for
            example the training edges).

        Returns
        -------
        Logits with one row per classified edge and one column per class.
        """
        z = self.encoder(x, edge_index)
        if edge_label_index is None:
            edge_label_index = edge_index
        return self.decoder(z, edge_label_index)

In [None]:
# Hyperparameters
in_channels = data.x.size(1)                 # one input channel per node feature
hidden_channels = 16
node_embedding_dim = 16
edge_hidden_dim = 8
num_edge_classes = edge_labels_oh.size(1)    # one logit per one-hot label column

# Model: keyword arguments make the mapping onto the constructor explicit
model = GCNEdgeClassifier(
    in_channels=in_channels,
    hidden_channels=hidden_channels,
    out_channels=node_embedding_dim,
    edge_hidden_dim=edge_hidden_dim,
    num_edge_classes=num_edge_classes,
)

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

In [None]:
# Training loop: full-batch optimisation over the training edges for 500 epochs.
scores = []      # loss recorded after every epoch
model.train()    # set once; nothing inside the loop leaves training mode

for epoch in tqdm(range(500)):
    optimizer.zero_grad()

    # logits for the training edges, then the cross-entropy against their labels
    train_edge_predictions = model(data.x, train_edge_index)
    loss = F.cross_entropy(train_edge_predictions, train_edge_labels)

    loss.backward()
    optimizer.step()

    scores.append(loss.item())

print(f'Training completed with init loss {scores[0]} and last loss: {scores[-1]}.')

## Test

In [None]:
model.eval()

# Inference only: disable autograd so no computation graph is built or kept
# alive for the test pass.
with torch.no_grad():
    test_edge_predictions = model(data.x, test_edge_index)

# Class ids: arg-max over the one-hot labels and over the predicted logits
y_true = test_edge_labels.argmax(dim=1)
y_pred = test_edge_predictions.argmax(dim=1)

# Evaluation metric: accuracy (NaN instead of a ZeroDivisionError when there
# are no test edges)
correct = (y_pred == y_true).sum()
num_test_edges = test_edge_labels.size(0)
accuracy = int(correct) / num_test_edges if num_test_edges else float('nan')

print(f'Accuracy: {accuracy:.4f}')

In [None]:
# Compute both test metrics first, then report them.
conf_matrix = confusion_matrix(y_true, y_pred)
mcc = matthews_corrcoef(y_true, y_pred)

print("Confusion Matrix:")
print(conf_matrix)
print("MCC:", mcc)

In [None]:
# Count how often each class id occurs among the true labels of the test edges.
Counter(y_true.numpy())

In [None]:
# Create a heatmap plot of the confusion matrix.
# With no explicit `labels`, confusion_matrix orders rows/columns by the sorted
# class ids that occur in y_true or y_pred, so the tick labels are derived the
# same way instead of being hard-coded to four classes.
class_ids = np.union1d(y_true.numpy(), y_pred.numpy())
tick_labels = [str(class_id) for class_id in class_ids]
if len(tick_labels) != conf_matrix.shape[0]:
    # conf_matrix was built with a different label set; fall back to positions
    tick_labels = [str(i) for i in range(conf_matrix.shape[0])]

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True,
            fmt='d', cmap='Blues',
            xticklabels=tick_labels,
            yticklabels=tick_labels,
            ax=ax)
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('Actual Labels')
ax.set_title('Confusion Matrix')
plt.show()