<a href="https://colab.research.google.com/github/jmand626/FGVCAircraft-TransferModelClassifer/blob/main/TransferClassification_fgvcaircraft.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# This is the first cell, where image_path is defined
from pathlib import Path

# Define the path to the data folder
data_path = Path("/content/drive/MyDrive/data/")
image_path = data_path / "fgvc_aircraft"

In [2]:
import os
import sys
import zipfile
import requests

# 1️ Mount Google Drive (if using for storage)
use_gdrive = True  # Set to True if dataset is stored in Google Drive
if use_gdrive:
    from google.colab import drive
    drive.mount('/content/drive')

# 2️ Clone your GitHub repo if it's not already present
repo_url = "https://github.com/jmand626/PyTorchMLEngine-Custom-Dataset-Project.git"
repo_name = "PyTorchMLEngine-Custom-Dataset-Project"

if not os.path.exists(repo_name):
    print(f"Cloning {repo_url}...")
    !git clone {repo_url}
else:
    print(f"Repository {repo_name} already exists.")

# 3️ Change to repo directory ONLY ONCE
os.chdir(repo_name) # This line sets the working directory

# 4️ Add project files to sys.path so imports work
sys.path.append(os.getcwd())
print("Project directory added to sys.path")

# 5️ Ensure necessary dependencies are installed
try:
    import torchinfo
except ImportError:
    print("Installing torchinfo...")
    !pip install -q torchinfo

# 6️ Download FGVC Aircraft dataset if missing
dataset_url = "https://www.robots.ox.ac.uk/~vgg/data/fgvc-aircraft/archives/fgvc-aircraft-2013b.tar.gz"
dataset_tar = data_path / "fgvc-aircraft-2013b.tar.gz"
dataset_folder = data_path / "fgvc-aircraft-2013b"
# Define a file within the extracted dataset to check for existence
check_file = dataset_folder / "data/images/0034309.jpg"

if check_file.exists():
    print("Dataset already exists.")
