In [None]:
!unzip drive/MyDrive/datasets/garbage_types_xl.zip -d ./data

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision
from torchvision import models
from torchvision.models import ConvNeXt_Tiny_Weights
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
from PIL import Image
import os

In [None]:
# Use the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Class name -> output index, one mapping per classifier.
# CLASSES_1: XL model for bio + trash (seven outputs).
CLASSES_1 = {
    name: index
    for index, name in enumerate(
        ['biological', 'cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']
    )
}
# CLASSES_2: regular model for everything else (six outputs, no 'biological').
CLASSES_2 = {
    name: index
    for index, name in enumerate(
        ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']
    )
}

# Checkpoint locations for the two classifiers and the CSV this notebook writes.
MODEL1_PATH = "./models/neo.pth"
MODEL2_PATH = "./models/trinity.pth"
DATASET_SAVE_PATH = "./drive/MyDrive/datasets/prediction_model_softmax.csv"

# Position of every "<class><model number>" score in the combined feature
# vector: model 1's classes come first, model 2's follow directly after them.
MASTER_INDEX = {
    **{f"{name}1": index for name, index in CLASSES_1.items()},
    **{f"{name}2": len(CLASSES_1) + index for name, index in CLASSES_2.items()},
}

In [None]:
def CONV_NN(num_classes, hidden_dim=256, dropout=0.5):
    """Build a frozen pretrained ConvNeXt-Tiny with a new trainable MLP head.

    Args:
        num_classes: number of outputs of the final linear layer.
        hidden_dim: width of the hidden layer in the new head. The default
            (256) must be kept when loading checkpoints saved with it.
        dropout: dropout probability applied between the two head layers.

    Returns:
        The torchvision ConvNeXt-Tiny model whose last classifier layer has
        been replaced by Linear -> ReLU -> Dropout -> Linear. Every pretrained
        parameter has requires_grad=False; only the new head is trainable.
    """
    # Start from the pretrained weights (the `weights=` argument replaces the
    # older `pretrained=True` flag).
    model = models.convnext_tiny(weights=ConvNeXt_Tiny_Weights.DEFAULT)

    # Freeze the pretrained weights so backprop only updates the new head.
    for param in model.parameters():
        param.requires_grad = False

    # Read the input width from the layer being replaced instead of
    # hard-coding it, so the head always matches the backbone's feature size.
    in_features = model.classifier[-1].in_features

    # Swap the final linear layer for a small MLP head with dropout. The new
    # layers are created after the freeze, so they keep requires_grad=True.
    model.classifier[-1] = nn.Sequential(
        nn.Linear(in_features, hidden_dim),
        nn.ReLU(),
        nn.Dropout(dropout),
        nn.Linear(hidden_dim, num_classes),
    )

    return model

# The models

In [None]:
# Rebuild each architecture, restore its trained weights, then move it to
# DEVICE. The checkpoints are deserialized onto the CPU so they load on
# machines without a GPU as well.
#
# eval() is required for inference: the classifier head contains a Dropout
# layer, and in the default training mode it would randomly zero activations,
# making the stored predictions noisy and non-reproducible.
model_1 = CONV_NN(len(CLASSES_1))
model_1.load_state_dict(torch.load(MODEL1_PATH, map_location=torch.device('cpu')))
model_1 = model_1.to(DEVICE).eval()

model_2 = CONV_NN(len(CLASSES_2))
model_2.load_state_dict(torch.load(MODEL2_PATH, map_location=torch.device('cpu')))
model_2 = model_2.to(DEVICE).eval()

# Create the dataset

In [None]:
# Preprocessing applied to every image before it is handed to the models.
transform = transforms.Compose([
    # scale to a fixed 224x224 input
    transforms.Resize(size=(224, 224)),
    # centre crop to the same 224x224 window
    transforms.CenterCrop(size=(224, 224)),
    # PIL image -> float tensor
    transforms.ToTensor(),
    # per-channel normalisation
    transforms.Normalize(
        mean=(0.485, 0.456, 0.406),
        std=(0.229, 0.224, 0.225),
    ),
])

