In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
from PIL import Image
from tempfile import TemporaryDirectory
import glob
import copy

In [None]:
# Cell 1: data setup + super-charged augmentations

import os
import torch
import torch.backends.cudnn as cudnn
import matplotlib.pyplot as plt
from PIL import Image
from torchvision import transforms, datasets

# Let cuDNN time its convolution algorithms and reuse the fastest one; this
# suits the pipelines below, which bring every image to one fixed size.
cudnn.benchmark = True
plt.ion()   # interactive mode

# Custom resize + padding to fixed portrait shape
class ResizePad:
    """Letterbox a PIL image onto a fixed-size RGB canvas without distorting it.

    The image is scaled, aspect ratio preserved, to the largest size that fits
    inside ``size`` and pasted centred on a solid-colour background.

    Args:
        size: Target canvas as ``(width, height)`` in pixels.
        fill_mean: Three background channel values as fractions of 255; each
            is multiplied by 255 and truncated to an int.
    """

    def __init__(self, size, fill_mean=(0.485, 0.456, 0.406)):
        self.W, self.H = size
        r, g, b = (int(c*255) for c in fill_mean)
        self.fill = (r, g, b)

    def _fit_size(self, w, h):
        """Return the ``(width, height)`` a ``w`` x ``h`` image is resized to.

        Rounds instead of truncating so float error cannot shave a pixel off
        the limiting side, and clamps to at least 1 px so very elongated
        images never produce a zero-sized resize.

        Raises:
            ValueError: If ``w`` or ``h`` is not positive.
        """
        if w <= 0 or h <= 0:
            raise ValueError(f"cannot resize an image of size {w}x{h}")
        scale = min(self.W / w, self.H / h)
        new_w = min(self.W, max(1, round(w * scale)))
        new_h = min(self.H, max(1, round(h * scale)))
        return new_w, new_h

    def __call__(self, img: "Image.Image") -> "Image.Image":
        """Return a new ``(W, H)`` RGB image holding the letterboxed ``img``."""
        w, h = img.size
        new_w, new_h = self._fit_size(w, h)
        img_resized = img.resize((new_w, new_h), Image.BICUBIC)

        background = Image.new("RGB", (self.W, self.H), self.fill)
        # Integer centring: any odd leftover pixel ends up on the right/bottom.
        paste_x = (self.W - new_w) // 2
        paste_y = (self.H - new_h) // 2
        background.paste(img_resized, (paste_x, paste_y))
        return background

# Photometric augmentations: they change pixel values but never move pixels.
photo_augs = [
    # Random shifts in brightness / contrast / saturation / hue.
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    # Convert to grayscale with probability 0.1.
    transforms.RandomGrayscale(p=0.1),
    # 3x3 Gaussian blur; sigma is sampled from the range [0.1, 2.0].
    transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 2.0)),
]

# 1) Per-split preprocessing pipelines
_CANVAS_SIZE = (224, 400)          # (width, height) handed to ResizePad
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]
# Border colour for the geometric warps: the normalisation mean on a 0-255 scale.
_WARP_FILL = tuple(int(channel * 255) for channel in _NORM_MEAN)


def _tensor_tail():
    """Return fresh ToTensor + Normalize steps, the tail of every pipeline."""
    return [transforms.ToTensor(), transforms.Normalize(_NORM_MEAN, _NORM_STD)]


def _eval_pipeline():
    """Build the augmentation-free pipeline used by the 'val' and 'test' splits."""
    return transforms.Compose([ResizePad(_CANVAS_SIZE), *_tensor_tail()])


data_transforms = {
    # Training: letterbox, photometric jitter, then small geometric warps.
    'train': transforms.Compose([
        ResizePad(_CANVAS_SIZE),
        *photo_augs,
        transforms.RandomAffine(
            degrees=5, translate=(0.1, 0.1), scale=(0.8, 1.2), fill=_WARP_FILL
        ),
        transforms.RandomPerspective(distortion_scale=0.2, p=0.5, fill=_WARP_FILL),
        *_tensor_tail(),
    ]),
    'val': _eval_pipeline(),
    'test': _eval_pipeline(),
}

