In [26]:
import itertools
import random

import numpy as np
import torch
import torch.nn.functional as F
from torch.nn import Linear, Sequential
from torch_geometric.nn import GCNConv, GINConv
from torch_geometric.data import Data, DataLoader
from torch_geometric.nn import global_mean_pool, global_max_pool, global_add_pool

In [27]:
triangle_array = np.array([[0, 1, 0, 2, 1, 2],
                           [1, 0, 2, 0, 2, 1]])
edge_index_triangle = torch.tensor(triangle_array, dtype=torch.long)
x_triangle = torch.tensor([[1], [1], [1]], dtype=torch.float)
data_triangle = Data(x=x_triangle, edge_index=edge_index_triangle)

square_array = np.array([[0, 1, 0, 3, 1, 2, 2, 3],
                         [1, 0, 3, 0, 2, 1, 3, 2]])
edge_index_square = torch.tensor(square_array, dtype=torch.long)
x_square = torch.tensor([[1], [1], [1], [1]], dtype=torch.float)
data_square = Data(x=x_square, edge_index=edge_index_square)

penta_array = np.array([[0, 1, 0, 4, 1, 2, 2, 3, 3, 4],
                        [1, 0, 4, 0, 2, 1, 3, 2, 4, 3]])
edge_index_penta = torch.tensor(penta_array, dtype=torch.long)
x_penta = torch.tensor([[1], [1], [1], [1], [1]], dtype=torch.float)
data_penta = Data(x=x_penta, edge_index=edge_index_penta)


star4_array = np.array([[0, 1, 1, 2, 1, 3],
                        [1, 0, 2, 1, 3, 1]])
edge_index_star4 = torch.tensor(star4_array, dtype=torch.long)
x_star4 = torch.tensor([[1], [1], [1], [1]], dtype=torch.float)
data_star = Data(x=x_star4, edge_index=edge_index_star4)

star5_array = np.array([[0, 1, 1, 2, 1, 3, 1, 4],
                        [1, 0, 2, 1, 3, 1, 4, 1]])
edge_index_star5 = torch.tensor(star4_array, dtype=torch.long)
x_star5 = torch.tensor([[1], [1], [1], [1], [1]], dtype=torch.float)
data_star5 = Data(x=x_star5, edge_index=edge_index_star5)

In [28]:
def generate_renamings(vertex_ls, graph_arr):
    all_renamed = []
    renames = itertools.permutations(vertex_ls)
    for rename in renames:
        renaming_dict = {i: j for i, j in zip(vertex_ls, rename)}
        renamed = np.zeros(graph_arr.shape)
        for i, row in enumerate(graph_arr):
            renamed[i] = np.array([renaming_dict.get(number) for number in row])
        all_renamed.append(renamed)
    return all_renamed

In [43]:
dataset = []
labels = []

print(len(generate_renamings([0,1,2], triangle_array)))
for renaming in generate_renamings([0,1,2], triangle_array):
    edge_index_triangle = torch.tensor(renaming, dtype=torch.long)
    data_triangle = Data(x=x_triangle, edge_index=edge_index_triangle, y=0)
    dataset.append(data_triangle)
    
print(len(generate_renamings([0,1,2,3], square_array)))
for renaming in generate_renamings([0,1,2,3], square_array):
    edge_index_square = torch.tensor(renaming, dtype=torch.long)
    data_square = Data(x=x_square, edge_index=edge_index_square, y=0)
    dataset.append(data_square)

print(len(generate_renamings([0,1,2,3,4], penta_array)))
for renaming in generate_renamings([0,1,2,3,4], penta_array):
    edge_index_penta = torch.tensor(renaming, dtype=torch.long)
    data_penta = Data(x=x_penta, edge_index=edge_index_penta, y=0)
    dataset.append(data_penta)    
    
print(len(generate_renamings([0,1,2,3], star4_array)))
for renaming in generate_renamings([0,1,2,3], star4_array):
    edge_index_star4 = torch.tensor(renaming, dtype=torch.long)
    data_star4 = Data(x=x_star4, edge_index=edge_index_star4, y=0)
    dataset.append(data_star4)

print(len(generate_renamings([0,1,2,3,4], star5_array)))
for renaming in generate_renamings([0,1,2,3,4], star5_array):
    edge_index_star5 = torch.tensor(renaming, dtype=torch.long)
    data_star5 = Data(x=x_star5, edge_index=edge_index_star5, y=1)
    dataset.append(data_star5)

print('length of dataset:', len(dataset))
train_size = round(0.8*len(dataset))
dataset = random.sample(dataset, len(dataset))
# random.shuffle(dataset)

