In [1]:
import os

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ['TF_ENABLE_ONEDNN_OPTS'] =  "0"



import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy

from tensorflow import keras 
from tensorflow.keras import layers
from tensorflow.keras.datasets import mnist

## Librerías
import seaborn as sns
import sys
import cv2 as cv2
import glob
from PIL import Image
import tensorflow as tf
import sys
import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import gc
import tensorflow.keras as keras
from sklearn.model_selection import train_test_split
import os


from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.preprocessing import StandardScaler

print(tf.__version__)

2024-09-08 20:19:35.028983: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-08 20:19:35.029022: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-08 20:19:35.032805: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-08 20:19:35.385937: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


2.14.0


In [None]:
## GPU usage
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all.
# The guard keeps the notebook runnable on CPU-only machines, where the
# device list is empty and indexing gpus[0] would raise IndexError.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_memory_growth(gpus[0], True)

# Dataset

In [3]:
## function to load images
def load_binary_images(file_names):
    """Load one ``.npy`` mask and return it as a 32x32 binary float tensor.

    Despite the plural name, ``file_names`` is a single path handed to
    ``np.load``. A trailing channel axis is appended, the result is resized
    to 32x32 with the default ``tf.image.resize`` method, and every value
    above 0.5 becomes 1.0 (everything else 0.0).
    Assumes the stored array is 2-D (H, W) -- TODO confirm.
    """
    raw_mask = np.load(file_names).astype(np.float32)
    mask = tf.convert_to_tensor(np.expand_dims(raw_mask, axis=-1), dtype=tf.float32)
    resized = tf.image.resize(mask, [32, 32])
    # Binarize after resizing, since interpolation produces fractional values.
    return tf.where(resized > 0.5, 1.0, 0.0)

def load_images(file_names):
    """Load one ``.npy`` image and return it as a 32x32 float32 tensor divided by 255.

    ``file_names`` is a single path handed to ``np.load``. A trailing channel
    axis is appended before resizing with the default ``tf.image.resize`` method.
    Assumes the stored array is 2-D (H, W) -- TODO confirm.
    """
    raw_pixels = np.load(file_names).astype(np.float32)
    with_channel = tf.convert_to_tensor(np.expand_dims(raw_pixels, axis=-1), dtype=tf.float32)
    resized = tf.image.resize(with_channel, [32, 32])
    return resized / 255.0

def min_max_scaler(ruta,image):
    """Min-max scale ``image`` with the global extrema of every slice in ``ruta``.

    Args:
        ruta: folder that contains the ``.npy`` slices of one nodule.
        image: tensor/array to rescale (typically one slice of that folder).

    Returns:
        ``abs((image - min) / (max - min))`` where min/max are taken over all
        slices of the folder after ``load_images`` preprocessing. When every
        slice has the same constant value the range is zero and a tensor of
        zeros is returned instead of NaN/inf.

    Raises:
        ValueError: if the folder contains no ``.npy`` file.
    """
    slice_paths = sorted(glob.glob(ruta + '/*.npy'))
    if not slice_paths:
        raise ValueError(f"No .npy images found in '{ruta}'")

    # Distinct loop name so the `image` argument is not visually shadowed.
    slices = np.array([load_images(slice_path) for slice_path in slice_paths])
    max_value = np.max(slices)
    min_value = np.min(slices)

    value_range = max_value - min_value
    if value_range == 0:
        # Constant-intensity folder: dividing would yield 0/0.
        return tf.zeros_like(image)
    return tf.math.abs((image - min_value) / value_range)

