In [4]:
import os

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import torch
import torch.nn.functional as F
from sklearn import metrics as sk
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch_geometric.nn.conv.gcn_conv import gcn_norm
from torch_geometric.typing import SparseTensor

In [None]:
# Directories holding the raw images of each split.
# NOTE(review): none of the code visible in this notebook reads these paths — confirm they are still needed.
TRAIN_IMAGES_PATH = './data/images/train'
TEST_IMAGES_PATH = './data/images/test'
VAL_IMAGES_PATH = './data/images/val'

# Per-split .npz archives consumed by load_npz_as_tensors (which reads their 'images' and 'labels' arrays).
TRAIN_NPZ_FILE = './data/npz/train_images.npz'
TEST_NPZ_FILE = './data/npz/test_images.npz'
VAL_NPZ_FILE = './data/npz/val_images.npz'

# Length of one flattened image vector; process_data reshapes every image to this many features.
# presumably 224 x 224 pixels with 3 colour channels — TODO confirm against the preprocessing step
NUM_FEATURES = 224 * 224 * 3

## Load Data

In [6]:
def load_npz_as_tensors(file_path):
    """
    Load an image/label .npz archive and return its contents as tensors.

    Args:
        file_path (string): Path to a .npz archive that contains the arrays
            'images' and 'labels'.

    Returns:
        tuple: (images_tensor, labels_tensor) where the images are converted
        to torch.float32 and the labels to torch.long.

    Raises:
        KeyError: If the archive has no 'images' or 'labels' entry.
    """
    # NOTE(security): allow_pickle=True lets np.load unpickle object arrays, which can
    # execute arbitrary code. It is kept so existing archives keep loading; only point
    # this function at trusted files.
    # The context manager closes the underlying zip file handle once both arrays are read.
    with np.load(file_path, allow_pickle=True) as data:
        images = data['images']
        labels = data['labels']

    images_tensor = torch.tensor(images, dtype=torch.float32)
    labels_tensor = torch.tensor(labels, dtype=torch.long)

    return images_tensor, labels_tensor

## Process Data and Save Masked Layers

In [None]:
def process_data(sparsity):
    """
    Build ONE similarity-graph view of the dataset and save it to disk.

    The train, validation and test images are flattened, stacked in that order,
    and connected by an edge whenever their min-max scaled cosine similarity
    exceeds a threshold. The result is written to ./data/npz/sparse or
    ./data/npz/dense as train_mask.npz, val_mask.npz, test_mask.npz,
    data_feat.npz, data_label.npz and edge_index.npz; each archive stores a
    single array under a key equal to the file stem.

    Args:
        sparsity (bool): True builds the sparse view (threshold 0.977),
            False builds the dense view (threshold 0.970).
    """
    # Load every split and flatten each image into a NUM_FEATURES-long vector
    train_images, train_labels = load_npz_as_tensors(TRAIN_NPZ_FILE)
    train_data = train_images.reshape(train_images.shape[0], NUM_FEATURES)
    num_train = train_images.shape[0]

    val_images, val_labels = load_npz_as_tensors(VAL_NPZ_FILE)
    val_data = val_images.reshape(val_images.shape[0], NUM_FEATURES)
    num_val = val_images.shape[0]

    test_images, test_labels = load_npz_as_tensors(TEST_NPZ_FILE)
    test_data = test_images.reshape(test_images.shape[0], NUM_FEATURES)
    num_test = test_images.shape[0]

    # Stack the splits as train | val | test; the masks below rely on this ordering
    num_data = num_train + num_val + num_test
    data_feat = np.concatenate(
        (train_data.numpy(), val_data.numpy(), test_data.numpy()), axis=0)
    data_label = np.concatenate(
        (train_labels.numpy(), val_labels.numpy(), test_labels.numpy()), axis=0).reshape(-1)

    # Construct the adjacency matrix from pairwise cosine similarity, min-max scaled to [0, 1]
    adj_matrix = sk.pairwise.cosine_similarity(data_feat, data_feat)
    adj_matrix = (adj_matrix - adj_matrix.min())/(adj_matrix.max()-adj_matrix.min())

    # A higher threshold keeps fewer edges, hence the "sparse" view
    threshold = 0.977 if sparsity else 0.970
    adj_matrix = adj_matrix > threshold

    # Generate masks: contiguous index ranges in the stacked node order
    train_mask = np.zeros(num_data, dtype=bool)
    train_mask[:num_train] = True
    val_mask = np.zeros(num_data, dtype=bool)
    val_mask[num_train:num_train + num_val] = True
    test_mask = np.zeros(num_data, dtype=bool)
    test_mask[num_train + num_val:] = True

    # Save masks, features, labels, and edge index
    suffix = 'sparse' if sparsity else 'dense'
    base_path = f"./data/npz/{suffix}"
    # savez does not create missing directories, so make sure the target exists
    os.makedirs(base_path, exist_ok=True)

    np.savez_compressed(f"{base_path}/train_mask.npz", train_mask=train_mask)
    np.savez_compressed(f"{base_path}/val_mask.npz", val_mask=val_mask)
    np.savez_compressed(f"{base_path}/test_mask.npz", test_mask=test_mask)
    np.savez_compressed(f"{base_path}/data_feat.npz", data_feat=data_feat)
    np.savez_compressed(f"{base_path}/data_label.npz", data_label=data_label)

    # Generate and save the edge index as an (num_edges, 2) array of [i, j] pairs.
    # Self-loops are dropped; argwhere enumerates the remaining True cells in
    # row-major order, i.e. the same order a nested i/j loop would produce.
    np.fill_diagonal(adj_matrix, False)
    edge_index = np.argwhere(adj_matrix)
    np.savez_compressed(f"{base_path}/edge_index.npz", edge_index=edge_index)

    print(f"View-({'Sparse' if sparsity else 'Dense'}) generated!")

