# **Elaborazione di Immagini Mediche**
## Vessel Wall Segmentation Challenge - A.A. 2021/22 

### Script di Testing

Rigazio Sofia, Roccaro Lucia, Romano Anastasio, Ruzzante Elena

Collegamento a Google Drive e installazione delle librerie necessarie

In [None]:
from google.colab import drive
drive.mount('/content/drive')

!pip install tensorflow==2.1.0
!pip install keras==2.3.1
!pip install h5py==2.10.0 
!pip install plotly==5.3.1
!pip install pydicom

Importazione delle librerie necessarie



In [None]:
import os
import numpy as np
import plotly.express as px
import pickle
import json
import math
import pydicom
import glob

from matplotlib import pyplot as plt
from tqdm import tqdm
from skimage.io import imread, imsave
from skimage.transform import resize
from skimage.segmentation import mark_boundaries

from keras.models import load_model
from keras.utils.np_utils import to_categorical

from skimage import morphology
from skimage.measure import regionprops
import tensorflow as tf
from skimage.io import imsave

Definizione dei path e settaggio dei parametri

In [None]:
current_dir = '/content/drive/MyDrive/Colab Notebooks/EIM/VesselWallSegmentationChallenge'
dataset_name = '3D_dataset'
n_subjects = 50

carotid_types = ['ICA','ECA']
arts = ['ICAL','ECAL','ICAR','ECAR']

# Path
TRAIN_path = os.path.join(current_dir,dataset_name,'TRAIN')
VAL_path = os.path.join(current_dir,dataset_name,'VALIDATION')
TEST_path = os.path.join(current_dir,'DATASET','TEST')
RESULTS_path = os.path.join(current_dir,'RESULTS')
JSON_path = os.path.join(current_dir,'PICKLE_and_JSON')
PICKLE_ICA_path = os.path.join(JSON_path,'ICA')
PICKLE_ECA_path = os.path.join(JSON_path,'ICA')

# Cartelle
train_folders_ICA = []
train_folders_ECA = []
val_folders_ICA = []
val_folders_ECA = []
for arti in arts:
  # Training set
  for casei in os.listdir(TRAIN_path):
    cpath = os.path.join(TRAIN_path, casei, arti)
    if arti[0:3] == carotid_types[0]: # ICA
      train_folders_ICA.append(cpath)
    else: # ECA
      train_folders_ECA.append(cpath)

  # Validation set
  for casei in os.listdir(VAL_path):
    cpath = os.path.join(VAL_path, casei, arti)
    if arti[0:3] == carotid_types[0]: # ICA
      val_folders_ICA.append(cpath)
    else: # ECA
      val_folders_ECA.append(cpath)
# Test set
test_folders = [os.path.join(TEST_path,os.listdir(TEST_path)[i]) for i in range(len(os.listdir(TEST_path)))]

# Load dei dizionari contenenti le informazioni sulle immagini
with open(os.path.join(JSON_path,'labels_TRAIN.json'),"r") as f:
  labels_TRAIN = json.load(f)
  f.close
with open(os.path.join(JSON_path,'labels_VAL.json'),"r") as f:
  labels_VAL = json.load(f)
  f.close
with open(os.path.join(JSON_path,'labels_TEST.json'),"r") as f:
  labels_TEST = json.load(f)
  f.close

# Definizione dei parametri
chosen = val_folders_ICA[0].split('/')[-2] # estraiamo il nome del primo soggetto
typical_shape = labels_VAL[chosen]['original_shape']
IMG_WIDTH = typical_shape[1] # la larghezza sono le colonne -> 720
IMG_HEIGHT = typical_shape[0] # l'altezza sono le righe -> 100
NUM_CLASSES = 3 # vogliamo segmentare tre classi per ogni rete: lume, wall e background
IMG_CHANNELS = 1 # Le immagini fornite sono grayscale
# il numero di slice è variabile -> non lo definiamo

# Creazione delle cartelle che conterranno i risultati della segmentazione automatica
try:
  os.mkdir(RESULTS_path)
except:
  pass

for subject in range(1,n_subjects+1):
  if subject == 16:
    pass # abbiamo eliminato il soggetto 16 dal dataset
  else:
    name = '0_P'+str(subject)+'_U'
    try:
      os.mkdir(os.path.join(RESULTS_path,name))
    except:
      pass

## Creazione delle maschere automatiche

Load delle informazioni sulle ROI e sulle immagini da fornire alla rete

In [None]:
with open(os.path.join(PICKLE_ICA_path,'ROI_NET_informations.pickle'), 'rb') as f:
  row_ICA,col_ICA,ROI_HEIGHT_ICA,ROI_WIDTH_ICA,NET_IMG_HEIGHT,NET_IMG_WIDTH = pickle.load(f)

with open(os.path.join(PICKLE_ECA_path,'ROI_NET_informations.pickle'), 'rb') as f:
  row_ECA,col_ECA,ROI_HEIGHT_ECA,ROI_WIDTH_ECA,NET_IMG_HEIGHT,NET_IMG_WIDTH = pickle.load(f)

