# Classificazione YOOX Maniche con Keras

## Importazione Librerie

In [1]:
import os
import numpy as np
import random
from glob import glob

import cv2
import tensorflow as tf

## Preparazione dati

Definisco una classe astratta per generici dataset di classificazione immagini, e la estendo con un'implementazione relativa al dataset trattato. Nell'implementazione, le classi vengono assegnate alle immagini in base alla cartella in cui esse sono contenute. Tramite la funzione load_data, virtuale pura in ClassificationDataset, implementata nelle classi derivate, carico i dati e li splitto, randomicamente, tra training e test set in base al parametro train_imgs_to_test_imgs_ratio. Di default, inserisco circa il 70% delle immagini nel training set, il restante nel test set. Lo shuffling verrà gestito dal generatore.
Infine, calcolo dei pesi per ogni classe inversamente proporzionali alla frequenza delle loro istanze nel dataset, dando così maggiore importanza alle classi meno frequenti. Ad esempio, la classe 'monospalla' risulta meno frequente rispetto alle altre. I pesi saranno poi utilizzati in fase di training.

Si assume che il dataset sia stato estratto in una cartella con path relativo 'data/dati_maniche'. È possibile settare il parametro root_dir da costruttore per utilizzare altre directory.

In [2]:
class ClassificationDataset(object):
    """
    Generic interface for classification datasets
    """
    def __init__(self, root_dir, resize_to = None, train_imgs_to_test_imgs_ratio = 0.7):
        self.root_dir = root_dir

        self.class_str_names = []
        self.class_weights = {} #dict to store class weights for unbalanced datasets
        self.n_of_imgs_per_class = {}

        self.x_train = []
        self.y_train = []
        #
        self.x_test = []
        self.y_test = []

        self.total_n_of_imgs = 0

        self.load_data(resize_to, train_imgs_to_test_imgs_ratio)

    def load_data(self, resize_to, train_imgs_to_test_imgs_ratio):
        raise NotImplementedError
    def get_training_set(self):
        return self.x_train,self.y_train
    def get_test_set(self):
        return self.x_test,self.y_test
    def get_input_shape(self):
        return self.x_train[0].shape
    
class YOOX_ManicheDataset(ClassificationDataset):
    """
    Data archive is assumed to be stored and unzipped into a directory called 'data'
    """
    def __init__(self, root_dir = 'data/dati_maniche', resize_to = None, train_imgs_to_test_imgs_ratio = 0.7):
        super(YOOX_ManicheDataset,self).__init__(root_dir, resize_to, train_imgs_to_test_imgs_ratio)

    def load_data(self,resize_to,train_imgs_to_test_imgs_ratio):

        #Each directory in the root dir represents a single class
        dir_paths = [dir_path for dir_path in glob(os.path.join(self.root_dir,'*')) if os.path.isdir(dir_path)]
        self.class_str_names = [path.split('/')[-1] for path in dir_paths]
        self.num_classes = len(self.class_str_names)

        for class_idx, class_dir in enumerate(dir_paths):
            file_list = glob(os.path.join(class_dir,'*.png'))

            self.n_of_imgs_per_class[class_idx] = len(file_list)
            self.total_n_of_imgs += self.n_of_imgs_per_class[class_idx]

            for file in file_list:
                #Read and normalize input in [0..1] range
                img = cv2.imread(file,1)/255.

                #Set background white to 0, as if it was the result of zero-padding (slighty improved accuracy when done)
                img[(img==1).all(axis=2)] = 0

                if resize_to is not None:
                    img = cv2.resize(img, resize_to)

                #Create one-hot encoding with numpy basic functions
                label = np.zeros(self.num_classes)
                label[class_idx] = 1

                #Split data randomly between train and test sets
                if np.random.random() < train_imgs_to_test_imgs_ratio:
                    self.x_train.append(img)
                    self.y_train.append(label)
                else:
                    self.x_test.append(img)
                    self.y_test.append(label)

        self.x_train = np.asarray(self.x_train)
        self.y_train = np.asarray(self.y_train)
        self.x_test = np.asarray(self.x_test)
        self.y_test = np.asarray(self.y_test)

        #Compute weights for unbalanced classes
        for i in range(self.num_classes):
            self.class_weights[i] = 1.0 - self.n_of_imgs_per_class[i] / self.total_n_of_imgs

## Definizione Modello

Step 3: Definisco un semplice modello convoluzionale per la classificazione delle immagini usando le API dei modelli sequenziali di Keras

In [3]:
class ClassificationModel(object):
    def __init__(self, input_shape, num_classes):
        self.graph = None
        self.define_graph(input_shape,num_classes)
    def define_graph(self,input_shape,num_classes):
        raise NotImplementedError
    def preprocess_img(self,img):
        #Ovveride for model-specific preprocessing
        return img

class BaseConvClassifier(ClassificationModel):
    def __init__(self, input_shape,num_classes):
        super(BaseConvClassifier,self).__init__(input_shape,num_classes)
    def define_graph(self,input_shape,num_classes):
        self.graph = tf.keras.Sequential()

        self.graph.add(tf.keras.layers.Conv2D(16, (3, 3), padding='same',
                                         input_shape=input_shape))
        self.graph.add(tf.keras.layers.Activation('relu'))
        self.graph.add(tf.keras.layers.Conv2D(16, (3, 3), padding='same'))
        self.graph.add(tf.keras.layers.Activation('relu'))
        self.graph.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))
        self.graph.add(tf.keras.layers.Dropout(0.25))

        self.graph.add(tf.keras.layers.Conv2D(32, (3, 3), padding='same'))
        self.graph.add(tf.keras.layers.Activation('relu'))
        self.graph.add(tf.keras.layers.Conv2D(32, (3, 3), padding='same'))
        self.graph.add(tf.keras.layers.Activation('relu'))
        self.graph.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))

        self.graph.add(tf.keras.layers.Flatten())
        self.graph.add(tf.keras.layers.Dense(512))
        self.graph.add(tf.keras.layers.Activation('relu'))
        self.graph.add(tf.keras.layers.Dropout(0.5))
        self.graph.add(tf.keras.layers.Dense(num_classes))
        self.graph.add(tf.keras.layers.Activation('softmax'))

