Let us start by importing the libraries:

In [None]:
import os
import random
import torch
import torchvision
from torch.utils.data import random_split
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import copy


Let us see the classes present in the dataset:

In [None]:
# Root folder of the image dataset: one sub-directory per class.
# Kaggle location of the original dataset, kept for reference:
# data_dir  = '/kaggle/input/garbage-classification/Garbage classification/Garbage classification'
data_dir  = 'fina_data'
# Keep only sub-directories and sort them by name. torchvision's ImageFolder
# builds its class list the same way, so the names printed here line up with
# the integer labels used later. A bare os.listdir() returns entries in
# arbitrary order and also includes stray files.
classes = sorted(entry.name for entry in os.scandir(data_dir) if entry.is_dir())
print(classes)

## Transformations:

Now, let's apply transformations to the dataset and import it for use.

Several data augmentations were tried; the one with the highest validation accuracy is kept.

In [None]:
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.nn.functional as F

# Data augmentation for the image pipeline (the Normalize step below is
# currently disabled, so pixel values stay in the range produced by ToTensor).
target_size = (224,224) # 256,256 for Resnet, 224,224 for mobilenet, 299,299 for inception

transformations = transforms.Compose([
    # Rotation: random rotation by up to 5 degrees in either direction
    transforms.RandomRotation(5),
    # Resize every image to the fixed input size chosen above
    transforms.Resize(target_size),
    # Mild random brightness / contrast perturbation
    transforms.ColorJitter(brightness=0.2, contrast=0.1),
    # Alternative augmentations that were tried and left disabled:
    #transforms.Resize((256,341)), 
    #transforms.RandomCrop(size = target_size),
    #transforms.RandomHorizontalFlip(),
    
    transforms.ToTensor(),
    # NOTE(review): the original comment called these "ImageNet prior" values,
    # but they differ from the usual ImageNet mean/std — presumably measured on
    # this dataset; confirm before re-enabling.
    #transforms.Normalize([0.6610, 0.6283, 0.5894], [0.2085, 0.2085, 0.2302]) # ImageNet prior
  ])

# One ImageFolder carries the pipeline above; it is later split into
# train/val/test, so the random augmentations apply to all three subsets.
dataset = ImageFolder(data_dir, transform = transformations)

Let's create a helper function to see the image and its corresponding label:

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

def show_sample(img, label):
    """Print the class name and index for `label`, then draw `img`.

    `img` is permuted with (1, 2, 0) before being handed to `plt.imshow`,
    i.e. its first dimension is moved to the last position.
    """
    class_name = dataset.classes[label]
    class_suffix = "(Class No: " + str(label) + ")"
    print("Label:", class_name, class_suffix)
    channels_last = img.permute(1, 2, 0)
    plt.imshow(channels_last)

In [None]:
# Original image example: open the same folder without any transform so the
# sample is returned untransformed, together with its integer label.
dataset0 = ImageFolder(data_dir, transform = None)
img, label = dataset0[203]
# Last expression of the cell, so the notebook displays the image.
img

In [None]:
# Transformed image: the same sample index, this time read from the dataset
# that applies `transformations`.
img, label = dataset[203]
show_sample(img, label)

# Loading and Splitting Data:

In [None]:
# Seed the torch, Python and NumPy random number generators.
random_seed = 3047
torch.manual_seed(random_seed)
random.seed(random_seed)
# NOTE(review): NumPy is seeded with 0 rather than `random_seed` — confirm this is intentional.
np.random.seed(0)

We'll split the dataset into training, validation and test sets:

In [None]:
# Total number of samples; `ld` is reused below to size the train/val/test split.
print(len(dataset))
ld=len(dataset)

In [None]:
# 80% of the samples for training, 15% for validation; the test set takes
# whatever is left so that the three sizes always add up to `ld`.
n_train = int(ld*0.8)
n_val = int(ld*0.15)
n_test = ld - n_train - n_val
train_ds, val_ds, test_ds = random_split(dataset, [n_train, n_val, n_test])

# Last expression of the cell: show the resulting subset sizes.
len(train_ds), len(val_ds), len(test_ds)

In [None]:
from torch.utils.data.dataloader import DataLoader
# Training batch size; the validation and test loaders below use twice this value.
batch_size = 8

Now, we'll create training and validation dataloaders using `DataLoader`.

