In [None]:
#Libraries import
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.layers import Layer, Input, Conv2D, BatchNormalization, LeakyReLU, AveragePooling2D, Flatten, Dropout, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.losses import BinaryFocalCrossentropy
from sklearn.metrics import f1_score as f1_s
from sklearn.metrics import confusion_matrix, classification_report
import gc
import sys

### Data loading

In [2]:
# Load the input and label arrays from the NumPy archive.
# np.load on an .npz file returns a lazily-read archive object that keeps the file
# open; the context manager closes it as soon as both arrays have been read.
with np.load('../data/matrices.npz') as data:
    X = data['matriz_a']  # model inputs (archive key 'matriz_a')
    y = data['matriz_b']  # target labels (archive key 'matriz_b')

In [3]:
# Ordered split into train / validation / test sets.
# The first call holds out the last 15% of the samples; the second call cuts that
# hold-out in half (validation first, test last).
# shuffle=False keeps the original sample order; per the original note this is done
# deliberately because the time-series windows overlap.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.15, shuffle=False
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, shuffle=False
)

In [None]:
# Convert the training and validation arrays to float32 tensors.
# X_test / y_test are not converted here and stay as returned by the split.
X_train, y_train, X_val, y_val = (
    tf.convert_to_tensor(array, dtype=tf.float32)
    for array in (X_train, y_train, X_val, y_val)
)

### PINNs definition section

In [5]:
# Per-rule scale parameters for the physics penalty.
# l1 and l2 are two lists of 12 scalar float32 tf.Variables, each initialised to 1.0
# and named "l1_<i>" / "l2_<i>", where <i> is the index of the physics rule.
# slope_penalizer reads them as |l1[i]| and |l2[i]|.
# NOTE(review): the variables are flagged trainable, but they are module-level objects
# rather than model weights -- confirm that an optimizer step actually updates them.
l1 = [
    tf.Variable(1.0, dtype=tf.float32, trainable=True, name=f"l1_{rule_idx}")
    for rule_idx in range(12)
]
l2 = [
    tf.Variable(1.0, dtype=tf.float32, trainable=True, name=f"l2_{rule_idx}")
    for rule_idx in range(12)
]

In [6]:
# Define the sigmoid activation function.
# This function computes the sigmoid of the input tensor 'z'.
# The sigmoid function is given by the formula: 1 / (1 + exp(-z)).
# It maps any real-valued number into the range (0, 1).
def sigmoid(z):
    """
    Logistic function 1 / (1 + exp(-z)), applied element-wise.

    Parameters:
    z (tensor): Input tensor.
    Returns:
    tensor: Values in the open interval (0, 1), same shape as the input.
    """
    denominator = 1 + tf.exp(-z)
    return 1 / denominator

In [7]:
# Define the normalize function.
# This function scales the input pairs (alpha, beta) based on the maximum absolute value
# among the elements in each pair. The purpose of this function is to normalize
# the inputs to a common scale to improve numerical stability during computation.
# The maximum absolute value between alpha and beta is computed and used as the scaling factor.
# Additionally, a small constant epsilon is added to avoid division by zero.
def normalize(alpha, beta):
    """
    Scale a pair of tensors by their element-wise largest magnitude.

    Parameters:
    alpha (tensor): First input tensor.
    beta (tensor): Second input tensor.

    Returns:
    tuple: (alpha, beta) each divided by max(|alpha|, |beta|) + epsilon, so both
    results lie in [-1, 1]; the Keras epsilon guards against division by zero.
    """
    scale = tf.maximum(tf.abs(alpha), tf.abs(beta)) + tf.keras.backend.epsilon()
    return alpha / scale, beta / scale

