<a href="https://colab.research.google.com/github/saarthag/Qualcomm-AV-DL-Hackathon/blob/main/model_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [125]:
!git clone https://github.com/saarthag/Qualcomm-AV-DL-Hackathon.git

fatal: destination path 'Qualcomm-AV-DL-Hackathon' already exists and is not an empty directory.


## Imports

In [126]:
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision as tv
from torchvision.io import decode_image
from torchvision.transforms import v2
from torchvision import models, tv_tensors

import os
import pandas as pd
from sklearn.model_selection import train_test_split
import time
from tempfile import TemporaryDirectory

## Data loading and pre-processing

In [127]:
# Load the labelled annotations (image file name + label per row)
df = pd.read_csv(
    '/content/Qualcomm-AV-DL-Hackathon/train.csv',
)
# Hold out 30% of the rows for validation; the fixed seed keeps the split
# reproducible across runs
train_df, validation_df = train_test_split(
    df,
    test_size=0.3,
    random_state=42,
)

A custom dataset and data loader that fetch the images and feed them to the model downstream.

In [128]:
class EmergencyVehiclesDataset(Dataset):
  """Map-style dataset that pairs decoded images with their labels.

  Each row of the annotations table describes one sample: column 0 holds the
  image file name (relative to ``img_dir``) and column 1 holds its label.
  """

  def __init__(self, annotations_df: pd.DataFrame, img_dir: str,
               transform=None):
    """Store the annotations table, the image directory and the transform.

    Args:
      annotations_df: One row per sample; column 0 is the image file name and
        column 1 is the label.
      img_dir: Directory that contains the image files.
      transform: Optional callable applied to every decoded image.
    """
    self.img_labels = annotations_df
    self.img_dir = img_dir
    self.transform = transform

  def __len__(self):
    """Return the number of annotated samples."""
    return len(self.img_labels)

  def __getitem__(self, idx):
    """Return the ``(image, label)`` pair at positional index ``idx``."""
    img_path = os.path.join(self.img_dir, self.img_labels.iat[idx, 0])
    # Always decode to 3-channel RGB so that grayscale or RGBA files still
    # match the 3-channel normalization and the 3-channel model input.
    image = decode_image(img_path, mode=tv.io.ImageReadMode.RGB)
    label = self.img_labels.iat[idx, 1]

    # Explicit None check: whether to transform should not depend on the
    # truthiness of the transform object itself.
    if self.transform is not None:
      image = self.transform(image)

    return tv_tensors.Image(image), label

In [129]:
# Per-channel ImageNet statistics used to normalize the images
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Pre-processing pipeline: convert the decoded uint8 image to float32 (with
# value scaling), then standardize each channel with the ImageNet statistics
transform = v2.Compose([
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

In [130]:
# Helper function to create PyTorch DataLoader from a given DataFrame
def get_loader(annotations_df: pd.DataFrame, batch_size: int=4, shuffle: bool=True,
               img_dir: str='/content/Qualcomm-AV-DL-Hackathon/images/'):
  """Wrap an annotations table in a dataset and return its loader and size.

  Args:
    annotations_df: Rows describing the samples (image file name and label).
    batch_size: Number of samples per batch.
    shuffle: Whether the loader reshuffles the samples on every pass.
    img_dir: Directory holding the image files; defaults to the images folder
      of the cloned repository.

  Returns:
    A ``(loader, num_samples)`` tuple.
  """
  dataset = EmergencyVehiclesDataset(
      annotations_df=annotations_df,
      img_dir=img_dir,
      transform=transform)

  # Keyword arguments keep the call unambiguous against DataLoader's long
  # positional signature.
  loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
  return loader, len(dataset)

In [131]:
# Shuffle only the training split: the validation pass just sums loss and hits
# over all samples, so its order is irrelevant and is kept deterministic
train_loader, train_set_size = get_loader(train_df)
validation_loader, validation_set_size = get_loader(validation_df, shuffle=False)

Select the compute device: CUDA if it is available, otherwise the CPU.

In [132]:
# Prefer the first CUDA GPU when PyTorch can see one; otherwise use the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

cpu


In [133]:
# Index the loaders and the split sizes by phase name ('train' / 'val'), which
# is how the training loop looks them up
dataloaders = dict(train=train_loader, val=validation_loader)
dataset_sizes = dict(train=train_set_size, val=validation_set_size)

## Configuring the base model for fine tuning
Here, we use ResNet18 as the base model.

In [134]:
# Start from ResNet18 with its ImageNet-pretrained weights
resnet18_base_model = models.resnet18(weights='IMAGENET1K_V1')

# Swap the final fully connected layer for a fresh 2-class head that keeps the
# original layer's input width
fc_in_features = resnet18_base_model.fc.in_features
resnet18_base_model.fc = nn.Linear(fc_in_features, 2)

# Place the model on the selected device
resnet18_base_model = resnet18_base_model.to(device)

# Loss: cross-entropy; optimizer: SGD with momentum over all model parameters
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    resnet18_base_model.parameters(),
    lr=0.001,
    momentum=0.9,
)

# Multiply the learning rate by 0.1 after every 5 scheduler steps
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

## Base model fine-tuning (training)

In [135]:
def _run_phase(model: nn.Module, criterion, optimizer, phase: str):
    """Run one full pass over ``dataloaders[phase]``; return ``(loss, accuracy)``.

    Reads the module-level ``dataloaders``, ``dataset_sizes`` and ``device``.
    Parameters are updated only when ``phase`` is ``'train'``. Both returned
    values are plain Python floats averaged over ``dataset_sizes[phase]``.
    """
    training = phase == 'train'
    # train(True) == train mode, train(False) == eval mode
    model.train(training)

    tot_loss = 0.
    tot_hits = 0

    for inputs, labels in dataloaders[phase]:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Gradients only need clearing when a backward pass will follow
        if training:
            optimizer.zero_grad()

        # Gradients are only tracked in the training phase
        with torch.set_grad_enabled(training):
            outputs = model(inputs)
            # torch.max over dim 1 returns, per sample, the largest logit and
            # the class index where it occurs; only the index is needed
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

        # The loss is a per-batch mean, so weight it by the batch size
        tot_loss += loss.item() * inputs.size(0)
        # Accumulate as a Python int so the running totals never turn into
        # (possibly GPU-resident) tensors
        tot_hits += (preds == labels).sum().item()

    num_samples = dataset_sizes[phase]
    return tot_loss / num_samples, tot_hits / num_samples


def train_model(model: nn.Module, criterion, optimizer, scheduler, num_epochs=25):
    """Fine-tune ``model`` and return it loaded with its best validation weights.

    Every epoch runs a training pass followed by a validation pass over the
    module-level ``dataloaders``. The weights with the highest validation
    accuracy seen so far are checkpointed inside a temporary directory and
    restored into ``model`` before returning; the directory and checkpoint
    are removed when the function exits. If no epoch achieves a validation
    accuracy above zero, the initial weights are restored.

    Args:
        model: Network to train; it is modified in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the model's parameters.
        scheduler: LR scheduler, stepped once per epoch after the training pass.
        num_epochs: Number of train + validation epochs to run.

    Returns:
        The same ``model`` instance, holding the best-scoring weights.
    """
    start_ts = time.time()

    # Create a temporary directory to save training checkpoints
    with TemporaryDirectory() as tempdir:
        print("Model checkpoints directory:", tempdir)

        # Seed the checkpoint with the initial weights so there is always
        # something to restore at the end
        best_checkpt_path = os.path.join(tempdir, 'best_checkpt.pt')
        torch.save(model.state_dict(), best_checkpt_path)

        # Best validation accuracy across all epochs
        best_acc = 0.

        for epoch in range(num_epochs):
            print(f'EPOCH {epoch+1}/{num_epochs}')

            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                epoch_loss, epoch_acc = _run_phase(model, criterion, optimizer, phase)

                # Adjust the learning rate once per epoch, after training
                if phase == 'train':
                    scheduler.step()

                print(f'{phase} Loss: {epoch_loss:.5f} Acc: {epoch_acc:.5f}')

                # Save the model if it outperforms its best epoch so far
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), best_checkpt_path)

            print()

        time_elapsed = time.time() - start_ts
        print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
        print(f'Best validation accuracy: {best_acc:.4f}')

        # Load best checkpoint (must happen before the directory is deleted)
        model.load_state_dict(torch.load(best_checkpt_path, weights_only=True))
    return model

