## Common functions for CNN model training


In [1]:
########################################################################
# Module and library imports
########################################################################

# Import custom activity constants
from activity_constants import *


# Garbage collector for memory management
import gc

# Operating system library for file handling
import os

# Numerical computing libraries
import numpy as np
import pandas as pd
import csv

# Image processing utilities from TensorFlow/Keras
from tensorflow.keras.preprocessing.image import array_to_img, img_to_array, load_img

# Data preprocessing tools from scikit-learn
from sklearn import preprocessing

# TensorFlow and Keras libraries for building neural networks
from tensorflow import keras
from tensorflow.keras import backend as K 
from tensorflow.keras import layers
from tensorflow.keras import Model
from tensorflow.keras.optimizers import RMSprop

# Visualization libraries
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Utilities for date and time handling
from datetime import datetime

# Metrics computation tools from scikit-learn
from sklearn.metrics import confusion_matrix

2025-06-01 16:36:30.439139: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-06-01 16:36:31.035895: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-06-01 16:36:32.060756: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/lib/oracle/11.2/client64/lib
2025-06-01 16:36:32.060830: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so

In [2]:
########################################################################
# Data window definition and training defaults
########################################################################

# Window width: number of data columns fed to the model
# (train_assay_model passes it to create_model as numCols)
dataRegionEspNumColumns = 1350

# Window height: number of data rows fed to the model
# (train_assay_model passes it to create_model as numRows)
dataRegionEspNumRows = 295

# Column / row offset of the window origin inside the raw data.
# NOTE(review): in this file they are only referenced from a commented-out
# call in train_assay_model - confirm whether other modules use them.
dataRegionEspFromX = 0
dataRegionEspFromY = 0


# Default number of training epochs (default of train_assay_model's p_epochs)
default_data_training_num_epocs = 5
# Prediction score above which save_score_traces labels a sample as positive
default_positive_perc_th = 0.9

# Batch size that fit_model passes to model.fit
default_batch_size = 15

# Whether get_data_from_file_by_act caches the scaled content of the files it reads
use_cache = False
# Cache storage used when use_cache is True: file name -> scaled file content
file_cache = {}

# Default name of the CSV file that collects one summary row per trained model
default_global_overview_file_name = "global_overview.csv"

In [3]:
########################################################################
# print_trace: Writes one log line to standard output, prefixed with the
# current local date and time formatted as MM/DD/YYYY, HH:MM:SS.
#
# Inputs:
#   - trace: Message (any printable object) to log.
#
# Returns:
#   - None
########################################################################
def print_trace(trace):
    # print() joins its arguments with single spaces, which yields
    # "[ <timestamp> ]: <trace>".
    timestamp = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
    print("[", timestamp, "]:", trace)
    

########################################################################
# get_assay_file_name: Builds the base file name that identifies a set of
# assays: the optional prefix followed by the assay codes joined by "#".
#
# Inputs:
#   - prefix: String placed in front of the codes, or None for no prefix.
#   - assay_codes: Iterable of assay codes; each one is converted with str().
#
# Returns:
#   - The generated name as a string (the prefix alone, or "" without a
#     prefix, when assay_codes is empty).
########################################################################
def get_assay_file_name (prefix, assay_codes):
    stem = "" if prefix is None else prefix
    return stem + "#".join(str(code) for code in assay_codes)

########################################################################
# get_assay_folder_dataset_name: Builds the path of the dataset folder for
# a set of assays: dest_folder + "/" + the name returned by
# get_assay_file_name(prefix, assay_codes).
#
# Inputs:
#   - dest_folder: Parent folder path (string).
#   - prefix: Optional name prefix (None for no prefix).
#   - assay_codes: Iterable of assay codes.
#
# Returns:
#   - The dataset folder path as a string.
########################################################################
def get_assay_folder_dataset_name (dest_folder, prefix, assay_codes):
    dataset_name = get_assay_file_name(prefix, assay_codes)
    return dest_folder + "/" + dataset_name