Load dei modelli allenati

In [None]:
net_name = 'trained_net_ICA.h5'
model_path = os.path.join(current_dir, 'Trained_Nets', net_name)
model_ICA = load_model(model_path)

net_name = 'trained_net_ECA.h5'
model_path = os.path.join(current_dir, 'Trained_Nets', net_name)
model_ECA = load_model(model_path)

Funzione che effettua l'eliminazione automatica delle biforcazioni

In [None]:
def rimozione_biforcazioni(image,wall_mask,lumen_mask):
  
  # lumen
  properties = regionprops(lumen_mask.astype(np.uint8), cache=False)
  # se le maschere sono nere, la funzione regionprops non calcola le proprietà
  if len(properties): 
    if properties[0].major_axis_length == 0:
      # se ci sono maschere con lume composto da un solo punto (l'asse 
      # maggiore in questo caso ha lunghezza pari a 0) si tratta di errori 
      # nelle segmentazioni manuali: impostiamo quindi le roundness pari a 0 e
      # l'area maggiore della soglia in modo da eliminare le maschere
      roundness_lumen = 0
      roundness_wall = 0
      area_lumen = soglia_area_lumen + 1
    else:
      area_lumen = lumen_mask.sum()
      roundness_lumen = 4*area_lumen/(math.pi*properties[0].major_axis_length**2)
      # wall
      area_wall = wall_mask.sum()
      properties = regionprops(wall_mask.astype(np.uint8), cache=False)
      roundness_wall = 4*area_wall/(math.pi*properties[0].major_axis_length**2)
  else:
    # se le maschere sono nere (quelle introdotte per il training della rete), 
    # impostiamo dei valori delle proprietà in modo che le slice non vengano 
    # eliminate
    roundness_lumen = 1
    roundness_wall = 1
    area_lumen = 0

  # rimozione della slice:
  # effettuiamo il controllo sia su wall che su lumen. se solo wall o solo 
  # lumen ha forma allungata, allora non ci troviamo in corrispondenza di una 
  # biforcazione. la forma non circolare potrebbe essere docuta ad esempio 
  # alla presenza di una placca
  if roundness_wall < soglia_wall and roundness_lumen < soglia_lumen and area_lumen > soglia_area_lumen:
    image = None

  return image

Funzione che effettua il preprocessing

In [None]:
def preprocess(image,wall_mask,lumen_mask,side,row,col):

  # flip left-right se la carotide si trova sul lato sinistro (a destra nell'immagine)
  if side == 'L':
    image = np.fliplr(image)
    if wall_mask is not None:
      wall_mask = np.fliplr(wall_mask)
      lumen_mask = np.fliplr(lumen_mask)

  # consideriamo solo la metà sinistra dell'immagine (che contiene ICAL/ECAL flippate o ICAR/ECAR)
  half_width = int(image.shape[1]/2)
  image = image[:,0:half_width]
  if wall_mask is not None:
    wall_mask = wall_mask[:,0:half_width]
    lumen_mask = lumen_mask[:,0:half_width]

  # controllo sulle dimensioni delle immagini
  if image.shape != (IMG_HEIGHT,int(IMG_WIDTH/2)): #(100,360)
    # effettuiamo un resize with pad per mantenere l'aspect ratio delle immagini 
    # e delle maschere
    image = np.expand_dims(image,axis = 2)
    image = np.squeeze(tf.image.resize_with_pad(image,IMG_HEIGHT,int(IMG_WIDTH/2), antialias=True).numpy())
    
    # Spostiamo le colonne aggiunte con lo zero padding a sinistra dell'immagine
    # con lo zero padding vengono inserite 80 colonne nere per lato
    to_shift = 80
    image = np.roll(image, to_shift, axis = 1)
     
    if wall_mask is not None: # resize delle maschere in Training e Validation Set
      # Conversione della maschera in dato categorico
      mask = to_categorical(wall_mask+lumen_mask, num_classes=NUM_CLASSES, dtype='float32')
      mask = tf.image.resize_with_pad(mask,IMG_HEIGHT,int(IMG_WIDTH/2), antialias=True).numpy() 
        # reimpostiamo i valori dei pixel coinvolti nel resize a 0 e 1: i pixel ai 
      # bordi delle segmentazioni potrebbero essersi sfumati
      soglia = 0.5
      mask[mask < soglia] = 0.0
      mask[mask >= soglia] = 1.0
      
      mask = np.roll(mask, to_shift, axis = 1)
      wall_mask = mask[:,:,1]
      lumen_mask = mask[:,:,2]
  # rimozione delle biforcazioni in Training e Validation Set
  if wall_mask is not None:
    image = rimozione_biforcazioni(image,wall_mask,lumen_mask)
   
  if image is not None: # se la slice non è stata rimossa (biforcazione)
    # estrazione della ROI
    image = image[row[0]:row[1]+1,col[0]:col[1]+1]
    # preparazione dell'immagine per la rete
    image = np.expand_dims(image, axis = [0,3]) # da dimensione (x,x) a (1,x,x,1)
    image = tf.image.pad_to_bounding_box(image,0,0,NET_IMG_HEIGHT,NET_IMG_WIDTH).numpy() # .numpy() perché dopo lo zero-padding img_slice è diventata un tensore -> serve riconvertirla in array!

  return image

