# PLS model to predict NTP composition during IVT reaction

Using FTIR and a pH sensor, we can monitor the NTP composition and concentration during the IVT reaction.

<div style="text-align: center;">
  <img src="../assets/pls_gp_ntp_monitoring.png" alt="PLS model" width="400"/>
</div>

In [None]:
from scipy.signal import savgol_filter
import pandas as pd
import numpy as np
from sklearn.cross_decomposition import PLSRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.metrics import mean_squared_error, r2_score
import os
from matplotlib import pyplot as plt
import matplotlib.cm as cm


# Function to apply Standard Normal Variate (SNV)
def standard_normal_variate(X):
    """
    Apply Standard Normal Variate (SNV) transformation row-wise.
    """
    X_snv = (X - X.mean(axis=1, keepdims=True)) / X.std(axis=1, keepdims=True)
    return X_snv


CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
CONSIDER_REPLICATES = True
REMOVE_OUTLIERS = False
VISUALISE_PREPROCESSED_DATA = True
VISUALISE_RESULTS = False

# Get the data and drop the wavenumber column
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Get the data and drop the wavenumber column
data = pd.read_csv(
    os.path.join(
        CURRENT_DIR, "wiz-app", "data_sets", "240403 4 NTP combinations Graph Data.csv"
    )
)
wavenumber = data["Wavenumber (1/cm)"].values
data.drop(columns=["Wavenumber (1/cm)"], inplace=True)

# Create the X and Y arrays
X_list = []
Y_list = []
last_col_name = ""
for col in data.columns:
    if "Unnamed" not in col:
        X_list.append(data[col].values.tolist())
        Y_list.append([float(val) for val in col.replace("'", ".").strip().split(" ")])
        last_col_name = col
    else:
        if last_col_name and CONSIDER_REPLICATES:
            X_list.append(data[col].values.tolist())
            Y_list.append(
                [
                    float(val)
                    for val in last_col_name.replace("'", ".").strip().split(" ")
                ]
            )
X = np.array(X_list)
Y = np.array(Y_list)

# generate a dataframe with the clean X and Y data and save it
# pd.DataFrame(data=np.hstack((X, Y))).to_csv(
#     "cleaned_data.csv", index=False
# )


if VISUALISE_RESULTS:
    df = pd.DataFrame(data={f"sample {i}": X[i, :] for i in range(X.shape[0])})
    df.plot()
    plt.show()

# Apply Savitzky-Golay filter row-wise
X_savgol = savgol_filter(X, window_length=11, polyorder=3, axis=1)

# Apply Standard Normal Variate (SNV) row-wise
X_snv = standard_normal_variate(X_savgol)


X = X_snv

if REMOVE_OUTLIERS:
    for i in range(3):
        max_diff = [0, 0]
        for i in range(X.shape[0]):
            sample = X[i, :]
            diff = sample.max() - sample.min()
            if diff > max_diff[0]:
                max_diff[0] = diff
                max_diff[1] = i
        X = np.delete(X, max_diff[1], axis=0)
        Y = np.delete(Y, max_diff[1], axis=0)

if VISUALISE_PREPROCESSED_DATA:
    df = pd.DataFrame(data={f"sample {i}": X[i, :] for i in range(X.shape[0])})
    df.plot()
    plt.title("Raw Samples")
    plt.show()

    # Create a new figure
    fig, ax = plt.subplots()

    # Get a colormap
    cmap = cm.get_cmap("viridis")  # or any other colormap

    # Plot each sample with color based on sample number
    for i in range(X.shape[0]):
        color = cmap(i / X.shape[0])
        ax.plot(wavenumber, X[i, :], color=color)

    plt.title("Samples After Processing")
    plt.show()


# Split the data into training and testing sets
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.1, random_state=0)

# Create a PLS model
pls = PLSRegression()

# Use GridSearchCV to find the best number of components using KFold cross-validation
param_grid = {"n_components": list(range(1, 21))}
kf = KFold(n_splits=5, shuffle=True, random_state=0)
grid_search = GridSearchCV(pls, param_grid, cv=kf, scoring="neg_mean_squared_error")
grid_search.fit(X_train, Y_train)

