In [1]:
import torch
import torchvision
from torchvision import datasets, transforms, models
import torch.nn as nn
import torch.optim as optim
from PIL import ImageFile
import os

# Allow loading of truncated images
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Define dataset paths
train_dir = r"/redd/train"
test_dir = r"/redd/test"
# train_dir = "/workspace/sdp/redd/train"
# test_dir = "/workspace/sdp/redd/test"


In [2]:

import cv2
import random


def apply_random_degradation(image):
    degrade_options = [
        add_gaussian_blur,
        add_gaussian_noise,
        add_haze_effect,
        adjust_contrast_randomly,
        add_jpeg_artifacts
    ]

    num_degradations = random.randint(1, 2)
    selected = random.sample(degrade_options, num_degradations)

    for degrade in selected:
        image = degrade(image)

    return image


def add_gaussian_blur(image):
    """Mild Gaussian blur"""
    if random.random() < 0.3:
        kernel_size = random.choice([3, 5])
        return cv2.GaussianBlur(image, (kernel_size, kernel_size), 0)
    return image


def add_gaussian_noise(image):
    """Mild Gaussian noise"""
    if random.random() < 0.3:
        row, col, ch = image.shape
        sigma = 0.01  # previously variable, now fixed low
        gauss = np.random.normal(0, sigma, (row, col, ch))
        noisy = image + gauss
        return np.clip(noisy, 0, 1)
    return image


def add_haze_effect(image):
    """Light haze"""
    if random.random() < 0.2:
        haze_factor = random.uniform(0.85, 0.95)  # previously 0.3–0.7
        haze_color = np.ones_like(image) * 0.9
        hazy = image * haze_factor + haze_color * (1 - haze_factor)
        return np.clip(hazy, 0, 1)
    return image


def adjust_contrast_randomly(image):
    """Subtle contrast adjustment"""
    if random.random() < 0.3:
        alpha = random.uniform(0.85, 1.15)
        return np.clip(alpha * image, 0, 1)
    return image


def add_jpeg_artifacts(image):
    """Mild JPEG compression artifacts"""
    if random.random() < 0.2:
        quality = random.randint(50, 80)  # narrowed quality range
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
        result, encimg = cv2.imencode('.jpg', (image * 255).astype(np.uint8), encode_param)
        decimg = cv2.imdecode(encimg, 1)
        return decimg.astype(np.float32) / 255.0
    return image


In [3]:
import cv2
import random
import numpy as np
from PIL import Image

def apply_random_degradation(img):
    """Wrapper to convert PIL image to np.array, apply degradation, and convert back to PIL."""
    img = np.array(img).astype(np.float32) / 255.0  # Normalize to [0,1]

    degrade_options = [
        add_gaussian_blur,
        add_gaussian_noise,
        add_haze_effect,
        adjust_contrast_randomly,
        add_jpeg_artifacts
    ]

    num_degradations = random.randint(1, 2)
    selected = random.sample(degrade_options, num_degradations)

    for degrade in selected:
        img = degrade(img)

    img = (img * 255).astype(np.uint8)  # Back to [0,255] for PIL
    return Image.fromarray(img)


def add_gaussian_blur(image):
    if random.random() < 0.3:
        kernel_size = random.choice([3, 5])
        return cv2.GaussianBlur(image, (kernel_size, kernel_size), 0)
    return image


def add_gaussian_noise(image):
    if random.random() < 0.3:
        row, col, ch = image.shape
        sigma = 0.01
        gauss = np.random.normal(0, sigma, (row, col, ch))
        noisy = image + gauss
        return np.clip(noisy, 0, 1)
    return image


def add_haze_effect(image):
    if random.random() < 0.2:
        haze_factor = random.uniform(0.85, 0.95)
        haze_color = np.ones_like(image) * 0.9
        hazy = image * haze_factor + haze_color * (1 - haze_factor)
        return np.clip(hazy, 0, 1)
    return image


def adjust_contrast_randomly(image):
    if random.random() < 0.3:
        alpha = random.uniform(0.85, 1.15)
        return np.clip(alpha * image, 0, 1)
    return image


def add_jpeg_artifacts(image):
    if random.random() < 0.2:
        quality = random.randint(50, 80)
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
        result, encimg = cv2.imencode('.jpg', (image * 255).astype(np.uint8), encode_param)
        decimg = cv2.imdecode(encimg, 1)
        return decimg.astype(np.float32) / 255.0
    return image


In [4]:
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder

# Define your custom preprocessing function
def apply_random_degradation(img):
    # Example: convert to tensor and do some custom transformations
    # You can modify this as needed
    return img  # Assuming your function modifies the PIL image directly

