# GNN Classification of Reeb Graphs from ModelNet10 Point Clouds

This notebook reproduces the GNN classification of Decorated Reeb Graphs extracted from ModelNet10 point clouds which is presented in Section 6 of our paper.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
import warnings
import itertools
import plotly.graph_objects as go

from sklearn.metrics import pairwise_distances
from sklearn import datasets
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from sklearn.neighbors import kneighbors_graph
from sklearn.cluster import AgglomerativeClustering

from gtda.plotting import plot_point_cloud
from ripser import ripser
from persim import PersistenceImager, plot_diagrams
import gudhi as gd
from ot import fused_gromov_wasserstein2
import ot

import pickle

from DecoratedReebGraphs import *

import torch
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader

## Loading Point Cloud Data for Visualizations

**Note:** You need to unzip the `modelNetPointClouds1024.pickle` file before running this.

In [None]:
with open("./Data/modelNetPointClouds1024.pickle", "rb") as f:
    full_dataset = pickle.load(f)
    
training_data_point_clouds = full_dataset['train_points']
testing_data_point_clouds = full_dataset['test_points']

In [None]:
training_labels = full_dataset['train_labels']

In [None]:
testing_labels = full_dataset['test_labels']

# Experiment 1: 1024 Points, Unnormalized, Height Function

Load precomputed DRGs.

In [None]:
with open("./Data/modelNetReebGraphs_1024_UnNormalized_height.pickle", "rb") as f:
    full_dataset_Reeb = pickle.load(f)

In [None]:
training_data = full_dataset_Reeb['train_data']
testing_data = full_dataset_Reeb['test_data']

### Visualizing the Reeb Graphs

In [None]:
class_id = 1
sample_number = 2
print('Class:',full_dataset['class_map'][class_id])

idx = np.where(full_dataset['test_labels']==class_id)[0][0] + sample_number

G = testing_data[idx]

plot_3d_graph(G)

In [None]:
plot_point_cloud(testing_data_point_clouds[idx,:,:])

## Preparing Data

In [None]:
from torch_geometric.utils import from_scipy_sparse_matrix

This version of node features includes:
- persistence statistics
- function value
- 3d coordinate of the node (averaged position of all real points corresponding to the node)
- number of real points corresponding to the node

In [None]:
def get_node_feature_matrix(graph,attribute = 'persistence_statistics'):
    
    return np.array([list(graph.nodes[node][attribute]) + [graph.nodes[node]['function value'],graph.nodes[node]['3d pos'][0],graph.nodes[node]['3d pos'][1],graph.nodes[node]['3d pos'][2],len(graph.nodes[node]['component indices'])] for node in graph.nodes])

In [None]:
def convert_to_torch_format(graph,label):
    
    adjacency_matrix = nx.adjacency_matrix(graph)
    edge_data = from_scipy_sparse_matrix(adjacency_matrix)
    x = get_node_feature_matrix(graph)
    y = label
    
    data = Data(x=torch.Tensor(x), edge_index=edge_data[0], edge_attr = edge_data[1], y=torch.tensor([y],dtype=torch.long))
    
    return data

Setting up data for the GNN.

In [None]:
batch_size = 512

training_data_list = [convert_to_torch_format(training_data[j],training_labels[j]) for j in range(len(training_data))]
train_loader = DataLoader(training_data_list, batch_size = batch_size, shuffle=True)

testing_data_list = [convert_to_torch_format(testing_data[j],testing_labels[j]) for j in range(len(testing_data))]
test_loader = DataLoader(testing_data_list, batch_size = len(testing_data), shuffle=False)

## Define Model and Train

We will use a simple 4 layer graph convolutional neural network. 

In [None]:
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool

node_feature_size = 35 # Need to change, depending on feature style
num_data_classes = 10

class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(node_feature_size, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.conv4 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, num_data_classes)

    def forward(self, x, edge_index, batch):
        # 1. Obtain node embeddings 
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)
        x = x.relu()
        x = self.conv4(x, edge_index)

        # 2. Readout layer
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply a final classifier
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

Training the model. 

In [None]:
%%time

model = GCN(hidden_channels=256)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.

         out = model(data.x, data.edge_index, data.batch)  # Perform a single forward pass.
         loss = criterion(out, data.y)  # Compute the loss.
         loss.backward()  # Derive gradients.
         optimizer.step()  # Update parameters based on gradients.
         optimizer.zero_grad()  # Clear gradients.