Funzione che effettua il post processing

In [None]:
def postprocess(wall_mask,lumen_mask,side,original_shape,to_shift,row,col,ROI_HEIGHT,ROI_WIDTH):

  # rimozione del bordo nero aggiunto con zero-padding durante il preprocessing
  wall_mask_ROI = wall_mask[0:ROI_HEIGHT,0:ROI_WIDTH]
  lumen_mask_ROI = lumen_mask[0:ROI_HEIGHT,0:ROI_WIDTH]

  if original_shape != [IMG_HEIGHT,IMG_WIDTH]:
    # riposizionamento della maschera alle giuste coordinate rispetto alla metà 
    # sinistra dell'immagine
    wall_mask_half = np.zeros((IMG_HEIGHT,int(IMG_WIDTH/2)), dtype = np.float32) # 100 360
    lumen_mask_half = np.zeros((IMG_HEIGHT,int(IMG_WIDTH/2)), dtype = np.float32)
    # posizioniamo la maschera alle giuste coordinate
    wall_mask_half[row[0]:row[1]+1,col[0]:col[1]+1] = wall_mask_ROI
    lumen_mask_half[row[0]:row[1]+1,col[0]:col[1]+1] = lumen_mask_ROI
    # rimozione delle colonne nere (zero padding) aggiunte durante il resize in 
    # fase di preprocessing per mantenere l'aspect ratio
    black_cols = 80
    wall_mask_half = wall_mask_half[:,black_cols*2+1:]
    lumen_mask_half = lumen_mask_half[:,black_cols*2+1:]
    # effettuiamo il resize per riportare la mezza immagine alle sue dimensioni
    # originali (160*320)
    wall_mask_half = resize(wall_mask_half,(original_shape[0],int(original_shape[1]/2)), preserve_range = True, anti_aliasing = True)
    lumen_mask_half = resize(lumen_mask_half,(original_shape[0],int(original_shape[1]/2)), preserve_range = True, anti_aliasing = True)

    # riposizionamento della maschera alle giuste coordinate rispetto all'immagine intera 
    wall_mask = np.zeros((original_shape[0],original_shape[1]), dtype = np.float32) # 160 640
    lumen_mask = np.zeros((original_shape[0],original_shape[1]), dtype = np.float32)
    wall_mask[:,0:int(original_shape[1]/2)] = wall_mask_half
    lumen_mask[:,0:int(original_shape[1]/2)] = lumen_mask_half
    
    # reimpostiamo i valori dei pixel coinvolti nel resize a 0 e 1: i pixel ai 
    # bordi delle segmentazioni potrebbero essersi sfumati
    soglia = 0.5
    lumen_mask[lumen_mask < soglia] = 0
    lumen_mask[lumen_mask >= soglia] = 1
    wall_mask[wall_mask < soglia] = 0
    wall_mask[wall_mask >= soglia] = 1
  else:
    # riposizionamento della maschera alle giuste coordinate rispetto all'immagine intera
    wall_mask = np.zeros((IMG_HEIGHT,IMG_WIDTH), dtype = np.float32)
    lumen_mask = np.zeros((IMG_HEIGHT,IMG_WIDTH), dtype = np.float32)
    # posizioniamo la maschera nel lato sinistro dell'immagine
    wall_mask[row[0]:row[1]+1,col[0]:col[1]+1] = wall_mask_ROI
    lumen_mask[row[0]:row[1]+1,col[0]:col[1]+1] = lumen_mask_ROI
  
  if side == 'L':
    # se la maschera è del lato sinistro (destra nell'immagine originale) la 
    # flippiamo left-right
    wall_mask = np.fliplr(wall_mask)
    lumen_mask = np.fliplr(lumen_mask)

  # Rimozione di piccoli oggetti e piccoli buchi. Le soglie sono state 
  # identificate mediante l'osservazione della minima area di wall e lumen delle
  # al termine dello script C
  soglia_holes = 10
  soglia_obj_wall = 75
  soglia_obj_lumen = 40
  wall_mask = morphology.remove_small_holes(wall_mask.astype(bool), area_threshold=soglia_holes)
  wall_mask = morphology.remove_small_objects(wall_mask, min_size=soglia_obj_wall).astype(np.float32)
  lumen_mask = morphology.remove_small_holes(lumen_mask.astype(bool), area_threshold=soglia_holes)
  lumen_mask = morphology.remove_small_objects(lumen_mask, min_size=soglia_obj_lumen).astype(np.float32)
  
  # Shift sulle colonne delle maschere nella giusta posizione (operazione 
  # inversa a quella fatta nello script A_ReadCASCADEcontours per centrare il 
  # collo del soggetto nell'immagine)
  # NOTA: qui axis=1 perché in questo caso le maschere sono delle singole slice,
  # nello script A invece era axis=2 perché lavoravamo con volumi 3D
  wall_mask = np.roll(wall_mask, -to_shift, axis=1)
  lumen_mask = np.roll(lumen_mask, -to_shift, axis=1)
  
  return wall_mask, lumen_mask