########################################################################
# get_assay_activity_file_name: Builds the path of the activities CSV file
# for a set of assays, i.e. the dataset path returned by
# get_assay_folder_dataset_name with a ".csv" extension appended.
#
# Inputs:
#   - dest_folder: Parent folder path (string).
#   - prefix: Optional name prefix (None for no prefix).
#   - assay_codes: Iterable of assay codes.
#
# Returns:
#   - The CSV file path as a string.
########################################################################
def get_assay_activity_file_name (dest_folder, prefix, assay_codes):
    base_path = get_assay_folder_dataset_name(dest_folder, prefix, assay_codes)
    return base_path + ".csv"

########################################################################
# get_assay_model_file_name: Builds the path of the saved-model file for a
# set of assays, i.e. the dataset path returned by
# get_assay_folder_dataset_name with a ".h5" extension appended.
#
# Inputs:
#   - dest_folder: Parent folder path (string).
#   - prefix: Optional name prefix (None for no prefix).
#   - assay_codes: Iterable of assay codes.
#
# Returns:
#   - The model file path as a string.
########################################################################
def get_assay_model_file_name (dest_folder, prefix, assay_codes):
    base_path = get_assay_folder_dataset_name(dest_folder, prefix, assay_codes)
    return base_path + ".h5"



In [4]:
######################################################################################
# get_data_from_file_by_act: Reads every file found in input_path, scales it and
# appends the selected window of each file to X together with its activity label.
#
#   Inputs:
#    - input_path: Folder that contains the raw data files
#    - activity: Label appended to y for every sample read from this folder
#    - X: List that receives the samples (modified in place)
#    - y: List that receives the labels (modified in place)
#    - fromX: Column offset of the window inside the raw data
#    - fromY: Row offset of the window inside the raw data
#    - numCols: Number of columns to take starting at fromX
#    - numRows: Number of rows to take starting at fromY
#    - prefix: When not None, only files whose name starts with it are read
#
#   Returns:
#    - None (results are appended to X and y)
#
#   Notes:
#    - Each file is parsed with pandas.read_csv(skiprows=2) and min-max scaled to
#      [0, 1] with a scaler fitted on that file alone.
#    - Files smaller than the requested window are skipped silently.
#    - NOTE(review): the cache is keyed by the bare file name, so same-named files
#      in different folders share one entry - confirm that this is intended.
######################################################################################
def get_data_from_file_by_act(input_path, activity, X, y, fromX, fromY, numCols, numRows, prefix = None):
    row_idx = [fromY + r for r in range(numRows)]
    col_idx = [fromX + c for c in range(numCols)]

    for file_name in os.listdir(input_path):
        # Honour the optional file-name filter
        if prefix is not None and not file_name.startswith(prefix):
            continue

        if use_cache and file_name in file_cache:
            scaled = file_cache[file_name]
        else:
            # Read the sample from disk
            frame = pd.read_csv(input_path + '/' + file_name, skiprows = 2)

            # Normalise this sample and add a trailing axis of length 1
            scaler = preprocessing.MinMaxScaler(feature_range=(0, 1))
            scaled = np.expand_dims(scaler.fit_transform(frame.values), axis=2)

            if use_cache:
                file_cache[file_name] = scaled

        # Skip samples that do not cover the whole requested window
        if scaled.shape[1] < fromX + numCols or scaled.shape[0] < fromY + numRows:
            continue

        X.append(scaled[np.ix_(row_idx, col_idx)])
        y.append(activity)
     

