In [1]:
import os


# Changer le répertoire actuel
os.chdir('/kaggle/input/equivarient/e2cnn-master')

# Vérifier le répertoire actuel
print("Répertoire actuel :", os.getcwd())


Répertoire actuel : /kaggle/input/equivarient/e2cnn-master


# General E(2)-Equivariant Steerable CNNs  -  A concrete example


In [2]:
import torch

from e2cnn import gspaces
from e2cnn import nn

Finally, we build a **Steerable CNN** and try it MNIST.

Let's also use a group a bit larger: we now build a model equivariant to $8$ rotations.
We indicate the group of $N$ discrete rotations as $C_N$, i.e. the **cyclic group** of order $N$.
In this case, we will use $C_8$.

Because the inputs are still gray-scale images, the input type of the model is again a *scalar field*.

However, internally we use *regular fields*: this is equivalent to a *group-equivariant convolutional neural network*.

Finally, we build *invariant* features for the final classification task by pooling over the group using *Group Pooling*.

The final classification is performed by a two fully connected layers.

# The model

Here is the definition of our model:

In [3]:
class C8SteerableCNN(torch.nn.Module):

    def __init__(self, n_classes=10):

        super(C8SteerableCNN, self).__init__()

        self.mask = torch.ones(1, 1, 28, 28)  # Adjust the shape to match your input dimensions
        self.mask = self.mask.to(device)

        # the model is equivariant under rotations by 45 degrees, modelled by C8
        self.r2_act = gspaces.Rot2dOnR2(N=8)

        # the input image is a scalar field, corresponding to the trivial representation
        in_type = nn.FieldType(self.r2_act, [self.r2_act.trivial_repr])

        # we store the input type for wrapping the images into a geometric tensor during the forward pass
        self.input_type = in_type

        # convolution 1
        # first specify the output type of the convolutional layer
        # we choose 24 feature fields, each transforming under the regular representation of C8
        out_type = nn.FieldType(self.r2_act, 24*[self.r2_act.regular_repr])
        self.block1 = nn.SequentialModule(
            nn.MaskModule(in_type, 29, margin=1),
            nn.R2Conv(in_type, out_type, kernel_size=7, padding=1, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )

        # convolution 2
        # the old output type is the input type to the next layer
        in_type = self.block1.out_type
        # the output type of the second convolution layer are 48 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 48*[self.r2_act.regular_repr])
        self.block2 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=2, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )
        self.pool1 = nn.SequentialModule(
            nn.PointwiseAvgPoolAntialiased(out_type, sigma=0.66, stride=2)
        )

        # convolution 3
        # the old output type is the input type to the next layer
        in_type = self.block2.out_type
        # the output type of the third convolution layer are 48 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 48*[self.r2_act.regular_repr])
        self.block3 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=2, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )

        # convolution 4
        # the old output type is the input type to the next layer
        in_type = self.block3.out_type
        # the output type of the fourth convolution layer are 96 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 96*[self.r2_act.regular_repr])
        self.block4 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=2, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )
        self.pool2 = nn.SequentialModule(
            nn.PointwiseAvgPoolAntialiased(out_type, sigma=0.66, stride=2)
        )

        # convolution 5
        # the old output type is the input type to the next layer
        in_type = self.block4.out_type
        # the output type of the fifth convolution layer are 96 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 96*[self.r2_act.regular_repr])
        self.block5 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=2, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )

        # convolution 6
        # the old output type is the input type to the next layer
        in_type = self.block5.out_type
        # the output type of the sixth convolution layer are 64 regular feature fields of C8
        out_type = nn.FieldType(self.r2_act, 64*[self.r2_act.regular_repr])
        self.block6 = nn.SequentialModule(
            nn.R2Conv(in_type, out_type, kernel_size=5, padding=1, bias=False),
            nn.InnerBatchNorm(out_type),
            nn.ReLU(out_type, inplace=True)
        )
        self.pool3 = nn.PointwiseAvgPoolAntialiased(out_type, sigma=0.66, stride=1, padding=0)

        self.gpool = nn.GroupPooling(out_type)

        # number of output channels
        c = self.gpool.out_type.size

        # Fully Connected
        self.fully_net = torch.nn.Sequential(
            torch.nn.Linear(c, 64),
            torch.nn.BatchNorm1d(64),
            torch.nn.ELU(inplace=True),
            torch.nn.Linear(64, n_classes),
        )

    def forward(self, input: torch.Tensor):
        # wrap the input tensor in a GeometricTensor
        # (associate it with the input type)
        x = nn.GeometricTensor(input, self.input_type)

        # apply each equivariant block

        # Each layer has an input and an output type
        # A layer takes a GeometricTensor in input.
        # This tensor needs to be associated with the same representation of the layer's input type
        #
        # The Layer outputs a new GeometricTensor, associated with the layer's output type.
        # As a result, consecutive layers need to have matching input/output types
        x = self.block1(x)
        x = self.block2(x)
        x = self.pool1(x)

        x = self.block3(x)
        x = self.block4(x)
        x = self.pool2(x)

        x = self.block5(x)
        x = self.block6(x)

        # pool over the spatial dimensions
        x = self.pool3(x)

        # pool over the group
        x = self.gpool(x)

        # unwrap the output GeometricTensor
        # (take the Pytorch tensor and discard the associated representation)
        x = x.tensor

        # classify with the final fully connected layers)
        x = self.fully_net(x.reshape(x.shape[0], -1))

        return x

