<a href="https://colab.research.google.com/github/luciasalmeron/TFM_Ingenieria_Biomedica_y_Salud_Digital/blob/main/ResNet50_con_Albumentations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. Montaje de Google Drive y Preparación del Entorno

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive; later cells read the dataset archive and
# metadata from, and write results to, paths under that mount point.
drive.mount('/content/drive')

In [None]:
# Instalación de paquetes (solo usar en Colab o Jupyter si es necesario)
!pip install split-folders

# Procesamiento de imágenes y augmentación
import albumentations as A
from PIL import Image
import numpy as np

# TensorFlow y Keras
from tensorflow.keras.applications.resnet50 import preprocess_input
import tensorflow as tf
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import CSVLogger, EarlyStopping

# Visualización
import matplotlib.pyplot as plt

# Manejo de archivos
import os
import zipfile
import shutil
!pip install split-folders
!pip install keras-tuner

# Utilidades
import pandas as pd
import splitfolders
import time
from keras_tuner import HyperModel
from keras_tuner.tuners import RandomSearch


!pip install albumentations
!pip install tensorflow

# 3. Descompresión del Dataset

In [None]:
!cp "/content/drive/My Drive/imagenes520.zip" "/content/imagenes520.zip"

zip_path = "/content/imagenes520.zip"  # Ruta del archivo ZIP
extract_path = "/content/imagenes520"  # Carpeta de destino

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

# 4. Definición de Variables de Entrenamiento

In [None]:
## Training variables ##
INPUT_SIZE = 224  # side length passed as target_size=(INPUT_SIZE, INPUT_SIZE) to the image generators
BATCH_SIZE = 32  # size of the batches read from the generators; must fit in memory
VAL_SPLIT = 0.15  # fraction of the images used for validation
TEST_SPLIT = 0.15  # fraction of the images used for testing
EPOCHS = 20  # NOTE(review): not referenced in this view; the fit/search calls pass epochs=10 directly

# 5. Clasificación de Imágenes según Etiquetas

In [None]:
# Load the challenge metadata (a semicolon-separated CSV stored on Drive)
df = pd.read_csv("/content/drive/My Drive/challenge-training_metadata.csv", sep=";")

# Folder holding the extracted, still unsorted images
image_folder = '/content/imagenes520/imagenes520'

# Base folder that will contain one sub-folder per class
BASE_DATASET = '/content/imagenes520/imagenes520/imagenes_clasificadas'
benigno_folder = f'{BASE_DATASET}/clasificadas_benigno/'
maligno_folder = f'{BASE_DATASET}/clasificadas_maligno/'

SPLITTED_DATASET = 'splitted_dataset'  # folder for the train/val/test split
SAVE_MODELS_PATH = '/content/drive/My Drive/trained_models'  # folder for the saved models

# Create both class folders if they do not exist yet
for class_folder in (benigno_folder, maligno_folder):
    os.makedirs(class_folder, exist_ok=True)

# Counters updated while the images are sorted
benigno_count = maligno_count = no_encontrado_count = 0

# Sort the images into the class folders according to the 'diagnosis_1' metadata label.
# Progress is reported every PROGRESS_EVERY rows (and on the last row) instead of once
# per row, which would emit one output line for every metadata entry.
PROGRESS_EVERY = 1000
total_rows = len(df)

for row_number, (_, row) in enumerate(df.iterrows(), start=1):
    image_name = row['isic_id'] + '.jpg'
    label = row['diagnosis_1']
    image_path = os.path.join(image_folder, image_name)

    if os.path.exists(image_path):
        if label == 'Benign':
            shutil.move(image_path, os.path.join(benigno_folder, image_name))
            benigno_count += 1
        elif label == 'Malignant':
            shutil.move(image_path, os.path.join(maligno_folder, image_name))
            maligno_count += 1
        # Any other label: the image stays in image_folder and is not counted.
    else:
        # The file is not in image_folder. This also happens for images that were
        # already moved by a previous run of this cell.
        no_encontrado_count += 1

    # Periodic progress report; the last row always reports the final totals
    if row_number % PROGRESS_EVERY == 0 or row_number == total_rows:
        print(f"Movidas - Benigno: {benigno_count}, Maligno: {maligno_count}, No encontradas: {no_encontrado_count}")

