<a href="https://colab.research.google.com/github/vlamen/tue-deeplearning/blob/main/assignments/assignment_1/Assignment_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Group Number: group visiha
# Student 1: Sidney Damen
# Student 2: Haoqi Guo
# Student 3: Victor Wen

If you are using Google Colab, uncomment the following cell and set the `notebook_dir` variable to the directory that contains this notebook. The cell will then automatically download the .py files needed for this assignment.

In [None]:
# Colab only: set this to the Drive folder that holds this notebook.
# To find the path, open the file browser in the left sidebar of Colab,
# locate the notebook's folder, click the three dots and choose "copy path".
notebook_dir = '/content/drive/MyDrive/Colab Notebooks/'

In [None]:
# Colab-only setup: mount Google Drive, make `notebook_dir` the working
# directory (and importable), then download the helper modules used below.
# Outside Colab the cell is a no-op, so the notebook still runs top to bottom.
import os
import sys

import requests

try:
    from google.colab import drive
except ImportError:
    # Not running on Colab: nothing to mount or download here.
    drive = None

if drive is not None:
    drive.mount('/content/drive')
    sys.path.insert(0, notebook_dir)
    os.chdir(notebook_dir)

    symco = "https://github.com/vlamen/tue-deeplearning/blob/main/assignments/assignment_1/symconv.py?raw=true"
    crpt = "https://github.com/vlamen/tue-deeplearning/blob/main/assignments/assignment_1/carpet.py?raw=true"
    for url, filename in ((symco, 'symconv.py'), (crpt, 'carpet.py')):
        response = requests.get(url, allow_redirects=True)
        # Fail loudly instead of writing an HTTP error page into a .py file.
        response.raise_for_status()
        with open(filename, 'wb') as f:
            f.write(response.content)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, TensorDataset

import io
import requests

import symconv as sc
from carpet import show_carpet, oh_to_label

import numpy as np
import matplotlib.pyplot as plt

from scipy.spatial.distance import cdist

from tqdm import tqdm

In [None]:
def load_numpy_arr_from_url(url):
    """
    Download a NumPy file over HTTP and load it straight from memory.

    Input:
    url: Download link of dataset

    Outputs:
    dataset: whatever np.load returns for the downloaded bytes

    An unsuccessful HTTP status raises through response.raise_for_status().
    """
    reply = requests.get(url)
    reply.raise_for_status()

    # Wrap the raw bytes in a file-like object so np.load can read them.
    payload = io.BytesIO(reply.content)
    return np.load(payload)

# Task 1: Pattern Classification

In [None]:
# loading training and testing data for task 1
# DO NOT MODIFY
task1 = load_numpy_arr_from_url("https://github.com/vlamen/tue-deeplearning/blob/main/assignments/assignment_1/task1data.npz?raw=true")
# task1 = np.load("task1data.npz")

# 'arr_0' holds the carpets and 'arr_1' their labels; both become float tensors.
X = torch.tensor(task1['arr_0']).float()
y = torch.tensor(task1['arr_1']).float()

# Split by position: indices [0, 7500) train, [7500, 9500) validation, [9500, end) test.
X_train = X[:7500]
X_val = X[7500:9500]
X_test = X[9500:]
y_train = y[:7500]
y_val = y[7500:9500]
y_test  = y[9500:]

# Pair inputs with labels so the DataLoaders can batch them together.
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)


# Report the shape of every split as a quick sanity check.
print(f"Carpet train shape: {X_train.shape}")
print(f"Label train shape: {y_train.shape}")
print(f"Carpet validation shape: {X_val.shape}")
print(f"Label validation shape: {y_val.shape}")
print(f"Carpet test shape: {X_test.shape}")
print(f"Label test shape: {y_test.shape}")

In [None]:
# Show one randomly chosen training carpet together with its decoded label.
# The upper bound follows the actual training-set size instead of a literal 7500,
# so the cell stays valid if the split changes.
idx = np.random.randint(0, X_train.shape[0])
show_carpet(X_train, idx)
print('Carpet from', oh_to_label(y_train[idx,None])[0])

In [None]:
# Shared Task 1 constants.
batch_size = 32
image_size = 96 * 60  # presumably the number of pixels per carpet -- TODO confirm
num_classes = y_train.size(1)  # width of the label tensor

In [None]:
# Create dataloaders; all splits use the shared `batch_size` constant
# rather than a repeated literal.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Train model
def train(model, train_loader, val_loader, optimizer, criterion, n_epochs=10, device='cpu'):
    """Optimise ``model`` for ``n_epochs`` epochs and validate after each one.

    Labels are compared through ``argmax(dim=1)``, i.e. they are treated as
    one-hot / per-class score vectors.

    Args:
        model: network to train (must already live on ``device``).
        train_loader: loader yielding ``(inputs, labels)`` training batches.
        val_loader: loader yielding ``(inputs, labels)`` validation batches.
        optimizer: optimizer stepping the model's parameters.
        criterion: loss called as ``criterion(outputs, labels)``.
        n_epochs: number of passes over ``train_loader``.
        device: device every batch is moved to.

    Returns:
        Four lists with one entry per epoch: training losses, training
        accuracies, validation losses, validation accuracies. Losses are the
        mean over batches; accuracies are fractions of the dataset size.
    """
    train_losses, train_accuracies = [], []
    val_losses, val_accuracies = [], []

    def count_hits(outputs, labels):
        # Number of samples whose highest-scoring class equals the label's argmax.
        return (outputs.argmax(dim=1) == labels.argmax(dim=1)).sum().item()

    for epoch in tqdm(range(n_epochs)):
        # ---- optimisation pass ----
        model.train()
        running_loss, hits = 0, 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            batch_loss = criterion(outputs, labels)
            batch_loss.backward()
            optimizer.step()
            running_loss += batch_loss.item()
            hits += count_hits(outputs, labels)
        train_acc = hits / len(train_loader.dataset)
        train_loss = running_loss / len(train_loader)
        train_accuracies.append(train_acc)
        train_losses.append(train_loss)

        # ---- validation pass (no gradients, eval-mode layers) ----
        model.eval()
        running_loss, hits = 0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                running_loss += criterion(outputs, labels).item()
                hits += count_hits(outputs, labels)
        val_acc = hits / len(val_loader.dataset)
        val_loss = running_loss / len(val_loader)
        val_accuracies.append(val_acc)
        val_losses.append(val_loss)

        print(f'Epoch {epoch+1}/{n_epochs}: Train loss: {train_loss:.4f}, Train acc: {train_acc*100:.2f}, Val loss: {val_loss:.4f}, Val acc: {val_acc*100:.2f}')
    return train_losses, train_accuracies, val_losses, val_accuracies

