In [None]:
import numpy as np
import pandas as pd
from joblib import dump, load
from tqdm import tqdm

from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

import itertools

In [None]:
def shuffle_arrays(arr1, arr2):
    assert len(arr1) == len(arr2)
    permutation = np.random.permutation(len(arr1))
    return arr1[permutation], arr2[permutation]

# Testing Dataset

In [None]:
df = pd.read_csv("../files/data/FINAL_CSV/SURFACE_CRACK_TEST.csv")
df

In [None]:
# Get unique combinations of the first four columns
SC_test_combinations = df.iloc[:, 1:4].drop_duplicates().to_numpy()

print(len(SC_test_combinations))

print("Different a/c values: ", np.unique(SC_test_combinations[:,0], axis=0))
print("Different a/t values: ", np.unique(SC_test_combinations[:,1], axis=0))
print("Different c/b values: ", np.unique(SC_test_combinations[:,2], axis=0))

In [None]:
phi_values = df.iloc[:, 4].drop_duplicates().to_numpy()
print(phi_values)
print(len(phi_values))

In [None]:
d = df.to_numpy()[:,1:]
d.shape

# RFR

In [None]:
X_test = d[:,:-1]

In [None]:
rfr = load('../files/trained_models/SURFACE_CRACK/rfr.joblib')
y_pred_rfr = rfr.predict(X_test)

# SVR

In [None]:
X_test = d[:,:-1]

In [None]:
svr = load('../files/trained_models/SURFACE_CRACK/svr.joblib')
y_pred_svr = svr.predict(X_test)

# NN

In [None]:
X_test = d[:,:-1]
y_test = d[:,:-1]

In [None]:
import src.nn as custom_nn

In [None]:
device = 'cuda'

In [None]:
X_test_gpu = torch.FloatTensor(X_test).to(device)

X_test_gpu.shape

In [None]:
net = custom_nn.Net10(X_test_gpu.shape[1], 100).to(device)
FILENAME = "../files/trained_models/SURFACE_CRACK/nn10.pt"
net.load_state_dict(torch.load(FILENAME, weights_only=False))

with torch.no_grad():
    y_pred_nn = net.forward(X_test_gpu).cpu().numpy()[:,0]

# DeepONet

In [None]:
from src.deeponet import DeepONet

In [None]:
X_deeponet = np.zeros((len(SC_test_combinations), 3))
phi_deeponet = np.zeros((128, 1))
y_deeponet = np.zeros((len(SC_test_combinations), 128))

for (i,combination) in enumerate(SC_test_combinations):
    indices = np.where((d[:, 0] == combination[0]) & 
                    (d[:, 1] == combination[1]) &
                    (d[:, 2] == combination[2])) 
    indices = indices[0]

    X_deeponet[i,0] = combination[0]
    X_deeponet[i,1] = combination[1]
    X_deeponet[i,2] = combination[2]

    if i == 0:
        phi_deeponet = d[indices][:,-2]
    else:
        assert (phi_deeponet[:] == d[indices][:,-2]).all()

    y_deeponet[i,:] = d[indices][:,-1]

In [None]:
import matplotlib.pyplot as plt
np.random.seed(0)
fig, axs = plt.subplots(2, 5, figsize=(30,12))
for i in range(2):
    for j in range(5):
        idx = np.random.randint(0, len(SC_test_combinations))
        a_c = SC_test_combinations[idx][0]
        a_t = SC_test_combinations[idx][1]
        c_b = SC_test_combinations[idx][2]

        indices = np.where((d[:, 0] == a_c) & 
                    (d[:, 1] == a_t) &
                    (d[:, 2] == c_b)) 
        indices = indices[0]

        axs[i,j].scatter(d[indices][:,-2], d[indices][:,-1], color='purple', s=10)
        axs[i,j].plot(phi_deeponet, y_deeponet[idx], label="K-T", color='purple', linestyle=":")

        axs[i,j].set_title("a/c:{} a/t:{} c/b:{}".format(a_c, a_t, c_b))
        axs[i,j].set_ylabel("K-T")
        axs[i,j].set_xlabel(r"$\phi$")
        
        if i == 0 and j == 0:
            axs[i,j].legend()

plt.show()

In [None]:
device = 'cuda'

In [None]:
X_test_gpu = torch.FloatTensor(X_deeponet).to(device)

y_test_gpu = torch.FloatTensor(y_deeponet).to(device)

phi_gpu = torch.FloatTensor(np.expand_dims(phi_deeponet, axis=-1)).to(device)

X_test_gpu.shape, y_test_gpu.shape, phi_gpu.shape

In [None]:
branch_layers = [X_test_gpu.shape[1]] + [100, 100, 100, 100, 100, 100, 100, 100, 100]
trunk_layers = [1] + [100, 100, 100, 100, 100, 100, 100, 100, 100]

EPOCHS = 250000
lr = 1e-4

model_loaded = DeepONet(branch_layers, trunk_layers, 128, device, True, 
                        "../files/trained_models/SURFACE_CRACK/branch_deeponet.pt", "../files/trained_models/SURFACE_CRACK/trunk_deeponet.pt")