# One sub-folder per class; the integer label comes from the folder.
original_dataset = torchvision.datasets.ImageFolder(
    './data/Garbage Classification',
    transform=transform,
)

# Sanity check: number of images found.
print(len(original_dataset))

# Utils for the dataset creation

In [None]:
def map_classes(predictions, cls):
    """Pair every class name with its score from a single-sample prediction.

    Args:
        predictions: tensor whose first row holds one score per class
            (shape (1, num_classes), as produced in get_predictions).
        cls: dict mapping class name -> index of that class in the model
            output.

    Returns:
        dict mapping class name -> score as a Python float, in the iteration
        order of `cls`.
    """
    scores = predictions[0]
    # Look each score up through the index stored in `cls` rather than the
    # key's position in the dict, so the pairing stays correct even if the
    # mapping is not declared in index order.
    return {name: scores[index].item() for name, index in cls.items()}

def get_predictions(img):
    """Run one image through both classifiers and return per-class probabilities.

    Args:
        img: a single image tensor without a batch dimension; a batch
            dimension of size 1 is added here and the result is moved to
            DEVICE.

    Returns:
        (c1, c2): two dicts mapping class name -> softmax probability, for
        model_1 (keys of CLASSES_1) and model_2 (keys of CLASSES_2).
    """
    # Batch and transfer the image once and feed the same tensor to both models.
    batch = img.unsqueeze(0).to(DEVICE)

    softmax = nn.Softmax(dim=1)

    # Pure inference: skip autograd bookkeeping for the trainable head layers.
    with torch.no_grad():
        softmax1 = softmax(model_1(batch))
        softmax2 = softmax(model_2(batch))

    c1 = map_classes(softmax1, CLASSES_1)
    c2 = map_classes(softmax2, CLASSES_2)

    return c1, c2

def preds_to_data(c1, c2):
    """Flatten both models' class scores into one feature vector.

    Args:
        c1: dict class name -> score from model 1.
        c2: dict class name -> score from model 2.

    Returns:
        1-D tensor in which the score for class `name` of model `k` sits at
        position MASTER_INDEX[name + str(k)]; positions that receive no score
        stay 0.

    Raises:
        KeyError: if a class name has no matching entry in MASTER_INDEX.
    """
    # Size the vector from MASTER_INDEX so it cannot drift out of sync with
    # the index table.
    data = torch.zeros(max(MASTER_INDEX.values()) + 1)

    # The suffix selects the model-specific slot ("glass" -> "glass1" / "glass2").
    for suffix, scores in (("1", c1), ("2", c2)):
        for name, score in scores.items():
            data[MASTER_INDEX[name + suffix]] = score

    return data

def save_csv(pred_data, label_data):
    """Write predictions and labels to DATASET_SAVE_PATH as a two-column CSV.

    `pred_data` becomes the "prediction" column and `label_data` the "label"
    column; the DataFrame index is not written. Any existing file at that
    path is overwritten.
    """
    print("saving ...")
    frame = pd.DataFrame({"prediction": pred_data, "label": label_data})
    frame.to_csv(DATASET_SAVE_PATH, index=False)

# Predict the images and store the preds with the labels

In [None]:
# Number of images between intermediate CSV saves.
CHECKPOINT_EVERY = 100

pred_col = []   # one combined score vector (list of floats) per image
label_col = []  # matching ImageFolder label per image
total = len(original_dataset)

for idx, (image, label) in enumerate(original_dataset):
    # get_predictions batches the image and moves it to DEVICE itself.
    c1, c2 = get_predictions(image)
    data = preds_to_data(c1, c2)

    pred_col.append(data.tolist())
    label_col.append(label)

    # Checkpoint system: rewrite the CSV with everything collected so far.
    # The condition is also true for idx == 0, so a bad save path fails on
    # the first image instead of after a long run. Progress is reported here
    # rather than once per image to keep the cell output readable.
    if idx % CHECKPOINT_EVERY == 0:
        save_csv(pred_col, label_col)

        print("###########################################################")
        print(f"##     Effective: {idx+1}/{total}       ##")
        print("############################################################")

# Final save so the images after the last checkpoint are included.
save_csv(pred_col, label_col)
  