In [None]:
# Only the training loader shuffles; all three use two worker processes and
# pinned host memory.
train_dl = DataLoader(train_ds, batch_size, shuffle = True, num_workers = 2, pin_memory = True)
# Validation and test run with a doubled batch size.
val_dl = DataLoader(val_ds, batch_size*2, num_workers = 2, pin_memory = True)
test_dl = DataLoader(test_ds, batch_size*2, num_workers = 2, pin_memory = True) 

This is a helper function to visualize batches:

In [None]:
from torchvision.utils import make_grid

def show_batch(dl):
    """Draw the first batch produced by `dl` as one image grid.

    Only the first batch is consumed. The images are tiled with
    `make_grid` (16 per row) and shown on a 12x6 figure without axis ticks.
    Nothing is drawn when `dl` yields no batch.
    """
    first_batch = next(iter(dl), None)
    if first_batch is None:
        # Empty loader: nothing to show.
        return
    images, _labels = first_batch
    fig, ax = plt.subplots(figsize=(12, 6))
    for clear_ticks in (ax.set_xticks, ax.set_yticks):
        clear_ticks([])
    grid = make_grid(images, nrow = 16)
    ax.imshow(grid.permute(1, 2, 0))

In [None]:
# show_batch(train_dl)

# Model Base:

Let's create the model base:

In [None]:
def accuracy(outputs, labels):
    """Return the fraction of rows whose highest-scoring column equals the label.

    The prediction for each row of `outputs` is the index of its maximum
    along dim 1. The result is a 0-dim tensor holding correct / total.
    """
    predicted = torch.max(outputs, dim=1).indices
    n_correct = (predicted == labels).sum().item()
    return torch.tensor(n_correct / len(predicted))

class ImageClassificationBase(nn.Module):
    """Base class adding training/validation helpers to a classifier module.

    Subclasses provide `forward`; the methods here compute the cross-entropy
    loss, per-batch validation metrics, their per-epoch means, and print a
    one-line epoch summary.
    """

    def training_step(self, batch):
        """Return the cross-entropy loss of the model on one (images, labels) batch."""
        inputs, targets = batch
        return F.cross_entropy(self(inputs), targets)

    def validation_step(self, batch):
        """Return the detached loss and the accuracy for one (images, labels) batch."""
        inputs, targets = batch
        scores = self(inputs)
        return {
            'val_loss': F.cross_entropy(scores, targets).detach(),
            'val_acc': accuracy(scores, targets),
        }

    def validation_epoch_end(self, outputs):
        """Average the per-batch results from `validation_step` into plain floats."""
        def _epoch_mean(key):
            # Unweighted mean over batches of the 0-dim tensors stored under `key`.
            return torch.stack([step[key] for step in outputs]).mean().item()

        return {'val_loss': _epoch_mean('val_loss'), 'val_acc': _epoch_mean('val_acc')}

    def epoch_end(self, epoch, result):
        """Print the 1-based epoch number with train loss, val loss and val accuracy."""
        template = "Epoch {}: train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}"
        print(template.format(
            epoch+1, result['train_loss'], result['val_loss'], result['val_acc']))

We'll compare the performance of three models: ResNet50, MobileNet V2, Inception V3

In [None]:

class ResNet(ImageClassificationBase):
    """ResNet-18 feature extractor with a convolutional classification head.

    The backbone's last two children (average pooling and `fc`) are dropped
    and replaced by a 3x3 convolution with one output channel per class,
    followed by global average pooling.

    Args:
        num_classes: Number of output classes. Defaults to
            `len(dataset.classes)` of the notebook-level dataset.
        pretrained: Forwarded to `models.resnet18`; True loads pretrained
            weights.
    """

    def __init__(self, num_classes=None, pretrained=True):
        super().__init__()

        if num_classes is None:
            # Backward-compatible default: size the head from the loaded dataset.
            num_classes = len(dataset.classes)

        base_model = models.resnet18(pretrained=pretrained)

        # Channel count fed into the original fc layer; reused for the new head.
        num_ftrs = base_model.fc.in_features

        # Everything except the final avgpool and fc layers.
        self.features = nn.Sequential(*list(base_model.children())[:-2])

        # Convolutional replacement for the fc layer.
        self.fc = nn.Conv2d(num_ftrs, num_classes, kernel_size=3)

    def forward(self, xb):
        """Return raw class scores of shape (xb.size(0), num_classes)."""
        x = self.features(xb)
        x = self.fc(x)
        # Pool the remaining spatial grid to 1x1 so the output size is fixed.
        x = F.adaptive_avg_pool2d(x, (1, 1))
        # Return unnormalised logits: the loss used by the base class
        # (F.cross_entropy) applies log-softmax itself, so squashing the scores
        # through a sigmoid first would confine them to [0, 1] and cap how
        # confident the model can become.
        return x.view(xb.size(0), -1)