In [None]:
# Generate and save both graph views: first the sparse one (threshold 0.977),
# then the dense one (threshold 0.970). Each call reloads the splits and
# recomputes the full pairwise cosine-similarity matrix.
process_data(sparsity=True)
process_data(sparsity=False)

## Load Data, Masks, and Print Statistics

In [None]:
def print_statistics(features, labels, edge_index, train_mask, val_mask, test_mask):
    """
    Print a short report about the dataset: argument types, sizes and split counts.

    Args:
        features (np.ndarray): Node features, one row per node
        labels (np.ndarray): Node labels; 1-D is reported as multi-class, otherwise multi-label
        edge_index (np.ndarray): Edge indices, one row per edge
        train_mask (torch.Tensor): Mask for training nodes
        val_mask (torch.Tensor): Mask for validation nodes
        test_mask (torch.Tensor): Mask for test nodes
    """
    print("=============== Dataset Types ==============")
    named_arguments = (
        ("features", features),
        ("labels", labels),
        ("edge_index", edge_index),
        ("train_mask", train_mask),
        ("val_mask", val_mask),
        ("test_mask", test_mask),
    )
    for argument_name, argument in named_arguments:
        print(f"Type of {argument_name}: {type(argument)}")

    print("=============== Dataset Properties ==================")
    print(f"Total Nodes: {features.shape[0]}")
    print(f"Total Edges: {edge_index.shape[0]}")
    print(f"Number of Features: {features.shape[1]}")

    # 1-D labels hold class ids (count = largest id + 1); otherwise one column per label
    is_multiclass = labels.ndim == 1
    label_count = labels.max() + 1 if is_multiclass else labels.shape[1]
    print(f"Number of Labels: {label_count}")
    if is_multiclass:
        print("Task Type: Multi-class Classification")
    else:
        print("Task Type: Multi-label Classification")

    split_masks = (("Training", train_mask), ("Validation", val_mask), ("Testing", test_mask))
    for split_name, mask in split_masks:
        print(f"{split_name} Nodes: {mask.sum().item()}")
    print()