Funzione per leggere i file DICOM

In [None]:
def readDicom(path):
  pi = os.path.basename(path).split('_')[1]
  dcm_size = len(glob.glob(path+'/*.dcm'))
  dcms = []
  dicom_slicei = 0
  for n_slicei in range(1,dcm_size+1):
    dicom_slicei = dicom_slicei+1
    path_dicom = path+'/E'+pi+'S101I%d.dcm'%dicom_slicei
    while os.path.exists(path_dicom) == False:
      dicom_slicei = dicom_slicei+1
      path_dicom = path+'/E'+pi+'S101I%d.dcm'%dicom_slicei
    dcms.append(path_dicom)

  dcm_f = pydicom.read_file(dcms[0]).pixel_array
  dcm_size = max(dcm_f.shape)

  cdcm1 = pydicom.read_file(dcms[0]).pixel_array

  cdcm_img = np.zeros((len(dcms),cdcm1.shape[0],cdcm1.shape[1]),dtype=np.uint8)
  for dcmi in range(len(dcms)):
    cdcm_img[dcmi,:,:] = pydicom.read_file(dcms[dcmi]).pixel_array

  # nomi delle immagini
  original_names = [dcms[i].split('/')[-1] for i in range(len(dcms))]
  original_names = [original_names[i].split('.')[0] for i in range(len(original_names))]

  return cdcm_img, original_names

Creazione delle maschere automatiche

In [None]:
def net_prediction(list_folders,set_,labels,arti,model,row,col,ROI_HEIGHT,ROI_WIDTH):
  for folder in tqdm(list_folders,total = len(list_folders)):
    
    if os.path.exists(folder) == True:
      if set_ == 'TEST':
        
        v_subj = folder.split('/')[-1] # ID soggetto
        # load dei dati
        image, original_name = readDicom(folder)
        
        # shift dell'immagine lungo le colonne
        to_shift = labels[v_subj]['shift']
        image = np.roll(image, to_shift, axis=2)
        
      else: 
        # load dei dati
        image = imread(os.path.join(folder,'image.tiff'))
        # carichiamo anche le maschere per eliminare le slice con biforcazioni 
        wall_mask = imread(os.path.join(folder,'wall_mask.tiff'))
        lumen_mask = imread(os.path.join(folder,'lumen_mask.tiff'))
        v_subj = folder.split('/')[-2] # ID soggetto
        arti = folder.split('/')[-1] # ICAL, ICAR, ECAL, ECAR
        # lista di nomi dei file originali
        original_name = labels[v_subj][arti]['name']


      # dimensione delle immagini originali
      original_shape = labels[v_subj]['original_shape']
      
      # consideriamo le slice separatamente
      slice_counter = 0 # contatore del numero effettivo di slice (escludendo le biforcazioni)
      for i in range(image.shape[0]):
        img_slice = image[i]
        
        # preprocessing
        if set_ == 'TEST':
          img_slice = preprocess(img_slice,None,None,arti[-1],row,col)
        else:
          img_slice = preprocess(img_slice,wall_mask[i],lumen_mask[i],arti[-1],row,col)
        # se la slice non è stata eliminata nel preprocessing procediamo 
        if img_slice is not None: 
          
          slice_counter = slice_counter+1 # incrementiamo il contatore delle slice effettive
          # predizione della maschera tramite la rete
          softmax = model.predict(img_slice)
          softmax = np.squeeze(softmax) # rimuoviamo la prima e la quarta dimensione (pari a 1)
          
          pred_lumen_mask = softmax[:,:,2] # 2 è solo lume
          pred_wall_mask = softmax[:,:,1] + softmax[:,:,2] # 1 è solo parete senza lume

          # scegliamo soglia pari a 0.5 in modo da massimizzare la distanza tra i valori attesi (0 e 1)
          soglia = 0.5
          pred_lumen_mask[pred_lumen_mask < soglia] = 0
          pred_lumen_mask[pred_lumen_mask >= soglia] = 1
          pred_wall_mask[pred_wall_mask < soglia] = 0
          pred_wall_mask[pred_wall_mask >= soglia] = 1

          # post processing
          pred_wall_mask,pred_lumen_mask = postprocess(pred_wall_mask,pred_lumen_mask,arti[-1],original_shape,labels[v_subj]['shift'],row,col,ROI_HEIGHT,ROI_WIDTH)
          
          # salvataggio delle maschere automatiche: effettuiamo un controllo in 
          # quanto le maschere nere non devono essere salvate
          name = original_name[slice_counter-1] + '_' + arti
          if pred_wall_mask.sum() > 0 and pred_lumen_mask.sum() > 0:
            imsave(os.path.join(RESULTS_path,v_subj,name+'_wall.png'), 255*pred_wall_mask.astype(np.uint8),check_contrast=False)   
            imsave(os.path.join(RESULTS_path,v_subj,name+'_lume.png'), 255*pred_lumen_mask.astype(np.uint8),check_contrast=False)


