In [None]:
import pandas as pd
import os
import missingno as msno
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from sklearn.cluster import DBSCAN
import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.neighbors import KNeighborsRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.model_selection import ParameterGrid
from sklearn.preprocessing import StandardScaler


charpy_columns = ["Charpy impact toughness (J)"]
stress_columns = ["Yield strength (MPa)", "Ultimate tensile strength (MPa)", "Elongation (%)", "Reduction of Area (%)"]
target_columns = charpy_columns + stress_columns

data = pd.read_csv('../../data/data_no_reg_no_dup.csv')
X = data.drop(target_columns,axis=1)
y = data[target_columns]

def predict_confidence(model, X):
    return 1 / (1 + np.abs(model.predict(X)))

class MLP(nn.Module):
    def __init__(self, size_vector):
        super(MLP, self).__init__()
        self.layers = nn.ModuleList()
        for i in range(len(size_vector)-1):
            self.layers.append(nn.Linear(size_vector[i],size_vector[i+1]))
            self.layers.append(nn.Tanh())
        self.layers.pop(-1)


    def forward(self,x):
        for layer in self.layers:
            x = layer(x)
        return x
target_columns_bis = ["Charpy impact toughness (J)","Yield strength (MPa)", "Ultimate tensile strength (MPa)"]
for target_column in target_columns_bis:

    print("=========================================================================================\n")
    print(f"====================={target_column}========================\n")
    print("=========================================================================================\n")
    X_supervised = X[y[target_column].notna()]
    y_supervised = y[y[target_column].notna()]

    X_unlabeled = np.array(X[y[target_column].isna()])
    y_unlabeled = np.array(y.loc[y[target_column].isna(),target_column])

    X_tensor = torch.tensor(X_supervised.values, dtype=torch.float32)
    y_tensor = torch.tensor(y_supervised[target_column].values, dtype=torch.float32)
    dataset = TensorDataset(X_tensor,y_tensor)

    train_size = int(0.6 * len(dataset))  # 70% for training
    val_size = int(0.2 * len(dataset))    # 15% for validation
    test_size = len(dataset) - train_size - val_size  # Remaining for test

    # Step 2: Split the dataset
    train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

    training_data = torch.utils.data.ConcatDataset([train_dataset, val_dataset])
    data_list = []
    target_list = []
    for data, target in training_data:
        data_list.append(data.numpy())
        target_list.append(target.numpy())

    X_training = np.array(data_list)
    y_training = np.array(target_list)

    print("==================================KNN==================================\n")

    neib_vect = [1,2,3,4,5,6,7,8,9,10,15,20,25,30]
    mean_error = []
    var_error = []
    train_mse = []
    for n_neib in tqdm(neib_vect):
        current_errors = []
        for i in range(50):
            X_train, X_val, y_train, y_val = train_test_split(X_training, y_training, test_size=0.1415)
            Xscaler = StandardScaler()
            yscaler = StandardScaler()
            X_train = Xscaler.fit_transform(X_train)
            X_val = Xscaler.transform(X_val)
            y_train = yscaler.fit_transform(y_train.reshape(-1,1)).reshape(-1)
            y_val = yscaler.transform(y_val.reshape(-1,1)).reshape(-1)
            
            knn = KNeighborsRegressor(n_neighbors=n_neib)

            knn.fit(X_train, y_train)

            y_pred = knn.predict(X_val)

            train_mse.append(mean_squared_error(y_train,knn.predict(X_train)))
            mse = mean_squared_error(y_val, y_pred)
            current_errors.append(mse)
        var_error.append(np.var(current_errors))
        mean_error.append(np.mean(current_errors))

    var_error = np.array(var_error)
    mean_error = np.array(mean_error)

    plt.plot(neib_vect,mean_error)
    plt.fill_between(neib_vect,mean_error-0.5*np.array(var_error),mean_error+0.5*var_error, alpha=0.5)
    plt.title("Error on the validation set")
    plt.ylabel("MSE")
    plt.xlabel("Number of neighbours k")
    plt.show()
    best_hp = neib_vect[np.argmin(mean_error)]
    print(f"Minimal validation loss: {min(mean_error)} for n_neib = {best_hp}")

    best_knn = KNeighborsRegressor(n_neighbors=best_hp)
    best_knn.fit(X_train,y_train)
    plt.plot(y_val,best_knn.predict(X_val),'ob')
    plt.plot(range(-3,4),range(-3,4))
    plt.title(f"Exemple of errors for n_neib = {best_hp}")
    plt.xlabel("Measured Yield Strength")
    plt.ylabel(f"Estimated Yield Strength")
    plt.show()

    print("==================================DECISION TREE==================================\n")

    depth_vect = [1,2,3,4,5,6,7,8,9,10,15,20,25,30]
    mean_error = []
    var_error = []
    train_mse = []
    for depth in tqdm(depth_vect):
        current_errors = []
        for i in range(50):
            X_train, X_val, y_train, y_val = train_test_split(X_training, y_training, test_size=0.1415)
            Xscaler = StandardScaler()
            yscaler = StandardScaler()
            X_train = Xscaler.fit_transform(X_train)
            X_val = Xscaler.transform(X_val)
            y_train = yscaler.fit_transform(y_train.reshape(-1,1)).reshape(-1)
            y_val = yscaler.transform(y_val.reshape(-1,1)).reshape(-1)
            dt = DecisionTreeRegressor(max_depth=depth)

            dt.fit(X_train, y_train)

            y_pred = dt.predict(X_val)

            train_mse.append(mean_squared_error(y_train,dt.predict(X_train)))
            mse = mean_squared_error(y_val, y_pred)
            current_errors.append(mse)
        var_error.append(np.var(current_errors))
        mean_error.append(np.mean(current_errors))

    var_error = np.array(var_error)
    mean_error = np.array(mean_error)

    plt.plot(depth_vect,mean_error)
    plt.fill_between(depth_vect,mean_error-0.5*np.array(var_error),mean_error+0.5*var_error,alpha=0.5,)
    plt.title("Error on the validation set")
    plt.ylabel("MSE")
    plt.xlabel("Max depth of the tree")
    plt.show()
    best_hp = depth_vect[np.argmin(mean_error)]
    print(f"Minimal validation loss: {min(mean_error)} for max depth = {best_hp}")

    best_dt = DecisionTreeRegressor(max_depth=best_hp)
    best_dt.fit(X_train,y_train)
    plt.plot(y_val,best_dt.predict(X_val),'ob')
    plt.plot(range(-3,4),range(-3,4))
    plt.title(f"Exemple of errors for max depth = {best_hp}")
    plt.xlabel("Measured Yield Strength")
    plt.ylabel(f"Estimated Yield Strength")
    plt.show()

    print("==================================RANDOM FOREST==================================\n")

    depth_vect = [1,2,3,4,5,6,7,8,9,10,15,20]
    mean_error = []
    var_error = []
    train_mse = []
    for depth in tqdm(depth_vect):
        current_errors = []
        for i in range(10):
            X_train, X_val, y_train, y_val = train_test_split(X_training, y_training, test_size=0.1415)
            Xscaler = StandardScaler()
            yscaler = StandardScaler()
            X_train = Xscaler.fit_transform(X_train)
            X_val = Xscaler.transform(X_val)
            y_train = yscaler.fit_transform(y_train.reshape(-1,1)).reshape(-1)
            y_val = yscaler.transform(y_val.reshape(-1,1)).reshape(-1)
            rf = RandomForestRegressor(max_depth=depth)

            rf.fit(X_train, y_train)

            y_pred = rf.predict(X_val)

            train_mse.append(mean_squared_error(y_train,rf.predict(X_train)))
            mse = mean_squared_error(y_val, y_pred)
            current_errors.append(mse)
        var_error.append(np.var(current_errors))
        mean_error.append(np.mean(current_errors))

    var_error = np.array(var_error)
    mean_error = np.array(mean_error)

    plt.plot(depth_vect,mean_error)
    plt.fill_between(depth_vect,mean_error-0.5*np.array(var_error),mean_error+0.5*var_error,alpha=0.5,)
    plt.title("Error on the validation set")
    plt.ylabel("MSE")
    plt.xlabel("Max depth of the trees")
    plt.show()
    best_hp = depth_vect[np.argmin(mean_error)]
    print(f"Minimal validation loss: {min(mean_error)} for max depth = {best_hp}")

    best_rf = RandomForestRegressor(max_depth=best_hp)
    best_rf.fit(X_train,y_train)
    plt.plot(y_val,best_rf.predict(X_val),'ob')
    plt.plot(range(-3,4),range(-3,4))
    plt.title(f"Exemple of errors for max depth = {best_hp}")
    plt.xlabel("Measured Yield Strength")
    plt.ylabel(f"Estimated Yield Strength")
    plt.show()

    print("==================================SUPPORT VECTOR REGRESSION==================================\n")

    mean_error = []
    var_error = []
    train_mse = []

    param_grid = {
        'C': [5, 10, 15, 20,25,30],
        'epsilon': [0.01,0.05, 0.1,0.15,0.25,0.5, 1],
    }

    for params in tqdm(ParameterGrid(param_grid)):
        current_errors = []
        for i in range(50):
            X_train, X_val, y_train, y_val = train_test_split(X_training, y_training, test_size=0.1415)
            Xscaler = StandardScaler()
            yscaler = StandardScaler()
            X_train = Xscaler.fit_transform(X_train)
            X_val = Xscaler.transform(X_val)
            y_train = yscaler.fit_transform(y_train.reshape(-1,1)).reshape(-1)
            y_val = yscaler.transform(y_val.reshape(-1,1)).reshape(-1)
            sv = SVR(C=params['C'],epsilon=params['epsilon'],kernel='rbf')

            sv.fit(X_train, y_train)

            y_pred = sv.predict(X_val)

            train_mse.append(mean_squared_error(y_train,sv.predict(X_train)))
            mse = mean_squared_error(y_val, y_pred)
            current_errors.append(mse)
        var_error.append(np.var(current_errors))
        mean_error.append(np.mean(current_errors))

    var_error = np.array(var_error)
    mean_error = np.array(mean_error)

    mean_error_matrix = mean_error.reshape(len(param_grid['C']),len(param_grid['epsilon']))

    heatmap_data = pd.DataFrame(mean_error_matrix, index=param_grid['C'], columns=param_grid['epsilon'])

    plt.figure(figsize=(8, 6))
    sns.heatmap(heatmap_data, annot=True, cmap="YlGnBu", cbar_kws={'label': 'Mean Error'})
    plt.title('Hyperparameter Tuning Heatmap (Mean Error)')
    plt.xlabel('epsilon')
    plt.ylabel('C')
    plt.show()

    min_error_index = np.unravel_index(np.argmin(mean_error_matrix, axis=None), mean_error_matrix.shape)
    # Retrieve the best hyperparameters
    best_C = param_grid['C'][min_error_index[0]]  # Row index corresponds to C values
    best_epsilon = param_grid['epsilon'][min_error_index[1]]

    print(f'Minimal validation loss: {np.min(mean_error_matrix,axis=None)} for C = {best_C} and epsilon = {best_epsilon}')

    best_sv = SVR(C=best_C,epsilon=best_epsilon)
    best_sv.fit(X_train,y_train)
    plt.plot(y_val,best_sv.predict(X_val),'ob')
    plt.plot(range(-3,4),range(-3,4))
    plt.title(f"Exemple of errors for C = {best_C} and eps = {best_epsilon}")
    plt.xlabel("Measured Yield Strength")
    plt.ylabel(f"Estimated Yield Strength")
    plt.show()

    print("==================================SUPPORT VECTOR REGRESSION SEMI SUPERVISED==================================\n")
    if "Charpy temperature (deg C)" not in X.columns:
        mean_error = []
        var_error = []
        train_mse = []

        confidence_threshold = 0.9

        param_grid = {
            'C': [5, 10, 15, 20,25,30],
            'epsilon': [0.01,0.05, 0.1,0.15,0.25,0.5, 1],
        }
        best_mse = 1000
        for params in tqdm(ParameterGrid(param_grid)):
            current_errors = []
            for i in range(10):
                X_train, X_val, y_train, y_val = train_test_split(X_training, y_training, test_size=0.1415)
                Xscaler = StandardScaler()
                yscaler = StandardScaler()
                X_train = Xscaler.fit_transform(X_train)
                X_val = Xscaler.transform(X_val)
                y_train = yscaler.fit_transform(y_train.reshape(-1,1)).reshape(-1)
                y_val = yscaler.transform(y_val.reshape(-1,1)).reshape(-1)
                X_unlabeled_reg = Xscaler.transform(X_unlabeled)
                sv = SVR(C=params['C'],epsilon=params['epsilon'],kernel='rbf')

                for iteration in range(10):
                    sv.fit(X_train, y_train)

                    y_unlabeled_pred = sv.predict(X_unlabeled_reg)
                    confidence_scores = predict_confidence(sv, X_unlabeled_reg)
                    high_confidence_mask = confidence_scores > confidence_threshold
                    if np.sum(high_confidence_mask) == 0:
                        break
                    
                    X_train = np.vstack([X_train,X_unlabeled_reg[high_confidence_mask]])
                    y_train = np.hstack([y_train,y_unlabeled_pred[high_confidence_mask]])
                    X_unlabeled_reg = X_unlabeled_reg[~high_confidence_mask]

                y_pred = sv.predict(X_val)

                train_mse.append(mean_squared_error(y_train,sv.predict(X_train)))
                mse = mean_squared_error(y_val, y_pred)
                if mse< best_mse:
                    best_model = sv
                    best_mse = mse
                    X_val_disp = X_val
                    y_val_disp = y_val
                current_errors.append(mse)
            var_error.append(np.var(current_errors))
            mean_error.append(np.mean(current_errors))

        var_error = np.array(var_error)
        mean_error = np.array(mean_error)

        mean_error_matrix = mean_error.reshape(len(param_grid['C']),len(param_grid['epsilon']))

        heatmap_data = pd.DataFrame(mean_error_matrix, index=param_grid['C'], columns=param_grid['epsilon'])

        plt.figure(figsize=(8, 6))
        sns.heatmap(heatmap_data, annot=True, cmap="YlGnBu", cbar_kws={'label': 'Mean Error'})
        plt.title('Hyperparameter Tuning Heatmap (Mean Error)')
        plt.xlabel('epsilon')
        plt.ylabel('C')
        plt.show()

        min_error_index = np.unravel_index(np.argmin(mean_error_matrix, axis=None), mean_error_matrix.shape)
        # Retrieve the best hyperparameters
        best_C = param_grid['C'][min_error_index[0]]  # Row index corresponds to C values
        best_epsilon = param_grid['epsilon'][min_error_index[1]]

        print(f'Minimal validation loss: {np.min(mean_error_matrix,axis=None)} for C = {best_C} and epsilon = {best_epsilon}')
        plt.plot(y_val_disp,best_model.predict(X_val_disp),'ob')
        plt.plot(range(-3,4),range(-3,4))
        plt.title(f"Exemple of errors for C = {best_C} and eps = {best_epsilon}")
        plt.xlabel("Measured Yield Strength")
        plt.ylabel(f"Estimated Yield Strength")
        plt.show()

    if 'Charpy temperature (deg C)' in X.columns:
        X = X.drop('Charpy temperature (deg C)',axis=1)