In [8]:
# Define the slope_penalizer function.
# This function penalizes the slopes of the input pairs (alpha, beta).
# It first normalizes the input pairs using the normalize function.
# Then, it computes the penalization condition based on the absolute values of the
# trainable parameters (l1[i] and l2[i]) and applies the sigmoid function to these values.
# The penalization condition is rounded to obtain binary values (0 or 1).
# Finally, the penalization value is determined based on the penalization condition:
# if the condition is greater than 0, the penalization value is set to 1.0, otherwise to 0.0.
def slope_penalizer(alpha, beta, i):
    """
    Binary indicator for physics rule `i` on a pair of slopes.

    Parameters:
    alpha (tensor): First slope of the pair.
    beta (tensor): Second slope of the pair.
    i (int): Index of the rule; selects l1[i] and l2[i].

    Returns:
    tensor: 1.0 where sigmoid(-|l1[i]| * alpha_n) * sigmoid(|l2[i]| * beta_n)
    rounds to a value greater than 0, otherwise 0.0 (alpha_n / beta_n are the
    normalized slopes).
    """
    alpha_n, beta_n = normalize(alpha, beta)
    gate_alpha = sigmoid(-tf.abs(l1[i]) * alpha_n)
    gate_beta = sigmoid(tf.abs(l2[i]) * beta_n)
    # The result is selected from the constants 1.0 / 0.0 by a comparison, so it is
    # piecewise constant in alpha, beta, l1[i] and l2[i].
    rounded = tf.round(gate_alpha * gate_beta)
    return tf.where(rounded > 0, 1.0, 0.0)

In [9]:
class PhysicRules(tf.keras.layers.Layer):
    """Layer that turns per-rule slope pairs and predictions into a physics penalty."""

    def __init__(self):
        super().__init__()

    # Forward pass of the physics penalty.
    # For every rule index i, slope_penalizer(alpha[i], beta[i], i) yields a 0/1
    # indicator per sample. The indicators are stacked, reduced with a maximum over
    # the rules, and multiplied by the predictions binarized at 0.5.
    def call(self, alpha, beta, y_pred):
        """
        Parameters:
        alpha (sequence of tensors): First slope of each rule, one entry per rule.
        beta (sequence of tensors): Second slope of each rule, one entry per rule.
        y_pred (tensor): Predicted output tensor.

        Returns:
        tensor: Per-sample penalty, 1.0 where any rule indicator is 1 and the
        prediction is >= 0.5, otherwise 0.0.
        """
        per_rule = tf.stack(
            [slope_penalizer(alpha[rule], beta[rule], rule) for rule in range(len(alpha))],
            axis=0,
        )
        any_rule = tf.reduce_max(per_rule, axis=0)
        # Add a trailing axis so the indicator lines up with y_pred.
        any_rule = tf.expand_dims(any_rule, axis=-1)
        # NOTE(review): the thresholded prediction is selected from the constants
        # 1.0 / 0.0, so this term carries no gradient back to the model -- confirm
        # that this is intended.
        hard_pred = tf.where(y_pred >= 0.5, 1.0, 0.0)
        return any_rule * hard_pred

In [10]:
# Define the calculate_slope function.
# This function calculates the mean slopes of the input tensor over time.
# Inputs:
# - inputs: A tensor of shape (batch_size, timesteps, features).
# The function computes the differences between consecutive timesteps,
# then divides these differences by the respective time intervals.
# Finally, it averages the slopes over time for each feature and each sample in the batch.
def calculate_slope(inputs):
    """
    Mean scaled first difference of every feature along the time axis.

    Parameters:
    inputs (tensor): Input tensor of shape (batch_size, timesteps, features).

    Returns:
    tensor: Shape (batch_size, features). The k-th consecutive difference
    (k = 1 .. timesteps - 1) is divided by k before the differences are averaged.
    """
    n_steps = tf.shape(inputs)[1]
    step_deltas = inputs[:, 1:, :] - inputs[:, :-1, :]
    # NOTE(review): the divisor grows with the step index (1, 2, ..., timesteps - 1)
    # rather than being a constant interval -- confirm this weighting is intended.
    divisors = tf.reshape(tf.range(1, n_steps, dtype=tf.float32), (1, -1, 1))
    return tf.reduce_mean(step_deltas / divisors, axis=1)

In [11]:
physic_rules = PhysicRules()  # Shared PhysicRules layer instance; PhysicLoss.getPhysicsLoss calls this module-level object directly.