######################################################################################
# get_data_from_file: Loads the samples stored under input_path, reading first the
# sub_folder_act folder (label 1) and then the sub_folder_inact folder (label 0).
#
#   Inputs:
#    - input_path: Folder that contains the active / inactive sub-folders
#    - fromX: Column offset of the window inside the raw data
#    - fromY: Row offset of the window inside the raw data
#    - numCols: Number of columns to take starting at fromX
#    - numRows: Number of rows to take starting at fromY
#    - prefix: Optional file-name prefix filter (None reads every file)
#
#   Returns:
#    - X: numpy array built from the loaded samples
#    - y: numpy array with the label (1 or 0) of each sample
######################################################################################
def get_data_from_file(input_path, fromX, fromY, numCols, numRows, prefix = None):
    samples, labels = [], []

    # Actives are loaded before inactives, so the output keeps that order.
    for sub_folder, label in ((sub_folder_act, 1), (sub_folder_inact, 0)):
        get_data_from_file_by_act (input_path + '/' + sub_folder, label, samples, labels, fromX, fromY, numCols, numRows, prefix)

    return np.array(samples), np.array(labels)

######################################################################################
# get_data_from_files: Loads the training, validation and test splits found in the
# sub_folder_train, sub_folder_val and sub_folder_test folders of dataset_folder.
#
#   Inputs:
#    - dataset_folder: Folder that contains the three split sub-folders
#    - fromX: Column offset of the window inside the raw data
#    - fromY: Row offset of the window inside the raw data
#    - numCols: Number of columns to take starting at fromX
#    - numRows: Number of rows to take starting at fromY
#    - custom_verbose: 0 = no traces, greater than 0 = trace each folder being loaded
#    - prefix: Optional file-name prefix filter (None reads every file)
#
#   Returns:
#    - x_train, y_train: Training inputs and labels
#    - x_val, y_val: Validation inputs and labels
#    - x_test, y_test: Test inputs and labels
######################################################################################
def get_data_from_files(dataset_folder, fromX, fromY, numCols, numRows, custom_verbose = 1, prefix = None):
    loaded = []

    # Splits are processed in the same order in which they are returned.
    for sub_folder in (sub_folder_train, sub_folder_val, sub_folder_test):
        split_path = dataset_folder + "/" + sub_folder
        if custom_verbose > 0:
            print_trace("Loading " + split_path)
        loaded.extend(get_data_from_file(split_path, fromX, fromY, numCols, numRows, prefix))

    x_train, y_train, x_val, y_val, x_test, y_test = loaded
    return x_train, y_train, x_val, y_val, x_test, y_test

In [5]:
########################################################################
# precision: Precision metric written with Keras backend operations:
# true positives / (predicted positives + epsilon).
#
# Inputs:
#   - y_true: Tensor with the true labels.
#   - y_pred: Tensor with the predicted values.
#
# Returns:
#   - Scalar tensor with the precision. Values are clipped to [0, 1] and
#     rounded before counting; K.epsilon() avoids a division by zero.
########################################################################
def precision(y_true, y_pred):
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    flagged = K.round(K.clip(y_pred, 0, 1))

    return K.sum(hits) / (K.sum(flagged) + K.epsilon())

########################################################################
# recall: Recall metric written with Keras backend operations:
# true positives / (actual positives + epsilon).
#
# Inputs:
#   - y_true: Tensor with the true labels.
#   - y_pred: Tensor with the predicted values.
#
# Returns:
#   - Scalar tensor with the recall. Values are clipped to [0, 1] and
#     rounded before counting; K.epsilon() avoids a division by zero.
########################################################################
def recall(y_true, y_pred):
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    actual = K.round(K.clip(y_true, 0, 1))

    return K.sum(hits) / (K.sum(actual) + K.epsilon())

########################################################################
# f1: F1-score metric, the harmonic mean of precision() and recall().
#
# Inputs:
#   - y_true: Tensor with the true labels.
#   - y_pred: Tensor with the predicted values.
#
# Returns:
#   - Scalar tensor with the F1-score; K.epsilon() in the denominator
#     avoids a division by zero when precision and recall are both 0.
########################################################################
def f1(y_true, y_pred):
    prec = precision(y_true, y_pred)
    rec = recall(y_true, y_pred)

    half_f1 = (prec * rec) / (prec + rec + K.epsilon())
    return 2 * half_f1


