# Trash Recognition Neural Network 🧠

## Introduction

📚 This was an ML project for USC CSCI 467: Introduction to Machine Learning. My co-authors are Russell Tan and Pablo Tayun-Mazariegos. The premise is as follows:

♻️ Recycling is a well-known way to save landfill space; however, many people do not know how to sort their trash, or often make mistakes when doing so. Sorting recyclables before they reach the recycling facility is crucial for effective recycling, as it keeps costs down by preventing clogged machinery and the need for manual sorting in the facilities. If contaminants were to pass through, the final product would be deemed unsatisfactory and thrown into the landfill rather than being reused. This experiment's purpose is to help improve models designed to classify six different forms of waste: glass, cardboard, metal, paper, plastic and trash.

📦 The classifier, which consists of some form of CNN, takes as input an image containing a single piece of waste on a white (or any other solid-color) background. The model should then classify the object into one of the six waste categories listed above.

### Imports

We're mainly using `PyTorch` for the implementation of our neural network.

In [None]:
import os
import torch
import torchvision
from torch.utils.data import random_split
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F

## Step 1: Data Collection and Preprocessing

First, we load our dataset, which is stored in Google Drive.

In [None]:
from google.colab import drive

# Mount Google Drive so the dataset folder is reachable from the Colab VM.
drive.mount('/content/drive', force_remount=True)
data_dir ='/content/drive/MyDrive/Colab Notebooks/Datasets/CCHANGCS_Garbage ClassificationDataset.zip (Unzipped Files)/Garbage classification/Garbage classification'

# Keep only sub-directories and sort them by name. os.listdir() returns
# entries in arbitrary order and also lists stray files, whereas ImageFolder
# assigns class indices to the sorted sub-directory names; `classes` must
# follow the same order to be usable as axis labels later on.
classes = sorted(entry.name for entry in os.scandir(data_dir) if entry.is_dir())
print(classes)

Preprocess the images to ideal sizes for the models.

In [None]:
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
import torch.nn.functional as F

# Input resolution handed to the network.
# `target_size` changes according to the model
target_size = (224, 224)

# Per-image preprocessing: resize to `target_size`, then convert to a tensor.
preprocessing_steps = [
    transforms.Resize(target_size),
    transforms.ToTensor(),
]
transformations = transforms.Compose(preprocessing_steps)

# Build the labelled image dataset rooted at `data_dir`.
dataset = ImageFolder(data_dir, transform=transformations)
print(dataset.classes)

Define a helper that displays an image together with its class label, for comparison purposes.

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

def show_sample(img, label):
    """Print the class name and index for `label`, then draw `img`.

    `img` is permuted with (1, 2, 0) before plotting, i.e. its first axis is
    moved last (presumably channel-first -> channel-last for imshow).
    """
    class_name = dataset.classes[label]
    print("Label:", class_name, "(Class No: " + str(label) + ")")
    plottable = img.permute(1, 2, 0)
    plt.imshow(plottable)

# Name of the class with index 0 (shown as the cell's output).
dataset.classes[0]

Print sample of training data with assigned label.

In [None]:
# Fetch the (image, label) pair at index 12 and display it with its label.
img, label = dataset[12]
show_sample(img, label)

Randomly allocate images to Training, Dev, Test sets.

In [None]:
random_seed = 42
# Seed the global RNG so the random split below is reproducible.
torch.manual_seed(random_seed)

print(dataset)
print(len(dataset))
total = len(dataset)

print((total*0.7), (total*0.1), (total*0.2))

# Derive the 70/10/20 split from the dataset size instead of hard-coding it,
# so the cell keeps working if images are added or removed. The test split
# absorbs the rounding remainder so the three sizes always sum to `total`.
# For a 2527-image dataset this yields 1769 / 253 / 505.
train_size = round(total * 0.7)
val_size = round(total * 0.1)
test_size = total - train_size - val_size
train_ds, val_ds, test_ds = random_split(dataset, [train_size, val_size, test_size])

len(train_ds), len(val_ds), len(test_ds)

In [None]:
from torch.utils.data.dataloader import DataLoader

batch_size = 32

# Options shared by all three loaders.
loader_options = {'num_workers': 4, 'pin_memory': True}

# Only the training loader shuffles; the evaluation loaders use a doubled batch size.
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True, **loader_options)
val_dl = DataLoader(val_ds, batch_size=batch_size * 2, **loader_options)
test_dl = DataLoader(test_ds, batch_size=batch_size * 2, **loader_options)