In [12]:
# Define the PhysicLoss class, which extends the tf.keras.losses.Loss class.
# This class implements a custom loss function for a neural network incorporating
# physics-informed constraints. The loss function combines a binary cross-entropy
# loss with a physics-based penalty term.
class PhysicLoss(tf.keras.losses.Loss):
    """
    Binary focal cross-entropy plus a weighted physics penalty.

    total = focal(y_true, y_pred) + lambda_factor * mean(physics penalty)

    The components of the most recent evaluation are cached on the instance and
    can be read back with getAllLosses().
    """

    def __init__(self, name="SAGLoss", lambda_factor=1):
        """
        Parameters:
        name (str): Name of the loss function.
        lambda_factor (float): Factor to scale the physics-based penalty term.
        """
        super().__init__(name=name)
        self.bce = BinaryFocalCrossentropy()
        self.calculate_slope = calculate_slope
        self.lambda_factor = lambda_factor
        # Components of the last evaluation. Initialised here so that getAllLosses()
        # returns (None, None, None) instead of raising AttributeError when it is
        # called before the loss has been evaluated.
        self.total_loss = None
        self.focal_loss = None
        self.physic_loss = None

    # Build the signed per-rule slope pairs used by the physics penalty.
    def getGradients(self, inputs):
        """
        Parameters:
        inputs (tensor): Input data for the model, shape (batch, timesteps, features).

        Returns:
        tuple: (alphas, betas), two lists with one per-sample slope tensor per rule.
        Each slope is the mean slope of one input feature multiplied by the rule's
        sign.
        """
        slopes = self.calculate_slope(inputs)
        # Feature index providing the first (alpha) and second (beta) slope of each rule.
        alpha_features = (4, 6, 0, 3, 0, 1, 4, 2)
        beta_features = (3, 3, 1, 7, 2, 2, 7, 4)
        # (alpha sign, beta sign) per rule; labels kept from the original notebook.
        rules = (
            (1, 1),    # up & down (default relation) 1
            (1, 1),    # up & down 2
            (-1, -1),  # down & up 3
            (-1, -1),  # down & up 4
            (-1, 1),   # down & down 5
            (1, 1),    # up & down 6
            (1, -1),   # up & up 7
            (1, -1),   # up & up 8
        )
        alphas = [
            slopes[:, feature] * alpha_sign
            for feature, (alpha_sign, _) in zip(alpha_features, rules)
        ]
        betas = [
            slopes[:, feature] * beta_sign
            for feature, (_, beta_sign) in zip(beta_features, rules)
        ]
        return alphas, betas

    # Calculate the physics-based loss.
    def getPhysicsLoss(self, inputs, y_pred):
        """
        Parameters:
        inputs (tensor): Input data for the model.
        y_pred (tensor): Predicted output tensor.

        Returns:
        tensor: Mean of the per-sample penalty produced by the module-level
        `physic_rules` layer.
        """
        alphas, betas = self.getGradients(inputs)
        per_sample_penalty = physic_rules(alphas, betas, y_pred)
        return tf.reduce_mean(per_sample_penalty)

    # Return all components of the loss for logging or analysis.
    def getAllLosses(self):
        """
        Returns:
        tuple: (total loss, focal loss, physics loss) of the most recent evaluation,
        or (None, None, None) if the loss has not been evaluated yet.
        """
        return self.total_loss, self.focal_loss, self.physic_loss

    # Calculate the total loss, which combines the focal loss and the physics-based loss.
    def getTotalLoss(self, inputs, y_true, y_pred):
        """
        Parameters:
        inputs (tensor): Input data for the model.
        y_true (tensor): True labels corresponding to the input data.
        y_pred (tensor): Predicted output tensor.

        Returns:
        tensor: focal loss + lambda_factor * physics loss. The three components are
        also cached on the instance.
        """
        focal_loss = self.bce(y_true, y_pred)
        physic_loss = self.getPhysicsLoss(inputs, y_pred)
        total_loss = focal_loss + self.lambda_factor * physic_loss
        self.total_loss = total_loss
        self.focal_loss = focal_loss
        self.physic_loss = physic_loss
        return total_loss

    # Entry point used when the loss object is called.
    def call(self, actual_data, y_pred):
        """
        Parameters:
        actual_data (tuple): (inputs, y_true). The model inputs travel in the slot
        that normally holds the labels because the physics term needs them.
        y_pred (tensor): Predicted output tensor.

        Returns:
        tensor: The total loss for this evaluation.
        """
        inputs, y_true = actual_data
        return self.getTotalLoss(inputs, y_true, y_pred)

