# LIBRERIE


In [None]:
#Importa le librerie necessarie per l'esecuzione del codice
import cv2
import os
import csv
import numpy as np
import keras
import pickle
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, confusion_matrix, classification_report, accuracy_score
from skimage import io
from keras.utils import to_categorical
from keras.layers import Dense, BatchNormalization, Activation, Flatten, Conv2D, MaxPooling2D, Dropout, GlobalAveragePooling2D
from keras.models import Sequential,Model
from keras.applications.inception_v3 import InceptionV3
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.backend import clear_session

# PREPROCESSING

In [None]:
# Directory containing the raw images to process
input_dir = "images"
# Directory where the resized images are written
output_dir = "images/imagesprocess"

# Create the output directory if it does not exist yet
os.makedirs(output_dir, exist_ok=True)

# Resize every image found directly inside the input directory
for filename in os.listdir(input_dir):
    source_path = os.path.join(input_dir, filename)

    # output_dir lives inside input_dir, so the listing also returns the
    # "imagesprocess" sub-directory itself; skip anything that is not a file
    if not os.path.isfile(source_path):
        continue

    # cv2.imread returns None (it does not raise) when a file cannot be decoded
    img = cv2.imread(source_path)
    if img is None:
        print(f"Skipping unreadable image: {source_path}")
        continue

    # Resize to 224x224, the spatial size the model is built with
    resized_img = cv2.resize(img, (224, 224))

    # Save the result in the output directory under the original file name
    cv2.imwrite(os.path.join(output_dir, filename), resized_img)

# REALIZZAZIONE FOLD DI TRAINING E DI TEST



In [None]:
# Collect the CSV files that describe the training folds.
# os.listdir returns names in arbitrary order, so the list is sorted: fold i is
# later paired by position with test fold i and with best_model_fold_{i+1}.h5.
# NOTE(review): the sort is lexicographic; the pairing holds as long as train and
# test fold files follow the same naming scheme - confirm.
train_folds_dir = 'bootstrap_folds/train_folds'
training_fold_files = sorted(
    file_name
    for file_name in os.listdir(train_folds_dir)
    if file_name.endswith('.csv')
)

# One list per training fold, holding the second CSV column of every row
# (used further down as the numeric id of an image)
Train_image = []
for fold_n in training_fold_files:
    with open(f'{train_folds_dir}/{fold_n}', newline='') as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # skip the first row
        Train_image.append([row[1] for row in reader])


# Starting from the ids stored in the folds, build the path of every
# preprocessed training image, fold by fold
training_files = []
file_path = 'images'
file_prefix = 'cell'

# Index the directory once (numeric id -> file names) instead of listing and
# scanning it again for every image of every fold
files_by_number = {}
for file_name in os.listdir(file_path):
    if not file_name.startswith(file_prefix):
        continue
    # File names are expected to look like "<prefix><number>.<extension>"
    stem = os.path.splitext(file_name)[0]
    try:
        file_number = int(stem[len(file_prefix):])
    except ValueError:
        # The prefix matches but what follows is not a number: not an image id
        continue
    files_by_number.setdefault(file_number, []).append(file_name)

for fold_row in Train_image:
    training_files_row = []
    for train_image_data in fold_row:
        for file_name in files_by_number.get(int(train_image_data), []):
            training_files_row.append(os.path.join(file_path, f'imagesprocess/{file_name}'))
    training_files.append(training_files_row)

# Use forward slashes everywhere so the paths match the keys of the label dictionary
training_files_ = [[elem.replace('\\', '/') for elem in training_files_row] for training_files_row in training_files]


# Path of the CSV file holding the image labels
label_file = 'labels.csv'

# Dictionary mapping the path of each preprocessed image to its integer label
labels_immage = {}
with open(label_file) as f:
    for record in csv.reader(f):
        # The first CSV field carries "<image path>\t<label>"
        raw_path, raw_label = record[0].split('\t')
        # Point the path at the preprocessed copy of the image
        processed_path = raw_path.replace('/', '/imagesprocess/')
        labels_immage[processed_path] = int(raw_label)


# For every training fold, load each image together with its label,
# keeping the order given by the fold
training_images = []
training_labels = []

