In [5]:
import matplotlib.pyplot as plt
import tensorflow as tf
from  tensorflow.keras import models, layers, losses, optimizers, activations, regularizers
from  sklearn.model_selection import train_test_split
from  tensorflow.keras.layers import Input, BatchNormalization, Conv1D, MaxPooling1D, Dropout
from  tensorflow.keras.layers import LeakyReLU
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error,mean_absolute_percentage_error,r2_score

In [6]:
class CNN_Model:
    """Sequential 1-D CNN with a small dense head and two linear outputs.

    Layout (layer names are kept stable so saved weights / summaries match):

    * stages 1-8: ``conv_i`` -> ``norm_i`` -> ``leaky_relu_i`` -> ``pool_i``
    * stage 9:    ``conv_9`` -> ``norm_9`` -> ``leaky_relu_9`` (no pooling)
    * ``Flatten``
    * ``dense_5`` (200 units) and ``dense_6`` (50 units), each followed by
      batch normalisation and LeakyReLU
    * ``dense_7``: 2 units, linear activation

    Every convolution uses ``padding='same'``; every LeakyReLU uses
    ``alpha=0.1``.

    Parameters
    ----------
    num_samples : int or None
        Length of the input sequence (first entry of ``input_shape``).
    input_volume : int
        Number of channels per time step (second entry of ``input_shape``).
    filters, kernel_size : int
        Passed to every ``Conv1D`` layer.
    pool_size : int
        Passed to every ``MaxPooling1D`` layer.
    use_bias : bool
        Passed to every ``Conv1D`` and ``Dense`` layer.

    Raises
    ------
    ValueError
        If ``num_samples`` and ``pool_size`` are plain ints and the sequence
        would shrink to length zero before the last pooling stage.
    """

    _NUM_CONV_STAGES = 9            # the last stage is not followed by pooling
    _HIDDEN_DENSE = ((5, 200), (6, 50))   # (name index, units)

    def __init__(self, num_samples, input_volume, filters=5, kernel_size=5, pool_size=2, use_bias=False):
        self._check_input_length(num_samples, pool_size)

        self.model = models.Sequential()

        for stage in range(1, self._NUM_CONV_STAGES + 1):
            self._add_conv_stage(
                stage, filters, kernel_size, pool_size, use_bias,
                # Only the very first layer declares the input shape.
                input_shape=(num_samples, input_volume) if stage == 1 else None,
                with_pool=stage < self._NUM_CONV_STAGES,
            )

        self.model.add(layers.Flatten())

        for index, units in self._HIDDEN_DENSE:
            self._add_dense_stage(index, units, use_bias)

        # Output layer: two linear units, no normalisation or activation after it.
        self.model.add(layers.Dense(units=2, activation=activations.linear, name='dense_7', use_bias=use_bias))

    def _check_input_length(self, num_samples, pool_size):
        """Fail early when the pooling stages would consume the whole sequence."""
        # Only validate plain positive ints; anything else (None, tuples, ...)
        # is left for Keras to interpret.
        if not (isinstance(num_samples, int) and isinstance(pool_size, int) and pool_size > 0):
            return
        remaining = num_samples
        for _ in range(self._NUM_CONV_STAGES - 1):
            # Unpadded pooling with stride == pool_size floors the length.
            remaining //= pool_size
        if remaining < 1:
            raise ValueError(
                "num_samples={} is too short for {} pooling stages with pool_size={}; "
                "need at least {} samples".format(
                    num_samples, self._NUM_CONV_STAGES - 1, pool_size,
                    pool_size ** (self._NUM_CONV_STAGES - 1)))

    def _add_conv_stage(self, index, filters, kernel_size, pool_size, use_bias, input_shape=None, with_pool=True):
        """Append Conv1D -> BatchNormalization -> LeakyReLU (-> MaxPooling1D)."""
        conv_kwargs = dict(filters=filters, kernel_size=kernel_size, padding='same',
                           name='conv_{}'.format(index), use_bias=use_bias)
        if input_shape is not None:
            conv_kwargs['input_shape'] = input_shape
        self.model.add(Conv1D(**conv_kwargs))
        self.model.add(BatchNormalization(name='norm_{}'.format(index)))
        self.model.add(LeakyReLU(alpha=0.1, name='leaky_relu_{}'.format(index)))
        if with_pool:
            self.model.add(MaxPooling1D(pool_size=pool_size, name='pool_{}'.format(index)))

    def _add_dense_stage(self, index, units, use_bias):
        """Append Dense (linear) -> BatchNormalization -> LeakyReLU."""
        self.model.add(layers.Dense(units=units, activation=activations.linear,
                                    name='dense_{}'.format(index), use_bias=use_bias))
        self.model.add(BatchNormalization(name='norm_dense_{}'.format(index)))
        self.model.add(LeakyReLU(alpha=0.1, name='leaky_relu_dense_{}'.format(index)))

    def get_model(self, learning_rate=0.001, decay=0.0001, decay_steps=5850):
        """Compile the network with Adam + MSE loss and return it.

        Parameters
        ----------
        learning_rate : float
            Initial learning rate of the exponential-decay schedule.
        decay : float
            Forwarded as ``decay_rate`` of ``ExponentialDecay``.
            NOTE(review): with the default 0.0001 the learning rate is scaled
            by 1e-4 over ``decay_steps`` steps - confirm this is intended and
            not meant to be a value close to 1.
        decay_steps : int
            Number of optimizer steps over which one ``decay`` factor is
            applied (previously hard-coded to 5850).

        Returns
        -------
        The compiled Keras model held in ``self.model``.
        """
        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=learning_rate,
            decay_steps=decay_steps,
            decay_rate=decay
        )
        self.model.compile(optimizer=optimizers.Adam(learning_rate=lr_schedule), loss=losses.mean_squared_error)
        return self.model