In [13]:
pinn_loss = PhysicLoss("PhysicLoss")  # Loss instance used by train_step and the validation pass; lambda_factor is not passed here, so the constructor default (1) applies.

### Gram matrices definition section

In [14]:
# Define the CalculateGramMatrix class, which extends the tf.keras.layers.Layer class.
# This class computes the Gram matrix based on the input pairs of angles.
class CalculateGramMatrix(Layer):
    """Layer that expands a multivariate series into pairwise angular-difference images."""

    def __init__(self, **kwargs):
        """
        Parameters:
        pairs_n (list, optional keyword): Pairs of feature indices [i, j] used to
        build one difference matrix each. Defaults to the 11 pairs listed below.
        Only the keywords name, trainable, dtype, dynamic and input_shape are
        forwarded to the base Layer.
        """
        base_keys = ('name', 'trainable', 'dtype', 'dynamic', 'input_shape')
        base_kwargs = {key: kwargs[key] for key in base_keys if key in kwargs}
        super().__init__(**base_kwargs)
        default_pairs = [[4, 3], [6, 4], [6, 3], [3, 5], [0, 1], [3, 7], [5, 7], [0, 2], [1, 2], [4, 7],
                         [2, 4]]
        self.pairs_n = kwargs.get('pairs_n', default_pairs)

    # Forward pass: one (timesteps x timesteps) matrix of angular differences per pair.
    def call(self, inputs):
        """
        Parameters:
        inputs (tensor): Input tensor indexed as (batch, timesteps, features).
        Assumes the values are already scaled to [-1, 1], the domain of acos --
        TODO confirm against the data preparation.

        Returns:
        tensor: Shape (batch, timesteps, timesteps, number of pairs). For pair (i, j),
        entry [b, t, s] is acos(inputs[b, t, i]) - acos(inputs[b, s, j]).
        """
        angles = tf.math.acos(inputs)
        difference_planes = [
            # Column vector of feature `first` minus row vector of feature `second`.
            tf.expand_dims(angles[:, :, first], axis=2) - tf.expand_dims(angles[:, :, second], axis=1)
            for first, second in self.pairs_n
        ]
        return tf.stack(difference_planes, axis=-1)

    # Serialize the layer configuration, including the feature pairs.
    def get_config(self):
        """
        Returns:
        dict: Base layer configuration extended with 'pairs_n'.
        """
        config = super().get_config()
        config['pairs_n'] = self.pairs_n
        return config


### Temporal filter definition section

In [15]:
# Define the CustomTemporalFilter class, which extends the tf.keras.layers.Layer class.
# This class applies a custom temporal filter to the input tensor.
class CustomTemporalFilter(Layer):
    """
    Multiplies each (filter_size x filter_size) input plane by a fixed weight mask.

    The mask grows linearly from 0 in the top-left corner to 1 in the bottom-right
    corner. It is a constant, not a trainable weight.
    """

    def __init__(self, filter_size, **kwargs):
        """
        Parameters:
        filter_size (int): Side length of the square mask.
        """
        super(CustomTemporalFilter, self).__init__(**kwargs)
        self.filter_size = filter_size

    # Serialize the layer configuration.
    def get_config(self):
        """
        Returns:
        dict: Base layer configuration extended with 'filter_size'.
        """
        config = super(CustomTemporalFilter, self).get_config()
        config.update({
            "filter_size": self.filter_size
        })
        return config

    # Build the constant mask.
    def build(self, input_shape):
        """
        Parameters:
        input_shape (tensor): Shape of the input tensor (not used by the mask).
        """
        # Both axes run from 1.0 down to 0.0, so 1 - (X + Y) / 2 is 0 at index (0, 0)
        # and 1 at index (filter_size - 1, filter_size - 1).
        x = tf.linspace(1.0, 0.0, self.filter_size)
        y = tf.linspace(1.0, 0.0, self.filter_size)
        X, Y = tf.meshgrid(x, y)
        self.filter = 1.0 - ((X + Y) / 2.0)
        # Shape (1, size, size, 1) so the mask broadcasts over batch and channels.
        self.filter = tf.reshape(self.filter, (1, self.filter_size, self.filter_size, 1))
        self.filter = tf.cast(self.filter, dtype='float32')

    # Apply the mask to the inputs.
    def call(self, inputs):
        """
        Parameters:
        inputs (tensor): Input tensor of shape (batch, filter_size, filter_size, channels).

        Returns:
        tensor: The inputs multiplied element-wise by the mask.
        """
        # Broadcasting expands the (1, size, size, 1) mask over batch and channels,
        # giving the same values as an explicit tf.tile without materialising a
        # batch-sized copy of the mask on every call.
        return inputs * self.filter


