# Multi-scale Dilation with Residual Fused Attention for Low Dose CT Noise Artifact (MD-RFA)

This notebook has three sections:
* **Section 1:** Data Processing of DICOM files.
* **Section 2:** Building MD-RFA model architecture.
* **Section 3:** Training Network and Evaluating.

Let's begin with installing any necesssary libraries and importing our libraries for preprocessing and training of network.

I have acquired my data from my supervisor and can share it upon request, however for now I will import my datasets to the Jupyter working directory.

In [1]:
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [2]:
!mkdir data/
!mkdir data/processed
!mkdir model
!mkdir model/weights/
!cp -r /gdrive/MyDrive/datasets/ datasets/

In [None]:
!pip install pydicom
!pip install keras

In [4]:
import tensorflow as tf
import numpy as np # linear algebra
import pydicom as dicom
import os
import gc
import scipy.ndimage
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing import image
import h5py

# Model Components
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, Multiply, Conv2DTranspose
from tensorflow.keras.layers import Activation, AveragePooling2D
from tensorflow.keras.layers import Add, concatenate, Input, subtract
from tensorflow.keras.layers import BatchNormalization as BN

# For model training and testing
import multiprocessing
from tensorflow.keras.optimizers import Adam
from keras import losses
from keras.losses import MeanSquaredError
import math
from keras import backend as K
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.efficientnet import EfficientNetB2
from tensorflow.keras.applications.densenet import DenseNet121
from tensorflow.keras.applications.resnet_v2 import ResNet50V2
from keras.models import Model
from skimage.metrics import structural_similarity as compare_ssim

SEED= 42
np.random.seed(0)
tf.random.set_seed(SEED)

device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


## **Section 1:** Data Processing of DICOM files

* Let's build our functions to process the DICOM files.

In [5]:
# Load the scans in given folder path
def load_scan(path):
    slices = [dicom.read_file(path + '/' + s) for s in os.listdir(path)]
    slices.sort(key = lambda x: float(x.ImagePositionPatient[2]))
    try:
        #distance between slices, finds slice tkickness if not availabe
        slice_thickness = np.abs(slices[0].ImagePositionPatient[2] - slices[1].ImagePositionPatient[2])
    except:
        slice_thickness = np.abs(slices[0].SliceLocation - slices[1].SliceLocation)
        
    for s in slices:
        s.SliceThickness = slice_thickness
        
    return slices

# Taking Housenfield Unit into consideration
def get_pixels_hu(slices):
    # read the dicom images, find HU numbers (padding, intercept, rescale), and make a 4-D array, 
    # HU - Hounsfield Unit (HU): measure of radiodensity

    image = np.stack([s.pixel_array for s in slices])
    # Convert to int16 (from sometimes int16), 
    # should be possible as values should always be low enough (<32k)
    image = image.astype(np.int16)

    # Set outside-of-scan pixels to 0
    # The intercept is usually -1024, so air is approximately 0
    try:
        padding = slices[0].PixelPaddingValue
    except:
        padding = 0
    
    image[image == padding] = 0
    
    # Convert to Hounsfield units (HU)
    for slice_number in range(len(slices)):
        
        intercept = slices[slice_number].RescaleIntercept
        slope = slices[slice_number].RescaleSlope
        
        if slope != 1:
            image[slice_number] = slope * image[slice_number].astype(np.float64)
            image[slice_number] = image[slice_number].astype(np.int16)
            
        image[slice_number] += np.int16(intercept)
        
    return np.array(image, dtype=np.int16)

* Next we want to extract patches for faster training and reduces computation.