## Function to get contours and features
def get_contours_and_features(binary_map):
    """Extract the external contours of ``binary_map`` and describe each one.

    Returns a list with one dict per contour holding the keys
    ``bounding_box`` (x, y, w, h), ``area``, ``perimeter`` (closed contour),
    ``equivalent_diameter`` (diameter of the circle with the same area),
    ``moments``, ``center_of_mass`` ([cx, cy]) and the raw ``contour``.
    """
    # Reference: https://docs.opencv.org/4.x/d3/d05/tutorial_py_table_of_contents_contours.html
    eps = 1e-5  # keeps the centroid division finite when m00 is zero
    found_contours, _ = cv2.findContours(binary_map, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

    def describe(contour):
        # Build the feature dict of a single contour.
        moments = cv2.moments(contour)
        mass = moments['m00'] + eps
        x, y, w, h = cv2.boundingRect(contour)
        area = cv2.contourArea(contour)
        return {
            'bounding_box': (x, y, w, h),
            'area': area,
            'perimeter': cv2.arcLength(contour, True),
            'equivalent_diameter': np.sqrt(4 * area / np.pi),
            'moments': moments,
            'center_of_mass': [moments['m10'] / mass, moments['m01'] / mass],
            'contour': contour,
        }

    return [describe(contour) for contour in found_contours]

# function to get a determined property from a list of contours features (area by default)
def get_item(contour_features, key='area'):
    """Return ``entry[key]`` for every feature dict, in the original order.

    ``key`` defaults to ``'area'``; a missing key raises ``KeyError``.
    """
    return [entry[key] for entry in contour_features]

## function to get geometric_attributes
def get_geometric_atributes(binary_images):
    """Compute mean contour descriptors for every binary mask.

    Args:
        binary_images: iterable of objects exposing ``.numpy()`` (e.g. the
            tensors returned by ``load_binary_images``).

    Returns:
        One ``[mean_area, mean_perimeter, mean_equivalent_diameter]`` list per
        input mask, averaged over all contours found in that mask. A mask
        without any contour yields ``[0.0, 0.0, 0.0]`` instead of NaN, so an
        empty slice cannot poison downstream regression targets.
    """
    descriptor_keys = ('area', 'perimeter', 'equivalent_diameter')
    descriptors = []
    for binary_img in binary_images:
        # OpenCV contour extraction expects an 8-bit image.
        mask = binary_img.numpy().astype(np.uint8)
        contour_features = get_contours_and_features(mask)

        if not contour_features:
            # np.mean([]) would return NaN (with a RuntimeWarning).
            descriptors.append([0.0, 0.0, 0.0])
            continue

        descriptors.append(
            [np.mean(get_item(contour_features, key=name)) for name in descriptor_keys]
        )

    return descriptors

## function to get the middle image route in the folder
def get_middle_image(folder_route):
    """Return the path of the middle ``.npy`` file of a folder.

    Files are ordered by name; for ``n`` files the element at index
    ``n // 2`` is returned (the upper-middle one when ``n`` is even).
    The sentinel string ``'empty'`` is returned when no ``.npy`` file exists.
    """
    candidates = sorted(glob.glob(folder_route + '/*.npy'))
    if not candidates:
        return 'empty'
    return candidates[len(candidates) // 2]

In [None]:
## Annotations used to look up the nodule features
# Two metadata CSVs are read and stacked row-wise. pd.concat keeps the
# original row labels, so the combined index may contain repeated values.
annotations_cvs_4R = pd.read_csv('/data/Datasets/Nodules_ISBI/meta_created_info_3d_4R.csv')
annotations_csv_3R = pd.read_csv('/data/Datasets/Nodules_ISBI/meta_created_info_3d_3R.csv')
annotations = pd.concat([annotations_cvs_4R, annotations_csv_3R])
## Drop the rows whose malignancy equals 3
annotations = annotations[annotations['malignancy'] != 3]
annotations

In [None]:
## Dataset location: one sub-folder per nodule, one .npy file per slice
rutas_images_npy = glob.glob('/data/Datasets/Nodules_ISBI/images/*/*.npy')

## Order the list by folder
rutas_images_npy = sorted(rutas_images_npy)

## Unique folder names (parent directory of every slice)
folders = [ruta.split('/')[-2] for ruta in rutas_images_npy]
folders = np.unique(folders)

## Column buffers of the dataframe (one entry per annotated nodule)
rutas_images = []
rutas_masks = []
areas = []
perimetros = []
diametros = []
calsifications = []
spiculations = []
lobulations = []
sphericities = []  # never filled in this cell; name kept for other cells
textures = []
margins = []

## Cancer or not
labels = []

for folder in folders:
    ruta = '/data/Datasets/Nodules_ISBI/images/' + folder
    ruta_masc = '/data/Datasets/Nodules_ISBI/masks/' + folder

    ## Middle slice of the nodule
    image_route = get_middle_image(ruta)
    if image_route == 'empty':
        continue

    ## Annotation rows of this nodule; skip early so that no file is read
    ## for folders that will not end up in the dataframe.
    features = annotations[annotations['folder'] == folder]
    if features.shape[0] == 0:
        continue

    ## The mask shares the file name of the image inside the masks tree
    name_image = image_route.split('/')[-1]
    mask_route = ruta_masc + '/' + name_image

    ## Geometric descriptors come from the binary mask only. The previous
    ## version also loaded and min-max scaled every slice of the folder here
    ## and then discarded the result, which only cost I/O time.
    binary_image = load_binary_images(mask_route)
    geometric_attributes = get_geometric_atributes([binary_image])[0]

    rutas_images.append(image_route)
    rutas_masks.append(mask_route)

    areas.append(geometric_attributes[0])
    perimetros.append(geometric_attributes[1])
    diametros.append(geometric_attributes[2])

    ## Semantic attributes: the first matching annotation row is used
    calsifications.append(features['calcification'].values[0])
    spiculations.append(features['spiculation'].values[0])
    lobulations.append(features['lobulation'].values[0])
    textures.append(features['texture'].values[0])
    labels.append(features['is_cancer'].values[0])
    margins.append(features['margin'].values[0])


## Build the dataframe in a single step (same column order as before)
df = pd.DataFrame({
    'ruta': rutas_images,
    'mask': rutas_masks,
    'area': areas,
    'perimetro': perimetros,
    'diametro': diametros,
    'calsification': calsifications,
    'spiculation': spiculations,
    'lobulation': lobulations,
    'texture': textures,
    'margin': margins,
    'label': labels,
})

df

In [None]:
## Show a randomly chosen slice next to its mask
indx = np.random.randint(0, df.shape[0])
image = load_images(df['ruta'].values[indx])
mask = load_binary_images(df['mask'].values[indx])

print(np.shape(image), np.shape(mask),type(image), type(mask))

plt.figure(figsize=(10,10))
# Same two panels as before, drawn in a loop: (picture, title) per subplot.
for position, (picture, caption) in enumerate([(image, 'Image'), (mask, 'Mask')], start=1):
    plt.subplot(1, 2, position)
    plt.imshow(picture, cmap='gray')
    plt.title(caption)
plt.show()

## Dataset

In [None]:
# Split by class. The labels are compared against the strings 'True' / 'False',
# so the 'label' column is expected to hold strings, not booleans.
df_positive = df[df['label'] == 'True']
df_negative = df[df['label'] == 'False']

print('Positive shape: ', df_positive.shape)
print('Negative shape: ', df_negative.shape)

## Seed NumPy's global RNG; DataFrame.sample falls back to it when
## random_state is not given, so the order of the sample() calls below matters.
np.random.seed(0)

## Shuffle each class independently
df_positive = df_positive.sample(frac=1)
df_negative = df_negative.sample(frac=1)

## Stratified split: the first 80% of each class goes to train, the rest to test
pctg = 0.8
n_train_samples_positive = int(df_positive.shape[0]*pctg)
n_train_samples_negative = int(df_negative.shape[0]*pctg)

df_positive_train = df_positive.iloc[:n_train_samples_positive]
df_positive_test = df_positive.iloc[n_train_samples_positive:]

df_negative_train = df_negative.iloc[:n_train_samples_negative]
df_negative_test = df_negative.iloc[n_train_samples_negative:]

print('Positive train shape: ', df_positive_train.shape)
print('Positive test shape: ', df_positive_test.shape)
print('Negative train shape: ', df_negative_train.shape)
print('Negative test shape: ', df_negative_test.shape)

## Concatenate both classes
df_train = pd.concat([df_positive_train, df_negative_train])
df_test = pd.concat([df_positive_test, df_negative_test])

## Re-shuffle so the classes are interleaved
df_train = df_train.sample(frac=1)
df_test = df_test.sample(frac=1)

## Encode the string labels as integers (1 = cancer, 0 = no cancer)
df_train['label'] = df_train['label'].replace({'True': 1, 'False': 0})
df_test['label'] = df_test['label'].replace({'True': 1, 'False': 0})

print('Train shape: ', df_train.shape)
print('Test shape: ', df_test.shape)

In [None]:
## create dataset function 
def create_train_dataset(batch,df):
    """Build the shuffled, batched and augmented training ``tf.data.Dataset``.

    Every row of ``df`` contributes five samples that share the same label
    and feature vector: the original image, its left-right flip, its 90 and
    180 degree rotations and its up-down flip. The flips are deterministic;
    the ``random_flip_*`` ops used before only flipped with probability 0.5,
    so many "augmented" copies were exact duplicates of the original.

    Args:
        batch: batch size.
        df: dataframe with the columns 'ruta', 'label', 'area',
            'spiculation', 'lobulation', 'margin' and 'texture'.

    Returns:
        Dataset yielding ``(images, features, labels)`` float32 batches.
    """
    images_routes = df['ruta'].to_numpy()
    labels = df['label'].to_numpy()
    features = df[['area', 'spiculation', 'lobulation', 'margin','texture']].to_numpy()

    def augment(image):
        # Original plus four deterministic geometric variants.
        return [
            image,
            tf.image.flip_left_right(image),
            tf.image.rot90(image),
            tf.image.rot90(image, k=2),
            tf.image.flip_up_down(image),
        ]

    images = []
    new_labels = []
    new_features = []
    # `feature_row` avoids rebinding the `features` array that zip iterates over.
    for image_route, label, feature_row in zip(images_routes, labels, features):
        views = augment(load_images(image_route))
        images.extend(views)
        new_labels.extend([label] * len(views))
        new_features.extend([feature_row] * len(views))

    ## Convert to tensors
    images = tf.convert_to_tensor(images, dtype=tf.float32)
    labels = tf.convert_to_tensor(new_labels, dtype=tf.float32)
    features = tf.convert_to_tensor(new_features, dtype=tf.float32)

    dataset = tf.data.Dataset.from_tensor_slices((images, features, labels))

    # Buffer as large as the dataset: the five views of a nodule are adjacent,
    # so a smaller buffer would leave them clustered in the same batches.
    dataset = dataset.shuffle(buffer_size=max(len(new_labels), 1))

    dataset = dataset.batch(batch)

    return dataset


## create dataset function 
def create_test_dataset(batch,df):
    """Build the batched (non-augmented) evaluation ``tf.data.Dataset``.

    Args:
        batch: batch size.
        df: dataframe with the columns 'ruta', 'label', 'area',
            'spiculation', 'lobulation', 'margin' and 'texture'.

    Returns:
        Dataset yielding ``(images, features, labels)`` float32 batches.
        The rows are shuffled once with ``df.sample(frac=1)``.
    """
    ## Randomize the row order
    df = df.sample(frac=1)

    images_routes = df['ruta'].to_numpy()
    labels = df['label'].to_numpy()
    features = df[['area', 'spiculation', 'lobulation', 'margin','texture']].to_numpy()

    images = [load_images(image_route) for image_route in images_routes]

    ## Convert to tensors
    images = tf.convert_to_tensor(images, dtype=tf.float32)
    labels = tf.convert_to_tensor(labels, dtype=tf.float32)
    features = tf.convert_to_tensor(features, dtype=tf.float32)

    dataset = tf.data.Dataset.from_tensor_slices((images, features, labels))

    # Dataset transformations return a new dataset; the result of batch()
    # was previously discarded, so the `batch` argument had no effect.
    dataset = dataset.batch(batch)

    return dataset

## Both classes
train_dataset = create_train_dataset(8, df_train)

# Train dataset with only the no-cancer class
#train_dataset = create_train_dataset(16, df_train.drop(df_train[df_train['label'] == 1].index))

## Shapes of the first batch
for image, features, label in train_dataset:
    for tensor in (image, features, label):
        print(tensor.shape)
    break

## Total number of samples: add up the leading dimension of every batch
i = sum(np.shape(image)[0] for image, features, label in train_dataset)

print(i)

In [None]:
## Plot the 8 images of the first batch in a 4x2 grid
for image, features, label in train_dataset.take(1):
    plt.figure(figsize=(10,10))
    for i in range(8):
        plt.subplot(4, 2, i + 1)
        plt.imshow(image[i], cmap='gray')
        plt.title(f'Label: {label[i].numpy()}')
    plt.tight_layout()
    plt.show()

# Variational autoencoders

In [15]:
# Hyperparameters
epochs = 2000  # maximum number of training epochs
learning_rate = 0.0001  # optimizer learning rate
batch = 8  # mini-batch size

class Sampling(layers.Layer):
    """Reparameterization step: sample z from (z_mean, z_log_var).

    Returns ``z_mean + exp(0.5 * z_log_var) * epsilon`` with ``epsilon``
    drawn from a standard normal of the same (batch, dim) shape.
    """

    def call(self, inputs):
        mean, log_var = inputs
        n_rows = tf.shape(mean)[0]
        n_dims = tf.shape(mean)[1]
        noise = tf.keras.backend.random_normal(shape=(n_rows, n_dims))
        std = tf.exp(0.5 * log_var)
        return mean + std * noise

## GVAE

### Encoder

In [None]:
### ----------------------------- ENCODER -----------------------------
latent_dim = 8

nx, ny = 32, 32

encoder_inputs = keras.Input(shape=(nx, ny, 1))

# Block 1: 32x32 -> 16x16
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same', name='layer_E1')(encoder_inputs)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# Block 2: 16x16 -> 8x8
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same', name='layer_E2')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# Block 3: 8x8 -> 4x4
x = layers.Conv2D(16, (2, 2), activation='relu', padding='same', name='layer_E6')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# Blocks 4-5: channel reduction at 4x4 (16 -> 8 -> 4)
x = layers.Conv2D(8, (2, 2), activation='relu', padding='same', name='layer_E7')(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(4, (2, 2), activation='relu', padding='same', name='layer_E8')(x)
x = layers.BatchNormalization()(x)

# 4 * 4 * 4 = 64 features feed the two latent heads
x = layers.Flatten()(x)

z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(inputs=encoder_inputs, outputs=[z_mean, z_log_var, z], name="encoder")
encoder.summary()

### Decoder

In [None]:
## ------------------ DECODER -------------------------
## Latent input z
latent_inputs = keras.Input(shape=(latent_dim,))

## Size of the first feature map: ndim x ndim x nfilts
ndim = 4
nfilts = 32

## Project z and fold it back into a spatial feature map ("un-flatten")
x = layers.Dense(ndim * ndim * nfilts, activation="relu")(latent_inputs)
x = layers.Reshape((ndim, ndim, nfilts))(x)

## Convolutional stack; each UpSampling2D doubles the resolution (4 -> 8 -> 16 -> 32)
x = layers.Conv2D(4, (2, 2), activation='relu', padding='same', name='layer_D1')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(8, (2, 2), activation='relu', padding='same', name='layer_D2')(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(16, (2, 2), activation='relu', padding='same', name='layer_D3')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(32, (2, 2), activation='relu', padding='same', name='layer_D4')(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(32, (2, 2), activation='relu', padding='same', name='layer_D5')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

## Single-channel sigmoid output
decoder_outputs = layers.Conv2D(1, (2, 2), activation='sigmoid', padding='same', name='layer_D6')(x)

## Build the decoder model
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

In [15]:
class VAE(keras.Model):
    """Variational autoencoder trained with a custom ``train_step``.

    Args:
        encoder: model mapping an image batch to ``(z_mean, z_log_var, z)``.
        decoder: model mapping ``z`` back to an image batch.

    The loss is the per-image summed binary cross-entropy plus the KL
    divergence between the approximate posterior and a standard normal.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported by fit() and reset by Keras every epoch.
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Trackers that Keras resets at the start of every epoch."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        """Run one optimization step on ``data = (images, features, labels)``.

        Only ``data[0]`` (the images) is used. Returns the running means of
        the total, reconstruction and KL losses.
        """
        images = data[0]
        with tf.GradientTape() as tape:
            # training=True is required for the BatchNormalization layers to
            # use batch statistics and update their moving averages; without
            # it, sub-models called inside a custom train_step run in
            # inference mode.
            z_mean, z_log_var, z = self.encoder(images, training=True)
            reconstruction = self.decoder(z, training=True)

            # binary_crossentropy averages over the channel axis, leaving
            # (batch, H, W); sum over pixels, then average over the batch.
            # NOTE(review): abs() is kept from the original; cross-entropy can
            # only be negative if the targets fall outside [0, 1] -- confirm
            # the value range produced by load_images.
            reconstruction_loss = tf.math.abs(tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(images, reconstruction), axis=(1, 2)
                )
            ))

            # Closed-form KL(q(z|x) || N(0, I)), summed over latent dims.
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

            total_loss = reconstruction_loss + kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

In [None]:
vae = VAE(encoder, decoder)
opt = tf.optimizers.Adam(learning_rate = learning_rate, )
#nll = lambda x , rv_x: -rv_x.log_prob(x)
vae.compile(optimizer=opt)

# train_dataset is an already batched tf.data.Dataset, so batch_size is not
# passed: Model.fit documents that it must not be specified for dataset inputs.
vae.fit(train_dataset, epochs=epochs,
        callbacks=[tf.keras.callbacks.EarlyStopping(monitor='loss', patience=30)])

In [None]:
### Save the models (see https://www.tensorflow.org/guide/saved_model?hl=es-419)
# Create the output folder first; saving to HDF5 fails if it does not exist.
os.makedirs('VAE_Models', exist_ok=True)
vae.encoder.save('VAE_Models/GVAE_8_encoder.h5')
vae.decoder.save('VAE_Models/GVAE_8_decoder.h5')

## GVAE + REGRESSOR

### Encoder

In [None]:
### ----------------------------- ENCODER -----------------------------
latent_dim = 8

nx, ny = 32, 32

encoder_inputs = keras.Input(shape=(nx, ny, 1))

# Block 1: 32x32 -> 16x16
x = layers.Conv2D(32, (3, 3), activation='tanh', padding='same', name='layer_E1')(encoder_inputs)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# Block 2: 16x16 -> 8x8
x = layers.Conv2D(8, (3, 3), activation='tanh', padding='same', name='layer_E2')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# Block 3: 8x8 -> 4x4
x = layers.Conv2D(4, (2, 2), activation='tanh', padding='same', name='layer_E6')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# 4 * 4 * 4 = 64 features feed the two latent heads
x = layers.Flatten()(x)

z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(inputs=encoder_inputs, outputs=[z_mean, z_log_var, z], name="encoder")
encoder.summary()

### Decoder

In [None]:
## ------------------ DECODER -------------------------
## Latent input z
latent_inputs = keras.Input(shape=(latent_dim,))

## Size of the first feature map: ndim x ndim x nfilts
ndim = 4
nfilts = 64

## Project z and fold it back into a spatial feature map ("un-flatten")
x = layers.Dense(ndim * ndim * nfilts, activation="relu")(latent_inputs)
x = layers.Reshape((ndim, ndim, nfilts))(x)

## Convolutional stack; each UpSampling2D doubles the resolution (4 -> 8 -> 16 -> 32)
x = layers.Conv2D(16, (2, 2), activation='relu', padding='same', name='layer_D1')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(8, (2, 2), activation='relu', padding='same', name='layer_D2')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(4, (2, 2), activation='relu', padding='same', name='layer_D4')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

## Single-channel sigmoid output
decoder_outputs = layers.Conv2D(1, (2, 2), activation='sigmoid', padding='same', name='layer_D7')(x)

## Build the decoder model
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

### Regressor

In [None]:
## --------------- REGRESSOR ---------------------
### Estimator of the geometric / semantic attribute values

## Input layer: one 32x32 single-channel image
regressor_inputs = keras.Input(shape=(32, 32, 1))

# Block 1: 32x32 -> 16x16
x = layers.Conv2D(32, (3, 3), activation='relu', padding='same', name='layer_R1')(regressor_inputs)
x = layers.MaxPooling2D((2, 2), padding='same', name='layer_R2')(x)
x = layers.BatchNormalization()(x)

# Block 2: 16x16 -> 8x8
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same', name='layer_R3')(x)
x = layers.MaxPooling2D((2, 2), padding='same', name='layer_R4')(x)
x = layers.BatchNormalization()(x)

# Block 3: 8x8 -> 4x4
x = layers.Conv2D(4, (3, 3), activation='relu', padding='same', name='layer_R5')(x)
x = layers.MaxPooling2D((2, 2), padding='same', name='layer_R6')(x)
x = layers.BatchNormalization()(x)

# Shared dense trunk
x = layers.Flatten()(x)
x = layers.Dense(8, activation='relu')(x)

# One linear scalar head per attribute
area_output = layers.Dense(1, activation="linear")(x)
spiculation_output = layers.Dense(1, activation="linear")(x)
lobulation_output = layers.Dense(1, activation="linear")(x)
margin_output = layers.Dense(1, activation="linear")(x)
texture_output = layers.Dense(1, activation="linear")(x)

# Multi-output model; the outputs are returned as a list in this order
regressor_outputs = [area_output, spiculation_output, lobulation_output, margin_output, texture_output]
regressor = keras.Model(regressor_inputs, regressor_outputs, name="regressor")

# Print the model summary
regressor.summary()

### Model

In [None]:
class VAE_regressor(keras.Model):
    """VAE whose reconstructions are also fed to an attribute regressor.

    Args:
        encoder: model mapping an image batch to ``(z_mean, z_log_var, z)``.
        decoder: model mapping ``z`` back to an image batch.
        regressor: model mapping an image batch to a list of scalar heads,
            one per column of the feature tensor ``data[1]``.

    Total loss = reconstruction (binary cross-entropy) + KL + estimation
    (squared error between the regressor heads and the target features).
    """

    def __init__(self, encoder, decoder, regressor, **kwargs):
        super(VAE_regressor, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.regressor = regressor
        # Running means reported by fit() and reset by Keras every epoch.
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        self.estimation_loss_tracker = keras.metrics.Mean(name="estimation_loss")

    @property
    def metrics(self):
        """Trackers that Keras resets at the start of every epoch."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
            self.estimation_loss_tracker,
        ]

    def train_step(self, data):
        """Run one optimization step on ``data = (images, features, labels)``.

        ``data[0]`` holds the images and ``data[1]`` the target attributes,
        one column per regressor head. Returns the running means of all losses.
        """
        images = data[0]
        ## Target attribute values associated with the images
        geometrics_features = data[1]

        with tf.GradientTape() as tape:
            # training=True so the BatchNormalization layers use batch
            # statistics and update their moving averages; sub-models called
            # inside a custom train_step otherwise run in inference mode.
            z_mean, z_log_var, z = self.encoder(images, training=True)

            ## Reconstruction from the sampled latent vector
            reconstruction = self.decoder(z, training=True)

            ## Rescale the reconstruction by its batch mean
            # NOTE(review): dividing a sigmoid output by its mean can push
            # values above 1 -- confirm this normalization is intended.
            reconstruction /= tf.reduce_mean(reconstruction)

            ## Attribute estimates from the regressor: a list of (batch, 1)
            ## heads, concatenated into (batch, n_heads) so that column i is
            ## compared with target column i. Passing the raw list to
            ## mean_squared_error stacked it as (n_heads, batch, 1) and
            ## broadcast every head against every target column.
            estimations = self.regressor(reconstruction, training=True)
            estimations = tf.concat(estimations, axis=-1)

            ## Reconstruction error: sum over pixels, mean over the batch
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(images, reconstruction  + tf.keras.backend.epsilon()), axis=(1, 2)
                )
            )

            ## Estimation error: squared error summed over attributes,
            ## averaged over the batch
            targets = tf.cast(geometrics_features, estimations.dtype)
            estimation_loss = tf.reduce_mean(
                tf.reduce_sum(tf.square(targets - estimations), axis=1)
            )

            ## KL divergence against a standard normal prior
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

            ## Total error
            total_loss = reconstruction_loss + kl_loss + estimation_loss

        ## Gradients, clipped element-wise to [-1, 1]
        grads = tape.gradient(total_loss, self.trainable_weights)
        grads = [tf.clip_by_value(grad, -1.0, 1.0) for grad in grads]

        ### Weight update
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.estimation_loss_tracker.update_state(estimation_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
            # Was logged under the unrelated key "ArithmeticError".
            "estimation_loss": self.estimation_loss_tracker.result(),
        }

In [None]:
vae = VAE_regressor(encoder, decoder,regressor)
opt = tf.optimizers.Adam(learning_rate = learning_rate)
#nll = lambda x , rv_x: -rv_x.log_prob(x)
vae.compile(optimizer=opt)

# train_dataset is an already batched tf.data.Dataset, so batch_size is not
# passed: Model.fit documents that it must not be specified for dataset inputs.
vae.fit(train_dataset, epochs=epochs,
        callbacks=[tf.keras.callbacks.EarlyStopping(monitor='loss', patience=10)])

In [None]:
### Save the models (see https://www.tensorflow.org/guide/saved_model?hl=es-419)
# Create the output folder first; saving to HDF5 fails if it does not exist.
os.makedirs('DistVAE_Models', exist_ok=True)
vae.encoder.save('DistVAE_Models/GVAE_encoder_8_only_regressor_1.h5')
vae.decoder.save('DistVAE_Models/GVAE_decoder_8_only_regressor_1.h5')
vae.regressor.save('DistVAE_Models/GVAE_regressor_8_only_regressor_1.h5')

## $\beta$-VAEs

### Encoder

In [None]:
### ----------------------------- ENCODER -----------------------------
latent_dim = 8

nx, ny = 32, 32

encoder_inputs = keras.Input(shape=(nx, ny, 1))

# Block 1: 32x32 -> 16x16
x = layers.Conv2D(32, (3, 3), activation='tanh', padding='same', name='layer_E1')(encoder_inputs)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# Block 2: 16x16 -> 8x8
x = layers.Conv2D(8, (3, 3), activation='tanh', padding='same', name='layer_E2')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# Block 3: 8x8 -> 4x4
x = layers.Conv2D(4, (2, 2), activation='tanh', padding='same', name='layer_E6')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.BatchNormalization()(x)

# 4 * 4 * 4 = 64 features feed the two latent heads
x = layers.Flatten()(x)

z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(inputs=encoder_inputs, outputs=[z_mean, z_log_var, z], name="encoder")
encoder.summary()

### Decoder

In [None]:
## ------------------ DECODER -------------------------
## Latent input z
latent_inputs = keras.Input(shape=(latent_dim,))

## Size of the first feature map: ndim x ndim x nfilts
ndim = 4
nfilts = 84

## Project z and fold it back into a spatial feature map ("un-flatten")
x = layers.Dense(ndim * ndim * nfilts, activation="relu")(latent_inputs)
x = layers.Reshape((ndim, ndim, nfilts))(x)

## Convolutional stack; each UpSampling2D doubles the resolution (4 -> 8 -> 16 -> 32)
x = layers.Conv2D(16, (2, 2), activation='relu', padding='same', name='layer_D1')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(8, (2, 2), activation='relu', padding='same', name='layer_D2')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

x = layers.Conv2D(4, (2, 2), activation='relu', padding='same', name='layer_D4')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.BatchNormalization()(x)

## Single-channel sigmoid output
decoder_outputs = layers.Conv2D(1, (2, 2), activation='sigmoid', padding='same', name='layer_D7')(x)

## Build the decoder model
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

In [None]:
class Sampling(layers.Layer):
    """Reparameterization step: sample z from (z_mean, z_log_var).

    Returns ``z_mean + exp(0.5 * z_log_var) * epsilon`` with ``epsilon``
    drawn from a standard normal of the same (batch, dim) shape.
    """

    def call(self, inputs):
        mean, log_var = inputs
        n_rows = tf.shape(mean)[0]
        n_dims = tf.shape(mean)[1]
        noise = tf.keras.backend.random_normal(shape=(n_rows, n_dims))
        std = tf.exp(0.5 * log_var)
        return mean + std * noise
    
class VAE(keras.Model):
    """Variational autoencoder with a weighted KL term and a custom train step.

    Args:
        encoder: model returning ``(z_mean, z_log_var, z)`` for an input batch.
        decoder: model mapping ``z`` to a reconstructed batch.
        betha: multiplier applied to the KL term in the total loss.
        **kwargs: forwarded to ``keras.Model``.
    """

    def __init__(self, encoder, decoder, betha,**kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.betha = betha
        # Running means of the three quantities reported during fit().
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Trackers exposed to Keras as this model's metrics."""
        trackers = [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]
        return trackers

    def train_step(self, data):
        """Run one optimization step on a batch.

        Only ``data[0]`` is used: it is both the encoder input and the
        reconstruction target.

        Returns:
            dict with the running means under the keys ``"loss"``,
            ``"reconstruction_loss"`` and ``"kl_loss"``.
        """
        images = data[0]

        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(images)
            decoded = self.decoder(z)
            # Rescale by the largest value in the batch so the maximum is 1.
            reconstruction = decoded / tf.reduce_max(decoded)

            # Binary cross-entropy summed over axes 1 and 2 of each sample,
            # then averaged over the batch.
            per_pixel_bce = keras.losses.binary_crossentropy(images, reconstruction)
            per_image_bce = tf.reduce_sum(per_pixel_bce, axis=(1, 2))
            reconstruction_loss = tf.abs(tf.reduce_mean(per_image_bce))

            # Disabled alternative kept from the original cell:
            #   reconstruction_loss = tf.reduce_mean(
            #       tf.reduce_sum(tf.keras.losses.MeanSquaredError()(data, reconstruction)))

            # KL term summed over the latent axis, then averaged over the batch.
            kl_terms = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_terms, axis=1))

            total_loss = reconstruction_loss + self.betha*kl_loss

        weights = self.trainable_weights
        grads = tape.gradient(total_loss, weights)
        self.optimizer.apply_gradients(zip(grads, weights))

        for tracker, value in (
            (self.total_loss_tracker, total_loss),
            (self.reconstruction_loss_tracker, reconstruction_loss),
            (self.kl_loss_tracker, kl_loss),
        ):
            tracker.update_state(value)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

In [None]:
## KL weights explored in the beta-VAE sweep
bethas = [0.01, 0.1, 1, 10, 100]

for betha in bethas:
    # Train every beta on its own freshly initialised copy of the networks.
    # Handing the shared `encoder`/`decoder` objects to each VAE would make
    # every run continue from the weights left by the previous beta, so the
    # saved models would not be independent. clone_model rebuilds the same
    # architecture with newly created (re-initialised) weights.
    _beta_encoder = tf.keras.models.clone_model(encoder)
    _beta_decoder = tf.keras.models.clone_model(decoder)

    vae = VAE(_beta_encoder, _beta_decoder, betha)
    opt = tf.optimizers.Adam(learning_rate = learning_rate)
    vae.compile(optimizer=opt)

    # Stop when the training loss has not improved for 30 epochs.
    vae.fit(train_dataset, epochs=epochs,
            batch_size=batch,
            callbacks=[tf.keras.callbacks.EarlyStopping(monitor='loss', patience=30)])

    ## Save the encoder and decoder trained with this beta
    _beta_dir = 'BetaVAE_Models/betha_' + str(betha)
    # Make sure the per-beta output directory exists before writing.
    os.makedirs(_beta_dir, exist_ok=True)
    vae.encoder.save(_beta_dir + '/GVAE_8_encoder_betha_{}_.h5'.format(betha))
    vae.decoder.save(_beta_dir + '/GVAE_8_decoder_betha_{}_.h5'.format(betha))

## GVAE-Weighted descriptor

In [None]:
class VAE(keras.Model):
    """Variational autoencoder (unweighted KL term) with a custom train step.

    NOTE(review): this definition replaces any earlier ``VAE`` class in the
    notebook and takes no ``betha`` argument, so cells written for a
    three-argument ``VAE(encoder, decoder, betha)`` stop working once this
    cell has run.

    Args:
        encoder: model returning ``(z_mean, z_log_var, z)`` for an input batch.
        decoder: model mapping ``z`` to a reconstructed batch.
        **kwargs: forwarded to ``keras.Model``.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Running means of the three quantities reported during fit().
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        """Trackers exposed to Keras as this model's metrics."""
        trackers = [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]
        return trackers

    def train_step(self, data):
        """Run one optimization step on a batch.

        Only ``data[0]`` is used: it is both the encoder input and the
        reconstruction target.

        Returns:
            dict with the running means under the keys ``"loss"``,
            ``"reconstruction_loss"`` and ``"kl_loss"``.
        """
        images = data[0]

        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(images)
            reconstruction = self.decoder(z)
            # Disabled in this variant:
            #   reconstruction /= tf.reduce_max(reconstruction)

            # NOTE(review): this product with the notebook-level `features`
            # is computed after decoding and its result is never read again,
            # so it does not influence the reconstruction or the loss.
            # Presumably the weighted latent was meant to feed the decoder --
            # confirm the intent and the shape of `features`.
            z = tf.matmul(z, features, transpose_a=True)

            # Binary cross-entropy summed over axes 1 and 2 of each sample,
            # then averaged over the batch.
            per_pixel_bce = keras.losses.binary_crossentropy(images, reconstruction)
            per_image_bce = tf.reduce_sum(per_pixel_bce, axis=(1, 2))
            reconstruction_loss = tf.math.abs(tf.reduce_mean(per_image_bce))

            # Disabled alternative kept from the original cell:
            #   reconstruction_loss = tf.reduce_mean(
            #       tf.reduce_sum(tf.keras.losses.MeanSquaredError()(data, reconstruction)))

            # KL term summed over the latent axis, then averaged over the batch.
            kl_terms = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_terms, axis=1))

            total_loss = reconstruction_loss + kl_loss

        weights = self.trainable_weights
        grads = tape.gradient(total_loss, weights)
        self.optimizer.apply_gradients(zip(grads, weights))

        for tracker, value in (
            (self.total_loss_tracker, total_loss),
            (self.reconstruction_loss_tracker, reconstruction_loss),
            (self.kl_loss_tracker, kl_loss),
        ):
            tracker.update_state(value)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

