# Libs

In [1]:
import numpy as np
import pandas as pd
from PIL import Image
from os.path import join, splitext
import time
from sklearn.metrics import accuracy_score  # computes subset accuracy: the set of labels predicted for a sample must exactly match the corresponding set of labels in y_true. https://scikit-learn.org/stable/modules/generated/sklearn.metrics.accuracy_score.html
import random

from torchvision import transforms
import torch
from torch.utils import data # necessary to create a map-style dataset https://pytorch.org/docs/stable/data.html
from torch import nn    # basic building-blocks for graphs https://pytorch.org/docs/stable/nn.html
from torch.utils.data import DataLoader
from torch.optim import SGD
from torch.utils.tensorboard import SummaryWriter

# *** torchvision pretrained models https://pytorch.org/vision/stable/models.html ***
from torchvision.models import squeezenet1_0
from torchvision.models import alexnet
from torchvision.models import vgg16

In [2]:
from google.colab import drive
import zipfile

# Mount Google Drive inside the Colab runtime at /content/gdrive.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [3]:
# Seed every random number generator the notebook relies on so runs are repeatable.
random.seed(1996)
np.random.seed(1996)
# PyTorch keeps its own generator (used e.g. for DataLoader shuffling and layer
# initialisation); seeding only `random` and NumPy leaves it unseeded.
torch.manual_seed(1996)

In [4]:
# Google Drive folders used by the notebook, each derived from the project root.
GDRIVE_PATHS = {
    name: join('/content/gdrive/MyDrive/trashbean-classifier/', suffix)
    for name, suffix in (
        ('main', ''),
        ('dataset', 'dataset/'),
        ('logs', 'logs/'),
        ('models', 'logs/models/'),
    )
}

# Sanity check: joining a file name onto a folder entry gives the expected path.
print(join(GDRIVE_PATHS['logs'], 'test_join'))

/content/gdrive/MyDrive/trashbean-classifier/logs/test_join


In [5]:
# Debug check: show the type of the path container and of one of its entries.
print(type(GDRIVE_PATHS), type(GDRIVE_PATHS['logs']))

<class 'dict'> <class 'str'>


# Dataset and average value meters

### TrashbeanDataset

In [5]:
class TrashbeanDataset(data.Dataset): # data.Dataset https://pytorch.org/docs/stable/_modules/torch/utils/data/dataset.html#Dataset
    """ A map-style dataset of trashbean images described by a csv file.

        Each csv row holds the path of an image ('path' column) and the label
        ('label' column) describing the available capacity of the trashbean:
            0 : empty trashbean
            1 : half trashbean
            2 : full trashbean

        Attributes
        ----------
        data : pandas.DataFrame
            rows read from the csv file, shuffled once at construction time
        transform : callable or None
            transform applied to every image (e.g. a torchvision.transforms pipeline)

        Methods
        -------
        __len__()
            Return the length of the dataset

        __getitem__(i)
            Return image, label of i element of dataset
    """

    def __init__(self, csv=None, transform=None):
        """ Constructor of the dataset

            Parameters
            ----------
            csv : str
                path of the csv file describing the dataset

            transform : callable, optional
                transform applied to each image when it is fetched

            Raises
            ------
            NotImplementedError
                If no path is given (there is no default dataset) or the
                file extension is not .csv
        """

        if csv is None:
            raise NotImplementedError("No default dataset is provided")
        # Compare the extension case-insensitively so that e.g. 'DATA.CSV' is accepted too.
        if splitext(csv)[1].lower() != '.csv':
            raise NotImplementedError("Only .csv files are supported")

        frame = pd.read_csv(csv)        # import from csv using pandas
        # Shuffle the rows once; the order depends on NumPy's global random state.
        self.data = frame.iloc[np.random.permutation(len(frame))]
        self.transform = transform

    def __len__(self):
        """ Return length of dataset """
        return len(self.data)

    def __getitem__(self, i=None):
        """ Return the i-th item of dataset

            Parameters
            ----------
            i : int
                position of the item in the (shuffled) dataset

            Returns
            -------
            tuple
                (image, label); the image is the output of `transform` when one
                was given, otherwise an RGB PIL image

            Raises
            ------
            NotImplementedError
                If i is None
        """
        if i is None:
            raise NotImplementedError("Only int type is supported for get the item. None is not allowed")

        row = self.data.iloc[i]         # fetch the row once instead of once per column
        # Image.open is lazy: convert() forces the pixel data to be read and returns an
        # independent image, so the file can be closed right away. Converting to RGB
        # guarantees three channels even for grayscale / RGBA / palette files.
        with Image.open(row['path']) as source:     # https://pillow.readthedocs.io/en/stable/reference/Image.html
            im = source.convert('RGB')
        if self.transform is not None:
            im = self.transform(im)
        return im, row['label']