Define a helper that displays a grid of images from the first batch of a data loader.

In [None]:
from torchvision.utils import make_grid

def show_batch(dl):
    """Draw the first batch yielded by `dl` as a single image grid.

    Only the first batch is used; nothing is drawn if `dl` yields no batches.
    """
    first_batch = next(iter(dl), None)
    if first_batch is None:
        return

    images, _ = first_batch
    fig, ax = plt.subplots(figsize=(12, 6))
    # Hide the axis ticks; they carry no meaning for an image grid.
    ax.set_xticks([])
    ax.set_yticks([])
    grid = make_grid(images, nrow=16)
    ax.imshow(grid.permute(1, 2, 0))

## Step 2: Training 🏋️‍♀️

Base model.

In [None]:
def accuracy(outputs, labels):
    """Return the fraction of rows in `outputs` whose arg-max equals `labels`.

    The result is a 0-dim tensor built from a Python float.
    """
    predicted = outputs.max(dim=1).indices
    num_correct = (predicted == labels).sum().item()
    return torch.tensor(num_correct / len(predicted))

class ImageClassificationBase(nn.Module):
    """Training/validation helpers shared by the image classifiers.

    Subclasses provide `forward`; the methods here compute the cross-entropy
    loss, per-batch validation metrics, their per-epoch averages, and print a
    one-line epoch summary.
    """

    def training_step(self, batch):
        """Return the cross-entropy loss for one `(images, labels)` batch."""
        inputs, targets = batch
        return F.cross_entropy(self(inputs), targets)

    def validation_step(self, batch):
        """Return detached loss and accuracy for one `(images, labels)` batch."""
        inputs, targets = batch
        predictions = self(inputs)
        return {
            'val_loss': F.cross_entropy(predictions, targets).detach(),
            'val_acc': accuracy(predictions, targets),
        }

    def validation_epoch_end(self, outputs):
        """Average the per-batch dicts from `validation_step` into Python floats.

        Every batch is weighted equally, regardless of its size.
        """
        mean_loss = torch.stack([step['val_loss'] for step in outputs]).mean()
        mean_acc = torch.stack([step['val_acc'] for step in outputs]).mean()
        return {'val_loss': mean_loss.item(), 'val_acc': mean_acc.item()}

    def epoch_end(self, epoch, result):
        """Print the metrics in `result` for the zero-based `epoch` (shown 1-based)."""
        template = "Epoch {}: train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}"
        summary = template.format(
            epoch+1, result['train_loss'], result['val_loss'], result['val_acc'])
        print(summary)

CNN models as classes. These models were all run and benchmarked independently.

In [None]:
import torchvision.models
from torchvision.models import list_models, get_model

# List available models
# The first call applies no filter; the second restricts the listing to
# models belonging to the `torchvision.models` module.
print(list_models())
print(list_models(module=torchvision.models))

# Initialize models, temporary use premade models
class AlexNet(ImageClassificationBase):
    """ImageNet-pretrained torchvision AlexNet with a new classifier head.

    The replacement head ends in a linear layer with one output per entry in
    `dataset.classes`. No parameters are frozen, so the whole network is
    fine-tuned.

    NOTE: a second class named `AlexNet` is defined further down in this
    cell; once the cell has run, the name refers to that later definition.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = torchvision.models.alexnet(
            weights=torchvision.models.AlexNet_Weights.IMAGENET1K_V1)
        self.network.classifier = nn.Sequential(
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=9216, out_features=4096, bias=True),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(4096, 1024, True),
            nn.ReLU(inplace=True),
            nn.Linear(1024, len(dataset.classes), True),
        )

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. Squashing the
        scores into (0, 1) first caps how confident the softmax inside the
        loss can become. The arg-max class is the same either way.
        """
        return self.network(xb)

class MobileNet(ImageClassificationBase):
    """ImageNet-pretrained torchvision MobileNetV2 used as a frozen feature extractor.

    Every pretrained parameter has `requires_grad` turned off; afterwards the
    last classifier layer is replaced by a new linear layer with one output
    per entry in `dataset.classes`, which is therefore the only trainable part.

    NOTE: a second class named `MobileNet` is defined further down in this
    cell; once the cell has run, the name refers to that later definition.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = torchvision.models.mobilenet_v2(
            weights=torchvision.models.MobileNet_V2_Weights.IMAGENET1K_V1)
        # Freeze the pretrained weights before attaching the new head.
        for param in self.network.parameters():
            param.requires_grad = False
        num_ftrs = self.network.classifier[1].in_features
        self.network.classifier[1] = nn.Linear(num_ftrs, len(dataset.classes))

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. The arg-max
        class is the same either way.
        """
        return self.network(xb)

