In [None]:
import os
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torchvision.datasets.utils import download_url
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torchvision.transforms as T
from torchvision.utils import make_grid
from PIL import Image
import random
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

Loading the Dataset

In [None]:
data_dir = './100-bird-species'

# As of 9-Jan-2023 the dataset contains one image, listed at row label 40464
# of birds.csv, whose dimensions are not 224x224. That row is dropped right
# after loading so the rest of the notebook can assume a single image size.
paths_df = pd.read_csv(os.path.join(data_dir, "birds.csv")).drop(index=40464)
paths_df.head()

In [None]:
# Print a summary of the path table (columns, dtypes, non-null counts).
paths_df.info()

## Preparing the Data

Collect the class ids and map each class id to its bird name.

In [None]:
# NOTE: pandas' Series.unique() returns the distinct values in order of first
# appearance; unlike numpy.unique() it does NOT sort them.
labels = paths_df["class id"].unique()

# Map each class id to its bird name, using the name found on the first row of
# that class. De-duplicating once avoids re-filtering the whole frame for every
# class id.
bird_name_map = {
    int(class_id): bird_name
    for class_id, bird_name in (
        paths_df.drop_duplicates(subset="class id")[["class id", "labels"]]
        .itertuples(index=False, name=None)
    )
}

In [None]:
# Inspect the distinct class ids and the class-id -> bird-name mapping.
print(labels)
print(bird_name_map)

Plot 20 random images each time the cell runs; every image is drawn from a randomly chosen class among the 450 classes.

In [None]:
# Class names are taken from the sub-directories of the training split.
classes = os.listdir(data_dir + "/train")

def show_images(dataset='train', nrows=5, ncols=4):
    """Show a grid of randomly chosen images from one split of the dataset.

    Each grid cell picks a random class from `classes` and then a random file
    from that class' folder under ``data_dir/<dataset>``.

    Args:
        dataset: name of the split sub-directory, e.g. 'train', 'test' or 'valid'.
        nrows: number of grid rows (default 5).
        ncols: number of grid columns (default 4).
    """
    # A fresh figure per call, so the grid never draws into a figure left over
    # from another cell; the image count always matches the grid size.
    fig, axes = plt.subplots(nrows, ncols, figsize=(ncols * 5, nrows * 5), squeeze=False)
    for ax in axes.flat:
        name = random.choice(classes)
        class_dir = os.path.join(data_dir, dataset, name)
        img = mpimg.imread(os.path.join(class_dir, random.choice(os.listdir(class_dir))))
        ax.imshow(img)
        ax.set_title(name)
        ax.axis('off')  # Don't show axes (or gridlines)

## Display Images from Train, Test, and Validation Sets

In [None]:
# Random sample of images from the 'train' split.
show_images('train')

In [None]:
# Random sample of images from the 'test' split.
show_images('test')

In [None]:
# Random sample of images from the 'valid' split.
show_images('valid')

## Image Normalization

In [None]:
# %% Define Function to Calculate Mean and Standard Deviation
def get_mean_and_std(dataloader):
    """Compute the per-channel mean and standard deviation over a dataloader.

    The statistics are accumulated as running sums over every value, so
    batches of different sizes (typically the last one) are weighted correctly
    instead of each batch counting equally.

    Args:
        dataloader: iterable yielding ``(data, target)`` pairs, where ``data``
            is a 4-D tensor reduced over dimensions 0, 2 and 3 (batch, height,
            width) and kept along dimension 1 (channels). The target is ignored.

    Returns:
        Tuple ``(mean, std)`` of 1-D float32 tensors with one entry per channel.
        ``std`` is the population standard deviation sqrt(E[X^2] - (E[X])^2).

    Raises:
        ValueError: if the dataloader yields no data.
    """
    channels_sum, channels_squared_sum, num_values = 0, 0, 0
    for data, _ in dataloader:
        # Sum over batch, height and width, but not over the channels.
        # Accumulating in float64 keeps the running sums accurate.
        channels_sum = channels_sum + data.sum(dim=[0, 2, 3], dtype=torch.float64)
        channels_squared_sum = channels_squared_sum + (data * data).sum(dim=[0, 2, 3], dtype=torch.float64)
        num_values += data.size(0) * data.size(2) * data.size(3)

    if num_values == 0:
        raise ValueError("dataloader yielded no data; cannot compute mean and std")

    mean = channels_sum / num_values

    # var = E[X^2] - (E[X])^2; clamp guards against tiny negative values
    # caused by floating-point rounding before taking the square root.
    variance = (channels_squared_sum / num_values - mean ** 2).clamp(min=0)
    return mean.float(), variance.sqrt().float()