### TDContainer

In [6]:
class TDContainer:
    """ Class that contains the dataset for training, validation and test

        Attributes
        ----------
        training, validation, test : TrashbeanDataset
            one dataset per split
        training_loader, validation_loader, test_loader : DataLoader or None
            loaders of the corresponding datasets; None until create_data_loader() is called
        hasDl : bool
            True once the loaders have been created
    """

    def __init__(self, training=None, validation=None, test=None):
        """ Constructor of the class. Instantiate a TrashbeanDataset for each split

            Parameters
            ----------
            training: dict, required
                {'path': <csv path of the training split>, 'transform': <optional transform>}

            validation: dict, required
                same structure, for the validation split

            test: dict, required
                same structure, for the test split

            Raises
            ------
            NotImplementedError
                If a split is missing, is not a dict, or its 'path' entry is
                missing or not a str
        """

        splits = (training, validation, test)

        if any(split is None for split in splits):
            raise NotImplementedError("No default dataset is provided")

        if not all(isinstance(split, dict) for split in splits):
            raise NotImplementedError("Constructor accept only dict file.")

        # .get() lets a missing 'path' key fail with the same validation error as a
        # None / non-str path instead of an unrelated KeyError.
        if not all(isinstance(split.get('path'), str) for split in splits):
            raise NotImplementedError("Path file is required and need to be a str type.")

        # 'transform' is optional: a missing key means "no transform".
        self.training = TrashbeanDataset(training['path'], transform=training.get('transform'))
        self.validation = TrashbeanDataset(validation['path'], transform=validation.get('transform'))
        self.test = TrashbeanDataset(test['path'], transform=test.get('transform'))

        # Loaders are created on demand by create_data_loader().
        self.training_loader = None
        self.validation_loader = None
        self.test_loader = None
        self.hasDl = False

    def create_data_loader(self, _batch_size=32, _num_workers=2, _drop_last=False):
        """ Create data loader for each dataset

            https://pytorch.org/docs/stable/data.html

            Parameters
            ----------

            _batch_size: int
                number of samples per batch, default 32

            _num_workers: int
                number of worker processes used by each loader, default 2

            _drop_last: bool
                forwarded to DataLoader(drop_last=...), default False

            Raises
            ------
            NotImplementedError
                If _batch_size or _num_workers is not an int
        """

        if not (isinstance(_batch_size, int) and isinstance(_num_workers, int)):
            raise NotImplementedError("Parameters accept only int value.")

        shared = {'batch_size': _batch_size, 'num_workers': _num_workers, 'drop_last': _drop_last}

        # Only the training split is reshuffled at every epoch.
        self.training_loader = DataLoader(self.training, shuffle=True, **shared)
        self.validation_loader = DataLoader(self.validation, **shared)
        self.test_loader = DataLoader(self.test, **shared)
        self.hasDl = True

    def show_info(self):
        """ Print info of dataset """
        print("\n=== *** DB INFO *** ===")
        print("Training:", len(self.training), "values, \nValidation:", len(self.validation), "values, \nTest:", len(self.test))
        print("DataLoader:", self.hasDl)
        print("=== *** END *** ====\n")

### Avg value meter

In [7]:
class AverageValueMeter():
    """Running weighted average of a series of values (e.g. per-batch loss or accuracy)."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget everything accumulated so far."""
        self.sum = 0
        self.num = 0

    def add(self, value, num):
        """Accumulate `value`, weighted by `num` (e.g. the number of samples it was averaged over)."""
        self.sum += value*num
        self.num += num

    def value(self):
        """Return the weighted average, or None when nothing has been added yet."""
        try:
            return self.sum/self.num
        except ZeroDivisionError:
            # Empty meter: there is no meaningful average. Any other error is a real
            # bug and is deliberately left to propagate instead of being hidden.
            return None