data_dir = '8apr_dataset_split'

# One ImageFolder per split, each paired with the transform of the same name.
image_datasets = {
    split: datasets.ImageFolder(os.path.join(data_dir, split), data_transforms[split])
    for split in ('train', 'val', 'test')
}

# 'train' and 'val' are shuffled; only 'test' is read in a fixed order.
dataloaders = {
    split: torch.utils.data.DataLoader(
        image_datasets[split], batch_size=16, shuffle=(split != 'test'), num_workers=6
    )
    for split in image_datasets
}

dataset_sizes = {split: len(subset) for split, subset in image_datasets.items()}
class_names = image_datasets['train'].classes


In [None]:
# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# Show which integer label ImageFolder assigned to each training class folder.
print("Class-to-Index Mapping:", image_datasets['train'].class_to_idx, sep="\n")


In [None]:
def imshow(inp, title=None):
    """Draw a normalised, channels-first image tensor with matplotlib.

    Args:
        inp: Tensor laid out as (C, H, W). It is converted with ``.numpy()``
            and un-normalised with the same mean/std the transforms apply.
        title: Optional title for the plot; skipped when falsy.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # CHW -> HWC, undo Normalize, then clamp into the displayable [0, 1] range.
    hwc = inp.numpy().transpose((1, 2, 0))
    pixels = np.clip(channel_std * hwc + channel_mean, 0, 1)

    plt.imshow(pixels)
    if title:
        plt.title(title, fontsize=10)

    # Brief pause so the GUI event loop gets a chance to draw the figure.
    plt.pause(0.001)

# Display a batch of training data (optional)
# next(iter(...)) pulls a single batch; the loader iterator itself is not kept.
inputs, classes_batch = next(iter(dataloaders['train']))
# Tile the batch into one grid image and title it with each sample's class name.
out = torchvision.utils.make_grid(inputs)
imshow(out, title=[class_names[x] for x in classes_batch])

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=100, save_dir='saved_models'):
    """Train ``model`` and return it with its best-validation-accuracy weights loaded.

    Each epoch runs a 'train' pass and then a 'val' pass over the module-level
    ``dataloaders``, using the module-level ``device`` and ``dataset_sizes``.
    The weights with the highest validation accuracy so far are written to
    ``<save_dir>/best_model_params.pt`` and reloaded before returning.

    Args:
        model: Network to optimise; inputs are moved to ``device`` before the
            forward pass, so the model is expected to live there already.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepped once per training batch.
        scheduler: Scheduler stepped once per epoch as
            ``scheduler.step(val_loss)``, so it must accept a metric
            (for example ``ReduceLROnPlateau``).
        num_epochs: Number of train+val epochs to run.
        save_dir: Directory for the checkpoint; created if missing.

    Returns:
        The same ``model`` object with the best checkpoint loaded.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(save_dir, exist_ok=True)
    best_model_params_path = os.path.join(save_dir, 'best_model_params.pt')
    best_acc = 0.0
    since = time.time()

    # Checkpoint the starting weights so the final load always finds a file,
    # even if validation accuracy never rises above 0.
    torch.save(model.state_dict(), best_model_params_path)

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluation mode

            running_loss = 0.0
            running_corrects = 0  # plain int, so it also works if the loader is empty

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # Autograd is only needed while training.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # The loss is a batch mean: weight it by batch size so the
                # epoch figure is a true per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # The scheduler is driven by the validation loss.
            if phase == 'val':
                scheduler.step(epoch_loss)

            # Model selection is driven by validation accuracy.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                torch.save(model.state_dict(), best_model_params_path)

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # Load best model weights
    model.load_state_dict(torch.load(best_model_params_path))
    return model


