In [1]:
import os
import pandas as pd
import numpy as np
import copy
from sklearn.metrics import accuracy_score
import random
from collections import defaultdict

import torch
import torch.nn.functional as F
import torch_geometric
import torch_geometric.nn as pyg_nn
import torch.nn as nn
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
from torch_geometric.data import Data, DataLoader

In [2]:

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')#


In [None]:
def adjacency_to_edge_index(adjacency_matrix):
    edge_index = []
    for i in range(adjacency_matrix.shape[0]):
        for j in range(adjacency_matrix.shape[1]):
            if adjacency_matrix[i, j] != 0:
                edge_index.append([i, j])
    return torch.tensor(edge_index, dtype=torch.long).t()


def load_graph_and_label(graph_file, label_file):
    features_df = pd.read_csv(graph_file, delimiter=',', header=None, skiprows=1, nrows=3)
    features_df = features_df.apply(pd.to_numeric, errors='coerce')

    if features_df.isnull().values.any():
        features_df = features_df.fillna(0)
    feature_tensor = torch.tensor(features_df.values, dtype=torch.float)
    adjacency_matrix = pd.read_csv(graph_file, delimiter=',', header=None, skiprows=5)
    adjacency_matrix_tensor = torch.tensor(adjacency_matrix.values, dtype=torch.long)

    edge_index = adjacency_to_edge_index(adjacency_matrix_tensor)


    #print(edge_index.shape)
    label = pd.read_csv(label_file, header=None).values[0,0]

    data = Data(x=feature_tensor, edge_index=edge_index, y=torch.tensor(label, dtype=torch.long))

    return data


In [None]:
label_mappings = {
    0: [0, 1, 2],
    1: [0, 2, 1],
    2: [1, 0, 2],
    3: [1, 2, 0],
    4: [2, 0, 1],
    5: [2, 1, 0]
}


In [None]:
gnn_graphs_dir = '/content/drive/MyDrive/GNN_Colab/sampled_gnn_data-Copy/gnn_graphs'
gnn_labels_dir = "/content/drive/MyDrive/GNN_Colab/sampled_gnn_data-Copy/gnn_labels"

"""
dataset = []
i=0
# Iterate through the files in the graphs directory
for filename in os.listdir(gnn_graphs_dir):
      if filename.endswith(".csv"):
          graph_file = os.path.join(gnn_graphs_dir, filename)
          label_file = os.path.join(gnn_labels_dir, filename.replace('_combined.csv', '_label.txt'))
          #/content/drive/MyDrive/GNN_Colab/gnn_data/gnn_graphs
          # Check if both the graph and label files exist
          if os.path.exists(graph_file) and os.path.exists(label_file):
              data = load_graph_and_label(graph_file, label_file)
              dataset.append(data)
          else:
              print(f"Files not found for: {filename} ",graph_file,label_file)"""

dataset_file_path = '/content/drive/MyDrive/GNN_Colab/final_full_dataset_file.pt'  # Specify the path to save your dataset
torch.save(dataset, dataset_file_path)


In [None]:
dataset = torch.load('/content/drive/MyDrive/GNN_Colab/final_full_dataset')

In [None]:
print(len(dataset))

In [None]:
random.shuffle(dataset)
dataset_groups = defaultdict(list)

for data in dataset:
    if data.edge_index.dim() == 2:
        num_edges = data.edge_index.size(1)
    else:
        num_edges = 0

    dataset_groups[num_edges].append(data)

for num_edges in dataset_groups:
    random.shuffle(dataset_groups[num_edges])


In [None]:
def dataset_to_dataframe(dataset):
    data_list = []

    for data in dataset:
        x_flattened = data.x.reshape(-1).numpy()
        data_dict = {'features': x_flattened}
        if hasattr(data, 'y'):
            data_dict['label'] = data.y.item()
        if hasattr(data, 'name'):
            data_dict['name'] = data.name
        data_list.append(data_dict)
    df = pd.DataFrame(data_list)
    return df

df = dataset_to_dataframe(dataset)

features_df = pd.DataFrame(df['features'].tolist(), index=df.index)
df_expanded = pd.concat([df.drop(columns=['features']), features_df], axis=1)

print(df.head())

output_file_path = '/content/drive/MyDrive/CAD_Project/extra_features_data'
df_expanded.to_csv(output_file_path, index=False)

In [None]:
for data in dataset:
    if hasattr(data, 'name'):
        del data.name