Choose one of the three models. ResNet gives the highest accuracy.

In [None]:
# `eiffnetv2` is not defined in this notebook — presumably a project-local
# module providing EfficientNetV2 variants; verify it is on the import path.
import eiffnetv2
# NOTE(review): the class count is hard-coded to 7 — confirm it matches
# len(dataset.classes).
model = eiffnetv2.efficientnetv2_s(num_classes=7)
print(model)
 

## Porting to GPU:

GPUs tend to perform these calculations faster than CPUs. Let's take advantage of this and use the GPU for computation when one is available:

In [None]:
def get_default_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
    
def to_device(data, device):
    """Move `data` to `device`, recursing into lists and tuples.

    Lists and tuples are rebuilt as lists with every element moved; anything
    else is moved with a non-blocking `.to(device)` call.
    """
    if not isinstance(data, (list,tuple)):
        return data.to(device, non_blocking=True)
    moved = []
    for item in data:
        moved.append(to_device(item, device))
    return moved

class DeviceDataLoader:
    """Thin wrapper that moves every batch of a dataloader to a device."""

    def __init__(self, dl, device):
        self.dl, self.device = dl, device

    def __iter__(self):
        """Iterate over the wrapped loader, moving each batch with `to_device`."""
        yield from (to_device(batch, self.device) for batch in self.dl)

    def __len__(self):
        """Return the number of batches in the wrapped loader."""
        return len(self.dl)
    

In [None]:
# Resolve the compute device once; the bare name shows it as the cell output.
device = get_default_device()
device

In [None]:
# Rebind each loader to a wrapper that moves its batches to `device` as they
# are iterated.
train_dl = DeviceDataLoader(train_dl, device)
val_dl = DeviceDataLoader(val_dl, device)
test_dl = DeviceDataLoader(test_dl, device)
# Move the model to the same device; the return value is not captured here.
to_device(model, device)

# Training the Model:

This is the function for fitting the model.

In [None]:
def evaluate(model, val_loader):
    """Run `model` over `val_loader` in eval mode and return the epoch summary.

    Gradient tracking is disabled for the whole pass. Every batch goes through
    `model.validation_step`, and the collected results are reduced by
    `model.validation_epoch_end`.
    """
    with torch.no_grad():
        model.eval()
        step_results = []
        for batch in val_loader:
            step_results.append(model.validation_step(batch))
        return model.validation_epoch_end(step_results)

def fit(epochs, lr, model, train_loader, val_loader, optimizer,
        scheduler=None, patience=15):
    """Train `model` with early stopping and return its history and best copy.

    Args:
        epochs: Maximum number of epochs to run.
        lr: Unused; kept so existing calls keep working (the learning rate is
            taken from `optimizer`).
        model: Module exposing `training_step` and `epoch_end`, plus whatever
            `evaluate` requires.
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches, passed to `evaluate`.
        optimizer: Optimizer already bound to the model's parameters.
        scheduler: Optional scheduler stepped once per epoch with the
            validation loss. When omitted, a module-level `scheduler` is used
            if one exists (the behaviour of earlier revisions); otherwise the
            learning rate is left untouched.
        patience: Training stops once the validation loss has failed to
            improve for more than this many consecutive epochs.

    Returns:
        `(history, best_model)`: `history` is the list of per-epoch result
        dicts and `best_model` is a deep copy of the model taken at the lowest
        validation loss (`model` itself if no epoch ever improved on the
        initial bound, e.g. when `epochs` is 0).
    """
    if scheduler is None:
        # Backward compatibility: callers used to rely on a global `scheduler`.
        scheduler = globals().get('scheduler')

    history = []
    # Start from +inf so the first finite validation loss always counts as an
    # improvement (a fixed bound would leave `best_model` undefined whenever
    # the loss starts above it).
    best_val_loss = float('inf')
    best_model = model
    # Consecutive epochs without improvement; initialised up front so it is
    # defined even when the very first epoch does not improve.
    trigger = 0

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_losses = []
        for batch in train_loader:
            loss = model.training_step(batch)
            # Keep only the value for the epoch average, not the autograd history.
            train_losses.append(loss.detach())
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        # Validation phase
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        model.epoch_end(epoch, result)
        history.append(result)

        # Reduce the learning rate when the validation loss plateaus
        if scheduler is not None:
            scheduler.step(result['val_loss'])

        # Early stopping
        if result['val_loss'] >= best_val_loss:
            trigger += 1
            print('Trigger time', trigger)
            if trigger > patience:
                break
        else:
            best_val_loss = result['val_loss']
            best_model = copy.deepcopy(model)
            trigger = 0

    return history, best_model