In [6]:
# This function extracts patches from the larger image
def extract_patches(image, patch_size=40,stride=20):
    
    images_num,h,w = image.shape
    out = np.empty((0,patch_size,patch_size))
    sz = image.itemsize
    shape = ((h-patch_size)//stride+1, (w-patch_size)//stride+1, patch_size,patch_size)
    strides = sz*np.array([w*stride,stride,w,1])

    for d in range (0,images_num):
        patches=np.lib.stride_tricks.as_strided(image[d,:,:], shape=shape, strides=strides)
        blocks=patches.reshape(-1,patch_size,patch_size)
        out=np.concatenate((out,blocks[:,:,:]))
        #print(d)
    
    return out[:,:,:]

* Last we want to save our data in `.h5` format.

In [7]:
def write_hdf5(data,labels, output_filename):
    """
    This function is used to save image data and its label(s) to hdf5 file.
    output_file.h5,contain data and label
    """

    with h5py.File(output_filename, 'w') as h:
        h.create_dataset('data', data=data, shape=data.shape)        
        h.create_dataset('label', data=labels, shape=labels.shape)
        h.close()
 

def read_hdf5(file):
    with h5py.File(file, 'r') as hf:
        data = np.array(hf.get('data'))
        labels = np.array(hf.get('label'))
   
        return data,labels 

* Here the required folders and file names are saved into an array for automating the data processing.

In [8]:
# Setting paths for all data sets and for saving files
low_dose_files=['datasets/Thoracic/Copies/Low Dose', 'datasets/Piglet/Paired/Low', 
                  'datasets/Head - TCIA - Subject N012/Low Dose Images','datasets/Chest - TCIA - Subject C002/Low Dose Images',
                  'datasets/Abdomen - TCIA - Subject L058/Low Dose Images']

full_dose_files=['datasets/Thoracic/Copies/High Dose', 'datasets/Piglet/Paired/High', 
                  'datasets/Head - TCIA - Subject N012/Full Dose Images','datasets/Chest - TCIA - Subject C002/Full Dose Images',
                  'datasets/Abdomen - TCIA - Subject L058/Full Dose Images']

patch_size = '32_32'
dataset_type = ['thoracic', 'piglet', 'head_N012', 'chest_C002', 'abdomen_L058']
train_data_path = ['data/processed/train_{}_{}_dicom_0.h5'.format(patch_size, type_) for type_ in dataset_type]
test_data_path = ['data/processed/test_{}_{}_dicom_0.h5'.format(patch_size, type_) for type_ in dataset_type]

                  ###############################################
folder = ['Thoracic', 'Piglet', 'Head - TCIA - Subject N012', 'Chest - TCIA - Subject C002', 'Abdomen - TCIA - Subject L058']
train_data_files = ['datasets/{}/train_{}_{}_dicom_0.h5'.format(folder[i], patch_size, dataset_type[i]) for i in range(len(folder))]
test_data_files = ['datasets/{}/test_{}_{}_dicom_0.h5'.format(folder[i], patch_size, dataset_type[i]) for i in range(len(folder))]

We then do the following:
* Load dataset
* Process DICOM data in terms of HU and convert to numpy array.
* Split data to 70% train and 30% test.
* Extract patches for patch training.

* The below function automates the process of saving pocessed data to your google drive directory.
* It also saves to local directory under ``data/processed``

In [9]:
drive_file = ['Thoracic', 'Piglet', 'Head - TCIA - Subject N012', 'Chest - TCIA - Subject C002', 'Abdomen - TCIA - Subject L058']
def process_data(path_low, path_high, train_data_path, test_data_path, patch_size, drive_file):
  for i in range(len(path_high)):
    # Loading dicom files from Piglet dataset 
    slices_data_train = load_scan(path_low[i])
    slices_labels_train = load_scan(path_high[i])

    #callibrating in terms of HU
    data = get_pixels_hu(slices_data_train)+1024 
    labels = get_pixels_hu(slices_labels_train)+1024

    # Dividing into train (70%) and test(30%) datasets 
    n = len(data)*7//10
    data_train ,data_test = data[:n],data[n:]
    labels_train,labels_test = labels[:n],labels[n:]

    # Extract patches from data (low) and label (high), patch size and stride of 32
    data_patch=extract_patches(data_train,patch_size=patch_size,stride=patch_size)
    labels_patch=extract_patches(labels_train,patch_size=patch_size,stride=patch_size)

    # write in h5 files (piglet)
    write_hdf5(data_patch,labels_patch, train_data_path[i])
    write_hdf5(data_test,labels_test,test_data_path[i])

    # cmd1 = 'cp {} /gdrive/MyDrive/datasets/{}'.format(train_data_path[i], drive_file[i])
    # cmd2 = 'cp {} /gdrive/MyDrive/datasets/{}'.format(test_data_path[i], drive_file[i])
    # os.system(cmd1)
    # os.system(cmd2)

process_data(low_dose_files, full_dose_files, train_data_path, test_data_path, 32, drive_file)

## **Section 2:** Building MD-RFA Architecture


### Building the Boosting Module Group (BMG)
* The spatial-attention module is developed as a seperate function which accepts a feature map as it's input
* The channel-attention module is developed as a seperate function which accepts a feature map as it's input
* Using both channel- and spatial-attention modules a parallel integration with skip connections with each modules are done to develop the Boosting Attention Fusion Block (BAFB)
* Once the BAFB is developed a series connection of 4 BAFB are used with one skip connection to build the BMG

In [10]:
# Spatial Attention
def SA(feat_map):
    conv1 = Conv2D(64, (1,1), strides=1, padding='valid')(feat_map)
    conv2 = Conv2D(64, (1,1), strides=1, padding='valid')(feat_map)
    conv3 = Conv2D(64, (3,3), strides=1, activation='relu', padding='same')(feat_map)
    conv3 = Conv2D(64, (3,3), strides=1, padding='same')(conv3)
    
    conv4 = Multiply()([conv1, conv2])
    conv4_softmax = tf.keras.activations.softmax(conv4)
    conv5 = Multiply()([conv3, conv4_softmax])
    conv5 = Conv2D(64, (1,1), strides=1, padding='valid')(conv5)
    
    sa_feat_map = Add()([feat_map, conv5]) # Original
    
    return sa_feat_map

# Channel Attention 
def CA(feat_map):
    conv1 = AveragePooling2D(pool_size=(1,1), strides=1, padding='valid')(feat_map)  
    conv2 = Conv2D(64, (1,1), strides=1, activation='relu', padding='valid')(conv1)  
    conv3 = Conv2D(64, (1,1), strides=1, activation='sigmoid',padding='valid')(conv2)
    
    ca_feat_map = Multiply()([feat_map, conv3])
    
    return ca_feat_map

# BAFB -- four for each BMG in the denoiser
def BAFB(input_bafb):
    fcr1 = Conv2D(64, (1,1), strides=1, activation='relu', padding='valid')(input_bafb)
    
    fsa1 = SA(fcr1)
    fes_up = Add()([fsa1,fcr1])
    
    fca1 = CA(fcr1)
    fes_down = Add()([fca1, fcr1])
    
    fca2 = CA(fes_up)
    fsa2 = SA(fes_down)
    
    fcr2=concatenate([fca2, fes_up, fes_down, fsa2],axis=3)
    fcr2=Conv2D(1, (1,1), strides=1, activation='relu', padding='valid')(fcr2) 
    
    fc=concatenate([fcr1, fcr2],axis=3)
    fc=Conv2D(1, (1,1), strides=1, padding='valid')(fc) 
    
    return fc

# Boosting Module Groups (BMG)
def BMG(bmg_input):
    bafb1 = BAFB(bmg_input)
    bafb2 = BAFB(bafb1)
    bafb3 = BAFB(bafb2)
    bafbn = BAFB(bafb3)   
    fg = Add()([bmg_input,bafbn])
    return fg

###Fused Attention Modules with Dilated Residual Learning (FAM-DRL)
* Below is the model architecture for the benchmark model that achieved state-of-the-arts results.
* This model is used to compare performance with the MD-RFA model.

In [11]:
def fam_drl():
    inputs = (None, None, 1)
    inputs=Input(shape=inputs)
    # 1) Preconvolutional module (number of filters in each layer = 1)
    f1sf = Conv2D(64, (3,3), dilation_rate=(2,2), padding='same')(inputs)
    f1sf = BN()(f1sf)
    f1sf = Activation('relu')(f1sf)
    
    f2sf = Conv2D(64, (3,3), dilation_rate=(2,2),padding='same')(f1sf)
    f2sf = BN()(f2sf)
    f2sf = Activation('relu')(f2sf)
    
    f3sf = Conv2D(64, (3,3), dilation_rate=(2,2),padding='same')(f2sf)
    f3sf = BN()(f3sf)
    f3sf = Activation('relu')(f3sf)
    
    
    # 2) Three BMGs with 3 BAFBs each
    bmg1= BMG(f3sf)
    bmg2= BMG(bmg1)
    bmg3 = BMG(bmg2)
    
    f4sf = Add()([f3sf, bmg3])
    
    # 3) Post convolution Fpost = R3postC(R2postC( R1postC(bmg2)+F2sf ) + F1sf )
    dconv3 = Conv2D(64, (3,3), dilation_rate=(2,2),padding='same')(f4sf)
    dconv3 = BN()(dconv3)
    dconv3 = Activation('relu')(dconv3)
    
    dconv2 = Conv2D(64, (3,3), dilation_rate=(2,2),padding='same')(dconv3)  
    dconv2 = BN()(dconv2)
    dconv2 = Activation('relu')(dconv2)
    
    dconv1 = Add()([f2sf, dconv2])  # Symmetric skip connection (SSC)
    # skip1 = concatenate([dconv2, f1sf], axis=3)
    dconv1 = Conv2D(64, (3,3), dilation_rate=(2,2),padding='same')(dconv1)  
    dconv1 = BN()(dconv1)
    dconv1 = Activation('relu')(dconv1)
    
    # 4) Reconstruction Layer
    out = Add()([inputs, dconv1])
    out= Conv2DTranspose(3, (3,3), dilation_rate=(2,2),padding='same')(out) 

    resnet=Model(inputs=[inputs], outputs=[out, out, out]) 
    return resnet

* Model summary

In [None]:
model = fam_drl()
model.summary()

###Multi-scale Dilation with Residual Fused Attention (MD-RFA)

####Main model components
* **Multi-scale feature mapping:** Using dirrent dilation rates with the convolution layer a hierarchy of noise feature maps are extracted.
* **Boosting Module Groups:** Utilizing the BMG, the model learns the long-range dependencies of the LDCT image.
* **Reconstruction Layers:** Reconstructs the learned noise feature map to remove from the LDCT image.

In [13]:
def dilated_conv_block(dc_input, num_filters, kernel_size, dilation_rate, padding='same', activation='relu'):
    x = Conv2D(num_filters, kernel_size, dilation_rate=dilation_rate, padding=padding, activation=activation)(dc_input)
    x = Conv2D(num_filters, kernel_size, dilation_rate=dilation_rate, padding=padding, activation=activation)(x)
    return x

def md_rfa():
    inputs = (None, None, 1)
    inputs=Input(shape=inputs)

    fm1 = dilated_conv_block(inputs, 32, 3, 1)
    fm2 = dilated_conv_block(inputs, 32, 3, 2)
    fm3 = dilated_conv_block(inputs, 32, 3, 4)
    fm4 = concatenate([fm1, fm2, fm3], axis=-1)
    fm_out = Conv2D(64, 3, dilation_rate=2, padding='same', activation='relu')(fm4)

    bmg1= BMG(fm_out)
    bmg2= BMG(bmg1)
    bmg3 = BMG(bmg2)

    s1 = Add()([fm_out, bmg3])

    decoder1 = Conv2D(64, 3, dilation_rate=2, padding='same', activation='relu')(s1)
    decoder2 = Conv2D(64, 3, dilation_rate=2, padding='same', activation='relu')(decoder1)
    decoder3 = Conv2D(3, 3, dilation_rate=2, padding='same')(decoder2)

    out = subtract([inputs, decoder3])

    model = Model(inputs=[inputs], outputs=[out, out, out])
    return model

In [None]:
model = md_rfa()
model.summary()

## **Section 3:** Training Network and Evaluating

Three steps for training the network:\
1) Initializing the feature extraction models and loss functions\
2) Preparing training data\
3) Training model