# Best number of components
best_n_components = grid_search.best_params_["n_components"]
print(f"Best number of components: {best_n_components}")

# Fit the PLS model with the best number of components
pls_best = PLSRegression(n_components=best_n_components)
pls_best.fit(X_train, Y_train)

# Predict on the test set
Y_pred = pls_best.predict(X_test)

# Inverse transform the predictions and the test targets to original scale
Y_pred_original = Y_pred
Y_test_original = Y_test

# Calculate mean squared error and root mean squared error for the predictions
mse = mean_squared_error(Y_test_original, Y_pred_original)
rmse = np.sqrt(mse)
# Calculate R-squared for the predictions
r2 = r2_score(Y_test_original, Y_pred_original)


# Print results
print(f"R-squared: {r2}")
print(f"Mean Squared Error: {mse}")
print(f"Root Mean Squared Error: {rmse}")

# Visualise the results
if VISUALISE_RESULTS:
    number_of_samples = 4

    # Display the prediction results
    X_test_sample = X_test[:number_of_samples]
    Y_test_sample = Y_test_original[:number_of_samples]
    Y_pred_sample = Y_pred_original[:number_of_samples]

    print("\nSample Test Data (X):")
    print(X_test_sample)
    print("\nActual Concentrations (Y Test):")
    print(Y_test_sample)
    print("\nPredicted Concentrations (Y Pred):")
    print(Y_pred_sample)

    components = ["ATP", "UTP", "CTP", "GTP"]
    # x_labels = ["Actual", "Predicted"]
    colors = ["b", "g", "r", "c"]

    fig, axs = plt.subplots(4, 1, figsize=(14, 20))
    fig.suptitle("Actual vs Predicted Concentrations for 4 Samples", fontsize=16)

    for i in range(number_of_samples):
        bar_width = 0.4
        indices = np.arange(len(components))
        actual_bars = axs[i].bar(
            indices, Y_test_sample[i], bar_width, label="Actual", color=colors
        )
        predicted_bars = axs[i].bar(
            indices + bar_width,
            Y_pred_sample[i],
            bar_width,
            label="Predicted",
            color=colors,
            alpha=0.6,
        )

        axs[i].set_title(f"Sample {i}")
        axs[i].set_xticks(indices + bar_width / 2)
        axs[i].set_xticklabels(components)
        axs[i].legend()

    plt.tight_layout(rect=(0, 0.03, 1, 0.95))
    plt.show()


In [None]:
"""
# Predict new samples with the current model
"""
from scipy.signal import savgol_filter
import pandas as pd
import numpy as np
from sklearn.cross_decomposition import PLSRegression
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.metrics import mean_squared_error, r2_score
import os
from sklearn.ensemble import RandomForestRegressor


# Function to apply Standard Normal Variate (SNV)
def standard_normal_variate(X):
    """
    Apply Standard Normal Variate (SNV) transformation row-wise.
    """
    X_snv = (X - X.mean(axis=1, keepdims=True)) / X.std(axis=1, keepdims=True)
    return X_snv

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

# =========================#
#                          #
#   DATA PREPROCESSING     #
#                          #
# =========================#


# HISTORICAL
# Get the data and drop the wavenumber column
data = pd.read_csv(
    os.path.join(
        CURRENT_DIR, "wiz-app", "data_sets", "240403 4 NTP combinations Graph Data.csv"
    )
)

wavenumber = data["Wavenumber (1/cm)"].values
data.drop(columns=["Wavenumber (1/cm)"], inplace=True)

# Create the X and Y arrays
X_list = []
Y_list = []
last_col_name = ""
for col in data.columns:
    if "Unnamed" not in col:
        X_list.append(data[col].values.tolist())
        Y_list.append([float(val) for val in col.replace("'", ".").strip().split(" ")])
        last_col_name = col
    else:
        if last_col_name:
            X_list.append(data[col].values.tolist())
            Y_list.append(
                [
                    float(val)
                    for val in last_col_name.replace("'", ".").strip().split(" ")
                ]
            )