In [None]:
# Rebind `model` to the result of moving it to `device`.
model = to_device(model, device)

In [None]:
def count_parameters(model):
    """Return the total element count of the parameters that require gradients."""
    trainable_total = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable_total += param.numel()
    return trainable_total
# Show the trainable-parameter count of the selected model as the cell output.
count_parameters(model)

Let's start training and fine-tuning the model:

In [None]:
# Upper bound on the number of epochs; fit() may return earlier through its
# early-stopping check.
num_epochs = 90
lr = 5.5e-5
# Previously tried optimizer, kept for reference:
# optimizer = torch.optim.SGD(model.parameters(), 
#                              lr = lr)
optimizer = torch.optim.Adam(model.parameters(), 
                             #weight_decay = 1,
                             lr = lr)
# fit() reads this object through the notebook-level name `scheduler` and
# steps it once per epoch with the validation loss.
scheduler = ReduceLROnPlateau(
    optimizer,
    factor = 0.9,
    patience=3,
    cooldown=0,
    min_lr=0,
    verbose=True
)
# `history` holds one result dict per epoch; `best_model` is the copy taken at
# the lowest validation loss.
history,best_model = fit(num_epochs, lr, model, train_dl, val_dl, optimizer)

In [None]:
# Validation loss/accuracy of the best checkpoint returned by fit().
evaluate(best_model, val_dl)

In [None]:
# Same metrics for the best checkpoint on the held-out test split.
evaluate(best_model, test_dl)

In [None]:
def plot_accuracies(history):
    """Plot the validation accuracy stored in each entry of `history`."""
    val_accs = []
    for epoch_result in history:
        val_accs.append(epoch_result['val_acc'])
    plt.plot(val_accs, '-x')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.title('Accuracy vs. No. of epochs')

plot_accuracies(history)

In [None]:
def plot_losses(history):
    """Plot training (blue) and validation (red) loss for every entry of `history`.

    A missing 'train_loss' entry is plotted as None; 'val_loss' is required.
    """
    curves = (
        ([epoch_result.get('train_loss') for epoch_result in history], '-bx'),
        ([epoch_result['val_loss'] for epoch_result in history], '-rx'),
    )
    for values, line_style in curves:
        plt.plot(values, line_style)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.title('Loss vs. No. of epochs')

plot_losses(history)

**ROC, Sensitivity and Specificity**

Plot multi class ROC for test data

In [None]:
# Collect the model scores and the true labels for the whole test set.
# Batches are gathered in lists and concatenated once, and they stay on
# whatever device the loader/model already use, so nothing here requires CUDA
# or a fixed class count.
batch_preds = []
batch_labels = []
best_model.eval()
with torch.no_grad():
    for images, label in test_dl:
        batch_preds.append(best_model(images))
        # Labels are kept as floats, matching what the later cells consume.
        batch_labels.append(label.float())
preds = torch.cat(batch_preds, dim = 0)
labels = torch.cat(batch_labels)

# One-hot encode the test labels: row i gets a 1 in the column of its class.
y_true = np.zeros(preds.shape)
y_true[np.arange(len(labels)), labels.cpu().numpy().astype(int)] = 1

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from itertools import cycle

from sklearn import svm, datasets
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize
from sklearn.multiclass import OneVsRestClassifier
from sklearn.metrics import roc_auc_score
# Per-class ROC curves (one-vs-rest) and their areas, keyed by class index.
fpr, tpr, roc_auc = {}, {}, {}
n_classes = preds.shape[1]
out = preds.cpu().detach().numpy()
for i in range(n_classes):
    class_truth = y_true[:, i]
    class_score = out[:, i]
    fpr[i], tpr[i], _ = roc_curve(class_truth, class_score)
    roc_auc[i] = auc(fpr[i], tpr[i])

