# TOP

# SECRET

# NOTEBOOK

![more layers](https://i.redd.it/5193db0avbey.jpg)

In [None]:
from typing import *

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torch.optim as optim

import numpy as np
from torchvision import datasets, transforms

from tqdm import tqdm

import PIL

In [None]:
# Probe once for a usable CUDA device; the scoring and training helpers in
# this notebook read this flag before moving batches onto the GPU.
CUDA_AVAILABLE = torch.cuda.is_available()

if not CUDA_AVAILABLE:
    # Point Colab users at the setting that enables a GPU runtime.
    print(
        "If you are running this on Google Colab then",
        "Menu -> Runtime -> Change runtime type -> Hardware Accelerator -> GPU",
        "Then try this again...",
        sep="\n",
    )

In [None]:
def score(
    model: nn.Module,
    valid: torch.utils.data.DataLoader,
) -> float:
    """Return the classification accuracy of ``model`` over ``valid``.

    The model is switched to evaluation mode for the duration of the pass so
    that mode-dependent layers (e.g. BatchNorm, Dropout) use their inference
    behaviour, and the mode it had on entry is restored before returning.

    Args:
        model: Network mapping a batch of inputs to per-class scores laid out
            along dimension 1.
        valid: Loader yielding ``(inputs, targets)`` batches.

    Returns:
        The fraction of samples whose arg-max prediction equals the target.

    Raises:
        ZeroDivisionError: If ``valid`` yields no samples.
    """
    correct, incorrect = 0, 0

    # Remember the current mode so that calling this in the middle of a
    # training run does not leave the model stuck in eval mode.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, targets in tqdm(valid, desc="validation"):
                if CUDA_AVAILABLE:
                    inputs, targets = inputs.cuda(), targets.cuda()

                outputs = model(inputs)
                # Number of samples in this batch whose top class is correct.
                matching = torch.eq(targets, outputs.argmax(dim=1)).sum().item()

                correct += matching
                incorrect += inputs.shape[0] - matching
    finally:
        model.train(was_training)

    return correct / (correct + incorrect)

In [None]:
def train(
    model: nn.Module,
    train: torch.utils.data.DataLoader,
    valid: torch.utils.data.DataLoader,
    epochs: int = 4,
    lr: float = 0.001
) -> None:
    """Train ``model`` with AdamW and cross-entropy, reporting after each epoch.

    Every epoch runs one optimisation pass over ``train`` followed by a
    gradient-free pass over ``valid``, then prints the mean per-batch training
    loss, the mean per-batch validation loss and the validation accuracy
    returned by ``score``.

    The model is put in training mode for the optimisation pass and in
    evaluation mode for the validation pass; when ``epochs > 0`` it is
    therefore left in evaluation mode on return.

    Args:
        model: Network to optimise in place.
        train: Loader yielding ``(inputs, targets)`` training batches.
        valid: Loader yielding ``(inputs, targets)`` validation batches.
        epochs: Number of passes over ``train``.
        lr: Learning rate handed to a freshly created AdamW optimiser.

    Raises:
        ZeroDivisionError: If either loader has zero batches.
    """
    optimizer = optim.AdamW(params=model.parameters(), lr=lr)
    loss = nn.CrossEntropyLoss()

    train_batches = len(train)
    valid_batches = len(valid)

    for epoch in range(epochs):
        # Both running sums must restart every epoch; otherwise the reported
        # validation loss would keep accumulating values from earlier epochs.
        train_loss = valid_loss = 0.

        # Training behaviour for mode-dependent layers (BatchNorm, Dropout).
        model.train()
        for inputs, targets in tqdm(train, desc=f"train {epoch}"):
            if CUDA_AVAILABLE:
                inputs, targets = inputs.cuda(), targets.cuda()

            optimizer.zero_grad()
            outputs = model(inputs)
            loss_value = loss(outputs, targets)
            loss_value.backward()
            optimizer.step()

            train_loss += loss_value.item()

        # Inference behaviour while measuring the validation loss.
        model.eval()
        with torch.no_grad():
            for inputs, targets in tqdm(valid, desc=f"valid {epoch}"):
                if CUDA_AVAILABLE:
                    inputs, targets = inputs.cuda(), targets.cuda()

                outputs = model(inputs)
                loss_value = loss(outputs, targets)
                valid_loss += loss_value.item()

        # remember tensorboardx makes pretty graphs
        # Turn the summed per-batch losses into per-batch means.
        train_loss /= train_batches
        valid_loss /= valid_batches
        valid_score = score(model, valid)
        print()
        print(f"train loss: {train_loss:.2e}")
        print(f"valid loss: {valid_loss:.2e}")
        print(f"valid score: {valid_score:.2f}")

In [None]:
# Training split: light augmentation, then tensor conversion and normalisation.
train_ds = datasets.CIFAR10(
    root='data',
    train=True,
    download=True,
    transform=transforms.Compose([
        # RandomApply rolls once for its whole list: with probability p every
        # transform in the list runs, otherwise none of them do.
        transforms.RandomApply(
            [transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1)],
            p=0.5,
        ),

        # Mirror the image left-to-right half of the time.
        transforms.RandomHorizontalFlip(p=0.5),

        # The network consumes tensors, so convert the image to one.
        transforms.ToTensor(),

        # Shift and scale each channel with fixed per-channel mean/std values
        # so the inputs end up roughly zero-mean with unit variance.
        # NOTE(review): these look like CIFAR-10 training-set statistics;
        # how they were computed is not shown in this file.
        transforms.Normalize(
            mean=(0.49139968, 0.48215841, 0.44653091),
            std=(0.24703223, 0.24348513, 0.26158784),
        ),
    ]),
)