In [None]:
# %% Calculate Dataset Mean and Std
# Loads the 'valid' split with only ToTensor applied and prints the
# (mean, std) pair returned by get_mean_and_std, using batches of 400 images.
# NOTE(review): normalization statistics are usually computed on the training
# split -- confirm that using 'valid' here is intentional.
dataset = ImageFolder(data_dir+'/valid', transform=T.ToTensor())
dataloader = DataLoader(dataset, batch_size=400)
print(get_mean_and_std(dataloader))

## Data Augmentations

In [None]:
# %% Define Transformations
# Per-channel (mean, std) pair used to normalize every image tensor.
bird_stats = ([0.4758, 0.4685, 0.3870], [0.2376, 0.2282, 0.2475])

# Training pipeline: random augmentations are applied first, then the image is
# converted to a tensor and normalized with bird_stats.
train_tfms = T.Compose([
    T.RandomCrop(size=224, padding=4, padding_mode='reflect'),
    # Blur is applied to roughly 15% of the images.
    T.RandomApply(
        transforms=torch.nn.ModuleList([T.GaussianBlur(kernel_size=3, sigma=(0.2, 5))]),
        p=0.15,
    ),
    T.RandomHorizontalFlip(),
    T.RandomRotation(degrees=10),
    T.ToTensor(),
    T.Normalize(mean=bird_stats[0], std=bird_stats[1], inplace=True),
])

# Evaluation pipeline: no augmentation, only resize + tensor + normalization.
valid_tfms = T.Compose([
    T.Resize(size=224),
    T.ToTensor(),
    T.Normalize(mean=bird_stats[0], std=bird_stats[1], inplace=True),
])

## Custom Dataset Class for Bird Images

In [None]:
# Define the data directory (change this to the correct path).
# This re-assigns data_dir to the same value used when birds.csv was loaded;
# BirdDataset below joins image paths onto it, so keep the two in sync.
data_dir = "./100-bird-species"