train_data = dataset[:train_size]
test_data = dataset[train_size:]
# loader = DataLoader(dataset, batch_size=32, shuffle=True)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_data, batch_size=64, shuffle=True)

6
24
120
24
120
length of dataset: 294


In [47]:
class GIN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GIN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GINConv(Linear(1, hidden_channels))
        self.conv2 = GINConv(Linear(hidden_channels, hidden_channels))
        self.conv3 = GINConv(Linear(hidden_channels, hidden_channels))
        self.lin = Linear(hidden_channels, 2)

    def forward(self, x, edge_index, batch):
        # 1. Obtain node embeddings 
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
#         x = x.relu()
#         x = self.conv3(x, edge_index)

        # 2. Readout layer
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply a final classifier
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

model = GIN(hidden_channels=64)
print(model)

GIN(
  (conv1): GINConv(nn=Linear(in_features=1, out_features=64, bias=True))
  (conv2): GINConv(nn=Linear(in_features=64, out_features=64, bias=True))
  (conv3): GINConv(nn=Linear(in_features=64, out_features=64, bias=True))
  (lin): Linear(in_features=64, out_features=2, bias=True)
)


In [49]:
model = GIN(hidden_channels=32)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.
        out = model(data.x, data.edge_index, data.batch)  # Perform a single forward pass.
        loss = criterion(out, data.y)  # Compute the loss.
        loss.backward()  # Derive gradients.
        optimizer.step()  # Update parameters based on gradients.
        optimizer.zero_grad()  # Clear gradients.
    print(loss.item())

def test(loader):
    model.eval()
    misclass = []

    correct = 0
    for data in loader:  # Iterate in batches over the training/test dataset.
        out = model(data.x, data.edge_index, data.batch)  
        pred = out.argmax(dim=1)  # Use the class with highest probability.
        correct += int((pred == data.y).sum())  # Check against ground-truth labels.
        misclass += list(data[pred != data.y])
    return correct / len(loader.dataset), misclass  # Derive ratio of correct predictions.


for epoch in range(1, 101):
    train()
    train_acc, trM = test(train_loader)
    test_acc, testM = test(test_loader)
#     print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}')
    if epoch % 1 == 0:
        print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

print(len(trM))
for datapoint in trM:
    print(datapoint.edge_index)
    print(datapoint.y)
        
print(len(testM))
for datapoint in testM:
    print(datapoint.edge_index)
    print(datapoint.y)

1.1974271535873413
Epoch: 001, Train Acc: 0.6043, Test Acc: 0.5424
0.7413402199745178
Epoch: 002, Train Acc: 0.3957, Test Acc: 0.4576
0.636299192905426
Epoch: 003, Train Acc: 0.6043, Test Acc: 0.5424
0.6534236669540405
Epoch: 004, Train Acc: 0.6043, Test Acc: 0.5424
0.6590278148651123
Epoch: 005, Train Acc: 0.6043, Test Acc: 0.5424
0.6384422183036804
Epoch: 006, Train Acc: 1.0000, Test Acc: 1.0000
0.6053441166877747
Epoch: 007, Train Acc: 0.6043, Test Acc: 0.5424
0.6027304530143738
Epoch: 008, Train Acc: 0.6043, Test Acc: 0.5424
0.43549609184265137
Epoch: 009, Train Acc: 1.0000, Test Acc: 1.0000
0.44496798515319824
Epoch: 010, Train Acc: 1.0000, Test Acc: 1.0000
0.3796491324901581
Epoch: 011, Train Acc: 1.0000, Test Acc: 1.0000
0.3043319880962372
Epoch: 012, Train Acc: 0.9191, Test Acc: 0.9153
0.2521142363548279
Epoch: 013, Train Acc: 0.9191, Test Acc: 0.9153
0.1712341010570526
Epoch: 014, Train Acc: 0.9191, Test Acc: 0.9153
0.16135990619659424
Epoch: 015, Train Acc: 1.0000, Test Acc: 

In [20]:
class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(1, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, 2)

    def forward(self, x, edge_index, batch):
        # 1. Obtain node embeddings 
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
#         x = x.relu()
#         x = self.conv3(x, edge_index)

        # 2. Readout layer
#         x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]
        x = global_add_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply a final classifier
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

model = GCN(hidden_channels=64)
print(model)

GCN(
  (conv1): GCNConv(1, 64)
  (conv2): GCNConv(64, 64)
  (conv3): GCNConv(64, 64)
  (lin): Linear(in_features=64, out_features=2, bias=True)
)


