In [None]:
## Step 0: Imports


In [None]:
# Imports here
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
import torchvision.models as models
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import ImageFolder
from PIL import Image
import json
from matplotlib.ticker import FormatStrFormatter
import os

In [None]:
## Step 1: Load and split data

In [None]:
# Function to filter out folders with 5 or fewer images and count remaining classes
def filter_folders(data_dir, min_images=6):
    """Return the class folders under ``data_dir`` that hold enough files to train on.

    Args:
        data_dir: Directory whose immediate sub-directories are class folders.
        min_images: Smallest number of files a folder must contain to be kept.
            The default of 6 preserves the original "more than 5 images" rule.

    Returns:
        A ``(folders, count)`` tuple: the kept folder names in alphabetical
        order, and how many of them there are.
    """
    filtered_folders = []
    # os.listdir returns entries in arbitrary order; sorting keeps the result
    # reproducible from run to run.
    for folder in sorted(os.listdir(data_dir)):
        folder_path = os.path.join(data_dir, folder)
        if not os.path.isdir(folder_path):
            continue
        # Count regular files only, so nested directories are not mistaken
        # for images.
        num_images = sum(
            1 for entry in os.listdir(folder_path)
            if os.path.isfile(os.path.join(folder_path, entry))
        )
        if num_images >= min_images:
            filtered_folders.append(folder)
    return filtered_folders, len(filtered_folders)

In [None]:
# Path to your data folder
data_dir = 'data34'

# Class folders with more than 5 images are the only ones worth training on
filtered_folders, num_kept_classes = filter_folders(data_dir)
kept_folders = set(filtered_folders)  # set membership is O(1) per sample

# ImageFolder indexes every class folder under data_dir; transforms are
# attached later, once the train/validation/test split exists.
full_dataset = ImageFolder(root=data_dir)

# Drop the samples of under-populated folders *in place*, so the filter
# actually reaches the split below (previously the filtered list was built
# and then ignored).
filtered_dataset = [(image, label) for image, label in full_dataset.samples
                    if os.path.basename(os.path.dirname(image)) in kept_folders]
full_dataset.samples = filtered_dataset
full_dataset.imgs = filtered_dataset
full_dataset.targets = [label for _, label in filtered_dataset]

# Labels keep ImageFolder's original numbering, so the classifier head needs
# one output per indexed folder. Sizing it by the filtered count would make
# CrossEntropyLoss fail on out-of-range targets whenever a folder is dropped.
num_classes = len(full_dataset.classes)
print(f"{num_kept_classes} of {num_classes} class folders kept")

# Define the percentage of data to be used for testing and validation
test_split = 0.05
valid_split = 0.1

# Calculate the number of samples for training, testing and validation
num_samples = len(full_dataset)
num_test_samples = int(test_split * num_samples)
num_valid_samples = int(valid_split * num_samples)
num_train_samples = num_samples - num_test_samples - num_valid_samples

# A seeded generator keeps the split identical across kernel restarts, so the
# test set never leaks into a later training run.
split_generator = torch.Generator().manual_seed(42)
train_data, valid_data, test_data = random_split(
    full_dataset,
    [num_train_samples, num_valid_samples, num_test_samples],
    generator=split_generator,
)


In [None]:
## Step 2: Transform Data

In [None]:
# Channel statistics of ImageNet. The pretrained backbone was trained with
# them, and process_image()/imshow() further down already use them, so the
# training pipeline has to match or inference sees differently scaled inputs.
imagenet_mean = (0.485, 0.456, 0.406)
imagenet_std = (0.229, 0.224, 0.225)

# Training transform includes random rotation and flip to build a more robust model
train_transforms = transforms.Compose([transforms.Resize((244, 244)),
                                       transforms.RandomRotation(30),
                                       transforms.RandomHorizontalFlip(),
                                       transforms.ToTensor(),
                                       transforms.Normalize(imagenet_mean, imagenet_std)])

# Evaluation is deterministic: resize, centre-crop, normalise
test_transforms = transforms.Compose([transforms.Resize((244, 244)),
                                      transforms.CenterCrop(224),
                                      transforms.ToTensor(),
                                      transforms.Normalize(imagenet_mean, imagenet_std)])

