<a href="https://colab.research.google.com/github/yandexdataschool/MLatImperial2022/blob/master/Seminars/lab_06_Conv.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Recap

Some imports:

In [None]:
from torchvision.datasets import FashionMNIST
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import torch
print(torch.__version__)
from IPython.display import clear_output

In [None]:
# Prefer the GPU when torch can see one; otherwise warn and fall back to the CPU.
has_gpu = torch.cuda.is_available()
if not has_gpu:
    print("WARNING: gpu not found, the code will run on cpu")
device = torch.device('cuda' if has_gpu else 'cpu')

print(f'Device is: "{device}".')

A utility class to monitor the training procedure:

In [None]:
from IPython.display import clear_output

class Logger:
    """Collect per-batch losses and report/plot their per-epoch averages.

    Call ``fill_train`` / ``fill_test`` once per batch and ``finish_epoch``
    once at the end of every epoch.
    """

    def __init__(self):
        self.train_loss_batch = []        # every training batch loss seen so far
        self.train_loss_epoch = []        # one averaged training loss per epoch
        self.test_loss_batch = []         # every test batch loss seen so far
        self.test_loss_epoch = []         # one averaged test loss per epoch
        self.train_batches_per_epoch = 0  # training batches logged in the current epoch
        self.test_batches_per_epoch = 0   # test batches logged in the current epoch
        self.epoch_counter = 0            # number of finished epochs

    def fill_train(self, loss):
        """Record the loss of one training batch."""
        self.train_loss_batch.append(loss)
        self.train_batches_per_epoch += 1

    def fill_test(self, loss):
        """Record the loss of one test batch."""
        self.test_loss_batch.append(loss)
        self.test_batches_per_epoch += 1

    @staticmethod
    def _mean_of_last(values, count):
        """Return the mean of the last ``count`` items, or NaN when ``count`` is 0."""
        if count == 0:
            # values[-0:] would select the WHOLE history rather than an empty
            # tail, so an epoch without batches must be handled explicitly.
            return float('nan')
        return np.mean(values[-count:])

    def finish_epoch(self, make_plot=True):
        """Average the current epoch's losses, print them and optionally plot.

        Args:
            make_plot: when true, clear the cell output and redraw the
                per-batch and per-epoch loss curves.
        """
        self.train_loss_epoch.append(
            self._mean_of_last(self.train_loss_batch, self.train_batches_per_epoch)
        )
        self.test_loss_epoch.append(
            self._mean_of_last(self.test_loss_batch, self.test_batches_per_epoch)
        )
        self.train_batches_per_epoch = 0
        self.test_batches_per_epoch = 0

        if make_plot:
            clear_output()

        print("epoch #{} \t train_loss: {:.8} \t test_loss: {:.8}".format(
                  self.epoch_counter,
                  self.train_loss_epoch[-1],
                  self.test_loss_epoch[-1]
              ))

        self.epoch_counter += 1

        if make_plot:
            plt.figure(figsize=(11, 5))

            # Left panel: raw training loss of every batch.
            plt.subplot(1, 2, 1)
            plt.plot(self.train_loss_batch, label='train loss')
            plt.xlabel('# batch iteration')
            plt.ylabel('loss')
            plt.legend()

            # Right panel: per-epoch averages for train and test.
            plt.subplot(1, 2, 2)
            plt.plot(self.train_loss_epoch, label='average train loss')
            plt.plot(self.test_loss_epoch, label='average test loss')
            plt.legend()
            plt.xlabel('# epoch')
            plt.ylabel('loss')
            plt.show()

Make dataloaders

In [None]:
from torch.utils.data import Dataset, DataLoader

