In [None]:
"""
Master’s thesis - UNIR - Luis Enrique Ramirez Pelaez - 2023
Deep Learning as an alternative to deconvolution of galaxy images captured with the space telescope Hubble
Code developed
------------------------------------------------------------------
    https://www.linkedin.com/in/soy-luisen/ 
    https://github.com/soy-luisen/GalaxyHubble
------------------------------------------------------------------
"""

"""
Main project.

"""
import cv2, os, glob, time, math
import pandas as pd
import numpy as np
import pprint as pprint
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy.signal import convolve2d as conv2

from skimage import color, data, restoration, io, exposure
from skimage.metrics import structural_similarity as ssim

from astropy.visualization import astropy_mpl_style, ImageNormalize, SqrtStretch, MinMaxInterval,imshow_norm
plt.style.use(astropy_mpl_style)
from astropy.utils.data import get_pkg_data_filename
from astropy.io import fits
from astropy.table import Table

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import normalize

from keras_unet.models import vanilla_unet, satellite_unet, custom_unet
from keras_unet.utils import plot_imgs

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, utils
from tensorflow.keras.layers import Input , Conv2D , MaxPooling2D , Dropout , concatenate , UpSampling2D, Normalization, Rescaling
from tensorflow.keras import models
from tensorflow.keras import losses
from tensorflow.keras import optimizers

In [None]:
# IPython "autoreload" magics: reload modules that have changed on disk, so edits to the
# project's .py files are picked up without restarting the kernel (and losing its state).
%load_ext autoreload
%autoreload 2
# NOTE(review): %reload_ext right after %load_ext looks redundant — confirm it is needed.
%reload_ext autoreload
# Project-local modules used by the cells below
# (arquitecturas.crea_modelo, ejecucion.*, dataset.carga_dataset).
import arquitecturas
import ejecucion
import dataset

In [None]:
# Check CUDA availability through the project helper `ejecucion.is_cuda`.
# NOTE(review): what the helper prints/returns is defined in ejecucion.py — not visible here.
ejecucion.is_cuda()

In [None]:
###########################################
## DATASET GENERAL PARAMS
###########################################
imtope=128 #The value chosen for the size of the images in the dataset.

# Folder prefixes. The notebook builds file paths by plain string concatenation
# (e.g. carpeta_resultados + nombre_prueba), so every prefix must end with a path
# separator. os.path.join(..., "") appends the separator of the running platform:
# the result is ".\\dataset\\" on Windows (same value as before) and "./dataset/"
# on Linux/macOS, where a hard-coded backslash is not a separator.
carpeta_dataset=os.path.join(".", "dataset", "")                      #Folder with dataset files
carpeta_resultados=os.path.join(".", "results", "")                   #Folder to save results
carpeta_pretrained_models=os.path.join(".", "pretrained_models", "")  #Folder to load pretrained models

# Numeric suffixes of the dataset files. Both file lists are derived from this single
# tuple so that every input file keeps its deconvolved counterpart at the same position.
dataset_file_ids=(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 20, 21)

#List of .fits files with galaxy images of dataset.
dataset_files=["dataset_galaxy128_%d.fits" % file_id for file_id in dataset_file_ids]

#List of .fits files with deconvolutioned galaxy images of dataset (targets)
dataset_deconv_files=["dataset_galaxy128_deconv_%d.fits" % file_id for file_id in dataset_file_ids]

#List of test images to display.
imagenes_a_mostrar=[0,1,3,7,8,9,16,22,28,41,48,63,65,557,578,588,633,674,706,810,872]


In [None]:
############################################
# Dataset import
# X, Y:           input images and their deconvolved targets, read from the .fits files
# x_test, y_test: held-out input images and the deconvolved images the model must predict
############################################

# The first argument selects entries by position (presumably indices into
# dataset_files / dataset_deconv_files — confirm in dataset.carga_dataset).
# Positions 0-9 and 11 feed training; positions 10 and 12 are kept apart for testing.
X, Y = dataset.carga_dataset(
    [*range(10), 11],
    carpeta_dataset,
    dataset_files,
    dataset_deconv_files,
)
x_test, y_test = dataset.carga_dataset(
    [10, 12],
    carpeta_dataset,
    dataset_files,
    dataset_deconv_files,
)

print(f"Loaded {len(X)} images for training.")
print(f"Loaded {len(x_test)} images for test.")

In [None]:
# Shuffle the loaded data and hold out 20% of it for validation (80/20 split).
# random_state=None means every execution produces a different partition.
# NOTE(review): pass a fixed integer seed here if runs need to be reproducible.
x_train, x_val, y_train, y_val = train_test_split(
    X,
    Y,
    test_size=0.2,
    shuffle=True,
    random_state=None,
)