# Validation split: no augmentation, but exactly the same normalisation as
# the training split.
valid_ds = datasets.CIFAR10(
    root='data',
    train=False,
    download=True,
    transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(
            mean=(0.49139968, 0.48215841, 0.44653091),
            std=(0.24703223, 0.24348513, 0.26158784),
        ),
    ]),
)

In [None]:
# Number of samples per training batch.
BATCH_SIZE = 128

# Training batches are drawn in a new random order each epoch.
train_dl = torch.utils.data.DataLoader(
    dataset=train_ds,
    shuffle=True,
    batch_size=BATCH_SIZE,
    num_workers=4,
)
# Validation uses batches twice the training size; shuffling is enabled for
# this loader as well.
valid_dl = torch.utils.data.DataLoader(
    dataset=valid_ds,
    shuffle=True,
    batch_size=2 * BATCH_SIZE,
    num_workers=4,
)

In [None]:
# The model is defined in this cell so that re-running the cell builds a
# freshly initialised network every time it is evaluated.

# Single activation module instance; the layer-builder helpers in this cell
# insert this same object into every nn.Sequential they create.
ACTIVATION_FN = nn.LeakyReLU()

def conv2_same(in_channels: int, out_channels: int, kernel: Tuple[int, int] = (3, 3)) -> nn.Module:
    """Build a stride-1 convolution -> batch-norm -> activation stack.

    Each spatial side is reflect-padded by half the kernel size (rounded
    down), which keeps height and width unchanged for odd kernel sizes.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution.
        kernel: ``(height, width)`` of the convolution kernel.

    Returns:
        An ``nn.Sequential`` of Conv2d, BatchNorm2d and the shared activation.
    """
    conv = nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size=kernel,
        padding=(kernel[0] // 2, kernel[1] // 2),
        padding_mode='reflect',
    )
    norm = nn.BatchNorm2d(out_channels)
    return nn.Sequential(conv, norm, ACTIVATION_FN)

def conv2_stride(in_channels: int, out_channels: int) -> nn.Module:
    """Build a stride-2 3x3 convolution -> batch-norm -> activation stack.

    With a 3x3 kernel, stride 2 and one pixel of reflect padding, an
    even-sized feature map comes out with half the height and width.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution.

    Returns:
        An ``nn.Sequential`` of Conv2d, BatchNorm2d and the shared activation.
    """
    downsample = nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size=(3, 3),
        stride=(2, 2),
        padding=(1, 1),
        padding_mode='reflect',
    )
    norm = nn.BatchNorm2d(out_channels)
    return nn.Sequential(downsample, norm, ACTIVATION_FN)