class FashionMNISTDataset(Dataset):
    """In-memory dataset of flattened, [0, 1]-scaled 28x28 images.

    Args:
        X: array of pixel values in the 0..255 range; it is reshaped to
            ``(-1, 28 * 28)``.
        y: optional array of labels. When omitted, items are images only.
        device: device the tensors are moved to. ``None`` (the default)
            selects ``'cuda'`` when available and ``'cpu'`` otherwise, so the
            class also works on machines without a GPU.
    """

    def __init__(self, X, y=None, device=None):
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = device
        self.X, self.y = self.preprocess_data(X, y)

    def preprocess_data(self, X, y):
        """Scale ``X`` to [0, 1], flatten each image and move data to ``self.device``.

        Returns:
            ``(X_tensor, y_tensor)``; ``y_tensor`` is ``None`` when ``y`` is ``None``.
        """
        X_preproc = torch.tensor(X / 255.,
                                 dtype=torch.float).reshape(-1, 28 * 28).to(self.device)

        if y is None:
            return X_preproc, None

        return X_preproc, torch.tensor(y).to(self.device)

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        """Return ``X[idx]`` alone for unlabeled data, else ``(X[idx], y[idx])``."""
        if self.y is None:
            return self.X[idx]

        return self.X[idx], self.y[idx]

Our model:

In [None]:
class Model(torch.nn.Module):
    """Single linear layer mapping ``input_dim`` features to ``output_dim`` scores."""

    def __init__(self, input_dim, output_dim):
        super().__init__()
        linear_layer = torch.nn.Linear(in_features=input_dim, out_features=output_dim)
        self.m = linear_layer

    def forward(self, X):
        """Apply the linear layer to the batch ``X``."""
        scores = self.m(X)
        return scores
    
def train(model, optimizer, scheduler, dl_train, dl_test, criterion, n_epochs):
    """Train ``model`` for ``n_epochs`` epochs, logging train and test losses.

    Each epoch runs one optimisation pass over ``dl_train``, one gradient-free
    evaluation pass over ``dl_test``, reports both through a ``Logger`` and
    then advances the learning-rate ``scheduler`` by one step.
    """
    logger = Logger()

    for _ in range(n_epochs):
        # Optimisation pass.
        model.train()
        for inputs, targets in dl_train:
            optimizer.zero_grad()

            train_loss = criterion(model(inputs), targets)
            train_loss.backward()
            optimizer.step()

            logger.fill_train(train_loss.item())

        # Evaluation pass: no gradients needed.
        model.eval()
        with torch.no_grad():
            for inputs, targets in dl_test:
                test_loss = criterion(model(inputs), targets)
                logger.fill_test(test_loss.item())

        logger.finish_epoch()
        scheduler.step()
        
def predict(model, dl_test):
    """Return the predicted class index for every object served by ``dl_test``.

    Args:
        model: module mapping a batch of inputs to per-class scores of shape
            ``(batch, n_classes)``.
        dl_test: data loader yielding ``(batch_X, batch_y)`` pairs; the labels
            are ignored.

    Returns:
        A 1-D ``torch.long`` tensor with ``len(dl_test.dataset)`` entries,
        placed on the same device as the model outputs.
    """
    model.eval()
    n_objects = len(dl_test.dataset)
    prediction = None
    idx = 0
    with torch.no_grad():
        for batch_X, _ in dl_test:
            scores = model(batch_X)
            if prediction is None:
                # Allocate next to the outputs instead of hard-coding .cuda(),
                # so the function also works on CPU-only machines.
                prediction = torch.zeros((n_objects, ), dtype=torch.long,
                                         device=scores.device)
            # No squeeze() here: it would drop the batch axis of a
            # single-sample batch and break argmax over dim=1.
            size = scores.shape[0]
            prediction[idx:idx + size] = torch.argmax(scores, dim=1)
            idx += size

    if prediction is None:
        # The loader produced no batches at all.
        prediction = torch.zeros((n_objects, ), dtype=torch.long)

    return prediction

def accuracy_score(y_pred, y_test):
    """Return the fraction of entries of ``y_pred`` that equal ``y_test``."""
    n_correct = (y_pred == y_test).sum()
    n_total = len(y_test)
    return n_correct / n_total

Getting the data:

In [None]:
# Getting the train and test parts of the dataset
data_train = FashionMNIST("FashionMNIST/",
                          download=True,
                          train=True)

data_test = FashionMNIST("FashionMNIST/",
                         download=True,
                         train=False)

# In fact, it's already stored as torch tensors, but we'll need
# to work with the numpy representation, so let's do the conversion.
# `.data` / `.targets` replace the deprecated `train_data` / `train_labels`
# / `test_data` / `test_labels` aliases.
X_train = data_train.data.numpy()
y_train = data_train.targets.numpy()

