# Kaggle Garbage Classification Dataset

In [None]:
# Import packages
import os
import torch
import torchvision
import torch.nn as nn
import numpy as np
import pandas as pd
import seaborn as sn
import copy

from torchvision.datasets import ImageFolder
from torchvision.transforms import transforms
from sklearn.metrics import confusion_matrix

from matplotlib import pyplot as plt
from tqdm import tqdm

In [None]:
# Select the compute device: CUDA when requested and available, otherwise CPU
use_gpu = True
if use_gpu and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Dataset location; the image folders live in the 'images' subdirectory
data_root = '../../datasets/kaggle-garbage-classification/'
data_dir = os.path.join(data_root, 'images')

## Load data

In [None]:
# Create dataset
#
# Preprocessing variants:
#   * FCN: additionally downscale the images to 48 x 64 by putting
#     transforms.Resize((48, 64)) in front of ToTensor
#     (3 * 48 * 64 = 9216 values per flattened image).
#   * CNN: the pipeline below as is, without resizing.
norm_mean = [0.6718, 0.6381, 0.6041]
norm_std = [0.2005, 0.2032, 0.2268]
trans = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

dataset = ImageFolder(data_dir, transform=trans)

# Show a sample image
sample_tensor, _ = dataset[567]
# Move the channel axis last, which is the layout imshow expects
sample_img = sample_tensor.numpy().transpose(1, 2, 0)
# Renormalize image to interval [0, 1] (min-max scaling per channel),
# because the normalized tensor is no longer in a displayable value range
ch_min = np.min(sample_img, axis=(0, 1))
ch_max = np.max(sample_img, axis=(0, 1))
sample_img = (sample_img - ch_min) / (ch_max - ch_min)
plt.imshow(sample_img)
plt.show()

In [None]:
# Get subsets from the split lists
import warnings

splits = ['train', 'val', 'test']
data_splits = []
# File name (without directory) of every image, in dataset index order
img_file_names = [os.path.basename(path) for path, _ in dataset.imgs]
for s in splits:
    # Space separated list file; column 0 holds the image file names
    list_name_class = pd.read_csv(os.path.join(data_root, 'one-indexed-files-notrash_%s.txt' % s), sep=' ', header=None)
    names = set(list_name_class[0])

    # Select by membership rather than by walking the sorted name list in
    # lock-step with dataset.imgs: with the lock-step walk a single unmatched
    # or differently ordered name silently drops every entry after it.
    inds = [i for i, file_name in enumerate(img_file_names) if file_name in names]
    if len(inds) != len(names):
        # Make an incomplete (or ambiguous) split visible instead of training
        # on a silently altered subset
        warnings.warn('Split %r: matched %d images for %d listed file names'
                      % (s, len(inds), len(names)))
    data_splits.append(torch.utils.data.Subset(dataset, inds))

trainset, valset, testset = data_splits

## Neural Networks

In [None]:
class FullyConnectedNeuralNetwork(nn.Module):
    """Multilayer perceptron classifier working on flattened inputs.

    The input is flattened, passed through ``len(channels) - 1`` linear
    layers that are each followed by a ReLU, and finally mapped to
    ``n_classes`` outputs by a linear layer without activation.

    Args:
        channels: Feature sizes. ``channels[0]`` is the size of the
            flattened input, the remaining entries are the widths of the
            hidden layers. The default input size is 9216 = 3 * 48 * 64.
        n_classes: Number of outputs of the final linear layer.
    """

    def __init__(self, channels=[9216] + 5 * [128],
                 n_classes=6):
        super().__init__()

        # Number of hidden (Linear + ReLU) stages
        self.n_layers = len(channels) - 1

        # One Linear + ReLU pair per pair of consecutive feature sizes
        modules = [nn.Flatten()]
        for n_in, n_out in zip(channels[:-1], channels[1:]):
            modules += [nn.Linear(n_in, n_out), nn.ReLU()]

        # Output layer producing one score per class
        modules.append(nn.Linear(channels[-1], n_classes))

        self.net = nn.Sequential(*modules)

    def forward(self, x):
        """Return the output of the network for the batch ``x``."""
        return self.net(x)