### Model definition

In [16]:
# Define the function to create the model.
# This function builds a neural network model with specified numbers of convolutional and dense layers.
# The model includes a CalculateGramMatrix layer, a CustomTemporalFilter layer,
# and dynamically adds convolutional and dense layers based on the given parameters.
def create_model(num_conv_layers, num_dense_layers, num_neurons, dropout_rate, conv_filters):
    """
    Build the CNN classifier on top of the Gram-matrix and temporal-filter layers.

    Parameters:
    num_conv_layers (int): Number of Conv2D -> BatchNorm -> LeakyReLU -> AveragePooling stages.
    num_dense_layers (int): Number of Dense -> Dropout -> BatchNorm stages.
    num_neurons (int): Units in the first dense layer; halved (integer division) for each later one.
    dropout_rate (float): Dropout rate applied after each dense layer.
    conv_filters (int): Filters in the first convolutional layer; doubled for each later one.

    Returns:
    tf.keras.Model: Model mapping a (30, 8) input to a single sigmoid output.
    The layer widths are printed while the model is being built.
    """
    input_layer = Input(shape=(30, 8), name="Input")
    features = CalculateGramMatrix(name="Gram_converter")(input_layer)
    features = CustomTemporalFilter(filter_size=30, name="Temporal_filter")(features)

    # Convolutional stages
    for depth in range(num_conv_layers):
        n_filters = conv_filters * (2 ** depth)
        print(f'conv_filters: {n_filters}')
        features = Conv2D(
            filters=n_filters,
            kernel_size=(3, 3),
            use_bias=False,
            kernel_initializer='he_normal',
        )(features)
        features = BatchNormalization()(features)
        features = LeakyReLU(alpha=0.01)(features)
        features = AveragePooling2D(pool_size=(2, 2))(features)

    hidden = Flatten(name="Flattened_after_full")(features)

    # Dense stages
    for depth in range(num_dense_layers):
        units = num_neurons // (2 ** depth)
        print(f'num_neurons: {units}')
        hidden = Dense(units, activation='relu', kernel_initializer='he_normal')(hidden)
        hidden = Dropout(dropout_rate)(hidden)
        hidden = BatchNormalization()(hidden)

    output_layer = Dense(1, activation='sigmoid', name="Output")(hidden)
    return Model(inputs=input_layer, outputs=output_layer)

In [17]:
# Define the function to delete the model and clear the session.
# This function clears the current TensorFlow session, deletes the model and optimizer variables,
# and performs garbage collection to free up memory.
def del_model():
    """
    Reset Keras' global state and run a garbage-collection pass.

    This function cannot unbind the caller's `model` / `optimizer` names: a `del`
    inside the function would only target function-local names. Callers that want
    the memory released must drop their own references before calling it.

    Parameters:
    None

    Returns:
    None
    """
    tf.keras.backend.clear_session()
    # Call the collector (the attribute access alone does nothing).
    gc.collect()

In [18]:
# Define the F1Score class, which extends the tf.keras.metrics.Metric class.
# This class calculates the F1 score, which is the harmonic mean of precision and recall.
class F1Score(tf.keras.metrics.Metric):
    """F1 metric (harmonic mean of precision and recall) built on Keras Precision and Recall."""

    def __init__(self, name='f1_score', **kwargs):
        """
        Parameters:
        name (str): Name of the metric.
        kwargs (dict): Additional keyword arguments forwarded to the base Metric.
        """
        super().__init__(name=name, **kwargs)
        self.precision = Precision()
        self.recall = Recall()

    # Accumulate a batch into the underlying precision and recall metrics.
    def update_state(self, y_true, y_pred, sample_weight=None):
        """
        Parameters:
        y_true (tensor): True labels.
        y_pred (tensor): Predicted labels.
        sample_weight (tensor, optional): Optional weighting of each example.

        Returns:
        None
        """
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    # Compute the result of the metric.
    def result(self):
        """
        Returns:
        tensor: 2 * P * R / (P + R + epsilon); the epsilon keeps the result at 0
        instead of NaN when precision and recall are both 0.
        """
        precision = self.precision.result()
        recall = self.recall.result()
        return 2 * ((precision * recall) / (precision + recall + tf.keras.backend.epsilon()))

    # Reset the accumulated precision and recall.
    def reset_state(self):
        """
        Returns:
        None
        """
        self.precision.reset_state()
        self.recall.reset_state()

    # Backwards-compatible alias: `reset_states` is the deprecated spelling that
    # existing callers of this class still use.
    def reset_states(self):
        """
        Returns:
        None
        """
        self.reset_state()