In [None]:
def visualize_dataset(model, phase='val', num_images=6):
    """Plot predictions for the first ``num_images`` samples served by a split.

    Batches come from the module-level ``dataloaders[phase]``; each sample is
    drawn in a 3-column grid with a green title when the prediction matches the
    label and a red one otherwise. The model is switched to eval mode and left
    there.

    Args:
        model: Classifier to run; inputs are moved to ``device`` first.
        phase: Key into ``dataloaders``.
        num_images: Number of samples to draw before stopping.
    """
    model.eval()

    # Three panels per row, with enough rows to hold num_images panels.
    n_cols = 3
    n_rows = (num_images + n_cols - 1) // n_cols
    plt.figure(figsize=(n_cols * 6, n_rows * 6))

    shown = 0
    quota_reached = False
    with torch.no_grad():
        for inputs, labels in dataloaders[phase]:
            inputs, labels = inputs.to(device), labels.to(device)
            _, preds = torch.max(model(inputs), 1)

            for j in range(inputs.size(0)):
                shown += 1
                # plt.subplot also makes this the current axes, which is
                # where imshow() below draws.
                ax = plt.subplot(n_rows, n_cols, shown)
                ax.axis('off')

                pred_label = class_names[preds[j]]
                true_label = class_names[labels[j]]
                title_color = 'green' if pred_label == true_label else 'red'
                ax.set_title(f'Pred: {pred_label}\nTrue: {true_label}',
                             color=title_color)

                imshow(inputs.cpu().data[j])

                if shown == num_images:
                    quota_reached = True
                    break

            if quota_reached:
                break

    plt.tight_layout()
    plt.show()


In [None]:
from torchvision.models import efficientnet_b0
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.nn as nn

# --- 1. Load and modify EfficientNet-B0 ---
# Epoch budgets for the two phases, named once so the comments and the
# train_model calls below cannot drift apart.
HEAD_EPOCHS = 44        # phase 1: classifier head only
FINETUNE_EPOCHS = 90    # phase 2: whole network

model_ft = efficientnet_b0(weights='DEFAULT')

# The original classifier is [Dropout(p=0.2), Linear(in_features, out_features)]
# We'll replace it with our own dropout + linear head:
num_ftrs = model_ft.classifier[1].in_features
model_ft.classifier = nn.Sequential(
    nn.Dropout(0.3),
    nn.Linear(num_ftrs, len(class_names))
)
model_ft = model_ft.to(device)

# --- 2. Loss function ---
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# === PHASE 1: Train head only (backbone frozen) ===
print("=== PHASE 1: training head only ===")
# a) Freeze the backbone
# NOTE(review): train_model still calls model.train() each epoch, so any
# BatchNorm layers in the backbone keep updating their running statistics
# even though their weights are frozen - confirm that is intended.
for param in model_ft.features.parameters():
    param.requires_grad = False

# b) Optimizer for head only (only parameters that still require gradients)
optimizer_head = optim.Adam(
    filter(lambda p: p.requires_grad, model_ft.parameters()),
    lr=1e-3
)

# c) Scheduler for head: halve the LR after 3 epochs without val-loss improvement
scheduler_head = lr_scheduler.ReduceLROnPlateau(
    optimizer_head, mode='min', patience=3, factor=0.5
)

# d) Train head for HEAD_EPOCHS epochs
model_ft = train_model(
    model=model_ft,
    criterion=criterion,
    optimizer=optimizer_head,
    scheduler=scheduler_head,
    num_epochs=HEAD_EPOCHS,
    save_dir='saved_models/head_only'
)


# === PHASE 2: Fine-tuning full model ===
# train_model returned the best phase-1 checkpoint, so fine-tuning starts there.
print("\n=== PHASE 2: fine-tuning full model ===")
# a) Unfreeze all layers
for param in model_ft.parameters():
    param.requires_grad = True

# b) Optimizer over all params, with lower LR + weight decay
optimizer_full = optim.AdamW(
    model_ft.parameters(),
    lr=1e-4,
    weight_decay=1e-4
)

# c) Scheduler for full model: halve the LR after 5 epochs without val-loss improvement
scheduler_full = lr_scheduler.ReduceLROnPlateau(
    optimizer_full, mode='min', patience=5, factor=0.5
)