# Final summary
print("\nProceso finalizado.")
print(f"Total movidas a Benigno: {benigno_count}")
print(f"Total movidas a Maligno: {maligno_count}")
print(f"Total imágenes no encontradas: {no_encontrado_count}")


# 6. División del Dataset en Train/Val/Test

In [None]:
# Split the classified dataset into train/validation/test folders.
# Skipped when the output folder already exists, so re-running the cell does not split again.
if not os.path.exists(SPLITTED_DATASET):
    train_split = 1 - VAL_SPLIT - TEST_SPLIT
    splitfolders.ratio(
        BASE_DATASET,
        output=SPLITTED_DATASET,
        seed=123,
        ratio=(train_split, VAL_SPLIT, TEST_SPLIT),
    )

# 7. Construcción del Modelo ResNet50 Base

In [None]:
# ResNet50 backbone with ImageNet weights and without its original classification top
base_model = ResNet50(weights='imagenet', include_top=False)

# Freeze the pretrained ResNet50 layers
for backbone_layer in base_model.layers:
    backbone_layer.trainable = False

# New head: global average pooling -> Dense(1024, relu) -> Dense(1, sigmoid) for binary classification
pooled_features = GlobalAveragePooling2D()(base_model.output)
hidden_features = Dense(1024, activation='relu')(pooled_features)
malignancy_probability = Dense(1, activation='sigmoid')(hidden_features)

model = Model(inputs=base_model.input, outputs=malignancy_probability)


# 8. Augmentación con Albumentations

In [None]:
# Albumentations pipeline: 256x256 random crop, horizontal flip, brightness/contrast
# jitter, rotation limited to 45 degrees, then normalization.
transform = A.Compose([
    A.RandomCrop(width=256, height=256),
    A.HorizontalFlip(),
    A.RandomBrightnessContrast(),
    A.Rotate(limit=45),
    # p=1.0 replaces the deprecated `always_apply=True` flag: the step runs on every call.
    # With mean 0 and std 1 this presumably only rescales pixels by Albumentations'
    # default max_pixel_value - TODO confirm against the installed version.
    A.Normalize(mean=[0, 0, 0], std=[1, 1, 1], p=1.0)
])


In [None]:
def load_and_preprocess_image(image_path):
    """Load one image, run it through `transform`, and return it as a batch of one.

    Args:
        image_path: Path of the image file to load.

    Returns:
        The transformed image with a leading batch axis of length 1.
    """
    img = image.load_img(image_path, target_size=(256, 256))  # resized to 256x256 on load
    # Albumentations expects uint8 images in [0, 255] (or float32 in [0, 1]).
    # `image.img_to_array` would produce float32 values in [0, 255], which the
    # brightness/contrast and normalization steps would misinterpret, so the pixels
    # are kept as uint8 here.
    img_array = np.asarray(img, dtype=np.uint8)
    augmented = transform(image=img_array)
    # Add the batch axis expected by model inputs
    return np.expand_dims(augmented['image'], axis=0)


# 9. Conteo de Imágenes por Conjunto

In [None]:
def count_images_in_folders(base_path):
    """Count the image files in each immediate sub-folder of `base_path`.

    A file counts as an image when its lower-cased name ends with one of
    'png', 'jpg', 'jpeg', 'bmp' or 'gif'. Entries of `base_path` that are not
    directories are ignored, and sub-folders are not searched recursively.

    Args:
        base_path: Directory whose sub-folders are inspected.

    Returns:
        dict mapping each sub-folder name to its number of image files.
    """
    image_suffixes = ('png', 'jpg', 'jpeg', 'bmp', 'gif')
    counts = {}
    for entry in os.listdir(base_path):
        subdir = os.path.join(base_path, entry)
        if not os.path.isdir(subdir):
            continue
        counts[entry] = sum(
            1 for name in os.listdir(subdir) if name.lower().endswith(image_suffixes)
        )
    return counts