def test(loader):
     model.eval()

     correct = 0
     for data in loader:  # Iterate in batches over the training/test dataset.
         out = model(data.x, data.edge_index, data.batch)  
         pred = out.argmax(dim=1)  # Use the class with highest probability.
         correct += int((pred == data.y).sum())  # Check against ground-truth labels.
     return correct / len(loader.dataset)  # Derive ratio of correct predictions.

train_accs = []
test_accs = []

num_epochs = 300
best_test_acc = 0

for epoch in range(1, num_epochs):
    train()
    train_acc = test(train_loader)
    train_accs.append(train_acc)
    test_acc = test(test_loader)
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        best_model = model
    test_accs.append(test_acc)
    print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')
    
xs = list(range(1,num_epochs))    
plt.plot(xs, train_accs,label='Training Accuracy')
plt.plot(xs, test_accs,label='Testing Accuracy')
plt.legend()
plt.show()

In [None]:
print(f'Max Testing Accuracy: {100*best_test_acc:.2f}%')

### Looking at (in)correctly classified categories

In [None]:
from sklearn.metrics import confusion_matrix

for data in test_loader:
    
    out = best_model(data.x,data.edge_index,data.batch)
    y_pred = out.argmax(dim=1)
    y_true = data.y

In [None]:
conf_mat = confusion_matrix(np.array(y_true), np.array(y_pred))

In [None]:
# function from https://deeplizard.com/learn/video/0LhiS6yu2qQ
def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
#         print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt), horizontalalignment="center", color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
plt.figure(figsize=(8,8))
plot_confusion_matrix(conf_mat, list(full_dataset['class_map'].values()), normalize=True)

In [None]:
torch.save(best_model,'DRGNet1024_height.pth')

# Experiment 2: 1024 Points, Unnormalized, Width Function

In [None]:
with open("modelNetReebGraphs_1024_UnNormalized_width.pickle", "rb") as f:
    full_dataset_Reeb = pickle.load(f)

In [None]:
training_data = full_dataset_Reeb['train_data']
testing_data = full_dataset_Reeb['test_data']

In [None]:
batch_size = 512

training_data_list = [convert_to_torch_format(training_data[j],training_labels[j]) for j in range(len(training_data))]
train_loader = DataLoader(training_data_list, batch_size = batch_size, shuffle=True)

testing_data_list = [convert_to_torch_format(testing_data[j],testing_labels[j]) for j in range(len(testing_data))]
test_loader = DataLoader(testing_data_list, batch_size = len(testing_data), shuffle=False)

In [None]:
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool

node_feature_size = 35 # Need to change, depending on feature style
num_data_classes = 10

class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(node_feature_size, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.conv4 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, num_data_classes)

    def forward(self, x, edge_index, batch):
        # 1. Obtain node embeddings 
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)
        x = x.relu()
        x = self.conv4(x, edge_index)

        # 2. Readout layer
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply a final classifier
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

In [None]:
%%time

model = GCN(hidden_channels=256)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.

         out = model(data.x, data.edge_index, data.batch)  # Perform a single forward pass.
         loss = criterion(out, data.y)  # Compute the loss.
         loss.backward()  # Derive gradients.
         optimizer.step()  # Update parameters based on gradients.
         optimizer.zero_grad()  # Clear gradients.

def test(loader):
     model.eval()

     correct = 0
     for data in loader:  # Iterate in batches over the training/test dataset.
         out = model(data.x, data.edge_index, data.batch)  
         pred = out.argmax(dim=1)  # Use the class with highest probability.
         correct += int((pred == data.y).sum())  # Check against ground-truth labels.
     return correct / len(loader.dataset)  # Derive ratio of correct predictions.

train_accs = []
test_accs = []

num_epochs = 300
best_test_acc = 0

for epoch in range(1, num_epochs):
    train()
    train_acc = test(train_loader)
    train_accs.append(train_acc)
    test_acc = test(test_loader)
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        best_model = model
    test_accs.append(test_acc)
    print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')
    
xs = list(range(1,num_epochs))    
plt.plot(xs, train_accs,label='Training Accuracy')
plt.plot(xs, test_accs,label='Testing Accuracy')
plt.legend()
plt.show()

In [None]:
print(f'Max Testing Accuracy: {100*best_test_acc:.2f}%')

In [None]:
for data in test_loader:
    out = best_model(data.x,data.edge_index,data.batch)
    y_pred = out.argmax(dim=1)
    y_true = data.y
    