In [None]:
def get_dataset(sparse=True, balanced=True, random_state=None):
    """
    Load one saved graph view and optionally unbalance its training split.

    The arrays are read from ./data/npz/sparse or ./data/npz/dense, where
    process_data stores each of them in '<name>.npz' under the key '<name>'.
    Features are standardised with StandardScaler fitted on ALL nodes
    (train, validation and test together).

    Args:
        sparse (bool): Whether to load the sparse or dense version of the dataset
        balanced (bool): When False, 80% of the training nodes whose label is in
            {0, 1, 3, 5} are moved (stratified by label) from the training split
            to the test split, and class distributions are printed before and after.
        random_state (int or None): Seed forwarded to train_test_split so the
            unbalanced split can be reproduced. None keeps it random.

    Returns:
        tuple: (features, labels, edge_index, train_mask, val_mask, test_mask).
        features, labels and edge_index are np.ndarray; the masks are boolean
        torch.Tensor objects.

    Raises:
        ValueError: Propagated from train_test_split when the chosen training
            nodes cannot be split (e.g. there are none).
    """
    print("Loading Dataset")

    suffix = 'sparse' if sparse else 'dense'
    base_path = f"./data/npz/{suffix}"

    def load_array(name):
        # np.load returns an archive for .npz files; pull the single array out and close it
        with np.load(f"{base_path}/{name}.npz") as archive:
            return archive[name]

    def print_class_distribution(header, mask, label_values):
        # One line per class with the number of nodes of that class inside `mask`
        print(header)
        for label in label_values:
            count = np.sum(labels[mask] == label)
            print(f"Label {label}: {count} samples")

    # Masks are kept as numpy arrays while they are edited and converted to tensors on the way out
    train_mask = load_array("train_mask").astype(bool)
    val_mask = load_array("val_mask").astype(bool)
    test_mask = load_array("test_mask").astype(bool)

    # Load labels
    labels = load_array("data_label")

    # Load and normalize features
    features = StandardScaler().fit_transform(load_array("data_feat"))

    # Load edge indices
    edge_index = load_array("edge_index")

    # Print dataset statistics
    print_statistics(features, labels, edge_index, torch.tensor(train_mask),
                     torch.tensor(val_mask), torch.tensor(test_mask))

    if not balanced:
        all_labels = [0, 1, 2, 3, 4, 5]
        chosen_labels = [0, 1, 3, 5]

        print_class_distribution(
            "[Before unbalancing] Class distribution in the training set:", train_mask, all_labels)
        print_class_distribution(
            "[Before unbalancing] Class distribution in the validation set:", val_mask, all_labels)
        print_class_distribution(
            "[Before unbalancing] Class distribution in the test set:", test_mask, all_labels)

        # Work with GLOBAL node ids so the split stays correct even if the
        # training nodes are not the first rows of the dataset
        train_nodes = np.flatnonzero(train_mask)
        chosen_nodes = train_nodes[np.isin(labels[train_nodes], chosen_labels)]
        _, moved_nodes = train_test_split(
            chosen_nodes, test_size=0.8, stratify=labels[chosen_nodes],
            random_state=random_state)

        # The 80% part leaves the training split and joins the test split
        train_mask[moved_nodes] = False
        test_mask[moved_nodes] = True

        print_class_distribution("Class distribution in the training set:", train_mask, all_labels)
        print_class_distribution("Class distribution in the validation set:", val_mask, all_labels)
        print_class_distribution("Class distribution in the test set:", test_mask, all_labels)

    # Returned for both the balanced and the unbalanced case
    return (features, labels, edge_index, torch.tensor(train_mask),
            torch.tensor(val_mask), torch.tensor(test_mask))

## Graphs and Graph Constructions

In [None]:
def construct_graph(x, y, edge_index, train_mask, val_mask, test_mask):
    """
    Construct a NetworkX graph from node features, labels, and edge information.

    Args:
        x (np.ndarray or torch.Tensor): Node features with shape (num_nodes, feature_dim).
        y (np.ndarray or torch.Tensor): Node labels with shape (num_nodes,).
        edge_index (torch.Tensor, np.ndarray or list of tuples): Edges either as a
            2xN torch tensor (PyTorch Geometric format) or as an edge list with one
            (source, target) pair per row. A torch tensor whose first dimension is 2
            is always interpreted as the 2xN format.
        train_mask (np.ndarray or list): Boolean mask indicating training nodes.
        val_mask (np.ndarray or list): Boolean mask indicating validation nodes.
        test_mask (np.ndarray or list): Boolean mask indicating test nodes.

    Returns:
        nx.Graph: A NetworkX graph whose nodes 0..num_nodes-1 carry the attributes
        'x', 'y', 'train', 'val' and 'test', and whose edges all have weight 1.
    """
    G = nx.Graph()

    # Add nodes with attributes
    for i in range(x.shape[0]):
        G.add_node(i, x=x[i], y=y[i], train=train_mask[i],
                   val=val_mask[i], test=test_mask[i])

    # Normalise the edges to rows of plain Python ints. Iterating a tensor yields
    # 0-d tensors, which hash by identity and would be added as brand-new nodes
    # instead of matching the integer node ids created above.
    if isinstance(edge_index, torch.Tensor):
        if edge_index.dim() == 2 and edge_index.shape[0] == 2:
            # PyTorch Geometric format: transpose 2xN into N rows of (source, target)
            edge_index = edge_index.t()
        edge_list = edge_index.tolist()
    elif isinstance(edge_index, np.ndarray):
        edge_list = edge_index.tolist()
    else:
        # Assuming edge_index is already a standard edge list
        edge_list = edge_index

    # Add edges with a default weight of 1
    G.add_weighted_edges_from((edge[0], edge[1], 1) for edge in edge_list)

    return G