# d) Fine-tune for FINETUNE_EPOCHS epochs
model_ft = train_model(
    model=model_ft,
    criterion=criterion,
    optimizer=optimizer_full,
    scheduler=scheduler_full,
    num_epochs=FINETUNE_EPOCHS,
    save_dir='saved_models/full_finetune'
)


# --- 3. Visualize results ---
visualize_dataset(model_ft, phase='val')


In [None]:
def visualize_test_images_from_folders(model, class_names, max_images=8,
                                       image_glob='8apr_dataset_split/test/*/*.*',
                                       transform=None):
    """Predict and plot images read straight from the class folders on disk.

    The true label of each image is the name of its parent folder; titles are
    green when the predicted class name equals it and red otherwise.

    Args:
        model: Classifier to run; it is switched to eval mode.
        class_names: Sequence mapping a predicted class index to its name.
        max_images: Upper bound on the number of images shown.
        image_glob: Glob pattern selecting the image files.
        transform: Preprocessing applied to each image before inference.
            Defaults to the module-level ``data_transforms['val']``.

    Returns:
        None. Prints a message and returns early when the glob matches nothing.
    """
    import os, glob, math
    from PIL import Image
    import matplotlib.pyplot as plt

    if transform is None:
        transform = data_transforms['val']

    model.eval()

    # glob returns paths in arbitrary order; sort so the subset shown is
    # reproducible. It is the alphabetically first max_images paths.
    image_paths = sorted(glob.glob(image_glob))
    if not image_paths:
        print("No test images found.")
        return

    # only show up to max_images
    image_paths = image_paths[:max_images]

    images_per_row = 3
    rows = math.ceil(len(image_paths) / images_per_row)
    plt.figure(figsize=(images_per_row * 6, rows * 6))

    for idx, img_path in enumerate(image_paths):
        true_label = os.path.basename(os.path.dirname(img_path))
        # convert() returns a new in-memory image, so the file handle can be
        # closed immediately instead of lingering until garbage collection.
        with Image.open(img_path) as handle:
            img = handle.convert('RGB')

        # preprocess & predict
        input_tensor = transform(img).unsqueeze(0).to(device)
        with torch.no_grad():
            pred = model(input_tensor).argmax(1).item()
            predicted_label = class_names[pred]

        # plot the untransformed image, titled with prediction vs. folder name
        ax = plt.subplot(rows, images_per_row, idx + 1)
        ax.imshow(img)
        ax.axis('off')
        color = 'green' if predicted_label == true_label else 'red'
        ax.set_title(f'Pred: {predicted_label}\nTrue: {true_label}', color=color)

    plt.tight_layout()
    plt.show()

# call it:
# Shows at most 36 test images, predicted with the model from the training cell.
visualize_test_images_from_folders(model_ft, class_names, max_images=36)


In [None]:
import os
import glob
import math
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torchvision import models, transforms

# --------------------------------
# 1. Angle in degrees for each class index, listed in class-index order
#    (must match your training folders and labels).
class_to_angle = dict(enumerate((-2.5, -5, 0, 2.5, 5)))

# --------------------------------
# 2. Preprocessing applied to each image before it is fed to the model.
# NOTE(review): this is not the 'val' pipeline built in the data-setup cell.
# That one letterboxes with ResizePad((224, 400)); this one stretches the image
# to 400x224 (height x width) and then zero-pads the tensor. It also rebinds the
# notebook-wide `data_transforms` to a dict with only a 'val' key. Confirm which
# preprocessing the loaded checkpoint was trained with.
data_transforms = dict(
    val=transforms.Compose([
        transforms.Resize((400, 224)),   # (height, width); aspect ratio not preserved
        transforms.ToTensor(),
        transforms.Pad((0, 88)),         # 0 px left/right, 88 px top/bottom
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])
)