X_raw = np.array(X_list)
X_savgol = savgol_filter(X_raw, window_length=11, polyorder=3, axis=1)
X_snv = standard_normal_variate(X_savgol)
X = X_snv
Y = np.array(Y_list)

df = pd.DataFrame(X)
df.to_csv("X.csv", index=False)
df.to_csv("Y.csv", index=False)


# NEW
new_data = pd.read_csv(os.path.join(CURRENT_DIR, "wiz-app", "data_sets", "new new.csv"))
wavenumber_of_new = new_data["Wavenumber (1/cm)"].values
new_data.drop(columns=["Wavenumber (1/cm)"], inplace=True)

X_list_new = []
last_col_name_new = ""
for col in new_data.columns:
        X_list_new.append(new_data[col].values.tolist())
        last_col_name_new = col
X_raw_new = np.array(X_list_new)
X_new_savgol = savgol_filter(X_raw_new, window_length=11, polyorder=3, axis=1)
X_new = standard_normal_variate(X_new_savgol)


# =========================#
#                          #
#   PLS REGRESSION MODEL   #
#                          #
# =========================#

# Split the data into training and testing sets
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.1, random_state=0)

# Create a PLS model
pls = PLSRegression()

# Use GridSearchCV to find the best number of components using KFold cross-validation
param_grid = {"n_components": list(range(1, 21))}
kf = KFold(n_splits=5, shuffle=True, random_state=0)
grid_search = GridSearchCV(pls, param_grid, cv=kf, scoring="neg_mean_squared_error")
grid_search.fit(X_train, Y_train)

# Best number of components
best_n_components = grid_search.best_params_["n_components"]
print(f"Best number of components: {best_n_components}")

# Fit the PLS model with the best number of components
pls_best = PLSRegression(n_components=best_n_components)
pls_best.fit(X_train, Y_train)

# Predict on the test set
Y_pred = pls_best.predict(X_new)

print(Y_pred)


# # =======================================================#
# #                                                        #
# #   LEARN THE RESIDUALS USING NON-NEGATIVE REGRESSION    #
# #                                                        #
# # =======================================================#

from scipy.optimize import nnls


# Custom wrapper to enforce non-negative predictions
class NonNegativePLS(PLSRegression):
    def predict(self, X_train_nn):
        Y_pred = super().predict(X_train_nn)
        # Apply non-negative constraint
        Y_pred_nn = np.array([nnls(np.eye(Y_pred.shape[1]), y)[0] for y in Y_pred])
        return Y_pred_nn


# Create a PLS model
pls_nn = NonNegativePLS()

# Use GridSearchCV to find the best number of components using KFold cross-validation
param_grid = {"n_components": list(range(1, 21))}
kf = KFold(n_splits=5, shuffle=True, random_state=0)
grid_search = GridSearchCV(pls_nn, param_grid, cv=kf, scoring="neg_mean_squared_error")
grid_search.fit(X_train, Y_train)

# Best number of components
best_n_components = grid_search.best_params_["n_components"]
print(f"Best number of components: {best_n_components}")

# Fit the PLS model with the best number of components
pls_nn_best = NonNegativePLS(n_components=best_n_components)
pls_nn_best.fit(X_train, Y_train)

# Predict on the test set
Y_pred_nn = pls_nn_best.predict(X_new)

print(Y_pred_nn)

# # ============================================================#
# #                                                             #
# #   LEARN THE RESIDUALS OF NNLS-PLS USING RANDOM FOREST       #
# #                                                             #
# # ============================================================#


# Calculate residuals
residuals = Y_train - pls_nn_best.predict(X_train)

# Secondary Model: RandomForestRegressor to learn residuals
residual_model_rf = RandomForestRegressor(n_estimators=100, random_state=0)
residual_model_rf.fit(X_train, residuals)