# Derive the split folders from SPLITTED_DATASET (the folder the split is written to)
# instead of repeating its literal name, so the paths cannot drift apart.
training_path = os.path.join(SPLITTED_DATASET, 'train')
validation_path = os.path.join(SPLITTED_DATASET, 'val')
test_path = os.path.join(SPLITTED_DATASET, 'test')

# Number of images per class folder in each split
training_counts = count_images_in_folders(training_path)
validation_counts = count_images_in_folders(validation_path)
test_counts = count_images_in_folders(test_path)

# 10. Cálculo de Pesos de Clases

In [None]:
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
# Class labels: 0 = benign, 1 = malignant
classes = np.array([0, 1])

num_benign = training_counts["clasificadas_benigno"]
num_malignant = training_counts["clasificadas_maligno"]

# Build one label per training image (zeros for benign, ones for malignant) and let
# scikit-learn compute the "balanced" weight of each class from them
train_labels = np.concatenate([np.zeros(num_benign), np.ones(num_malignant)])
balanced_weights = compute_class_weight(class_weight="balanced", classes=classes, y=train_labels)

# Dictionary keyed by class index
class_weights = dict(enumerate(balanced_weights))

print("Pesos de clase:", class_weights)

# Per-folder image counts of every split
for heading, split_counts in (
    ("Training set image counts:", training_counts),
    ("\nValidation set image counts:", validation_counts),
    ("\nTest set image counts:", test_counts),
):
    print(heading)
    for folder, count in split_counts.items():
        print(f"{folder}: {count} images")

# 11. Generadores de Imágenes

In [None]:
# Image generators

def preprocess(images):
    """Apply the ResNet50 `preprocess_input` so the data matches the format the
    pretrained network was trained on."""
    return preprocess_input(images)


seed = 123

# Training-only augmentation. Previously no augmentation reached the model: the
# generators only applied `preprocess`. This pipeline keeps the image size unchanged
# and does not normalize, because `preprocess` runs right after it.
train_augmentation = A.Compose([
    A.HorizontalFlip(),
    A.RandomBrightnessContrast(),
    A.Rotate(limit=45),
])


def augment_and_preprocess(img):
    """Augment one training image with Albumentations, then apply `preprocess`.

    Args:
        img: One image as handed over by ImageDataGenerator's
            `preprocessing_function` hook (assumed to be a rank-3 float array
            with values in [0, 255] - TODO confirm).

    Returns:
        float32 array of the same shape, augmented and preprocessed for ResNet50.
    """
    # Albumentations expects uint8 pixels in [0, 255]
    img_uint8 = np.clip(img, 0, 255).astype(np.uint8)
    augmented = train_augmentation(image=img_uint8)['image']
    return preprocess(augmented.astype(np.float32))


# Training data: augmentation + ResNet50 preprocessing
train_datagen = ImageDataGenerator(fill_mode='wrap',
                                   preprocessing_function=augment_and_preprocess)

# Validation and test data: ResNet50 preprocessing only, never augmented
eval_datagen = ImageDataGenerator(fill_mode='wrap',
                                  preprocessing_function=preprocess)