# Test model
def test(model, test_loader, device='cpu'):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for X, y in test_loader:
            X, y = X.to(device), y.to(device)
            y_hat = model(X)
            test_loss += F.cross_entropy(y_hat, y, reduction='sum').item()
            pred = y_hat.argmax(dim=1, keepdim=True)
            correct += pred.eq(y.argmax(dim=1, keepdim=True)).sum().item()
    test_loss /= len(test_loader.dataset)
    print(f'Test loss: {test_loss:.4f}, Test accuracy: {correct}/{len(test_loader.dataset)} ({correct/len(test_loader.dataset)*100:.2f}%)')

In [None]:
class Lambda(nn.Module):
    """Adapter that lets an arbitrary callable be used as an ``nn.Module`` layer."""

    def __init__(self, func):
        """Store ``func``; it is applied to the input on every forward pass."""
        super().__init__()
        self.func = func

    def forward(self, x):
        """Return ``func(x)``."""
        wrapped = self.func
        return wrapped(x)

# Baseline Task 1 classifier: conv stem, three symmetry-conv stages, dense head.
# NOTE(review): the final Softmax output is passed to nn.CrossEntropyLoss in the
# training cell; that loss is documented to expect unnormalized logits, so please
# confirm the extra Softmax is intended.
model = nn.Sequential(
    # Block 1: three 3x3 convolutions (padding 1), each with BatchNorm + ReLU.
    nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),

    # Block 2: symmetry conv (kernel 4, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(2, 2),

    # Block 3: symmetry conv (kernel 3, rotation=1), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=1, reflection=False),
    sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(2, 2),

    # Block 4: symmetry conv (kernel 8, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(2, 2),

    # Block 5: flatten per sample, one hidden dense layer, class scores.
    Lambda(lambda t: t.view(t.shape[0], -1)),
    nn.Linear(2688, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Dropout(p=0.5),
    nn.Linear(1024, num_classes),
    nn.Softmax(dim=1),
)

In [None]:
import pandas as pd

def plot_learning_curves(train_loss, train_accuracies, val_losses, val_accuracies):
    """Plot per-epoch loss/accuracy curves and print the best validation accuracy.

    Args:
        train_loss: per-epoch training losses.
        train_accuracies: per-epoch training accuracies (fractions).
        val_losses: per-epoch validation losses.
        val_accuracies: per-epoch validation accuracies (fractions).

    All four sequences must have the same length (one entry per epoch).
    """
    # Use the `train_loss` argument: the previous body read a global named
    # `train_losses`, silently ignoring whatever the caller passed in.
    learning_curves = pd.DataFrame({
        'Train loss': train_loss,
        'Train accuracy': train_accuracies,
        'Validation loss': val_losses,
        'Validation accuracy': val_accuracies,
    })

    print("Max val score: {:.2f}%".format(learning_curves['Validation accuracy'].max()*100))
    # Dotted lines = training, solid = validation; blue = loss, red = accuracy.
    learning_curves.plot(lw=2,style=['b:','r:','b-','r-'])
    plt.xlabel('epochs')
    plt.show()

In [None]:
# Baseline run: Adam (lr=0.001) with cross-entropy loss for 25 epochs,
# followed by the test-set evaluation and the learning curves.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

model.to(device)
train_losses, train_accuracies, val_losses, val_accuracies = train(
    model,
    train_loader,
    val_loader,
    optimizer,
    criterion,
    n_epochs=25,
    device=device,
)
test(model, test_loader, device=device)
plot_learning_curves(train_losses, train_accuracies, val_losses, val_accuracies)

## Task 1: Question 5d

### Different optimizers

In [None]:
# Fresh copy of the baseline architecture, so the SGD experiment does not
# touch the weights used by the other experiments.
# NOTE(review): the final Softmax output is passed to nn.CrossEntropyLoss during
# training; that loss is documented to expect unnormalized logits -- confirm intent.
modelSGD = nn.Sequential(
    # Block 1: three 3x3 convolutions (padding 1), each with BatchNorm + ReLU.
    nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),

    # Block 2: symmetry conv (kernel 4, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(2, 2),

    # Block 3: symmetry conv (kernel 3, rotation=1), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=1, reflection=False),
    sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(2, 2),

    # Block 4: symmetry conv (kernel 8, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(2, 2),

    # Block 5: flatten per sample, one hidden dense layer, class scores.
    Lambda(lambda t: t.view(t.shape[0], -1)),
    nn.Linear(2688, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Dropout(p=0.5),
    nn.Linear(1024, num_classes),
    nn.Softmax(dim=1),
)

modelSGD.to(device)

