### Import Necessary Packages

In [37]:
import pandas
import numpy as np
import os
import shutil
import torch

### Import Data

In [None]:
labels = ["coca_cola","fanta","sprite"]
for label in labels:
    os.makedirs(f'classification/train/{label}')
    os.makedirs(f'classification/val/{label}')
    os.makedirs(f'classification/test/{label}')

In [None]:
cola_train_count = 0
cola_val_count = 0
cola_test_count = 0
fanta_train_count = 0
fanta_val_count = 0
fanta_test_count = 0
sprite_train_count = 0
sprite_val_count = 0
sprite_test_count = 0

sub_dir = ["coca cola can", "coca cola plastic bottle", "fanta can", 
           "fanta plastic bottle", "sprite can", "sprite plastic bottle"]
for d in sub_dir:
    if d.split()[0] == "coca":
        for f in os.listdir(f'DATA/data/{d}'):
            if cola_train_count < 800:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/train/coca_cola/{cola_train_count}.jpg')
                cola_train_count += 1
            elif cola_val_count < 100:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/val/coca_cola/{cola_val_count}.jpg')
                cola_val_count += 1
            elif cola_test_count < 100:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/test/coca_cola/{cola_test_count}.jpg')
                cola_test_count += 1
    elif d.split()[0] == "fanta":
        for f in os.listdir(f'DATA/data/{d}'):
            if fanta_train_count < 800:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/train/fanta/{fanta_train_count}.jpg')
                fanta_train_count += 1
            elif fanta_val_count < 100:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/val/fanta/{fanta_val_count}.jpg')
                fanta_val_count += 1
            elif fanta_test_count < 100:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/test/fanta/{fanta_test_count}.jpg')
                fanta_test_count += 1
    elif d.split()[0] == "sprite":
        for f in os.listdir(f'DATA/data/{d}'):
            if sprite_train_count < 800:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/train/sprite/{sprite_train_count}.jpg')
                sprite_train_count += 1
            elif sprite_val_count < 100:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/val/sprite/{sprite_val_count}.jpg')
                sprite_val_count += 1
            elif sprite_test_count < 100:
                shutil.copy(f'DATA/data/{d}/'+f,f'classification/test/sprite/{sprite_test_count}.jpg')
                sprite_test_count += 1

In [2]:
train_dir = 'classification/train'
val_dir = 'classification/val'
test_dir = 'classification/test'

### Data Process

In [3]:
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.nn.functional as F
import torchvision
from torchvision import models, transforms, datasets
from collections import OrderedDict

In [4]:
dataset_transform = transforms.Compose([
  transforms.Resize(256),
  transforms.CenterCrop(224),
  transforms.RandomRotation(30),
  transforms.RandomHorizontalFlip(),
  transforms.ToTensor(),
  transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [5]:
# Create train, validation and test dataset
train_dataset = datasets.ImageFolder(train_dir,transform=dataset_transform)
val_dataset = datasets.ImageFolder(val_dir,transform=dataset_transform)
test_dataset = datasets.ImageFolder(test_dir,transform=dataset_transform)

In [6]:
# Load datasets into dataloader for iteration
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size = 100, shuffle = True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size = 100, shuffle = True)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size = 100, shuffle = True)

In [34]:
image = "./classification/train/fanta/0.jpg"

In [36]:
ig= Image.open(image)
data = np.asarray(ig)
print(data.shape)

(640, 535, 3)


In [7]:
# Training function
def training(model):
    iteration = 0
    minLoss = float("inf")
    best_parameters = None

    # Training Loop
    for epoch in range(epochs):
        # Iterate over the training dataloader
        for i, (images, labels) in enumerate(train_dataloader):
            # Reinitialize the gradient
            optimizer.zero_grad()

            # Get the correct label
            images, labels = images.to(device), labels.to(device)

            # Forward propagation
            outputs = model(images)

            # Calculate the loss
            loss = criterion(outputs, labels)

            # Use the loss for back propagation
            loss.backward()

            # Update network parameters
            optimizer.step()
            
            # Empty the cache of GPU
            del images
            del labels
            del loss
            del outputs
            torch.cuda.empty_cache()

            iteration += 1

            if iteration % 1 == 0 or iteration == 1:
                # Calculate Accuracy and loss        
                correct = 0
                total = 0

                total_loss = 0

                # Validation Loop
                # Iterate over the validation dataloader
                for images, labels in val_dataloader:

                    images, labels = images.to(device), labels.to(device)
                    # Make prediction
                    outputs = model(images)

                    loss = criterion(outputs, labels)
                    total_loss += float(loss)

                    # Get predictions from the maximum value, index 0 is the maximum value, 
                    # index 1 is the index of the maximum value
                    _, predicted = torch.max(outputs.data, 1)

                    # Number of labels
                    total += labels.size(0)

                    # Number of correct predictions
                    correct += (predicted == labels).sum()
                    
                    # Empty cache of GPU
                    del images
                    del labels
                    del loss
                    del outputs
                    torch.cuda.empty_cache()

                # Calculate accuracy
                accuracy = 100 * correct / total

                # Save parameters if loss is smaller
                if total_loss < minLoss:
                    minLoss = total_loss
                    best_parameters = model.state_dict()

                # Print accuracy and loss
                print(f'Iteration {iteration}: Loss: {total_loss}, Validation accuracy: {accuracy}')
    return best_parameters