class SkipSequential(nn.Sequential):
    """Sequential container that concatenates its input onto its output.

    The wrapped layers run exactly as in ``nn.Sequential``; their result is
    then joined to the untouched input along dimension 1, so the wrapped
    layers must leave every other dimension unchanged.
    """

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` followed by the wrapped layers' output along dim 1."""
        transformed = super().forward(x)
        return torch.cat((x, transformed), dim=1)

def intro(out_channels: int) -> nn.Module:
    """Build the stem that maps a 3-channel image to ``out_channels`` maps.

    The stem is a single 7x7 ``conv2_same`` stack (convolution, batch-norm,
    activation) wrapped in an ``nn.Sequential``.

    Args:
        out_channels: Number of feature maps the stem produces.

    Returns:
        An ``nn.Sequential`` whose only child is the ``conv2_same`` stack.
    """
    # conv2_same already finishes with ACTIVATION_FN, so no further activation
    # is appended here; applying it a second time would only scale the
    # negative values down again. The outer nn.Sequential wrapper is kept so
    # parameter names in the state dict stay the same.
    return nn.Sequential(
        conv2_same(3, out_channels, kernel=(7, 7)),
    )

def classification(in_channels: int, out_channels: int = 10) -> nn.Module:
    """Build the classifier head: global max-pool, flatten, linear layer.

    Args:
        in_channels: Channels of the incoming feature map, which is also the
            input width of the linear layer.
        out_channels: Number of scores produced per sample.

    Returns:
        An ``nn.Sequential`` of AdaptiveMaxPool2d, Flatten and Linear.
    """
    head = [
        # Collapse each channel to a single value (could try AdaptiveAvg).
        nn.AdaptiveMaxPool2d(output_size=(1, 1)),
        # Drop the two 1x1 spatial dimensions so Linear sees (batch, channels).
        nn.Flatten(),
        nn.Linear(in_channels, out_channels),
    ]
    return nn.Sequential(*head)

def pyramid_block(in_channels: int, out_channels: int) -> nn.Module:
    """Build a two-level down/up block with skip concatenations.

    Data flows outer -> middle -> inner -> middle -> outer. Each nested level
    halves the resolution, processes the features, upsamples them by 2 and
    concatenates them with that level's input; a final stride-2 convolution
    fuses everything into ``out_channels`` maps at half the input resolution.
    Height and width are expected to be divisible by 4 so the upsampled maps
    match the tensors they are concatenated with.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels of the returned feature map.

    Returns:
        An ``nn.Sequential`` implementing the block.
    """
    # Channel width halfway between input and output (rounded down).
    mid_channels = in_channels + (out_channels - in_channels) // 2

    # Innermost level.
    #   input channels:  mid_channels
    #   output channels: mid_channels + out_channels
    deepest = SkipSequential(
        conv2_stride(mid_channels, mid_channels),
        conv2_same(mid_channels, out_channels),
        nn.Upsample(scale_factor=2),
    )

    # Middle level, wrapping the innermost one.
    #   input channels:  in_channels
    #   output channels: in_channels + mid_channels + out_channels
    skip_path = SkipSequential(
        conv2_stride(in_channels, mid_channels),
        deepest,
        nn.Upsample(scale_factor=2),
    )
    concat_channels = in_channels + mid_channels + out_channels

    # Outer level: fuse the concatenated maps down to out_channels.
    return nn.Sequential(
        skip_path,
        conv2_stride(in_channels=concat_channels, out_channels=out_channels),
    )

def repeating_block(in_channels: int, out_channels: int) -> nn.Module:
    """Build two size-preserving conv stacks followed by a stride-2 one.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels of the returned feature map.

    Returns:
        An ``nn.Sequential`` of ``conv2_same``, ``conv2_same``, ``conv2_stride``.
    """
    # Channel width halfway between input and output (rounded down).
    mid_channels = in_channels + (out_channels - in_channels) // 2
    layers = [
        conv2_same(in_channels, mid_channels),
        conv2_same(mid_channels, mid_channels),
        conv2_stride(mid_channels, out_channels),
    ]
    return nn.Sequential(*layers)

# Assemble the network: stem, three pyramid blocks, one plain block, head.
model = nn.Sequential(
    intro(out_channels=64),
    pyramid_block(in_channels=64, out_channels=64),  # image now 16x16
    pyramid_block(in_channels=64, out_channels=128),  # image now 8x8
    pyramid_block(in_channels=128, out_channels=256),  # image now 4x4
    repeating_block(in_channels=256, out_channels=512),  # image now 2x2
    classification(in_channels=512),
)

# Move the parameters to the GPU when one is available.
model = model.cuda() if CUDA_AVAILABLE else model

# Short training run; the trailing expression shows the final accuracy.
train(model, train=train_dl, valid=valid_dl, lr=0.001, epochs=4)
score(model, valid_dl)

In [None]:
# Rebuild the same architecture from scratch for a longer training run.
model = nn.Sequential(
    intro(out_channels=64),
    pyramid_block(in_channels=64, out_channels=64),  # image now 16x16
    pyramid_block(in_channels=64, out_channels=128),  # image now 8x8
    pyramid_block(in_channels=128, out_channels=256),  # image now 4x4
    repeating_block(in_channels=256, out_channels=512),  # image now 2x2
    classification(in_channels=512),
)

# Move the parameters to the GPU when one is available.
model = model.cuda() if CUDA_AVAILABLE else model

# Twenty epochs this time; the trailing expression shows the final accuracy.
train(model, train=train_dl, valid=valid_dl, lr=0.001, epochs=20)
score(model, valid_dl)

In [None]:
# The model is defined in this cell so that re-running the cell builds a
# freshly initialised network every time it is evaluated.

# Single activation module instance; the layer-builder helpers in this cell
# insert this same object into every nn.Sequential they create.
ACTIVATION_FN = nn.LeakyReLU()

def conv2_same(in_channels: int, out_channels: int, kernel: Tuple[int, int] = (3, 3), dropout: bool = True) -> nn.Module:
    """Build a stride-1 convolution -> batch-norm -> activation stack.

    Each spatial side is reflect-padded by half the kernel size (rounded
    down), which keeps height and width unchanged for odd kernel sizes.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution.
        kernel: ``(height, width)`` of the convolution kernel.
        dropout: When true, an ``nn.Dropout`` with its default probability is
            placed in front of the stack.

    Returns:
        The conv/norm/activation ``nn.Sequential``, or an ``nn.Sequential``
        of Dropout followed by that stack when ``dropout`` is true.
    """
    conv = nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size=kernel,
        padding=(kernel[0] // 2, kernel[1] // 2),
        padding_mode='reflect',
    )
    core = nn.Sequential(conv, nn.BatchNorm2d(out_channels), ACTIVATION_FN)
    return nn.Sequential(nn.Dropout(), core) if dropout else core

def conv2_stride(in_channels: int, out_channels: int, dropout: bool = True) -> nn.Module:
    """Build a stride-2 3x3 convolution -> batch-norm -> activation stack.

    With a 3x3 kernel, stride 2 and one pixel of reflect padding, an
    even-sized feature map comes out with half the height and width.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the convolution.
        dropout: When true, an ``nn.Dropout`` with its default probability is
            placed in front of the stack.

    Returns:
        The conv/norm/activation ``nn.Sequential``, or an ``nn.Sequential``
        of Dropout followed by that stack when ``dropout`` is true.
    """
    downsample = nn.Conv2d(
        in_channels,
        out_channels,
        kernel_size=(3, 3),
        stride=(2, 2),
        padding=(1, 1),
        padding_mode='reflect',
    )
    core = nn.Sequential(downsample, nn.BatchNorm2d(out_channels), ACTIVATION_FN)
    return nn.Sequential(nn.Dropout(), core) if dropout else core

class SkipSequential(nn.Sequential):
    """Sequential container that concatenates its input onto its output.

    The wrapped layers run exactly as in ``nn.Sequential``; their result is
    then joined to the untouched input along dimension 1, so the wrapped
    layers must leave every other dimension unchanged.
    """

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` followed by the wrapped layers' output along dim 1."""
        transformed = super().forward(x)
        return torch.cat((x, transformed), dim=1)