## Definizione Viewer

Definisco poi un'interfaccia per la visualizzazione dei dati. L'implementazione è basata sulle API OpenCV. (Non ho utilizzato matplotlib per semplicità a causa di alcuni problemi di dipendenze sul mio computer)

In [4]:
class Viewer(object):
    def show_images_with_labels(self, images, labels, label_str_descriptions, num_images_to_show = 1, select_imgs_at_random = True,title=""):
        pass
    def print_dataset_stats(self,data):
        assert(isinstance(data,ClassificationDataset))

        print("Images per class:{}".format(data.n_of_imgs_per_class))
        print("Total images in dataset:{}".format(data.total_n_of_imgs))
        print("Images in training set: {}".format(len(data.x_train)))
        print("Images in test set: {}".format(len(data.x_test)))
    def print_accuracy_result(self,accuracy):
        print("Accuracy: {}".format(accuracy))

class OpenCV_Viewer(Viewer):
    """
    Show images with OpenCV
    """
    def show_images_with_labels(self, images, labels, label_str_descriptions, num_images_to_show = 1, select_imgs_at_random = True, title=""):

        if select_imgs_at_random:
            imgs_to_show = [random.randint(0,len(images)-1) for i in range(num_images_to_show)]
        else:
            imgs_to_show = range(num_images_to_show)
        for i, idx in enumerate(imgs_to_show):
            str_description = label_str_descriptions[np.argmax(labels[idx])]
            cv2.imshow("{}: {}".format(i, len(imgs_to_show),title,str_description),
                       cv2.cvtColor((images[idx]*255).astype(np.uint8),cv2.COLOR_BGR2RGB))
            cv2.waitKey(10)
    
        cv2.waitKey(15000) #Wait 15 seconds
        cv2.destroyAllWindows()


## Impostazione training e test

Imposto una classe Trainer con un metodo statico 'start' per avviare l'addestramento. Il metodo prende come argomento le istanze del dataset e del modello, l'istanza di un oggetto DataGenerator(uno di default o un'eventuale estensione) oltre a vari parametri per l'addestramento. Analogalmente, imposto una classe Tester. Utilizzo un riferimento al Viewer per stampare e visualizzare dati.

In [5]:
class Trainer(object):
    @classmethod
    def start(cls,data,model,loss,optimizer,eval_metrics,datagen,batch_size,epochs, viewer=Viewer()):
        assert(isinstance(data, ClassificationDataset))
        assert(isinstance(model, ClassificationModel))

        viewer.print_dataset_stats(data)

        x_train, y_train = data.get_training_set()

        #Show samples through the viewer
        viewer.show_images_with_labels(x_train, y_train, data.class_str_names, num_images_to_show=8,select_imgs_at_random=True,title="Train Label")

        #Compile graph
        model.graph.compile(loss=loss,optimizer=optimizer,metrics=eval_metrics)
        #Compute stats for the generator, if necessary
        datagen.fit(x_train)
        #Train
        model.graph.fit_generator(datagen.flow(x_train, y_train, batch_size=batch_size, shuffle=True),
                                  epochs=epochs, workers=4, class_weight=data.class_weights)
class Tester(object):
    @classmethod
    def start(cls,data,model,viewer=Viewer()):
        x_test,y_test = data.get_test_set()
        predictions = model.graph.predict(x_test)

        viewer.show_images_with_labels(x_test,predictions,data.class_str_names,num_images_to_show=8,select_imgs_at_random=True,title="Prediction")

        accuracy = model.graph.evaluate(x_test,y_test,batch_size=32)[1]
        viewer.print_accuracy_result(accuracy)


## Main method

Infine, imposto il main, instanziando il dataset, il modello, il generatore e il viewer. 

Il resize delle immagini tramite la classe Dataset mi è stato necessario per limitazione della RAM disponibile. Un approccio alternativo avrebbe potuto prevedere un'implementazione del dataset che caricasse in RAM, nei campi x_train, x_test, i path delle immagini, e un'estensione del generatore custom in grado di leggere, per ogni batch, i path e caricare le immagini dinamicamente. Tale approccio avrebbe avuto lo svantaggio di causare frequenti letture in memoria secondaria. Se il resizing fosse stato richiesto dal modello (es. modelli tipo VGG richiedono input di dimensione fissata), sarebbe stato più opportuno implementarlo nel metodo preprocess_img della classe Model, che viene passata come parametro al generatore.

Con i parametri così impostati, ho riscontrato una accuracy sul test set pari a circa il 90.7%

In [6]:
if __name__ == '__main__':

    data = YOOX_ManicheDataset(resize_to=(64,64))
    model = BaseConvClassifier(data.get_input_shape(),data.num_classes)
    datagen = tf.keras.preprocessing.image.ImageDataGenerator(
            horizontal_flip=True,
            preprocessing_function=model.preprocess_img,
            validation_split=0.2)

    viewer = OpenCV_Viewer()

    Trainer.start(data,model,
                  loss='categorical_crossentropy',
                  optimizer=tf.keras.optimizers.Adam(),
                  eval_metrics=['accuracy'],
                  datagen=datagen,
                  batch_size=16,
                  epochs=5,
                  viewer=viewer)


    print("**************TEST******************")
    Tester.start(data,model,viewer)


IndexError: index 0 is out of bounds for axis 0 with size 0