# Residual Network

A residual network uses residual blocks that have skip connections to allow for deep networks. 

The skip connections allow the model to overcome the vanishing gradient problem.

## Definitions

In [30]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader 
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os

print(torch.cuda.is_available())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# works best with len(samples) = 2, 4, 6
def show_samples(samples):
    y = int(len(samples) / 2)
    for i in range(len(samples)):
        plt.subplot(2, y, i + 1)
        plt.subplots_adjust(hspace=1, wspace=1)
        plt.imshow(samples[i][0])
        plt.title("Uninfected" if samples[i][1] else "Infected", loc='center')
        plt.ylabel(f"Height ({samples[i][0].shape[0]})")
        plt.xlabel(f"Width ({samples[i][0].shape[1]})")
    plt.show()

class MalariaDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, index):
        image, label = self.dataframe[index]
        if self.transform: 
            image = self.transform(image)
        return image, label

def get_loader(df, batch_size=32, transform=None): 
    dataset = MalariaDataset(df, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=False)


class ResidualBlock(nn.Module): 
    def __init__(self, channels, stride=1): 
        assert len(channels) == 3
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(channels[0], channels[1], 3, stride=stride),
            nn.BatchNorm2d(channels[1]),
            nn.ReLU(),
            nn.Conv2d(channels[1], channels[2], 1),
            nn.BatchNorm2d(channels[2]),
        )
        self.shortcut = nn.Sequential(
            nn.Conv2d(channels[0], channels[2], 3, stride=stride),
            nn.BatchNorm2d(channels[2]),
        )
        self.relu = nn.ReLU()
    
    def forward(self, input): 
        return self.relu(self.block(input) + self.shortcut(input))

class ResidualNet(nn.Module): 
    def __init__(self): 
        super().__init__()
        self.Encoder = nn.Sequential(
            ResidualBlock([3, 16, 32]), 
            ResidualBlock([32, 48, 64]),
            ResidualBlock([64, 96, 128]),
            ResidualBlock([128, 96, 32]), 
            ResidualBlock([32, 28, 24]), 
            ResidualBlock([24, 16, 8]),
        )
        self.FC = nn.Sequential(
            nn.Flatten(),
            nn.Linear(8 * 20 * 20, 32),
            nn.ReLU(),
            nn.Linear(32, 1), 
        )
    
    def forward(self, input): 
        output = self.Encoder(input)
        # print(output.shape)
        output = self.FC(output)
        return output

def evaluate(model, loader): 
    total_correct = 0
    total_samples = 0
    for images, labels in loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images) 
        total_correct += int(((outputs > 0.0).squeeze().long() == labels).sum())
        total_samples += len(labels)
    return float(total_correct) / total_samples

True


## Loading Data

In [26]:
input_size = 32
data_dir = f"../data/corrected_{input_size}.pickle"
df = pd.read_pickle(data_dir)

#### Initial Transformations

In [27]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])
df["image"] = df["image"].apply(lambda x: transform(x))

#### Splitting the Data

In [28]:
np_df = df.to_numpy()
np.random.seed(1000)
np.random.shuffle(np_df)

num_samples = len(np_df)
a = int(num_samples * 0.8)
b = int(num_samples * 0.9)
df_trn = np_df[:a]
df_val = np_df[a:b]
df_tst = np_df[b:]

#### Data Loaders 

We can apply random transformations for the training data to have data augmentation. 

In [29]:
batch_size = 32

trn_transform = transforms.Compose([
    transforms.RandomPerspective(distortion_scale=0.1, p=0.25),
    transforms.RandomAffine(degrees=5, translate=(0.05, 0.05)),
    transforms.RandomRotation(180), 
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
])

trn_loader = get_loader(df_trn, batch_size=batch_size, transform=trn_transform)
val_loader = get_loader(df_val, batch_size=batch_size)
tst_loader = get_loader(df_tst, batch_size=batch_size)

## Training the Model

In [17]:
learning_rate = 0.001
num_epochs = 100

model = ResidualNet().to(device)
loss_function = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99)

Ensure that the model architecture is functional. 

In [15]:
for images, _ in trn_loader: 
    images = images.to(device)
    outputs = model(images)
    break

#### Training

In [18]:
try: os.mkdir("./residual_net_0")
except: assert os.path.isdir("./residual_net_0")

best = {
    "epoch": 0, 
    "accuracy": 0
}

for epoch in range(num_epochs): 
    total_correct = 0
    total_samples = 0
    for images, labels in trn_loader:
        images = images.to(device)
        labels = labels.unsqueeze(1).float()
        labels = labels.to(device)
        # forward pass 
        outputs = model(images)
        _, predictions = torch.max(outputs, axis=1)
        # loss value
        loss = loss_function(outputs, labels)
        # backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # training accuracy 
        total_correct += int(((outputs > 0.0) == labels).sum())
        total_samples += len(labels)
    training_accuracy = float(total_correct) / total_samples
    validation_accuracy = evaluate(model, val_loader)
    if validation_accuracy > best["accuracy"]: 
        best["accuracy"] = validation_accuracy
        best["epoch"] = epoch + 1
    print(f"Epoch: {'%.3d' % (epoch + 1)}/{num_epochs}, Training Loss: {loss.item():.6f}, Training Accuracy: {training_accuracy:.6f}, Validation Accuracy: {validation_accuracy:.6f}")
    torch.save(model.state_dict(), f"./residual_net_0/epoch_{epoch + 1}.pt")
    scheduler.step()

print(f"Best Validation Accuracy was {best['accuracy']} at Epoch {best['epoch']}")

Epoch: 001/100, Training Loss: 0.028609, Training Accuracy: 0.913000, Validation Accuracy: 0.980044
Epoch: 002/100, Training Loss: 0.011318, Training Accuracy: 0.972512, Validation Accuracy: 0.979318
Epoch: 003/100, Training Loss: 0.037767, Training Accuracy: 0.974100, Validation Accuracy: 0.977504
Epoch: 004/100, Training Loss: 0.012553, Training Accuracy: 0.975733, Validation Accuracy: 0.975327
Epoch: 005/100, Training Loss: 0.015300, Training Accuracy: 0.975415, Validation Accuracy: 0.977504
Epoch: 006/100, Training Loss: 0.007210, Training Accuracy: 0.976368, Validation Accuracy: 0.979318
Epoch: 007/100, Training Loss: 0.015045, Training Accuracy: 0.977139, Validation Accuracy: 0.979318
Epoch: 008/100, Training Loss: 0.012359, Training Accuracy: 0.977638, Validation Accuracy: 0.982221
Epoch: 009/100, Training Loss: 0.016052, Training Accuracy: 0.978227, Validation Accuracy: 0.981495
Epoch: 010/100, Training Loss: 0.008965, Training Accuracy: 0.977320, Validation Accuracy: 0.981132


## Evaluating the Model

Now we can evaluate the model on the testing data. 

In [31]:
model = ResidualNet()
model.load_state_dict(torch.load(f"./best_models/residual_net_98_73.pt"))
model.to(device)
print("Model Loaded")

Model Loaded


In [33]:
model.eval()
testing_accuracy = evaluate(model, tst_loader)
print(f"Testing Accuracy: {testing_accuracy}")

Testing Accuracy: 0.9836719883889695


Testing Accuracy is 98.37%