conf_mat = confusion_matrix(np.array(y_true), np.array(y_pred))
plt.figure(figsize=(8,8))
plot_confusion_matrix(conf_mat, list(full_dataset['class_map'].values()), normalize=True)

In [None]:
torch.save(best_model,'DRGNet1024_width.pth')

## Combining Predictions for height and width

In [None]:
with open("modelNetReebGraphs_1024_UnNormalized_height.pickle", "rb") as f:
    full_dataset_Reeb_height = pickle.load(f)

In [None]:
with open("modelNetReebGraphs_1024_UnNormalized_width.pickle", "rb") as f:
    full_dataset_Reeb_width = pickle.load(f)

In [None]:
testing_data_list_height = [convert_to_torch_format(testing_data_height[j],testing_labels[j]) for j in range(len(testing_data_height))]
test_loader_height = DataLoader(testing_data_list_height, batch_size = len(testing_data_height), shuffle=False)

testing_data_list_width = [convert_to_torch_format(testing_data_width[j],testing_labels[j]) for j in range(len(testing_data_width))]
test_loader_width = DataLoader(testing_data_list_width, batch_size = len(testing_data_width), shuffle=False)

In [None]:
height_model = torch.load('DRGNet1024_height.pth')

In [None]:
num_params = sum(p.numel() for p in height_model.parameters())
num_params

In [None]:
width_model = torch.load('DRGNet1024_width.pth')

In [None]:
for data in test_loader_height:
    out_height = height_model(data.x,data.edge_index,data.batch)
    
for data in test_loader_width:
    out_width = width_model(data.x,data.edge_index,data.batch)
    
out = out_height + out_width
y_pred = out.argmax(dim=1)
y_true = data.y

In [None]:
print(f'Combined Prediction: {np.round(100*(sum(y_pred == y_true)/len(y_true)).item(),2)}%')

In [None]:
conf_mat = confusion_matrix(np.array(y_true), np.array(y_pred))
plt.figure(figsize=(8,8))
plot_confusion_matrix(conf_mat, list(full_dataset['class_map'].values()), normalize=True)
plt.savefig('BestClassificationConfusionMatrix.pdf',bbox_inches='tight')

## Experiment 3: Combining Height and Width in the Training Phase

In [None]:
with open("modelNetReebGraphs_1024_UnNormalized_height.pickle", "rb") as f:
    full_dataset_Reeb_height = pickle.load(f)

In [None]:
with open("modelNetReebGraphs_1024_UnNormalized_width.pickle", "rb") as f:
    full_dataset_Reeb_width = pickle.load(f)

In [None]:
training_data_height = full_dataset_Reeb_height['train_data']
testing_data_height = full_dataset_Reeb_height['test_data']

training_data_width = full_dataset_Reeb_width['train_data']
testing_data_width = full_dataset_Reeb_width['test_data']

In [None]:
def combine_DRGs(G,H):
    
    Gnew = nx.convert_node_labels_to_integers(G)
    Hnew = nx.convert_node_labels_to_integers(H,first_label = len(G))
    GH = nx.union(Gnew,Hnew)
    
    return GH

In [None]:
training_data = [combine_DRGs(training_data_height[j],training_data_width[j]) for j in range(len(training_data_height))]
testing_data = [combine_DRGs(testing_data_height[j],testing_data_width[j]) for j in range(len(testing_data_height))]

In [None]:
batch_size = 512

training_data_list = [convert_to_torch_format(training_data[j],training_labels[j]) for j in range(len(training_data))]
train_loader = DataLoader(training_data_list, batch_size = batch_size, shuffle=True)

testing_data_list = [convert_to_torch_format(testing_data[j],testing_labels[j]) for j in range(len(testing_data))]
test_loader = DataLoader(testing_data_list, batch_size = len(testing_data), shuffle=False)

In [None]:
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool

node_feature_size = 35 # Need to change, depending on feature style
num_data_classes = 10

class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(node_feature_size, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.conv4 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, num_data_classes)

    def forward(self, x, edge_index, batch):
        # 1. Obtain node embeddings 
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)
        x = x.relu()
        x = self.conv4(x, edge_index)

        # 2. Readout layer
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply a final classifier
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

In [None]:
%%time

model = GCN(hidden_channels=256)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.

         out = model(data.x, data.edge_index, data.batch)  # Perform a single forward pass.
         loss = criterion(out, data.y)  # Compute the loss.
         loss.backward()  # Derive gradients.
         optimizer.step()  # Update parameters based on gradients.
         optimizer.zero_grad()  # Clear gradients.