# Correction of PLS predictions
Y_pred_test_pls = pls_nn_best.predict(X_new)
Y_residuals_pred = residual_model_rf.predict(X_new)
Y_pred_corrected_rf = Y_pred_test_pls + Y_residuals_pred

print(Y_pred_corrected_rf)


In [None]:
from scipy.signal import savgol_filter
import pandas as pd
import numpy as np
from sklearn.cross_decomposition import PLSRegression
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.metrics import mean_squared_error, r2_score
import os
from sklearn.ensemble import RandomForestRegressor


# Function to apply Standard Normal Variate (SNV)
def standard_normal_variate(X):
    """
    Apply Standard Normal Variate (SNV) transformation row-wise.
    """
    X_snv = (X - X.mean(axis=1, keepdims=True)) / X.std(axis=1, keepdims=True)
    return X_snv


# =========================#
#                          #
#   DATA PREPROCESSING     #
#                          #
# =========================#

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Get the data and drop the wavenumber column
data = pd.read_csv(
    os.path.join(
        CURRENT_DIR, "wiz-app", "data_sets", "240403 4 NTP combinations Graph Data.csv"
    )
)

wavenumber = data["Wavenumber (1/cm)"].values
data.drop(columns=["Wavenumber (1/cm)"], inplace=True)

# Create the X and Y arrays
X_list = []
Y_list = []
last_col_name = ""
for col in data.columns:
    if "Unnamed" not in col:
        X_list.append(data[col].values.tolist())
        Y_list.append([float(val) for val in col.replace("'", ".").strip().split(" ")])
        last_col_name = col
    else:
        if last_col_name:
            X_list.append(data[col].values.tolist())
            Y_list.append(
                [
                    float(val)
                    for val in last_col_name.replace("'", ".").strip().split(" ")
                ]
            )

X_raw = np.array(X_list)

# Apply Savitzky-Golay filter row-wise
X_savgol = savgol_filter(X_raw, window_length=11, polyorder=3, axis=1)

# Apply Standard Normal Variate (SNV) row-wise
X_snv = standard_normal_variate(X_savgol)

X = X_snv
Y = np.array(Y_list)

# =========================#
#                          #
#   PLS REGRESSION MODEL   #
#                          #
# =========================#

# Split the data into training and testing sets
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.1, random_state=0)

# Create a PLS model
pls = PLSRegression()

# Use GridSearchCV to find the best number of components using KFold cross-validation
param_grid = {"n_components": list(range(1, 21))}
kf = KFold(n_splits=5, shuffle=True, random_state=0)
grid_search = GridSearchCV(pls, param_grid, cv=kf, scoring="neg_mean_squared_error")
grid_search.fit(X_train, Y_train)

# Best number of components
best_n_components = grid_search.best_params_["n_components"]
print(f"Best number of components: {best_n_components}")

# Fit the PLS model with the best number of components
pls_best = PLSRegression(n_components=best_n_components)
pls_best.fit(X_train, Y_train)

# Predict on the test set
Y_pred = pls_best.predict(X_test)

# Calculate mean squared error and root mean squared error for the predictions
mse = mean_squared_error(Y_test, Y_pred)
rmse = np.sqrt(mse)
# Calculate R-squared for the predictions
r2 = r2_score(Y_test, Y_pred)

# Print results
print(f"R-squared: {r2}")
print(f"Mean Squared Error: {mse}")
print(f"Root Mean Squared Error: {rmse}")

number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)


# =============================================#
#                                              #
#   LEARN THE RESIDUALS USING RANDOM FOREST    #
#                                              #
# =============================================#


# Calculate residuals on the training set
residuals = Y_train - pls_best.predict(X_train)

# Secondary Model: RandomForestRegressor to learn residuals
residual_model_rf = RandomForestRegressor(n_estimators=100, random_state=0)
residual_model_rf.fit(X_train, residuals)

# Correction of PLS predictions
Y_pred_test_pls = pls_best.predict(X_test)
Y_residuals_pred = residual_model_rf.predict(X_test)
Y_pred_corrected_rf = Y_pred_test_pls + Y_residuals_pred