class AlexNet(ImageClassificationBase):
    """AlexNet-style CNN built from scratch (no pretrained weights).

    Five convolutional stages (each Conv -> BatchNorm -> ReLU, with max
    pooling after stages 1, 2 and 5) feed three fully connected stages. The
    first linear layer takes 6400 inputs, so the flattened output of stage 5
    must have 6400 values per sample.
    """

    def __init__(self, num_classes = len(dataset.classes)):
        super().__init__()

        def conv_stage(in_ch, out_ch, kernel, stride, pad, pool):
            """Conv -> BatchNorm -> ReLU, optionally followed by 3x3/stride-2 max pooling."""
            parts = [
                nn.Conv2d(in_ch, out_ch, kernel_size=kernel, stride=stride, padding=pad),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(),
            ]
            if pool:
                parts.append(nn.MaxPool2d(kernel_size=3, stride=2))
            return nn.Sequential(*parts)

        def dense_stage(in_features, out_features):
            """Dropout -> Linear -> ReLU."""
            return nn.Sequential(
                nn.Dropout(0.5),
                nn.Linear(in_features, out_features),
                nn.ReLU(),
            )

        # Attribute names and construction order are kept as-is so that
        # state_dict keys and random initialisation stay the same.
        self.layer1 = conv_stage(3, 96, kernel=11, stride=4, pad=0, pool=True)
        self.layer2 = conv_stage(96, 256, kernel=5, stride=1, pad=2, pool=True)
        self.layer3 = conv_stage(256, 384, kernel=3, stride=1, pad=1, pool=False)
        self.layer4 = conv_stage(384, 384, kernel=3, stride=1, pad=1, pool=False)
        self.layer5 = conv_stage(384, 256, kernel=3, stride=1, pad=1, pool=True)
        self.fc = dense_stage(6400, 4096)
        self.fc1 = dense_stage(4096, 4096)
        self.fc2 = nn.Sequential(nn.Linear(4096, num_classes))

    def forward(self, x):
        """Run `x` through the conv stages, flatten per sample, and return class scores."""
        features = x
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5):
            features = stage(features)
        flat = features.reshape(features.size(0), -1)
        return self.fc2(self.fc1(self.fc(flat)))

class MobileNet(ImageClassificationBase):
    """MobileNet-style CNN built from scratch out of depthwise-separable convolutions.

    A standard 3x3 convolution is followed by thirteen depthwise-separable
    blocks and a 7x7 average pool; a final linear layer maps the 1024
    features to `num_classes` scores.
    """

    def __init__(self, num_classes = len(dataset.classes)):
        super().__init__()

        def conv_bn(inp, oup, stride):
            """Standard 3x3 convolution -> BatchNorm -> ReLU."""
            return nn.Sequential(
                nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
                nn.BatchNorm2d(oup),
                nn.ReLU(inplace=True)
            )

        def conv_dw(inp, oup, stride):
            """Depthwise 3x3 convolution followed by a pointwise 1x1 convolution."""
            return nn.Sequential(
                # Depthwise: groups=inp gives one filter per input channel.
                nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
                nn.BatchNorm2d(inp),
                nn.ReLU(inplace=True),

                # Pointwise: 1x1 convolution that mixes channels.
                nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
                nn.ReLU(inplace=True),
            )

        self.model = nn.Sequential(
            conv_bn(  3,  32, 2),
            conv_dw( 32,  64, 1),
            conv_dw( 64, 128, 2),
            conv_dw(128, 128, 1),
            conv_dw(128, 256, 2),
            conv_dw(256, 256, 1),
            conv_dw(256, 512, 2),
            conv_dw(512, 512, 1),
            conv_dw(512, 512, 1),
            conv_dw(512, 512, 1),
            conv_dw(512, 512, 1),
            conv_dw(512, 512, 1),
            conv_dw(512, 1024, 2),
            conv_dw(1024, 1024, 1),
            nn.AvgPool2d(7),
        )
        self.fc = nn.Linear(1024, num_classes)

    def forward(self, x):
        """Return class scores for the image batch `x`."""
        x = self.model(x)
        # Flatten per sample rather than `view(-1, 1024)`: if the pooled map
        # is larger than 1x1, the old reshape silently folded spatial
        # positions into the batch axis. Now the linear layer reports the
        # shape mismatch instead. Results are identical for 1x1 pooled maps.
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