def intro(out_channels: int) -> nn.Module:
    """Build the stem that maps a 3-channel image to ``out_channels`` maps.

    The stem is a single 7x7 ``conv2_same`` stack (convolution, batch-norm,
    activation) created without dropout and wrapped in an ``nn.Sequential``.

    Args:
        out_channels: Number of feature maps the stem produces.

    Returns:
        An ``nn.Sequential`` whose only child is the ``conv2_same`` stack.
    """
    # conv2_same already finishes with ACTIVATION_FN, so no further activation
    # is appended here; applying it a second time would only scale the
    # negative values down again. The outer nn.Sequential wrapper is kept so
    # parameter names in the state dict stay the same.
    return nn.Sequential(
        conv2_same(3, out_channels, kernel=(7, 7), dropout=False),
    )

def classification(in_channels: int, out_channels: int = 10) -> nn.Module:
    """Build the classifier head: global max-pool, flatten, linear layer.

    Args:
        in_channels: Channels of the incoming feature map, which is also the
            input width of the linear layer.
        out_channels: Number of scores produced per sample.

    Returns:
        An ``nn.Sequential`` of AdaptiveMaxPool2d, Flatten and Linear.
    """
    head = [
        # Collapse each channel to a single value (could try AdaptiveAvg).
        nn.AdaptiveMaxPool2d(output_size=(1, 1)),
        # Drop the two 1x1 spatial dimensions so Linear sees (batch, channels).
        nn.Flatten(),
        nn.Linear(in_channels, out_channels),
    ]
    return nn.Sequential(*head)