In [None]:
def split_graph(G, multilabel = True):
    """
    Split the graph into nested training, validation, and test graphs.

    Three graphs are converted with convert_graph_to_tensor: the full graph
    (used for testing), the graph without test nodes (used for validation) and
    the graph without test and validation nodes (used for training). After each
    removal the remaining nodes are relabelled to 0..n-1. The input graph is
    left untouched.

    Args:
        G (nx.Graph): The input NetworkX graph with node attributes specifying train, val, and test masks.
        multilabel (bool): Flag indicating if the graph is multilabel. Defaults to True.

    Returns:
        tuple: A tuple containing:
            - x_train (np.ndarray): Node features of the training graph.
            - y_train (np.ndarray): Node labels of the training graph.
            - edge_train (np.ndarray): Edge indices of the training graph.
            - train_mask (np.ndarray): Boolean training mask over the training graph.
            - x_val (np.ndarray): Node features of the validation graph.
            - y_val (np.ndarray): Node labels of the validation graph.
            - edge_val (np.ndarray): Edge indices of the validation graph.
            - val_mask (np.ndarray): Boolean validation mask over the validation graph.
            - x_test (np.ndarray): Node features of the full graph.
            - y_test (np.ndarray): Node labels of the full graph.
            - edge_test (np.ndarray): Edge indices of the full graph.
            - test_mask (np.ndarray): Boolean test mask over the full graph.
    """
    print("Splitting Graph...")
    print("=============== Graph Splitting ===============")

    # Get complete test graph
    x_test, y_test, edge_test, _, _, test_mask = convert_graph_to_tensor(G, multilabel=multilabel)

    print(f"Unlabeled + Test + Validation + Training graph nodes: {x_test.shape[0]}")
    print(f"Unlabeled + Test + Validation + Training graph edges: {edge_test.shape[0]}")
    print(f"Total test nodes: {test_mask.sum()}")

    # Get training + val graph: remove all test nodes.
    # The removal happens on a copy so the caller's graph keeps its test nodes.
    test_nodes = [node for node, attrs in G.nodes(data=True) if attrs['test']]
    val_graph = G.copy()
    val_graph.remove_nodes_from(test_nodes)
    val_graph = nx.convert_node_labels_to_integers(val_graph, first_label=0, ordering='default')
    x_val, y_val, edge_val, _, val_mask, _ = convert_graph_to_tensor(val_graph, multilabel=multilabel)

    print(f"Unlabeled + Validation + Training graph nodes: {x_val.shape[0]}")
    print(f"Unlabeled + Validation + Training graph edges: {edge_val.shape[0]}")
    print(f"Total val nodes: {val_mask.sum()}")

    # Get training graph: remove all val nodes.
    # val_graph is already private to this function, so it can be edited in place.
    val_nodes = [node for node, attrs in val_graph.nodes(data=True) if attrs['val']]
    val_graph.remove_nodes_from(val_nodes)
    train_graph = nx.convert_node_labels_to_integers(val_graph, first_label=0, ordering='default')

    x_train, y_train, edge_train, train_mask, _, _ = convert_graph_to_tensor(train_graph, multilabel = multilabel)

    print(f"Unlabeled + Training graph nodes: {x_train.shape[0]}")
    print(f"Unlabeled + Training graph edges: {edge_train.shape[0]}")
    print(f"Total train nodes: {train_mask.sum()}")
    print()

    return (x_train, y_train, edge_train, train_mask, x_val, y_val, edge_val,
            val_mask, x_test, y_test, edge_test, test_mask)