X_test = data_test.data.numpy()
y_test = data_test.targets.numpy()

In [None]:
BATCH_SIZE = 128

# Put the data on the device detected above so it matches the model and
# y_test, including on machines without a GPU.
ds_train = FashionMNISTDataset(X_train, y_train, device=device)
ds_test = FashionMNISTDataset(X_test, y_test, device=device)

# Only the training set is shuffled; the test order must stay aligned with y_test.
dl_train = DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True)
dl_test = DataLoader(ds_test, batch_size=BATCH_SIZE, shuffle=False)
y_test = torch.tensor(y_test).to(device)

#### Yesterday's model

In [None]:
from torch.optim.lr_scheduler import StepLR

# Loss: cross-entropy between the model scores and the class labels.
criterion = torch.nn.CrossEntropyLoss()

# Model: one linear layer from flattened pixels to class scores.
input_dim = 28 * 28  # number of pixels per image
output_dim = 10  # number of classes
model = Model(input_dim, output_dim)
model = model.to(device)

# Optimizer and learning-rate schedule (rate multiplied by 0.5 every 2 epochs).
learning_rate = 0.005
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=2, gamma=0.5)

train(
    model,
    optimizer,
    scheduler,
    dl_train,
    dl_test,
    criterion,
    n_epochs=15,
)

Let's visualize our model

In [None]:
# Learning rate currently stored in the optimizer's first parameter group.
optimizer.param_groups[0]['lr']

In [None]:
# Per-step decay factor that takes the rate from lr0 to lr1 in num_updates
# scheduler steps, i.e. the solution of lr1 = lr0 * gamma ** num_updates.
lr0, lr1 = 0.005, 0.0005
num_updates = 7
gamma = pow(lr1 / lr0, 1.0 / num_updates)
gamma

In [None]:
# Test accuracy of the linear model.
accuracy_score(predict(model, dl_test), y_test)

In [None]:
!pip install torchviz
from torchviz import make_dot

### Visualise

In [None]:
# Push a single training batch through the model and compute its loss ...
x, y = next(iter(dl_train))
y_pred = model(x)
loss = criterion(y_pred, y)

# ... then let torchviz draw the graph that produced `loss`.
make_dot(loss)

### Convolutional layers

<img src='https://cdn-images-1.medium.com/max/1600/0*iqNdZWyNeCr5tCkc.' alt='CNN animation'>

One filter is applied to all the channels of the input image, i.e.:
$$
\mathrm{Conv}(x,y,o) = \sum_{i=x - T}^{x + T} \sum_{j=y - T}^{y + T} \sum_{c=1}^C F_{o}(i - (x - T), j - (y - T), c) \cdot I(i,j,c)
$$


The output value $\mathrm{Conv}(x, y, o)$ at position $(x, y)$ of output channel $o$ is computed by the formula above, where:
- I is the input image of size $\mathbb{R}^{H \times W \times C}$
- F is the kernel of size $\mathbb{R}^{K \times K \times C}, K = 2T + 1$

<font color='red'>Question:</font> What will be the output size in each of the following cases?

- picture of size 1x3x3, applied conv filter 3x3, stride=1, padding=0
- picture of size 1x10x10, applied conv filter 3x3, stride=1, padding=0
- picture of size 1x10x10, applied conv filter 3x3, stride=1, padding=1

- picture of size 3x20x20, applied conv filter 3x3, stride=3, padding=0


Eventually, this is the formula to calculate output size
$$
H_{out} = \left\lfloor \frac{H_{input} + 2P - K}{S} + 1 \right\rfloor \\
W_{out} = \left\lfloor \frac{W_{input} + 2P - K}{S} + 1 \right\rfloor
$$

- $H_{input}, W_{input}$ are input image sizes
- $H_{out}, W_{out}$ are output conv feature sizes
- K is the kernel size
- P is the padding
- S is the stride