In [None]:
# Optimizer ablation: plain SGD (lr=0.001), otherwise the baseline settings.
modelSGD.to(device)
optimizerSGD = torch.optim.SGD(modelSGD.parameters(), lr=0.001)
train_losses, train_accuracies, val_losses, val_accuracies = train(
    modelSGD,
    train_loader,
    val_loader,
    optimizerSGD,
    criterion,
    n_epochs=25,
    device=device,
)
test(modelSGD, test_loader, device=device)
plot_learning_curves(train_losses, train_accuracies, val_losses, val_accuracies)

In [None]:
# Fresh copy of the baseline architecture, so the Adagrad experiment does not
# touch the weights used by the other experiments.
# NOTE(review): the final Softmax output is passed to nn.CrossEntropyLoss during
# training; that loss is documented to expect unnormalized logits -- confirm intent.
modelAda = nn.Sequential(
    # Block 1: three 3x3 convolutions (padding 1), each with BatchNorm + ReLU.
    nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),

    # Block 2: symmetry conv (kernel 4, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(2, 2),

    # Block 3: symmetry conv (kernel 3, rotation=1), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=1, reflection=False),
    sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(2, 2),

    # Block 4: symmetry conv (kernel 8, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(2, 2),

    # Block 5: flatten per sample, one hidden dense layer, class scores.
    Lambda(lambda t: t.view(t.shape[0], -1)),
    nn.Linear(2688, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Dropout(p=0.5),
    nn.Linear(1024, num_classes),
    nn.Softmax(dim=1),
)

In [None]:
# Optimizer ablation: Adagrad (lr=0.001), otherwise the baseline settings.
optimizerAdagrad = torch.optim.Adagrad(modelAda.parameters(), lr=0.001)
modelAda.to(device)
train_losses, train_accuracies, val_losses, val_accuracies = train(
    modelAda,
    train_loader,
    val_loader,
    optimizerAdagrad,
    criterion,
    n_epochs=25,
    device=device,
)
test(modelAda, test_loader, device=device)
plot_learning_curves(train_losses, train_accuracies, val_losses, val_accuracies)

In [None]:
# Fresh copy of the baseline architecture, so the RMSprop experiment does not
# touch the weights used by the other experiments.
# NOTE(review): the final Softmax output is passed to nn.CrossEntropyLoss during
# training; that loss is documented to expect unnormalized logits -- confirm intent.
modelRMS = nn.Sequential(
    # Block 1: three 3x3 convolutions (padding 1), each with BatchNorm + ReLU.
    nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),

    # Block 2: symmetry conv (kernel 4, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(2, 2),

    # Block 3: symmetry conv (kernel 3, rotation=1), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=1, reflection=False),
    sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(2, 2),

    # Block 4: symmetry conv (kernel 8, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(2, 2),

    # Block 5: flatten per sample, one hidden dense layer, class scores.
    Lambda(lambda t: t.view(t.shape[0], -1)),
    nn.Linear(2688, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Dropout(p=0.5),
    nn.Linear(1024, num_classes),
    nn.Softmax(dim=1),
)

In [None]:
# Optimizer ablation: RMSprop (lr=0.001), otherwise the baseline settings.
optimizerRMS = torch.optim.RMSprop(modelRMS.parameters(), lr=0.001)
modelRMS.to(device)
train_losses, train_accuracies, val_losses, val_accuracies = train(
    modelRMS,
    train_loader,
    val_loader,
    optimizerRMS,
    criterion,
    n_epochs=25,
    device=device,
)
test(modelRMS, test_loader, device=device)
plot_learning_curves(train_losses, train_accuracies, val_losses, val_accuracies)

### Changing train size

In [None]:
# Rebuild the full Task 1 tensors from the loaded archive before re-splitting.
X, y = (torch.tensor(task1[key]).float() for key in ('arr_0', 'arr_1'))

In [None]:
# Fresh copy of the baseline architecture for the smaller-training-set run,
# so this experiment does not touch the weights used by the others.
# NOTE(review): the final Softmax output is passed to nn.CrossEntropyLoss during
# training; that loss is documented to expect unnormalized logits -- confirm intent.
modelLess = nn.Sequential(
    # Block 1: three 3x3 convolutions (padding 1), each with BatchNorm + ReLU.
    nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),

    # Block 2: symmetry conv (kernel 4, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(2, 2),

    # Block 3: symmetry conv (kernel 3, rotation=1), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=1, reflection=False),
    sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(2, 2),

    # Block 4: symmetry conv (kernel 8, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(2, 2),

    # Block 5: flatten per sample, one hidden dense layer, class scores.
    Lambda(lambda t: t.view(t.shape[0], -1)),
    nn.Linear(2688, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Dropout(p=0.5),
    nn.Linear(1024, num_classes),
    nn.Softmax(dim=1),
)

In [None]:
# Less training: 5000 training samples; the validation split grows to
# [5000, 9500) and the test split stays [9500, end).
# NOTE: this rebinds the global splits and loaders used by later cells.
X_train = X[:5000]
X_val = X[5000:9500]
X_test = X[9500:]
y_train = y[:5000]
y_val = y[5000:9500]
y_test  = y[9500:]

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

# Use the shared `batch_size` constant rather than a repeated literal.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

modelLess.to(device)
optimizer = torch.optim.Adam(modelLess.parameters(), lr=0.001)
train_losses, train_accuracies, val_losses, val_accuracies = train(modelLess, train_loader, val_loader, optimizer, criterion, n_epochs=25, device=device)
test(modelLess, test_loader, device=device)
plot_learning_curves(train_losses, train_accuracies, val_losses, val_accuracies)