In [6]:
########################################################################
# create_model: Builds and compiles the CNN used for binary classification.
#
# Architecture: Input(numRows, numCols, 1) -> one [Conv2D(relu) +
# MaxPooling2D(2)] stage per entry of p_conv_extr -> Flatten -> one
# Dense(relu) layer (plus optional Dropout) per entry of p_dense ->
# Dense(1, sigmoid). Compiled with binary cross-entropy and RMSprop.
#
#   Inputs:
#    - numCols: Number of columns of the input data
#    - numRows: Number of rows of the input data
#    - custom_verbose: When exactly 1, the model summary is printed
#    - p_metrix_list: Metrics handed to model.compile
#    - p_conv_extr: Convolution stages as [filters, kernel_size] pairs
#    - p_dense: Dense layers as [units, dropout_rate] pairs; a rate of 0
#      (or less) adds no Dropout layer
#
#   Returns:
#    - model: Compiled Keras model, ready to be trained
#
#   Notes:
#    - The default list arguments are only read, never modified.
#    - Layer traces are printed regardless of custom_verbose.
########################################################################
def create_model (numCols
                , numRows
                , custom_verbose = 0
                , p_metrix_list=['acc'
                               , precision
                               , recall
                               , f1
                          ]
                , p_conv_extr = [[16, 3], [32, 3], [64,3]]
                , p_dense = [[512, 0.3], [128, 0.2]]
                   ):

    # The feature map is numRows x numCols with a single channel
    img_input = layers.Input(shape=(numRows, numCols, 1))

    # Convolutional feature extraction: every stage consumes the output of
    # the previous one (the trace now says "(x)" to reflect that).
    x = img_input
    for conv_extr in p_conv_extr:
        filters, kernel_size = conv_extr[0], conv_extr[1]
        print_trace("x = layers.Conv2D({0}, {1}, activation='relu')(x) ".format(filters, kernel_size))
        x = layers.Conv2D(filters, kernel_size, activation='relu')(x)
        x = layers.MaxPooling2D(2)(x)

    # Flatten the feature map to a 1-dim tensor
    x = layers.Flatten()(x)

    for dense in p_dense:
        units, dropout_rate = dense[0], dense[1]

        # Fully connected layer with ReLU activation
        print_trace("x = layers.Dense({0}, activation='relu')(x) ".format(units))
        x = layers.Dense(units, activation='relu')(x)

        if (dropout_rate > 0):
            print_trace("x = layers.Dropout({0})(x) ".format(dropout_rate))
            x = layers.Dropout(dropout_rate)(x)

    # Output layer with a single node and sigmoid activation
    output = layers.Dense(1, activation='sigmoid')(x)

    # Configure and compile the model
    model = Model(img_input, output)

    # `learning_rate` replaces the deprecated `lr` keyword, which newer
    # Keras releases no longer accept.
    model.compile(loss='binary_crossentropy',
                  optimizer=RMSprop(learning_rate=0.001),
                  metrics=p_metrix_list
                  )

    if (custom_verbose == 1):
        model.summary()

    return model

In [7]:
########################################################################
# fit_model: Trains the given model and evaluates it on the test data.
#
# Inputs:
#  - model: Compiled model to train
#  - x_train: Input matrix for training
#  - y_train: Output vector for training
#  - x_val: Input matrix for validation
#  - y_val: Output vector for validation
#  - x_test: Input matrix for the test phase
#  - y_test: Output vector for the test phase
#  - custom_verbose: Verbosity forwarded to model.fit and model.evaluate
#    (0 = no logs, 1 = with logs)
#  - p_epochs: Number of training epochs (default = 200)
#
# Returns:
#  - history: Object returned by model.fit
#  - score: Value returned by model.evaluate on the test data
#
# Notes:
#  - The batch size is taken from the module-level default_batch_size.
########################################################################
def fit_model (model, x_train, y_train, x_val, y_val, x_test, y_test, custom_verbose = 0, p_epochs = 200):
    validation_pair = (x_val, y_val)

    history = model.fit(
        x_train,
        y_train,
        batch_size=default_batch_size,
        epochs=p_epochs,
        verbose=custom_verbose,
        validation_data=validation_pair,
    )

    score = model.evaluate(x_test, y_test, verbose=custom_verbose)
    return history, score