<img src='https://cdn-images-1.medium.com/max/2000/1*vkQ0hXDaQv57sALXAJquxA.jpeg' alt='img'>
(image taken from https://towardsdatascience.com/a-comprehensive-guide-to-convolutional-neural-networks-the-eli5-way-3bd2b1164a53)

A general view of the most common convolutional architecture is shown above. The main idea is to gradually reduce the size of the image while increasing the number of channels. This is motivated by the following:

 - It's expensive (in terms of memory) to make a lot of channels for a large image, while smaller sized images allow us to do so. Intuitively, there's a trade-off between image size and number of channels.
 - We actually don't need that many channels at lower levels, since there aren't that many distinct low-level features in an image. Higher-level features are more complex and require more filters (channels).
 - At the left side of the diagram (for low-level features) we care more about the positional information (e.g. "is this stroke located near that one?"), while at the right side (high-level features) we want to know what kind of an object we see, rather than where exactly we see it (e.g. "looks like there's furry face somewhere in this picture - I might be looking at a cat")

#### Getting a grip on convolutions

Let's get an image of a flower:

In [None]:
!wget https://upload.wikimedia.org/wikipedia/commons/b/ba/Flower_jtca001.jpg -O flower.jpg

In [None]:
from PIL import Image

# Open the photo, convert it to mode "L" and resize it with size (500, 350).
flower_grey = Image.open("flower.jpg").convert("L").resize((500, 350))
flower_pixels = np.array(flower_grey)

# Add two leading axes of size 1 and divide the pixel values by 255.
img = torch.from_numpy(flower_pixels).unsqueeze(0).unsqueeze(0) / 255.0
img.shape

Apply Sobel convolution

Horizontal Sobel filter:
```
[[ -1., 0., 1.],
 [ -2., 0., 2.],
 [ -1., 0., 1.]]
```
Vertical Sobel filter:
```
[[ -1., -2., -1.],
 [  0.,  0.,   0.],
 [  1.,  2.,  1.]]
```

In [None]:
Sobel_filter_h = <YOUR_CODE> / 8.0 # normalized Sobel filter
Sobel_filter_v = <YOUR_CODE> / 8.0 # normalized Sobel filter

# use torch.nn.functional.conv2d
res_h = <YOUR_CODE>
res_v = <YOUR_CODE>

res = torch.sqrt(res_h ** 2 + res_v ** 2).clip(0.0, 1.0) # compute gradient norm

plt.figure(figsize=(8, 8))
plt.imshow(res.numpy(), cmap='gray')

Once you're done try other kernels and see how they affect the image – what features do they highlight?


What will happen if you apply a convolution twice? `n` times?

#### Building a CNN

Convolutional layers in torch expect their input to be of 4-dimensional shape: $(B, C, H, W)$. Here $B$ is the number of images per batch, $C$ is the number of channels (e.g. 1 for a greyscale image, 3 for an RGB one, or number of filters from the previous convolutional layer). $H$ and $W$ are height and width in pixels.

This means that at the beginning of our network we need to reshape our images from $(B, 784)$ to $(B, 1, 28, 28)$. At the end we'll want to reshape them back to 2 dimensions in order to apply a linear layer.

Torch provides `torch.nn.Flatten` and `torch.nn.Unflatten`, but no general-purpose reshaping layer, so we'll implement our own:

In [None]:
class Reshape(torch.nn.Module):
    """Layer that reshapes every sample of a batch to a fixed shape.

    The first (batch) axis is preserved; the remaining axes are replaced by
    the ``shape`` given at construction time.
    """

    def __init__(self, *shape):
        super().__init__()
        self.shape = shape

    def forward(self, x):
        batch_size = x.shape[0]
        target_shape = (batch_size, *self.shape)
        return x.reshape(target_shape)
    
def _first_component(value):
    """Return ``value[0]`` for tuples/lists, or ``value`` itself for scalars."""
    try:
        return value[0]
    except (TypeError, IndexError):
        return value


def conv_out_size(size, conv_params, num_conv_blocks):
    """Return the side length left after ``num_conv_blocks`` conv blocks.

    Each block is modelled as a stride-1 convolution followed by an integer
    halving of the size (presumably a pooling step with factor 2).

    Args:
        size: side length of the (square) input.
        conv_params: dict with ``'kernel_size'`` and ``'padding'``; each may be
            an int or a tuple, in which case the first component is used.
        num_conv_blocks: number of conv blocks applied in sequence.

    Returns:
        The resulting side length as an int.
    """
    kernel_size = _first_component(conv_params['kernel_size'])
    padding = _first_component(conv_params['padding'])

    for _ in range(num_conv_blocks):
        size = size + 2 * padding - kernel_size + 1  # stride-1 convolution
        size = size // 2                             # halving step
    return size

Ok, now let's create and train a convolutional NN!

Do keep in mind the model architecture from the picture above. I.e. we want to gradually reduce the size of the image while increasing the number of channels. We also want at least one fully connected layer at the end of the network.

Use `torch.nn.Conv2d` for convolutions and `torch.nn.MaxPool2d` for max pooling.
Also try `torch.nn.BatchNorm2d` and `torch.nn.Dropout` regularizers.

In [None]:
class Model(torch.nn.Module):
    def __init__(self, input_dim, conv_params, dropout_p, output_dim):
        super().__init__()
        out_size = conv_out_size(input_dim[-1], conv_params, 2)
        self.m =  torch.nn.Sequential(
            Reshape(*input_dim),
            ...
            <YOUR_CODE>
            ...
            torch.nn.Linear(?, 10)
        ).to(device)
        
    def forward(self, X):
        return self.m(X)

# Defining the loss function:
criterion = torch.nn.CrossEntropyLoss()

# Defining the model
# input_dim is the per-sample shape handed to Reshape inside the model.
input_dim = (1, 28, 28)
# conv_params is also consumed by conv_out_size; a tuple kernel size such as
# (3, 3) together with an int padding works there.
conv_params = {'kernel_size': <YOUR_CODE>, 'padding': <YOUR_CODE>}
dropout_p = <YOUR_CODE>
output_dim = 10 # number of classes

model = Model(input_dim, conv_params, dropout_p, output_dim).to(device)

# Setting up the optimizer
learning_rate = 0.005
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# The learning rate is multiplied by gamma every step_size scheduler steps.
scheduler = StepLR(optimizer, step_size = 2, gamma = 0.5)

train(model, optimizer, scheduler, dl_train, dl_test, criterion, n_epochs = 10)

In [None]:
# Test accuracy of the convolutional model.
accuracy_score(predict(model, dl_test), y_test)

Let's see our model graph now:

In [None]:
# Push a single training batch through the model and compute its loss ...
x, y = next(iter(dl_train))
y_pred = model(x)
loss = criterion(y_pred, y)

# ... then let torchviz draw the graph that produced `loss`.
make_dot(loss)

In [None]:
def visualise_conv(f_maps):
    """Draw a stack of maps (kernels or feature maps) on a square grid.

    Args:
        f_maps: tensor whose first axis indexes the maps; every ``f_maps[i]``
            must be drawable by ``imshow`` (e.g. a 2-D map).

    The grid side is ``ceil(sqrt(N))``, so any ``N`` works: a single map gets
    a 1x1 grid, and when ``N`` is not a perfect square the unused cells are
    left empty instead of maps being dropped.
    """
    n_maps = f_maps.shape[0]
    k = max(1, int(np.ceil(np.sqrt(n_maps))))

    # squeeze=False keeps `ax` two-dimensional even for a 1x1 grid.
    fig, ax = plt.subplots(k, k, figsize=(10, 10), squeeze=False)
    im = None
    for idx, axis in enumerate(ax.ravel()):  # row-major: idx == i * k + j
        axis.axis('off')
        if idx < n_maps:
            im = axis.imshow(f_maps[idx].cpu().numpy())

    fig.subplots_adjust(right=0.8)
    if im is not None:
        # NOTE: the colorbar is attached to the last drawn map only.
        cbar_ax = fig.add_axes([0.85, 0.15, 0.05, 0.7])
        fig.colorbar(im, cax=cbar_ax)
    plt.show()

In [None]:
# Visualise learned kernels: the weights of the layer at index 1 of model.m,
# detached from the graph and with size-1 axes squeezed away.

visualise_conv(model.m[1].weight.detach().squeeze())

In [None]:
activation = {}    # layer name -> detached output captured by a forward hook
hook_handles = []  # handles of the hooks registered below, for later removal

def get_activation(name):
    """Build a forward hook that stores the layer output under ``name``."""
    def hook(module, inputs, output):
        activation[name] = output.detach()
    return hook

# NOTE: indices 1 and 5 must point at the two convolutional layers of model.m;
# adjust them if your architecture orders its layers differently.
hook_handles.append(model.m[1].register_forward_hook(get_activation('conv1')))
hook_handles.append(model.m[5].register_forward_hook(get_activation('conv2')))

x, _ = next(iter(dl_train))
try:
    # Only the activations of this single image are needed, not its gradients.
    with torch.no_grad():
        output = model(x[0].unsqueeze(0))
finally:
    # Remove the hooks so that later forward passes (or re-running this cell)
    # neither overwrite `activation` nor stack up duplicate hooks.
    for handle in hook_handles:
        handle.remove()
    hook_handles.clear()

In [None]:
# Shapes of the two captured activations.
activation['conv1'].shape, activation['conv2'].shape

In [None]:
# Visualise conv feature maps captured under 'conv1' (size-1 axes squeezed away)

visualise_conv(activation['conv1'].squeeze())

In [None]:
# Same for the feature maps captured under 'conv2'.
visualise_conv(activation['conv2'].squeeze())

In [None]:
predictions_test = predict(model, dl_test).cpu().numpy()
y_test_np = y_test.cpu().numpy()
mask = predictions_test != y_test_np  # True where the model is wrong

wrongly_predicted_objects = X_test[mask]
wrongly_predicted_labels = predictions_test[mask]
wrongly_predicted_labels_true = y_test_np[mask]

label_names = np.array([
    'T-shirt/top',
    'Trouser',
    'Pullover',
    'Dress',
    'Coat',
    'Sandal',
    'Shirt',
    'Sneaker',
    'Bag',
    'Ankle boot',
])


# Print and plot the first 100 mistakes. If the model makes fewer than 100,
# the remaining grid cells are padded with blanks so the reshapes still work.
GRID_SIDE = 10
n_cells = GRID_SIDE * GRID_SIDE
n_shown = min(n_cells, len(wrongly_predicted_objects))
n_padding = n_cells - n_shown
blank_names = np.full(n_padding, '')

wrongly_predicted_labels = np.concatenate(
    [label_names[wrongly_predicted_labels[:n_shown]], blank_names]
).reshape(GRID_SIDE, GRID_SIDE)
wrongly_predicted_labels_true = np.concatenate(
    [label_names[wrongly_predicted_labels_true[:n_shown]], blank_names]
).reshape(GRID_SIDE, GRID_SIDE)

# Predicted label in red, true label in green just above it.
for ix in range(GRID_SIDE):
    for iy in range(GRID_SIDE):
        plt.text(ix / 5, iy / 8, (wrongly_predicted_labels[-1 - iy, ix]), color='red')
        plt.text(ix / 5, iy / 8 + 0.05, (wrongly_predicted_labels_true[-1 - iy, ix]), color='green')
plt.axis('off')
plt.show()

shown_objects = wrongly_predicted_objects[:n_shown]
blank_objects = np.zeros((n_padding,) + shown_objects.shape[1:],
                         dtype=shown_objects.dtype)

# Tile the 28x28 images into one 280x280 picture.
plt.figure(figsize=(12, 12))
plt.imshow(np.concatenate([shown_objects, blank_objects])
           .reshape(GRID_SIDE, GRID_SIDE, 28, 28)
           .transpose(0, 2, 1, 3).reshape(280, 280), cmap="Greys")
plt.axis('off');

## Bonus

Let's do some augmentation with [torchvision](https://pytorch.org/vision/stable/transforms.html).

In [None]:
from torchvision import transforms

In [None]:
# Re-extract the raw numpy arrays (y_test was turned into a tensor earlier).
# `.data` / `.targets` replace the deprecated `train_data` / `train_labels`
# / `test_data` / `test_labels` aliases.
X_train = data_train.data.numpy()
y_train = data_train.targets.numpy()

X_test = data_test.data.numpy()
y_test = data_test.targets.numpy()

Transformation example

In [None]:
# First test image with one extra leading axis, pixel values divided by 255.
im = torch.from_numpy(X_test[0]).unsqueeze(0)/255.0
# Random rotation configured with degrees=(-50, 50).
trans = transforms.RandomRotation(degrees=(-50, 50))
im_trans = trans(im).squeeze().numpy()

plt.imshow(im_trans);

In [None]:
class FashionMNISTDataset(Dataset):
    """In-memory dataset of per-image standardised 28x28 images with optional augmentation.

    Args:
        X: array of pixel values in the 0..255 range.
        y: optional array of labels. When omitted, items are images only and
            no augmentation is applied.
        device: device the tensors are moved to. ``None`` (the default)
            selects ``'cuda'`` when available and ``'cpu'`` otherwise, so the
            class also works on machines without a GPU.
        do_aug: when true, labelled items are passed through random rotation
            and random perspective transforms on every access.
    """

    def __init__(self, X, y=None, device=None, do_aug=False):
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = device
        self.X, self.y = self.preprocess_data(X, y)
        self.do_aug = do_aug

        self.transforms = torch.nn.Sequential(
            transforms.RandomRotation(degrees=(-10, 10)),
            transforms.RandomPerspective(distortion_scale=0.2, p=0.5)
        )

    def preprocess_data(self, X, y):
        """Scale ``X`` to [0, 1] and standardise every image separately.

        Returns:
            ``(X_tensor, y_tensor)``. Without labels ``X_tensor`` stays flat
            ``(N, 784)`` and ``y_tensor`` is ``None``; with labels
            ``X_tensor`` has shape ``(N, 28, 28)``.
        """
        X_preproc = torch.tensor(X / 255.,
                                 dtype=torch.float).reshape(-1, 28 * 28).to(self.device)

        per_image_mean = X_preproc.mean(1).unsqueeze(1)
        # A constant image has zero std; clamp it to avoid dividing by zero
        # and filling the image with NaNs.
        per_image_std = X_preproc.std(1).unsqueeze(1).clamp_min(1e-8)
        X_preproc = (X_preproc - per_image_mean) / per_image_std
        if y is None:
            return X_preproc, None

        return X_preproc.reshape(-1, 28, 28), torch.tensor(y).to(self.device)

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        """Return the flattened (optionally augmented) image, plus its label if any."""
        if self.y is None:
            return self.X[idx]

        if self.do_aug:
            # The transforms get an extra leading axis, which is squeezed away again.
            X_aug = self.transforms(self.X[idx].unsqueeze(0)).squeeze()
        else:
            X_aug = self.X[idx]

        return X_aug.reshape(-1), self.y[idx]

In [None]:
BATCH_SIZE = 128

# Put the data on the device detected earlier so it matches the model and
# y_test, including on machines without a GPU. Only the training set is augmented.
ds_train = FashionMNISTDataset(X_train, y_train, device=device, do_aug=True)
ds_test = FashionMNISTDataset(X_test, y_test, device=device)

# Only the training set is shuffled; the test order must stay aligned with y_test.
dl_train = DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True)
dl_test = DataLoader(ds_test, batch_size=BATCH_SIZE, shuffle=False)
y_test = torch.tensor(y_test).to(device)

In [None]:
# Loss: cross-entropy between the model scores and the class labels.
criterion = torch.nn.CrossEntropyLoss()

# Model: the convolutional network with 3x3 kernels, padding 1 and dropout 0.2.
input_dim = (1, 28, 28)
conv_params = dict(kernel_size=(3, 3), padding=1)
dropout_p = 0.2
output_dim = 10  # number of classes

model = Model(input_dim, conv_params, dropout_p, output_dim)
model = model.to(device)

# Optimizer and learning-rate schedule (rate multiplied by 0.8 every 2 epochs).
learning_rate = 0.005
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=2, gamma=0.8)

train(
    model,
    optimizer,
    scheduler,
    dl_train,
    dl_test,
    criterion,
    n_epochs=15,
)

In [None]:
 # Test accuracy of the model trained on augmented data.
 accuracy_score(predict(model, dl_test), y_test)


## Tomorrow's lecture: Style Transfer
- [Style Transfer in pytorch](https://pytorch.org/tutorials/advanced/neural_style_tutorial.html)