In [None]:
# Fresh copy of the baseline architecture for the larger-training-set run,
# so this experiment does not touch the weights used by the others.
# NOTE(review): the final Softmax output is passed to nn.CrossEntropyLoss during
# training; that loss is documented to expect unnormalized logits -- confirm intent.
modelMore = nn.Sequential(
    # Block 1: three 3x3 convolutions (padding 1), each with BatchNorm + ReLU.
    nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
    nn.Conv2d(16, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),

    # Block 2: symmetry conv (kernel 4, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(2, 2),

    # Block 3: symmetry conv (kernel 3, rotation=1), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=1, reflection=False),
    sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(2, 2),

    # Block 4: symmetry conv (kernel 8, rotation=4), BatchNorm, 2x2 max-pool.
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(2, 2),

    # Block 5: flatten per sample, one hidden dense layer, class scores.
    Lambda(lambda t: t.view(t.shape[0], -1)),
    nn.Linear(2688, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Dropout(p=0.5),
    nn.Linear(1024, num_classes),
    nn.Softmax(dim=1),
)

In [None]:
# More training: 8500 training samples; the validation split shrinks to
# [8500, 9500) and the test split stays [9500, end).
# NOTE: this rebinds the global splits and loaders used by later cells.
X_train = X[:8500]
X_val = X[8500:9500]
X_test = X[9500:]
y_train = y[:8500]
y_val = y[8500:9500]
y_test  = y[9500:]

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

# Use the shared `batch_size` constant rather than a repeated literal.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

modelMore.to(device)
optimizer = torch.optim.Adam(modelMore.parameters(), lr=0.001)
train_losses, train_accuracies, val_losses, val_accuracies = train(modelMore, train_loader, val_loader, optimizer, criterion, n_epochs=25, device=device)
test(modelMore, test_loader, device=device)
plot_learning_curves(train_losses, train_accuracies, val_losses, val_accuracies)

### Experimenting with more or fewer layers

In [None]:
# Fewer layers: block 1 is reduced to a single 1->32 convolution and the dense
# head is reduced to a single Linear(2688, num_classes) layer.
# NOTE(review): the final Softmax output is passed to nn.CrossEntropyLoss during
# training; that loss is documented to expect unnormalized logits -- confirm intent.
modelLessLayers = nn.Sequential(

    #block 1 (the two extra conv/BatchNorm/ReLU groups of the baseline are removed)
    nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1),
    nn.BatchNorm2d(32),
    nn.ReLU(),

    #block 2
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(kernel_size=2, stride=2),

    #block 3
    sc.Slice(rotation=1, reflection=False),
    sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(kernel_size=2, stride=2),

    #block 4
    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(kernel_size=2, stride=2),

    #block 5 (hidden Linear/BatchNorm1d/ReLU/Dropout of the baseline are removed)
    Lambda(lambda x: x.view(x.size(0),-1)),
    nn.Linear(2688, num_classes),           # Changed
    nn.Softmax(dim=1)
)

modelLessLayers.to(device)
# Build an optimizer over THIS model's parameters. The existing global
# `optimizer` was created from modelMore.parameters(), so passing it here
# would leave modelLessLayers' weights untouched during training.
optimizerLessLayers = torch.optim.Adam(modelLessLayers.parameters(), lr=0.001)
# NOTE: train_loader / val_loader / test_loader are whatever the most recently
# executed split cell produced (the "More training" split when run top to bottom).
train_losses, train_accuracies, val_losses, val_accuracies = train(modelLessLayers, train_loader, val_loader, optimizerLessLayers, criterion, n_epochs=25, device=device)
test(modelLessLayers, test_loader, device=device)
plot_learning_curves(train_losses, train_accuracies, val_losses, val_accuracies)

In [None]:
# More layers: one extra 32->32 convolution in block 1 and a deeper dense head
# (2688 -> 2048 -> 1024 -> 256 -> 64 -> num_classes).
# NOTE(review): the final Softmax output is passed to nn.CrossEntropyLoss during
# training; that loss is documented to expect unnormalized logits -- confirm intent.
modelMoreLayers = nn.Sequential(

    #block 1
    nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
    nn.BatchNorm2d(16),
    nn.ReLU(),
    nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, padding=1),
    nn.BatchNorm2d(16),
    nn.ReLU(),
    nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1),   # Added
    nn.BatchNorm2d(32),                                                     # Added
    nn.ReLU(),                                                              # Added

    #block 2

    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(32),
    nn.MaxPool2d(kernel_size=2, stride=2),

    #block 3

    sc.Slice(rotation=1, reflection=False),
    sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(64),
    nn.MaxPool2d(kernel_size=2, stride=2),

    #block 4

    sc.Slice(rotation=4, reflection=False),
    sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
    sc.SymmetryPool(),
    nn.BatchNorm2d(128),
    nn.MaxPool2d(kernel_size=2, stride=2),

    #block 5

    Lambda(lambda x: x.view(x.size(0),-1)),
    nn.Linear(2688 , 2048),                                               # Changed
    nn.BatchNorm1d(2048),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(2048 , 1024),                                               # Added
    nn.BatchNorm1d(1024),                                                 # Added
    nn.ReLU(),                                                            # Added
    nn.Dropout(0.5),                                                      # Added
    nn.Linear(1024 , 256),                                                # Added
    nn.BatchNorm1d(256),                                                  # Added
    nn.ReLU(),                                                            # Added
    nn.Dropout(0.5),                                                      # Added
    nn.Linear(256 , 64),                                                  # Added
    nn.BatchNorm1d(64),                                                   # Added
    nn.ReLU(),                                                            # Added
    nn.Dropout(0.5),                                                      # Added
    nn.Linear(64, num_classes),                                           # Added
    nn.Softmax(dim=1)
)

