In [None]:
# ---------- visualizzo piu output a schermo insieme ------------------------------
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity="all"
#-------------------------- scelgo se usare CPU o GPU ------------------------------
import tensorflow as tf
import numpy as np
import os

SEED = 1234
tf.random.set_seed(SEED)
cwd = os.getcwd()

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = '0' # CPU = -1 , GPU = 0

gpus = tf.config.experimental.list_physical_devices('GPU')
cpus = tf.config.experimental.list_physical_devices('CPU')

if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        print(e)
elif cpus:
    try:
        logical_cpus= tf.config.experimental.list_logical_devices('CPU')
        print(len(cpus), "Physical CPU,", len(logical_cpus), "Logical CPU")
    except RuntimeError as e:
        print(e)
#-------------------------------------------------------------------------------------

In [None]:
#---------------serve per dividere in sottocartelle il training set-------------------
import json, shutil, os

with open('MaskDataset\\train_gt.json', 'r') as labels:
    data=labels.read()
#----------- scegli cartella immagini miste ------------------
imm_source= os.path.join(cwd, 'MaskDataset\\training')
#----------- nome dataset organizzato ------------------------
data_dir=os.path.join(cwd,'MaskDataset\\dataset')
#----------- se non ce ordino tutto --------------------------
if not os.path.exists(data_dir):
    dir0 = os.path.join(data_dir, '0')
    dir1 = os.path.join(data_dir, '1')
    dir2 = os.path.join(data_dir, '2')
    os.mkdir(data_dir)
    os.mkdir(dir0)
    os.mkdir(dir1)
    os.mkdir(dir2)
    #leggo i label dal file json
    obj = json.loads(data)
    #ciclo su tutti i label e sposto immagini in cartelle separate
    for i in obj:
        if obj[i]==0:
            shutil.copy(os.path.join(imm_source,i), dir0,  follow_symlinks=True)
        elif obj[i]==1:
            shutil.copy(os.path.join(imm_source,i), dir1,  follow_symlinks=True)
        elif obj[i]==2:
            shutil.copy(os.path.join(imm_source,i), dir2,  follow_symlinks=True)

    labels.close()
    
#------------------------------------------------------------------------------------

In [None]:
#------------------------ creo i data generator -------------------
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# scegli se applicare data augmentation (solo su training)
apply_data_augmentation = True

#---------------- training ----------------------------------------
if apply_data_augmentation:
    train_data_gen = ImageDataGenerator(validation_split=0.2,
                                        rotation_range=30,
                                        #brightness_range=[0.8,1.2],
                                        width_shift_range=0.2,
                                        height_shift_range=0.2,
                                        zoom_range=0.15,
                                        shear_range=0.15,
                                        horizontal_flip=True,
                                        vertical_flip=False,
                                        fill_mode='nearest',
                                        #cval=0,
                                        rescale=1./255)
# datagenerator senza augmentation
else:
    train_data_gen = ImageDataGenerator(validation_split=0.2,
                                        rescale=1./255)
    
#------------------- validation -----------------------------------   
validation_data_gen = ImageDataGenerator(validation_split=0.2,
                                         rescale=1./255)
#------------------------------------------------------------------

In [None]:
#----------------------------- Creo generators da file ------------------------------------
source_dir = os.path.join(cwd,'MaskDataset')

# Batch size
bs = 32

# dimensione immagini (VGG16 vuole 224x224)
img_h=299
img_w=299

#ho tre calssi: 0, 1, 2
num_classes=3

# usa se vuoi assegnare label diversi
decide_class_indices = False
if decide_class_indices:
    classes = ['0', # no maschera
               '1', # tutti maschera
               '2'] # alcuni maschera
else:
    classes=None

# Training set
dataset_dir = os.path.join(source_dir, 'dataset')
train_gen = train_data_gen.flow_from_directory(dataset_dir,
                                               target_size=(img_h, img_w),
                                               subset='training',
                                               batch_size=bs,
                                               classes=classes,
                                               color_mode="rgb",
                                               class_mode='categorical',
                                               shuffle=True,
                                               seed=SEED)

# Validation set
validation_gen = validation_data_gen.flow_from_directory(dataset_dir,
                                                         target_size=(img_h, img_w),
                                                         subset='validation',
                                                         batch_size=bs,
                                                         classes=classes,
                                                         color_mode="rgb",
                                                         class_mode='categorical',
                                                         shuffle=False,
                                                         seed=SEED)

#---------------------------------------------------------------------------------------------

In [None]:
#---------------------- Creo dataset training -------------------------------------------------------------------
train_dataset = tf.data.Dataset.from_generator(lambda: train_gen,
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, num_classes]))

train_dataset = train_dataset.repeat()

#---------------------- Creo dataset validation -----------------------------------------------------------------
valid_dataset = tf.data.Dataset.from_generator(lambda: validation_gen, 
                                               output_types=(tf.float32, tf.float32),
                                               output_shapes=([None, img_h, img_w, 3], [None, num_classes]))

valid_dataset = valid_dataset.repeat()


In [None]:

base = tf.keras.applications.Xception(weights="imagenet",
                                      include_top=False,
                                      input_shape=(img_h, img_w, 3))

#aggiungo i FC layer
model = tf.keras.Sequential()
model.add(base)
model.add(tf.keras.layers.GlobalAveragePooling2D())
model.add(tf.keras.layers.Dense(256, activation=('relu')))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Dense(units=num_classes, activation=('softmax')))

#------------------- scelgo se caricare i pesi ------------------------------
load_weights = True
model_name = 'model_01'
model_dir = os.path.join(cwd, 'Xception')

if load_weights:            
    model.load_weights(os.path.join(model_dir, model_name))