In [8]:
########################################################################
# save_history_logs: Saves the per-epoch values of each requested training
# metric into its own one-row CSV file.
#
# Inputs:
#   - history: Training history object; its `history` attribute maps a
#     metric name to the list of values recorded per epoch.
#   - assay_item_name_full_path: Path and base name for the log files; each
#     file is written to "<base>##hist_<metric>.log".
#   - graph_list: Metrics to save (default: ['acc', 'precision', 'recall', 'f1']).
#
# Returns:
#   - None
#
# Notes:
#   - Metrics that were not recorded are skipped. They used to raise
#     KeyError, because the history was indexed before the None check.
########################################################################
def save_history_logs (history
                         , assay_item_name_full_path
                         , graph_list = ['acc', 'precision', 'recall', 'f1']):

    for graph in graph_list:
        values = history.history.get(graph)
        if values is None:
            continue

        log_path = assay_item_name_full_path + "##hist_{}.log".format(graph)
        # newline='' lets the csv module control the line terminator
        with open(log_path, mode='w', newline='') as csv_file:
            writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            writer.writerow(values)
        
########################################################################
# save_history_pictures: Saves one plot per requested metric showing its
# per-epoch training values and, when recorded, its validation values.
#
# Inputs:
#   - history: Training history object; its `history` attribute maps a
#     metric name to the list of values recorded per epoch (validation
#     series are looked up under "val_<metric>").
#   - assay_item_name_full_path: Path and base name for the plots; each
#     file is written to "<base>_<metric>.jpg".
#   - graph_list: Metrics to plot (default: ['acc', 'precision', 'recall', 'f1']).
#
# Returns:
#   - None
#
# Notes:
#   - Metrics that were not recorded are skipped, and a missing validation
#     series is left out of the plot. Both cases used to raise KeyError.
#   - The current matplotlib figure is cleared and reused for every plot.
########################################################################
def save_history_pictures (history
                         , assay_item_name_full_path
                         , graph_list = ['acc', 'precision', 'recall', 'f1']):

    for graph in graph_list:
        train_values = history.history.get(graph)
        if train_values is None:
            continue

        plt.clf()
        plt.plot(train_values)
        legend_labels = ['train']

        val_values = history.history.get('val_{}'.format(graph))
        if val_values is not None:
            plt.plot(val_values)
            legend_labels.append('validation')

        plt.title('model {}'.format(graph))
        plt.ylabel(graph)
        plt.xlabel('epoch')
        plt.legend(legend_labels, loc='lower right')
        plt.savefig(assay_item_name_full_path + "_{}.jpg".format(graph))
   

########################################################################
# create_global_overview_file: Creates (or truncates) the global overview
# CSV file and writes its header row. save_score_traces later appends one
# row per trained model to this file.
#
# Inputs:
#   - data_model_folder: Folder where the file is saved; it is created,
#     including missing parents, when it does not exist.
#   - global_overview_file_name: Name of the overview file (default predefined).
#
# Returns:
#   - None
########################################################################
def create_global_overview_file (data_model_folder, global_overview_file_name = default_global_overview_file_name):
    # exist_ok avoids the race between checking for the folder and creating it
    os.makedirs(data_model_folder, exist_ok=True)

    # Column titles are written exactly as before (including the historical
    # "Gobal" spelling) so the header of newly created files does not change.
    columns = (
        "name",
        "Gobal ratio",
        "Negative precision",
        "Positive precision",
        "Precision",
        "Recall",
        "F1",
        "True Negatives",
        "False Positives",
        "False Negatives",
        "True Positives",
        "total elements",
        "te_pos",
        "te_neg",
        "Positive recall",
        "Negative recall",
    )
    header = ",".join('"{}"'.format(column) for column in columns) + "\n"

    with open(data_model_folder + "/" + global_overview_file_name, mode='w', newline='') as log_file:
        log_file.write(header)

        