with torch.no_grad():
    y_pred_deeponet = model_loaded.forward(X_test_gpu, phi_gpu)

# FNO

In [None]:
from src.fno import FNO

In [None]:
X_fno = np.zeros((len(SC_test_combinations), 128, 4))
y_fno = np.zeros((len(SC_test_combinations), 128))

for (i, combination) in enumerate(SC_test_combinations):
    indices = np.where((d[:, 0] == combination[0]) & 
                    (d[:, 1] == combination[1]) &
                    (d[:, 2] == combination[2])) 
    indices = indices[0]

    assert (phi_values == d[indices][:,-2]).all()
    
    X_fno[i,:,:-1] = combination
    X_fno[i,:,-1] = phi_values

    y_fno[i,:] = d[indices][:,-1]

In [None]:
import matplotlib.pyplot as plt
np.random.seed(0)
fig, axs = plt.subplots(2, 5, figsize=(30,12))
for i in range(2):
    for j in range(5):
        idx = np.random.randint(0, len(SC_test_combinations))
        a_c = SC_test_combinations[idx][0]
        a_t = SC_test_combinations[idx][1]
        c_b = SC_test_combinations[idx][2]

        indices = np.where((d[:, 0] == a_c) & 
                    (d[:, 1] == a_t) &
                    (d[:, 2] == c_b)) 
        indices = indices[0]

        axs[i,j].scatter(d[indices][:,-2], d[indices][:,-1], color='purple', s=10)
        axs[i,j].plot(phi_values, y_fno[idx], label="K-T", color='purple', linestyle=":")

        axs[i,j].set_title("a/c:{} a/t:{} c/b:{}".format(a_c, a_t, c_b))
        axs[i,j].set_ylabel("K-T")
        axs[i,j].set_xlabel(r"$\phi$")
        
        if i == 0 and j == 0:
            axs[i,j].legend()

plt.show()

In [None]:
modes = 64
width = 64

device = 'cuda'

In [None]:
X_test_gpu = torch.FloatTensor(X_fno).to(device)

y_test_gpu = torch.FloatTensor(y_fno).to(device)

X_test_gpu.shape, y_test_gpu.shape

In [None]:
from src.fno import FNO1d

fno_loaded_model = FNO1d(len(phi_values), X_test_gpu.shape[-1], modes, width).to(device)
if device == "cpu":
    fno_loaded_model.load_state_dict(torch.load("../files/trained_models/SURFACE_CRACK/fno_sc.pt", map_location=torch.device('cpu'), weights_only=False))
else:
    fno_loaded_model.load_state_dict(torch.load("../files/trained_models/SURFACE_CRACK/fno_sc.pt",weights_only=False))
with torch.no_grad():
    y_pred_fno = fno_loaded_model(X_test_gpu)

# POD-DeepONet

In [None]:
from src.pod_deeponet import POD_DeepONet

In [None]:
X_deeponet = np.zeros((len(SC_test_combinations), 3))
y_deeponet = np.zeros((len(SC_test_combinations), 128))

for (i,combination) in enumerate(SC_test_combinations):
    indices = np.where((d[:, 0] == combination[0]) & 
                    (d[:, 1] == combination[1]) &
                    (d[:, 2] == combination[2])) 
    indices = indices[0]

    X_deeponet[i,0] = combination[0]
    X_deeponet[i,1] = combination[1]
    X_deeponet[i,2] = combination[2]

    y_deeponet[i,:] = d[indices][:,-1]

In [None]:
branch_layers = [SC_test_combinations.shape[1]] + [128, 128, 128]
K = 128
device = 'cuda'

In [None]:
y_train_pod = np.load("../files/trained_models/SURFACE_CRACK/y_train.npy")
y_train_gpu = torch.FloatTensor(y_train_pod).to(device)

X_test_gpu = torch.FloatTensor(X_deeponet).to(device)

y_test_gpu = torch.FloatTensor(y_deeponet).to(device)

X_test_gpu.shape, y_test_gpu.shape

In [None]:
model_loaded = POD_DeepONet(y_train_gpu, branch_layers, K, device, 
                        True, "../files/trained_models/SURFACE_CRACK/pod_branch_sc.pt")

with torch.no_grad():
    y_pred_pod = model_loaded.forward(X_test_gpu)

# Results

In [None]:
def absolute_perctentage_error(y_true, y_pred):
    return np.abs((y_true -  y_pred) / y_true)

def perctentage_error(y_true, y_pred):
    return (y_true -  y_pred) / y_true

def mean_normalized_l2(y_true, y_pred):
    return np.mean(np.linalg.norm(y_true -  y_pred, ord=2, axis=1) / np.linalg.norm(y_true, ord=2, axis=1))