# Define Custom Dataset Class
class BirdDataset(torch.utils.data.Dataset):
    """Dataset that loads bird images listed in a DataFrame.

    Each row of `df` must provide a 'filepaths' entry (path of the image) and
    a 'class id' entry (numeric label).

    Args:
        df: DataFrame with 'filepaths' and 'class id' columns.
        transform: optional callable applied to the loaded RGB PIL image.
        root: optional dataset root directory. When None (the default) the
            notebook-level `data_dir` is used at access time.
    """

    # Recognized split directory names.
    _SPLITS = ('train', 'test', 'valid')

    def __init__(self, df, transform=None, root=None):
        self.df = df
        self.transform = transform
        self.root = root

    def __len__(self):
        """Number of rows (images) in the DataFrame."""
        return len(self.df)

    @staticmethod
    def _relative_path(img_path):
        """Trim `img_path` so that it starts at its split directory.

        The split is identified by a whole path component equal to 'train',
        'test' or 'valid' (the right-most one wins), not by a substring match,
        so a parent directory or class name that merely contains one of these
        words is not mistaken for the split. Paths without a split component
        are returned unchanged.
        """
        parts = [p for p in str(img_path).replace('\\', '/').split('/') if p]
        for pos in range(len(parts) - 1, -1, -1):
            if parts[pos] in BirdDataset._SPLITS:
                return os.path.join(*parts[pos:])
        return img_path

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position `idx`.

        `image` is the RGB image after `transform` (if any); `label` is a
        0-dim long tensor holding the row's 'class id'.

        Raises:
            FileNotFoundError: if the image file does not exist.
        """
        row = self.df.iloc[idx]
        root = data_dir if self.root is None else self.root
        full_img_path = os.path.join(root, self._relative_path(row['filepaths']))

        # convert() returns an independent, fully loaded image, so the file
        # handle can be closed right away instead of lingering until GC.
        with Image.open(full_img_path) as raw_img:
            img = raw_img.convert("RGB")

        if self.transform:
            img = self.transform(img)

        label = torch.tensor(int(row['class id']), dtype=torch.long)
        return img, label

# Splitting DataFrame into train, test, and validation sets
def _split_from_path(filepath):
    """Return the split ('train', 'test' or 'valid') a file path belongs to.

    The split is taken from a whole path component (the right-most match), so
    a directory or class name that merely contains 'train' or 'test' as a
    substring does not decide the split. Paths without any split component
    fall back to 'valid'.
    """
    for part in reversed(str(filepath).replace('\\', '/').split('/')):
        if part in ('train', 'test', 'valid'):
            return part
    return 'valid'

paths_df['data set'] = paths_df['filepaths'].apply(_split_from_path)
#print(f"[DEBUG] DataFrame after adding 'data set' column:\n{paths_df.head()}")

train_df = paths_df[paths_df['data set'] == 'train']
test_df = paths_df[paths_df['data set'] == 'test']
val_df = paths_df[paths_df['data set'] == 'valid']

# Create Datasets and DataLoaders
# The training set gets the augmenting transforms; the test and validation
# sets only get the evaluation transforms.
train_dataset = BirdDataset(train_df, train_tfms)
test_dataset = BirdDataset(test_df, valid_tfms)
val_dataset = BirdDataset(val_df, valid_tfms)

# Only the training loader shuffles. Evaluation loaders keep a fixed order so
# repeated evaluations see identical batches and no time is spent shuffling.
train_dl = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2)
test_dl = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=3)
val_dl = DataLoader(val_dataset, batch_size=70, shuffle=False, num_workers=2)

# Debug print for DataLoader lengths
#print(f"[DEBUG] Number of batches in train_dl: {len(train_dl)}")
#print(f"[DEBUG] Number of batches in test_dl: {len(test_dl)}")
#print(f"[DEBUG] Number of batches in val_dl: {len(val_dl)}")


In [None]:
def accuracy(outputs, labels):
    """Return the fraction of predictions that match `labels`.

    Args:
        outputs: 2-D tensor of per-class scores, one row per sample.
        labels: 1-D tensor of target class indices.

    Returns:
        0-dim float tensor: mean of (argmax over dim 1 == label).
    """
    preds = torch.argmax(outputs, dim=1)
    return (preds == labels).float().mean()


class ImageClassificationBase(nn.Module):
    """Mixin-style base class adding training/validation helpers to a model.

    Subclasses provide ``forward``; the helpers here compute cross-entropy
    loss and accuracy for a batch and aggregate/print per-epoch results.
    """

    def training_step(self, batch):
        """Return ``(loss, accuracy)`` for one training batch of (images, labels)."""
        images, labels = batch
        out = self(images)                  # Generate predictions
        loss = F.cross_entropy(out, labels) # Calculate loss
        acc = accuracy(out, labels)
        return loss, acc

    def validation_step(self, batch):
        """Return a dict with detached 'val_loss' and 'val_acc' for one batch."""
        images, labels = batch
        out = self(images)                    # Generate predictions
        loss = F.cross_entropy(out, labels)   # Calculate loss
        acc = accuracy(out, labels)           # Calculate accuracy
        return {'val_loss': loss.detach(), 'val_acc': acc}

    def validation_epoch_end(self, outputs):
        """Average the per-batch dicts from `validation_step` into Python floats.

        Each batch counts equally in the average, regardless of its size.
        """
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Combine losses
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()      # Combine accuracies
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}

    def epoch_end(self, epoch, result):
        """Print a one-line summary for `epoch` (0-based; printed as epoch+1).

        `result` must contain 'train_loss', 'train_accuracy', 'val_loss',
        'val_acc' and 'lrs' (a sequence whose last entry is printed).
        """
        print("Epoch [{}], train_loss: {:.4f}, train_acc: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}, last_lr: {:.5f}".format(
            epoch+1, result['train_loss'], result['train_accuracy'], result['val_loss'], result['val_acc'], result['lrs'][-1]))

## Model Definition: ResNet

In [None]:
def conv_block(in_channels, out_channels, activation=False, pool=False):
    """Build a 3x3 convolution (padding 1) followed by batch normalization.

    Args:
        in_channels: number of input channels of the convolution.
        out_channels: number of output channels (also the batch-norm width).
        activation: when true, append an in-place ReLU.
        pool: when true, append a 2x2 max-pool (after the ReLU, if any).

    Returns:
        nn.Sequential containing the layers in the order described above.
    """
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
    norm = nn.BatchNorm2d(out_channels)

    extras = []
    if activation:
        extras.append(nn.ReLU(inplace=True))
    if pool:
        extras.append(nn.MaxPool2d(2))

    return nn.Sequential(conv, norm, *extras)

class ResNet34(ImageClassificationBase):
    """Residual CNN classifier built from `conv_block` stages.

    Layout: a 7x7 stem, then sixteen two-convolution residual units (res1 ..
    res16). Units res4, res8 and res14 widen the channels (64->128->256->512)
    and halve the resolution with a max-pool; their skip path goes through the
    matching `downsampleN` projection. All other units add the identity.
    The head is global max-pool, flatten, dropout and a linear layer.

    Args:
        in_channels: number of channels of the input images.
        num_classes: size of the output layer.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()

        def identity_unit(channels, final_activation=False):
            # Two conv stages that keep channel count and spatial size.
            return nn.Sequential(
                conv_block(channels, channels, activation=True),
                conv_block(channels, channels, activation=final_activation),
            )

        def widening_unit(narrow, wide):
            # First stage widens and pools; second stage keeps the new width.
            return nn.Sequential(
                conv_block(narrow, wide, activation=True, pool=True),
                conv_block(wide, wide),
            )

        def projection(narrow, wide):
            # Skip-path counterpart of widening_unit: conv + batch-norm + pool.
            return nn.Sequential(conv_block(narrow, wide, pool=True))

        # NOTE: attributes are created in the same order and under the same
        # names as before, so state_dict keys and parameter order are unchanged.
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=7, stride=1, padding=4),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2),
            nn.ReLU(inplace=True),
        )

        # 64-channel group
        self.res1 = identity_unit(64)
        self.res2 = identity_unit(64)
        self.res3 = identity_unit(64)

        # 128-channel group
        self.downsample1 = projection(64, 128)
        self.res4 = widening_unit(64, 128)
        self.res5 = identity_unit(128)
        self.res6 = identity_unit(128)
        self.res7 = identity_unit(128)

        # 256-channel group
        self.res8 = widening_unit(128, 256)
        self.downsample2 = projection(128, 256)
        self.res9 = identity_unit(256)
        self.res10 = identity_unit(256)
        self.res11 = identity_unit(256)
        self.res12 = identity_unit(256)
        self.res13 = identity_unit(256)

        # 512-channel group
        self.res14 = widening_unit(256, 512)
        self.downsample3 = projection(256, 512)
        self.res15 = identity_unit(512)
        self.res16 = identity_unit(512, final_activation=True)

        self.classifier = nn.Sequential(
            nn.AdaptiveMaxPool2d((1, 1)),
            nn.Flatten(),
            nn.Dropout(0.17),
            nn.Linear(512, num_classes),
        )
        self.apply(self.init_weights)

    def init_weights(self, m):
        """Kaiming-normal (fan_in, relu) initialization for Conv2d weights only."""
        if not isinstance(m, nn.Conv2d):
            return
        nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')

    def forward(self, xb):
        """Run the stem, the four residual groups and the classifier head."""
        out = self.conv1(xb)

        for unit in (self.res1, self.res2, self.res3):
            out = unit(out) + out

        out = self.downsample1(out) + self.res4(out)
        for unit in (self.res5, self.res6, self.res7):
            out = unit(out) + out

        out = self.downsample2(out) + self.res8(out)
        for unit in (self.res9, self.res10, self.res11, self.res12, self.res13):
            out = unit(out) + out

        out = self.downsample3(out) + self.res14(out)
        for unit in (self.res15, self.res16):
            out = unit(out) + out

        return self.classifier(out)