########################################################################
# save_score_traces: Predicts the test set, builds the confusion matrix and
# saves the resulting figures both to a per-model log file and as one row
# of the global overview CSV file.
#
# Inputs:
#   - model: Trained model object (must provide predict()).
#   - score: Evaluation scores from testing. Indexes 1 to 4 are written as
#     global ratio, precision, recall and F1.
#     NOTE(review): this matches the default metric list
#     ['acc', precision, recall, f1] - confirm for custom metric lists.
#   - x_test: Test dataset input matrix.
#   - y_test: Test dataset output vector (labels 0 / 1).
#   - data_model_folder: Folder to store score files.
#   - assay_item_name: Identifier for saving the results.
#   - global_overview_file_name: Overview filename (default predefined).
#
# Returns:
#   - None
#
# Notes:
#   - A sample is labelled positive when its predicted score is greater
#     than default_positive_perc_th.
#   - The confusion matrix is always 2x2 (labels 0 and 1), even when one
#     class is absent from both the test labels and the predictions.
#   - Ratios whose denominator is 0 are written as "nan".
########################################################################
def save_score_traces (model, score, x_test, y_test, data_model_folder, assay_item_name, global_overview_file_name = default_global_overview_file_name):
    def _ratio(numerator, denominator):
        # An undefined ratio stays NaN, without the numpy divide warning
        if denominator == 0:
            return float("nan")
        return numerator / denominator

    assay_item_name_full_path = data_model_folder + "/" + assay_item_name

    # One score per sample -> 0 / 1 labels using the positive threshold
    predicted_scores = np.asarray(model.predict(x_test)).reshape(-1)
    y_test_predict = (predicted_scores > default_positive_perc_th).astype(int)

    # Fixing the labels guarantees a 2x2 matrix: rows = true, columns = predicted
    cm = confusion_matrix(y_test, y_test_predict, labels=[0, 1])
    true_neg, false_pos = cm[0][0], cm[0][1]
    false_neg, true_pos = cm[1][0], cm[1][1]

    # Share of the predicted negatives / positives that are correct
    negative_ratio = _ratio(true_neg, true_neg + false_neg)
    positive_ratio = _ratio(true_pos, false_pos + true_pos)

    with open(assay_item_name_full_path + "##global.log", mode='w', newline='') as log_file:
        log_file.write("Gobal ratio: " + str(score[1]) + "\n")
        log_file.write("Negative ratio: " + str(negative_ratio) + "\n")
        log_file.write("Positive ratio: " + str(positive_ratio) + "\n")
        log_file.write(str(score) + "\n")
        log_file.write(str(cm) + "\n")

    total_pos = false_neg + true_pos
    total_neg = true_neg + false_pos

    # Same column order as the header written by create_global_overview_file
    row = [
        assay_item_name,
        score[1],
        negative_ratio,
        positive_ratio,
        score[2],
        score[3],
        score[4],
        true_neg,
        false_pos,
        false_neg,
        true_pos,
        true_neg + false_pos + false_neg + true_pos,
        total_pos,
        total_neg,
        _ratio(true_pos, total_pos),
        _ratio(true_neg, total_neg),
    ]

    with open(data_model_folder + "/" + global_overview_file_name, mode='a', newline='') as log_file:
        # csv.writer quotes a name only when it contains a comma, quote or
        # line break; every other value is written as before.
        writer = csv.writer(log_file, lineterminator="\n")
        writer.writerow([str(value) for value in row])
    