mse_rf = mean_squared_error(Y_test, Y_pred_corrected_rf)
rmse_rf = np.sqrt(mse_rf)
r2_rf = r2_score(Y_test, Y_pred_corrected_rf)

print(f"RandomForest Corrected - R-squared: {r2_rf}, MSE: {mse_rf}, RMSE: {rmse_rf}")

number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred_corrected_rf[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)


number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred_corrected_rf[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)


# =======================================================#
#                                                        #
#   LEARN THE RESIDUALS USING NON-NEGATIVE REGRESSION    #
#                                                        #
# =======================================================#

from scipy.optimize import nnls


# Custom wrapper to enforce non-negative predictions
class NonNegativePLS(PLSRegression):
    def predict(self, X_train_nn):
        Y_pred = super().predict(X_train_nn)
        # Apply non-negative constraint
        Y_pred_nn = np.array([nnls(np.eye(Y_pred.shape[1]), y)[0] for y in Y_pred])
        return Y_pred_nn


# Create a PLS model
pls_nn = NonNegativePLS()

# Use GridSearchCV to find the best number of components using KFold cross-validation
param_grid = {"n_components": list(range(1, 21))}
kf = KFold(n_splits=5, shuffle=True, random_state=0)
grid_search = GridSearchCV(pls_nn, param_grid, cv=kf, scoring="neg_mean_squared_error")
grid_search.fit(X_train, Y_train)

# Best number of components
best_n_components = grid_search.best_params_["n_components"]
print(f"Best number of components: {best_n_components}")

# Fit the PLS model with the best number of components
pls_nn_best = NonNegativePLS(n_components=best_n_components)
pls_nn_best.fit(X_train, Y_train)

# Predict on the test set
Y_pred_nn = pls_nn_best.predict(X_test)

# Calculate mean squared error and root mean squared error for the predictions
mse = mean_squared_error(Y_test, Y_pred_nn)
rmse = np.sqrt(mse)
# Calculate R-squared for the predictions
r2 = r2_score(Y_test, Y_pred_nn)

print(f"Non Negative Regression PLS - R-squared: {r2}, MSE: {mse}, RMSE: {rmse}")

number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred_nn[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)


# ============================================================#
#                                                             #
#   LEARN THE RESIDUALS OF NNLS-PLS USING RANDOM FOREST       #
#                                                             #
# ============================================================#


# Calculate residuals
residuals = Y_train - pls_nn_best.predict(X_train)

# Secondary Model: RandomForestRegressor to learn residuals
residual_model_rf = RandomForestRegressor(n_estimators=100, random_state=0)
residual_model_rf.fit(X_train, residuals)

# Correction of PLS predictions
Y_pred_test_pls = pls_nn_best.predict(X_test)
Y_residuals_pred = residual_model_rf.predict(X_test)
Y_pred_corrected_rf = Y_pred_test_pls + Y_residuals_pred


mse_rf = mean_squared_error(Y_test, Y_pred_corrected_rf)
rmse_rf = np.sqrt(mse_rf)
r2_rf = r2_score(Y_test, Y_pred_corrected_rf)

print(
    f"Non Negative Regression RandomForest Corrected - R-squared: {r2_rf}, MSE: {mse_rf}, RMSE: {rmse_rf}"
)

number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred_corrected_rf[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)

import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error

# Set the global font to be DejaVu Sans, size 10 (or any other preferred font and size)
plt.rcParams["font.size"] = "10"
plt.rcParams["font.family"] = "DejaVu Sans"


def round_to_sig_figs(num, sig_figs):
    """
    Rounds a number to a specified number of significant figures.

    Parameters:
    - num: The number to be rounded.
    - sig_figs: The number of significant figures to round to.

    Returns:
    - Rounded number to the specified significant figures.
    """
    if num == 0:
        return 0
    else:
        return round(num, sig_figs - int(np.floor(np.log10(abs(num)))) - 1)


