In [132]:
import torch
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from torch_geometric.data import Data
from torch_geometric.nn import SAGEConv, BatchNorm
import json
import networkx as nx
from torch import nn
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, adjusted_rand_score, normalized_mutual_info_score

In [127]:
embedding = np.load("./ds/v1/graph2vec_embeddings.npy")
labels = np.load("./ds/v1/graph2vec_labels.npy")

In [120]:
class NodeClassificationGNN(nn.Module):
    def __init__(self, input_dim, hidden_channel, num_classes, dropout=0.3, activation=nn.LeakyReLU):
        super().__init__()
        self.activation = activation()  # Initialize activation function
        self.conv1 = SAGEConv(input_dim, hidden_channel)
        self.bn1 = BatchNorm(hidden_channel)  # BatchNorm for first layer
        self.conv2 = SAGEConv(hidden_channel, hidden_channel)
        self.bn2 = BatchNorm(hidden_channel)  # BatchNorm for second layer
        self.dropout = nn.Dropout(dropout)
        
        # MLP for final classification
        self.mlp = nn.Sequential(
            nn.Linear(hidden_channel, hidden_channel),
            self.activation,
            nn.Dropout(dropout),
            nn.Linear(hidden_channel, num_classes)
        )

    def forward(self, x, edge_index):
        # First convolutional layer with batch normalization and activation
        x = self.conv1(x, edge_index)
        x = self.bn1(x)
        x = self.activation(x)
        x = self.dropout(x)
        
        # Second convolutional layer with batch normalization and activation
        x = self.conv2(x, edge_index)
        x = self.bn2(x)
        x = self.activation(x)
        x = self.dropout(x)
        
        # Pass through the MLP for final predictions
        x = self.mlp(x)
        return x


In [124]:
def build_similarity_graph(embeddings, threshold):
    num_nodes = embeddings.shape[0]
    similarity_matrix = cosine_similarity(embeddings)

    G = nx.Graph()
    for i in range(num_nodes):
        G.add_node(i, embedding=embeddings[i])

    for i in range(num_nodes):
        for j in range(i + 1, num_nodes):
            if similarity_matrix[i, j] >= threshold:
                G.add_edge(i, j, weight=similarity_matrix[i, j])

    return G

def convert_to_torch_geometric(graph):
    node_features = np.array([graph.nodes[n]['embedding'] for n in graph.nodes])
    edge_index = np.array(list(graph.edges)).T
    edge_weights = np.array([graph[u][v]['weight'] for u, v in graph.edges])

    x = torch.tensor(node_features, dtype=torch.float)
    edge_index = torch.tensor(edge_index, dtype=torch.long)
    edge_weights = torch.tensor(edge_weights, dtype=torch.float)

    data = Data(x=x, edge_index=edge_index, edge_attr=edge_weights)

    return data

def split_data(data, labels, train_ratio=0.7, val_ratio=0.2):
    train_idx, temp_idx, train_labels, temp_labels = train_test_split(
        np.arange(len(labels)), labels, test_size=(1 - train_ratio), stratify=labels
    )
    val_size = val_ratio / (1 - train_ratio)
    val_idx, test_idx, val_labels, test_labels = train_test_split(
        temp_idx, temp_labels, test_size=(1 - val_size), stratify=temp_labels
    )
    return train_idx, val_idx, test_idx


In [128]:
with open('/Users/arjuns/Downloads/model_results/v2/working_dir/results/cv_results.json', 'r') as f:
    results = json.load(f)

best_threshold = results['best_threshold']
print(f"Using best threshold: {best_threshold}")

model_path = f'/Users/arjuns/Downloads/model_results/v2/working_dir/checkpoints/best_model_threshold_{best_threshold:.2f}.pt'
checkpoint = torch.load(model_path, map_location='cpu')

input_dim = 128
hidden_channels = 256
out_ch = len(set(labels))

model = NodeClassificationGNN(input_dim, hidden_channels, out_ch, dropout=0.7)
model.load_state_dict(checkpoint['model_state_dict'])

sim_graph = build_similarity_graph(embedding, best_threshold)
data = convert_to_torch_geometric(sim_graph)

train_idx, val_idx, test_idx = split_data(data, labels)
sim_graph = build_similarity_graph(embedding, best_threshold)
data = convert_to_torch_geometric(sim_graph)
data.y = torch.tensor(labels, dtype=torch.long)

train_mask = torch.zeros(data.num_nodes, dtype=torch.bool)
val_mask = torch.zeros(data.num_nodes, dtype=torch.bool)
test_mask = torch.zeros(data.num_nodes, dtype=torch.bool)
train_mask[train_idx] = True
val_mask[val_idx] = True
test_mask[test_idx] = True
data.train_mask = train_mask
data.val_mask = val_mask
data.test_mask = test_mask

model.eval()
out = model(data.x, data.edge_index)
pred = out.argmax(dim=-1)
correct = pred[data.test_mask] == data.y[data.test_mask]
acc = correct.sum().item() / correct.size(0)
print(f"Accuracy: {acc:.4f}")

f1 = f1_score(data.y[data.test_mask].numpy(), pred[data.test_mask].numpy(), average='micro')
print(f"F1 Score: {f1:.4f}")

Using best threshold: 0.11000000000000001


  checkpoint = torch.load(model_path, map_location='cpu')


Accuracy: 0.9127
F1 Score: 0.9050


In [133]:
adjusted_rand_score_val = adjusted_rand_score(data.y[data.test_mask].numpy(), pred[data.test_mask].numpy())
print(f"Adjusted Rand Score: {adjusted_rand_score_val:.4f}")

nmi = normalized_mutual_info_score(data.y[data.test_mask].numpy(), pred[data.test_mask].numpy())
print(f"Normalized Mutual Information: {nmi:.4f}")

Adjusted Rand Score: 0.8379
Normalized Mutual Information: 0.9043