In [None]:
halved_dataset= dataset[:int(len(dataset) * 0.25)]
train_dataset = halved_dataset[:int(len(halved_dataset) * 0.8)]
test_dataset = halved_dataset[int(len(halved_dataset) * 0.2):]
acc_test_dataset = dataset#[int(len(dataset) * 0.2):]

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
acc_test_loader = DataLoader(acc_test_dataset, batch_size=32, shuffle=False)
data_loader = DataLoader(dataset, batch_size=32, shuffle=False)

In [None]:
import copy

dataset_modified = copy.deepcopy(dataset)

for data in dataset_modified:
    # Skips 5th and 6th elements
    data.x = torch.cat((data.x[:, :4], data.x[:, 6:-2]), dim=1)

In [None]:
class VariableOrderGNN(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(VariableOrderGNN, self).__init__()
        self.conv1 = pyg_nn.GCNConv(input_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.conv2 = pyg_nn.GCNConv(hidden_dim, hidden_dim * 2)
        self.bn2 = nn.BatchNorm1d(hidden_dim * 2)
        self.conv3 = pyg_nn.GCNConv(hidden_dim * 2, hidden_dim * 4)
        self.bn3 = nn.BatchNorm1d(hidden_dim * 4)
        self.fc1 = nn.Linear(hidden_dim * 4, hidden_dim * 2)
        self.fc2 = nn.Linear(hidden_dim * 2, 6)

        self.dropout = nn.Dropout(0.25)

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch
        #data.edge_index = data.edge_index.t()
        x = F.relu(self.bn1(self.conv1(x, edge_index)))
        x = self.dropout(x)
        x = F.relu(self.bn2(self.conv2(x, edge_index)))
        x = self.dropout(x)
        x = F.relu(self.bn3(self.conv3(x, edge_index)))

        x = pyg_nn.global_mean_pool(x, batch)  # Aggregate node features to graph features

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

'\nimport torch\nimport torch.nn as nn\nimport torch.nn.functional as F\nimport torch_geometric.nn as pyg_nn\n\nclass VariableOrderGNN(nn.Module):\n    def __init__(self, input_dim, hidden_dim):\n        super(VariableOrderGNN, self).__init__()\n\n        # GCN layers\n        self.conv1 = pyg_nn.GCNConv(input_dim, hidden_dim)\n        self.bn1 = nn.BatchNorm1d(hidden_dim)\n        self.conv2 = pyg_nn.GCNConv(hidden_dim, hidden_dim * 2)\n        self.bn2 = nn.BatchNorm1d(hidden_dim * 2)\n        self.conv3 = pyg_nn.GCNConv(hidden_dim * 2, hidden_dim * 4)\n        self.bn3 = nn.BatchNorm1d(hidden_dim * 4)\n\n        # Dropout layer for regularization\n        self.dropout = nn.Dropout(0.25)\n\n        # Fully connected layers\n        self.fc1 = nn.Linear(hidden_dim * 4, hidden_dim * 4)\n        self.fc2 = nn.Linear(hidden_dim * 4, hidden_dim * 2)\n        self.fc3 = nn.Linear(hidden_dim * 2, hidden_dim)  # Additional layer\n        self.fc4 = nn.Linear(hidden_dim, hidden_dim // 2)  # A

In [None]:
model = VariableOrderGNN(input_dim=14, hidden_dim=5)
optimizer = torch.optim.Adam(model.parameters(), lr=0.002)
loss_fn = torch.nn.CrossEntropyLoss()

def train():
    model.train()
    total_loss = 0
    for data in train_loader:

        optimizer.zero_grad()
        output = model(data)
        #print("Model output shape:", output.shape)

        loss = loss_fn(output, data.y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)

for epoch in range(1, 50):
    loss = train()
    if epoch % 10 == 0:
        print(f'Epoch: {epoch}, Loss: {loss}')

Epoch: 10, Loss: 1.7838718730050165
Epoch: 20, Loss: 1.7828442765018655
Epoch: 30, Loss: 1.7814240736390634
Epoch: 40, Loss: 1.780410682372605


In [None]:
 def test_accuracy(loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in loader:
            output = model(data)
            _, predicted = torch.max(output, dim=1)
            total += data.y.size(0)
            correct += (predicted == data.y).sum().item()

    accuracy = correct / total
    return accuracy

# Test the model
test_acc = test_accuracy(acc_test_loader)
print(f'Test Accuracy: {test_acc:.4f}')


Test Accuracy: 0.2320