In [7]:
class Evaluator:
    """Regression metrics for a pair of target / prediction collections.

    The two inputs are stored untouched and handed to the metric functions
    on demand.
    """

    def __init__(self, y_true, y_pred):
        self.y_true, self.y_pred = y_true, y_pred

    def normalized_mean_squared_error(self):
        """Return the residual norm divided by the norm of the centred targets.

        Computed as ``sqrt(sum((t - p)**2)) / sqrt(sum((t - mean(t))**2))``,
        i.e. a ratio of Euclidean norms rather than of mean squared values.
        """
        targets = np.array(self.y_true)
        estimates = np.array(self.y_pred)
        residual_norm = np.sqrt(np.sum((targets - estimates)**2))
        spread_norm = np.sqrt(np.sum((targets - np.average(targets))**2))
        return residual_norm / spread_norm

    def r_square(self):
        """Return the coefficient of determination as given by ``r2_score``."""
        return r2_score(self.y_true, self.y_pred)

    def evaluate(self):
        """Compute all metrics at once.

        Returns
        -------
        tuple
            ``(mse, mae, nmse, mape, r2)`` where ``mape`` is expressed in
            percent (the library value multiplied by 100).
        """
        truth, guess = self.y_true, self.y_pred
        return (
            mean_squared_error(truth, guess),
            mean_absolute_error(truth, guess),
            self.normalized_mean_squared_error(),
            mean_absolute_percentage_error(truth, guess) * 100,
            self.r_square(),
        )