Let's try the model on *rotated* MNIST

In [4]:
import torch.utils.data
import torchvision
import torchvision.transforms.functional as F
from torchvision.transforms import Compose, Pad, ToTensor, Normalize, ToPILImage
from PIL import Image

import numpy as np


class RotatedMNISTDataset(torch.utils.data.Dataset):
    '''
        This class provides MNIST images with random rotations sampled from
        a list of rotation angles. This list is dependent of the number of tasks
        `num_tasks` and the distance (measured in degrees) between tasks
        `per_task_rotation`.
    '''
    def __init__(self, root, train=True, transform=None, download=True, num_tasks=5, per_task_rotation=45):
        self.dataset = torchvision.datasets.MNIST(root=root, train=train, transform=transform, download=download)
        self.transform = transform
        self.rotation_angles = []
        for task in range(num_tasks):
            self.rotation_angles.append(float((task) * per_task_rotation))

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        image, label = self.dataset[idx]
        angle = np.random.choice(self.rotation_angles)  # Randomly choose a rotation angle
        rotated_image = F.rotate(image, angle, fill=(0,))


        return rotated_image, label, angle


In [5]:
def flattened_rotMNIST(num_tasks,
                       per_task_rotation,
                       batch_size,
                       transform=[],
                       ):
    '''
    Returns:
    - train_loader
    - test_loader
    '''
    g = torch.Generator()
    g.manual_seed(0)  # Ensure consistent ordering across runs

    # Extend the provided transform with default Pad, ToTensor, and Normalize
    extended_transform = transform.copy()
    extended_transform.extend([
        Pad((0, 0, 1, 1)),  # Add padding
        ToTensor(),
        Normalize((0.1307,), (0.3081,))  # Normalize
    ])
    transforms = Compose(extended_transform)

    # Create train and test datasets
    train = RotatedMNISTDataset(
        root='~/data/', train=True, download=True,
        transform=transforms, num_tasks=num_tasks, per_task_rotation=per_task_rotation
    )
    test = RotatedMNISTDataset(
        root='~/data/', train=False, download=True,
        transform=transforms, num_tasks=num_tasks, per_task_rotation=per_task_rotation
    )

    # Create DataLoaders
    train_loader = torch.utils.data.DataLoader(
        train, batch_size=batch_size, shuffle=False,
        num_workers=0, pin_memory=True, generator=g
    )
    test_loader = torch.utils.data.DataLoader(
        test, batch_size=batch_size, shuffle=True,
        num_workers=0, pin_memory=True, generator=g
    )

    return train_loader, test_loader

In [6]:
# Step 1: Import necessary modules
import torch

# Step 2: Set parameters for the new dataset
num_tasks = 8  # Number of rotation tasks
per_task_rotation = 45  # Degrees of rotation per task
batch_size = 64  # Batch size for training/testing

# Step 3: Load the rotated MNIST dataset
train_loader, test_loader = flattened_rotMNIST(
    num_tasks=num_tasks,
    per_task_rotation=per_task_rotation,
    batch_size=batch_size
)

for images, labels, angles in train_loader:
    print(f"Input image shape after transform: {images.shape}")  # Should be [batch_size, 1, 29, 29]
    break


Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to /root/data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:00<00:00, 17721602.02it/s]


Extracting /root/data/MNIST/raw/train-images-idx3-ubyte.gz to /root/data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to /root/data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 469339.92it/s]