In [None]:
# soglie per la rimozione delle biforcazioni (individuate nello script B)
soglia_wall = 0.7
soglia_lumen = 0.65
soglia_area_lumen = 245

for carotid_type in carotid_types:

  row = eval(f'row_{carotid_type}')
  col = eval(f'col_{carotid_type}')
  ROI_HEIGHT = eval(f'ROI_HEIGHT_{carotid_type}')
  ROI_WIDTH = eval(f'ROI_WIDTH_{carotid_type}')

  print(f'Net prediction - {carotid_type}')
  # Training Set
  #print('Training set')
  #net_prediction(eval(f'train_folders_{carotid_type}'), 'TRAIN', labels_TRAIN, None,eval(f'model_{carotid_type}'),row,col,ROI_HEIGHT,ROI_WIDTH)

  # Validaion Set
  #print('Validation set')
  #net_prediction(eval(f'val_folders_{carotid_type}'), 'VALIDATION', labels_VAL, None,eval(f'model_{carotid_type}'),row,col,ROI_HEIGHT,ROI_WIDTH)

  # Test Set
  print('Test set - Left side')
  net_prediction(test_folders, 'TEST', labels_TEST, carotid_type+'L',eval(f'model_{carotid_type}'),row,col,ROI_HEIGHT,ROI_WIDTH)
  print('Test set - Right side')
  net_prediction(test_folders, 'TEST', labels_TEST, carotid_type+'R',eval(f'model_{carotid_type}'),row,col,ROI_HEIGHT,ROI_WIDTH)
  print()

## Valutazione delle performance

Funzione per il calcolo del coefficiente di Dice Similarity (DSC)

In [None]:
def calculate_DSC(Y_manual, Y_auto):
  n_images = Y_manual.shape[0]
  DSC = np.zeros(n_images, dtype = np.float64)
  
  for i in range(n_images):
    x = Y_manual[i] # manual mask
    y = Y_auto[i] # automatic mask

    x_int_y = np.logical_and(x,y)
    
    num = 2.*x_int_y.sum()
    den = x.sum()+y.sum()
    
    # se entrambe le maschere sono nere 
    if num == 0 and den == 0: 
      DSC[i] = np.nan # da rimuovere in seguito
    else:
    # Coefficiente di Dice Similarity
      DSC[i] = num/den

  # eliminiamo i NaN dal vettore
  DSC = DSC[~np.isnan(DSC)] 

  return DSC

Funzione per il calcolo della Relative Volume Difference (RVD)

In [None]:
def calculate_RVD(Y_manual, Y_auto):
  
  # inizializzazione dei contatori
  n_correct_black_mask = 0 # contatore per le maschere che devono essere nere e sono nere
  n_black_where_segmented = 0 # contatore per le maschere che devono segmentare e invece sono nere
  n_segmented_where_black = 0 # contatore per le maschere che devono essere nere e invece c'è segmentazione
  
  n_images = Y_manual.shape[0]
  RVD = np.zeros(n_images, dtype = np.float64)
  
  for i in range(n_images):
    x = Y_manual[i] # manual mask
    y = Y_auto[i] # automatic mask

    # Relative volume difference
    num = y.sum()-x.sum()
    den = x.sum()
    # se le maschere sono uguali e sono entrambe nere
    if den == 0 and num == 0: 
      RVD[i] = np.nan  
      n_correct_black_mask += 1 # maschere che devono essere nera e sono nere
    # se vi sono delle segmentazioni "in eccesso" nelle maschere auto rispetto alle manuali, che invece sono nere
    elif den == 0 and num > 0:   
      RVD[i] = np.nan
      n_segmented_where_black += 1 # maschere che devono essere nere e invece c'è segmentazione
    elif y.sum() == 0 and num != 0: # in questo caso avremo RVD minore di zero. 
      RVD[i] = num/den # -1
      n_black_where_segmented += 1 # maschera è nera e invece doveva segmentare
    else:
      # Relative Volume Difference
      RVD[i] = num/den

  # creiamo un vettore contenente i contatori per le maschere nere
  black_counter = [n_correct_black_mask, n_segmented_where_black, n_black_where_segmented] 
  # eliminiamo i NaN dal vettore
  RVD = RVD[~np.isnan(RVD)] 

  return RVD, black_counter

Funzione per la visualizzazione dei grafici delle metriche di valutazione