Two steps for eveluating:\
1) Prepaing test data\
2) Testing model

### Feature Extraction and model loss functions
#### Train Model and Validate
To do this we do the following:
* Develop and Initialize the Loss functions for training
* Load data and prepare it.
* Train the network using tensorflow.

In [15]:
# Strutural Dissimilarity Loss function
def dssim_loss(y_true, y_pred):
    ssim = tf.image.ssim(y_true, y_pred, 1.0)
    return (1.0-ssim)/2.0

* Feature Extraction with ResNet50V2

In [16]:
class ResNet50V2Model(object):
    def __init__(self, image_shape=(None, None, 3)):
        self.image_shape = image_shape
        self.resnet = ResNet50V2(include_top=False, weights='imagenet', input_shape=self.image_shape)
        self.selectedLayers = ['conv1_conv','conv2_block2_out','conv3_block3_out']
        self.selectedOutputs = [self.resnet.get_layer(i).output for i in self.selectedLayers]
        self.loss_model = Model(inputs=self.resnet.input, outputs=self.selectedOutputs)
        self.loss_model.trainable = False
        self.mse = MeanSquaredError()

    def perceptual_loss(self, y_true, y_pred):
        loss_value = 0
        for i in range(0,3):
            loss_value += self.mse(self.loss_model((y_true*2.0)-1.0)[i], self.loss_model((y_pred*2.0)-1.0)[i])
        return loss_value