# Pretrained class

## Creator

In [65]:
from __future__ import annotations
from abc import ABC, abstractmethod # https://docs.python.org/3/library/abc.html

# Creator
class PretrainedModelsCreator(ABC):
    """The Creator class declares the factory method that is supposed to return an
    object of a Product class. The Creator's subclasses usually provide the
    implementation of this method.

    On top of the factory method, this class holds the logic shared by all the
    models: data loader setup, the training/validation loop, evaluation on a
    data loader and checkpoint loading.
    """

    @abstractmethod
    def factory_method(self):
        """Return the product whose ``get_model(output_class)`` builds the network.

        No default implementation needed.
        """
        pass

    def initialize_dst(self, dataset, output_class: int = 2, dl_attributes: "dict | None" = None) -> None:
        """Build the model through the factory method and attach the dataset to this creator.

        Parameters
        ----------
        dataset
            container exposing ``create_data_loader(_batch_size, _num_workers, _drop_last)``;
            it is stored as ``self.dst`` and its loaders are created here
        output_class : int
            number of output classes passed to the product's ``get_model``
        dl_attributes : dict, optional
            any of 'batch_size', 'num_workers', 'drop_last'; missing keys fall back
            to 32, 2 and False respectively
        """

        # Built per call (no shared mutable default) and merged so that a partial
        # dict only overrides the keys it provides.
        settings = {'batch_size': 32, 'num_workers': 2, 'drop_last': False}
        settings.update(dl_attributes or {})

        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # call factory method to create a Product object
        product = self.factory_method()
        # get the model from product
        self.model = product.get_model(output_class)
        # set the dataset inside the object
        self.dst = dataset
        # instantiate DataLoader too
        self.dst.create_data_loader(_batch_size=settings['batch_size'],
                                    _num_workers=settings['num_workers'],
                                    _drop_last=settings['drop_last'])

    def trainval_classifier(self, exp_name='experiment', lr=0.01, epochs=10, momentum=0.99,
                            log_dir='logs',
                            models_dir='models',
                            train_from_epoch=0, save_on_runtime=False, save_each_iter=20):
        """Train ``self.model`` with SGD and cross-entropy, validating after every epoch.

        Parameters
        ----------
        exp_name : str
            experiment name; used for the TensorBoard run folder and checkpoint file names
        lr, momentum : float
            SGD hyper-parameters
        epochs : int
            number of epochs to run
        log_dir : str
            folder in which the TensorBoard run ``exp_name`` is written
        models_dir : str
            folder receiving the periodic checkpoints
        train_from_epoch : int
            offset added to the epoch number in checkpoint file names
        save_on_runtime : bool
            when true, also save a checkpoint in the working directory after every epoch
        save_each_iter : int
            save a checkpoint in ``models_dir`` every this many epochs (a checkpoint is
            also saved every 50th epoch); 0 disables the periodic rule

        Returns
        -------
        the trained model (the same object as ``self.model``)
        """
        import os  # local import: only needed to create the checkpoint folder

        model = self.model
        timer_start = time.time()

        criterion = nn.CrossEntropyLoss() # used for classification https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html

        optimizer = SGD(model.parameters(), lr, momentum=momentum)

        # meters
        loss_meter = AverageValueMeter()
        acc_meter = AverageValueMeter()

        # writer
        writer = SummaryWriter(join(log_dir, exp_name))

        model.to(self.device)
        # loaders used by the two phases of every epoch
        loader = {
            'train': self.dst.training_loader,
            'validation': self.dst.validation_loader
        }
        global_step = 0
        print("Computing epoch:")
        try:
            for e in range(epochs):
                print(e+1, "/", epochs, "... ")
                # each epoch runs a training phase followed by a validation phase
                for mode in ['train', 'validation']:
                    loss_meter.reset()
                    acc_meter.reset()
                    if mode == 'train':
                        model.train()
                    else:
                        model.eval()
                    with torch.set_grad_enabled(mode == 'train'): # gradients are only needed while training
                        for batch in loader[mode]:
                            x = batch[0].to(self.device) # move the batch to the right device
                            y = batch[1].to(self.device)
                            output = model(x)

                            # global_step counts the samples processed so far; it is the
                            # x axis of the logged curves (both phases advance it)
                            n = x.shape[0]  # number of elements in the batch
                            global_step += n
                            l = criterion(output, y)

                            if mode == 'train':
                                l.backward()
                                optimizer.step()
                                optimizer.zero_grad()

                            acc = accuracy_score(y.to('cpu'), output.to('cpu').max(1)[1])
                            loss_meter.add(l.item(), n)
                            acc_meter.add(acc, n)

                            # per-iteration logging only while training
                            if mode == 'train':
                                writer.add_scalar('loss/train', loss_meter.value(), global_step=global_step)
                                writer.add_scalar('accuracy/train', acc_meter.value(), global_step=global_step)

                        # at the end of the phase log the final estimates for this epoch
                        writer.add_scalar('loss/' + mode, loss_meter.value(), global_step=global_step)
                        writer.add_scalar('accuracy/' + mode, acc_meter.value(), global_step=global_step)

                # keep the weights of the model at the end of a training + validation cycle..
                checkpoint_name = '%s-%d.pth' % (exp_name, (e+1) + train_from_epoch)

                # ...in the working directory of the runtime
                if save_on_runtime:
                    torch.save(model.state_dict(), checkpoint_name)

                # ...and only every save_each_iter epochs in models_dir, to limit the space used there
                periodic = bool(save_each_iter) and (e+1) % save_each_iter == 0
                if periodic or (e+1) % 50 == 0:
                    if models_dir:
                        os.makedirs(models_dir, exist_ok=True)
                    # join() works whether or not models_dir ends with a separator
                    torch.save(model.state_dict(), join(models_dir, checkpoint_name))
        finally:
            # flush pending events and release the log file even if training fails
            writer.close()

        timer_end = time.time()
        print("Ended in: ", ((timer_end - timer_start) / 60 ), "minutes" )
        return model


    def test_classifier(self, model, dataLoader):  # self.dataLoader
        """Run ``model`` over ``dataLoader`` and collect predictions and ground truth.

        The model is switched to evaluation mode and gradients are disabled, so
        the result does not depend on the mode the model was left in.

        Returns
        -------
        (predictions, labels) : tuple of numpy arrays
            predicted class index (argmax over dimension 1 of the output) and true
            label for every sample yielded by the loader
        """
        model.to(self.device)
        model.eval()
        predictions, labels = [], []
        with torch.no_grad():
            for batch in dataLoader:
                x = batch[0].to(self.device)
                y = batch[1].to(self.device)
                output = model(x)
                preds = output.to('cpu').max(1)[1].numpy()
                labs = y.to('cpu').numpy()
                predictions.extend(list(preds))
                labels.extend(list(labs))
        return np.array(predictions), np.array(labels)


    def train(self, parameters, paths, train_from_epoch, save_on_runtime, save_each_iter) -> None:
        """Train the model, then evaluate it on the test loader and print the accuracy.

        Parameters
        ----------
        parameters : dict
            must provide 'exp_name', 'lr', 'epochs' and 'momentum'
        paths : dict
            must provide 'logs' and 'models' (forwarded as log_dir / models_dir)
        train_from_epoch, save_on_runtime, save_each_iter
            forwarded unchanged to trainval_classifier
        """

        self.model_finetuned = self.trainval_classifier(exp_name=parameters['exp_name'], lr=parameters['lr'], epochs=parameters['epochs'],
                                                        momentum=parameters['momentum'],
                                                        log_dir=paths['logs'],
                                                        models_dir=paths['models'],
                                                        train_from_epoch=train_from_epoch,
                                                        save_on_runtime=save_on_runtime,
                                                        save_each_iter=save_each_iter
                                                        )

        print("**** Training procedure ended. Start to calculate accuracy ...")

        self.model_finetuned_predictions_test, self.dataset_labels_test = self.test_classifier(self.model_finetuned, self.dst.test_loader)
        print("Accuracy of " + parameters['exp_name'] + " %0.2f%%" % (accuracy_score(self.dataset_labels_test, self.model_finetuned_predictions_test)*100) )


    def load_model(self, path: str) -> None:
        """Load a saved state dict from ``path`` into ``self.model``.

        initialize_dst() must have been called first, since it creates the model
        and selects the device.
        """
        print("Loading model using load_state_dict..")
        # map_location lets weights saved on a GPU be loaded on a CPU-only runtime
        self.model.load_state_dict(torch.load(path, map_location=self.device))

    def get_info(self) -> None:
        """Print the architecture of the current model."""
        print("Information about model:\n", self.model)