# Data augmentation and normalization for training
train_transforms = transforms.Compose([
    transforms.RandomRotation(40),
    transforms.RandomResizedCrop(150, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=(0.5, 1.5)),
    transforms.RandomAffine(degrees=0, shear=20, translate=(0.2, 0.2)),
    transforms.Lambda(apply_random_degradation),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3),  # Normalize to [-1, 1] or adjust to your needs
])

# Only resizing and normalization for testing
test_transforms = transforms.Compose([
    transforms.Resize((150, 150)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3),
])

# Load datasets
train_dataset = ImageFolder(train_dir, transform=train_transforms)
test_dataset = ImageFolder(test_dir, transform=test_transforms)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [5]:
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

class VGGTransferModel(nn.Module):
    def __init__(self, num_classes=3):
        super(VGGTransferModel, self).__init__()

        # Load pre-trained VGG16 model
        base_model = models.vgg16(pretrained=True)

        # Freeze all VGG16 feature layers
        for param in base_model.features.parameters():
            param.requires_grad = False

        self.features = base_model.features
        self.avgpool = base_model.avgpool  # Adaptive avg pool to (7,7)

        # Define custom classifier
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 256),
            nn.SiLU(),  # swish equivalent in PyTorch
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.SiLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = self.classifier(x)
        return x

# Instantiate model
model = VGGTransferModel(num_classes=3)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Print model summary (simple text)
print(model)


Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to C:\Users\anmol/.cache\torch\hub\checkpoints\vgg16-397923af.pth
100.0%


VGGTransferModel(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=

In [7]:
import torch
import pickle
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Early stopping class
class EarlyStopping:
    def __init__(self, patience=3, min_delta=0, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.counter = 0
        self.best_loss = float('inf')
        self.best_model_state = None

    def __call__(self, val_loss, model):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            if self.restore_best_weights:
                self.best_model_state = model.state_dict()
        else:
            self.counter += 1
            if self.counter >= self.patience:
                if self.restore_best_weights:
                    model.load_state_dict(self.best_model_state)
                return True
        return False

# Initialize scheduler and early stopping
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, min_lr=1e-6, verbose=True)
early_stopper = EarlyStopping(patience=3, restore_best_weights=True)

epochs = 50
history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

for epoch in range(epochs):
    model.train()
    running_loss = 0.0

    for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} - Training"):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    history['train_loss'].append(epoch_loss)

    # Validation
    model.eval()
    val_loss = 0.0
    correct = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * images.size(0)

            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()

    val_loss /= len(test_loader.dataset)
    val_acc = correct / len(test_loader.dataset)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    print(f"Epoch {epoch+1}: Train Loss = {epoch_loss:.4f}, Val Loss = {val_loss:.4f}, Val Acc = {val_acc:.4f}")

    scheduler.step(val_loss)

    if early_stopper(val_loss, model):
        print("Early stopping triggered.")
        break

# Save model
torch.save(model.state_dict(), "third_model_da.pth")

# Save training history
with open("history3_da.pkl", "wb") as f:
    pickle.dump(history, f)

print("Model and training history saved successfully!")


Epoch 1/50 - Training: 100%|██████████| 469/469 [02:21<00:00,  3.32it/s]


Epoch 1: Train Loss = 0.7537, Val Loss = 0.5856, Val Acc = 0.9687


Epoch 2/50 - Training: 100%|██████████| 469/469 [01:54<00:00,  4.11it/s]


Epoch 2: Train Loss = 0.6299, Val Loss = 0.5729, Val Acc = 0.9807


Epoch 3/50 - Training: 100%|██████████| 469/469 [02:18<00:00,  3.38it/s]


Epoch 3: Train Loss = 0.6193, Val Loss = 0.5735, Val Acc = 0.9787


Epoch 4/50 - Training: 100%|██████████| 469/469 [01:54<00:00,  4.10it/s]


Epoch 4: Train Loss = 0.6162, Val Loss = 0.5694, Val Acc = 0.9820


Epoch 5/50 - Training: 100%|██████████| 469/469 [01:54<00:00,  4.09it/s]


Epoch 5: Train Loss = 0.6121, Val Loss = 0.5684, Val Acc = 0.9827


Epoch 6/50 - Training: 100%|██████████| 469/469 [01:54<00:00,  4.08it/s]


Epoch 6: Train Loss = 0.6094, Val Loss = 0.5663, Val Acc = 0.9847


Epoch 7/50 - Training:   4%|▍         | 21/469 [00:05<01:52,  3.99it/s]


KeyboardInterrupt: 