# The validation set uses the same transform as the test set; sharing the
# object keeps the two from drifting apart.
validation_transforms = test_transforms


In [None]:
class TransformedSubset(torch.utils.data.Dataset):
    """Apply a split-specific transform on top of a subset of a shared dataset.

    The three subsets returned by random_split all point at the same underlying
    dataset object, so assigning ``subset.dataset.transform`` three times
    leaves only the last transform in effect for every split. Wrapping each
    subset keeps the transforms independent.
    """

    def __init__(self, subset, transform):
        self.subset = subset
        self.transform = transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, index):
        image, label = self.subset[index]
        return self.transform(image), label


# The shared base dataset must hand out untransformed images; each wrapper
# below then applies its own pipeline.
train_data.dataset.transform = None

# Create dataloaders for training, validation and testing sets
trainloader = DataLoader(TransformedSubset(train_data, train_transforms),
                         batch_size=196, shuffle=True)
print(len(trainloader))
# Evaluation order does not matter, so the eval loaders are left unshuffled
validloader = DataLoader(TransformedSubset(valid_data, validation_transforms),
                         batch_size=64, shuffle=False)
print(len(validloader))
testloader = DataLoader(TransformedSubset(test_data, test_transforms),
                        batch_size=32, shuffle=False)
print(len(testloader))


In [None]:
## Step 3: Create Model

In [None]:
# Backbone: a ResNet-34 from torchvision, requested with pretrained weights.
# (A DenseNet-121 was tried earlier and is kept below for reference.)
# NOTE(review): recent torchvision releases deprecate `pretrained=` in favour
# of `weights=` — confirm against the installed version.
#model = models.densenet121(pretrained=True)
model = models.resnet34(pretrained=True)

In [None]:
# Replace the final fully connected layer with a fresh linear layer that has
# one output per class; its input width is read from the layer it replaces.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, num_classes)

In [None]:
# Multi-class classification on raw logits
criterion = nn.CrossEntropyLoss()
#optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
#optimizer = optim.Adam(model.parameters(), lr=0.001)
optimizer = optim.AdamW(model.parameters(), lr=0.001)
#lrscheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=3, threshold = 0.9)
# Halve the learning rate when validation accuracy stalls. The threshold is
# interpreted as an absolute delta: in the default 'rel' mode a threshold of
# 0.9 would require each validation to beat the best one by 90%, so the rate
# would be cut every `patience + 1` checks no matter how training went.
lrscheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=2, factor=0.5,
                                                   threshold=0.9, threshold_mode='abs')


In [None]:
## Step 4: Train Model

In [None]:
# Implement a function for the validation pass
def validation(model, validloader, criterion, device=None):
    """Run one pass over ``validloader`` and accumulate loss and accuracy.

    The caller is expected to have put the model in eval mode and to have
    disabled gradients; this function does neither.

    Args:
        model: Network producing one logit per class.
        validloader: Iterable of ``(images, labels)`` batches.
        criterion: Loss callable taking ``(output, labels)``.
        device: Device to evaluate on. Defaults to CUDA when available and
            to the CPU otherwise.

    Returns:
        ``(valid_loss, accuracy)`` as Python floats, each the *sum* of the
        per-batch values; divide by ``len(validloader)`` for the mean.
    """
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    valid_loss = 0.0
    accuracy = 0.0

    model.to(device)

    for images, labels in validloader:
        images, labels = images.to(device), labels.to(device)

        output = model(images)
        valid_loss += criterion(output, labels).item()

        # The largest logit is also the most probable class, so no
        # exp/softmax is needed to pick the prediction.
        predictions = output.argmax(dim=1)
        accuracy += (predictions == labels).float().mean().item()

    return valid_loss, accuracy

In [None]:
epochs = 6
steps = 0
print_every = 20

# Train on the GPU when there is one, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.train()

# torch.save does not create missing directories
os.makedirs('epoch_save', exist_ok=True)

# Loss accumulated since the last report; reset after every report so the
# printed value really is the mean over the last `print_every` steps.
running_loss = 0