for fold_paths in training_files_:
    # Read image and label side by side so both lists stay aligned
    loaded = [(io.imread(image_path), labels_immage[image_path]) for image_path in fold_paths]
    training_images.append([pixels for pixels, _ in loaded])
    training_labels.append([label for _, label in loaded])

In [None]:
# Collect the CSV files that describe the test folds.
# os.listdir returns names in arbitrary order, so the list is sorted: test fold i
# is evaluated with the model saved as best_model_fold_{i+1}.h5.
# NOTE(review): the sort is lexicographic; the pairing holds as long as train and
# test fold files follow the same naming scheme - confirm.
test_folds_dir = 'bootstrap_folds/test_folds'
testing_fold_files = sorted(
    file_name
    for file_name in os.listdir(test_folds_dir)
    if file_name.endswith('.csv')
)

# One list per test fold, holding the second CSV column of every row
# (used further down as the numeric id of an image)
Test_image = []
for fold_n in testing_fold_files:
    with open(f'{test_folds_dir}/{fold_n}', newline='') as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # skip the first row
        Test_image.append([row[1] for row in reader])

# Starting from the ids stored in the folds, build the path of every test image,
# fold by fold (paths point at the original images directory)
testing_files = []
file_path = 'images'
file_prefix = 'cell'

# Index the directory once (numeric id -> file names) instead of listing and
# scanning it again for every image of every fold
files_by_number = {}
for file_name in os.listdir(file_path):
    if not file_name.startswith(file_prefix):
        continue
    # File names are expected to look like "<prefix><number>.<extension>"
    stem = os.path.splitext(file_name)[0]
    try:
        file_number = int(stem[len(file_prefix):])
    except ValueError:
        # The prefix matches but what follows is not a number: not an image id
        continue
    files_by_number.setdefault(file_number, []).append(file_name)

for fold_row in Test_image:
    testing_files_row = []
    for test_image_data in fold_row:
        for file_name in files_by_number.get(int(test_image_data), []):
            testing_files_row.append(os.path.join(file_path, file_name))
    testing_files.append(testing_files_row)

# Use forward slashes everywhere, whatever separator os.path.join produced
testing_files_ = [[elem.replace('\\', '/') for elem in testing_files_row] for testing_files_row in testing_files]

# For every test fold, load each image together with its label,
# keeping the order given by the fold
images_test = []
labels_test = []

for fold_paths in testing_files_:
    fold_pixels = []
    fold_labels = []
    for original_path in fold_paths:
        # Switch from the original image to its preprocessed copy
        processed_path = original_path.replace('images/', 'images/imagesprocess/')
        fold_pixels.append(io.imread(processed_path))
        fold_labels.append(labels_immage[processed_path])
    images_test.append(fold_pixels)
    labels_test.append(fold_labels)


# DEFINIZIONE MODELLO 

In [None]:
# Pretrained InceptionV3 backbone (ImageNet weights, without its classification top)
base_model = InceptionV3(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3),
)

# Freeze the backbone: its parameters are not retrained
base_model.trainable = False

# New classification head, stacked on the backbone output for this specific task
features = Flatten()(base_model.output)
features = Dense(1024, activation='relu')(features)
features = BatchNormalization()(features)
features = Dropout(0.2)(features)
# NOTE(review): two sigmoid units trained with binary cross-entropy on one-hot
# labels; softmax + categorical cross-entropy is the more usual pairing - confirm
predictions = Dense(2, activation='sigmoid')(features)

# Assemble and compile the full model, then print its layer summary
model = Model(inputs=base_model.input, outputs=predictions)
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
model.summary()

# TRAINING

In [None]:
# Train the model on every training fold with early stopping, saving for each
# fold the best checkpoint (lowest validation loss) and the training history
model_save_dir = 'Modello'
# Checkpoints and history files are written here; make sure the directory exists
os.makedirs(model_save_dir, exist_ok=True)

early_stopping = EarlyStopping(monitor='val_loss', patience=3)

# Snapshot of the starting point (pretrained backbone + freshly initialised head).
# Without restoring it, every fold would continue from the weights learned on the
# previous folds, so the per-fold models and their scores would not be independent.
initial_weights = model.get_weights()