* Metrics used to measure training loss

In [17]:
# Metrics
def psnr_loss(y_true, y_pred):
    return tf.image.psnr(y_true, y_pred, 1.0)

def ssim_loss(y_true, y_pred):
    return tf.image.ssim(y_true, y_pred, 1.0)

* Functions for loading and reading data.

In [18]:
def read_hdf5(file):
    with h5py.File(file, 'r') as hf:
        data = np.array(hf.get('data'))
        labels = np.array(hf.get('label'))
   
        return data,labels

### Prepare training Data

* Define data path and where to store model weights, then Initialize model
* Load data and split data and labels.

In [19]:
# patch_size = '64_64'
# dataset_type = ['thoracic', 'piglet', 'head_N012', 'chest_C002', 'abdomen_L058']
# train_data_path = ['data/processed/train_{}_{}_dicom_0.h5'.format(patch_size, type_) for type_ in dataset_type]
# test_data_path = ['data/processed/test_{}_{}_dicom_0.h5'.format(patch_size, type_) for type_ in dataset_type]

def prep_all_train_data(training_files):
  training_data, training_labels = [], []

  for train_address in training_files:
    data,labels = read_hdf5(train_address)
    ## divison by 4095 keeps the input output between 0-1
    data = (data[:,:,:,None]/4095).astype(np.float32)
    labels = (labels[:,:,:,None]/4095).astype(np.float32)
    labels_3 = np.concatenate((labels,labels,labels),axis=-1)
    training_data.append(data)
    training_labels.append(labels_3)

  train_data = np.concatenate(training_data, axis=0)
  train_labels = np.concatenate(training_labels, axis=0)

  return train_data, train_labels