In [8]:
class BlandAltman:
    """Percentage-difference Bland-Altman style plot for two paired data sets.

    Parameters
    ----------
    output_data_unscaled, pre_data_unscaled : array-like
        Paired values of equal size. The difference is taken as
        ``output_data_unscaled - pre_data_unscaled``.
    """

    def __init__(self, output_data_unscaled, pre_data_unscaled):
        self.output_data_unscaled = output_data_unscaled
        self.pre_data_unscaled = pre_data_unscaled

    def plot(self, x_label, start, xlim_range,start_y,ylim_range, threshold=20):
        """Draw the plot, print the flagged indices and return them.

        The y-axis shows ``100 * (output - pre) / ((output + pre) / 2)`` and
        the x-axis shows the ``output`` values themselves. Dashed grey lines
        mark the mean percentage difference and mean +/- 1.96 standard
        deviations; solid green lines mark ``+/- threshold``.

        Parameters
        ----------
        x_label : str
            Label of the x-axis.
        start, xlim_range : float
            Lower and upper x-axis limits.
        start_y, ylim_range : float
            Lower and upper y-axis limits.
        threshold : float, optional
            Absolute percentage difference beyond which a point is flagged
            (default 20, the previously hard-coded value).

        Returns
        -------
        tuple of list of int
            ``(outlier_indices, overestimate_indices, underestimate_indices)``
            in ascending order. Points above ``+threshold`` are reported as
            overestimations, points below ``-threshold`` as underestimations.
            NOTE(review): the sign follows ``output - pre``; confirm this
            labelling matches which argument holds the predictions.
        """
        # Flatten so that column vectors of shape (n, 1) behave like 1-D data.
        output_data = np.asarray(self.output_data_unscaled).ravel()
        pre_data = np.asarray(self.pre_data_unscaled).ravel()

        # Percentage difference relative to the pairwise mean. A pair whose
        # mean is zero yields inf/nan here, exactly as plain NumPy division does.
        diff = output_data - pre_data
        mean = (output_data + pre_data) / 2
        percentage_diff = (diff / mean) * 100

        mean_diff = np.mean(percentage_diff)
        std_diff = np.std(percentage_diff)

        # Boolean masks replace the per-point Python loop and list lookups.
        over_mask = percentage_diff > threshold
        under_mask = percentage_diff < -threshold
        outlier_mask = over_mask | under_mask

        outlier_indices = np.flatnonzero(outlier_mask).tolist()
        overestimate_indices = np.flatnonzero(over_mask).tolist()
        underestimate_indices = np.flatnonzero(under_mask).tolist()

        # One scatter call per colour instead of one per point.
        fig, ax = plt.subplots(figsize=(8, 5.5))
        ax.scatter(output_data[~outlier_mask], percentage_diff[~outlier_mask], color='blue', s=15)
        ax.scatter(output_data[outlier_mask], percentage_diff[outlier_mask], color='red', s=15)
        ax.axhline(mean_diff, color='gray', linestyle='--',lw = 2)
        ax.axhline(mean_diff + 1.96 * std_diff, color='gray', linestyle='--',lw = 2)
        ax.axhline(mean_diff - 1.96 * std_diff, color='gray', linestyle='--',lw = 2)
        ax.axhline(threshold, color='green', linestyle='-',lw = 2.5)
        ax.axhline(-threshold, color='green', linestyle='-',lw = 2.5)
        ax.set_xlabel(x_label, fontsize=14)
        ax.set_ylabel('Percentage Difference between Predictions and Targets', fontsize=11)
        ax.set_title('Bland-Altman Plot', fontsize=16)
        ax.tick_params(axis='both', which='major', labelsize=12)
        ax.set_xlim(start, xlim_range)
        ax.set_ylim(start_y, ylim_range)
        # Tick positions are fixed and do not follow start_y / ylim_range.
        ax.set_yticks([-40,-20, 0, 20, 40])
        plt.show()

        # Print indices of outliers
        print("Indices of outliers: ", outlier_indices)
        print("Indices of overestimations: ", overestimate_indices)
        print("Indices of underestimations: ", underestimate_indices)
        return outlier_indices, overestimate_indices, underestimate_indices


In [None]:
def normalize_data(data, minimum = None,maximum = None):
    """Min-max scale ``data`` to the range spanned by ``minimum``/``maximum``.

    Parameters
    ----------
    data : array-like supporting arithmetic (e.g. ndarray)
        Values to scale.
    minimum, maximum : scalar, optional
        Bounds to scale with. When omitted they are taken from ``data``.
        The reduction is applied twice so that inputs whose first reduction
        is still a collection end up as a single scalar.

    Returns
    -------
    tuple
        ``(minimum, maximum, data_norm)`` with
        ``data_norm = (data - minimum) / (maximum - minimum)``. The bounds are
        returned unchanged so they can be reused for other data and for
        ``denormalize_data``.

    Notes
    -----
    If the scalar span ``maximum - minimum`` is zero (constant data) the
    division would produce NaN/inf. In that case the span is replaced by 1,
    so every value maps to 0 and still round-trips back to ``minimum`` when
    multiplied by the true (zero) span. Non-scalar bounds are used as given.
    """
    if minimum is None:
        minimum = np.min(np.min(data))
    if maximum is None:
        maximum = np.max(np.max(data))
    span = maximum - minimum
    # Guard only the scalar case; array-valued bounds keep their behaviour.
    if np.ndim(span) == 0 and span == 0:
        span = 1
    data_norm = (data - minimum) / span
    return minimum, maximum, data_norm


def denormalize_data(data, minimum, maximum):
    """Map min-max scaled values back to their original range.

    Returns ``data * (maximum - minimum) + minimum``.
    """
    span = maximum - minimum
    restored = data * span + minimum
    return restored