# Calculate MAE for each model
mae_pls = round_to_sig_figs(mean_absolute_error(Y_test, Y_pred), 2)
mae_rf_corrected_pls = round_to_sig_figs(
    mean_absolute_error(Y_test, Y_pred_corrected_rf), 2
)
mae_nn_pls = round_to_sig_figs(mean_absolute_error(Y_test, Y_pred_nn), 2)
mae_rf_corrected_nn_pls = round_to_sig_figs(
    mean_absolute_error(Y_test, Y_pred_corrected_rf), 2
)

# Collect metrics for each model
metrics = {
    "PLS": {
        "RMSE": round_to_sig_figs(rmse, 2),
        "R²": round_to_sig_figs(r2, 2),
        "MAE": mae_pls,
    },
    "RF Corrected PLS": {
        "RMSE": round_to_sig_figs(rmse_rf, 2),
        "R²": round_to_sig_figs(r2_rf, 2),
        "MAE": mae_rf_corrected_pls,
    },
    "Non-Neg PLS": {
        "RMSE": round_to_sig_figs(rmse, 2),
        "R²": round_to_sig_figs(r2, 2),
        "MAE": mae_nn_pls,
    },
    "RF Corrected Non-Neg PLS": {
        "RMSE": round_to_sig_figs(rmse_rf, 2),
        "R²": round_to_sig_figs(r2_rf, 2),
        "MAE": mae_rf_corrected_nn_pls,
    },
}

# Extract data for plotting
models = list(metrics.keys())
rmse_values = [metrics[model]["RMSE"] for model in models]
r2_values = [metrics[model]["R²"] for model in models]
mae_values = [metrics[model]["MAE"] for model in models]

# Plot RMSE, R², and MAE in a horizontal bar chart
fig, ax = plt.subplots(figsize=(14, 6))

y = np.arange(len(models))  # the label locations
height = 0.2  # the height of the bars

rects1 = ax.barh(y - height, rmse_values, height, label="RMSE")
rects2 = ax.barh(y, r2_values, height, label="R²")
rects3 = ax.barh(y + height, mae_values, height, label="MAE")

# Add some text for labels, title and axes ticks
ax.set_ylabel("Model")
ax.set_title("Comparison of RMSE, R², and MAE for Different Models")
ax.set_yticks(y)
ax.set_yticklabels(models)
ax.legend()

ax.bar_label(rects1, padding=3)
ax.bar_label(rects2, padding=3)
ax.bar_label(rects3, padding=3)

fig.tight_layout()
plt.show()

# Plot Predicted vs. Measured for the best model
best_model_name = (
    "RF Corrected Non-Neg PLS"  # Assuming this is the best based on metrics
)
Y_pred_best = Y_pred_corrected_rf  # Replace with the best model's predictions

plt.figure(figsize=(8, 8))
plt.scatter(Y_test.flatten(), Y_pred_best.flatten(), edgecolors=(0, 0, 0))
plt.plot([Y_test.min(), Y_test.max()], [Y_test.min(), Y_test.max()], "k--", lw=4)
plt.xlabel("Measured NTP Composition")
plt.ylabel("Predicted NTP Composition")
plt.title(f"Predicted vs Measured NTP composition for {best_model_name}")


number_of_samples = 4

# Display the prediction results
Y_test_sample = [
    Y_test[:1][0],
    Y_test[:1][0],
    Y_test[:1][0],
    Y_test[:1][0],
]
Y_pred_sample = [
    [Y_pred[:1][0], "PLS"],
    [Y_pred_corrected_rf[:1][0], "RF Corrected PLS"],
    [Y_pred_nn[:1][0], "Non-Neg PLS"],
    [Y_pred_corrected_rf[:1][0], "RF Corrected Non-Neg PLS"],
]


components = ["ATP", "UTP", "CTP", "GTP"]
# x_labels = ["Actual", "Predicted"]
colors = ["b", "g", "r", "c"]

fig, axs = plt.subplots(4, 1, figsize=(14, 20))
fig.suptitle("Actual vs Predicted Concentrations for 4 Samples", fontsize=16)