In [8]:
import matplotlib.pyplot as plt
# Testing
def test(model):
    correct = 0
    total = 0
    total_loss = 0
    count = 0

    # Test Loop
    for images, labels in test_dataloader:

        images, labels = images.to(device), labels.to(device)

        # Make prediction
        outputs = model(images)

        loss = criterion(outputs, labels)
        total_loss += float(loss)

        # Get predictions from the maximum value, index 0 is the maximum value, 
        # index 1 is the index of the maximum value
        _, predicted = torch.max(outputs.data, 1)

        # Number of labels
        total += labels.size(0)

        # Number of correct predictions
        correct += (predicted == labels).sum()
        incorrect = images[predicted != labels]
        
        # Show 5 wrong predicted example
        if count < 5 and (predicted == labels).sum() != labels.size(0):
            
            wrong = "dog" if predicted[predicted != labels][0] else "cat"
            actual = "dog" if labels[predicted != labels][0] else "cat"
            print(f"Predicted {wrong}")
            print(f"Actual {actual}")
            show_image = incorrect[0].cpu().numpy().transpose((1, 2, 0))
            plt.imshow(show_image, cmap='gray')
            plt.show()
            count += 1
        
        # Empty cache
        del images
        del labels
        del loss
        del outputs
        torch.cuda.empty_cache()

    # Calculate accuracy
    accuracy = 100 * correct / total

    # Print accuracy and loss
    print(f'Best model has Loss: {total_loss}, Test accuracy: {accuracy}')

In [9]:
VGG = models.vgg16(pretrained=True)
VGG

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [10]:
# Modify the last layer
VGG.classifier[6] = nn.Linear(in_features=4096, out_features=3)
VGG.classifier.add_module("7", nn.Sigmoid())
print(VGG.classifier)

Sequential(
  (0): Linear(in_features=25088, out_features=4096, bias=True)
  (1): ReLU(inplace=True)
  (2): Dropout(p=0.5, inplace=False)
  (3): Linear(in_features=4096, out_features=4096, bias=True)
  (4): ReLU(inplace=True)
  (5): Dropout(p=0.5, inplace=False)
  (6): Linear(in_features=4096, out_features=3, bias=True)
  (7): Sigmoid()
)


In [11]:
# Only unfreeze the last two layer
to_update = ["classifier.6.weight", "classifier.6.bias"]
to_update_params = []
for name, param in VGG.named_parameters():
    if name in to_update:
        param.requires_grad = True
        to_update_params.append(param)
    else:
        param.requires_grad = False

In [12]:
# Training hyperparameters
epochs = 2
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=to_update_params, lr=0.001)

In [13]:
# Move model to GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
VGG = VGG.to(device)

In [14]:
# Training
vgg_best_parameters = training(VGG)



Iteration 1: Loss: 3.321898937225342, Validation accuracy: 32.66666793823242
Iteration 2: Loss: 3.3141952753067017, Validation accuracy: 34.66666793823242
Iteration 3: Loss: 3.2939783334732056, Validation accuracy: 31.0
Iteration 4: Loss: 3.3082375526428223, Validation accuracy: 33.333335876464844
Iteration 5: Loss: 3.2860840559005737, Validation accuracy: 34.0
Iteration 6: Loss: 3.2835062742233276, Validation accuracy: 35.66666793823242
Iteration 7: Loss: 3.272558093070984, Validation accuracy: 35.333335876464844
Iteration 8: Loss: 3.263235926628113, Validation accuracy: 38.333335876464844
Iteration 9: Loss: 3.2822173833847046, Validation accuracy: 36.66666793823242
Iteration 10: Loss: 3.292462468147278, Validation accuracy: 33.66666793823242
Iteration 11: Loss: 3.281262755393982, Validation accuracy: 34.333335876464844
Iteration 12: Loss: 3.2986907958984375, Validation accuracy: 35.0
Iteration 13: Loss: 3.2946826219558716, Validation accuracy: 32.66666793823242


KeyboardInterrupt: 