In [None]:
def show_metrics(DSC, RVD_wall, RVD_lumen, black_counter_wall, black_counter_lumen, set_, side):
  
  # Grafici
  fig = plt.figure(figsize=(15, 5))
  fig.suptitle(f'{set_} set - {side} side')
  ax1 = plt.subplot(1, 3, 1)
  ax1.scatter(range(1,DSC.shape[0]+1),DSC)
  ax1.set_title('DSC')

  ax2 = plt.subplot(1, 3, 2)
  ax2.scatter(range(1,RVD_wall.shape[0]+1),RVD_wall)
  ax2.set_title('RVD wall')

  ax3 = plt.subplot(1, 3, 3)
  ax3.scatter(range(1,RVD_lumen.shape[0]+1),RVD_lumen)
  ax3.set_title('RVD lumen')
  fig.show()

  # Valori medi
  print(f'{set_} set - {side} side:')
  print("\tDSC (media ± std) = {:.4f} ± {:.4f}".format(np.mean(DSC),np.std(DSC))) 
  print("\tRVD del wall (media ± std) = {:.4f} ± {:.4f}".format(np.mean(RVD_wall),np.std(RVD_wall)))
  print("\tRVD del lume (media ± std) = {:.4f} ± {:.4f}".format(np.mean(RVD_lumen),np.std(RVD_lumen)))
  # Contatori maschere nere
  print(f'Numero di maschere correttamente nere:\n\t wall = {black_counter_wall[0]}\n\t lumen = {black_counter_lumen[0]}')
  print(f'Numero di maschere erroneamente nere:\n\t wall = {black_counter_wall[1]}\n\t lumen = {black_counter_lumen[1]}')
  print(f'Numero di maschere erroneamente segmentate:\n\t wall = {black_counter_wall[2]}\n\t lumen = {black_counter_lumen[2]}\n')
  

Calcolo delle metriche e visualizzazione dei risultati

In [None]:
def performance_evaluation(labels, set_path, side, subject_to_visualize):
  if side == 'L':
    arts = ['ICAL','ECAL']
  elif side == 'R':
    arts = ['ICAR','ECAR']

  # Inizializzazioni
  DSC = []
  RVD_wall = []
  RVD_lumen = []
  black_counter_wall = np.array([0,0,0])
  black_counter_lumen = np.array([0,0,0])
  
  for v_subj in tqdm(list(labels.keys()), total = len(list(labels.keys()))):

    # dimensioni originali delle slice del soggetto
    original_shape = labels[v_subj]['original_shape']
    # cartella che contiene le segmentazioni automatiche del soggetto
    results_folder = os.path.join(RESULTS_path,v_subj)

    # estrazione dal dizionario del parametro per lo shift
    to_shift = labels[v_subj]['shift']

    for arti in arts:
      
      # numero di slice che abbiamo utilizzato per l'allenamento della rete
      # -> quelle su cui calcolare le metriche di valutazione
      slice_number = len(labels[v_subj][arti]['number'])

      # inizializzazione delle matrici
      X = np.zeros((slice_number,original_shape[0],original_shape[1]),dtype = np.uint8)
      Y = np.zeros((slice_number,original_shape[0],original_shape[1],2),dtype = np.float32)
      Y_auto = np.zeros((slice_number,original_shape[0],original_shape[1],2),dtype = np.float32)

      # load del volume e delle maschere manuali
      path = os.path.join(set_path, v_subj, arti)
      if os.path.exists(path):
        image = imread(os.path.join(path, 'image.tiff'))
        wall_mask = imread(os.path.join(path, 'wall_mask.tiff'))
        lumen_mask = imread(os.path.join(path, 'lumen_mask.tiff'))
        
        # nomi di tutte le slice contenute in 'image'
        slice_list_original = labels[v_subj][arti+'_original']['name']
        # nomi delle slice che abbiamo utilizzato per l'allenamento della rete
        # -> quelle su cui calcolare le metriche di valutazione
        slice_list = labels[v_subj][arti]['name']
        
        for i, slice_name in enumerate(slice_list):
          # troviamo l'indice in slice_list_original (il numero di slice in image)
          index = slice_list_original.index(slice_name)
          # inseriamo la slice e le maschere manuali nelle matrici
          X[i] = image[index]
          Y[i,:,:,0] = wall_mask[index]
          Y[i,:,:,1] = lumen_mask[index]
          # load delle maschere automatiche
          # proviamo il caricamento delle maschere automatiche. se non sono presenti 
          # significa che la maschere sono nere e quindi non sono state salvate
          try:
            Y_auto[i,:,:,0] = imread(os.path.join(results_folder, slice_name+'_'+arti+'_wall.png')).astype(np.float32)/255.0
            Y_auto[i,:,:,1] = imread(os.path.join(results_folder, slice_name+'_'+arti+'_lume.png')).astype(np.float32)/255.0
          except:
            Y_auto[i,:,:,0] = np.zeros((original_shape[0],original_shape[1]),dtype = np.float32)
            Y_auto[i,:,:,1] = np.zeros((original_shape[0],original_shape[1]),dtype = np.float32)  

        # prima di calcolare le metriche di valutazione delle performance, 
        # shiftiamo le immagini e le maschere manuali in modo da riportarle alla
        # loro posizione originale (sono salvate in .tiff nella cartella 
        # 3D_dataset già shiftate -> script A)
        X = np.roll(X, -to_shift, axis=2)
        Y = np.roll(Y, -to_shift, axis=2)
        # NOTA: non è necessario shiftare anche Y_auto in quanto le maschere 
        # automatiche sono salvate nella cartella RESULTS in .png in posizione 
        # corrispondente alla posizione originale delle slice DICOM

        # calcoliamo le metriche di valutazione
        DSC = np.append(DSC,calculate_DSC(Y[:,:,:,0]-Y[:,:,:,1], Y_auto[:,:,:,0]-Y_auto[:,:,:,1]))
        # Wall
        RVD, black_counter = calculate_RVD(Y[:,:,:,0], Y_auto[:,:,:,0])
        RVD_wall = np.append(RVD_wall,RVD)
        black_counter_wall = black_counter_wall + black_counter
        # Lumen
        RVD, black_counter = calculate_RVD(Y[:,:,:,1], Y_auto[:,:,:,1])
        RVD_lumen = np.append(RVD_lumen, RVD)
        black_counter_lumen = black_counter_lumen + black_counter

        # se il nome del soggetto corrisponde a subject_to_visualize, 
        # visualizzazione dei risultati
        if v_subj == subject_to_visualize:
          Y_vis = np.zeros((slice_number,original_shape[0],original_shape[1],3),dtype = np.float32)
          for n_slice in range(slice_number):
            mask = Y[n_slice,:,:,0]-Y[n_slice,:,:,1]
            mask_auto = Y_auto[n_slice,:,:,0]-Y_auto[n_slice,:,:,1]
            Y_vis[n_slice] = mark_boundaries(mark_boundaries(X[n_slice].astype(np.float32),mask.astype(np.uint8),color=[0,255,0]),mask_auto.astype(np.uint8),color=[255,0,0])

          fig = px.imshow(Y_vis, animation_frame=0, binary_string=True, labels=dict(animation_frame="slice"))
          for i, frame in enumerate(fig.frames):
            frame.layout.title = f'Soggetto: {v_subj} - Carotide: {arti} - Nome slice: {slice_list[i]}'
          fig.show()
     
  return DSC, RVD_wall, RVD_lumen, black_counter_wall, black_counter_lumen