In [None]:
def convert_graph_to_tensor(G, multilabel = True):
    """
    Unpack a NetworkX graph into numpy arrays of features, labels, edges and masks.

    Node ids are used directly as row indices, and node 0 is inspected to size
    the feature (and, for multilabel, label) arrays.

    Args:
        G (nx.Graph): Graph whose nodes carry the attributes 'x', 'y', 'train', 'val' and 'test'.
        multilabel (bool): When True, labels are stored as one row per node; otherwise
            as one integer per node. Defaults to True.

    Returns:
        tuple: A tuple containing:
            - x (np.ndarray): Node features, shape (num_nodes, feature_dim).
            - y (np.ndarray): Integer node labels.
            - edge_index (np.ndarray): One row per edge of G.
            - train_mask (np.ndarray): Boolean mask for training nodes.
            - val_mask (np.ndarray): Boolean mask for validation nodes.
            - test_mask (np.ndarray): Boolean mask for test nodes.
    """
    node_count = G.number_of_nodes()

    # Allocate the outputs up front; every row is filled in the loop below
    x = np.empty((node_count, G.nodes[0]['x'].shape[0]))
    label_shape = (node_count, G.nodes[0]['y'].shape[0]) if multilabel else (node_count,)
    y = np.empty(label_shape, dtype='int')

    edge_index = np.array(list(G.edges()))
    train_mask, val_mask, test_mask = (
        np.empty((node_count,), dtype='bool') for _ in range(3)
    )

    for node_id, attrs in G.nodes(data=True):
        # Row assignment covers both layouts: a label vector per node or a single class id
        x[node_id] = attrs['x']
        y[node_id] = attrs['y']

        train_mask[node_id] = attrs['train']
        val_mask[node_id] = attrs['val']
        test_mask[node_id] = attrs['test']

    return x, y, edge_index, train_mask, val_mask, test_mask

In [None]:
def construct_normalized_adj(edge_index, num_nodes):
    """
    Construct a normalized adjacency matrix from an edge list.

    Args:
        edge_index (np.ndarray or torch.Tensor): Edge list with shape
            [num_edges, 2], one (source, target) pair per row, as produced by
            convert_graph_to_tensor. An empty array is accepted.
            NOTE(review): every edge is mirrored below, so the list is expected
            to hold each undirected edge once — confirm how duplicate entries
            are treated if a list with both directions is passed.
        num_nodes (int): Number of nodes in the graph.

    Returns:
        SparseTensor: Adjacency matrix with mirrored edges, the diagonal set
        (self-loops) and GCN normalization applied.

    Raises:
        ValueError: If edge_index is non-empty and not of shape [num_edges, 2].
    """
    # as_tensor accepts arrays and tensors alike without the copy warning torch.tensor emits
    edges = torch.as_tensor(edge_index, dtype=torch.long)
    if edges.numel() == 0:
        # A graph without edges arrives as an array of shape (0,); give it the expected rank
        edges = edges.reshape(0, 2)
    if edges.dim() != 2 or edges.shape[1] != 2:
        raise ValueError(
            f"edge_index must have shape [num_edges, 2], got {tuple(edges.shape)}")

    edges = edges.t()  # -> [2, num_edges]
    flipped = torch.flip(edges, [0])  # re-adds flipped edges that were removed by networkx
    edges = torch.cat((edges, flipped), 1)

    adj = SparseTensor(row=edges[0], col=edges[1], sparse_sizes=(num_nodes, num_nodes))
    adj = adj.set_diag()  # adding self loops
    adj = gcn_norm(adj, add_self_loops=False)  # normalization; self loops were added above

    return adj

## Get Metrics of the Training

In [None]:
def logit_to_label(out):
    """
    Pick, for every row of logits, the index of the largest value.

    Args:
        out (torch.Tensor): Logits tensor with the class scores along dimension 1

    Returns:
        torch.Tensor: Predicted labels
    """
    predicted_labels = torch.argmax(out, dim=1)
    return predicted_labels