### Device helpers
### Pick the available device and move the model and data to it

In [None]:
def get_default_device():
    """Return the CUDA device when one is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
    
def to_device(data, device):
    """Move a tensor, or a (nested) list/tuple of tensors, to `device`.

    Lists and tuples are processed element by element and always come back as
    a list; anything else is moved with ``.to(device, non_blocking=True)``.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Thin wrapper around a dataloader that moves each batch to a device."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __len__(self):
        """Number of batches in the wrapped dataloader."""
        return len(self.dl)

    def __iter__(self):
        """Iterate over the wrapped dataloader, moving each batch to the device."""
        yield from (to_device(batch, self.device) for batch in self.dl)

### Checking the Device available.

In [None]:
# Select the device once; the model and every batch are moved to it later.
device = get_default_device()
device

In [None]:
# Define the model and the optimizer (the loss is computed inside the
# training/evaluation code with F.cross_entropy, not defined here).
# The output layer has one unit per distinct class id found in birds.csv.
model = ResNet34(3, num_classes=len(labels)).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

## Training Loop

In [None]:
# %% Define Training Function
# Re-create the training loader with num_workers=0, replacing the train_dl
# built earlier with num_workers=2.
# NOTE(review): presumably a workaround for worker-process problems when
# loading data from a notebook -- confirm.
train_dl = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=0)
print("[DEBUG] DataLoader created with batch size 64 and num_workers=0")