class ResNet50(ImageClassificationBase):
    """ImageNet-pretrained torchvision ResNet-50 with its final layer resized.

    The `fc` layer is replaced by a new linear layer with one output per
    entry in `dataset.classes`. No parameters are frozen.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

        num_ftrs = self.network.fc.in_features
        self.network.fc = nn.Linear(num_ftrs, len(dataset.classes))

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. The arg-max
        class is the same either way.
        """
        return self.network(xb)


class ResNet101(ImageClassificationBase):
    """ImageNet-pretrained torchvision ResNet-101 with its final layer resized.

    The `fc` layer is replaced by a new linear layer with one output per
    entry in `dataset.classes`. No parameters are frozen.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = models.resnet101(weights=models.ResNet101_Weights.IMAGENET1K_V1)

        num_ftrs = self.network.fc.in_features
        self.network.fc = nn.Linear(num_ftrs, len(dataset.classes))

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. The arg-max
        class is the same either way.
        """
        return self.network(xb)

class ResNet18(ImageClassificationBase):
    """ImageNet-pretrained torchvision ResNet-18 with its final layer resized.

    The `fc` layer is replaced by a new linear layer with one output per
    entry in `dataset.classes`. No parameters are frozen.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

        num_ftrs = self.network.fc.in_features
        self.network.fc = nn.Linear(num_ftrs, len(dataset.classes))

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. The arg-max
        class is the same either way.
        """
        return self.network(xb)

class WideResNet101(ImageClassificationBase):
    """ImageNet-pretrained torchvision Wide ResNet-101-2 with its final layer resized.

    The `fc` layer is replaced by a new linear layer with one output per
    entry in `dataset.classes`. No parameters are frozen.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = models.wide_resnet101_2(
            weights=models.Wide_ResNet101_2_Weights.IMAGENET1K_V1)

        num_ftrs = self.network.fc.in_features
        self.network.fc = nn.Linear(num_ftrs, len(dataset.classes))

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. The arg-max
        class is the same either way.
        """
        return self.network(xb)

class MobileNet_V3L(ImageClassificationBase):
    """ImageNet-pretrained torchvision MobileNetV3-Large with a new classifier head.

    The head maps 960 features to 1280 hidden units and then to one output
    per entry in `dataset.classes`. No parameters are frozen.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = torchvision.models.mobilenet_v3_large(
            weights=torchvision.models.MobileNet_V3_Large_Weights.IMAGENET1K_V1)
        self.network.classifier = nn.Sequential(
            nn.Linear(960, 1280, bias=True),
            nn.Hardswish(),
            nn.Dropout(p=0.2, inplace=True),
            nn.Linear(1280, len(dataset.classes), True)
        )

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. The arg-max
        class is the same either way.
        """
        return self.network(xb)

class MobileNet_V3S(ImageClassificationBase):
    """ImageNet-pretrained torchvision MobileNetV3-Small with a new classifier head.

    The head maps 576 features to 1024 hidden units and then to one output
    per entry in `dataset.classes`. No parameters are frozen.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = torchvision.models.mobilenet_v3_small(
            weights=torchvision.models.MobileNet_V3_Small_Weights.IMAGENET1K_V1)
        self.network.classifier = nn.Sequential(
            nn.Linear(576, 1024, bias=True),
            nn.Hardswish(),
            nn.Dropout(p=0.2, inplace=True),
            nn.Linear(1024, len(dataset.classes), True)
        )

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. The arg-max
        class is the same either way.
        """
        return self.network(xb)

class SqueezeNet_V1p1(ImageClassificationBase):
    """torchvision SqueezeNet 1.1 trained from scratch with a resized classifier.

    No pretrained weights are loaded. The classifier is replaced by dropout,
    a 1x1 convolution producing one channel per entry in `dataset.classes`,
    a ReLU and global average pooling.
    """

    def __init__(self):
        super().__init__()
        # `weights=None` is the current spelling of the deprecated
        # `pretrained=False`: the network starts from random weights.
        self.network = torchvision.models.squeezenet1_1(weights=None)
        self.network.classifier = nn.Sequential(
            nn.Dropout(p=0.5, inplace=False),
            nn.Conv2d(512, len(dataset.classes), kernel_size=(1, 1)),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d(output_size=(1, 1))
        )

    def forward(self, xb):
        """Return the class scores for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized scores. The arg-max
        class is the same either way.
        """
        return self.network(xb)