#----------------------------------------------------------------------------

#frizzo tutta la base
for layer in base.layers:
    layer.trainable = False

In [None]:
#-------- classe per cambiare lr ----------------------
class CLR(tf.keras.callbacks.Callback):
    #costruttore (devo passargli schedule)
    def __init__(self, schedule):
        super(CLR, self).__init__()
        #attributo classe = input
        self.schedule = schedule
    #chiama a ogni epoch
    def on_epoch_begin(self, epoch, logs=None):
        #se non ha lr --> errore
        if not hasattr(self.model.optimizer, "lr"):
            raise ValueError('non hai settato lr')
        # Cout << get(lr)
        lr = float(tf.keras.backend.get_value(self.model.optimizer.learning_rate))
        # lookup table per settare lr
        scheduled_lr = self.schedule(epoch, lr)
        # cin >> set(lr)
        tf.keras.backend.set_value(self.model.optimizer.lr, scheduled_lr)

In [None]:
#------------ preparo il modello per visualizzare i risultati -------------------------------
from datetime import datetime
from tensorflow.keras.callbacks import EarlyStopping

# lista di callbacks che vado a usare
callbacks = []

# --------------- definisco early stopping (non mi va) -----------------
early_stop = True
if early_stop:
    es_callback = EarlyStopping(monitor='val_loss', patience=5)
    callbacks.append(es_callback)

#--------------------------- lookup table per lr (standard)------------- 

LUT_STD = []

#------------------- funzione per passare lr ---------------------------
def get_lr_std(epoch, lr):
    # ritorna lr corrente
    if epoch < LUT_STD[0][0]:
        return LUT_STD[0][1]
    elif epoch > LUT_STD[len(LUT_STD)-1][0]:
        # finito STD passo a FT
        return LUT_STD[len(LUT_STD)-1][1]
    # lookup table --> lr(epoch): cambia solo ai target
    for i in range(len(LUT_STD)):
        if epoch == LUT_STD[i][0]:
            print("\nnuovo lr: "+str(LUT_STD[i][1]))
            return LUT_STD[i][1]
    #se non specificato ritorno current
    return lr

# aggiungo a lista callbacks
callbacks.append(CLR(get_lr_std))


In [None]:
#------------- setto parametri di ottimizzazione ------------------------------
# Loss
loss = tf.keras.losses.CategoricalCrossentropy()

# Compilo il Modello
model.compile(optimizer='adam',
              loss=loss,
              metrics=['accuracy'])

In [None]:
#------------ start fit model ------------------------------
EP = 12

LUT_STD = [(0, 1e-6),
           (6, 1e-8)]

for layer in base.layers[-4:]:
    layer.trainable = True

H = model.fit(x=train_dataset,
              epochs=EP,
              steps_per_epoch=len(train_gen),
              validation_data=valid_dataset,
              validation_steps=len(train_gen), 
              callbacks=callbacks)

finetuning = False

if finetuning:
    
    LUT_STD = [(0, 1e-4),
               (4, 1e-6),
               (8, 1e-8)]
    
    for layer in base.layers[-6:]:
        layer.trainable = True
        
    EP = 8
    H = model.fit(x=train_dataset,
                  epochs=EP,
                  steps_per_epoch=len(train_gen),
                  validation_data=valid_dataset,
                  validation_steps=len(train_gen), 
                  callbacks=callbacks)


In [None]:
from matplotlib import pyplot as plt

def plot_metrics(history):
    metrics = ['accuracy']
    for n, metric in enumerate(metrics):
        name = metric.replace("_"," ").capitalize()
        plt.subplot(2,2,n+1)
    plt.plot(history.epoch, history.history[metric], label='Train')
    plt.plot(history.epoch, history.history['val_'+metric],
              linestyle="--", label='Val')
    plt.xlabel('Epoch')
    plt.ylabel(name)

    if metric == 'accuracy':
        plt.ylim([0.8,1])

    plt.legend()

plot_metrics(H)

In [None]:
#------------- salvo i pesi del modello -------------------------------------------------------
save_weights = True

if save_weights:
    model_name = 'model_01'
    model_dir = os.path.join(cwd, 'Xception')
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)    
    model.save_weights(os.path.join(model_dir, model_name))


In [None]:
# directory dove salvoi i risultati (di entrambi i modelli)
results_dir = os.path.join(cwd, 'results')
if not os.path.exists(results_dir):
        os.makedirs(results_dir)
# directori risultati transfer_model
results_dir = os.path.join(results_dir, 'Xception')
if not os.path.exists(results_dir):
        os.makedirs(results_dir)

#------------ funzione per formattare il file csv ---------------------------------------------
def create_csv(results, results_dir=results_dir):

    csv_fname = 'results_'
    csv_fname += datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(os.path.join(results_dir, csv_fname), 'w') as f:

        f.write('Id,Category\n')

        for key, value in results.items():
            f.write(key + ',' + str(value) + '\n')

In [None]:
#--------- scorro le immagini di test e faccio predizioni --------------------------------------
from PIL import Image
from keras.preprocessing import image

imm_source = os.path.join(cwd, 'MaskDataset\\test')
image_filenames = next(os.walk(imm_source))[2]
# scegli tra questi label per le predizioni
labels=['0', '1', '2']

results = {}
for image_name in image_filenames:   
    img = image.load_img(os.path.join(imm_source, image_name), target_size=(img_h, img_w))    
    img_tensor = image.img_to_array(img)
    img_tensor = np.expand_dims(img_tensor, axis=0)
    img_tensor /= 255.    
    pred = model.predict(img_tensor)
    results[image_name] = labels[np.argmax(pred)]
    
create_csv(results)