# --------------------------------
# 3. Geometric crop utilities from your augmentation script
def largest_rotated_rect(w, h, angle_rad):
    """Return the crop size rotate_and_crop uses for a rotated ``w`` x ``h`` image.

    Intended to be the size of the largest axis-aligned rectangle that fits
    inside a ``w`` x ``h`` rectangle rotated by ``angle_rad``.

    Args:
        w: Width of the unrotated rectangle.
        h: Height of the unrotated rectangle.
        angle_rad: Rotation in radians. Only ``|sin|`` and ``|cos|`` are used,
            so the sign of the angle does not matter.

    Returns:
        ``(width, height)`` truncated to ints, or ``(0, 0)`` when ``w`` or
        ``h`` is not positive.
    """
    if w <= 0 or h <= 0:
        return 0, 0

    sin_a = abs(math.sin(angle_rad))
    cos_a = abs(math.cos(angle_rad))

    landscape = w >= h
    if landscape:
        side_long, side_short = w, h
    else:
        side_long, side_short = h, w

    short_side_limits = (
        side_short <= 2.0 * sin_a * cos_a * side_long
        or abs(sin_a - cos_a) < 1e-10
    )

    if not short_side_limits:
        # General case: closed-form solution using cos(2a).
        cos_2a = cos_a ** 2 - sin_a ** 2
        return (
            int((w * cos_a - h * sin_a) / cos_2a),
            int((h * cos_a - w * sin_a) / cos_2a),
        )

    # Short-side-limited case (this branch also covers sin == cos).
    x = 0.5 * side_short
    if landscape:
        wr, hr = x / sin_a, x / cos_a
    else:
        wr, hr = x / cos_a, x / sin_a
    return int(wr), int(hr)

def rotate_and_crop(img, angle):
    """Rotate ``img``, cut out the central border-free region and resize it back.

    Args:
        img: PIL image to rotate.
        angle: Rotation in degrees, passed unchanged to ``img.rotate``.

    Returns:
        A new image with the same size as ``img``.
    """
    orig_w, orig_h = img.size

    # Crop size that stays clear of the corners introduced by the rotation.
    crop_w, crop_h = largest_rotated_rect(orig_w, orig_h, math.radians(angle))

    # expand=True grows the canvas so nothing is clipped by the rotation.
    rotated = img.rotate(angle, resample=Image.BICUBIC, expand=True)
    rot_w, rot_h = rotated.size
    cx, cy = rot_w // 2, rot_h // 2

    half_w, half_h = crop_w // 2, crop_h // 2
    box = (cx - half_w, cy - half_h, cx + half_w, cy + half_h)

    return rotated.crop(box).resize((orig_w, orig_h), Image.LANCZOS)

# --------------------------------
# 4. Load the saved trained model
# Run inference on the GPU when PyTorch can see one, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

def load_trained_model(weights_path=None, num_classes=None):
    """Build a MobileNetV3-Small classifier and load fine-tuned weights into it.

    The final classifier layer is replaced by ``Dropout(0.2) -> Linear`` before
    the checkpoint is loaded.

    NOTE(review): the training cell in this notebook fine-tunes EfficientNet-B0
    and writes its checkpoints under ``saved_models/head_only`` and
    ``saved_models/full_finetune``. This loader expects a MobileNetV3-Small
    checkpoint - confirm the file at ``weights_path`` really is one.

    Args:
        weights_path: Path to the saved ``state_dict``. Defaults to
            ``saved_models/best_model_params.pt``.
        num_classes: Size of the output layer. Defaults to
            ``len(class_to_angle)``.

    Returns:
        The model on ``device``, in eval mode.
    """
    if weights_path is None:
        weights_path = os.path.join('saved_models', 'best_model_params.pt')
    if num_classes is None:
        num_classes = len(class_to_angle)

    # weights=None: the strict load_state_dict below overwrites every parameter
    # and buffer, so fetching the pretrained ImageNet weights would be wasted work.
    model_ft = models.mobilenet_v3_small(weights=None)
    num_ftrs = model_ft.classifier[3].in_features

    # Re-create the head the checkpoint was saved with: Dropout -> Linear
    model_ft.classifier[3] = nn.Sequential(
        nn.Dropout(p=0.2, inplace=True),
        nn.Linear(num_ftrs, num_classes)
    )

    model_ft = model_ft.to(device)
    # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
    model_ft.load_state_dict(torch.load(weights_path, map_location=device))
    model_ft.eval()
    return model_ft