for i in range(number_of_samples):
    bar_width = 0.4
    indices = np.arange(len(components))
    actual_bars = axs[i].bar(
        indices, Y_test_sample[i], bar_width, label="Actual", color=colors
    )
    predicted_bars = axs[i].bar(
        indices + bar_width,
        Y_pred_sample[i][0],
        bar_width,
        label=f"{Y_pred_sample[i][1]} Prediction",
        color=colors,
        alpha=0.6,
    )

    axs[i].set_title("Sample OOD")
    axs[i].set_xticks(indices + bar_width / 2)
    axs[i].set_xticklabels(components)
    axs[i].legend()

plt.tight_layout(rect=(0, 0.03, 1, 0.95))


plt.show()


In [None]:
from scipy.signal import savgol_filter
import pandas as pd
import numpy as np
from sklearn.cross_decomposition import PLSRegression
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.metrics import mean_squared_error, r2_score
import os
from sklearn.ensemble import RandomForestRegressor


# Function to apply Standard Normal Variate (SNV)
def standard_normal_variate(X):
    """
    Apply Standard Normal Variate (SNV) transformation row-wise.
    """
    X_snv = (X - X.mean(axis=1, keepdims=True)) / X.std(axis=1, keepdims=True)
    return X_snv


# =========================#
#                          #
#   DATA PREPROCESSING     #
#                          #
# =========================#

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Get the data and drop the wavenumber column
data = pd.read_csv(
    os.path.join(CURRENT_DIR, "wiz-app", "data_sets", "240403 4 NTP combinations Graph Data.csv")
)
wavenumber = data["Wavenumber (1/cm)"].values
data.drop(columns=["Wavenumber (1/cm)"], inplace=True)

# Create the X and Y arrays
X_list = []
Y_list = []
last_col_name = ""
for col in data.columns:
    if "Unnamed" not in col:
        X_list.append(data[col].values.tolist())
        Y_list.append([float(val) for val in col.replace("'", ".").strip().split(" ")])
        last_col_name = col
    else:
        if last_col_name:
            X_list.append(data[col].values.tolist())
            Y_list.append(
                [
                    float(val)
                    for val in last_col_name.replace("'", ".").strip().split(" ")
                ]
            )

X_raw = np.array(X_list)

# Apply Savitzky-Golay filter row-wise
X_savgol = savgol_filter(X_raw, window_length=11, polyorder=3, axis=1)

# Apply Standard Normal Variate (SNV) row-wise
X_snv = standard_normal_variate(X_savgol)

X = X_snv
Y = np.array(Y_list)

# =========================#
#                          #
#   PLS REGRESSION MODEL   #
#                          #
# =========================#

# Split the data into training and testing sets
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.1, random_state=0)

# Create a PLS model
pls = PLSRegression()

# Use GridSearchCV to find the best number of components using KFold cross-validation
param_grid = {"n_components": list(range(1, 21))}
kf = KFold(n_splits=5, shuffle=True, random_state=0)
grid_search = GridSearchCV(pls, param_grid, cv=kf, scoring="neg_mean_squared_error")
grid_search.fit(X_train, Y_train)

# Best number of components
best_n_components = grid_search.best_params_["n_components"]
print(f"Best number of components: {best_n_components}")

# Fit the PLS model with the best number of components
pls_best = PLSRegression(n_components=best_n_components)
pls_best.fit(X_train, Y_train)

# Predict on the test set
Y_pred = pls_best.predict(X_test)

# Calculate mean squared error and root mean squared error for the predictions
mse = mean_squared_error(Y_test, Y_pred)
rmse = np.sqrt(mse)
# Calculate R-squared for the predictions
r2 = r2_score(Y_test, Y_pred)

# Print results
print(f"R-squared: {r2}")
print(f"Mean Squared Error: {mse}")
print(f"Root Mean Squared Error: {rmse}")

number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)


# =============================================#
#                                              #
#   LEARN THE RESIDUALS USING RANDOM FOREST    #
#                                              #
# =============================================#


# Calculate residuals on the training set
residuals = Y_train - pls_best.predict(X_train)