In [None]:
class ConvolutionalNeuralNetwork(nn.Module):
    """Strided convolutional classifier for 3-channel images.

    The network is a stack of ``len(channels)`` convolutions (padding 1),
    each followed by a ReLU except the last one, then average pooling with
    window ``final_size``, flattening, and a linear layer to ``n_classes``
    outputs.

    The linear layer has ``channels[-1]`` input features, so the pooled
    feature map has to be 1 x 1, i.e. ``final_size`` must equal the spatial
    size of the last convolution's output. With the defaults, 384 x 512
    inputs yield the expected 3 x 4 map.

    Args:
        channels: Output channels of each convolution. The input channel
            count of the first convolution is fixed to 3.
        kernel_size: Kernel size, either one int used for every layer or a
            sequence with one entry per layer.
        strides: Stride, either one int used for every layer or a sequence
            with one entry per layer.
        final_size: Window of the final average pooling.
        n_classes: Number of outputs of the final linear layer.

    Raises:
        ValueError: If ``kernel_size`` or ``strides`` is a sequence with
            fewer entries than ``channels``.
    """

    # NOTE(review): 258 in the default channels looks like a typo for 256; it
    # is kept as is so that the default architecture (and any weights saved
    # for it) stays unchanged -- confirm before changing.
    def __init__(self, channels=(8, 16, 32, 64, 128, 258, 512),
                 kernel_size=3, strides=(2, 2, 2, 2, 2, 2, 2),
                 final_size=(3, 4), n_classes=6):
        # Initialize object
        super().__init__()

        # Setup parameters
        channels = list(channels)
        self.n_layers = len(channels)
        channels = [3] + channels  # prepend the input channels
        kernel_size = self._per_layer(kernel_size, self.n_layers, 'kernel_size')
        strides = self._per_layer(strides, self.n_layers, 'strides')

        # Compose layers: ReLU after every convolution but the last one
        layers = []
        for i in range(self.n_layers):
            layers.append(nn.Conv2d(channels[i], channels[i + 1],
                                    kernel_size[i], strides[i], padding=1))
            if i < self.n_layers - 1:
                layers.append(nn.ReLU())

        # Average pooling, flatten, and linear layer at the end
        layers.append(nn.AvgPool2d(final_size))
        layers.append(nn.Flatten())
        layers.append(nn.Linear(channels[-1], n_classes))

        self.net = nn.Sequential(*layers)

    @staticmethod
    def _per_layer(value, n_layers, name):
        """Expand an int to one entry per layer or validate a sequence."""
        if isinstance(value, int):
            return n_layers * [value]
        value = list(value)
        if len(value) < n_layers:
            raise ValueError('%s has %d entries but %d layers were requested'
                             % (name, len(value), n_layers))
        # Surplus entries are tolerated and ignored
        return value

    def forward(self, x):
        """Return the output of the network for the image batch ``x``."""
        return self.net(x)

In [None]:
# Initialize model and move its parameters to the selected device
# (use FullyConnectedNeuralNetwork() here instead to train the fully
# connected variant)
model = ConvolutionalNeuralNetwork().to(device)

## Train model

In [None]:
# Prepare training
batch_size = 32

# Data loaders. Only the training loader shuffles and drops the incomplete
# last batch; validation and test data are iterated completely and in order.
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = torch.utils.data.DataLoader(valset, batch_size=batch_size)
# The test loader has to read the held-out test split, not the validation
# split, otherwise the reported test metrics are just the validation metrics
test_loader = torch.utils.data.DataLoader(testset, batch_size=batch_size)

# Loss
loss_fn = torch.nn.CrossEntropyLoss()

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# Scheduler for learning rate decay. mode='max' means the monitored metric is
# expected to increase (e.g. validation accuracy); the learning rate is scaled
# by 0.6 when it stops improving. It only acts when lr_scheduler.step(metric)
# is called.
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.6, patience=3)