for i, fold in enumerate(training_fold_files):

    clear_session()
    print(f"..... Start {fold} Training....")

    # Restart from the same initial state for every fold
    model.set_weights(initial_weights)
    # A new optimizer discards the Adam state accumulated on the previous fold.
    # Keep these settings identical to the compile call that follows the model definition.
    model.compile(Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])

    # Scale pixel values by 1/255 and one-hot encode the labels
    x_train_ = np.array(training_images[i]) / 255.0
    y_train_ = to_categorical(np.array(training_labels[i]))

    # Keep only the best model of this fold
    checkpoint_filepath = f"{model_save_dir}/best_model_fold_{i+1}.h5"
    checkpoint_callback = ModelCheckpoint(filepath=checkpoint_filepath,
                                          monitor='val_loss',
                                          mode='min',
                                          save_best_only=True,
                                          save_weights_only=False)

    # 20% of the fold is held out as validation data
    history = model.fit(x_train_, y_train_, batch_size=32, epochs=15,
                        callbacks=[early_stopping, checkpoint_callback],
                        validation_split=0.2)

    # Also store the loss/accuracy curves of this fold
    with open(f'{model_save_dir}/history_train_fold_{i+1}.pkl', 'wb') as f:
        pickle.dump(history.history, f)

In [None]:
# For every training fold, plot the accuracy and loss curves (training and validation)
for i, fold in enumerate(training_fold_files):

    # Reload the history dictionary saved at the end of this fold's training
    history_path = f'{model_save_dir}/history_train_fold_{i+1}.pkl'
    with open(history_path, 'rb') as f:
        graph = pickle.load(f)

    print(f"GRAPH FOR ACCURACY AND LOSS ON TRAINING AND VALIDATION SET FOLD {i+1}")

    fig, (ax_accuracy, ax_loss) = plt.subplots(2, 1, figsize=(8, 8))

    # Top panel: accuracy
    ax_accuracy.plot(graph['accuracy'], label='Training Accuracy')
    ax_accuracy.plot(graph['val_accuracy'], label='Validation Accuracy')
    ax_accuracy.legend(loc='lower right')
    ax_accuracy.set_ylabel('Accuracy')
    # Keep the automatic lower bound and cap the axis at 1
    ax_accuracy.set_ylim([min(ax_accuracy.get_ylim()), 1])
    ax_accuracy.set_title('Training and Validation Accuracy')

    # Bottom panel: loss
    ax_loss.plot(graph['loss'], label='Training Loss')
    ax_loss.plot(graph['val_loss'], label='Validation Loss')
    ax_loss.legend(loc='upper right')
    ax_loss.set_ylabel('Cross Entropy')
    ax_loss.set_ylim([0, 1.0])
    ax_loss.set_title('Training and Validation Loss')
    ax_loss.set_xlabel('epoch')

    plt.show()


# TESTING E VALUTAZIONE PERFORMANCE DEL MODELLO

In [None]:
# Per-fold scores: macro-averaged F1 and accuracy
f_score = []
accuracy = []

# For every test fold, load the model saved for that fold and measure its performance
for i, fold in enumerate(testing_fold_files):

    # Clear the backend state left by the previous fold BEFORE loading the next
    # model, so the freshly loaded model is never affected by the reset
    clear_session()
    my_model = keras.models.load_model(f"{model_save_dir}/best_model_fold_{i+1}.h5")

    print(f"..... Start {fold} Testing....")
    # Same 1/255 scaling that was applied to the training images
    x_test_ = np.array(images_test[i]) / 255.0
    # Integer class labels; the metrics below compare class indices directly, so
    # no one-hot encoding is needed (the former evaluate() result was never used)
    y_test_classes = np.array(labels_test[i])

    # Predicted class = index of the highest output unit
    y_pred = my_model.predict(x_test_)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Mean of the per-class F1 scores
    f1 = f1_score(y_test_classes, y_pred_classes, average=None)
    f = np.mean(f1)
    acc = accuracy_score(y_test_classes, y_pred_classes)
    f_score.append(f)
    accuracy.append(acc)

    print('F1 Score:', f)
    print('Accuracy:', acc)
    print("CM:\n" + str(confusion_matrix(y_test_classes, y_pred_classes)) + "\n")
    print(classification_report(y_test_classes, y_pred_classes))

In [None]:
# Average performance over all folds: mean F1 score first, then mean accuracy
for fold_scores in (f_score, accuracy):
    print(sum(fold_scores) / len(fold_scores))