else:
    print("Downloading FGVC Aircraft dataset...")
    data_path.mkdir(parents=True, exist_ok=True)
    response = requests.get(dataset_url, stream=True)
    with open(dataset_tar, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    print("Extracting dataset...")
    !tar -xzf {dataset_tar} -C {data_path}
    os.remove(dataset_tar)
    print("Dataset extraction complete.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Repository PyTorchMLEngine-Custom-Dataset-Project already exists.
Project directory added to sys.path
Dataset already exists.


In [3]:
# For this notebook to run with updated APIs, we need torch 1.12+ and torchvision 0.13+
try:
    import torch
    import torchvision
    assert int(torch.__version__.split(".")[1]) >= 12, "torch version should be 1.12+"
    assert int(torchvision.__version__.split(".")[1]) >= 13, "torchvision version should be 0.13+"
    print(f"torch version: {torch.__version__}")
    print(f"torchvision version: {torchvision.__version__}")
except:
    print(f"[INFO] torch/torchvision versions not as required, installing nightly versions.")
    !pip3 install -U torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu113
    import torch
    import torchvision
    print(f"torch version: {torch.__version__}")
    print(f"torchvision version: {torchvision.__version__}")

[INFO] torch/torchvision versions not as required, installing nightly versions.
Looking in indexes: https://pypi.org/simple, https://download.pytorch.org/whl/cu113
torch version: 2.9.0+cu128
torchvision version: 0.24.0+cu128


In [4]:
!ls

computer_vision_test_main.py  model_backbone.py  setup_dataholders.py
create_custom_dataset.py      __pycache__
firsttry_model.py	      README.md


In [5]:
# Setup device agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

Now hopefully we can continously use the previous setup code whenever we want to use this dataset again.

In [6]:
!ls /content/drive/MyDrive/data/fgvc-aircraft-2013b/data

families.txt			  images_manufacturer_val.txt
images				  images_test.txt
images_box.txt			  images_train.txt
images_family_test.txt		  images_val.txt
images_family_train.txt		  images_variant_test.txt
images_family_trainval.txt	  images_variant_train.txt
images_family_val.txt		  images_variant_trainval.txt
images_manufacturer_test.txt	  images_variant_val.txt
images_manufacturer_train.txt	  manufacturers.txt
images_manufacturer_trainval.txt  variants.txt


In [7]:
# ipython-input-12-e84a44c78b2d
# Assume the dataset is extracted to 'data/fgvc-aircraft-2013b'
# and images are in 'data/fgvc-aircraft-2013b/data/images'
from pathlib import Path
import os

# Fix: Update paths to include the subfolder where the dataset was downloaded and extracted
# Use os.path.join to create platform-independent paths
# The issue was train_dir and test_dir were pointing to the wrong location.
# They should point to the parent directory containing the class folders.
# After reorganizing, the class folders will be inside the 'images' directory.

# Corrected paths to point to the dataset location on Google Drive
# Based on the dataset download path in cell GzSakdlhzbLN
data_path = Path("/content/drive/MyDrive/data/") # This variable is defined in the first cell
dataset_folder = data_path / "fgvc-aircraft-2013b"

train_dir = dataset_folder / "data" / "images" # Corrected path
test_dir = dataset_folder / "data" / "images"  # Corrected path, assuming test images are in the same location


# Print the resolved paths to verify they are correct
print("Train directory:", train_dir)
print("Test directory:", test_dir)

Train directory: /content/drive/MyDrive/data/fgvc-aircraft-2013b/data/images
Test directory: /content/drive/MyDrive/data/fgvc-aircraft-2013b/data/images


Now we continue on to creating our datasets and dataloaders. An important issue is that we have to ensure that the data that we feed into our pretrained model must be formatted in the same way as the data inputted when training the model (as that helps performance immeasurably). There is a certain way that all models from torchvision.models require, and we will do that.

It is detailed in this page: https://docs.pytorch.org/vision/0.8/models.html

In [8]:
import torchvision.transforms as transforms
import importlib
import setup_dataholders
importlib.reload(setup_dataholders)
manual_transforms = transforms.Compose([
    transforms.Resize((224, 224)), # 1. Reshape all images to 224x224 (though some models may require different sizes)
    transforms.ToTensor(), # 2. Turn image values to between 0 & 1
    transforms.Normalize(mean=[0.485, 0.456, 0.406], # 3. A mean of [0.485, 0.456, 0.406] (across each color channel)
                         std=[0.229, 0.224, 0.225]) # 4. A standard deviation of [0.229, 0.224, 0.225] (across each color channel),
])

In [9]:
import os
from pathlib import Path
import shutil

# Define the path to the data folder on Google Drive
data_path = Path("/content/drive/MyDrive/data/")
dataset_folder = data_path / "fgvc-aircraft-2013b"
images_base_dir = dataset_folder / "data" / "images" # Base directory where images are now in class subfolders

# Define paths to the train and test mapping files
train_mapping_file = dataset_folder / "data" / "images_variant_train.txt"
test_mapping_file = dataset_folder / "data" / "images_variant_test.txt"
variants_file = dataset_folder / "data" / "variants.txt" # File containing all class names

# Define the target root directories for the train and test splits
train_root_dir = dataset_folder / "train"
test_root_dir = dataset_folder / "test"

# Function to check if the directories are populated
def is_populated(directory):
    if not directory.exists():
        return False
    # Check if there are any subdirectories (which represent classes)
    if not any(directory.iterdir()):
        return False
    # You could add a more robust check here, like checking if a certain number of
    # class directories exist or if a specific file exists in a class directory.
    return True

# Check if directories are already populated
if is_populated(train_root_dir) and is_populated(test_root_dir):
    print("Train and test directories are already populated. Skipping population.")
else:
    print("Train and test directories are not populated or incomplete. Populating now...")

    # Ensure the target root directories exist and are empty or can be overwritten if not populated
    for dir_path in [train_root_dir, test_root_dir]:
        if dir_path.exists() and not is_populated(dir_path):
            print(f"Removing incomplete existing directory: {dir_path}")
            shutil.rmtree(dir_path)
        dir_path.mkdir(parents=True, exist_ok=True)

    # Get the full list of class names from variants.txt
    all_class_names = []
    with open(variants_file, "r") as f:
        for line in f:
            all_class_names.append(line.strip())
    all_class_names = sorted(list(set(all_class_names))) # Remove duplicates and sort

    print(f"Found {len(all_class_names)} unique class names from variants.txt.")

    # Create all class subdirectories within the train and test root directories
    print("Creating all class subdirectories in train and test folders...")
    for class_name in all_class_names:
        sanitized_class_folder_name = class_name.replace(" ", "_").replace("/", "_").replace("-", "_")
        (train_root_dir / sanitized_class_folder_name).mkdir(parents=True, exist_ok=True)
        (test_root_dir / sanitized_class_folder_name).mkdir(parents=True, exist_ok=True)
    print("Class subdirectories created.")


    # Function to read mapping files and get image ID to label mapping
    def read_mapping_file(filepath):
        image_id_to_class = {}
        with open(filepath, "r") as f:
            for line in f:
                parts = line.strip().split(" ", 1)
                if len(parts) == 2:
                    image_id, label = parts
                    image_id_to_class[image_id] = label
                else:
                     print(f"Warning: Skipping line in {filepath.name} with unexpected format: {line.strip()}")
        return image_id_to_class

    # Read mappings for train and test sets
    train_image_id_to_class = read_mapping_file(train_mapping_file)
    test_image_id_to_class = read_mapping_file(test_mapping_file)

    print(f"Found mappings for {len(train_image_id_to_class)} training images.")
    print(f"Found mappings for {len(test_image_id_to_class)} testing images.")

    # Function to copy images for a given split based on mappings
    def populate_split_directories(image_id_to_class_mapping, target_root_dir, images_base_dir):
        copied_count = 0
        source_not_found_count = 0
        target_folder_not_found_count = 0

        print(f"Populating {target_root_dir} with images...")

        for image_id, class_label in image_id_to_class_mapping.items():
            image_name = f"{image_id}.jpg"
            sanitized_class_folder_name = class_label.replace(" ", "_").replace("/", "_").replace("-", "_")

            # Source path: Look for the image within the existing class subfolders in images_base_dir
            source_image_path = images_base_dir / sanitized_class_folder_name / image_name

            # Target path: The location in the new train/test class folder (already created)
            target_class_folder = target_root_dir / sanitized_class_folder_name
            target_image_path = target_class_folder / image_name

            if source_image_path.exists():
                if target_class_folder.exists(): # Ensure target class folder was created
                    if not target_image_path.exists():
                        try:
                            shutil.copy(str(source_image_path), str(target_image_path))
                            copied_count += 1
                        except Exception as e:
                            print(f"Error copying file {source_image_path} to {target_class_folder}: {e}")
                else:
                    print(f"Error: Target class folder not found for {class_label}: {target_class_folder}")
                    target_folder_not_found_count += 1
            else:
                print(f"Warning: Source image file not found: {source_image_path}")
                source_not_found_count += 1

        print(f"Finished populating {target_root_dir}. Copied {copied_count} images. {source_not_found_count} source images not found. {target_folder_not_found_count} target class folders not found.")


    # Populate train and test directories
    populate_split_directories(train_image_id_to_class, train_root_dir, images_base_dir)
    populate_split_directories(test_image_id_to_class, test_root_dir, images_base_dir)


    print("Dataset reorganization and population complete.")

Train and test directories are already populated. Skipping population.


In [10]:
train_dir = "/content/drive/MyDrive/data/fgvc-aircraft-2013b/train"
test_dir = "/content/drive/MyDrive/data/fgvc-aircraft-2013b/test"

In [11]:
# Create training and testing DataLoaders as well as get a list of class names
train_dataloader, test_dataloader, class_names = setup_dataholders.create_dataloaders(train_directory=train_dir,
                                                                               test_directory=test_dir,
                                                                               data_transforms=manual_transforms, # resize, convert images to between 0 & 1 and normalize them
                                                                               batch_size=32, # set mini-batch size to 32
                                                                               workers=4)

train_dataloader, test_dataloader, class_names



(<torch.utils.data.dataloader.DataLoader at 0x7b609d365c70>,
 <torch.utils.data.dataloader.DataLoader at 0x7b60a7527ef0>,
 ['707_320',
  '727_200',
  '737_200',
  '737_300',
  '737_400',
  '737_500',
  '737_600',
  '737_700',
  '737_800',
  '737_900',
  '747_100',
  '747_200',
  '747_300',
  '747_400',
  '757_200',
  '757_300',
  '767_200',
  '767_300',
  '767_400',
  '777_200',
  '777_300',
  'A300B4',
  'A310',
  'A318',
  'A319',
  'A320',
  'A321',
  'A330_200',
  'A330_300',
  'A340_200',
  'A340_300',
  'A340_500',
  'A340_600',
  'A380',
  'ATR_42',
  'ATR_72',
  'An_12',
  'BAE_125',
  'BAE_146_200',
  'BAE_146_300',
  'Beechcraft_1900',
  'Boeing_717',
  'CRJ_200',
  'CRJ_700',
  'CRJ_900',
  'C_130',
  'C_47',
  'Cessna_172',
  'Cessna_208',
  'Cessna_525',
  'Cessna_560',
  'Challenger_600',
  'DC_10',
  'DC_3',
  'DC_6',
  'DC_8',
  'DC_9_30',
  'DHC_1',
  'DHC_6',
  'DHC_8_100',
  'DHC_8_300',
  'DH_82',
  'DR_400',
  'Dornier_328',
  'EMB_120',
  'ERJ_135',
  'ERJ_145',
 

The next cells focus on the actual "transfer" part of taking a model from someplace else and using it for better performance. I could have done the transforms from the previous cells in a different way that is more automatic, but I wished to explore the more manual original way first.

Now onto taking a different model and applying it here! We will use the [EfficentNet_v2_m](https://docs.pytorch.org/vision/stable/models/generated/torchvision.models.efficientnet_v2_m.html#torchvision.models.efficientnet_v2_m) model.

In [12]:
weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT
model = torchvision.models.efficientnet_b0(weights=weights).to(device)

# Take the precalculated features, avgpool calculation, and the classifier from the transfered model and apply it to our problem!

Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-7f5810bc.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-7f5810bc.pth


100%|██████████| 20.5M/20.5M [00:00<00:00, 178MB/s]


In [13]:
# Print a summary using torchinfo
from torchinfo import summary

summary(model=model,
        input_size=(32, 3, 480, 480), # make sure this is "input_size", not "input_shape"
        # col_names=["input_size"], # uncomment for smaller output
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
)

Layer (type (var_name))                                      Input Shape          Output Shape         Param #              Trainable
EfficientNet (EfficientNet)                                  [32, 3, 480, 480]    [32, 1000]           --                   True
├─Sequential (features)                                      [32, 3, 480, 480]    [32, 1280, 15, 15]   --                   True
│    └─Conv2dNormActivation (0)                              [32, 3, 480, 480]    [32, 32, 240, 240]   --                   True
│    │    └─Conv2d (0)                                       [32, 3, 480, 480]    [32, 32, 240, 240]   864                  True
│    │    └─BatchNorm2d (1)                                  [32, 32, 240, 240]   [32, 32, 240, 240]   64                   True
│    │    └─SiLU (2)                                         [32, 32, 240, 240]   [32, 32, 240, 240]   --                   --
│    └─Sequential (1)                                        [32, 32, 240, 240]   [32, 16, 240

Now we want to freeze some layers of this model because that means we do not have to train them again!

In [15]:
for param in model.features.parameters():
    param.requires_grad = False

Update our model to now use 102 classes instead, since our dataset only had 102 variants

In [16]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# From before
output_shape = len(class_names)
# Recreate the classifier layer and seed it to the target device
model.classifier = torch.nn.Sequential(
    torch.nn.Dropout(p=0.2, inplace=True),
    torch.nn.Linear(in_features=1280,
                    out_features=output_shape, # same number of output units as our number of classes
                    bias=True)).to(device)

In [17]:
summary(model,
        input_size=(32, 3, 224, 224), # make sure this is "input_size", not "input_shape" (batch_size, color_channels, height, width)
        verbose=0,
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
)

Layer (type (var_name))                                      Input Shape          Output Shape         Param #              Trainable
EfficientNet (EfficientNet)                                  [32, 3, 224, 224]    [32, 100]            --                   Partial
├─Sequential (features)                                      [32, 3, 224, 224]    [32, 1280, 7, 7]     --                   False
│    └─Conv2dNormActivation (0)                              [32, 3, 224, 224]    [32, 32, 112, 112]   --                   False
│    │    └─Conv2d (0)                                       [32, 3, 224, 224]    [32, 32, 112, 112]   (864)                False
│    │    └─BatchNorm2d (1)                                  [32, 32, 112, 112]   [32, 32, 112, 112]   (64)                 False
│    │    └─SiLU (2)                                         [32, 32, 112, 112]   [32, 32, 112, 112]   --                   --
│    └─Sequential (1)                                        [32, 32, 112, 112]   [32, 

In [19]:
import torch.nn as nn

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)

from timeit import default_timer as timer
start_time = timer()

# Setup training and save the results
results = model_backbone.train(model=model,
                       train_dataloader=train_dataloader,
                       test_dataloader=test_dataloader,
                       optimizer=optimizer,
                       loss_fn=loss_fn,
                       epochs=5,
                       device=device)

# End the timer and print out how long it took
end_time = timer()
print(f"[INFO] Total training time: {end_time-start_time:.3f} seconds")