In [None]:
# Kick off fine-tuning; the returned model carries the best validation weights
resnet18_base_model = train_model(
    model=resnet18_base_model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=exp_lr_scheduler,
    num_epochs=25,
)

## Inference

Optionally, we can load a previously saved model for inference, since training and inference may occur independently.

In [None]:
# Uncomment the following if you wish to load a different checkpoint file

# resnet18_base_model.load_state_dict(torch.load('/content/best_checkpt.pt',
#                                     weights_only=True, map_location=device))

In [None]:
# Read the test-set annotations (the image file names to run inference on)
test_df = pd.read_csv(
    '/content/Qualcomm-AV-DL-Hackathon/test_If1BZq3.csv',
)

In [None]:
# TODO: batch the images instead of running one forward pass per image
def predict_for_model(model: nn.Module,
                      img_dir: str='/content/Qualcomm-AV-DL-Hackathon/images/'):
  """Build a single-image prediction function bound to ``model``.

  The model is switched to evaluation mode, and inputs are moved to the device
  that holds the model's parameters before the forward pass.

  Args:
    model: Trained classifier to run inference with.
    img_dir: Directory holding the image files; defaults to the images folder
      of the cloned repository.

  Returns:
    A function mapping an image file name to the predicted class index.
  """
  # A freshly loaded model starts in training mode; inference needs eval mode
  model.eval()

  # Run inputs on whichever device the model's weights live on (CPU if the
  # model has no parameters at all)
  first_param = next(model.parameters(), None)
  model_device = first_param.device if first_param is not None else torch.device('cpu')

  def predict(img_id):
    """Return the predicted class index for the image file ``img_id``."""
    img_path = os.path.join(img_dir, img_id)
    # Decode to 3-channel RGB so grayscale or RGBA files still match the
    # 3-channel normalization and model input
    img = tv_tensors.Image(decode_image(img_path, mode=tv.io.ImageReadMode.RGB))
    img = transform(img)

    with torch.no_grad():
      # Add a batch dimension of one and move the image next to the weights
      outputs = model(img.unsqueeze(0).to(model_device))
      _, preds = torch.max(outputs, 1)
      return preds[0].item()

  return predict

In [None]:
# Build the predictor once and apply it to every test image file name
predict_fn = predict_for_model(resnet18_base_model)
test_df['emergency_or_not'] = test_df['image_names'].map(predict_fn)

Save the predictions for the test data.

In [None]:
# Write the test frame (now including the predictions) without the index column
test_df.to_csv(
    path_or_buf='/content/test_If1BZq3.csv',
    index=False,
)