# Secondary Model: RandomForestRegressor to learn residuals
residual_model_rf = RandomForestRegressor(n_estimators=100, random_state=0)
residual_model_rf.fit(X_train, residuals)

# Correction of PLS predictions
Y_pred_test_pls = pls_best.predict(X_test)
Y_residuals_pred = residual_model_rf.predict(X_test)
Y_pred_corrected_rf = Y_pred_test_pls + Y_residuals_pred


mse_rf = mean_squared_error(Y_test, Y_pred_corrected_rf)
rmse_rf = np.sqrt(mse_rf)
r2_rf = r2_score(Y_test, Y_pred_corrected_rf)

print(f"RandomForest Corrected - R-squared: {r2_rf}, MSE: {mse_rf}, RMSE: {rmse_rf}")

number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred_corrected_rf[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)


number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred_corrected_rf[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)


# =======================================================#
#                                                        #
#   LEARN THE RESIDUALS USING NON-NEGATIVE REGRESSION    #
#                                                        #
# =======================================================#

from scipy.optimize import nnls


# Custom wrapper to enforce non-negative predictions
class NonNegativePLS(PLSRegression):
    def predict(self, X_train_nn):
        Y_pred = super().predict(X_train_nn)
        # Apply non-negative constraint
        Y_pred_nn = np.array([nnls(np.eye(Y_pred.shape[1]), y)[0] for y in Y_pred])
        return Y_pred_nn


# Create a PLS model
pls_nn = NonNegativePLS()

# Use GridSearchCV to find the best number of components using KFold cross-validation
param_grid = {"n_components": list(range(1, 21))}
kf = KFold(n_splits=5, shuffle=True, random_state=0)
grid_search = GridSearchCV(pls_nn, param_grid, cv=kf, scoring="neg_mean_squared_error")
grid_search.fit(X_train, Y_train)

# Best number of components
best_n_components = grid_search.best_params_["n_components"]
print(f"Best number of components: {best_n_components}")

# Fit the PLS model with the best number of components
pls_nn_best = NonNegativePLS(n_components=best_n_components)
pls_nn_best.fit(X_train, Y_train)

# Predict on the test set
Y_pred_nn = pls_nn_best.predict(X_test)

# Calculate mean squared error and root mean squared error for the predictions
mse = mean_squared_error(Y_test, Y_pred_nn)
rmse = np.sqrt(mse)
# Calculate R-squared for the predictions
r2 = r2_score(Y_test, Y_pred_nn)

print(f"Non Negative Regression PLS - R-squared: {r2}, MSE: {mse}, RMSE: {rmse}")

number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred_nn[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)


# ============================================================#
#                                                             #
#   LEARN THE RESIDUALS OF NNLS-PLS USING RANDOM FOREST       #
#                                                             #
# ============================================================#


# Calculate residuals
residuals = Y_train - pls_nn_best.predict(X_train)

# Secondary Model: RandomForestRegressor to learn residuals
residual_model_rf = RandomForestRegressor(n_estimators=100, random_state=0)
residual_model_rf.fit(X_train, residuals)

# Correction of PLS predictions
Y_pred_test_pls = pls_nn_best.predict(X_test)
Y_residuals_pred = residual_model_rf.predict(X_test)
Y_pred_corrected_rf = Y_pred_test_pls + Y_residuals_pred


mse_rf = mean_squared_error(Y_test, Y_pred_corrected_rf)
rmse_rf = np.sqrt(mse_rf)
r2_rf = r2_score(Y_test, Y_pred_corrected_rf)

print(
    f"Non Negative Regression RandomForest Corrected - R-squared: {r2_rf}, MSE: {mse_rf}, RMSE: {rmse_rf}"
)

number_of_samples = 4

# Display the prediction results
Y_test_sample = Y_test[:number_of_samples]
Y_pred_sample = Y_pred_corrected_rf[:number_of_samples]

print("\nActual Concentrations (Y Test):")
print(Y_test_sample)
print("\nPredicted Concentrations (Y Pred):")
print(Y_pred_sample)
