In [None]:
import tensorflow as tf
from keras import layers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

from sklearn.model_selection import train_test_split

import wandb
from wandb.integration.keras import WandbMetricsLogger

from tools import load_images_with_labels, calculate_metrics, evaluate_model

# Obtener imágenes y etiquetas de un directorio

In [None]:
path = '../data/burn_images/'
width = 540
height = 960

In [None]:
nombre_canales = ["green", "blue"]

X, y, n_canales = load_images_with_labels(path, channels=nombre_canales)
print(X.shape, y.shape)

# Separar los datos en entrenamiento y validación

In [None]:
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.20, random_state=42)

print('Shape of X_train:', X_train.shape)
print('Shape of X_val:', X_val.shape)
print('Shape of y_train:', y_train.shape)
print('Shape of y_val:', y_val.shape)

# Función para hiperparametros
Esta función recibe un parámetro y una lista de parámetros y se van probando uno por uno en el modelo

In [None]:
parametros_base = {
    'conv_layers': 3,
    'filters_layer_1': 32,
    'filters_layer_2': 32,
    'filters_layer_3': 64,
    'kernel_size': 3,
    'strides': 2,
    'dense_layers': 1,
    'dense_units_1': 64,
    'dense_units_2': 64,
    'dense_units_3': 64,
    'dropout_rate': 0.4,
    'batch_size': 32
}

In [None]:
def cambiar_parametro(diccionario, parametro, lista_parametros):
    """
    Cambia el valor de un parámetro en un diccionario y genera un nombre de ejecución.
    Args:
        diccionario (dict): Diccionario que contiene los parámetros del modelo.
        parametro (str): Clave del parámetro que se desea cambiar.
        lista_parametros (list): Lista de valores que se asignarán al parámetro.
    Yields:
        tuple: Un nombre de ejecución (str) y el diccionario actualizado (dict).
    Si el parámetro no se encuentra en el diccionario, imprime un mensaje de error y retorna None.
    """
    for valor in lista_parametros:
        if parametro in diccionario:
            diccionario[parametro] = valor
        else:
            print(f'El parametro {parametro} no se encuentra en el diccionario')
            return None
        
        nombre_run = (f"cl:{diccionario['conv_layers']}, " 
        + f"fl1:{diccionario['filters_layer_1']}, ")
        
        if diccionario['conv_layers'] >= 2: 
            nombre_run = nombre_run + f"fl2:{diccionario['filters_layer_2']}, "
        if diccionario['conv_layers'] >= 3: 
            nombre_run = nombre_run + f"fl3:{diccionario['filters_layer_3']}, "

        nombre_run = (nombre_run + f"ks:{diccionario['kernel_size']}, "
            + f"st:{diccionario['strides']}, "
            + f"dl:{diccionario['dense_layers']}, "
            + f"du1:{diccionario['dense_units_1']}, ")
        
        if diccionario['dense_layers'] >= 2:
            nombre_run = nombre_run + f"du2:{diccionario['dense_units_2']}, "  
        if diccionario['dense_layers'] >= 3:
            nombre_run = nombre_run + f"du3:{diccionario['dense_units_3']}, "  

        nombre_run = (nombre_run + f"dr:{diccionario['dropout_rate']}, "
            + f"bs:{diccionario['batch_size']}")

        yield nombre_run, diccionario

In [None]:
# Crear generador de nombres y diccionarios
generador_parametros = cambiar_parametro(parametros_base, 'batch_size', [32])

### A partir de aquí correr para probar los demás valores de la lista

In [None]:
nombre_run, parametros = next(generador_parametros)
print(nombre_run)

# Crear Red neuronal

In [None]:
#Primera capa convolucional (se agrega input_shape)
arquitectura = [layers.Conv2D(parametros["filters_layer_1"], 
                              kernel_size=(parametros["kernel_size"], parametros["kernel_size"]), 
                              strides=(parametros["strides"], parametros["strides"]),
                              input_shape=(height, width, n_canales)),
                layers.BatchNormalization(),
                layers.Activation('relu'),
                layers.MaxPooling2D(pool_size=(2, 2))]

#Capas convolucionales restantes
if parametros['conv_layers'] > 1:
    arquitectura += [layers.Conv2D(parametros["filters_layer_2"], 
                                   kernel_size=(parametros["kernel_size"], parametros["kernel_size"]),
                                   strides=(parametros["strides"], parametros["strides"])),
                    layers.BatchNormalization(),
                    layers.Activation('relu'),
                    layers.MaxPooling2D(pool_size=(2, 2))]

    if parametros['conv_layers'] > 2:
        arquitectura += [layers.Conv2D(parametros["filters_layer_3"], 
                                       kernel_size=(parametros["kernel_size"], parametros["kernel_size"]),
                                       strides=(parametros["strides"], parametros["strides"])),
                        layers.BatchNormalization(),
                        layers.Activation('relu'),
                        layers.MaxPooling2D(pool_size=(2, 2))]
            