Extracting /root/data/MNIST/raw/train-labels-idx1-ubyte.gz to /root/data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to /root/data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 4415336.32it/s]


Extracting /root/data/MNIST/raw/t10k-images-idx3-ubyte.gz to /root/data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to /root/data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 3148847.73it/s]


Extracting /root/data/MNIST/raw/t10k-labels-idx1-ubyte.gz to /root/data/MNIST/raw

Input image shape after transform: torch.Size([64, 1, 29, 29])


In [7]:
from torch.utils.data import Dataset
from torchvision.transforms import RandomRotation
from torchvision.transforms import Pad
from torchvision.transforms import Resize
from torchvision.transforms import ToTensor
from torchvision.transforms import Compose

import numpy as np

from PIL import Image

device = 'cuda' if torch.cuda.is_available() else 'cpu'


Build the dataset

In [8]:
class MnistRotDataset(Dataset):

    def __init__(self, mode, transform=None):
        assert mode in ['train', 'test']

        if mode == "train":
            file = "mnist_rotation_new/mnist_all_rotation_normalized_float_train_valid.amat"
        else:
            file = "mnist_rotation_new/mnist_all_rotation_normalized_float_test.amat"

        self.transform = transform

        data = np.loadtxt(file, delimiter=' ')

        self.images = data[:, :-1].reshape(-1, 28, 28).astype(np.float32)
        self.labels = data[:, -1].astype(np.int64)
        self.num_samples = len(self.labels)

    def __getitem__(self, index):
        image, label = self.images[index], self.labels[index]
        image = Image.fromarray(image)
        if self.transform is not None:
            image = self.transform(image)
        return image, label

    def __len__(self):
        return len(self.labels)

# images are padded to have shape 29x29.
# this allows to use odd-size filters with stride 2 when downsampling a feature map in the model
pad = Pad((0, 0, 1, 1), fill=0)

# to reduce interpolation artifacts (e.g. when testing the model on rotated images),
# we upsample an image by a factor of 3, rotate it and finally downsample it again
resize1 = Resize(87)
resize2 = Resize(29)

totensor = ToTensor()

Let's build the model

The model is now randomly initialized.
Therefore, we do not expect it to produce the right class probabilities.

However, the model should still produce the same output for rotated versions of the same image.
This is true for rotations by multiples of $\frac{\pi}{2}$, but is only approximate for rotations by $\frac{\pi}{4}$.

Let's test it on a random test image:
we feed eight rotated versions of the first image in the test set and print the output logits of the model for each of them.

In [9]:

def test_model(model: torch.nn.Module, x: Image):
    # evaluate the `model` on 8 rotated versions of the input image `x`
    model.eval()

    wrmup = model(torch.randn(1, 1, 29, 29).to(device))
    del wrmup

    x = resize1(pad(x))

    print()
    print('##########################################################################################')
    header = 'angle |  ' + '  '.join(["{:6d}".format(d) for d in range(10)])
    print(header)
    with torch.no_grad():
        for r in range(8):
            x_transformed = totensor(resize2(x.rotate(r*45., Image.BILINEAR))).reshape(1, 1, 29, 29)
            x_transformed = x_transformed.to(device)

            y = model(x_transformed)
            y = y.to('cpu').numpy().squeeze()

            angle = r * 45
            print("{:5d} : {}".format(angle, y))
    print('##########################################################################################')
    print()



The output of the model is already almost invariant.
However, we still observe small fluctuations in the outputs.

This is because the model contains some operations which might break equivariance.
For instance, every convolution includes a padding of $2$ pixels per side. This is adds information about the actual orientation of the grid where the image/feature map is sampled because the padding is not rotated with the image.

During training, the model will observe rotated patterns and will learn to ignore the noise coming from the padding.

So, let's train the model now.
The model is exactly the same used to train a normal *PyTorch* architecture:

In [10]:
loss_function = torch.nn.CrossEntropyLoss()

In [11]:
import csv
import torch
from torch.utils.data import DataLoader, Subset
import numpy as np

# Create or open a CSV file
csv_file = "/kaggle/working/accuracy_by_percentage.csv"
with open(csv_file, mode="w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(["Data Percentage", "Num Images", "Epoch", "Accuracy"])  # CSV header

# Define percentages to sample
percentages = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]