training_data, training_labels = prep_all_train_data(train_data_files)
# training_data, training_labels = prep_all_train_data(train_data_path)

In [20]:
training_data.shape

(136448, 32, 32, 1)

### Train the model
Main components of the ``train_model`` method:\
* Initialize loss functions and Adam optimizer
* Compile model with optimizer, loss functions and metrics
* Set up required callbacks for training:\
    * Checkpoint callback: saves model after every epoch (save training if anything were to occur so you can begin from where you left off)
    * Clear memory callback: Clears up RAM memory of any unused data so training doesn't crash.
* Save model weights


In [None]:
def train_model(model, data, labels, weight_address, checkpoint_filepath, loss_weight, numOfEpoch = 60, batch_size = 32):

  # Loss functions that require initialization
  perceptual = ResNet50V2Model()
  mse = MeanSquaredError()

  loss = [perceptual.perceptual_loss, mse, dssim_loss]
  loss_weights = loss_weight

  ADAM=Adam(learning_rate=0.0002, beta_1=0.01, beta_2=0.999)
  model.compile(optimizer=ADAM,loss=loss, loss_weights=loss_weights,metrics=[psnr_loss, ssim_loss])

  model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
      filepath=checkpoint_filepath,
      save_weights_only=True,
      monitor='loss',
      save_freq='epoch')
  
  class ClearMemory(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        gc.collect()
        tf.keras.backend.clear_session()

  # Train
  hist_adam = model.fit(x=data,y=[labels,labels,labels],batch_size=batch_size,epochs=numOfEpoch
                      ,validation_split=0, verbose=1, shuffle=True, callbacks=[model_checkpoint_callback, ClearMemory()]) # early_stop_callback
  model.save_weights(weight_address)

  cmd = 'cp {} /gdrive/MyDrive/datasets/'.format(weight_address)
  os.system(cmd)


# Train model
weight_address='model/weights/weights_mdrfa_perceptual40_mse30_dsim30.h5'
checkpoint_filepath = '/gdrive/MyDrive/datasets/weights_mdrfa_perceptual40_mse30_dsim30_ckpt.hdf5'
model = md_rfa()
loss_weights = [40,30,30]

train_model(model, training_data, training_labels, weight_address, checkpoint_filepath, loss_weights)

### If required resume training from where it left of.

In [None]:
def resume_train_loop(model, data, labels, weight_path, weight_address, checkpoint_filepath, loss_weight, numOfEpoch = 60, batch_size = 32, current_epoch):

  numOfEpoch = numOfEpoch - current_epoch
  batch_size=32

  perceptual = ResNet50V2Model()
  mse = MeanSquaredError()

  loss = [perceptual.perceptual_loss, mse, dssim_loss]
  loss_weights = loss_weight

  ADAM=Adam(learning_rate=0.0002, beta_1=0.01, beta_2=0.999)
  model.compile(optimizer=ADAM,loss=loss, loss_weights=loss_weights,metrics=[psnr_loss, ssim_loss])

  model.load_weights(weight_path)

  model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
      filepath=checkpoint_filepath,
      save_weights_only=True,
      monitor='loss',
      save_freq='epoch')
  
  class ClearMemory(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        gc.collect()
        tf.keras.backend.clear_session()

  ###Train
  hist_adam = model.fit(x=data,y=[labels,labels,labels],batch_size=batch_size,epochs=numOfEpoch
                      ,validation_split=0, verbose=1, shuffle=True, callbacks=[model_checkpoint_callback, ClearMemory()]) # early_stop_callback
  model.save_weights(weight_address)

  cmd = 'cp {} /gdrive/MyDrive/datasets/'.format(weight_address)
  os.system(cmd)

# Resume Training if required
weight_path = '/content/datasets/weights_md_rfa_50_perceptual40_mse30_dsim30_ckpt.hdf5'
weight_address='model/weights/weights_md_rfa_60_perceptual40_mse30_dsim30.h5'
checkpoint_filepath = '/gdrive/MyDrive/datasets/weights_md_rfa_50_perceptual40_mse30_dsim30_ckpt.hdf5'
model=md_rfa()
loss_weights = [40,30,30]
last_epoch = 50

resume_train_loop(model, training_data, training_labels, weight_path, weight_address, checkpoint_filepath, loss_weights, current_epoch = last_epoch)

### Test the model

* Prepare dataset
* Prepare methods for mesuring performance (PSNR and SSIM)
* Test model and save results

### Miscellaneous

In [None]:
dataset_type = ['thoracic', 'piglet', 'head', 'chest', 'abdomen']
def drive_to_local():
  cmd = 'cp ./datasets/weights_kevin_unet_perceptual40_mse30_dsim30.h5 ./model/weights/'
  os.system(cmd)
  # for data_type in dataset_type:
  #   cmd = 'cp ./datasets/weights_luella_original_perceptual50_mse20_dsim30.h5 ./model/weights/'
  #   os.system(cmd)

drive_to_local()

### Preparing test dataset and metrics

In [22]:
# patch_size = '64_64'
# dataset_type = ['thoracic', 'piglet', 'head_N012', 'chest_C002', 'abdomen_L058']
# test_data_path = ['data/processed/test_{}_{}_dicom_0.h5'.format(patch_size, type_) for type_ in dataset_type]
def prep_all_testing_data(test_data_files):
  testing_data, testing_labels = [], []
  for test_address in test_data_files:
    data_test,labels_test = read_hdf5(test_address)
    data_test = (data_test[:,:,:,None]/4095).astype(np.float32)
    labels_test = (labels_test[:,:,:,None]/4095).astype(np.float32)
    testing_data.append(data_test)
    testing_labels.append(labels_test)

  return testing_data, testing_labels

testing_data, testing_labels = prep_all_testing_data(test_data_files)
# testing_data, testing_labels = prep_all_testing_data(test_data_path)

In [None]:
testing_data[4].shape

* Metrics used for measuring performance

In [23]:
def measure_psnr(test_labels, pred_labels):
  diff = test_labels-pred_labels
  diff = diff.flatten('C')
  rmse = math.sqrt( np.mean(diff ** 2.) )
  psnr = 20*math.log10(1.0/rmse)
  return psnr

def measure_ssim(test_labels, pred_labels):
  ssim = 0
  for i in range (test_labels.shape[0]):
      ssim += compare_ssim(test_labels[i,:,:,0],pred_labels[i,:,:,0],
                  data_range=pred_labels[i,:,:,0].max()-pred_labels[i,:,:,0].min())
      
  ssim = ssim/test_labels.shape[0]
  return ssim

### Testing the model:

* Initialize a dictionary and list for storing both measurements and images
* Initialize loss functions and Adam optimizer then recompile model for testing

In [None]:
def test_model(model, data, labels, model_weight_address, dataset_type, loss_weight):
  results_dict = {}
  pred_results = []
  
  perceptual = ResNet50V2Model()
  mse = MeanSquaredError()

  loss = [perceptual.perceptual_loss, mse, dssim_loss]
  loss_weights = loss_weight

  ADAM=Adam(learning_rate=0.0002, beta_1=0.01, beta_2=0.999)
  model.compile(optimizer=ADAM,loss=loss, loss_weights=loss_weights,metrics=[psnr_loss, ssim_loss])

  model.load_weights(weight_address)

  for i in range(len(dataset_type)):

    [labels_pred,labels_pred, labels_pred]= model.predict(data[i],batch_size=8,verbose=1)

    psnr = measure_psnr(labels[i], labels_pred)
    ssim = measure_ssim(labels[i], labels_pred)

    results_dict[dataset_type[i]] = {'psnr':psnr, 'ssim':ssim}
    pred_results.append(labels_pred)

    labels_test_3 = np.concatenate((labels[i],labels[i],labels[i]),axis=-1)

    loss=model.evaluate(x=data[i],y=[labels_test_3,labels_test_3,labels_test_3], batch_size=8,verbose=1)

  return results_dict, pred_results

# Test model
model=md_rfa()
loss_weight = [40,30,30]
weight_address='/content/datasets/weights_mdrfa_perceptual40_mse30_dsim30.h5'
results, pred_results = test_model(model, testing_data, testing_labels, weight_address, dataset_type, loss_weight)
print(results)

In [None]:
pred_results[0].shape

### Viewing results

Windowing2 Method:
* Denormalizes images

In [27]:
def windowing2(image,center,width):
    min_bound = center - width/2
    max_bound = center + width/2
    output = (image-min_bound)/(max_bound-min_bound)
    output[output<0]=0
    output[output>1]=1
    return output

### Used to create new directories to save results in

In [None]:
dataset_dir = ['thoracic', 'piglet', 'head_N012', 'chest_C002', 'abdomen_L058']
dir_set = ['test_data', 'test_labels', 'test_results']
for dataset in dataset_dir:
  os.mkdir(dataset)
  for dir in dir_set:
    path = '{}/{}'.format(dataset,dir)
    os.mkdir(path)

In [None]:
import os
from tensorflow.keras.utils import array_to_img                                        

dataset_dir = ['thoracic', 'piglet', 'head_N012', 'chest_C002', 'abdomen_L058']
dir_set = ['test_data', 'test_labels', 'test_results']
for i in range(len(pred_results)):
  num_images, _, _, _ = pred_results[i].shape
  for dir in dir_set:
    for j in range(num_images):
      if dir == 'test_data':
        data = testing_data[i][j,:,:,:]
        img = windowing2((data)*4095-1024,40,400)
        img = array_to_img(img)
        path = '{}/{}/test_img{}.png'.format(dataset_dir[i], dir, j)
        img.save(path)
      elif dir == 'test_labels':
        data = testing_labels[i][j,:,:,:]
        img = windowing2((data)*4095-1024,40,400)
        img = array_to_img(img)
        path = '{}/{}/gt_img{}.png'.format(dataset_dir[i], dir, j)
        img.save(path)
      if dir == 'test_results':
        data = pred_results[i][j,:,:,:]
        img = windowing2((data)*4095-1024,40,400)
        img = array_to_img(img)
        path = '{}/{}/pred_img{}.png'.format(dataset_dir[i], dir, j)
        img.save(path)


In [None]:
!zip -r /content/abdomen_L058.zip /content/abdomen_L058/
!zip -r /content/chest_C002.zip /content/chest_C002/
!zip -r /content/head_N012.zip /content/head_N012/
!zip -r /content/piglet.zip /content/piglet/
!zip -r /content/thoracic.zip /content/thoracic/

### Prints the results saved on the Pandas dataframe

In [None]:
import pandas as pd

df = pd.DataFrame(results)
df

In [None]:
plt.imshow(testing_data[1][1,:,:,0], cmap='gray')
plt.show()
plt.figure()
plt.imshow((labels_pred)[1,:,:,0], cmap='gray')
plt.show()
plt.figure()
plt.imshow(testing_labels[1][1,:,:,0], cmap='gray')
plt.show()

print('The PSNR is:', psnr_edge_p)
print('The SSIM is:', ssim_edge_p)