In [19]:
# Perform a single training step.
# This function computes the model predictions, calculates the custom PINN loss,
# computes gradients, and applies these gradients to update the model's weights.
@tf.function
def train_step(inputs, y_true, model, optimizer):
    """
    Run one optimisation step on a mini-batch.

    Parameters:
    inputs (tensor): Input data for the model.
    y_true (tensor): True labels corresponding to the input data.
    model (tf.keras.Model): The model to be trained.
    optimizer (tf.keras.optimizers.Optimizer): The optimizer to use for updating the model's weights.

    Returns:
    tensor: The total loss for this training step.
    """
    # Only the model's own trainable variables are differentiated and updated here.
    weights = model.trainable_variables
    with tf.GradientTape() as grad_tape:
        predictions = model(inputs, training=True)
        # The loss receives the inputs together with the labels because the
        # physics term is computed from the inputs.
        batch_loss = pinn_loss((inputs, y_true), predictions)
    gradients = grad_tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(gradients, weights))
    return batch_loss

In [20]:
# Hyperparameters for the network and the loss.
# According to the original note, these values are the result of a grid search
# conducted to find the optimal hyperparameters for the neural network model.
params = {
    'num_conv_layers': 2,
    'num_dense_layers': 3,
    'num_neurons': 1024,
    'dropout_rate': 0.3,
    'lambda_factor': 1.0,  # Intended weight of the physics term; 0.0 is meant to deactivate it. NOTE(review): confirm this value actually reaches the PhysicLoss instance.
    'conv_filters': 64  # Filters in the first convolutional layer; create_model doubles the count for each later layer
}

In [21]:
# Build the network from the architecture-related hyperparameters
# ('lambda_factor' belongs to the loss, not to create_model).
model_arg_names = ('num_conv_layers', 'num_dense_layers', 'num_neurons', 'dropout_rate', 'conv_filters')
model = create_model(**{key: params[key] for key in model_arg_names})

conv_filters: 64
conv_filters: 128
num_neurons: 1024
num_neurons: 512
num_neurons: 256


In [22]:
# Initialize the evaluation metrics and a standalone focal-loss object.
# NOTE(review): only f1_metric is referenced by the training loop in the visible
# cells; precision_metric, recall_metric and bce appear unused -- confirm before removing.

precision_metric = Precision()  # Precision: true positives / all positive predictions
recall_metric = Recall()  # Recall: true positives / all actual positives
bce = BinaryFocalCrossentropy()  # Binary focal cross-entropy (despite the name `bce`)
f1_metric = F1Score()  # Custom F1 metric combining precision and recall


### Training stage

In [23]:
# Set up the training parameters.
patience = 200  # Early-stopping budget: epochs allowed without improvement before training stops
epochs = 5000  # Maximum number of epochs to train the model
cont_patience = 0  # Consecutive epochs without improvement so far
min_patience = 0  # Best validation F1 observed so far (name kept for compatibility)
best_model = None  # Reference to the model at its best validation F1
batch_size = 2048  # Size of the mini-batches used during training
del_model()  # Reset Keras' global session state before training
lambda_factor = params['lambda_factor']  # Weight of the physics term, taken from the hyperparameters
# Hand the configured weight to the loss object. pinn_loss was instantiated with the
# constructor default, so without this assignment params['lambda_factor'] has no effect.
# This must run before the first train_step call: a traced tf.function keeps the
# value it saw at trace time.
pinn_loss.lambda_factor = lambda_factor