In [None]:
def _asegura_carpeta(prefijo_ruta):
    """
    Creates the folder part of a path prefix if it does not exist yet.

    Parameters:
    -----------
    prefijo_ruta: Path prefix that is later concatenated with file names
                  (e.g. carpeta_resultados). An empty folder part is ignored.

    Return:
    -------
    none
    """
    carpeta = os.path.dirname(prefijo_ruta)
    if carpeta:
        os.makedirs(carpeta, exist_ok=True)


def _quita_extension_modelo(nombre_fichero, extensiones=(".h5", ".hdf5", ".keras")):
    """
    Returns a model file name without its extension.

    Only known model extensions are removed. os.path.splitext is deliberately not
    used: test names contain dots (e.g. "adam1.0E-03"), so a name without extension
    would be cut in the middle.

    Parameters:
    -----------
    nombre_fichero: Model file name.
    extensiones: Extensions (lowercase, with leading dot) that are stripped.

    Return:
    -------
    The name without the extension, or the unchanged name if no known extension matches.
    """
    nombre_minusculas = nombre_fichero.lower()
    for extension in extensiones:
        if nombre_minusculas.endswith(extension):
            return nombre_fichero[:-len(extension)]
    return nombre_fichero


def _metricas_texto(y_true, predictions):
    """
    Builds the metrics report (MSE, RMSE, PSNR, SSIM) comparing targets and predictions.

    Parameters:
    -----------
    y_true: Target (deconvolutioned) images.
    predictions: Model predictions; indexed as [:, :, :, 0] to drop the last axis
                 before comparing with y_true.

    Return:
    -------
    Text with one "NAME: value" line per metric.
    """
    predictions_form = predictions[:, :, :, 0]
    mse = ejecucion.obtain_MSE(y_true, predictions_form)
    rmse = math.sqrt(mse)
    # NOTE(review): bits=8 is passed as in the original code — confirm it matches the image data range.
    psnr = ejecucion.obtain_PSNR(mse, bits=8)
    ssim_valor = ejecucion.obtain_SSIM(y_true, predictions_form)
    # str() is applied explicitly so the values are rendered exactly as before.
    valores = (("MSE", mse), ("RMSE", rmse), ("PSNR", psnr), ("SSIM", ssim_valor))
    return "".join(nombre + ": " + str(valor) + "\n" for nombre, valor in valores)


def ejecuta_prueba(nombre_modelo,input_shape,epochs,batch_size,optimizer,lr,loss,metrics,dub=0):
    """
    Runs a complete test: builds the model, trains it, shows the training statistics and
    the predictions, computes the metrics and saves the text report and the trained model
    in 'carpeta_resultados'.

    Parameters:
    -----------
    nombre_modelo: Type of model to build.
    input_shape: Image input, rows x columns x depth.
    epochs: Number of training epochs.
    batch_size: Batch size used for training.
    optimizer: Name of the optimizer (it is part of the test name).
    lr: Learning rate.
    loss: Name of the loss function (it is part of the test name).
    metrics: Name of the training metric (it is part of the test name).
    dub: Only for DIDN architectures. Number of processing units. Default 0.

    Return:
    -------
    none
    """
    # Make sure the results can be written BEFORE spending time on training.
    _asegura_carpeta(carpeta_resultados)

    # It resets state and model to avoid problems in successive trainings.
    ejecucion.reset_keras()
    time.sleep(10)  # Pause kept from the original code after the reset.

    # Start of the measured run time. It covers model creation, training and the
    # training statistics step; the reset above and the predictions below are excluded.
    tiempo_inicio = time.time()

    # Test name, used as base name of every result file.
    print("********************************************************************")
    # DIDN models have another param, dub, that is added to the name.
    sufijo_dub = "DUB" + str(dub) if dub > 0 else ""
    nombre_prueba = (
        "modelo_" + nombre_modelo + sufijo_dub
        + "_" + optimizer + format(lr, ".1E")
        + "_" + loss + "_" + metrics
        + "_" + str(batch_size) + "_" + str(epochs)
    )
    print("PRUEBA: " + nombre_prueba)
    ruta_base = carpeta_resultados + nombre_prueba

    # The model indicated in the parameter is created
    modelo = arquitecturas.crea_modelo(nombre_modelo, input_shape, dub=dub)

    # Training of model
    history = ejecucion.entrena_modelo(modelo, x_train, y_train, x_val, y_val, input_shape=input_shape,
                                       epochs=epochs, batch_size=batch_size, optimizer=optimizer, lr=lr,
                                       loss=loss, metrics=metrics)

    # Training graphics are shown
    resultados_prediccion_texto = ejecucion.muestra_estadisticas_entrenamiento(history, modelo, x_test, y_test, ruta_base)

    tiempo_fin = round(time.time() - tiempo_inicio)

    # Predictions are made and results are shown
    # 'num' predictions are displayed
    # If az=True 'num' random predictions are displayed
    # If az=False and num>0 the first 'num' prediction images are displayed
    # If az=False and num=0 the images of 'imagenes_a_mostrar' are displayed (concrete images of the test data)
    predictions = ejecucion.muestra_predicciones(modelo, x_test, y_test, ruta_base, 0, imagenes_a_mostrar, az=False)

    # Metrics (deconvolutioned images versus predicted images)
    metricas_texto = _metricas_texto(y_test, predictions)

    resultados_totales = (
        nombre_prueba
        + "\nPREDICTIONS RESULTS:\n" + resultados_prediccion_texto
        + "\nMETRICS:\n" + metricas_texto
        + "\nEXECUTION TIME: " + str(tiempo_fin) + " seconds"
    )
    print(resultados_totales)

    # Results are saved in text mode
    with open(ruta_base + ".txt", "w") as text_file:
        text_file.write(resultados_totales)

    # The trained model is saved
    modelo.save(ruta_base + ".h5")

    # Uncomment next line to also save the predictions.
    #ejecucion.guarda_predicciones(predictions,nombre_prueba+".fits",carpeta_dataset)

    print("END TEST: " + nombre_prueba)