for e in range(epochs):

    # Iterating over data to carry out training step
    for inputs, labels in trainloader:
        steps += 1

        inputs, labels = inputs.to(device), labels.to(device)

        # zeroing parameter gradients
        optimizer.zero_grad()

        # Forward and backward passes
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Carrying out validation step
        if steps % print_every == 0:
            # setting model to evaluation mode during validation
            model.eval()

            # Gradients are turned off as no longer in training
            with torch.no_grad():
                valid_loss, accuracy = validation(model, validloader, criterion)

            print(f"No. epochs: {e+1}, \
            Training Loss: {round(running_loss/print_every,3)} \
            Valid Loss: {round(valid_loss/len(validloader),3)} \
            Valid Accuracy: {round(float(accuracy/len(validloader)),3)}")

            running_loss = 0

            # Turning training back on
            model.train()
            # The scheduler tracks the mean validation accuracy in percent;
            # `accuracy` itself is a sum over batches.
            lrscheduler.step(float(accuracy) / len(validloader) * 100)

    # Save the model after every epoch (not after every batch)
    torch.save(model.state_dict(), f'epoch_save/model_epoch_{e+1}.pth')

In [None]:
correct = 0
total = 0

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# The training loop leaves the model in train mode; without eval() BatchNorm
# would normalise with per-batch statistics and keep updating its running
# averages while the test set is being scored.
model.eval()

with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        # Class scores (logits) for the batch
        outputs = model(images)
        # The highest score is the predicted class
        _, predicted_outcome = torch.max(outputs, 1)
        # Total number of images
        total += labels.size(0)
        # Count number of cases in which predictions are correct
        correct += (predicted_outcome == labels).sum().item()

if total:
    print(f"Test accuracy of model: {round(100 * correct / total,3)}%")
else:
    # A tiny dataset can leave the 5% test split empty
    print("Test accuracy of model: n/a (test set is empty)")

In [None]:
## Step 5: Save the Model

In [None]:
# Saving: feature weights, new model.fc, class-to-index mapping, optimiser state, and No. of epochs

# If this cell is re-run after the model was wrapped in DataParallel, save the
# wrapped network so the state_dict keys carry no "module." prefix.
net_to_save = model.module if isinstance(model, nn.DataParallel) else model

checkpoint = {'state_dict': net_to_save.state_dict(),
              'model': net_to_save.fc,
              'class_to_idx': full_dataset.class_to_idx,  # Access class_to_idx from the original dataset
              'opt_state': optimizer.state_dict(),
              'num_epochs': epochs}

# torch.save does not create missing directories
os.makedirs('models', exist_ok=True)
torch.save(checkpoint, 'models/car_classifer.pth')

In [None]:
## Step 6: Load the Model

In [None]:
# Write a function that loads a checkpoint and rebuilds the model

def load_checkpoint(filepath):
    """Load saved weights and the class mapping into the notebook-level ``model``.

    The function does not build a network; it fills the existing global
    ``model`` (bare or wrapped in ``nn.DataParallel``) and returns it.

    Args:
        filepath: Path to a checkpoint holding ``'state_dict'`` and
            ``'class_to_idx'`` entries.

    Returns:
        The global ``model`` with the weights loaded and ``class_to_idx``
        attached as an attribute.

    Raises:
        RuntimeError: If the checkpoint's parameter names or shapes do not
            match the model (strict loading).
    """
    # Fall back to the CPU so checkpoints stay loadable on GPU-less machines
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # NOTE(review): the checkpoint also pickles the `fc` module; recent
    # PyTorch releases may refuse that under their default load settings —
    # confirm against the installed version.
    checkpoint = torch.load(filepath, map_location=device)

    # DataParallel prefixes every parameter name with "module."; normalise the
    # keys so a checkpoint loads whether it was saved wrapped or bare.
    prefix = 'module.'
    state_dict = {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in checkpoint['state_dict'].items()
    }

    # Load into the bare network. Loading is strict on purpose: with
    # strict=False a key mismatch silently leaves every weight untouched.
    target = model.module if isinstance(model, nn.DataParallel) else model
    target.load_state_dict(state_dict)
    model.class_to_idx = checkpoint['class_to_idx']

    return model

In [None]:

# Reload the saved weights and class mapping into the model
model = load_checkpoint('models/car_classifer.pth')
# Inspect the architecture: the final `fc` layer was built with `num_classes`
# outputs (the earlier note said 196 — TODO confirm for this dataset)
print(model)