## Concrete creators

In [66]:
""" Concrete Creators override the factory method in order to change the resulting product's type. """

# ConcreteCreator1 
class CCSqueezeNet(PretrainedModelsCreator):
    """Creator whose factory method yields the SqueezeNet product."""

    def factory_method(self) -> PretrainedModel:
        squeezenet_product = CPSqueezeNet()
        return squeezenet_product

# ConcreteCreator2
class CCAlexNet(PretrainedModelsCreator):
    """Creator whose factory method yields the AlexNet product."""

    def factory_method(self) -> PretrainedModel:
        alexnet_product = CPAlexNet()
        return alexnet_product

# ConcreteCreator3
class CCVgg16(PretrainedModelsCreator):
    """Creator whose factory method yields the VGG16 product."""

    def factory_method(self) -> PretrainedModel:
        vgg16_product = CPVgg16()
        return vgg16_product


## Product

In [67]:
# Product
class PretrainedModel(ABC):
    """Product interface of the factory-method pattern.

    Every concrete product must implement get_model().
    """

    @abstractmethod
    def get_model(self, output_class: int = 3):
        """Return the model for `output_class` classes; concrete products provide the body."""
        return None

## Concrete products

In [68]:
"""Concrete Products provide various implementations of the Product interface."""

# ConcreteProduct1
class CPSqueezeNet(PretrainedModel):
    """Concrete product: pretrained SqueezeNet 1.0 with a new classification head."""

    def get_model(self, output_class: int = 3):
        """Return squeezenet1_0 whose final 1x1 convolution outputs `output_class` channels."""
        model = squeezenet1_0(pretrained=True)
        # Replace the last convolution of the classifier, keeping its input width.
        old_head = model.classifier[1]
        model.classifier[1] = nn.Conv2d(old_head.in_channels, output_class, kernel_size=(1,1), stride=(1,1))
        # Keep the class count recorded on the model in sync with the new head, as done in
        # https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
        model.num_classes = output_class

        return model
    