def pyramid_block(in_channels: int, out_channels: int) -> nn.Module:
    """Build a two-level down/up block with skip concatenations.

    Data flows outer -> middle -> inner -> middle -> outer. Each nested level
    halves the resolution, processes the features, upsamples them by 2 and
    concatenates them with that level's input; a final stride-2 convolution
    fuses everything into ``out_channels`` maps at half the input resolution.
    Height and width are expected to be divisible by 4 so the upsampled maps
    match the tensors they are concatenated with.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels of the returned feature map.

    Returns:
        An ``nn.Sequential`` implementing the block.
    """
    # Channel width halfway between input and output (rounded down).
    mid_channels = in_channels + (out_channels - in_channels) // 2

    # Innermost level.
    #   input channels:  mid_channels
    #   output channels: mid_channels + out_channels
    deepest = SkipSequential(
        conv2_stride(mid_channels, mid_channels),
        conv2_same(mid_channels, out_channels),
        nn.Upsample(scale_factor=2),
    )

    # Middle level, wrapping the innermost one.
    #   input channels:  in_channels
    #   output channels: in_channels + mid_channels + out_channels
    skip_path = SkipSequential(
        conv2_stride(in_channels, mid_channels),
        deepest,
        nn.Upsample(scale_factor=2),
    )
    concat_channels = in_channels + mid_channels + out_channels

    # Outer level: fuse the concatenated maps down to out_channels.
    return nn.Sequential(
        skip_path,
        conv2_stride(in_channels=concat_channels, out_channels=out_channels),
    )

def repeating_block(in_channels: int, out_channels: int) -> nn.Module:
    """Build two size-preserving conv stacks followed by a stride-2 one.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels of the returned feature map.

    Returns:
        An ``nn.Sequential`` of ``conv2_same``, ``conv2_same``, ``conv2_stride``.
    """
    # Channel width halfway between input and output (rounded down).
    mid_channels = in_channels + (out_channels - in_channels) // 2
    layers = [
        conv2_same(in_channels, mid_channels),
        conv2_same(mid_channels, mid_channels),
        conv2_stride(mid_channels, out_channels),
    ]
    return nn.Sequential(*layers)

# Assemble the network from the builders defined in this cell.
model = nn.Sequential(
    intro(out_channels=64),
    pyramid_block(in_channels=64, out_channels=64),  # image now 16x16
    pyramid_block(in_channels=64, out_channels=128),  # image now 8x8
    pyramid_block(in_channels=128, out_channels=256),  # image now 4x4
    repeating_block(in_channels=256, out_channels=512),  # image now 2x2
    classification(in_channels=512),
)

# Move the parameters to the GPU when one is available.
model = model.cuda() if CUDA_AVAILABLE else model

# Three 5-epoch stages, each with a learning rate ten times smaller than the
# one before; every train() call builds a fresh optimiser.
for stage_lr in (0.01, 0.001, 0.0001):
    train(model, train_dl, valid_dl, epochs=5, lr=stage_lr)