In [24]:
# Initialize the optimizer and build it with the model's trainable variables.
# Presumably built up front so that the optimizer's own variables exist before
# train_step is traced by tf.function -- TODO confirm.
optimizer = tf.keras.optimizers.Adam()  # Adam with its default hyperparameters
optimizer.build(model.trainable_variables)  # Build the optimizer for the model's trainable variables


In [25]:
improvement_threshold = 10e-4  # = 1e-3; an F1 within this distance of the best is not counted against patience
best_weights = None  # Weight snapshot taken at the best validation F1

for epoch in range(epochs):
    # One pass over the training data in fixed-order mini-batches.
    for step in range(0, len(X_train), batch_size):
        X_batch = X_train[step:step + batch_size]
        y_batch = y_train[step:step + batch_size]
        loss_value = train_step(X_batch, y_batch, model, optimizer)  # Perform a training step

    # Validation pass in inference mode (dropout off, batch-norm moving statistics).
    y_pred_val = model(X_val, training=False)
    total_loss = pinn_loss((X_val, y_val), y_pred_val)  # Compute the total loss on the validation set
    _, bce_loss, physics_loss = pinn_loss.getAllLosses()  # Components cached by the call above

    f1_metric.update_state(y_val, y_pred_val)  # Update the F1 metric with the validation predictions
    f1_score = f1_metric.result().numpy()  # Calculate the F1 score
    f1_metric.reset_states()  # Reset the F1 metric state for the next epoch

    # An epoch counts as "improved" when F1 beats the best so far OR lies within
    # improvement_threshold of it.
    # NOTE(review): because of the second clause, a plateau inside the threshold keeps
    # resetting the patience counter, so early stopping cannot trigger on it.
    if (abs(f1_score - min_patience) < improvement_threshold) or (f1_score > min_patience):
        print(
            f"\rEpoch {epoch}, IMPROVED <total_loss: {total_loss.numpy():.4f}> <bce_loss: {bce_loss.numpy():.4f}> <physics_loss: {physics_loss.numpy():.4f}> {min_patience:.4f}->{f1_score:.4f} ({cont_patience + 1})     ")
        cont_patience = 0  # Reset patience counter
        if f1_score > min_patience:
            # `best_model` is the same object as `model`, so on its own it always
            # reflects the latest weights. The snapshot is what preserves the best
            # state; restore it with model.set_weights(best_weights) if wanted.
            best_model = model
            best_weights = model.get_weights()
            min_patience = f1_score
    else:
        cont_patience += 1  # Increment patience counter
        print(
            f"\rEpoch {epoch}, NOT IMPROVED <total_loss: {total_loss.numpy():.4f}> <bce_loss: {bce_loss.numpy():.4f}> <physics_loss: {physics_loss.numpy():.4f}> {min_patience:.4f}->{f1_score:.4f} ({cont_patience})    ")
        if cont_patience > patience:  # Check if patience threshold is exceeded
            break

#Output interpretation:
    #Epoch [EPOCH ID], [IMPROVED/NOT IMPROVED MESSAGE] <total_loss: [TOTAL_LOSS_VALUE]> <bce_loss: [FOCAL_LOSS_VALUE]> <physics_loss: [PHYSIC_LOSS_VALUE]> [BEST_F1_SCORE_SO_FAR]->[CURRENT_F1_SCORE] ([PATIENCE_COUNT])


2024-06-20 18:30:34.948129: I tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:442] Loaded cuDNN version 8700
2024-06-20 18:30:36.797887: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7f59ad69c690 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2024-06-20 18:30:36.797919: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 3070 Ti, Compute Capability 8.6
2024-06-20 18:30:36.801618: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:269] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2024-06-20 18:30:36.863818: I ./tensorflow/compiler/jit/device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