# Function to create a subset DataLoader for a specific percentage
def get_subset_loader(full_loader, percentage):
    # Get all indices and labels from the existing DataLoader
    indices = []
    labels = []
    for i, (x, t, _) in enumerate(full_loader.dataset):
        indices.append(i)
        labels.append(t)

    indices = np.array(indices)
    labels = np.array(labels)

    # Split indices by label
    subset_indices = []
    for label in np.unique(labels):
        label_indices = indices[labels == label]
        num_samples = int(len(label_indices) * (percentage / 100))
        subset_indices.extend(label_indices[:num_samples])

    # Create a subset DataLoader
    subset_dataset = Subset(full_loader.dataset, subset_indices)
    subset_loader = DataLoader(subset_dataset, batch_size=full_loader.batch_size, shuffle=True)
    return subset_loader, len(subset_indices)

# Train the model for different data percentages
for percentage in percentages:

    model = C8SteerableCNN().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5, weight_decay=1e-5)
    
    # Get the subset DataLoader
    subset_loader, num_images = get_subset_loader(train_loader, percentage)
    print(f"Training with {percentage}% of the data ({num_images} images)")

    # Training loop
    for epoch in range(11):  # Number of epochs
        model.train()
        for i, (x, t, _) in enumerate(subset_loader):
            optimizer.zero_grad()

            x = x.to(device)
            t = t.to(device)

            y = model(x)
            loss = loss_function(y, t)

            loss.backward()
            optimizer.step()

        # Testing loop
        total = 0
        correct = 0
        with torch.no_grad():
            model.eval()
            for i, (x, t, _) in enumerate(test_loader):
                x = x.to(device)
                t = t.to(device)

                y = model(x)

                _, prediction = torch.max(y.data, 1)
                total += t.shape[0]
                correct += (prediction == t).sum().item()

        # Calculate accuracy
        test_accuracy = correct / total * 100.0
        print(f"Percentage {percentage}% | Epoch {epoch} | Test Accuracy: {test_accuracy:.2f}%")

        # Save results to the CSV file
        with open(csv_file, mode="a", newline="") as file:
            writer = csv.writer(file)
            writer.writerow([percentage, num_images, epoch, test_accuracy])


  full_mask[mask] = norms.to(torch.uint8)


Training with 0.2% of the data (113 images)
Percentage 0.2% | Epoch 0 | Test Accuracy: 8.99%
Percentage 0.2% | Epoch 1 | Test Accuracy: 10.36%
Percentage 0.2% | Epoch 2 | Test Accuracy: 12.32%
Percentage 0.2% | Epoch 3 | Test Accuracy: 23.39%
Percentage 0.2% | Epoch 4 | Test Accuracy: 33.94%
Percentage 0.2% | Epoch 5 | Test Accuracy: 38.50%
Percentage 0.2% | Epoch 6 | Test Accuracy: 43.40%
Percentage 0.2% | Epoch 7 | Test Accuracy: 48.00%
Percentage 0.2% | Epoch 8 | Test Accuracy: 50.98%
Percentage 0.2% | Epoch 9 | Test Accuracy: 53.62%
Percentage 0.2% | Epoch 10 | Test Accuracy: 56.08%


In [12]:
###### import matplotlib.pyplot as plt
import numpy as np

# Function to display a batch of images
def show_images_from_loader(loader, classes, num_images=8):
    """
    Displays a batch of images from the given DataLoader.

    Args:
    - loader: The DataLoader to pull images from (e.g., test_loader).
    - classes: A list of class names corresponding to the dataset labels.
    - num_images: The number of images to display (default is 8).
    """
    # Get a single batch from the loader
    data_iter = iter(loader)
    images, labels, _ = next(data_iter)

    # Limit to the specified number of images
    images = images[:num_images]
    labels = labels[:num_images]

    # Convert from torch tensors to numpy for visualization
    images = images.numpy()

    # Create a figure
    fig, axes = plt.subplots(1, num_images, figsize=(15, 5))

    for i in range(num_images):
        ax = axes[i]
        img = np.transpose(images[i], (1, 2, 0))  # Convert from (C, H, W) to (H, W, C)
        ax.imshow(img)
        ax.axis("off")
        ax.set_title(classes[labels[i]])

    plt.tight_layout()
    plt.show()

# Example usage
# Assuming you have a DataLoader named `test_loader` and a `classes` list
# For example: classes = ['cat', 'dog', 'bird', ...] based on your dataset
classes = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']  # Replace with your actual class names
show_images_from_loader(test_loader, classes)


NameError: name 'plt' is not defined