In [None]:
def metrics(logits, y):
    """
    Compute accuracy, micro-F1, sensitivity and specificity for a batch of predictions.

    Args:
        logits (torch.Tensor): Raw model outputs, one row per sample.
        y (torch.Tensor): Targets. A 1-D tensor of class ids selects the multi-class
            path (prediction = argmax of the logits); otherwise the multi-label path
            is used, where a logit >= 0 is a positive prediction and a target >= 0.5
            is a positive label.

    Returns:
        tuple: (acc, micro_f1, sens, spec). In the multi-class case the counts are
        summed over all classes of the confusion matrix and micro_f1 equals acc.
        Any ratio whose denominator is zero is reported as 0.0.
    """
    def ratio(numerator, denominator):
        # Undefined ratios (e.g. precision with no positive predictions) count as 0.0
        return numerator / denominator if denominator else 0.0

    if y.dim() == 1: # Multi-class
        y_pred = logit_to_label(logits)
        # scikit-learn works on host arrays, so move both tensors off the device first
        cm = confusion_matrix(y.detach().cpu().numpy(), y_pred.detach().cpu().numpy())
        TP = np.diag(cm)
        FP = cm.sum(axis=0) - TP
        FN = cm.sum(axis=1) - TP
        TN = cm.sum() - (FP + FN + TP)

        acc = ratio(TP.sum(), cm.sum())
        micro_f1 = acc # accuracy for multi-class
        sens = ratio(TP.sum(), TP.sum() + FN.sum())
        spec = ratio(TN.sum(), TN.sum() + FP.sum())
    else: # Multi-label
        y_pred = logits >= 0
        y_true = y >= 0.5

        tp = int((y_true & y_pred).sum())
        tn = int((~y_true & ~y_pred).sum())
        fp = int((~y_true & y_pred).sum())
        fn = int((y_true & ~y_pred).sum())

        acc = ratio(tp + tn, tp + fp + tn + fn)
        precision = ratio(tp, tp + fp)
        recall = ratio(tp, tp + fn)
        micro_f1 = ratio(2 * (precision * recall), precision + recall)
        sens = recall
        spec = ratio(tn, tn + fp)

    return acc, micro_f1, sens, spec

In [None]:
def edge_degree_centrality(graph):
    """
    Score every edge with the mean degree of its two endpoints.

    Args:
        graph: Graph object exposing edges() and degree(node).

    Returns:
        dict: Maps each edge returned by graph.edges() to the average of the
        degrees of its two incident nodes.
    """
    def mean_endpoint_degree(edge):
        # An edge is a (u, v) pair; its score is the average of both node degrees
        first, second = edge
        return (graph.degree(first) + graph.degree(second)) / 2

    return {edge: mean_endpoint_degree(edge) for edge in graph.edges()}

In [None]:
def edge_metric_compute(metric, graph):
    """
    Turn a per-node metric into a per-edge metric by averaging over the endpoints.

    Args:
        metric: Mapping (or sequence) indexed by node that gives the node's metric value.
        graph: Graph object exposing edges().

    Returns:
        dict: Maps each edge returned by graph.edges() to the mean of the metric
        values of its two incident nodes.
    """
    def endpoint_mean(edge):
        source, target = edge
        return (metric[source] + metric[target]) / 2

    edge_list = list(graph.edges())
    return dict(zip(edge_list, map(endpoint_mean, edge_list)))

In [None]:
def calculate_homophily_ratios(adj, x, y):
    """
    Compute a homophily score for every stored entry of an adjacency matrix.

    For each (row, col) entry the score is the average of
      * the cosine similarity of the two nodes' feature vectors, and
      * 1 if the two nodes have identical labels, else 0.

    Args:
        adj: Sparse adjacency object whose coo() returns (row, col, value)
            index containers supporting tolist().
        x (np.ndarray): Node features, indexed by node id.
        y (np.ndarray): Node labels, indexed by node id; a label may be a
            scalar or a vector.

    Returns:
        list: One score per adjacency entry, in the order returned by coo().
        The feature similarity counts as 0.0 when either feature vector has
        zero norm.
    """
    homophily_ratios = []

    # Extract COO format components
    row, col, _ = adj.coo()

    for r, c in zip(row.tolist(), col.tolist()):
        x1, x2 = x[r], x[c]

        # Cosine similarity of the features; guard against all-zero vectors
        norm_product = np.linalg.norm(x1) * np.linalg.norm(x2)
        feature_similarity = np.dot(x1, x2) / norm_product if norm_product else 0.0

        # array_equal handles scalar class ids and multi-label vectors alike
        label_similarity = 1 if np.array_equal(y[r], y[c]) else 0

        # Homophily ratio is an equal-weight combination of both similarities
        homophily_ratios.append(0.5 * feature_similarity + 0.5 * label_similarity)

    return homophily_ratios