# --------------------------------
# 5. Rotate, correct, save and visualize ALL images
def rotate_and_visualize(model, custom_data_dir, corrected_data_dir):
    """Predict an angle for every image in a folder, rotate it, save and plot it.

    For each file matching ``<custom_data_dir>/*.*`` the model's predicted class
    is mapped to an angle through ``class_to_angle``. The image is passed to
    ``rotate_and_crop`` with that angle and saved as
    ``<corrected_data_dir>/corrected_<original name>``. All original/corrected
    pairs are then shown side by side in a single figure.

    NOTE(review): the predicted angle is applied as-is. Confirm the class
    labels encode the rotation to apply rather than the tilt to undo.

    Args:
        model: Classifier already on ``device`` and in eval mode.
        custom_data_dir: Folder holding the input images.
        corrected_data_dir: Output folder; created if missing.

    Returns:
        None. Prints a message and returns early when no files are found.
    """
    os.makedirs(corrected_data_dir, exist_ok=True)
    # glob returns paths in arbitrary order; sort so the log lines and the
    # figure rows come out in the same order on every run.
    image_paths = sorted(glob.glob(os.path.join(custom_data_dir, '*.*')))

    if not image_paths:
        print(f"No images found in '{custom_data_dir}'")
        return

    total_images = len(image_paths)
    # One 3-inch-tall row per image: original on the left, corrected on the right.
    plt.figure(figsize=(12, total_images * 3))

    for idx, img_path in enumerate(image_paths):
        # convert() returns a new in-memory image, so the file handle can be
        # closed immediately instead of lingering until garbage collection.
        with Image.open(img_path) as handle:
            img = handle.convert('RGB')
        input_tensor = data_transforms['val'](img).unsqueeze(0).to(device)

        with torch.no_grad():
            output = model(input_tensor)
            _, pred = torch.max(output, 1)
            predicted_angle = class_to_angle[pred.item()]

        corrected_img = rotate_and_crop(img, predicted_angle)

        corrected_filename = f"corrected_{os.path.basename(img_path)}"
        corrected_path = os.path.join(corrected_data_dir, corrected_filename)
        corrected_img.save(corrected_path)

        print(f"[{idx+1}] {os.path.basename(img_path)} "
              f"→ Predicted tilt: {predicted_angle}° "
              f"→ Saved: {corrected_filename}")

        ax1 = plt.subplot(total_images, 2, idx * 2 + 1)
        ax1.imshow(img)
        ax1.set_title(f'Original\nPred: {predicted_angle}°')
        ax1.axis('off')

        ax2 = plt.subplot(total_images, 2, idx * 2 + 2)
        ax2.imshow(corrected_img)
        ax2.set_title('Corrected')
        ax2.axis('off')

    plt.tight_layout()
    plt.show()

# --------------------------------
# 6. Load the checkpoint and run the correction over every image
# NOTE(review): this rebinds `model_ft`, replacing the model produced by the
# training cell earlier in the notebook.
model_ft = load_trained_model()
rotate_and_visualize(
    model=model_ft,
    custom_data_dir='custom_data',           # Folder with test/tilted images
    corrected_data_dir='corrected_images'    # Output folder for corrected images
)


In [None]:
import glob

# Sanity check: count the .jpg files under the test split and list the first five.
image_paths = glob.glob('8apr_dataset_split/test/*/*.jpg')
print(f"Found {len(image_paths)} test images.")
print("\nSample paths:", "\n".join(image_paths[:5]), sep="\n")


In [None]:
# Show which integer label ImageFolder assigned to each training class folder.
print("Class-to-Index Mapping:", image_datasets['train'].class_to_idx, sep="\n")


In [None]:
from collections import Counter

# Per-class sample counts for the training and validation splits.
for phase in ('train', 'val'):
    # .imgs holds (path, class_index) pairs; keep only the class indices.
    labels = [target for _path, target in image_datasets[phase].imgs]
    print(phase, Counter(labels))