In [None]:
# Wrap once only: re-running this cell would otherwise nest DataParallel
# wrappers and stack another "module." prefix onto every parameter name.
if not isinstance(model, torch.nn.DataParallel):
    model = torch.nn.DataParallel(model)

In [None]:
## Step 7: Predict the Image

In [None]:
def process_image(image):
    """Load a JPEG and turn it into a normalised channels-first array.

    Args:
        image: Path to the image file *without* its ``.jpg`` extension.

    Returns:
        A NumPy array holding the resized, normalised image, shaped
        ``(3, 244, 244)``.
    """
    # Building image transform
    transform = transforms.Compose([transforms.Resize((244,244)),
                                    #transforms.CenterCrop(224),
                                    transforms.ToTensor(),
                                    transforms.Normalize([0.485, 0.456, 0.406],
                                                         [0.229, 0.224, 0.225])])

    # The context manager closes the file handle. convert('RGB') mirrors what
    # ImageFolder's loader does during training and keeps grayscale or RGBA
    # files from breaking the 3-channel Normalize.
    with Image.open(f'{image}' + '.jpg') as pil_im:
        pil_tfd = transform(pil_im.convert('RGB'))

    # Converting to Numpy array
    return pil_tfd.numpy()

In [None]:
def imshow(image, ax=None, title=None):
    """Display a normalised channels-first image.

    Args:
        image: Array (or CPU tensor) shaped ``(C, H, W)`` that was normalised
            with the ImageNet mean/std used in ``process_image``.
        ax: Matplotlib axes to draw on; a new figure is created when omitted.
        title: Optional title for the axes.

    Returns:
        The axes the image was drawn on.
    """
    if ax is None:
        fig, ax = plt.subplots()

    # PyTorch tensors assume the color channel is the first dimension
    # but matplotlib assumes is the third dimension
    image = np.asarray(image).transpose((1, 2, 0))

    # Undo preprocessing
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    image = std * image + mean

    # Image needs to be clipped between 0 and 1 or it looks like noise when displayed
    image = np.clip(image, 0, 1)

    ax.imshow(image)
    # The title argument used to be accepted and then ignored
    if title is not None:
        ax.set_title(title)

    return ax

In [None]:
# Preview one preprocessed sample to sanity-check the image pipeline
preview_array = process_image(data_dir + '/1/audi_1')
imshow(preview_array)

In [None]:
def predict(image_path, model, topk=5):
    """Predict the most likely classes for one image.

    Args:
        image_path: Path to the image without its ``.jpg`` extension, as
            expected by ``process_image``.
        model: Path to the checkpoint file. Despite its name this is handed
            to ``load_checkpoint``, not used as a network object.
        topk: Number of highest-scoring classes to return.

    Returns:
        ``(conf, predicted)``: two NumPy arrays of length ``topk`` holding the
        softmax probabilities (highest first) and the matching class indices.
    """
    loaded_model = load_checkpoint(model)
    loaded_model.eval()

    # Run on whichever device holds the weights
    try:
        device = next(loaded_model.parameters()).device
    except StopIteration:
        device = torch.device('cpu')

    # Pre-processing image
    img = process_image(image_path)
    # Add the batch dimension the network expects and move to the model's device
    batch = torch.from_numpy(img).float().unsqueeze(0).to(device)

    with torch.no_grad():
        logits = loaded_model(batch)
        # The network emits raw logits; softmax turns them into probabilities
        # so the returned confidences are non-negative and comparable.
        probabilities = torch.softmax(logits, dim=1)

    probs_top, predicted_top = probabilities.topk(topk)

    # Converting probabilities and indices to arrays on cpu
    conf = probs_top.cpu().numpy()[0]
    predicted = predicted_top.cpu().numpy()[0]

    return conf, predicted

In [None]:
# Read the folder -> class-name table shipped alongside the data
df = pd.read_csv('Ident/class_names.csv')

# Index the class names by folder, then flatten that Series into a plain dict
class_name_by_folder = df.set_index('folder_name')['class_name']
folder_to_class = class_name_by_folder.to_dict()