#GlobalAveragePooling2D
arquitectura += [layers.GlobalAveragePooling2D()]

#Capas densas
arquitectura += [layers.Dense(parametros["dense_units_1"]),
                layers.Activation('relu'),
                layers.Dropout(parametros["dropout_rate"])]

if parametros['dense_layers'] > 1:
    arquitectura += [layers.Dense(parametros["dense_units_2"]),
                    layers.Activation('relu'),
                    layers.Dropout(parametros["dropout_rate"])]

    if parametros['dense_layers'] > 2:
        arquitectura += [layers.Dense(parametros["dense_units_3"]),
                        layers.Activation('relu'),
                        layers.Dropout(parametros["dropout_rate"])]

#Capa de salida
arquitectura += [layers.Dense(1),
                layers.Activation('sigmoid')]

model = tf.keras.Sequential(
    arquitectura
)

In [None]:
model.summary()

In [None]:
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Crear experimento en wandb

In [None]:
# Se crea el experimento en wandb
configuracion = {
                    'conv_layers': parametros["conv_layers"],
                    'filters_layer_1': parametros["filters_layer_1"],
                    'kernel_size': parametros["kernel_size"],
                    'strides': parametros["strides"],
                    'dense_layers': parametros["dense_layers"],
                    'dense_units_1': parametros["dense_units_1"],
                    'dropout_rate': parametros["dropout_rate"],
                    'batch_size': parametros["batch_size"]
                }

if parametros["conv_layers"] > 1:
    configuracion['filters_layer_2'] = parametros["filters_layer_2"]
    if parametros["conv_layers"] > 2:
        configuracion['filters_layer_3'] = parametros["filters_layer_3"]

if parametros["dense_layers"] > 1:
    configuracion['dense_units_2'] = parametros["dense_units_2"]
    if parametros["dense_layers"] > 2:
        configuracion['dense_units_3'] = parametros["dense_units_3"]

run = wandb.init(project="CNN_quemaduras_2", 
                 entity="frantorres14",
                 name="_".join(nombre_canales),
                 config=configuracion)

config = wandb.config
wandb_logger = WandbMetricsLogger(config)

# Se entrena el modelo

In [None]:
#Detiene el entrenamiento cuando el modelo deja de mejorar
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10, #Espera 10 épocas sin mejora antes de detener
    restore_best_weights=True #Restaura los pesos de la época con mejor val_loss
)

#Reduce el learning rate cuando el modelo se estanca
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss', 
    factor=0.5, #Reduce el learning rate a la mitad cuando no hay mejora
    patience=5, #Espera 5 épocas sin mejora antes de reducir
    min_lr=1e-7 #Learning rate mínimo permitido
)

#Entrena el modelo
history = model.fit(X_train, y_train,
                    epochs=100,
                    batch_size=config.batch_size,
                    validation_data=(X_val, y_val),
                    callbacks=[WandbMetricsLogger(), early_stopping, reduce_lr])

# Métricas de evaluación

In [None]:
CM_train, accuracy_train, precision_train, recall_train, f1_train = calculate_metrics(model, X_train, y_train)
evaluate_model(model, X_train, y_train, dataset='Train')

In [None]:
CM_val, accuracy_val, precision_val, recall_val, f1_val = calculate_metrics(model, X_val, y_val)
evaluate_model(model, X_val, y_val, dataset='Validation')

In [None]:
# Registrar métricas en wandb
wandb.log({"accuracy_train": accuracy_train,
           "precision_train": precision_train,
           "recall_train": recall_train,
           "f1_train": f1_train,
           "accuracy_val": accuracy_val,
           "precision_val": precision_val,
           "recall_val": recall_val,
           "f1_val": f1_val})

# Termina el experimento
run.finish()

### Hiperparámetros a probar

'conv_layers': [1, 2, 3]  
'filters_layer_k': [16, 32, 64]  
'kernel_size': [3, 5]  
'strides': [1, 2, 3]  
'dense_layers': [1, 2, 3]  
'dense_units_k': [32, 64, 128]  
'dropout_rate': [0.3, 0.4, 0.5]  
'batch_size': [16, 32, 64]  