def ejecuta_prueba_preentrenada(modelo_pretraining):
    """
    A pretrained model is loaded from a model file in 'carpeta_pretrained_models'. The name of
    this file (without extension) already carries the parameterization and is reused as the
    base name of the result files.
    Predictions are made with the test data, and the results are displayed and saved in
    'carpeta_resultados' as '<name>_TEST.txt'.

    Parameters:
    -----------
    modelo_pretraining: Pretrained model file name (e.g. "modelo_....h5").

    Return:
    -------
    none
    """
    # Make sure the results folder exists before anything is written into it.
    _asegura_carpeta(carpeta_resultados)

    # Load model
    modelo = ejecucion.carga_modelo(carpeta_pretrained_models, modelo_pretraining)

    # Test name: the file name without its model extension.
    nombre_prueba = _quita_extension_modelo(modelo_pretraining)
    print("********************************************************************")
    print("TEST pretrained model: " + nombre_prueba)

    # Predictions are made and results are shown
    # 'num' predictions are displayed
    # If az=True 'num' random predictions are displayed
    # If az=False and num>0 the first 'num' prediction images are displayed
    # If az=False and num=0 the images of 'imagenes_a_mostrar' are displayed (concrete images of the test data)
    predictions = ejecucion.muestra_predicciones(modelo, x_test, y_test, carpeta_resultados + nombre_prueba, 0,
                                                 imagenes_a_mostrar, az=False)

    # Metrics (deconvolutioned images versus predicted images)
    metricas_texto = _metricas_texto(y_test, predictions)

    resultados_totales = nombre_prueba + "\nMETRICS:\n" + metricas_texto
    print(resultados_totales)

    # Results are saved in text mode
    with open(carpeta_resultados + nombre_prueba + "_TEST.txt", "w") as text_file:
        text_file.write(resultados_totales)

    print("END TEST: " + nombre_prueba)

In [None]:
"""
RUN TEST.
You can modify these hyperparameters (suggested values):

epochs: 100, 150, 200
batch_size: 32, 64, 128, 256
lr: 0.001, 0.0001, 0.00001, 0.000001

EXAMPLES

ejecuta_prueba(nombre_modelo="unetclassic4",input_shape=(128,128,1),epochs=100,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error")
ejecuta_prueba(nombre_modelo="unetclassic8",input_shape=(128,128,1),epochs=100,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error")
ejecuta_prueba(nombre_modelo="unetclassic16",input_shape=(128,128,1),epochs=100,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error")
ejecuta_prueba(nombre_modelo="unetclassic32",input_shape=(128,128,1),epochs=100,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error")
ejecuta_prueba(nombre_modelo="unetclassic64",input_shape=(128,128,1),epochs=100,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error")
ejecuta_prueba(nombre_modelo="unetclassic128",input_shape=(128,128,1),epochs=100,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error")

ejecuta_prueba(nombre_modelo="DIDN16",input_shape=(128,128,1),epochs=150,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error",dub=1)
ejecuta_prueba(nombre_modelo="DIDN16",input_shape=(128,128,1),epochs=150,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error",dub=2)

ejecuta_prueba(nombre_modelo="AEPP2",input_shape=(128,128,1),epochs=100,batch_size=32,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error")

RUN PRETRAINED MODEL
ejecuta_prueba_preentrenada("modelo_unetclassic16_adam_ssim_mean_absolute_error_64_100.h5")
ejecuta_prueba_preentrenada("modelo_DIDN16DUB2_adam1.0E-03_ssim_mean_absolute_error_64_150.h5")
ejecuta_prueba_preentrenada("modelo_AEPP2_adam1.0E-03_ssim_mean_absolute_error_64_200.h5")

"""

In [None]:
#ejecuta_prueba(nombre_modelo="unetclassic16",input_shape=(128,128,1),epochs=5,batch_size=64,optimizer="adam",lr=0.001,loss="ssim",metrics="mean_absolute_error")

In [None]:
#ejecuta_prueba_preentrenada("modelo_AEPP2_adam1.0E-03_ssim_mean_absolute_error_64_200.h5")