modelMoreLayers.to(device)
# Build an optimizer over THIS model's parameters. The existing global
# `optimizer` was created from modelMore.parameters(), so passing it here
# would leave modelMoreLayers' weights untouched during training.
optimizerMoreLayers = torch.optim.Adam(modelMoreLayers.parameters(), lr=0.001)
# NOTE: train_loader / val_loader / test_loader are whatever the most recently
# executed split cell produced (the "More training" split when run top to bottom).
train_losses, train_accuracies, val_losses, val_accuracies = train(modelMoreLayers, train_loader, val_loader, optimizerMoreLayers, criterion, n_epochs=25, device=device)
test(modelMoreLayers, test_loader, device=device)
plot_learning_curves(train_losses, train_accuracies, val_losses, val_accuracies)

# Task 2: Carpet Matching 

In [None]:
# loading training and testing data for task 2
# DO NOT MODIFY
task2 = load_numpy_arr_from_url("https://github.com/vlamen/tue-deeplearning/blob/main/assignments/assignment_1/task2data.npz?raw=true")
# task2 = np.load('task2data.npz')

# NOTE: X and y are rebound here, replacing the Task 1 tensors of the same name.
# Each entry of the archive is cast to float with astype.
X = task2['arr_0'].astype(float)
y = task2['arr_1'].astype(float)
gt = task2['arr_2'].astype(float) # ground truth
queries = task2['arr_3'].astype(float)
targets = task2['arr_4'].astype(float)

# Report the shape of every array as a quick sanity check.
print(f"Carpet train shape: {X.shape}")
print(f"Label train shape: {y.shape}")
print(f"Ground truth test shape: {gt.shape}")
print(f"Query carpets shape: {queries.shape}")
print(f"Candidate carpets shape: {targets.shape}")