In [None]:
## ------------------ decoder -------------------------
## The latent vector z is the decoder input
latent_inputs = keras.Input(shape=(latent_dim,))

## Side length and channel count of the seed feature map
ndim = 4
nfilts = 84

## Dense projection ("un-flatten") of z, then reshape into an
## (ndim, ndim, nfilts) feature map
x = layers.Dense(ndim * ndim * nfilts, activation="relu")(latent_inputs)
x = layers.Reshape((ndim, ndim, nfilts))(x)

## Three stages of 2x2 convolution -> 2x upsampling -> batch normalization.
## Every stage doubles the spatial size (4 -> 8 -> 16 -> 32).
for _n_filters, _conv_name in ((16, 'layer_D1'), (8, 'layer_D2'), (4, 'layer_D4')):
    x = layers.Conv2D(
        _n_filters, (2, 2), activation='relu', padding='same', name=_conv_name
    )(x)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.BatchNormalization()(x)

## Single-channel output squashed to [0, 1] by the sigmoid
decoder_outputs = layers.Conv2D(
    1, (2, 2), activation='sigmoid', padding='same', name='layer_D7'
)(x)

## Assemble the decoder used by the weighted-descriptor VAE
decoder_WeightEmbs = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder_WeightEmbs.summary()

In [None]:
## Build the VAE from the shared encoder and the new decoder, then compile
## it with an Adam optimizer (the losses are computed inside train_step).
vae = VAE(encoder, decoder_WeightEmbs)
opt = tf.optimizers.Adam(learning_rate=learning_rate)
vae.compile(optimizer=opt)

## Train; stop once the training loss has not improved for 30 epochs.
vae.fit(
    train_dataset,
    epochs=epochs,
    batch_size=batch,
    callbacks=[tf.keras.callbacks.EarlyStopping(monitor='loss', patience=30)],
)

In [None]:
### Save the models (reference: https://www.tensorflow.org/guide/saved_model?hl=es-419)
# The encoder and the decoder are written to separate .h5 files.
vae.encoder.save('VAE_Models/Weight_GVAE_16_encoder.h5')
vae.decoder.save('VAE_Models/Weight_GVAE_16_decoder.h5')