train_generator = train_datagen.flow_from_directory(
    training_path,
    target_size=(INPUT_SIZE, INPUT_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='binary',
    seed=seed  # reproducible shuffling order
)

validation_generator = eval_datagen.flow_from_directory(
    validation_path,
    target_size=(INPUT_SIZE, INPUT_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='binary',
    shuffle=False  # order is irrelevant for validation metrics; keep it deterministic
)

test_datagen = ImageDataGenerator(
    fill_mode='wrap',
    preprocessing_function=preprocess
)

test_generator = test_datagen.flow_from_directory(
    test_path,  # must point at the test split folder
    target_size=(INPUT_SIZE, INPUT_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='binary',
    shuffle=False  # required so predictions line up with test_generator.classes
)


# 12. Compilación y Entrenamiento del Modelo

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import CSVLogger, EarlyStopping
from tensorflow.keras.metrics import AUC, Precision, Recall

# EarlyStopping: stop training when validation accuracy has not improved for
# `patience` epochs, and restore the weights of the best epoch.
early_stopping = EarlyStopping(monitor='val_accuracy',
                               patience=5,
                               restore_best_weights=True)

# CSVLogger: append the per-epoch training history to a CSV file on Drive.
# The target folder is created first so the logger does not fail on a missing directory.
training_log_path = '/content/drive/My Drive/trained_models/training_log.csv'
os.makedirs(os.path.dirname(training_log_path), exist_ok=True)
csv_logger = CSVLogger(training_log_path, append=True)


model.compile(optimizer=Adam(learning_rate=0.0001),
              loss='binary_crossentropy',
              metrics=['accuracy', AUC(), Precision(), Recall()])

# NOTE(review): the EPOCHS constant (20) defined with the training variables is not
# used here; this run trains for 10 epochs - confirm which value is intended.
history = model.fit(
    train_generator,
    epochs=10,
    # No batch_size here: the generator already yields batches of BATCH_SIZE images.
    validation_data=validation_generator,
    # Apply the "balanced" class weights computed earlier; they were calculated and
    # printed but never passed to training.
    class_weight=class_weights,
    callbacks=[early_stopping, csv_logger]
)


  self._warn_if_super_not_called()


Epoch 1/10
[1m1260/1260[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9238s[0m 7s/step - accuracy: 0.8913 - auc: 0.9084 - loss: 0.2593 - precision: 0.6928 - recall: 0.5522 - val_accuracy: 0.9097 - val_auc: 0.9414 - val_loss: 0.2107 - val_precision: 0.7396 - val_recall: 0.6568
Epoch 2/10
[1m1260/1260[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6s/step - accuracy: 0.9128 - auc: 0.9433 - loss: 0.2069 - precision: 0.7692 - recall: 0.6383

# 13. Búsqueda de Hiperparámetros con Keras Tuner

In [None]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from keras_tuner import HyperModel

class HyperResNet(HyperModel):
    """Keras Tuner hypermodel: frozen ResNet50 backbone with a tunable binary head.

    Tuned hyperparameters:
        dense_units:  width of the hidden dense layer (512 to 2048, step 512).
        batch_norm:   whether BatchNormalization follows the dense layer.
        dropout_rate: dropout rate before the output layer (0.0 to 0.5, step 0.1).
        lr:           Adam learning rate (1e-5 to 1e-3, log-sampled).
    """

    def build(self, hp):
        """Build and compile one candidate model from the hyperparameters in `hp`.

        Args:
            hp: Keras Tuner hyperparameter container for the current trial.

        Returns:
            The compiled Keras model (binary cross-entropy loss, accuracy metric).
        """
        base_model = ResNet50(weights='imagenet', include_top=False)

        # Freeze the pretrained backbone, as the baseline model of this notebook does.
        # Without this the search would fine-tune every ResNet50 layer, so its results
        # would not describe the frozen-backbone model.
        for layer in base_model.layers:
            layer.trainable = False

        x = base_model.output
        x = GlobalAveragePooling2D()(x)

        # Hyperparameter: number of units in the dense layer
        units = hp.Int('dense_units', min_value=512, max_value=2048, step=512)
        x = Dense(units, activation='relu')(x)

        # Hyperparameter: whether to apply BatchNormalization
        if hp.Boolean('batch_norm'):
            x = BatchNormalization()(x)

        # Hyperparameter: dropout rate
        dropout_rate = hp.Float('dropout_rate', min_value=0.0, max_value=0.5, step=0.1)
        x = Dropout(dropout_rate)(x)

        x = Dense(1, activation='sigmoid')(x)

        # Hyperparameter: learning rate
        lr = hp.Float('lr', min_value=1e-5, max_value=1e-3, sampling='LOG')

        model = Model(inputs=base_model.input, outputs=x)
        model.compile(optimizer=Adam(learning_rate=lr),
                      loss='binary_crossentropy',
                      metrics=['accuracy'])
        return model


# Random search over the HyperResNet hyperparameters: 5 trials ranked by validation accuracy
tuner = RandomSearch(
    hypermodel=HyperResNet(),
    objective='val_accuracy',
    max_trials=5,
)

# Every trial trains for 10 epochs on the training generator
tuner.search(
    train_generator,
    epochs=10,
    validation_data=validation_generator,
)


# 14. Guardado de Métricas y Resultados

In [None]:
import pandas as pd

# Destination on Google Drive for the training metrics
history_csv_path = '/content/drive/My Drive/trained_models/resnet50_full_metrics.csv'

# Turn the metric history returned by fit() into a DataFrame and write it as CSV
history_df = pd.DataFrame(history.history)
history_df.to_csv(history_csv_path, index=False)

print(f"Métricas completas guardadas en: {history_csv_path}")



# 15. Evaluación y Matriz de Confusión

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np

# Predict on the test set
y_true = test_generator.classes  # ground-truth labels, in generator order
y_pred = model.predict(test_generator)  # predicted probabilities

# Convert probabilities to classes (0 or 1) with a 0.5 threshold, flattened to 1-D
# so the predictions have the same shape as y_true
y_pred_classes = (y_pred > 0.5).astype("int32").ravel()

# Confusion matrix. `labels` is pinned so the result is always 2x2 and matches the
# row/column names below, even if a class is missing from both labels and predictions.
conf_matrix = confusion_matrix(y_true, y_pred_classes, labels=[0, 1])

# Save the confusion matrix as CSV
# NOTE(review): assumes class index 0 is the benign folder and 1 the malignant one - confirm
# against test_generator.class_indices.
conf_matrix_df = pd.DataFrame(conf_matrix, columns=['Predicted Benign', 'Predicted Malignant'],
                              index=['True Benign', 'True Malignant'])

conf_matrix_path = '/content/drive/My Drive/trained_models/confusion_matrix.csv'
conf_matrix_df.to_csv(conf_matrix_path)
print(f"Matriz de confusión guardada en: {conf_matrix_path}")


# 16. Visualización del Progreso de Entrenamiento

In [None]:
# Training vs. validation accuracy per epoch. The title and axis labels let the figure
# be read on its own.
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'], label='Training Accuracy')
ax.plot(history.history['val_accuracy'], label='Validation Accuracy')
ax.set(title='Training vs. Validation Accuracy', xlabel='Epoch', ylabel='Accuracy')
ax.legend()
plt.show()


# 17. Guardado del Modelo y del Historial

In [None]:
# Destinations on Google Drive for the model and its training history
model_save_path = '/content/drive/My Drive/trained_models/resnet50_albumentations.h5'
hist_csv_path = '/content/drive/My Drive/trained_models/resnet50_albumentations_history.csv'

# Save the complete model (architecture and weights)
model.save(model_save_path)
print(f"Modelo guardado en: {model_save_path}")

# Save the training history as CSV
hist_df = pd.DataFrame(history.history)
hist_df.to_csv(hist_csv_path, index=False)
print(f"Historial de entrenamiento guardado en: {hist_csv_path}")


# 18. Evaluación Final del Modelo

In [None]:
# Evaluate the model on the test set.
# The model is compiled with four metrics (accuracy, AUC, precision, recall), so
# evaluate() returns the loss plus four metric values; unpacking that into two names
# raises ValueError. Request a dict and pick the two values by name instead.
test_metrics = model.evaluate(test_generator, return_dict=True)
test_loss = test_metrics['loss']
test_accuracy = test_metrics['accuracy']
print(f"Test accuracy: {test_accuracy:.4f}, Test loss: {test_loss:.4f}")

# Save the results as CSV
test_results_df = pd.DataFrame({'Test Accuracy': [test_accuracy], 'Test Loss': [test_loss]})
test_results_path = '/content/drive/My Drive/trained_models/resnet50_test_results.csv'
test_results_df.to_csv(test_results_path, index=False)
print(f"Resultados de test guardados en: {test_results_path}")


# 19. Mejores Hiperparámetros del Tuner

In [None]:
# Destination on Google Drive for the best hyperparameters
hparams_path = '/content/drive/My Drive/trained_models/best_hyperparams_resnet50.csv'

# Hyperparameters of the best trial found by the tuner
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# Name -> value mapping of those hyperparameters
best_hps_values = {name: best_hps.get(name) for name in best_hps.values}

# Write them as a single-row CSV
hparams_df = pd.DataFrame([best_hps_values])
hparams_df.to_csv(hparams_path, index=False)

# Confirm where the hyperparameters were saved
print(f"Hiperparámetros óptimos guardados en: {hparams_path}")