In [None]:
# tie the class indices to their names

def find_classes(dir):
    """List the class folders under ``dir`` and give each one an index.

    Only sub-directories count as classes. Stray files (``.DS_Store``, CSVs,
    ...) are skipped, because including them would shift every index that
    follows.

    Args:
        dir: Directory whose immediate sub-directories are the classes.

    Returns:
        ``(classes, class_to_idx)``: the alphabetically sorted folder names
        and a dict mapping each name to its position in that list.
    """
    classes = sorted(
        entry for entry in os.listdir(dir)
        if os.path.isdir(os.path.join(dir, entry))
    )
    class_to_idx = {name: index for index, name in enumerate(classes)}
    return classes, class_to_idx
# Build the index -> folder-name lookup used to decode predictions
classes, c_to_idx = find_classes(dir=data_dir)

print(classes, c_to_idx)

In [None]:
## Step 8: Show the result

In [None]:
# Checkpoint and sample image used for a quick smoke test of predict()
model_path = 'models/car_classifer.pth'
image_path = f'{data_dir}/1/audi_1'

conf1, predicted1 = predict(image_path, model_path, topk=5)

# Scores of the five best classes, then the folder name of the winner
print(conf1)
best_index = predicted1[0]
print(classes[best_index])


In [None]:
# Visual check of predict(): the test image on top, its five best classes below

# Checkpoint path and the sample image (relative to data_dir, no extension)
model_path = 'models/car_classifer.pth'
carname = '/1/audi_1'
image_path = f'{data_dir}{carname}'

conf2, predicted1 = predict(image_path, model_path, topk=5)

# Translate the five predicted indices into class folder names
names = [classes[predicted1[i]] for i in range(5)]

# Open the original (untransformed) picture for display
image = Image.open(f'{image_path}.jpg')

# Two stacked panels: picture first, horizontal score bars second
f, ax = plt.subplots(2, figsize=(6, 10))

ax[0].imshow(image)
ax[0].set_title(carname)

y_names = np.arange(len(names))
relative_scores = conf2 / conf2.sum()
ax[1].barh(y_names, relative_scores, color='darkblue')
ax[1].set_yticks(y_names)
ax[1].set_yticklabels(names)
# Best prediction at the top
ax[1].invert_yaxis()

plt.show()

In [None]:
def plot_solution(cardir, model, topk=5):
    """Plot an image next to the model's top predictions for it.

    Args:
        cardir: Image location relative to ``data_dir``, written as
            ``'/<folder>/<file>'`` without the ``.jpg`` extension.
        model: Either a checkpoint path, which is then used for the
            prediction, or any other object (e.g. the network itself), in
            which case the default checkpoint is used. Existing callers pass
            the network.
        topk: Number of predictions to display.
    """
    # Inputs are paths to saved model and test image
    if isinstance(model, (str, os.PathLike)):
        model_path = model
    else:
        model_path = 'models/car_classifer.pth'
    image_path = data_dir + cardir
    # The first path component after the leading slash, i.e. the class folder
    carname = cardir.split('/')[1]

    conf2, predicted1 = predict(image_path, model_path, topk=topk)
    # Converting classes to names; follows however many predictions came back
    names = [classes[index] for index in predicted1]

    # Plotting test image and predicted scores
    f, ax = plt.subplots(2, figsize=(6, 10))

    # The context manager releases the file handle once the picture is drawn
    with Image.open(image_path + '.jpg') as image:
        ax[0].imshow(image)
    ax[0].set_title(carname)

    y_names = np.arange(len(names))
    ax[1].barh(y_names, conf2 / conf2.sum(), color='darkblue')
    ax[1].set_yticks(y_names)
    ax[1].set_yticklabels(names)
    ax[1].set_xlabel('Share of the displayed scores')
    ax[1].invert_yaxis()

    plt.show()

In [None]:
# TODO: add a section that maps the entries of class_names.csv onto `classes`, so plots show class names instead of folder names.

In [None]:
# Top predictions for a sample from class folder 27
cardir = '/27/bmw_1'
plot_solution(cardir=cardir, model=model)

In [None]:
# Top predictions for a sample from class folder 270
cardir = '/270/00406'
plot_solution(cardir=cardir, model=model)