In [None]:
import matplotlib.pyplot as plt
np.random.seed(0)
fig, axs = plt.subplots(2, 5, figsize=(30,12))
for i in range(2):
    for j in range(5):
        idx = np.random.randint(0, len(SC_test_combinations))
        a_c = SC_test_combinations[idx][0]
        a_t = SC_test_combinations[idx][1]
        c_b = SC_test_combinations[idx][2]

        indices = np.where((d[:, 0] == a_c) & 
                    (d[:, 1] == a_t) &
                    (d[:, 2] == c_b)) 
        indices = indices[0]

        axs[i,j].scatter(d[indices][:,-2], d[indices][:,-1], color='black', s=10, label="True")
        axs[i,j].plot(phi_values, y_pred_rfr[idx*128:idx*128+128], label="RFR", color='red', linestyle="-")
        axs[i,j].plot(phi_values, y_pred_svr[idx*128:idx*128+128], label="SVR", color='green', linestyle="--")
        axs[i,j].plot(phi_values, y_pred_nn[idx*128:idx*128+128], label="NN", color='blue', linestyle="-.")
        axs[i,j].plot(phi_values, y_pred_deeponet[idx].cpu(), label="DeepONet", color='purple', linestyle=":")
        axs[i,j].plot(phi_values, y_pred_fno[idx].cpu(), label="FNO", color='orange', linestyle="dashdot")
        axs[i,j].plot(phi_values, y_pred_pod[idx].cpu(), label="POD-DeepONet", color='brown', linestyle="solid")
        

        axs[i,j].set_title("a/c:{} a/t:{} c/b:{}".format(a_c, a_t, c_b))
        axs[i,j].set_ylabel("K-T")
        axs[i,j].set_xlabel(r"$\phi$")
        
        if i == 0 and j == 0:
            axs[i,j].legend()

plt.show()

In [None]:
rfr_err_test = absolute_perctentage_error(d[:,-1], y_pred_rfr)
rfr_err_test = rfr_err_test.reshape(-1)

svr_err_test = absolute_perctentage_error(d[:,-1], y_pred_svr)
svr_err_test = svr_err_test.reshape(-1)

nn_err_test = absolute_perctentage_error(d[:,-1], y_pred_nn)
nn_err_test = nn_err_test.reshape(-1)

deeponet_err_test = absolute_perctentage_error(y_deeponet, y_pred_deeponet.cpu().numpy())
deeponet_err_test = deeponet_err_test.reshape(-1)

fno_err_test = absolute_perctentage_error(y_fno, y_pred_fno.cpu().numpy()[:,:,0])
fno_err_test = fno_err_test.reshape(-1)

pod_err_test = absolute_perctentage_error(y_deeponet, y_pred_pod.cpu().numpy())
pod_err_test = pod_err_test.reshape(-1)

plt.rcParams["figure.figsize"] = (8,5)

x = np.sort(rfr_err_test) 
y = np.arange(len(rfr_err_test)) / float(len(rfr_err_test)) 
plt.plot(x,1-y, color='red', label="RFR")

x = np.sort(svr_err_test) 
y = np.arange(len(svr_err_test)) / float(len(svr_err_test)) 
plt.plot(x,1-y, color='green', label="SVR")

x = np.sort(nn_err_test) 
y = np.arange(len(nn_err_test)) / float(len(nn_err_test)) 
plt.plot(x,1-y, color='blue', label="NN")

x = np.sort(deeponet_err_test) 
y = np.arange(len(deeponet_err_test)) / float(len(deeponet_err_test)) 
plt.plot(x,1-y, color='purple', label="DeepONet")

x = np.sort(fno_err_test) 
y = np.arange(len(fno_err_test)) / float(len(fno_err_test)) 
plt.plot(x,1-y, color='orange', label="FNO")

x = np.sort(pod_err_test) 
y = np.arange(len(pod_err_test)) / float(len(pod_err_test)) 
plt.plot(x,1-y, color='brown', label="POD-DeepONet")

plt.grid(which = "major", linewidth = 1)
plt.grid(which = "minor", linewidth = 0.2)
plt.minorticks_on()
plt.xlabel("Normalized Absolute Error", fontsize=25)
plt.ylabel("Probability", fontsize=25)
plt.semilogy()
# plt.xticks([0, 0.005, 0.01, 0.015], fontsize=25)
plt.yticks(fontsize=25)
plt.legend(fontsize=25)
plt.title("Surface Crack (DeepONet)", fontsize=25)
plt.show()

In [None]:
# rfr_l2_test = mean_normalized_l2(d[:,-1], y_pred_rfr)

# svr_l2_test = mean_normalized_l2(d[:,-1], y_pred_svr)

# nn_l2_test = mean_normalized_l2(d[:,-1], y_pred_nn)

deeponet_l2_test = mean_normalized_l2(y_deeponet, y_pred_deeponet.cpu().numpy())

fno_l2_test = mean_normalized_l2(y_fno, y_pred_fno.cpu().numpy()[:,:,0])

pod_l2_test = mean_normalized_l2(y_deeponet, y_pred_pod.cpu().numpy())

# print("RFR: ", rfr_l2_test)
# print("SVR: ", svr_l2_test)
# print("NN: ", nn_l2_test)
print("DeepONet: ", deeponet_l2_test)
print("FNO: ", fno_l2_test)
print("POD-DeepONet: ", pod_l2_test)