In [None]:
import sys
sys.path.append('../src')

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow.keras import Sequential, Input, layers
from keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow_addons.metrics import FBetaScore
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout, Input, Lambda
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall
import tensorflow as tf
# from tensorflow.keras.metrics import Recall, Precision

from utils.data import get_best_slice, select_tomo_ids, get_csv_from_bq
from utils.render_motor_position import get_motor_coordinates, get_slice_file_path
from ml_logic.interface import train_classification


In [2]:
path_train_csv= '../data/csv_raw/train_labels.csv'
# path_image= '../data/pictures_raw/train'
path_image = '../data/pictures_process/adaptequal_1_padded'


df = pd.read_csv(path_train_csv).copy()


In [3]:
# pd.serie with list of tomo (tomo_2dd6bd)
tomogram_id= select_tomo_ids(df,
                             number_of_slices= list(df['Array_shape_axis_0'].unique()) #[500, 800, 600, 300, 400, 494]
                             )


In [4]:
# convert the img in array, ready for input the model

def load_images_by_ids(folder_path,
                       tomo_ids,
                       color_mode='grayscale',
                       target_size=None
                       ):

    images = []
    folder = Path(folder_path)

    for tomo_id in tomo_ids:
        img_file = folder / f"{tomo_id}.jpg"
        if img_file.exists():
            img = load_img(img_file, color_mode=color_mode, target_size=target_size)
            img_array = img_to_array(img) / 255.0
            images.append(img_array)
        else:
            print(f"[⚠️] Imagen no encontrada: {img_file}")

    return np.array(images)


In [5]:
# X and y
X = load_images_by_ids(path_image, tomogram_id)

df_filtered = df[df['tomo_id'].isin(tomogram_id)]
y = df_filtered['Number_of_motors']


In [6]:
# train - Val - test

random_state=42

X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    test_size=0.1,
                                                    random_state=random_state)

X_train, X_val, y_train, y_val = train_test_split(X_train, y_train,
                                                  test_size=0.1,
                                                  random_state=random_state)

In [7]:

def init_model_densenet(X):
    original_input_shape = X.shape[1:]

    # Forzar input a 3 canales si es gris (1 canal)
    if original_input_shape[-1] == 1:
        base_input_shape = original_input_shape[:-1] + (3,)
    else:
        base_input_shape = original_input_shape

    # Cargar modelo base DenseNet121 sin la capa de clasificación final
    base_model = DenseNet121(include_top=False,
                             weights='imagenet',
                             input_shape=base_input_shape
                             )
    base_model.trainable = False

    # Input layer
    inputs = Input(shape=original_input_shape)

    # Convertir a RGB si es necesario
    if original_input_shape[-1] == 1:
        x = Lambda(lambda x: tf.image.grayscale_to_rgb(x))(inputs)
    else:
        x = inputs


    x = base_model(x, training=False)
    x = GlobalAveragePooling2D()(x)
    x = Dropout(0.5)(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)

    model = Model(inputs, outputs)

    model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=1e-4),
        metrics=[
            'accuracy',
            FBetaScore(num_classes=1, average='micro', beta=2.0),
            Recall()
        ]
    )

    return model


In [None]:
model_densenet = init_model_densenet(X_train)
history_densenet = train_classification(model=model_densenet,
                                        X_train = X_train,
                                        y_train = y_train,
                                        X_val = X_val,
                                        y_val = y_val,
                                        model_type = 'classification',
                                        preprocess_type ='adaptequela_1',
                                        model_name= 'model_densenet121',
                                        batch_size= 16,
                                        patience = 2
                                        )


In [None]:
model_densenet.evaluate(X_test,y_test)

In [None]:
history_densenet.history

In [None]:
# Fonction of grafic all the metrics and its val's
def plot_training_metrics(history):
    """
   plot the metrics, even if there is not one of them into 'history'
    """
    history_dict = history.history
    plotted = False

    def safe_plot(metric_name, color, linestyle='-'):
        nonlocal plotted
        if metric_name in history_dict:
            label = metric_name.replace('_', ' ')
            plt.plot(history_dict[metric_name], c=color, linestyle=linestyle, label=label)
            plotted = True

    # fbeta
    safe_plot('fbeta_score', 'r')
    safe_plot('val_fbeta_score', 'r', '--')

    # recall
    safe_plot('recall', 'g')
    safe_plot('val_recall', 'g', '--')
    safe_plot('recall_1', 'g')  # alternativa por si usa nombre automático
    safe_plot('val_recall_1', 'g', '--')

    # accuracy
    safe_plot('accuracy', 'black')
    safe_plot('val_accuracy', 'black', '--')

    # precision
    safe_plot('precision', 'blue')
    safe_plot('val_precision', 'blue', '--')

    if plotted:
        plt.title("Training Metrics")
        plt.xlabel("Epoch")
        plt.ylabel("Metric Value")
        plt.legend()
        plt.grid(True)
        plt.show()
    else:
        print("⚠️ No se encontró ninguna métrica conocida para graficar.")

# Ejemplo de uso
plot_training_metrics(history_densenet)

In [None]:
from sklearn.metrics import fbeta_score

y_pred= model_densenet.predict(X_test )
y_pred
# fbeta_score(y_test, y_pred, beta=2, average='binary')
