<a href="https://colab.research.google.com/github/vic-comm/deep-learning-pytorch/blob/main/Dog_breed_classification_with_pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION (/kaggle/input) IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'dog-breed-image-dataset:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F5219537%2F8702511%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240913%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240913T093748Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D31c7dc792afc8561e002e3915577b6de280d98ef6f1893d2876484a8d12b3d9b451962cff654d6f4c6db05a6f156e24799a5e20047ef402fb88ff7a302208a8cc1e023d1d6465d8b2bda111dd86542255cfbbec5354a3ea62f4a8c59a07045c31a4ead24dae063a3ae63e81de37767079825e6aea822498226d238707ba48fdb04a97bb362be3b0e69f2cb5d19ac1d0c656ddcf4ec8eb0b5d5a5c4bd8d366a31d44f05df6f61fadc62833163f32722ae986b9fb276328fe734e1416556a1d0ae26ed291d4e5bac724c87c301bbe7e06653b4887e4c1c9c3331fd03cf85c7433c949067e9fdd4f9b82f9cd6ec6f7511bf86c20d43661cbe638945617c2ba931e1'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

# Download every "<directory>:<url-encoded download URL>" entry listed in
# DATA_SOURCE_MAPPING and unpack it under KAGGLE_INPUT_PATH/<directory>.
for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    # Split on the first ':' only, so a colon in the URL part cannot break the unpacking.
    directory, download_url_encoded = data_source_mapping.split(':', 1)
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            total_length = fileres.headers['content-length']
            print(f'Downloading {directory}, {total_length} bytes compressed')
            # The Content-Length header may be missing; in that case only the
            # running byte count is shown instead of a progress bar.
            total_bytes = int(total_length) if total_length and total_length.isdigit() else 0
            dl = 0
            for chunk in iter(lambda: fileres.read(CHUNK_SIZE), b''):
                dl += len(chunk)
                tfile.write(chunk)
                if total_bytes > 0:
                    done = min(50, int(50 * dl / total_bytes))
                    sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                else:
                    sys.stdout.write(f"\r{dl} bytes downloaded")
                sys.stdout.flush()
            # Push buffered bytes to disk: the tar branch re-opens the file by
            # name and would otherwise see a truncated archive.
            tfile.flush()
            if filename.endswith('.zip'):
                with ZipFile(tfile) as zfile:
                    zfile.extractall(destination_path)
            else:
                # Do not bind the archive to the name `tarfile`: that would
                # replace the module and break the next iteration / re-run.
                with tarfile.open(tfile.name) as tar_archive:
                    tar_archive.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError:
        print(f'Failed to load (likely expired) {download_url} to path {destination_path}')
        continue
    except OSError:
        print(f'Failed to load {download_url} to path {destination_path}')
        continue

print('Data source import complete.')


In [None]:
import torch
import torch.nn as nn
import torchvision
from torch.utils.data import DataLoader
import torch.optim as optim
import torchvision.transforms as transforms
import torch.nn.functional as f
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from tqdm.notebook import tqdm, trange

In [None]:
transform = transforms.Compose([transforms.RandAugment(),transforms.Resize((32, 32)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])])
data = torchvision.datasets.ImageFolder(r"/kaggle/input/dog-breed-image-dataset/dataset", transform=transform)

In [None]:
train_len = round(0.8 * len(data))
test_len = len(data) - train_len
train_data, test_data = torch.utils.data.random_split(data, [train_len, test_len])

In [None]:
train_loader = DataLoader(train_data, shuffle=True, batch_size=32)
test_loader = DataLoader(test_data, shuffle=True, batch_size=32)