def evaluate(model, loader, loss_fn, device):
    """Compute the average loss and the accuracy of ``model`` on ``loader``.

    Gradients are disabled during the pass. The train/eval mode of the model
    is not touched, so the caller has to select it beforehand.

    Args:
        model: Callable mapping an input batch to per-class scores.
        loader: Iterable of ``(inputs, targets)`` batches.
        loss_fn: Callable ``loss_fn(scores, targets)`` returning a tensor
            with a single element; assumed to be the mean over the batch,
            as it is re-weighted by the batch size here.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(loss, accuracy)``, both averaged over all samples.

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    n_samples = 0
    total_loss = 0
    n_correct = 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            n_batch = len(inputs)
            scores = model(inputs)
            # Weight the batch loss by the batch size so that a smaller
            # final batch does not distort the overall average
            total_loss += loss_fn(scores, targets).item() * n_batch
            hits = scores.argmax(dim=-1) == targets
            n_correct += hits.sum().item()
            n_samples += n_batch
    return total_loss / n_samples, n_correct / n_samples

In [None]:
# Do Training
n_epoch = 2

# Training loss is logged once per iteration, validation metrics once per epoch
train_loss_hist = []
val_loss_hist = []
val_acc_hist = []

for epoch in range(n_epoch):
    # Run training epoch
    model.train()
    for x, y in tqdm(train_loader):
        x, y = (x.to(device), y.to(device))
        optimizer.zero_grad()
        out = model(x)
        loss = loss_fn(out, y)
        train_loss_hist.append(loss.item())
        loss.backward()
        optimizer.step()

    # Evaluate model
    model.eval()
    val_perf = evaluate(model, val_loader, loss_fn, device)
    print('Epoch: %i, Valiation loss: %f, Validation accuracy: %f' % ((epoch + 1,) + val_perf))
    val_loss_hist.append(val_perf[0])
    val_acc_hist.append(val_perf[1])

    # Learning rate decay: the scheduler is configured with mode='max', so it
    # is driven by the validation accuracy. Without this call the scheduler
    # never changes the learning rate.
    lr_scheduler.step(val_perf[1])

In [None]:
# Put both histories on a common x axis measured in epochs: the training loss
# has one value per iteration, the validation metrics one value per epoch
n_iter = len(train_loss_hist)
it = np.linspace(0, n_epoch, n_iter + 1)[1:]
ep = np.arange(1, n_epoch + 1)

# Loss curves
for xs, ys in ((it, train_loss_hist), (ep, val_loss_hist)):
    plt.plot(xs, ys)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend(['Train', 'Val'])
plt.show()

# Validation accuracy curve
plt.plot(ep, val_acc_hist)
plt.xlabel('Epochs')
plt.ylabel('Validation accuracy')
plt.show()

## Evaluate on testset

In [None]:
# Compute loss and accuracy on testset
test_perf = evaluate(model, test_loader, loss_fn, device)
test_loss, test_acc = test_perf
print('Test loss: %f, Test accuracy: %f' % (test_loss, test_acc))

In [None]:
# Compute confusion matrix
# Make sure inference runs in eval mode even if this cell is executed after
# an interrupted training epoch
model.eval()
predictions = []
labels = []
with torch.no_grad():
    for x, y in test_loader:
        pred = model(x.to(device))
        pred_class = torch.argmax(pred, -1)
        predictions.extend(pred_class.cpu().numpy())
        labels.extend(y.numpy())

# Fix the label set explicitly so the matrix always has one row and column per
# dataset class, in the order of dataset.classes, even when a class occurs
# neither in the labels nor in the predictions. normalize='true' scales each
# row by the number of samples of that true class.
class_ids = np.arange(len(dataset.classes))
cm = confusion_matrix(labels, predictions, labels=class_ids, normalize='true')
cm_pd = pd.DataFrame(cm, index=dataset.classes, columns=dataset.classes)

sn.heatmap(cm_pd, annot=True)
plt.show()