def test(loader):
     model.eval()

     correct = 0
     for data in loader:  # Iterate in batches over the training/test dataset.
         out = model(data.x, data.edge_index, data.batch)  
         pred = out.argmax(dim=1)  # Use the class with highest probability.
         correct += int((pred == data.y).sum())  # Check against ground-truth labels.
     return correct / len(loader.dataset)  # Derive ratio of correct predictions.

train_accs = []
test_accs = []

num_epochs = 300
best_test_acc = 0

for epoch in range(1, num_epochs):
    train()
    train_acc = test(train_loader)
    train_accs.append(train_acc)
    test_acc = test(test_loader)
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        best_model = model
    test_accs.append(test_acc)
    print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')
    
xs = list(range(1,num_epochs))    
plt.plot(xs, train_accs,label='Training Accuracy')
plt.plot(xs, test_accs,label='Testing Accuracy')
plt.legend()
plt.show()

In [None]:
print(f'Max Testing Accuracy: {100*best_test_acc:.2f}%')

In [None]:
for data in test_loader:
    out = best_model(data.x,data.edge_index,data.batch)
    y_pred = out.argmax(dim=1)
    y_true = data.y
    
conf_mat = confusion_matrix(np.array(y_true), np.array(y_pred))

plt.figure(figsize=(8,8))
plot_confusion_matrix(conf_mat, list(full_dataset['class_map'].values()), normalize=True)

In [None]:
torch.save(best_model,'DRGNet1024_height_and_width.pth')

# Ablation: Trying without topological attributes

In [None]:
with open("modelNetReebGraphs_1024_UnNormalized_height.pickle", "rb") as f:
    full_dataset_Reeb = pickle.load(f)

In [None]:
training_data = full_dataset_Reeb['train_data']
testing_data = full_dataset_Reeb['test_data']

In [None]:
def get_node_feature_matrix_ablation(graph,attribute = 'persistence_statistics'):
    
    return np.array([[graph.nodes[node]['function value'],graph.nodes[node]['3d pos'][0],graph.nodes[node]['3d pos'][1],graph.nodes[node]['3d pos'][2],len(graph.nodes[node]['component indices'])] for node in graph.nodes])

In [None]:
def convert_to_torch_format_ablation(graph,label):
    
    adjacency_matrix = nx.adjacency_matrix(graph)
    edge_data = from_scipy_sparse_matrix(adjacency_matrix)
    x = get_node_feature_matrix_ablation(graph)
    y = label
    
    data = Data(x=torch.Tensor(x), edge_index=edge_data[0], edge_attr = edge_data[1], y=torch.tensor([y],dtype=torch.long))
    
    return data

In [None]:
batch_size = 512

training_data_list = [convert_to_torch_format_ablation(training_data[j],training_labels[j]) for j in range(len(training_data))]
train_loader = DataLoader(training_data_list, batch_size = batch_size, shuffle=True)

testing_data_list = [convert_to_torch_format_ablation(testing_data[j],testing_labels[j]) for j in range(len(testing_data))]
test_loader = DataLoader(testing_data_list, batch_size = len(testing_data), shuffle=False)

## Define Model and Train

In [None]:
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool

node_feature_size = 5
num_data_classes = 10

class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(node_feature_size, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.conv4 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, num_data_classes)

    def forward(self, x, edge_index, batch):
        # 1. Obtain node embeddings 
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)
        x = x.relu()
        x = self.conv4(x, edge_index)

        # 2. Readout layer
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply a final classifier
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

In [None]:
model = GCN(hidden_channels=256)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.

         out = model(data.x, data.edge_index, data.batch)  # Perform a single forward pass.
         loss = criterion(out, data.y)  # Compute the loss.
         loss.backward()  # Derive gradients.
         optimizer.step()  # Update parameters based on gradients.
         optimizer.zero_grad()  # Clear gradients.

def test(loader):
     model.eval()

     correct = 0
     for data in loader:  # Iterate in batches over the training/test dataset.
         out = model(data.x, data.edge_index, data.batch)  
         pred = out.argmax(dim=1)  # Use the class with highest probability.
         correct += int((pred == data.y).sum())  # Check against ground-truth labels.
     return correct / len(loader.dataset)  # Derive ratio of correct predictions.

train_accs = []
test_accs = []

num_epochs = 200
best_test_acc = 0