def train_model(num_epochs, save_path="bird_classification_model.pth", log_every=100):
    """Train the notebook-level `model` and save its weights.

    Uses the notebook-level `model`, `optimizer`, `train_dl` and `device`.
    Every batch is moved to `device`, the cross-entropy loss is minimized with
    `optimizer`, and the mean batch loss is printed after each epoch.

    Args:
        num_epochs: number of passes over `train_dl`.
        save_path: file the model's state_dict is written to after training.
        log_every: print the current batch loss every `log_every` batches;
            0 or None disables per-batch logging. This replaces the previous
            unconditional debug output for every single batch.
    """
    print(f"[INFO] Starting training for {num_epochs} epochs...")
    num_batches = len(train_dl)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0  # Track cumulative loss for the epoch

        for batch_idx, (images, targets) in enumerate(train_dl, start=1):
            # Move data to the appropriate device
            images, targets = images.to(device), targets.to(device)

            # Clear gradients before the backward pass so nothing left over
            # from an interrupted earlier run leaks into this step.
            optimizer.zero_grad()

            # Forward pass, loss, backward pass and optimization step
            loss = F.cross_entropy(model(images), targets)
            loss.backward()
            optimizer.step()

            # Read the loss value once per batch.
            batch_loss = loss.item()
            epoch_loss += batch_loss
            if log_every and batch_idx % log_every == 0:
                print(f"[INFO] Epoch {epoch+1}, batch {batch_idx}/{num_batches}, loss: {batch_loss:.4f}")

        # Average loss for the epoch
        avg_loss = epoch_loss / num_batches
        print(f"[INFO] Epoch [{epoch+1}/{num_epochs}], Average Loss: {avg_loss}")

    # Save the trained model
    torch.save(model.state_dict(), save_path)
    print(f"[INFO] Model saved to {save_path}")


In [None]:
# Training the model for 10 epochs and saving its state_dict to save_path
train_model(10, save_path="bird_classification_model.pth")

## Evaluation

In [None]:
def evaluate(model, val_dl):
    """Compute the mean cross-entropy loss and accuracy of `model` on `val_dl`.

    Both metrics are averaged over samples, not over batches, so a smaller
    final batch does not get extra weight. The model is switched to eval mode
    and gradients are disabled. Batches are moved to the device the model's
    parameters live on (CPU for a model without parameters).

    Args:
        model: the network to evaluate; called as ``model(images)``.
        val_dl: iterable of ``(images, labels)`` batches.

    Returns:
        Tuple ``(loss, accuracy)`` of Python floats.

    Raises:
        ValueError: if `val_dl` yields no samples.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    total_loss, total_correct, total_seen = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in val_dl:
            images, labels = images.to(target_device), labels.to(target_device)
            outputs = model(images)
            # Summed (not averaged) loss so batches can be combined exactly.
            total_loss += F.cross_entropy(outputs, labels, reduction='sum').item()
            total_correct += (outputs.argmax(dim=1) == labels).sum().item()
            total_seen += labels.size(0)

    if total_seen == 0:
        raise ValueError("val_dl yielded no samples; nothing to evaluate")
    return total_loss / total_seen, total_correct / total_seen

# Evaluate on validation data: evaluate returns a (loss, accuracy) pair
# computed over the batches of val_dl.
val_loss, val_acc = evaluate(model, val_dl)
print(f'Validation Loss: {val_loss}, Validation Accuracy: {val_acc}')