In [131]:
model = GCN(hidden_channels=64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.
        out = model(data.x, data.edge_index, data.batch)  # Perform a single forward pass.
        loss = criterion(out, data.y)  # Compute the loss.
        loss.backward()  # Derive gradients.
        optimizer.step()  # Update parameters based on gradients.
        optimizer.zero_grad()  # Clear gradients.
    print(loss.item())

def test(loader):
    model.eval()
    misclass = []

    correct = 0
    for data in loader:  # Iterate in batches over the training/test dataset.
        out = model(data.x, data.edge_index, data.batch)  
        pred = out.argmax(dim=1)  # Use the class with highest probability.
        correct += int((pred == data.y).sum())  # Check against ground-truth labels.
        misclass += list(data[pred != data.y])
    return correct / len(loader.dataset), misclass  # Derive ratio of correct predictions.


for epoch in range(1, 101):
    train()
    train_acc, trM = test(train_loader)
    test_acc, testM = test(test_loader)
#     print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}')
    if epoch % 1 == 0:
        print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

print(len(trM))
for datapoint in trM:
    print(datapoint.edge_index)
    print(datapoint.y)
        
print(len(testM))
for datapoint in testM:
    print(datapoint.edge_index)
    print(datapoint.y)

0.6450372934341431
Epoch: 001, Train Acc: 0.5745, Test Acc: 0.6610
0.6301229596138
Epoch: 002, Train Acc: 0.5745, Test Acc: 0.6610
0.8484597206115723
Epoch: 003, Train Acc: 0.4255, Test Acc: 0.3390
0.7093400955200195
Epoch: 004, Train Acc: 0.5745, Test Acc: 0.6610
0.7700427770614624
Epoch: 005, Train Acc: 0.5745, Test Acc: 0.6610
0.6686123013496399
Epoch: 006, Train Acc: 0.5745, Test Acc: 0.6610
0.6830257177352905
Epoch: 007, Train Acc: 0.5745, Test Acc: 0.6610
0.6735721826553345
Epoch: 008, Train Acc: 0.5745, Test Acc: 0.6610
0.7066959142684937
Epoch: 009, Train Acc: 0.5745, Test Acc: 0.6610
0.6703315377235413
Epoch: 010, Train Acc: 0.5745, Test Acc: 0.6610
0.7121647000312805
Epoch: 011, Train Acc: 0.5745, Test Acc: 0.6610
0.6695734262466431
Epoch: 012, Train Acc: 0.5745, Test Acc: 0.6610
0.6574337482452393
Epoch: 013, Train Acc: 0.5745, Test Acc: 0.6610
0.633428156375885
Epoch: 014, Train Acc: 0.5745, Test Acc: 0.6610
0.6236926913261414
Epoch: 015, Train Acc: 0.5745, Test Acc: 0.6610

In [52]:
class NN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(NN, self).__init__()
        torch.manual_seed(12345)
        self.lin1 = Linear(1, hidden_channels)
        self.lin2 = Linear(hidden_channels, hidden_channels)
        self.lin3 = Linear(hidden_channels, 2)

    def forward(self, x):
        x = self.lin1(x)
        x = x.relu()
        x = self.lin2(x)
        x = x.relu()
        x = self.lin3(x)
        return x

model = NN(hidden_channels=64)
print(model)

NN(
  (lin1): Linear(in_features=1, out_features=64, bias=True)
  (lin2): Linear(in_features=64, out_features=64, bias=True)
  (lin3): Linear(in_features=64, out_features=2, bias=True)
)


In [54]:
model = NN(hidden_channels=64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.
        print(data.edge_index)
        out = model(data.edge_index)  # Perform a single forward pass.
        loss = criterion(out, data.y)  # Compute the loss.
        loss.backward()  # Derive gradients.
        optimizer.step()  # Update parameters based on gradients.
        optimizer.zero_grad()  # Clear gradients.
    print(loss.item())

def test(loader):
    model.eval()
    misclass = []

    correct = 0
    for data in loader:  # Iterate in batches over the training/test dataset.
        out = model(data.edge_index)  
        pred = out.argmax(dim=1)  # Use the class with highest probability.
        correct += int((pred == data.y).sum())  # Check against ground-truth labels.
        misclass += list(data[pred != data.y])
    return correct / len(loader.dataset), misclass  # Derive ratio of correct predictions.


for epoch in range(1, 201):
    train()
    train_acc, trM = test(train_loader)
    test_acc, testM = test(test_loader)
#     print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}')
    if epoch % 1 == 0:
        print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

print(len(trM))
for datapoint in trM:
    print(datapoint.edge_index)
    print(datapoint.y)
        
print(len(testM))
for datapoint in testM:
    print(datapoint.edge_index)
    print(datapoint.y)

tensor([[  2,   0,   2,  ..., 308, 307, 306],
        [  0,   2,   4,  ..., 307, 306, 307]])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (2x552 and 1x64)