Epoch 0, IMPROVED <total_loss: 0.1115> <bce_loss: 0.1115> <physics_loss: 0.0000> 0.0000->0.0000 (1)     
Epoch 1, IMPROVED <total_loss: 0.1237> <bce_loss: 0.1237> <physics_loss: 0.0000> 0.0000->0.0000 (1)     
Epoch 2, IMPROVED <total_loss: 0.0781> <bce_loss: 0.0776> <physics_loss: 0.0005> 0.0000->0.0348 (1)     
Epoch 3, IMPROVED <total_loss: 0.0692> <bce_loss: 0.0676> <physics_loss: 0.0016> 0.0348->0.1160 (1)     
Epoch 4, NOT IMPROVED <total_loss: 0.0667> <bce_loss: 0.0659> <physics_loss: 0.0008> 0.1160->0.0907 (1)    
Epoch 5, IMPROVED <total_loss: 0.0663> <bce_loss: 0.0652> <physics_loss: 0.0011> 0.1160->0.1219 (2)     
Epoch 6, IMPROVED <total_loss: 0.0644> <bce_loss: 0.0633> <physics_loss: 0.0011> 0.1219->0.1271 (1)     
Epoch 7, IMPROVED <total_loss: 0.0592> <bce_loss: 0.0546> <physics_loss: 0.0046> 0.1271->0.3128 (1)     
Epoch 8, IMPROVED <total_loss: 0.0563> <bce_loss: 0.0509> <physics_loss: 0.0054> 0.3128->0.3404 (1)     
Epoch 9, IMPROVED <total_loss: 0.0576> <bce_loss: 0.

### Testing stage

In [33]:
# Predict on the test set, then binarize the sigmoid outputs at a threshold of 0.5.
test_probabilities = model.predict(X_test)
y_pred = (test_probabilities >= 0.5).astype(int)




In [34]:
# Confusion matrix and per-class report for the test set, printed in that order.
cm = confusion_matrix(y_test, y_pred)
cr = classification_report(y_test, y_pred)

for report in (cm, cr):
    print(report)

[[3347   19]
 [  25  305]]
              precision    recall  f1-score   support

         0.0       0.99      0.99      0.99      3366
         1.0       0.94      0.92      0.93       330

    accuracy                           0.99      3696
   macro avg       0.97      0.96      0.96      3696
weighted avg       0.99      0.99      0.99      3696



In [35]:
# Flatten the 2x2 confusion matrix row by row into
# true negatives, false positives, false negatives and true positives.
test_confusion = confusion_matrix(y_test, y_pred)
tn, fp, fn, tp = test_confusion.ravel()


In [36]:
# Function to calculate Sensitivity
def calculate_sensitivity(tp, fn):
    """
    Sensitivity (recall): the fraction of actual positives that were detected.

    Parameters:
    tp (int): Number of true positives.
    fn (int): Number of false negatives.

    Returns:
    float: tp / (tp + fn), or NaN when there are no actual positives (tp + fn == 0),
    in which case the ratio is undefined.
    """
    actual_positives = tp + fn
    if actual_positives == 0:
        return float('nan')
    return tp / actual_positives


In [37]:

# Function to calculate Specificity
def calculate_specificity(tn, fp):
    """
    Specificity: the fraction of actual negatives that were correctly rejected.

    Parameters:
    tn (int): Number of true negatives.
    fp (int): Number of false positives.

    Returns:
    float: tn / (tn + fp), or NaN when there are no actual negatives (tn + fp == 0),
    in which case the ratio is undefined.
    """
    actual_negatives = tn + fp
    if actual_negatives == 0:
        return float('nan')
    return tn / actual_negatives

In [38]:
# Derive the test-set metrics from the confusion-matrix counts.
# Sensitivity and recall are the same quantity, so it is computed once with the
# helper defined above; the earlier scikit-learn F1 value was overwritten before
# use and has been dropped.
sensitivity = calculate_sensitivity(tp, fn)
specificity = calculate_specificity(tn, fp)
recall = sensitivity
precision = tp / (tp + fp)

# F1 is the harmonic mean of precision and recall.
f1 = 2 * ((recall * precision) / (recall + precision))

# Print results
print("Sensitivity or Recall:", sensitivity)
print("Specificity:", specificity)
print("Precision:", precision)
print("F1 Score:", f1)


Sensitivity or Recall: 0.9242424242424242
Specificity: 0.9943553178847296
Precision: 0.941358024691358
F1 Score: 0.9327217125382261


In [32]:
model.save('../models/last_cnn_pinn_sag_overload.h5')  # Save `model` in its current (last-epoch) state as an HDF5 file. NOTE(review): loading it back presumably needs custom_objects for CalculateGramMatrix and CustomTemporalFilter -- confirm.