class EfficientNetV2(ImageClassificationBase):
    """ImageNet-pretrained torchvision EfficientNetV2-L with a new classifier head.

    The head is dropout followed by a linear layer mapping 1280 features to
    one output per entry in `dataset.classes`. No parameters are frozen.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1
        # is the checkpoint the old flag selected.
        self.network = models.efficientnet_v2_l(
            weights=models.EfficientNet_V2_L_Weights.IMAGENET1K_V1)
        self.network.classifier = nn.Sequential(
            nn.Dropout(p=0.4, inplace=True),
            nn.Linear(1280, len(dataset.classes), True)
        )

    def forward(self, xb):
        """Return raw class logits for the image batch `xb`.

        No sigmoid is applied: the base class feeds this output to
        `F.cross_entropy`, which expects unnormalized logits. The arg-max
        class is the same either way.
        """
        return self.network(xb)

# Instantiate the architecture to train.
model = AlexNet() # or any of the other models above

Choose a GPU for performance ⚡️

In [None]:
def get_default_device():
    """Return the CUDA device when one is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)

def to_device(data, device):
    """Move `data` to `device`, recursing into lists and tuples.

    A list or tuple comes back as a list of moved elements; anything else is
    moved with its own `.to(device, non_blocking=True)`.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)

    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader():
    """Iterable wrapper that moves every batch of a data loader onto a device."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __len__(self):
        """Return the number of batches in the wrapped loader."""
        return len(self.dl)

    def __iter__(self):
        """Yield the wrapped loader's batches, each moved to `self.device` first."""
        yield from (to_device(batch, self.device) for batch in self.dl)

device = get_default_device()
print(device)

# Re-bind each loader to a wrapper that delivers its batches on `device`.
train_dl, val_dl, test_dl = (
    DeviceDataLoader(loader, device) for loader in (train_dl, val_dl, test_dl)
)
# Move the model to the same device as the data.
to_device(model, device)

Training algorithm.

In [None]:
@torch.no_grad()
def evaluate(model, val_loader):
    """Put `model` in eval mode and return its aggregated validation metrics.

    Each batch from `val_loader` goes through `model.validation_step`; the
    collected results are combined by `model.validation_epoch_end`. Gradient
    tracking is disabled for the whole call.
    """
    model.eval()
    step_results = []
    for batch in val_loader:
        step_results.append(model.validation_step(batch))
    return model.validation_epoch_end(step_results)

def fit(epochs, lr, model, train_loader, val_loader, opt_func=torch.optim.SGD):
    """Train `model` for `epochs` epochs and return the per-epoch metrics.

    Args:
        epochs: number of passes over `train_loader`.
        lr: learning rate handed to the optimizer.
        model: module providing `training_step` and `epoch_end`, plus the
            validation hooks used by `evaluate`.
        train_loader: iterable of training batches.
        val_loader: iterable of validation batches, evaluated after each epoch.
        opt_func: optimizer class, called as `opt_func(model.parameters(), lr)`.

    Returns:
        A list with one dict per epoch: the result of `evaluate` plus a
        `'train_loss'` entry holding the mean training loss of that epoch.
    """
    history = []
    optimizer = opt_func(model.parameters(), lr)
    for epoch in range(epochs):
        # Training Phase
        train_losses = []
        model.train()
        for batch in train_loader:
            loss = model.training_step(batch)
            # Store a detached copy: the value is only needed for reporting,
            # and keeping the live tensor would hold on to each batch's
            # autograd graph until the end of the epoch.
            train_losses.append(loss.detach())
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        # Validation phase
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        model.epoch_end(epoch, result)
        history.append(result)
    return history

Actual training.

In [None]:
# Training hyperparameters: epoch count, optimizer class, and learning rate.
num_epochs = 20
opt_func = torch.optim.Adam
lr = 5e-5

# Run the training loop; `history` holds one metrics dict per epoch.
history = fit(num_epochs, lr, model, train_dl, val_dl, opt_func)

## Step 3: Save Model and Display Statistics 📈

In [None]:

def plot_accuracies(history):
    """Plot the `'val_acc'` value of each entry in `history` against its index."""
    val_acc_per_epoch = []
    for epoch_result in history:
        val_acc_per_epoch.append(epoch_result['val_acc'])

    plt.plot(val_acc_per_epoch, '-x')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.title('Accuracy vs. No. of epochs')

plot_accuracies(history)

In [None]:
def plot_losses(history):
    """Plot training and validation loss for each entry in `history`.

    `'train_loss'` is read with `.get`, so an entry without it contributes
    `None` to the training curve; `'val_loss'` must be present.
    """
    training_curve, validation_curve = [], []
    for epoch_result in history:
        training_curve.append(epoch_result.get('train_loss'))
        validation_curve.append(epoch_result['val_loss'])

    # Blue crosses for training, red crosses for validation.
    plt.plot(training_curve, '-bx')
    plt.plot(validation_curve, '-rx')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.title('Loss vs. No. of epochs')

plot_losses(history)

After plotting accuracies and losses, save the model.

In [None]:
# File name and Google Drive location for the trained model.
model_save_name = 'AlexNetNew_May9.pt'
path = f"/content/drive/MyDrive/Colab Notebooks/TrashModels/{model_save_name}"
# Saves the whole model object (not just its state_dict), so the model's
# class definition must be available again when the file is loaded.
torch.save(model, path)

## Step 4: Analyze and Judge Models

In [None]:
# Load a previously saved model object. No fresh ResNet50() is built first:
# it would be overwritten immediately and would only fetch the pretrained
# weights for nothing.
# NOTE(security): the checkpoint is a fully pickled model, so loading it
# executes pickle code. Only load files you trust. `weights_only=False` is
# spelled out because newer PyTorch releases refuse such checkpoints by default.
# `map_location=device` lets a checkpoint saved on a GPU load on a CPU-only runtime.
model = torch.load(
    '/content/drive/MyDrive/Colab Notebooks/TrashModels/ResNetPT_April17_compost.pt',
    map_location=device,
    weights_only=False,
)

# Switch to inference mode before evaluating.
model.eval()
print(device)

In [None]:
def predict_image(img, model):
    """Return the predicted class name for a single image tensor.

    Args:
        img: one image tensor without a batch axis; a leading axis of size 1
            is added before the forward pass.
        model: callable returning one score per class for each image.

    Returns:
        The entry of `dataset.classes` whose score is highest.

    The model's train/eval mode is left untouched; callers should have put it
    in eval mode already.
    """
    # Convert to a batch of 1
    xb = to_device(img.unsqueeze(0), device)

    # Get predictions from model; inference needs no autograd graph
    with torch.no_grad():
        yb = model(xb)

    # Pick index with highest score
    _, preds = torch.max(yb, dim=1)

    # Retrieve the class label
    return dataset.classes[preds[0].item()]

# Spot-check a few test images: draw each one and print the true label next
# to the model's prediction.
for sample_index in (17, 23, 80):
    img, label = test_ds[sample_index]
    plt.imshow(img.permute(1, 2, 0))
    print('Label:', dataset.classes[label], ', Predicted:', predict_image(img, model))

Use `sklearn`, `seaborn`, and `pandas` to plot metrics.

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

y_pred, y_true = [], []

# Iterate over test data
for img, label in test_ds:
    y_pred.append(predict_image(img, model)) # Save Prediction
    y_true.append(dataset.classes[label]) # Save Truth

print(y_pred)
print(y_true)

# Build confusion matrix.
# Pass the label order explicitly and reuse it for the axis names, so rows and
# columns are guaranteed to line up with their labels (a directory listing
# carries no ordering guarantee), and so the matrix keeps a row/column for a
# class even if it is absent from the test split.
class_names = list(dataset.classes)
cf_matrix = confusion_matrix(y_true, y_pred, labels=class_names)

# Normalise each row by its number of true samples; rows with no samples stay
# at zero instead of triggering a division by zero.
row_totals = cf_matrix.sum(axis=1, keepdims=True)
normalized = np.divide(
    cf_matrix,
    row_totals,
    out=np.zeros(cf_matrix.shape, dtype=float),
    where=row_totals != 0,
)
df_cm = pd.DataFrame(normalized, index=class_names, columns=class_names)

plt.figure(figsize = (12,7))
sn.heatmap(df_cm, annot=True)
plt.savefig('output.png')