########################################################################
# train_assay_model: Runs the full training pipeline for a set of assays:
# model creation, training, evaluation and saving of the model, history
# plots, history logs and score traces.
#
# Inputs:
#   - dataset_folder: Root folder for datasets (only used to trace the
#     dataset location; the data itself comes from `dataset`).
#   - assay_codes: Specific assay identifiers.
#   - data_model_folder: Directory to save the trained model and logs.
#   - dataset: Pre-loaded datasets as (x_train, y_train, x_val, y_val, x_test, y_test).
#   - custom_verbose: Verbosity forwarded to fit_model (0 = silent, 1 = detailed logs).
#   - prefix: Optional dataset naming prefix (currently unused here).
#   - p_epochs: Training epochs (default predefined).
#   - p_metrix_list: List of metrics to evaluate.
#   - p_assay_item_name: Optional override for the base name of the output
#     files; when None the name is built from prefix_model_file and assay_codes.
#   - p_conv_extr: Convolutional layers configuration.
#   - p_dense: Dense layers configuration.
#
# Returns:
#   - None
#
# Notes:
#   - The output path is built from the resolved assay item name. It used
#     to be built from p_assay_item_name, which raised TypeError whenever
#     that argument was left as None.
#   - The default list arguments are only read, never modified.
########################################################################
def train_assay_model (dataset_folder
                     , assay_codes
                     , data_model_folder
                     , dataset
                     , custom_verbose = 0
                     , prefix = None
                     , p_epochs = default_data_training_num_epocs
                     , p_metrix_list=['acc'
                               , precision
                               , recall
                               , f1
                          ]
                      , p_assay_item_name = None
                      , p_conv_extr = [[16, 3], [32, 3], [64,3]]
                      , p_dense = [[512, 0.3], [128, 0.2]]
                      ):

    # Resolve the base name shared by every output file of this model
    if (p_assay_item_name is None):
        assay_item_name = get_assay_file_name(prefix_model_file, assay_codes)
    else:
        assay_item_name = p_assay_item_name

    assay_item_name_full_path = data_model_folder + "/" + assay_item_name

    # Start from a clean Keras session
    gc.collect()
    K.clear_session()

    print_trace("Training dataset for assays " + str(assay_codes))

    # Creates the model
    print_trace("Creating model")
    model = create_model(dataRegionEspNumColumns
                       , dataRegionEspNumRows
                       , p_metrix_list = p_metrix_list
                       , p_conv_extr = p_conv_extr
                       , p_dense = p_dense)

    # Getting datasets: they are supplied pre-loaded by the caller, the
    # folder name is only computed for the trace
    dataset_path = get_assay_folder_dataset_name(dataset_folder, prefix_dataset_folder, assay_codes)
    print_trace("Getting datasets from " + dataset_path)
    x_train = dataset[0]
    y_train = dataset[1]
    x_val = dataset[2]
    y_val = dataset[3]
    x_test = dataset[4]
    y_test = dataset[5]

    # Fit the model
    print_trace("Fitting the model for " + str(x_train.shape))
    history, score = fit_model (model
                              , x_train
                              , y_train
                              , x_val
                              , y_val
                              , x_test
                              , y_test
                              , custom_verbose
                              , p_epochs
                              )

    # Saving the model
    destination_model = assay_item_name_full_path + '.h5'
    print_trace("Saving model in  " + destination_model)
    model.save(destination_model)

    # Saving training traces
    save_history_pictures (history, assay_item_name_full_path)
    save_history_logs (history, assay_item_name_full_path)
    save_score_traces (model, score, x_test, y_test, data_model_folder, assay_item_name)

    print_trace("Model creation finished")

    print_trace("Clear keras session")
    K.clear_session()

    print_trace("Purge memory for model")
    del model

    # The dataset arrays belong to the caller, so nothing is released for
    # them here; the traces are kept so the log output does not change.
    print_trace("Purge memory for x_train")
    print_trace("Purge memory for y_train")
    print_trace("Purge memory for x_val")
    print_trace("Purge memory for y_val")
    print_trace("Purge memory for x_test")
    print_trace("Purge memory for y_test")

    print_trace("Recollect free memory")
    gc.collect()
    