# ConcreteProduct2
class CPAlexNet(PretrainedModel):
    """Concrete product: pretrained AlexNet with a new final layer."""

    def get_model(self, output_class: int = 3):
        """Return alexnet whose last classifier layer outputs `output_class` values."""
        net = alexnet(pretrained=True)
        # Swap the last fully-connected layer for one sized to our classes
        # https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
        replacement_head = nn.Linear(4096, output_class)
        net.classifier[6] = replacement_head
        return net

# ConcreteProduct3
class CPVgg16(PretrainedModel):
    """Concrete product: pretrained VGG16 with a new final layer."""

    def get_model(self, output_class: int = 3):
        """Return vgg16 whose last classifier layer outputs `output_class` values."""
        net = vgg16(pretrained=True)
        # Swap the last fully-connected layer for one sized to our classes.
        replacement_head = nn.Linear(4096, output_class)
        net.classifier[6] = replacement_head
        return net

# Testing

## Loading the dataset

In [12]:
# Training pipeline: random crop and horizontal flip act as data augmentation.
train_transform = transforms.Compose([
  transforms.Resize(256),
  transforms.RandomCrop(224),
  transforms.RandomHorizontalFlip(),
  transforms.ToTensor(),
  transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Evaluation pipeline (validation and test): no random operation, so the same image
# always produces the same tensor and the measured accuracy is repeatable.
test_transform = transforms.Compose([
  transforms.Resize(256),
  transforms.CenterCrop(224), # central crop
  transforms.ToTensor(),
  transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# One csv file per split, all stored in the dataset folder.
trashbean_dataset = TDContainer(
  training={
    "path": join(GDRIVE_PATHS['dataset'], 'training.csv'),
    "transform": train_transform
  },
  validation={
    "path": join(GDRIVE_PATHS['dataset'], 'validation.csv'),
    "transform": test_transform
  },
  test={
    "path": join(GDRIVE_PATHS['dataset'], 'test.csv'),
    "transform": test_transform
  }
)

In [13]:
# Print the size of each split and whether the data loaders have been created yet.
trashbean_dataset.show_info()


=== *** DB INFO *** ===
Training: 900 values, 
Validation: 900 values, 
Test: 900
DataLoader: False
=== *** END *** ====



## Training

In [69]:
def do_training(creator: PretrainedModelsCreator, dataset: TDContainer, output_class: int,
                dl_attributes: dict, parameters: dict, paths: dict,
                train_from_epoch: int=0, save_on_runtime: bool=True, save_each_iter:int=20 ) -> None:
    """Prepare `creator` with the dataset, then run its training procedure.

    The creator first builds its model and data loaders (initialize_dst) and is then
    trained with the given hyper-parameters, paths and checkpoint options.
    """
    creator.initialize_dst(dataset, output_class, dl_attributes)

    training_options = {
        'parameters': parameters,
        'paths': paths,
        'train_from_epoch': train_from_epoch,
        'save_on_runtime': save_on_runtime,
        'save_each_iter': save_each_iter,
    }
    creator.train(**training_options)

In [70]:
def do_loading(loaded_model, creator: PretrainedModelsCreator, dataset: TDContainer, output_class: int,
               dl_attributes: dict, parameters: dict, paths: dict,
               train_from_epoch: int=0, save_on_runtime: bool=True, save_each_iter:int=20 ) -> None:
    """Resume training from saved weights.

    The creator is prepared with the dataset, `loaded_model` is handed to its
    load_model() method, and training then continues with the given options.
    """
    creator.initialize_dst(dataset, output_class, dl_attributes)
    creator.load_model(loaded_model)

    training_options = {
        'parameters': parameters,
        'paths': paths,
        'train_from_epoch': train_from_epoch,
        'save_on_runtime': save_on_runtime,
        'save_each_iter': save_each_iter,
    }
    creator.train(**training_options)

In [None]:
# trashbean_dataset

In [49]:
# Google Drive folders used by the notebook, each derived from the project root.
GDRIVE_PATHS = {
    name: join('/content/gdrive/MyDrive/trashbean-classifier/', suffix)
    for name, suffix in (
        ('main', ''),
        ('dataset', 'dataset/'),
        ('logs', 'logs/'),
        ('models', 'logs/models/'),
    )
}

In [None]:
# using the Concrete Creator
print("App: Launching training with the SqueezeNet.")
# Train SqueezeNet for one epoch on the three classes, saving a checkpoint after it.
do_training(
    creator=CCSqueezeNet(),
    dataset=trashbean_dataset,
    output_class=3,
    dl_attributes={'batch_size': 32, 'num_workers': 2, 'drop_last': False},
    parameters={'exp_name': 'SqueezeNet__v1', 'lr': 0.001, 'epochs': 1, 'momentum': 0.99},
    paths=GDRIVE_PATHS,
    train_from_epoch=0,
    save_on_runtime=True,
    save_each_iter=1,
)

### print("App: Launching training with Alexnet.")
### do_training(CCAlexNet(), trashbean_dataset)
### 
### print("App: Launching trainin with VGG16.")
### do_training(CCVgg16(), trashbean_dataset)

In [72]:
print("App: Reloading training with the SqueezeNet.")
# Resume from the checkpoint saved after the first epoch and train one more epoch;
# train_from_epoch=1 offsets the epoch number used in the new checkpoint names.
do_loading(
    '/content/gdrive/MyDrive/trashbean-classifier/logs/models/SqueezeNet__v1-1.pth',
    creator=CCSqueezeNet(),
    dataset=trashbean_dataset,
    output_class=3,
    dl_attributes={'batch_size': 32, 'num_workers': 2, 'drop_last': False},
    parameters={'exp_name': 'SqueezeNet__v1', 'lr': 0.001, 'epochs': 1, 'momentum': 0.99},
    paths=GDRIVE_PATHS,
    train_from_epoch=1,
    save_on_runtime=True,
    save_each_iter=1,
)


App: Reloading training with the SqueezeNet.
Loading model using load_state_dict..
Computing epoch:
1 / 1 ... 
Ended in:  1.0224265416463216 minutes
**** Training procedure ended. Start to calculate accuracy ...
Accuracy of SqueezeNet__v1 55.89%
