In [14]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, random_split
from PIL import Image
import shutil
from tqdm import tqdm
from torchvision.models import ResNet18_Weights

In [2]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Path to the dataset
data_dir = 'extracted_glare_dataset'


In [3]:

# Data augmentation and normalization for training
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [4]:

# Load dataset
dataset = datasets.ImageFolder(data_dir, transform=data_transforms['train'])

In [5]:
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])


In [6]:
# Apply transformations to the validation set
train_dataset.dataset.transform = data_transforms['train']
val_dataset.dataset.transform = data_transforms['val']


In [7]:
train_dir = 'data_split/train'
val_dir = 'data_split/val'

# Create directories if they don't exist
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)

In [9]:

# Helper function to save the images
def save_split(dataset_split, split_dir):
    for idx in tqdm(range(len(dataset_split)), desc=f"Saving {split_dir} data"):
        # Get image path and label
        path, label = dataset_split[idx]
        class_name = dataset.classes[label]
        
        # Create class-specific directory if not exists
        class_dir = os.path.join(split_dir, class_name)
        os.makedirs(class_dir, exist_ok=True)
        
        # Define target path
        image_name = os.path.basename(dataset_split.dataset.samples[dataset_split.indices[idx]][0])
        target_path = os.path.join(class_dir, image_name)
        
        # Copy image to the new directory
        shutil.copy(dataset_split.dataset.samples[dataset_split.indices[idx]][0], target_path)


In [10]:
save_split(train_dataset, train_dir)
save_split(val_dataset, val_dir)

Saving data_split/train data: 100%|██████████| 1725/1725 [00:02<00:00, 622.22it/s]
Saving data_split/val data: 100%|██████████| 432/432 [00:00<00:00, 640.95it/s]


In [11]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)


In [15]:
model = models.resnet18(weights=ResNet18_Weights.DEFAULT)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 41)  # 41 classes
model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [16]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25):
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        
        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
                dataloader = train_loader
            else:
                model.eval()   # Set model to evaluate mode
                dataloader = val_loader
            
            running_loss = 0.0
            running_corrects = 0
            
            # Iterate over data
            for inputs, labels in dataloader:
                inputs, labels = inputs.to(device), labels.to(device)
                
                # Zero the parameter gradients
                optimizer.zero_grad()
                
                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    
                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            
            epoch_loss = running_loss / len(dataloader.dataset)
            epoch_acc = running_corrects.double() / len(dataloader.dataset)
            
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
    
    return model

In [19]:
model = train_model(model, {'train': train_loader, 'val': val_loader}, criterion, optimizer, num_epochs=20)
torch.save(model, 'resnet18_model.pt')


Epoch 1/20
train Loss: 0.1012 Acc: 0.9704
val Loss: 0.1452 Acc: 0.9468
Epoch 2/20
train Loss: 0.0272 Acc: 0.9925
val Loss: 0.0836 Acc: 0.9745
Epoch 3/20
train Loss: 0.0112 Acc: 0.9977
val Loss: 0.0535 Acc: 0.9861
Epoch 4/20
train Loss: 0.0056 Acc: 0.9994
val Loss: 0.0537 Acc: 0.9838
Epoch 5/20
train Loss: 0.0013 Acc: 1.0000
val Loss: 0.0357 Acc: 0.9884
Epoch 6/20
train Loss: 0.0008 Acc: 1.0000
val Loss: 0.0431 Acc: 0.9884
Epoch 7/20
train Loss: 0.0006 Acc: 1.0000
val Loss: 0.0376 Acc: 0.9884
Epoch 8/20
train Loss: 0.0005 Acc: 1.0000
val Loss: 0.0313 Acc: 0.9907
Epoch 9/20
train Loss: 0.0004 Acc: 1.0000
val Loss: 0.0321 Acc: 0.9884
Epoch 10/20
train Loss: 0.0004 Acc: 1.0000
val Loss: 0.0335 Acc: 0.9884
Epoch 11/20
train Loss: 0.0004 Acc: 1.0000
val Loss: 0.0362 Acc: 0.9884
Epoch 12/20
train Loss: 0.0004 Acc: 1.0000
val Loss: 0.0320 Acc: 0.9884
Epoch 13/20
train Loss: 0.0003 Acc: 1.0000
val Loss: 0.0299 Acc: 0.9884
Epoch 14/20
train Loss: 0.0003 Acc: 1.0000
val Loss: 0.0272 Acc: 0.9884
E

In [20]:
torch.save(model.state_dict(), 'resnet18_model_state_dict.pth')


In [None]:
from torchvision.models import resnet18, ResNet18_Weights

# Reinitialize the model architecture
loaded_model = resnet18(weights=ResNet18_Weights.DEFAULT)
num_ftrs = loaded_model.fc.in_features
loaded_model.fc = nn.Linear(num_ftrs, 41)  # Ensure the architecture matches the original

# Load the saved parameters
loaded_model.load_state_dict(torch.load('resnet18_model_state_dict.pth'))
loaded_model = model.to(device)
loaded_model.eval()  # Set the model to evaluation mode


In [None]:
# # Inference function
# def inference(model, image_path):
#     model.eval()
#     image = Image.open(image_path)
#     image = data_transforms['val'](image).unsqueeze(0)
#     image = image.to(device)
    
#     with torch.no_grad():
#         outputs = model(image)
#         _, preds = torch.max(outputs, 1)
#         return preds.item()

# # Example usage
# image_path = 'path_to_an_image_in_dataset'
# class_id = inference(model, image_path)
# print(f'Predicted class ID: {class_id}')