In [None]:
class Attention_CNN(nn.Module):
    """CNN image classifier with one self-attention layer after the first convolution.

    The first convolution keeps the spatial size; every pixel of its output is
    then treated as a token for single-head self-attention, whose result is
    added back through a learnable gate. Three stride-2 convolutions follow,
    then dropout and a linear classifier. The classifier expects a 128x4x4
    feature map, which corresponds to 32x32 input images.

    Args:
        channels_in: Number of channels in the input images.
        num_classes: Number of logits produced by the final linear layer.
            Defaults to 10, the value that used to be hard-coded.
    """

    def __init__(self, channels_in, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(channels_in, 64, 3, 1, 1)
        # Attention branch: layer-norm over the 64 channels, then 1-head attention.
        self.norm = nn.LayerNorm(64)
        self.mha = nn.MultiheadAttention(64, num_heads=1, batch_first=True)
        # Three stride-2 stages: 32x32 -> 16x16 -> 8x8 -> 4x4.
        self.conv2 = nn.Conv2d(64, 64, 3, 2, 1)
        self.bn1 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, 3, 2, 1)
        self.bn2 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 128, 3, 2, 1)
        self.bn3 = nn.BatchNorm2d(128)
        self.do = nn.Dropout(p=0.4)
        self.fc = nn.Linear(128*4*4, num_classes)
        # Gate on the attention branch; starting at zero means the branch
        # contributes nothing until training moves the parameter.
        self.scale = nn.Parameter(torch.zeros(1))

    def use_attention(self, x):
        """Run self-attention over the spatial positions of a feature map.

        Args:
            x: Feature map of shape (batch, channels, height, width); channels
                must be 64 to match the LayerNorm / attention width.

        Returns:
            A tuple ``(att_out, att_map)``: the attention output reshaped back
            to the shape of ``x``, and the attention weights returned by
            ``nn.MultiheadAttention`` (one row per query position).
        """
        bs, c, h, w = x.shape
        # Flatten H*W row-major into a token axis: (bs, c, h, w) -> (bs, h*w, c).
        x_att = x.reshape(bs, c, h*w).transpose(1, 2)
        x_att = self.norm(x_att)
        att_out, att_map = self.mha(x_att, x_att, x_att)

        return att_out.transpose(1, 2).reshape(bs, c, h, w), att_map

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a batch of images."""
        x = self.conv1(x)
        # Gated residual: add the scaled self-attention output to the conv features
        x = self.scale * self.use_attention(x)[0] + x

        # ReLU only; there is no batch normalization at this stage
        x = f.relu(x)

        # Down-sampling convolutional stages (conv -> batch norm -> ReLU)
        x = f.relu(self.bn1(self.conv2(x)))
        x = f.relu(self.bn2(self.conv3(x)))
        x = f.relu(self.bn3(self.conv4(x)))

        # Flatten the output and apply dropout
        x = self.do(x.reshape(x.shape[0], -1))

        # Fully connected layer for final output
        return self.fc(x)

In [None]:
!pip install torchsummary
from torchsummary import summary
model = Attention_CNN(3)
summary(model, input_size=(3, 32, 32))

In [None]:
# Pull one batch out of the test loader
dataiter = next(iter(test_loader))
# Split the batch into its images and labels
test_images, test_labels = dataiter

# Tile the whole batch into a single grid, 8 images per row, and display it
plt.figure(figsize=(20, 10))
out = torchvision.utils.make_grid(test_images, nrow=8, normalize=True)
plt.imshow(out.permute(1, 2, 0).numpy())

In [None]:
# Cross-entropy objective over the class logits, optimised with plain SGD
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.parameters(), lr=0.001)

In [None]:
# Use the GPU when PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = 'cpu'
model.to(device)

# Loss histories
train_loss_logger = []
test_loss_logger = []
def train_model(model, optimizer, criterion, loader, loss_logger):
    """Train `model` for one full pass over `loader`.

    Batches are moved to the notebook-level `device` before the forward pass.

    Args:
        model: Network to optimise; it is switched to training mode.
        optimizer: Optimizer that updates the model's parameters.
        criterion: Loss function called as ``criterion(output, label)``.
        loader: Iterable yielding ``(img, label)`` batches.
        loss_logger: List that receives one loss value (a Python float) per batch.

    Returns:
        The tuple ``(model, optimizer, loss_logger)`` after the pass.
    """
    model.train()

    for img, label in tqdm(loader, desc="Training", leave=False):
        output = model(img.to(device))
        # Use the loss passed in by the caller, not the global `loss_fn`;
        # previously the `criterion` argument was silently ignored.
        loss = criterion(output, label.to(device))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loss_logger.append(loss.item())

    return model, optimizer, loss_logger

def evaluate(model, loader):
    """Return the fraction of samples in `loader` that `model` classifies correctly.

    The model is put in evaluation mode and run without gradient tracking;
    batches are moved to the notebook-level `device`. The count of correct
    predictions is divided by ``len(loader.dataset)``.
    """
    model.eval()
    correct = 0

    with torch.inference_mode():
        for images, targets in tqdm(loader, leave=False, desc='Evaluating'):
            predictions = model(images.to(device)).argmax(1)
            matches = predictions == targets.to(device)
            correct += matches.sum().item()

    return correct / len(loader.dataset)

In [None]:
# Histories: loss is recorded per batch, accuracies once per epoch
training_loss_logger, validation_acc_logger, training_acc_logger = [], [], []

# Accuracies start at zero so the progress bar has something to show in epoch 0
valid_acc = 0
train_acc = 0
num_epochs = 50

# Main training loop: one training pass, then accuracy on both splits
pbar = trange(num_epochs, leave=False, desc="Epoch")
for epoch in pbar:
    # The bar shows the accuracies measured at the end of the previous epoch
    status = 'Accuracy: Train %.2f%%, Val %.2f%%' % (train_acc * 100, valid_acc * 100)
    pbar.set_postfix_str(status)

    # One pass over the training data; the loss history is extended in the process
    model, optimizer, training_loss_logger = train_model(
        model=model,
        optimizer=optimizer,
        criterion=loss_fn,
        loader=train_loader,
        loss_logger=training_loss_logger,
    )

    # Measure accuracy on the training split first, then on the held-out split
    train_acc = evaluate(model=model, loader=train_loader)
    valid_acc = evaluate(model=model, loader=test_loader)

    # Record this epoch's accuracies
    training_acc_logger.append(train_acc)
    validation_acc_logger.append(valid_acc)

print("Training Complete")

In [None]:
# Per-batch training loss, with the x-axis rescaled to run from 0 to num_epochs
plt.figure(figsize=(10, 5))
train_x = np.linspace(start=0, stop=num_epochs, num=len(training_loss_logger))
plt.plot(train_x, training_loss_logger)
_ = plt.title("Attention_CNN Training Loss")

In [None]:
# Per-epoch training (yellow) and validation (black) accuracy on a shared axis
plt.figure(figsize=(10, 5))
train_x = np.linspace(start=0, stop=num_epochs, num=len(training_acc_logger))
valid_x = np.linspace(start=0, stop=num_epochs, num=len(validation_acc_logger))
plt.plot(train_x, training_acc_logger, c="y", label="Training accuracy")
plt.plot(valid_x, validation_acc_logger, c="k", label="Validation accuracy")

plt.title("Attention_CNN")
_ = plt.legend()

In [None]:
# Let's visualise the predictions for a few test images!

# Make sure Dropout/BatchNorm are in inference behaviour even when this cell is
# run without the evaluation step of the training loop having run just before.
model.eval()
with torch.no_grad():
    fx = model(test_images[:8].to(device))
    pred = fx.argmax(-1)

# Show the same 8 images side by side
plt.figure(figsize = (20,10))
out = torchvision.utils.make_grid(test_images[:8], 8, normalize=True)
plt.imshow(out.numpy().transpose((1, 2, 0)))

# Class indices predicted by the model vs. the labels from the loader
print("Predicted Values\n", list(pred.cpu().numpy()))
print("True Values\n", list(test_labels[:8].numpy()))

In [None]:
# Assuming model and test_images are already defined and loaded
with torch.no_grad():
    x = model.conv1(test_images[:8].to(device))
    _, att_map = model.use_attention(x)

# Spatial size of the feature map the attention ran over
feat_h, feat_w = x.shape[2], x.shape[3]

# Index of the image you want to visualize (within the batch passed above)
img_idx = 6

# Pixel whose attention map is shown: column x_dim, row y_dim
x_dim = 5
y_dim = 25

assert img_idx < att_map.shape[0], "img_idx must be less than " + str(att_map.shape[0])
assert x_dim < feat_w, "x_dim must be less than " + str(feat_w)
assert y_dim < feat_h, "y_dim must be less than " + str(feat_h)

# use_attention flattens (h, w) row-major, so the token for pixel
# (row=y_dim, col=x_dim) sits at y_dim * width + x_dim, not x_dim * y_dim.
query_idx = y_dim * feat_w + x_dim

# Plot the image and its corresponding attention map
fig, axes = plt.subplots(1, 2, figsize=(6, 3))

# Plot the original image, rescaled to [0, 1], with the query pixel marked
img_out = test_images[img_idx]
img_out = (img_out - img_out.min())/(img_out.max() - img_out.min())
axes[0].imshow(img_out.permute(1, 2, 0).cpu().numpy())
axes[0].set_title("Original Image")
axes[0].axis('off')
axes[0].scatter(x_dim, y_dim, color='red', marker='x')

# Plot the attention map: how strongly the query pixel attends to every position
axes[1].imshow(att_map[img_idx, query_idx].reshape(feat_h, feat_w).cpu().numpy(), cmap='viridis')
axes[1].set_title("Attention Map")
axes[1].axis('off')

plt.show()