for epoch in range(1, num_epochs):
    train()
    train_acc = test(train_loader)
    train_accs.append(train_acc)
    test_acc = test(test_loader)
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        best_model = model
    test_accs.append(test_acc)
    print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')
    
xs = list(range(1,num_epochs))    
plt.plot(xs, train_accs,label='Training Accuracy')
plt.plot(xs, test_accs,label='Testing Accuracy')
plt.legend()
plt.show()

In [None]:
print(f'Max Testing Accuracy: {100*best_test_acc:.2f}%')

In [None]:
for data in test_loader:
    out = best_model(data.x,data.edge_index,data.batch)
    y_pred = out.argmax(dim=1)
    y_true = data.y
    
conf_mat = confusion_matrix(np.array(y_true), np.array(y_pred))

plt.figure(figsize=(8,8))
plot_confusion_matrix(conf_mat, list(full_dataset['class_map'].values()), normalize=True)

In [None]:
torch.save(best_model,'DRGNet1024_only_Reeb.pth')

# Ablation: Trying without Graph Structure (bag of  persistence diagrams)

In [None]:
def get_node_feature_matrix_ablation2(graph,attribute = 'persistence_statistics'):
    
    return np.array([graph.nodes[node][attribute] for node in graph.nodes])

In [None]:
def convert_to_torch_format_ablation2(graph,label):
    
    adjacency_matrix = nx.adjacency_matrix(nx.complete_graph(len(graph)))
    edge_data = from_scipy_sparse_matrix(adjacency_matrix)
    x = get_node_feature_matrix_ablation2(graph)
    y = label
    
    data = Data(x=torch.Tensor(x), edge_index=edge_data[0], edge_attr = edge_data[1], y=torch.tensor([y],dtype=torch.long))
    
    return data

In [None]:
batch_size = 512

training_data_list = [convert_to_torch_format_ablation2(training_data[j],training_labels[j]) for j in range(len(training_data))]
train_loader = DataLoader(training_data_list, batch_size = batch_size, shuffle=True)

testing_data_list = [convert_to_torch_format_ablation2(testing_data[j],testing_labels[j]) for j in range(len(testing_data))]
test_loader = DataLoader(testing_data_list, batch_size = len(testing_data), shuffle=False)

## Define Model and Train

In [None]:
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool

node_feature_size = 30
num_data_classes = 10

class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(node_feature_size, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.conv4 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, num_data_classes)

    def forward(self, x, edge_index, batch):
        # 1. Obtain node embeddings 
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)
        x = x.relu()
        x = self.conv4(x, edge_index)

        # 2. Readout layer
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply a final classifier
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

In [None]:
model = GCN(hidden_channels=256)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.

         out = model(data.x, data.edge_index, data.batch)  # Perform a single forward pass.
         loss = criterion(out, data.y)  # Compute the loss.
         loss.backward()  # Derive gradients.
         optimizer.step()  # Update parameters based on gradients.
         optimizer.zero_grad()  # Clear gradients.

def test(loader):
     model.eval()

     correct = 0
     for data in loader:  # Iterate in batches over the training/test dataset.
         out = model(data.x, data.edge_index, data.batch)  
         pred = out.argmax(dim=1)  # Use the class with highest probability.
         correct += int((pred == data.y).sum())  # Check against ground-truth labels.
     return correct / len(loader.dataset)  # Derive ratio of correct predictions.

train_accs = []
test_accs = []

num_epochs = 200
best_test_acc = 0

for epoch in range(1, num_epochs):
    train()
    train_acc = test(train_loader)
    train_accs.append(train_acc)
    test_acc = test(test_loader)
    if test_acc > best_test_acc:
        best_test_acc = test_acc
        best_model = model
    test_accs.append(test_acc)
    print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')
    
xs = list(range(1,num_epochs))    
plt.plot(xs, train_accs,label='Training Accuracy')
plt.plot(xs, test_accs,label='Testing Accuracy')
plt.legend()
plt.show()

In [None]:
print(f'Max Testing Accuracy: {100*best_test_acc:.2f}%')

In [None]:
for data in test_loader:
    out = best_model(data.x,data.edge_index,data.batch)
    y_pred = out.argmax(dim=1)
    y_true = data.y
    
conf_mat = confusion_matrix(np.array(y_true), np.array(y_pred))

plt.figure(figsize=(8,8))
plot_confusion_matrix(conf_mat, list(full_dataset['class_map'].values()), normalize=True)

In [None]:
torch.save(best_model,'DRGNet1024_only_Diagrams.pth')