In [None]:
# inserire l'ID di un soggetto appartenente al Validation set (ad es '0_P19_U')
# per visualizzarne i risultati
subject_to_visualize = None 

# Validation Set
print(f'Evaluating performances: Validation set')
print('\nLeft side')
DSC_L_val, RVD_wall_L_val, RVD_lumen_L_val, black_counter_wall_L_val, black_counter_lumen_L_val = performance_evaluation(labels_VAL,VAL_path,'L',subject_to_visualize)
print('Right side')
DSC_R_val, RVD_wall_R_val, RVD_lumen_R_val, black_counter_wall_R_val, black_counter_lumen_R_val = performance_evaluation(labels_VAL,VAL_path,'R',subject_to_visualize)
print()

# Left side
show_metrics(DSC_L_val, RVD_wall_L_val, RVD_lumen_L_val, black_counter_wall_L_val, black_counter_lumen_L_val,'Validation','Left')
# Right side
show_metrics(DSC_R_val, RVD_wall_R_val, RVD_lumen_R_val, black_counter_wall_R_val, black_counter_lumen_R_val,'Validation','Right')

In [None]:
# inserire l'ID di un soggetto appartenente al Training set (ad es '0_P34_U')
# per visualizzarne i risultati
subject_to_visualize = None 

# Training Set
print(f'Evaluating performances: Training set')
print('\nLeft side')
DSC_L_train, RVD_wall_L_train, RVD_lumen_L_train, black_counter_wall_L_train, black_counter_lumen_L_train = performance_evaluation(labels_TRAIN,TRAIN_path,'L',subject_to_visualize)
print('Right side')
DSC_R_train, RVD_wall_R_train, RVD_lumen_R_train, black_counter_wall_R_train, black_counter_lumen_R_train = performance_evaluation(labels_TRAIN,TRAIN_path,'R',subject_to_visualize)
print()

# Left side
show_metrics(DSC_L_train, RVD_wall_L_train, RVD_lumen_L_train, black_counter_wall_L_train, black_counter_lumen_L_train, 'Training','Left')
# Right side
show_metrics(DSC_R_train, RVD_wall_R_train, RVD_lumen_R_train, black_counter_wall_R_train, black_counter_lumen_R_train, 'Training','Right')

Salvataggio dei coefficienti calcolati