In [None]:
# function to determine performance of model
def query_performance(net, queries, targets, gt, top=1):
    """Return the percentage of queries whose correct candidate ranks in the top ``top``.

    For each query ``i`` the query and its candidates ``targets[i]`` are embedded
    with ``net`` and the candidates are ranked by Euclidean distance
    (``scipy.spatial.distance.cdist``) to the query embedding.

    Args:
        net: embedding network, called on batched tensors.
        queries: tensor indexed by query; ``queries[i]`` is one query sample
            (a batch dimension is added before it is passed to ``net``).
        targets: tensor indexed by query; ``targets[i]`` is the batch of
            candidates for query ``i``.
        gt: array with one entry per query: the index of the correct candidate.
        top: number of nearest candidates that count as a hit (>= 1).

    Returns:
        Hit rate in percent (0.0 when ``gt`` is empty).
    """
    assert top >= 1
    n_queries = gt.shape[0]
    if n_queries == 0:
        # Nothing to evaluate; avoid dividing by zero below.
        return 0.0

    # Run on the device the network lives on instead of assuming CUDA.
    try:
        device = next(net.parameters()).device
    except (AttributeError, StopIteration):
        # Parameter-free callable: keep the old preference for CUDA when present.
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Evaluate in eval mode (BatchNorm cannot normalise a single-sample batch in
    # training mode and Dropout would make embeddings random); restore afterwards.
    was_training = getattr(net, "training", False)
    if was_training:
        net.eval()

    cnt = 0
    try:
        with torch.no_grad():
            for i in range(n_queries):
                q = queries[i][None].float().to(device)
                t = targets[i].float().to(device)

                ### MODIFY IF NECESSARY ###
                emb_q = net(q).cpu().numpy()
                emb_t = net(t).cpu().numpy()

                # Shape (1, n_candidates): distance from the query to each candidate.
                dists = cdist(emb_q, emb_t)

                if top == 1:
                    pred = np.argmin(dists)

                    if pred == gt[i]:
                        cnt += 1

                else:
                    pred = np.argsort(dists)
                    if gt[i] in pred[0,:top].tolist():
                        cnt+=1
    finally:
        if was_training:
            net.train()
    return (100*cnt/n_queries)

In [None]:
class EmbeddingNet(nn.Module):
    """Embedding network for Task 2.

    The layer stack is stored in ``self.front_layer`` and ends in a
    200-dimensional output followed by a Softmax.
    NOTE(review): a Softmax on an embedding that is compared by distance is
    unusual -- confirm it is intended.
    """

    def __init__(self):
        """Build the layer stack."""
        super().__init__()

        # Block 1: three 3x3 convolutions (padding 1), each with BatchNorm + ReLU.
        conv_stem = [
            nn.Conv2d(1, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
            nn.Conv2d(16, 16, 3, padding=1), nn.BatchNorm2d(16), nn.ReLU(),
            nn.Conv2d(16, 32, 3, padding=1), nn.BatchNorm2d(32), nn.ReLU(),
        ]

        # Block 2: symmetry conv (kernel 4, rotation=4), BatchNorm, 2x2 max-pool.
        stage_two = [
            sc.Slice(rotation=4, reflection=False),
            sc.SymmetryConv2d(32, 32, 4, stride=1, rotation=4, reflection=False),
            sc.SymmetryPool(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2, 2),
        ]

        # Block 3: symmetry conv (kernel 3, rotation=1), BatchNorm, 2x2 max-pool.
        stage_three = [
            sc.Slice(rotation=1, reflection=False),
            sc.SymmetryConv2d(32, 64, 3, stride=1, rotation=1, reflection=False),
            sc.SymmetryPool(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2, 2),
        ]

        # Block 4: symmetry conv (kernel 8, rotation=4), BatchNorm, 2x2 max-pool.
        stage_four = [
            sc.Slice(rotation=4, reflection=False),
            sc.SymmetryConv2d(64, 128, 8, stride=1, rotation=4, reflection=False),
            sc.SymmetryPool(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2, 2),
        ]

        # Block 5: flatten per sample, hidden dense layer, 200-d output + Softmax.
        head = [
            Lambda(lambda t: t.view(t.shape[0], -1)),
            nn.Linear(2688, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Dropout(p=0.5),
            nn.Linear(1024, 200),
            nn.Softmax(dim=1),
        ]

        self.front_layer = nn.Sequential(
            *conv_stem, *stage_two, *stage_three, *stage_four, *head
        )

    def forward(self, x):
        """Run ``x`` through the full layer stack."""
        return self.front_layer(x)

    def get_embedding(self, x):
        """Alias for :meth:`forward`."""
        return self.forward(x)

In [None]:
from torch.utils.data.sampler import BatchSampler
import numpy as np
class BalancedBatchSampler(BatchSampler):
    """
    Returns batches of size n_classes * n_samples

    Every batch draws ``n_classes`` distinct labels at random (without
    replacement) and takes the next ``n_samples`` indices of each drawn label.
    A label's index list is reshuffled and restarted once fewer than
    ``n_samples`` unused indices remain.

    Args:
        labels: one label per dataset item (list, NumPy array or CPU tensor).
        n_classes: number of distinct labels per batch.
        n_samples: number of items per label in a batch.
    """

    def __init__(self, labels, n_classes, n_samples):
        self.labels = labels
        # Convert once: iterating a tensor directly yields tensor elements that
        # hash by identity, so set() would treat every item as its own class.
        labels_np = np.asarray(labels)
        self.labels_set = list(set(labels_np.tolist()))
        self.label_to_indices = {label: np.where(labels_np == label)[0]
                                 for label in self.labels_set}
        for l in self.labels_set:
            np.random.shuffle(self.label_to_indices[l])
        self.used_label_indices_count = {label: 0 for label in self.labels_set}
        self.count = 0
        self.n_classes = n_classes
        self.n_samples = n_samples
        self.n_dataset = len(self.labels)
        self.batch_size = self.n_samples * self.n_classes

    def __iter__(self):
        self.count = 0
        # `<=` keeps the number of yielded batches equal to __len__ even when the
        # dataset size is an exact multiple of the batch size (with `<` the last
        # batch was dropped in that case).
        while self.count + self.batch_size <= self.n_dataset:
            classes = np.random.choice(self.labels_set, self.n_classes, replace=False)
            indices = []
            for class_ in classes:
                start = self.used_label_indices_count[class_]
                indices.extend(self.label_to_indices[class_][start:start + self.n_samples])
                self.used_label_indices_count[class_] += self.n_samples
                # Not enough unused indices left for another draw: reshuffle and restart.
                if self.used_label_indices_count[class_] + self.n_samples > len(self.label_to_indices[class_]):
                    np.random.shuffle(self.label_to_indices[class_])
                    self.used_label_indices_count[class_] = 0
            yield indices
            self.count += self.n_classes * self.n_samples

    def __len__(self):
        return self.n_dataset // self.batch_size

In [None]:
from itertools import combinations

class RandomTripletSelector():
    """
    Builds (anchor, positive, negative) index triplets: every same-label pair in
    the batch is combined with one negative drawn uniformly at random.
    """

    def __init__(self):
        super().__init__()

    def get_triplets(self, embeddings, labels):
        """Return a LongTensor of triplet indices, one row per anchor-positive pair.

        ``embeddings`` is accepted for interface compatibility but not used;
        ``labels`` is a tensor holding one label per batch item.
        """
        label_array = labels.cpu().data.numpy()
        collected = []
        for current in set(label_array):
            same_label = label_array == current
            members = np.where(same_label)[0]
            # A label needs at least two members to form an anchor-positive pair.
            if len(members) >= 2:
                others = np.where(~same_label)[0]
                for anchor, positive in combinations(members, 2):
                    # One uniformly random negative per pair.
                    collected.append([anchor, positive, np.random.choice(others)])

        return torch.LongTensor(np.array(collected))


In [None]:
def pdist(vectors):
    """Return the matrix of squared Euclidean distances between all rows of ``vectors``.

    Uses the expansion ``-2 * <a, b> + |b|^2 + |a|^2`` on a 2-D tensor.
    """
    squared_norms = vectors.pow(2).sum(dim=1)
    inner_products = vectors.mm(torch.t(vectors))
    return -2 * inner_products + squared_norms.view(1, -1) + squared_norms.view(-1, 1)

In [None]:
from itertools import combinations

class Informative_Negative_TripletSelector():
    """
    Mines informative triplets: for every anchor-positive pair, one negative is
    drawn at random among those whose triplet loss
    ``d(a, p) - d(a, n) + margin`` is positive (distances come from ``pdist``).
    Pairs without such a negative contribute no triplet.
    """

    def __init__(self, margin):
        super(Informative_Negative_TripletSelector, self).__init__()

        self.margin = margin

    # Our goal is to mine informative triplets.
    def informative_negative(self, loss_values):
        """Return a random position where ``loss_values > 0``, or None if there is none."""
        informative_negative = np.where(loss_values > 0)[0]
        return np.random.choice(informative_negative) if len(informative_negative) > 0 else None

    def get_triplets(self, embeddings, labels):
        """Return a LongTensor of (anchor, positive, negative) index rows.

        Args:
            embeddings: 2-D tensor with one embedding per batch item.
            labels: tensor with one label per batch item.

        Raises:
            ValueError: if the batch has no label with two members, or no
                negatives at all, so that not even a fallback triplet exists.
        """
        # Triplet mining needs no gradients; compute distances on a detached copy.
        distance_matrix = pdist(embeddings.detach()).cpu()

        labels = labels.cpu().data.numpy()
        triplets = []
        # Explicit fallback (last valid pair + first negative) instead of
        # relying on loop variables that may never have been assigned.
        fallback = None

        for label in set(labels):
            label_mask = (labels == label)
            label_indices = np.where(label_mask)[0]
            if len(label_indices) < 2:
                continue
            negative_indices = np.where(np.logical_not(label_mask))[0]
            if len(negative_indices) == 0:
                # No other label in the batch: no triplet can be formed here.
                continue
            negative_index_tensor = torch.LongTensor(negative_indices)
            anchor_positives = np.array(list(combinations(label_indices, 2)))  # All anchor-positive pairs

            ap_distances = distance_matrix[anchor_positives[:, 0], anchor_positives[:, 1]]
            for anchor_positive, ap_distance in zip(anchor_positives, ap_distances):
                anchor_to_negatives = distance_matrix[int(anchor_positive[0]), negative_index_tensor]
                loss_values = (ap_distance - anchor_to_negatives + self.margin).numpy()

                hard_negative = self.informative_negative(loss_values)
                if hard_negative is not None:
                    hard_negative = negative_indices[hard_negative]
                    triplets.append([anchor_positive[0], anchor_positive[1], hard_negative])

            fallback = [anchor_positive[0], anchor_positive[1], negative_indices[0]]

        if len(triplets) == 0:
            if fallback is None:
                raise ValueError(
                    "Cannot form any triplet: the batch needs a label with at least "
                    "two samples and at least one sample of a different label."
                )
            # Keep the loss defined even when no informative negative was found.
            triplets.append(fallback)

        triplets = np.array(triplets)

        return torch.LongTensor(triplets)

In [None]:
class TripletLoss(nn.Module):
    """
    Triplet loss with online triplet selection.

    Takes a batch of embeddings and the corresponding labels. The triplets are
    generated by a triplet_selector object whose get_triplets(embeddings, target)
    returns the (anchor, positive, negative) indices of every triplet.
    """

    def __init__(self, margin, triplet_selector):
        """
        Input:
        margin: value added to the normalised distance difference before the ReLU
        triplet_selector: object exposing get_triplets(embeddings, target)
        """
        super(TripletLoss, self).__init__()
        self.margin = margin
        self.triplet_selector = triplet_selector

    def forward(self, embeddings, target):
        """
        Compute the mean triplet loss of a batch.

        Input:
        embeddings: tensor with one embedding per row
        target: labels of the rows, handed to the triplet selector unchanged

        Output:
        Scalar tensor: mean over the selected triplets of
        relu((d_ap - d_an) / mean(d_an) + margin), with d_* squared Euclidean distances.
        """
        triplets = self.triplet_selector.get_triplets(embeddings, target)

        # Follow the embeddings to whichever device holds them; .cuda() would only
        # target the current CUDA device.
        triplets = triplets.to(embeddings.device)

        anchors = embeddings[triplets[:, 0]]
        positives = embeddings[triplets[:, 1]]
        negatives = embeddings[triplets[:, 2]]

        # Squared distances (no square root is taken).
        ap_distances = (anchors - positives).pow(2).sum(1)
        an_distances = (anchors - negatives).pow(2).sum(1)

        # The difference is rescaled by the batch-mean anchor-negative distance
        # before the margin is added.
        losses = F.relu((ap_distances - an_distances) / an_distances.mean() + self.margin)

        return losses.mean()

In [None]:
import numpy as np
from tqdm import tqdm


class Trainer():
    """
    Minimal training loop: one pass over the training loader followed by one
    pass over the validation loader per epoch, printing the mean losses.

    After run_trainer the per-epoch mean losses are available in
    train_loss_history and valid_loss_history.
    """

    def __init__(self,
                 model: torch.nn.Module,
                 device: torch.device,
                 criterion: torch.nn.Module,
                 optimizer: torch.optim.Optimizer,
                 training_DataLoader: torch.utils.data.DataLoader,
                 validation_DataLoader: torch.utils.data.DataLoader,
                 epochs: int
                 ):
        """
        Input:
        model: network to optimise
        device: device every batch is moved to before the forward pass
        criterion: callable as criterion(model_output, target) returning a scalar loss
        optimizer: optimizer updating the parameters of model
        training_DataLoader: iterable of (x, y) batches used for the updates
        validation_DataLoader: iterable of (x, y) batches used for evaluation only
        epochs: number of passes over both loaders
        """
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.training_DataLoader = training_DataLoader
        self.validation_DataLoader = validation_DataLoader
        self.device = device
        self.epochs = epochs

        # Mean loss of every completed epoch, appended by run_trainer.
        self.train_loss_history = []
        self.valid_loss_history = []

    def _run_epoch(self, data_loader, train):
        """Run one pass over data_loader (updating weights iff train) and return the mean batch loss."""
        self.model.train(train)  # train(False) is the evaluation mode
        batch_losses = []
        with torch.set_grad_enabled(train):
            for inputs, target in data_loader:
                inputs, target = inputs.to(self.device), target.to(self.device)  # send to device (GPU or CPU)
                if train:
                    self.optimizer.zero_grad()  # zerograd the parameters
                loss = self.criterion(self.model(inputs), target)  # forward pass + loss
                batch_losses.append(loss.item())
                if train:
                    loss.backward()  # one backward pass
                    self.optimizer.step()  # update the parameters
        return float(np.mean(batch_losses))

    def run_trainer(self):
        """Train for self.epochs epochs, printing and recording the train/validation loss of each."""
        for epoch in tqdm(range(self.epochs)):
            train_loss = self._run_epoch(self.training_DataLoader, train=True)
            valid_loss = self._run_epoch(self.validation_DataLoader, train=False)

            self.train_loss_history.append(train_loss)
            self.valid_loss_history.append(valid_loss)

            # print the results
            print(f'EPOCH: {epoch+1:0>{len(str(self.epochs))}}/{self.epochs}', end=' ')
            print(f'LOSS: {train_loss:.4f}',end=' ')
            print(f'VAL-LOSS: {valid_loss:.4f}',end='\n')

In [None]:
# The first TRIPLET_TRAIN_SIZE samples are used for training, the rest for evaluation.
TRIPLET_TRAIN_SIZE = 12000
# Batch composition requested from BalancedBatchSampler for both splits.
TRIPLET_BATCH_CLASSES = 20
TRIPLET_BATCH_SAMPLES = 20

# Cast once instead of once per slice.
X_f32 = X.astype(np.float32)
y_f32 = y.astype(np.float32)

train_dataset = TensorDataset(torch.from_numpy(X_f32[:TRIPLET_TRAIN_SIZE]), torch.from_numpy(y_f32[:TRIPLET_TRAIN_SIZE]))
test_dataset = TensorDataset(torch.from_numpy(X_f32[TRIPLET_TRAIN_SIZE:]), torch.from_numpy(y_f32[TRIPLET_TRAIN_SIZE:]))

train_batch_sampler = BalancedBatchSampler(y_f32[:TRIPLET_TRAIN_SIZE], n_classes=TRIPLET_BATCH_CLASSES, n_samples=TRIPLET_BATCH_SAMPLES)
test_batch_sampler = BalancedBatchSampler(y_f32[TRIPLET_TRAIN_SIZE:], n_classes=TRIPLET_BATCH_CLASSES, n_samples=TRIPLET_BATCH_SAMPLES)

triplets_train_loader = DataLoader(train_dataset, batch_sampler=train_batch_sampler)
triplets_test_loader = DataLoader(test_dataset, batch_sampler=test_batch_sampler)

In [None]:
# device: use the GPU when one is available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# model, moved to the selected device
embedding_net = EmbeddingNet()
model = embedding_net.to(device)

# margin value
margin = 1

# criterion: triplet loss fed by the random triplet selector
criterion = TripletLoss(margin, RandomTripletSelector())

# optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# trainer
trainer = Trainer(
    model=model,
    device=device,
    criterion=criterion,
    optimizer=optimizer,
    training_DataLoader=triplets_train_loader,
    validation_DataLoader=triplets_test_loader,
    epochs=10,
)

# start training
trainer.run_trainer()

In [None]:
# Put the evaluation tensors on the device selected for the model rather than
# calling .cuda() unconditionally, so this cell also runs on CPU-only machines.
q = torch.from_numpy(queries).float().to(device)
t = torch.from_numpy(targets).float().to(device)
g = torch.from_numpy(gt).float().to(device)

print(query_performance(model, q, t, g, 1))

In [None]:
def extract_embeddings(dataloader, model):
    """
    Run the model over a dataloader and collect the embeddings it produces.

    Input:
    dataloader: iterable yielding (images, target) batches of torch tensors
    model: network exposing get_embedding(images)

    Output:
    embeddings: float64 numpy array with one row per sample yielded by the dataloader
    labels: float64 numpy array with the target of every row of embeddings

    Only samples the dataloader actually yields are returned, so a batch sampler
    that does not cover the whole dataset no longer leaves all-zero rows (with
    label 0) at the end. The embedding width is taken from the model output.
    An empty dataloader gives arrays of shape (0, 0) and (0,).
    The model is left in evaluation mode.
    """
    # Feed the batches to wherever the model weights live; a model without
    # parameters receives the batches unmoved.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    embedding_chunks = []
    label_chunks = []
    model.eval()
    with torch.no_grad():
        for images, target in dataloader:
            if device is not None:
                images = images.to(device)
            embedding_chunks.append(model.get_embedding(images).detach().cpu().numpy())
            label_chunks.append(target.detach().cpu().numpy())

    if not embedding_chunks:
        return np.zeros((0, 0)), np.zeros(0)

    embeddings = np.concatenate(embedding_chunks).astype(np.float64)
    labels = np.concatenate(label_chunks).astype(np.float64)
    return embeddings, labels

# Embed both splits with the current model. These loaders were built with
# batch samplers, so the rows follow whatever batches the samplers yield.
train_embeddings, train_labels = extract_embeddings(triplets_train_loader, model)
val_embeddings, val_labels = extract_embeddings(triplets_test_loader, model)

In [None]:
from sklearn.manifold import TSNE
def plot_tsne_embeddings(embeddings, targets, xlim=None, ylim=None, max_points=3000, random_state=None):
    """
    Project embeddings to 2D with t-SNE and draw one scatter series per class.

    Input:
    embeddings: numpy array with one embedding per row
    targets: numpy array with the label of every row of embeddings
    xlim: optional (min, max) limits of the x axis
    ylim: optional (min, max) limits of the y axis
    max_points: only the first max_points rows are projected (None keeps all)
    random_state: seed forwarded to TSNE for a reproducible layout

    Every label present in the plotted targets gets its own series, instead of
    only the labels 0 to 9.
    """
    # Keep only the leading rows
    embeddings = embeddings[:max_points]
    targets = targets[:max_points]

    # Use t-SNE for dimensionality reduction
    tsne = TSNE(n_components=2, random_state=random_state)
    projected = tsne.fit_transform(embeddings)

    # Plot
    plt.figure(figsize=(10,10))
    for class_label in np.unique(targets):
        inds = np.where(targets == class_label)[0]
        plt.scatter(projected[inds, 0], projected[inds, 1], alpha=0.5)
    if xlim is not None:
        plt.xlim(xlim[0], xlim[1])
    if ylim is not None:
        plt.ylim(ylim[0], ylim[1])
    # plt.legend(classes)

# One t-SNE figure for the training embeddings and one for the validation embeddings.
plot_tsne_embeddings(train_embeddings, train_labels)
plot_tsne_embeddings(val_embeddings, val_labels)