In [1]:
### Imports for pytorch

import torch
from torch.autograd import Variable
from groupy.gconv.pytorch_gconv import P4MConvZ2, P4MConvP4M, P4ConvZ2, P4ConvP4

import torchvision
from torchvision import datasets
import torchvision.transforms as transforms

from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

from RotMNIST import RotMNIST


In [2]:
### Instantiate the RotMNIST variants and wrap each one in a DataLoader so their
### behaviour can be verified further down.

# Training split, rotation_mirroring switched on.
dataset_rot = RotMNIST(
    root='data',
    train=True,
    download=True,
    rotation_mirroring=True,
    transform=torchvision.transforms.Compose([
        torchvision.transforms.Resize(32),
        torchvision.transforms.ToTensor(),
    ]),
)

# Held-out split, rotation_mirroring switched on.
test_dataset_rot = RotMNIST(
    root='data',
    train=False,
    download=True,
    rotation_mirroring=True,
    transform=torchvision.transforms.Compose([
        torchvision.transforms.Resize(32),
        torchvision.transforms.ToTensor(),
    ]),
)

# Training split again, this time with rotation_mirroring switched off.
dataset_upright = RotMNIST(
    root='data',
    train=True,
    download=True,
    rotation_mirroring=False,
    transform=torchvision.transforms.Compose([
        torchvision.transforms.Resize(32),
        torchvision.transforms.ToTensor(),
    ]),
)

# One shuffled loader per dataset, 64 samples per batch.
train_dataloader_rot, test_dataloader_rot, train_dataloader_upright = (
    DataLoader(split, batch_size=64, shuffle=True)
    for split in (dataset_rot, test_dataset_rot, dataset_upright)
)

  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


In [3]:
import torch.nn as nn
import torch.nn.functional as F

# Max-pooling helper taken from @COGNAR, and MNIST experiments from @adambielski (both found through the PyTorch implementation of GrouPy)

import torch.nn.functional as F

def plane_group_spatial_max_pooling(x, ksize, stride=None, pad=0):
    """Max-pool the last two axes of a 5-D tensor, leaving axes 0-2 untouched.

    Axes 1 and 2 are temporarily merged so that ``F.max_pool2d`` (which
    expects a 4-D input) can be applied, then split apart again.

    Args:
        x: 5-D tensor; the pooling acts on axes 3 and 4.
        ksize: pooling window, forwarded as ``kernel_size``.
        stride: pooling stride, forwarded unchanged (``None`` lets
            ``max_pool2d`` fall back to the kernel size).
        pad: padding, forwarded as ``padding``.

    Returns:
        5-D tensor whose first three sizes match ``x`` and whose last two
        sizes are whatever the pooling produced.
    """
    n_batch, n_chan, n_group = x.shape[0], x.shape[1], x.shape[2]
    # Merge axes 1 and 2 so the tensor looks like an ordinary 4-D image batch.
    folded = x.view(n_batch, n_chan * n_group, x.shape[3], x.shape[4])
    pooled = F.max_pool2d(folded, kernel_size=ksize, stride=stride, padding=pad)
    # Restore the original split of axes 1 and 2 around the pooled spatial size.
    return pooled.view(n_batch, n_chan, n_group, *pooled.shape[2:])

In [4]:
### G-Conv p4m training

# Three settings for convolutional layers:
#   * self.conv1 = P4MConvZ2(in_channels=1, out_channels=2, kernel_size=5, stride=1),   self.conv2 = P4MConvP4M(in_channels=2, out_channels=4, kernel_size=5, stride=1)
#   * self.conv1 = P4ConvZ2(in_channels=1, out_channels=4, kernel_size=5, stride=1),    self.conv2 = P4ConvP4(in_channels=4, out_channels=6, kernel_size=5, stride=1)
#   * self.conv1 = nn.Conv2d(1, 6, 5), self.conv2 = nn.Conv2d(6, 16, 5)

class Net(nn.Module):
    """P4M group-equivariant CNN (channel widths 8 -> 16 -> 32) with a small classifier head.

    Three 5x5 G-convolutions are followed by a max over axis 2 of the 5-D
    feature map and two fully connected layers. ``forward`` returns
    per-class log-probabilities.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = P4MConvZ2(in_channels=1, out_channels=8, kernel_size=5, stride=1)
        self.conv2 = P4MConvP4M(in_channels=8, out_channels=16, kernel_size=5, stride=1)
        self.conv3 = P4MConvP4M(in_channels=16, out_channels=32, kernel_size=5)

        # fc1 takes exactly 32 features, i.e. the 32 conv3 channels on a 1x1 spatial map.
        self.fc1 = nn.Linear(32, 10)
        self.fc2 = nn.Linear(10, 10)

    def forward(self, x):
        """Map a batch of single-channel images to ``(batch, 10)`` log-probabilities."""
        x = F.relu(self.conv1(x))
        x = plane_group_spatial_max_pooling(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = plane_group_spatial_max_pooling(x, 2, 2)
        x = F.relu(self.conv3(x))
        # This stage used to pool with kernel 2 / stride 2. With the 32x32 inputs
        # used in this notebook the map is already 1x1 here (32 -> 28 -> 14 -> 10
        # -> 5 -> 1, assuming the G-convs apply no padding), so a 2x2 window would
        # yield an empty output and max_pool2d would raise. Kernel 1 / stride 1
        # keeps the stage but leaves the 1x1 map intact for fc1.
        x = plane_group_spatial_max_pooling(x, 1, 1)
        # Collapse axis 2 of the 5-D feature map by taking its maximum
        # (presumably the group/transformation axis in GrouPy's layout).
        x = torch.max(x, dim=2)[0]
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        # log_softmax is idempotent, so feeding this into nn.CrossEntropyLoss
        # (which applies log_softmax itself) still gives the correct loss.
        return F.log_softmax(x, dim=1)

# Build the model, show its layer summary and count the weights that will be trained.
net = Net()

print(net)
pytorch_total_params = sum(
    param.numel() for param in net.parameters() if param.requires_grad
)
print("Number of trainable params: " + str(pytorch_total_params))

Net(
  (conv1): P4MConvZ2()
  (conv2): P4MConvP4M()
  (conv3): P4MConvP4M()
  (fc1): Linear(in_features=32, out_features=10, bias=True)
  (fc2): Linear(in_features=10, out_features=10, bias=True)
)
Number of trainable params: 128696


In [5]:
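### G-Conv p4 training (NOTE(review): the layers instantiated below are the P4M ones, not P4 — confirm which group was intended)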

# Three settings for convolutional layers:
#   * self.conv1 = P4MConvZ2(in_channels=1, out_channels=2, kernel_size=5, stride=1),   self.conv2 = P4MConvP4M(in_channels=2, out_channels=4, kernel_size=5, stride=1)
#   * self.conv1 = P4ConvZ2(in_channels=1, out_channels=4, kernel_size=5, stride=1),    self.conv2 = P4ConvP4(in_channels=4, out_channels=6, kernel_size=5, stride=1)
#   * self.conv1 = nn.Conv2d(1, 6, 5), self.conv2 = nn.Conv2d(6, 16, 5)

class Net(nn.Module):
    """P4M group-equivariant CNN (channel widths 6 -> 16 -> 32) with a small classifier head.

    The group convolutions stand in for the plain ``nn.Conv2d(1, 6, 5)`` /
    ``nn.Conv2d(6, 16, 5)`` layers, and ``plane_group_spatial_max_pooling``
    stands in for ``nn.MaxPool2d(2, 2)``. ``forward`` returns raw class scores.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = P4MConvZ2(in_channels=1, out_channels=6, kernel_size=5, stride=1)
        self.conv2 = P4MConvP4M(in_channels=6, out_channels=16, kernel_size=5, stride=1)
        self.conv3 = P4MConvP4M(in_channels=16, out_channels=32, kernel_size=5)

        # Classifier head: 32 features in, 10 class scores out.
        self.fc1 = nn.Linear(32, 10)
        self.fc2 = nn.Linear(10, 10)

    def forward(self, x):
        """Map a batch of single-channel images to ``(batch, 10)`` class scores."""
        feats = plane_group_spatial_max_pooling(F.relu(self.conv1(x)), 2, 2)
        feats = plane_group_spatial_max_pooling(F.relu(self.conv2(feats)), 2, 2)
        # Kernel 1 / stride 1: the pooling call is kept but does not shrink the map.
        feats = plane_group_spatial_max_pooling(F.relu(self.conv3(feats)), 1, 1)
        # Collapse axis 2 of the 5-D feature map by taking its maximum
        # (presumably the group/transformation axis in GrouPy's layout).
        feats = feats.max(dim=2)[0]
        feats = feats.flatten(1)
        hidden = F.relu(self.fc1(feats))
        return self.fc2(hidden)

# Instantiate the network defined in this cell, then report its layers and trainable size.
net = Net()

print(net)
pytorch_total_params = sum(
    weight.numel() for weight in net.parameters() if weight.requires_grad
)
print("Number of trainable params: " + str(pytorch_total_params))

Net(
  (conv1): P4MConvZ2()
  (conv2): P4MConvP4M()
  (conv3): P4MConvP4M()
  (fc1): Linear(in_features=32, out_features=10, bias=True)
  (fc2): Linear(in_features=10, out_features=10, bias=True)
)
Number of trainable params: 122244


In [6]:
### Hyper-parameters

epochs = 25            # number of passes over the training loader
batch_size = 64        # NOTE(review): the DataLoaders are built with a literal 64 and do not read this
learning_rate = 1e-3   # step size handed to SGD below

### Objective and optimizer

# Cross-entropy objective for the 10-way classification.
loss_fn = nn.CrossEntropyLoss()
# SGD with momentum over the parameters of the `net` that exists when this cell runs.
optimizer = torch.optim.SGD(params=net.parameters(), lr=learning_rate, momentum=0.9)

In [7]:
### Training and testing function definitions
def train_loop(dataloader, model, loss_fn, optimizer, device=None):
    """Run one optimisation pass over ``dataloader``.

    Args:
        dataloader: iterable of ``(X, y)`` batches exposing ``.dataset``
            (only its length is used, for the progress printout).
        model: ``nn.Module`` being trained; it is switched to train mode.
        loss_fn: callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: optimizer bound to ``model``'s parameters.
        device: where to move each batch. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters),
            so the function no longer depends on a notebook-global
            ``device`` that is only defined in a later cell.

    Returns:
        None. Progress is printed every 100 batches.
    """
    if device is None:
        # An empty CPU tensor is the fallback for parameter-free models.
        device = next(model.parameters(), torch.empty(0)).device

    # Make sure layers such as dropout / batch-norm are in training mode.
    model.train()

    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Forward pass and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backward pass: clear stale gradients, backpropagate, take one optimizer step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            # Separate names so the loss tensor is not shadowed by its float value.
            loss_value, current = loss.item(), batch * len(X)
            print(f"loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]")


def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    test_loss, correct = 0, 0
    
    # No gradient on training data (faster computation and no optimization happening here anyway)
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X.to(device))
            test_loss += loss_fn(pred, y.to(device)).item()
            correct += (pred.argmax(1) == y.to(device)).type(torch.float).sum().item()

    test_loss /= size
    correct /= size
    
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [10]:
# Use the first CUDA device when one is visible, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
net.to(device)

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")

    # One pass over the rotated/mirrored training loader ...
    train_loop(train_dataloader_rot, net, loss_fn, optimizer)
    # ... followed by an evaluation that always uses the rotated test loader.
    test_loop(test_dataloader_rot, net, loss_fn)

print("Done!")

Epoch 1
-------------------------------
loss: 0.000089  [    0/60000]
loss: 0.000012  [ 6400/60000]


KeyboardInterrupt: 

In [11]:
# Overall accuracy of `net` on the rotated test loader.
total = 0
correct = 0

# Inference only, so autograd bookkeeping is switched off.
with torch.no_grad():
    for images, labels in test_dataloader_rot:
        images = images.to(device)
        labels = labels.to(device)
        # Forward pass: one score per class for every image.
        outputs = net(images)
        # The index of the largest score is the predicted class.
        predicted = torch.max(outputs.data, 1)[1]
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

print('Accuracy of the network on %i test images: %f %%' % (total, 100.0 * correct / total))

Accuracy of the network on 10000 test images: 97.480000 %


In [None]:
# torch.save(net, 'upright-trained-p4m.pth')

In [9]:
# Replace `net` with a model restored from a checkpoint file on disk.
# NOTE(review): torch.load of a whole pickled module executes pickle — only load files you trust;
# the class definition (`Net`) must also be importable/defined when this runs.
# NOTE(review): `optimizer` was created from the previous `net`'s parameters; rebinding `net`
# here does not update it, so re-create the optimizer before training this model further.
net = torch.load('upright-trained-p4m.pth')


In [13]:
# Overall and per-digit accuracy of `net` on the rotated test loader.
total = 0
correct = 0

# Per-class tallies, keyed by the digit 0-9.
correct_pred = dict.fromkeys(range(10), 0)
total_pred = dict.fromkeys(range(10), 0)

# Inference only, so autograd bookkeeping is switched off.
with torch.no_grad():
    for images, labels in test_dataloader_rot:
        images = images.to(device)
        labels = labels.to(device)
        # Forward pass: one score per class for every image.
        outputs = net(images)
        # The index of the largest score is the predicted class.
        predicted = torch.max(outputs.data, 1)[1]

        correct += (predicted == labels).sum().item()
        total += labels.size(0)

        # Update the per-digit counters one sample at a time.
        for label, prediction in zip(labels.tolist(), predicted.tolist()):
            total_pred[label] += 1
            if label == prediction:
                correct_pred[label] += 1

print('Accuracy of the network on %i test images: %f %%' % (total, 100.0 * correct / total))

for classname in correct_pred:
    accuracy = 100 * float(correct_pred[classname]) / total_pred[classname]
    print("Accuracy for num {} is: {:.1f} %".format(classname,
                                                   accuracy))

Accuracy of the network on 10000 test images: 97.480000 %
Accuracy for num 0 is: 98.9 %
Accuracy for num 1 is: 98.9 %
Accuracy for num 2 is: 95.8 %
Accuracy for num 3 is: 98.4 %
Accuracy for num 4 is: 97.3 %
Accuracy for num 5 is: 95.5 %
Accuracy for num 6 is: 96.1 %
Accuracy for num 7 is: 96.8 %
Accuracy for num 8 is: 98.9 %
Accuracy for num 9 is: 97.8 %
