# GLGENN on Colored Fashion-MNIST

In [1]:
import torch
# Report whether PyTorch can see a CUDA device in this runtime.
print(torch.cuda.is_available())

True


In [23]:
# NOTE(review): this cell's recorded execution count (23) is higher than that of
# the cells below it, so it was last run out of order — confirm it still works
# under Restart & Run All.
import os
# Move to the filesystem root so the relative `git clone` below creates the
# checkout at the absolute path tested by the existence check.
os.chdir('/')
# Clone only when the checkout is missing, so re-running the cell skips the clone.
if not os.path.exists("/clifford-group-equivariant-neural-networks"):
    !git clone https://github.com/DavidRuhe/clifford-group-equivariant-neural-networks.git
# Leave the kernel's working directory inside the checkout.
os.chdir("/clifford-group-equivariant-neural-networks")

Cloning into 'clifford-group-equivariant-neural-networks'...
remote: Enumerating objects: 112, done.[K
remote: Counting objects: 100% (45/45), done.[K
remote: Compressing objects: 100% (22/22), done.[K
remote: Total 112 (delta 32), reused 23 (delta 23), pack-reused 67 (from 1)[K
Receiving objects: 100% (112/112), 349.95 KiB | 11.29 MiB/s, done.
Resolving deltas: 100% (38/38), done.


In [3]:
import os
import sys
from google.colab import drive
# Mount Google Drive under /content/drive; the cells below read the glgenn
# sources from there. Requires a Colab runtime.
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
# Folder on the mounted Drive that holds the glgenn sources; listing it is a
# quick check that the mount succeeded and the folder is where we expect it.
drive_repo_path = '/content/drive/My Drive/glgenn'
print(os.listdir(drive_repo_path))

# Directory put on the import path so that `import glgenn...` can resolve.
# NOTE(review): this uses the 'MyDrive' spelling while the listing above uses
# 'My Drive' — presumably both name the same Drive root in Colab; confirm.
repo_path = '/content/drive/MyDrive'
# Guard the append so re-running the cell does not stack duplicate entries.
if repo_path not in sys.path:
    sys.path.append(repo_path)

['glgenn.png', 'LICENSE', 'README.md', 'experiments', 'layers', 'data', 'engineer', 'algebra', 'models']


In [8]:
import os
import random
from dataclasses import dataclass

import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import datasets
import torchvision.transforms.v2 as v2

from glgenn.algebra.cliffordalgebraex import CliffordAlgebraQT
from glgenn.algebra.cliffordalgebra import CliffordAlgebra
from glgenn.layers.qtgp import QTGeometricProduct
from glgenn.layers.qtlinear import QTLinear
from glgenn.layers.qtnorm import QTNormalization

import matplotlib.pyplot as plt


In [9]:
# Single seed shared by every random number generator used in the notebook.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
# Trailing semicolon: torch.manual_seed returns the Generator object, whose repr
# would otherwise be echoed as the cell output.
torch.manual_seed(SEED);

<torch._C.Generator at 0x7b55c8592170>

## Dataloader (Colored Fashion MNIST + GA embedding)


In [10]:
class CliffordFashionedMnist(Dataset):
    """Randomly coloured Fashion-MNIST, with every pixel embedded as a grade-1 multivector.

    Each pixel is described by five numbers ``(x, y, c0, c1, c2)``: its two grid
    coordinates in ``[-1, 1]`` followed by the three colour channels after
    ``Normalize(mean=0.5, std=0.5)``. That 5-vector is handed to
    ``CliffordAlgebraQT.embed_grade(..., 1)``.

    Args:
        root: directory passed to ``torchvision.datasets.FashionMNIST``.
        train: select the training split (``True``) or the test split (``False``).
        download: let torchvision download the data when it is missing.
        d: dimension of the algebra, whose metric is ``[1] * d``. It has to equal
            the number of per-pixel features (5), otherwise the pixel vector
            cannot be used as the grade-1 part of a multivector.

    Raises:
        ValueError: if ``d`` differs from the number of per-pixel features.
    """

    # 2 grid coordinates + 3 colour channels.
    N_PIXEL_FEATURES = 5

    def __init__(self, root, train=True, download=True, d=5):
        # Fail at construction time rather than on the first __getitem__ call.
        if d != self.N_PIXEL_FEATURES:
            raise ValueError(
                f"d={d} does not match the {self.N_PIXEL_FEATURES} per-pixel "
                "features (x, y and three colour channels) that are embedded "
                "as a grade-1 vector"
            )

        self.metric = [1] * d
        self.ca = CliffordAlgebraQT(self.metric)
        self.d = d
        self.h, self.w = 28, 28
        # HWC uint8 array -> CHW float tensor in [0, 1] -> roughly [-1, 1].
        self.post_transforms = v2.Compose([
            v2.ToImage(),
            v2.ToDtype(torch.float32, scale=True),
            v2.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

        self.data = datasets.FashionMNIST(root, train=train, download=download)

        # Pixel coordinates are identical for every sample, so build them once.
        y_coords, x_coords = torch.meshgrid(
            torch.linspace(-1, 1, self.h),
            torch.linspace(-1, 1, self.w),
            indexing="ij"
        )
        self.grid = torch.stack([x_coords, y_coords], dim=0)  # [2, 28, 28]

    def _colorize_random(self, img):
        """Turn a grayscale image into a colour one by scaling it per channel.

        Three factors are drawn uniformly from ``[0.2, 1.0)`` using NumPy's
        global RNG, so the colouring differs on every call (including calls for
        the same index).

        Returns:
            ``uint8`` array with the colour channel last, i.e. ``[28, 28, 3]``
            for a 28x28 input.
        """
        img_np = np.array(img)
        factors = np.random.uniform(0.2, 1.0, 3)
        # Broadcasting over a trailing axis is equivalent to stacking the three
        # scaled copies along axis=-1.
        color_img = (img_np[..., None] * factors).astype(np.uint8)  # [28, 28, 3]
        return color_img

    def __len__(self):
        """Number of samples in the underlying Fashion-MNIST split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(multivector_image, label)`` for sample ``idx``."""
        img_raw, label = self.data[idx]
        img_colored = self._colorize_random(img_raw)  # [28, 28, 3], channel-last
        img_tensor = self.post_transforms(img_colored)  # [3, 28, 28], channel-first
        v_full = torch.cat([self.grid, img_tensor], dim=0)  # [5, 28, 28]
        v_full = v_full.permute(1, 2, 0)  # [28, 28, 5]
        x_mv = self.ca.embed_grade(v_full, 1)  # [28, 28, 2 ** 5]
        return x_mv, label

### Prepare: MVSiLU from CGENN

In [11]:
def unsqueeze_like(tensor: torch.Tensor, like: torch.Tensor, dim=0):
    """
    Give ``tensor`` as many dimensions as ``like`` by inserting singleton axes.

    The new axes are inserted as one contiguous group at position ``dim``;
    the first ``dim`` axes of ``tensor`` stay in front of them.

    Args:
        tensor (torch.Tensor): tensor to expand.
        like (torch.Tensor): tensor whose number of dimensions should be matched.
        dim: int: index at which the singleton axes are inserted, default: 0.

    Returns:
        ``tensor`` itself when the ranks already agree, otherwise an indexed
        result with ``like.ndim`` dimensions.

    Raises:
        ValueError: if ``tensor`` has more dimensions than ``like``.
    """
    missing_axes = like.ndim - tensor.ndim

    if missing_axes < 0:
        raise ValueError(f"tensor.ndim={tensor.ndim} > like.ndim={like.ndim}")
    if missing_axes == 0:
        return tensor

    # Keep `dim` leading axes untouched, then add the missing singleton axes.
    keep_leading = (slice(None),) * dim
    new_axes = (None,) * missing_axes
    return tensor[keep_leading + new_axes]

class MVSiLU(nn.Module):
    """Sigmoid gating of multivectors driven by invariants.

    The gate input is the concatenation of ``input[..., :1]`` with one invariant
    for every grade in ``algebra.grades[1:]`` (either ``algebra.norms`` or
    ``algebra.qs``). It is passed through a learnable affine map (``a``, ``b``),
    repeated according to ``algebra.subspaces``, squashed with a sigmoid and
    multiplied onto the input.

    Args:
        algebra: algebra object providing ``dim``, ``grades``, ``subspaces``,
            ``norms`` and ``qs``.
        channels: number of channels; sizes the second axis of ``a`` and ``b``.
        invariant: ``"mag2"`` to use ``algebra.qs`` or ``"norm"`` to use
            ``algebra.norms``.
        exclude_dual: stored on the module; not read anywhere in this class.

    Raises:
        ValueError: if ``invariant`` is neither ``"norm"`` nor ``"mag2"``.
    """

    def __init__(self, algebra, channels, invariant="mag2", exclude_dual=False):
        super().__init__()
        self.algebra = algebra
        self.channels = channels
        self.exclude_dual = exclude_dual
        self.invariant = invariant

        # Affine gate parameters: scale starts at one, shift at zero.
        param_shape = (1, channels, algebra.dim + 1)
        self.a = nn.Parameter(torch.ones(*param_shape))
        self.b = nn.Parameter(torch.zeros(*param_shape))

        # Select the invariant function once, at construction time.
        if invariant == "mag2":
            self._get_invariants = self._mag2s_except_scalar
        elif invariant == "norm":
            self._get_invariants = self._norms_except_scalar
        else:
            raise ValueError(f"Invariant {invariant} not recognized.")

    def _norms_except_scalar(self, input):
        """Return ``algebra.norms`` evaluated on every grade except the first."""
        higher_grades = self.algebra.grades[1:]
        return self.algebra.norms(input, grades=higher_grades)

    def _mag2s_except_scalar(self, input):
        """Return ``algebra.qs`` evaluated on every grade except the first."""
        higher_grades = self.algebra.grades[1:]
        return self.algebra.qs(input, grades=higher_grades)

    def forward(self, input):
        """Gate ``input`` with the sigmoid of its affinely transformed invariants."""
        scalar_part = input[..., :1]
        per_grade = self._get_invariants(input)
        gates = torch.cat([scalar_part, *per_grade], dim=-1)

        # Line the parameters up with the gate tensor's rank before the affine map.
        scale = unsqueeze_like(self.a, gates, dim=2)
        shift = unsqueeze_like(self.b, gates, dim=2)
        gates = scale * gates + shift

        # Repeat each gate according to algebra.subspaces along the last axis.
        gates = gates.repeat_interleave(self.algebra.subspaces, dim=-1)
        return torch.sigmoid(gates) * input

## Model 1: GLGENN + dummy-embedding


In [12]:
class CGEBlock(nn.Module):
    """One GLGENN block: linear map, gated activation, geometric product, normalization.

    Args:
        algebra: algebra object shared by all four layers.
        in_features: number of input channels of the linear layer.
        out_features: number of channels used by every layer after the linear one.
    """

    def __init__(self, algebra, in_features, out_features):
        super().__init__()

        # The stages are applied in exactly this order.
        stages = [
            QTLinear(algebra, in_features, out_features),
            MVSiLU(algebra, out_features),
            QTGeometricProduct(algebra, out_features),
            QTNormalization(algebra, out_features),
        ]
        self.layers = nn.Sequential(*stages)

    def forward(self, input):
        """Map ``[batch_size, in_features, 2**d]`` to ``[batch_size, out_features, 2**d]``."""
        output = self.layers(input)
        return output

In [13]:
class CGEMLP(nn.Module):
    """Stack of ``CGEBlock`` modules.

    The stack consists of ``n_layers - 1`` blocks ending in ``hidden_features``
    channels, followed by one block ending in ``out_features`` channels.

    Args:
        algebra: algebra object forwarded to every block.
        in_features: number of input channels of the first block.
        hidden_features: number of channels between blocks.
        out_features: number of output channels of the last block.
        n_layers: total number of blocks. For values below 2 the stack is a
            single block mapping ``in_features`` to ``out_features``.
    """

    def __init__(self, algebra, in_features, hidden_features, out_features, n_layers=2):
        super().__init__()

        layers = []
        for _ in range(n_layers - 1):
            layers.append(
                CGEBlock(algebra, in_features, hidden_features)
            )
            in_features = hidden_features

        # Feed the last block with the running width rather than a hard-coded
        # `hidden_features`: the two coincide once the loop has run, but when
        # there are no hidden blocks (n_layers=1) the input width must still be
        # the caller's `in_features`.
        layers.append(
            CGEBlock(algebra, in_features, out_features)
        )
        self.layers = nn.Sequential(*layers)

    def forward(self, input):
        """Apply all blocks in sequence."""
        return self.layers(input)

In [19]:
class CliffordFashionModel(nn.Module):
    """Per-pixel Clifford network, mean pooling over pixels, then an MLP classifier.

    Every pixel is processed independently by a ``CGEMLP`` and an ``MVSiLU``.
    Component 0 of each resulting multivector (the part ``MVSiLU`` uses as the
    scalar invariant) is averaged over all pixels and classified by a small MLP.

    Args:
        ca: algebra object shared by the Clifford layers.
        in_channels: number of multivector channels per pixel.
        hidden_channels: channel width of the Clifford layers and the classifier.
        out_classes: number of output logits.
    """

    def __init__(self, ca, in_channels=1, hidden_channels=16, out_classes=10):
        super().__init__()
        self.ca = ca
        self.cge_part = CGEMLP(ca, in_channels, hidden_channels, hidden_channels)
        self.activation = MVSiLU(ca, hidden_channels)

        self.classifier = nn.Sequential(
            nn.Linear(hidden_channels, hidden_channels),
            nn.ReLU(),
            nn.Linear(hidden_channels, out_classes)
        )

    def forward(self, x):
        """Classify a batch of multivector images.

        Args:
            x: ``[batch, h, w, mv_dim]`` (one channel per pixel) or
                ``[batch, h, w, channels, mv_dim]`` with an explicit channel
                axis, whose size should equal the model's ``in_channels``.

        Returns:
            Logits of shape ``[batch, out_classes]``.

        Raises:
            ValueError: if ``x`` is neither 4- nor 5-dimensional.
        """
        if x.ndim == 4:
            # Single-channel layout: add the channel axis that used to be hard-coded.
            x = x.unsqueeze(3)
        elif x.ndim != 5:
            raise ValueError(
                "expected input of shape [batch, h, w, mv_dim] or "
                f"[batch, h, w, channels, mv_dim], got {tuple(x.shape)}"
            )

        batch_size, h, w, channels, mv_dim = x.shape
        # Fold the pixels into the batch axis. reshape (unlike view) also
        # accepts non-contiguous inputs such as permuted tensors.
        x = x.reshape(batch_size * h * w, channels, mv_dim)
        h_out = self.cge_part(x)
        h_out = self.activation(h_out)
        invariants = h_out[..., 0]  # [batch * h * w, hidden_channels]
        invariants = invariants.reshape(batch_size, h * w, -1)
        pooled = invariants.mean(dim=1)  # average over all pixels
        return self.classifier(pooled)

## Training & Evaluating

In [20]:
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Hyperparameters.
d = 5
lr = 0.001
epochs = 5
batch_size = 32

ca = CliffordAlgebraQT([1] * d)
# Pass `d` explicitly so the dataset's algebra cannot drift away from `ca`
# when the value above is edited.
full_dataset = CliffordFashionedMnist(root='./data', train=True, download=True, d=d)
# Seed the shuffling generator with the notebook-wide SEED so the batch order
# is tied to the same seed as everything else.
train_loader = DataLoader(
    full_dataset,
    batch_size=batch_size,
    shuffle=True,
    generator=torch.Generator(device='cpu').manual_seed(SEED),
)

# Smoke test: pull one batch and check its layout before the long training run.
test_batch, test_labels = next(iter(train_loader))
assert test_batch.ndim == 4, f"expected [batch, h, w, mv_dim], got {tuple(test_batch.shape)}"
assert test_batch.shape[0] == test_labels.shape[0], "images and labels disagree on batch size"

model = CliffordFashionModel(ca, in_channels=1, hidden_channels=16).to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

for epoch in range(epochs):
    model.train()
    pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}')
    # Running counters for the accuracy shown in the progress bar.
    correct = 0
    total = 0

    for images, labels in pbar:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        # `loss` is the current batch's loss; `acc` is cumulative over the epoch.
        accuracy = 100. * correct / total
        pbar.set_postfix({'loss': f'{loss.item():.4f}', 'acc': f'{accuracy:.2f}%'})

cuda


Epoch 1/5: 100%|██████████| 1875/1875 [22:27<00:00,  1.39it/s, loss=1.6780, acc=23.19%]
Epoch 2/5: 100%|██████████| 1875/1875 [22:27<00:00,  1.39it/s, loss=1.6688, acc=31.46%]
Epoch 3/5: 100%|██████████| 1875/1875 [22:29<00:00,  1.39it/s, loss=1.8694, acc=33.75%]
Epoch 4/5: 100%|██████████| 1875/1875 [22:26<00:00,  1.39it/s, loss=1.8000, acc=36.32%]
Epoch 5/5: 100%|██████████| 1875/1875 [22:27<00:00,  1.39it/s, loss=1.7362, acc=38.82%]


In [22]:
model.eval()

# NOTE: the dataset colours every image randomly on each access, so the test
# inputs (and therefore the reported numbers) vary slightly between runs.
full_test_dataset = CliffordFashionedMnist(root='./data', train=False, download=True, d=d)
test_loader = DataLoader(full_test_dataset, batch_size=batch_size, shuffle=False)

test_loss = 0.0  # sum of per-sample losses
test_correct = 0
test_total = 0

with torch.no_grad():
    for images, labels in tqdm(test_loader, desc='Testing'):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)

        # `criterion` returns the batch mean; weight it by the batch size so a
        # smaller final batch does not skew the average.
        n_samples = labels.size(0)
        test_loss += loss.item() * n_samples
        _, predicted = outputs.max(1)
        test_total += n_samples
        test_correct += predicted.eq(labels).sum().item()

# Guard against an empty loader to avoid a division by zero.
avg_test_loss = test_loss / max(test_total, 1)
test_accuracy = 100. * test_correct / max(test_total, 1)
print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')

Testing: 100%|██████████| 313/313 [00:55<00:00,  5.60it/s]

Test Loss: 1.6032, Test Accuracy: 39.19%





## Model 2: CGENN + dummy-embedding

In [None]:
class CGEBlock(nn.Module):
    """One CGENN block: linear map, gated activation, geometric product, layer norm.

    NOTE(review): MVLinear, SteerableGeometricProductLayer and MVLayerNorm are
    not imported in any cell visible in this notebook — confirm they are brought
    into scope before this class is instantiated. This definition also reuses
    the name ``CGEBlock`` from the GLGENN section, so running this cell replaces
    that class for anything constructed afterwards.

    Args:
        algebra: algebra object shared by all four layers.
        in_features: number of input channels of the linear layer.
        out_features: number of channels used by every layer after the linear one.
    """

    def __init__(self, algebra, in_features, out_features):
        super().__init__()

        # The stages are applied in exactly this order.
        linear = MVLinear(algebra, in_features, out_features)
        gate = MVSiLU(algebra, out_features)
        product = SteerableGeometricProductLayer(algebra, out_features)
        norm = MVLayerNorm(algebra, out_features)
        self.layers = nn.Sequential(linear, gate, product, norm)

    def forward(self, input):
        """Map ``[batch_size, in_features, 2**d]`` to ``[batch_size, out_features, 2**d]``."""
        output = self.layers(input)
        return output

## Building standard models

## Stress tests

## Results