# Micro-average: pool every (sample, class) pair into one binary problem.
flat_truth = y_true.ravel()
flat_score = out.ravel()
fpr["micro"], tpr["micro"], _ = roc_curve(flat_truth, flat_score)
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

In [None]:
# Macro-average ROC: evaluate every class curve on a common grid of false
# positive rates, then average the curves point by point.
class_ids = range(n_classes)

# Common grid: the sorted union of all per-class false positive rates.
all_fpr = np.unique(np.concatenate([fpr[i] for i in class_ids]))

# Interpolate each class curve on that grid and sum them in class order.
mean_tpr = np.zeros_like(all_fpr)
for i in class_ids:
    mean_tpr = mean_tpr + np.interp(all_fpr, fpr[i], tpr[i])

# Average over the classes and compute the area under the averaged curve.
mean_tpr = mean_tpr / n_classes

fpr["macro"], tpr["macro"] = all_fpr, mean_tpr
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

# Plot all ROC curves: the two averaged curves as thick dotted lines, then one
# solid line per class, plus the chance diagonal.
plt.figure()
plt.plot(
    fpr["micro"],
    tpr["micro"],
    label="micro-average ROC curve (area = {0:0.2f})".format(roc_auc["micro"]),
    color="deeppink",
    linestyle=":",
    linewidth=4,
)

plt.plot(
    fpr["macro"],
    tpr["macro"],
    label="macro-average ROC curve (area = {0:0.2f})".format(roc_auc["macro"]),
    color="navy",
    linestyle=":",
    linewidth=4,
)

# Ten distinct colours so that up to ten classes each get their own; the
# previous three-colour cycle made several class curves share a colour.
# The cycle still wraps around if there are more classes than colours.
colors = cycle([
    "aqua",
    "darkorange",
    "cornflowerblue",
    "forestgreen",
    "crimson",
    "purple",
    "saddlebrown",
    "olive",
    "gray",
    "gold",
])
for i, color in zip(range(n_classes), colors):
    plt.plot(
        fpr[i],
        tpr[i],
        color=color,
        lw=2,
        label="ROC curve of class {0} (area = {1:0.2f})".format(dataset.classes[i], roc_auc[i]),
    )