In [None]:
with open(os.path.join(JSON_path,'DSC_RVD_train.pickle'), 'wb') as f:
    pickle.dump([DSC_L_train, RVD_wall_L_train, RVD_lumen_L_train,DSC_R_train, RVD_wall_R_train, RVD_lumen_R_train], f)

with open(os.path.join(JSON_path,'DSC_RVD_validation.pickle'), 'wb') as f:
    pickle.dump([DSC_L_val, RVD_wall_L_val, RVD_lumen_L_val,DSC_R_val, RVD_wall_R_val, RVD_lumen_R_val], f)

Load dei coefficienti calcolati

In [None]:
with open(os.path.join(JSON_path,'DSC_RVD_train.pickle'), 'rb') as f:
  DSC_L_train, RVD_wall_L_train, RVD_lumen_L_train,DSC_R_train, RVD_wall_R_train, RVD_lumen_R_train = pickle.load(f)

with open(os.path.join(JSON_path,'DSC_RVD_validation.pickle'), 'rb') as f:
  DSC_L_val, RVD_wall_L_val, RVD_lumen_L_val,DSC_R_val, RVD_wall_R_val, RVD_lumen_R_val = pickle.load(f)

Box plot delle metriche

In [None]:
fig = plt.figure(figsize=(15, 5))
fig.suptitle('Carotidi interne (ICA) ed esterne (ECA)')
ax1 = plt.subplot(1, 3, 1)
ax1.boxplot(np.array([DSC_L_val,DSC_R_val,DSC_L_train,DSC_R_train],dtype = "object"), showfliers=False)
plt.xticks([1, 2, 3, 4], ['Val L', 'Val R', 'Train L', 'Train R'])
ax1.set_title('DSC')

ax2 = plt.subplot(1, 3, 2)
ax2.boxplot(np.array([RVD_wall_L_val,RVD_wall_R_val,RVD_wall_L_train,RVD_wall_R_train],dtype = "object"), showfliers=False)
plt.xticks([1, 2, 3, 4], ['Val L', 'Val R', 'Train L', 'Train R'])
ax2.set_title('RVD wall')

ax3 = plt.subplot(1, 3, 3)
ax3.boxplot(np.array([RVD_lumen_L_val,RVD_lumen_R_val,RVD_lumen_L_train,RVD_lumen_R_train],dtype = "object"), showfliers=False)
plt.xticks([1, 2, 3, 4], ['Val L', 'Val R', 'Train L', 'Train R'])
ax3.set_title('RVD lumen')
fig.show()

Visualizzazione dei risultati su un soggetto appartenente al Test set

In [None]:
# inserire l'ID di un soggetto appartenente al Test set (ad es '0_P43_U') per
# visualizzarne i risultati
v_subj = list(labels_TEST.keys())[0]

# dimensioni originali delle slice del soggetto
original_shape = labels_TEST[v_subj]['original_shape']
# cartella che contiene le segmentazioni automatiche del soggetto
results_folder = os.path.join(RESULTS_path,v_subj)

# load del volume
image, slice_list = readDicom(os.path.join(TEST_path,v_subj))

# Visualizziamo i risultati di una slice ogni 10 per il Test set
kept_slices = range(0,image.shape[0]-1,10)
image = image[kept_slices]
slice_list = [slice_list[i] for i in kept_slices]

# numero di slice disponibili
slice_number = image.shape[0]

for arti in arts:

  # inizializzazione della matrice che conterrà le maschere automatiche
  Y_auto = np.zeros((slice_number,original_shape[0],original_shape[1],2),dtype = np.uint8)

  for i, slice_name in enumerate(slice_list):
    # load delle maschere automatiche
    # proviamo il caricamento delle maschere automatiche. se non sono presenti 
    # significa che la maschere sono nere e quindi non sono state salvate
    try:
      Y_auto[i,:,:,0] = imread(os.path.join(results_folder, slice_name+'_'+arti+'_wall.png'))
      Y_auto[i,:,:,1] = imread(os.path.join(results_folder, slice_name+'_'+arti+'_lume.png'))
    except:  
      Y_auto[i,:,:,0] = np.zeros((original_shape[0],original_shape[1]),dtype = np.uint8)  
      Y_auto[i,:,:,1] = np.zeros((original_shape[0],original_shape[1]),dtype = np.uint8)
      
  # visualizzazione dei risultati
  Y_vis = np.zeros((slice_number,original_shape[0],original_shape[1],3),dtype = np.float32)
  for n_slice in range(slice_number):
    mask_auto = Y_auto[n_slice,:,:,0]-Y_auto[n_slice,:,:,1]
    Y_vis[n_slice] = mark_boundaries(image[n_slice].astype(np.float32),mask_auto.astype(np.uint8),color=[255,0,0])

  fig = px.imshow(Y_vis, animation_frame=0, binary_string=True, labels=dict(animation_frame="slice"))
  for i, frame in enumerate(fig.frames):
    frame.layout.title = f'Soggetto: {v_subj} - Carotide: {arti} - Nome slice: {slice_list[i]}'
  fig.show()