# Chance-level reference line.
plt.plot([0, 1], [0, 1], "k--", lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC of multiclass prediction")
plt.legend(loc="lower right")
plt.show()

Calculate sensitivity and specificity for each class

In [None]:
# Predicted class per sample: the column index of the highest score in each row of `preds`.
_, y_pred = torch.max(preds, dim=1)

In [None]:
from sklearn.metrics import classification_report
# Class names in label-index order.
# NOTE(review): the count 7 is hard-coded — confirm it equals len(dataset.classes).
target_names = [dataset.classes[i] for i in range(7)]
# Per-class report computed from the true labels and the predictions
# collected above; both tensors are moved to the CPU first.
print(classification_report(labels.cpu(), y_pred.cpu(), target_names = target_names))
print('Note: In binary classification, recall of the positive class is also known as “sensitivity”; \n\
recall of the negative class is “specificity”.')

# Visualizing Predictions:

In [None]:
def predict_image(img, model):
    """Return the class name `model` predicts for a single image tensor.

    Args:
        img: Image tensor without a batch dimension; a leading batch
            dimension of size 1 is added before the forward pass.
        model: Callable mapping a batch on `device` to per-class scores.
            Its train/eval mode is left as the caller set it.

    Returns:
        The entry of `dataset.classes` at the index of the highest score.
    """
    # Convert to a batch of 1 on the notebook's device
    xb = to_device(img.unsqueeze(0), device)
    # Inference only: skip gradient tracking to save memory and time
    with torch.no_grad():
        yb = model(xb)
    # Pick the index with the highest score
    _, preds = torch.max(yb, dim=1)
    # Retrieve the class label
    return dataset.classes[preds[0].item()]

Let us see the model's predictions on the test dataset:

In [None]:
# Show test sample 7 next to its true and predicted class names.
# NOTE(review): the prediction uses `model` (the network as left after the
# last training epoch) rather than `best_model` returned by fit() — confirm
# which one is intended.
img, label = test_ds[7]
plt.imshow(img.permute(1, 2, 0))
print('Label:', dataset.classes[label], ', Predicted:', predict_image(img, model))

In [None]:
# Show test sample 3 next to its true and predicted class names.
# NOTE(review): uses `model`, not `best_model` — confirm which one is intended.
img, label = test_ds[3]
plt.imshow(img.permute(1, 2, 0))
print('Label:', dataset.classes[label], ', Predicted:', predict_image(img, model))

In [None]:
# Show test sample 1 next to its true and predicted class names.
# NOTE(review): uses `model`, not `best_model` — confirm which one is intended.
img, label = test_ds[1]
plt.imshow(img.permute(1, 2, 0))
print('Label:', dataset.classes[label], ', Predicted:', predict_image(img, model))

# Conclusion:

Our model is able to classify garbage with **95% accuracy**!

It's great to see the model's predictions on the test set. It works pretty well on external images too!

You can try experimenting with more images and see the results!

### If you liked the kernel, don't forget to show some appreciation :)

In [None]:
## save model 
import torch

# Older saving variant, kept for reference:
# state_dict = model.state_dict()
# torch.save(state_dict, checkpoint_new_file, _use_new_zipfile_serialization=False)

# Save only the weights (state dict) of `model`.
# NOTE(review): this saves `model`, not `best_model`, and the file name says
# "mobilenet" although `model` was created as an EfficientNetV2-S earlier in
# the notebook — confirm both are intended.
torch.save(model.state_dict(), 'mobilenet_fcn.pt')

In [None]:
import torch
import torchvision
## load model and inference
# NOTE(review): `MobileNet` is not defined in the visible part of this
# notebook, and the checkpoint loaded below was written from whatever `model`
# was at save time — confirm the architectures match.
model =MobileNet()
model_path  = 'mobilenet_fcn.pt' 
model.load_state_dict(torch.load(model_path))
# Requires a CUDA device (no CPU fallback here).
model=model.cuda()
# Switch to evaluation mode for inference.
model.eval()

def img_preprocess(img):
    """Turn an input image into the tensor layout the model was trained on.

    Applies only the deterministic steps of the training pipeline (resize to
    `target_size`, then `ToTensor`). The random rotation and colour jitter in
    `transformations` are training-time augmentations; applying them here
    would make predictions for the same file vary from call to call.

    Args:
        img: Image to preprocess (a PIL image as returned by `Image.open`).

    Returns:
        The image tensor without a batch dimension; `predict_image` adds it.
    """
    if hasattr(img, 'convert'):
        # Files on disk may be grayscale, palette or RGBA; force three
        # channels so the tensor matches what the model receives in training.
        img = img.convert('RGB')
    inference_transform = transforms.Compose([
        transforms.Resize(target_size),
        transforms.ToTensor(),
    ])
    return inference_transform(img)

from PIL import Image
# Single-image inference example.
# NOTE(review): the path uses Windows-style backslashes — it will not resolve
# on other platforms as written.
img = Image.open(r'data\blue_tan\img2.jpg')
img = img_preprocess(img)
# Last expression of the cell: the predicted class name.
predict_image(img, model)


In [None]:
import torch

  # Instantiate your PyTorch model (disabled: the `model` loaded above is reused)
# model.load_state_dict(torch.load('new_resnet18.pt'))
# model.eval()
# Shape of the dummy input passed to the exporter.
size = (1,3,224,224)
dummy_input = torch.randn(*size).cuda()  # replace with the appropriate input shape
# Export next to the checkpoint: `model_path` with its last three characters
# (".pt") replaced by ".onnx".
torch.onnx.export(model, dummy_input, model_path[:-3]+".onnx", verbose=True,opset_version=12)


In [None]:
def show_outputs(output):
    """Print the highest scores of a 1-D score array, at most five.

    Each printed line is "<index>: <value>", where <index> is the array of
    positions holding that value. Scores that are not positive are printed
    as "-1: 0.0".

    Args:
        output: 1-D NumPy array of scores. It may hold fewer than five
            entries, in which case only the available ones are listed.
    """
    output_sorted = sorted(output, reverse=True)
    lines = ['\n-----TOP 5-----\n']
    # Clamp to the number of available scores; indexing a fixed five entries
    # raised IndexError for models with fewer than five classes.
    for i in range(min(5, len(output_sorted))):
        value = output_sorted[i]
        # np.where returns one index array per dimension of `output`.
        index = np.where(output == value)
        for j in range(len(index)):
            if (i + j) >= 5:
                break
            if value > 0:
                lines.append('{}: {}\n'.format(index[j], value))
            else:
                lines.append('-1: 0.0\n')
    print(''.join(lines))