In [None]:
import os
from typing import Literal

import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from joblib import parallel_backend
from sklearn.base import ClassifierMixin
from sklearn.metrics import (
    accuracy_score, 
    classification_report, 
    confusion_matrix, 
    f1_score, 
    precision_score, 
    recall_score,
) 
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from tqdm_joblib import tqdm_joblib
from tqdm.notebook import tqdm

In [None]:
INPUT_PATH = "Broken_terrains_datasets"
#EXCLUDE_FILES = ["KSH", "params"]
REAL_DATA_FILE = "KSH_input_output_0.txt"

RANDOM_STATE = 42

SVM = "SVM"

SS_MODE: Literal[1, 2, 3] = 2

X_C_LOWER = 922000
X_C_UPPER = 923500
Y_C_LOWER = 249500
Y_C_UPPER = 251100

In [None]:
if SS_MODE != 2:
    raise ValueError("Only SS_MODE == 2 is supported in this notebook.")

In [None]:
pd.set_option('display.max_columns', None)

np.random.seed(RANDOM_STATE)
random_state = np.random.RandomState(RANDOM_STATE)

In [None]:
files = os.listdir(INPUT_PATH)

# dfs = [
#     pd.read_csv(os.path.join(INPUT_PATH, file), decimal='.', sep=';') 
#     for file in files 
#     if file.endswith('.txt')
#     and not any(exclude in file for exclude in EXCLUDE_FILES)
# ]

dfs = [pd.read_csv(os.path.join(INPUT_PATH, str(file) + ".txt"), decimal='.', sep=';') for file in range(1000)]

In [None]:
df_0 = dfs[4]

fig = plt.figure()
ax = fig.add_subplot(projection='3d')
#ax.scatter(df_0['X_C'], df_0['Y_C'], df_0["Z_C"])
ax.scatter(df_0['X_C'], df_0['Y_C'], df_0["Z_C"], c=df_0['Fault'], cmap='viridis')

In [None]:
df = pd.concat(
    dfs,
    ignore_index=True
)

In [None]:
df.describe()

In [None]:
df

In [None]:
filtered_df=df[
    (df.X_C_Neighbor1!='undefined') 
    & (df.X_C_Neighbor2!='undefined')
    & (df.X_C_Neighbor3!='undefined') 
    & (df.Z_N!=0) 
    & (df.n1_zn!=0) 
    & (df.n2_zn!=0) 
    & (df.n3_zn!=0) 
    & (df.DOC<0.90)  
].reset_index(drop=True)

In [None]:
filtered_df.head(5)

In [None]:
filtered_df.describe()

In [None]:
filtered_df

In [None]:
euclidean_n = ['EuclideanNeighbor1_N', 'EuclideanNeighbor2_N','EuclideanNeighbor3_N']
euclidean_d = ['EuclideanNeighbor1_D', 'EuclideanNeighbor2_D','EuclideanNeighbor3_D']
cosine_n = ['CosineNeighbor1_N', 'CosineNeighbor2_N','CosineNeighbor3_N']
cosine_d = ['CosineNeighbor1_D', 'CosineNeighbor2_D','CosineNeighbor3_D']
angle_n = ['AngleNeighbor1_N', 'AngleNeighbor2_N','AngleNeighbor3_N']
angle_d = ['AngleNeighbor1_D', 'AngleNeighbor2_D','AngleNeighbor3_D']

euclidean_n_sorted = ['Euclidean_N_Max', 'Euclidean_N_Min', 'Euclidean_N_Intermediate']
euclidean_d_sorted = ['Euclidean_D_Max', 'Euclidean_D_Min', 'Euclidean_D_Intermediate']
cosine_n_sorted = ['Cosine_N_Max', 'Cosine_N_Min', 'Cosine_N_Intermediate']
cosine_d_sorted = ['Cosine_D_Max', 'Cosine_D_Min', 'Cosine_D_Intermediate']
angle_n_sorted = ['Angle_N_Max', 'Angle_N_Min', 'Angle_N_Intermediate']
angle_d_sorted = ['Angle_D_Max', 'Angle_D_Min', 'Angle_D_Intermediate']

sorting_pairs = [
    (euclidean_n, euclidean_n_sorted),
    (euclidean_d, euclidean_d_sorted),
    (cosine_n, cosine_n_sorted),
    (cosine_d, cosine_d_sorted),
    (angle_n, angle_n_sorted),
    (angle_d, angle_d_sorted)
]

In [None]:
def sort_values(row: pd.Series, output_columns: list) -> pd.Series:
    """
    Sort Neighbor values in descending order and return a Series with max, intermediate, and min values.

    Parameters
    ----------
    row : pd.Series
        A pandas Series containing Neighbor values.
    output_columns : list
        A list of column names for the output Series.
        Maximum value, intermediate value, minimum value.

    Returns
    -------
    pd.Series
        A pandas Series with the maximum, intermediate, and minimum values.
    """
    max_val = row.max()
    min_val = row.min()
    remaining_val = row.sum() - max_val - min_val
    return pd.Series([max_val, min_val, remaining_val], index=output_columns)

In [None]:
sorted_dfs = [
    filtered_df[list(cols)].apply(sort_values, axis=1, output_columns=list(sorted_cols))
    for cols, sorted_cols in sorting_pairs
]

In [None]:
sorted_df=pd.concat([
    filtered_df[['X_N']],
    filtered_df[['Y_N']],
    filtered_df[['Z_N']],
    filtered_df[['X_D']],
    filtered_df[['Y_D']],
    filtered_df[['Z_D']],   
    *sorted_dfs,
    filtered_df[['File_number']],
    filtered_df[['Fault']]   
    ], 
    axis=1
)
sorted_df.shape

In [None]:
sorted_df

In [None]:
if SS_MODE == 2:
    standardized_df = sorted_df.copy()
    scaler_2_A = StandardScaler()
    file_numbers = standardized_df["File_number"].unique()

    for file_number in file_numbers:
        file_mask = standardized_df["File_number"] == file_number
        standardized_df.loc[
            file_mask,
            standardized_df.columns.difference(["File_number", "Fault"]),
        ] = scaler_2_A.fit_transform(
            standardized_df.loc[
                file_mask,
                standardized_df.columns.difference(["File_number", "Fault"]),
            ]
    )
        
    df_for_downsampling = standardized_df.copy()
else:
    df_for_downsampling = sorted_df.copy()

In [None]:
dfs = []
for file_number in df_for_downsampling["File_number"].unique():
    sub_df = df_for_downsampling[df_for_downsampling["File_number"] == file_number]
    try:
        class_count_0, class_count_1 = sub_df['Fault'].value_counts()
    except ValueError:
        print(f"Skipping file number {file_number} due to only one class present.")
        continue
    class_0 = sub_df[sub_df['Fault'] == -1]
    class_1 = sub_df[sub_df['Fault'] == 1]# print the shape of the class
    print('class 0:', class_0.shape)
    print('class 1:', class_1.shape)

    class_0_under = class_0.sample(class_count_1, random_state=RANDOM_STATE)

    dfs.append(pd.concat([class_0_under, class_1], axis=0, ignore_index=True))
    
undersampled_df = pd.concat(dfs, axis=0, ignore_index=True)

In [None]:
def ordered_train_test_split(
        X: pd.DataFrame,
        y: pd.Series,
        test_size: float = 0.25,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Splits the dataset into training and testing sets while preserving the order of samples.
    It splits based on unique 'File_number' values to avoid data leakage.

    Parameters
    ----------
    X : pd.DataFrame
        Feature set.
    y : pd.Series
        Target labels.
    test_size : float, optional
        Proportion of the dataset to include in the test split (default is 0.25).
    random_state : int, optional
        Random seed for reproducibility (default is RANDOM_STATE).

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]
        X_train, X_test, y_train, y_test
    """
    n_terrains = X['File_number'].nunique()
    terrains = X['File_number'].unique()
    train_size = int((1 - test_size) * n_terrains)
    train_terrains = terrains[:train_size]
    test_terrains = terrains[train_size:]

    train_mask = X['File_number'].isin(train_terrains)
    test_mask = X['File_number'].isin(test_terrains)

    X_train = X[train_mask].drop(columns=['File_number'])
    X_test = X[test_mask].drop(columns=['File_number'])
    y_train = y[train_mask]
    y_test = y[test_mask]
    return X_train, X_test, y_train, y_test

In [None]:
X = undersampled_df.drop(columns=['Fault'])
y = undersampled_df['Fault']
y[y == -1] = 0  # Change labels from -1, 1 to 0, 1

X_train, X_test, y_train, y_test = ordered_train_test_split(X, y, test_size=0.25)

In [None]:
if SS_MODE == 1:
    scaler_1_A = StandardScaler()
    X_train = scaler_1_A.fit_transform(X_train)
    
    scaler_1_B = StandardScaler()
    X_test = scaler_1_B.fit_transform(X_test)

if SS_MODE == 3:
    scaler_3 = StandardScaler()
    X_train = scaler_3.fit_transform(X_train)
    X_test = scaler_3.transform(X_test)
    

In [None]:
models = {}
models[SVM] = SVC(kernel="rbf", C=0.05, random_state=RANDOM_STATE)

In [None]:
accuracy, precision, recall, confusion, classification_rep, f1 = {}, {}, {}, {}, {}, {}

for key in models.keys():
    
    # Fit the classifier
    models[key].fit(X_train, y_train)
    
    # Make predictions
    predictions = models[key].predict(X_test)
    
    # Calculate metrics
    accuracy[key] = accuracy_score(predictions, y_test)
    precision[key] = precision_score(predictions, y_test)
    recall[key] = recall_score(predictions, y_test)
    f1[key] = f1_score(predictions, y_test)
    confusion[key]=confusion_matrix(predictions, y_test)
    classification_rep[key]=classification_report(predictions, y_test, target_names=['homocline', 'Fault'])

In [None]:
def print_metrics(model: str) -> None:
    print(f"Metrics for {model}:")
    print(f"Accuracy: {accuracy[model]:.4f}")
    print(f"Precision: {precision[model]:.4f}")
    print(f"Recall: {recall[model]:.4f}")
    print(f"F1 Score: {f1[model]:.4f}")
    print(f"Confusion Matrix:\n{confusion[model]}")
    tn, fp, fn, tp = confusion[model].ravel()
    print(f"TN: {tn}, FP: {fp}, FN: {fn}, TP: {tp}")
    print(f"Classification Report:\n{classification_rep[model]}")

In [None]:
for model in models.keys():
    print_metrics(model)
    print("================")

In [None]:
def grid_search(model: ClassifierMixin, param_grid: dict, scoring: str, cv: int=5) -> ClassifierMixin:
    """
    Perform grid search to find the best hyperparameters for a given model.

    Parameters
    ----------
    model : ClassifierMixin
        The machine learning model to be optimized.
    param_grid : dict
        A dictionary where keys are hyperparameter names and values are lists of possible values.
    scoring : str
        The scoring metric to optimize (e.g., 'f1', 'accuracy').
    cv : int, optional
        The number of cross-validation folds (default is 5).

    Returns
    -------
    ClassifierMixin
        The model with the best found hyperparameters.
    """
    grid_search = GridSearchCV(model, param_grid, scoring=scoring, cv=cv, n_jobs=-1, refit=True, verbose=True)
    with parallel_backend('loky'):
        with tqdm_joblib(tqdm(desc="GridSearch Progress")):
            grid_search.fit(X_train, y_train)
    print(f"Best parameters for {model.__class__.__name__}: {grid_search.best_params_}")
    return grid_search.best_estimator_

In [None]:
param_grid = {}

param_grid[SVM]= {
    'C': [0.1, 1, 10, 100, 1000],  
    'gamma': [1, 0.1, 0.01, 0.001, 0.0001], 
    'kernel': ['rbf', 'linear'],
}  

In [None]:
models_tuning = {}

#models_tuning[SVM] = grid_search(SVC(random_state=RANDOM_STATE), param_grid[SVM], scoring='f1')
models_tuning[SVM] = SVC(kernel="rbf", C=10, gamma=0.1, random_state=RANDOM_STATE)

In [None]:
accuracy_tuning, precision_tuning, recall_tuning, confusion_tuning, classification_rep_tuning, f1_tuning = {}, {}, {}, {}, {}, {}

In [None]:
for key in models_tuning.keys():
    
    # Fit the classifier
    models_tuning[key].fit(X_train, y_train)
    
    # Make predictions
    predictions_tuning = models_tuning[key].predict(X_test)
    
    # Calculate metrics
    accuracy_tuning[key] = accuracy_score(predictions_tuning, y_test)
    precision_tuning[key] = precision_score(predictions_tuning, y_test)
    recall_tuning[key] = recall_score(predictions_tuning, y_test)
    f1_tuning[key] = f1_score(predictions_tuning, y_test)
    confusion_tuning[key]=confusion_matrix(predictions_tuning, y_test)
    classification_rep_tuning[key]=classification_report(predictions_tuning, y_test, target_names=['homocline', 'Fault'])


In [None]:
def print_metrics_tuning(model: str) -> None:
    print(f"Metrics for {model} after tuning:")
    print("Model parameters:", models_tuning[model].get_params())
    print(f"Accuracy: {accuracy_tuning[model]:.4f}")
    print(f"Precision: {precision_tuning[model]:.4f}")
    print(f"Recall: {recall_tuning[model]:.4f}")
    print(f"F1 Score: {f1_tuning[model]:.4f}")
    print(f"Confusion Matrix:\n{confusion_tuning[model]}")
    tn, fp, fn, tp = confusion_tuning[model].ravel()
    print(f"TN: {tn}, FP: {fp}, FN: {fn}, TP: {tp}")
    print(f"Classification Report:\n{classification_rep_tuning[model]}")

In [None]:
for model in models_tuning.keys():
    print_metrics_tuning(model)
    print("================")

In [None]:
#Testing real data

In [None]:
real_raw_df = pd.read_csv(INPUT_PATH + "/" + REAL_DATA_FILE, decimal=".", sep=";")
real_raw_df.shape

In [None]:
real_raw_df

In [None]:
real_raw_df.describe()

In [None]:
real_filtered_df=real_raw_df[
    (real_raw_df.X_C_Neighbor1!='undefined') 
    & (real_raw_df.X_C_Neighbor2!='undefined')
    & (real_raw_df.X_C_Neighbor3!='undefined') 
    & (real_raw_df.Z_N!=0) 
    & (real_raw_df.n1_zn!=0) 
    & (real_raw_df.n2_zn!=0) 
    & (real_raw_df.n3_zn!=0) 
    & (real_raw_df.DOC<0.90)  
].reset_index(drop=True)
real_filtered_df.shape

In [None]:
real_sorted_dfs = [
    real_filtered_df[list(cols)].apply(sort_values, axis=1, output_columns=list(sorted_cols))
    for cols, sorted_cols in sorting_pairs
]

In [None]:
real_sorted_df=pd.concat([
    real_filtered_df[["X_C","Y_C","Z_C"]],
    real_filtered_df[['X_N']],
    real_filtered_df[['Y_N']],
    real_filtered_df[['Z_N']],
    real_filtered_df[['X_D']],
    real_filtered_df[['Y_D']],
    real_filtered_df[['Z_D']],   
    *real_sorted_dfs,
    ], 
    axis=1
)
real_sorted_df.shape

In [None]:
real_X = real_sorted_df.drop(columns=["X_C", "Y_C", "Z_C"])

if SS_MODE == 1:
    scaler_1_C = StandardScaler()
    real_X = scaler_1_C.fit_transform(real_X)
elif SS_MODE == 2:
    scaler_2_B = StandardScaler()
    real_X = scaler_2_B.fit_transform(real_X)
elif SS_MODE == 3:
    real_X = scaler_3.transform(real_X)

In [None]:
real_predictions = {
    model: models_tuning[model].predict(real_X)
    for model in models_tuning.keys()
}

In [None]:
results_df = pd.concat(
    [
        real_sorted_df[["X_C", "Y_C", "Z_C"]],
        pd.DataFrame(real_predictions),
    ],
    axis=1
)

results_df

In [None]:
results_df.SVM.mean()

In [None]:
real_sorted_df_subset = real_sorted_df.query(f'{X_C_LOWER} <= X_C <= {X_C_UPPER} and {Y_C_LOWER} <= Y_C <= {Y_C_UPPER}')
real_X_subset = real_sorted_df_subset.drop(columns=["X_C", "Y_C", "Z_C"])

if SS_MODE == 1:
    scaler_1_C = StandardScaler()
    real_X_subset  = scaler_1_C.fit_transform(real_X_subset )
elif SS_MODE == 2:
    scaler_2_B = StandardScaler()
    real_X_subset  = scaler_2_B.fit_transform(real_X_subset )
elif SS_MODE == 3:
    real_X_subset  = scaler_3.transform(real_X_subset)

In [None]:
real_predictions_subset = {
    model: models_tuning[model].predict(real_X_subset)
    for model in models_tuning.keys()
}

In [None]:
preds_subset = pd.DataFrame(real_predictions_subset)
preds_subset.index = real_sorted_df_subset.index

results_df_subset = pd.concat(
    [
        real_sorted_df_subset[["X_C", "Y_C", "Z_C"]],
        preds_subset,
    ],
    axis=1
)

results_df_subset

In [None]:
inserted_df = results_df.copy()
inserted_df.loc[results_df_subset.index, SVM] = results_df_subset[SVM]

In [None]:
changes_df = results_df.copy()
changes = results_df[SVM] != inserted_df[SVM]
changes_df.loc[changes, SVM] = 2

In [None]:
model = SVM

cmap = {0: 'aqua', 1: 'magenta', 2: 'yellow'}
legend_labels_1 = ['homocline', 'fault']
legend_labels_2 = ['homocline', 'fault', '\'unstable\'']
legend_colors_1 = [mpatches.Patch(color=cmap[i], label=legend_labels_1[i]) for i in range(2)]
legend_colors_2 = [mpatches.Patch(color=cmap[i], label=legend_labels_2[i]) for i in range(3)]

fig, ax = plt.subplots(2, 2, figsize=(6.4*2, 4.8*2))

axis = ax.ravel()

axis[0].scatter(results_df["Y_C"], results_df["X_C"], s=2, color=results_df[model].map(cmap))
axis[1].scatter(results_df_subset["Y_C"], results_df_subset["X_C"], s=2, color=results_df_subset[model].map(cmap))
axis[2].scatter(inserted_df["Y_C"], inserted_df["X_C"], s=2, color=inserted_df[model].map(cmap))
axis[3].scatter(changes_df["Y_C"], changes_df["X_C"], s=2, color=changes_df[model].map(cmap))

axis[0].set_title("Predictions for the entire dataset")
axis[1].set_title("Predictions for the subset")
axis[2].set_title("Predictions for the entire dataset with inserted subset")
axis[3].set_title("Changes after inserting the subset")

for i in range(len(axis)):
    axis[i].set_xlabel("Y_C")
    axis[i].set_ylabel("X_C")
    axis[i].tick_params(axis='x', rotation=45)
    axis[i].axhline(y=X_C_LOWER, color='k', linestyle='--', linewidth=0.5)
    axis[i].axhline(y=X_C_UPPER, color='k', linestyle='--', linewidth=0.5)
    axis[i].axvline(x=Y_C_LOWER, color='k', linestyle='--', linewidth=0.5)
    axis[i].axvline(x=Y_C_UPPER, color='k', linestyle='--', linewidth=0.5)
    axis[i].legend(handles=legend_colors_1 if i != 3 else legend_colors_2, loc='upper right', fontsize='small', title='classes')

fig.suptitle(f"{model} predictions, StandardScaler method: {SS_MODE}")

plt.tight_layout()

fig.savefig(f"ordered_split_scaling_method_{SS_MODE}.svg", format="svg", dpi=1200)

In [None]:
model = SVM

cmap = {0: 'aqua', 1: 'magenta', 2: 'yellow'}
legend_labels_1 = ['homocline', 'fault']
legend_labels_2 = ['homocline', 'fault', '\'unstable\'']
legend_colors_1 = [mpatches.Patch(color=cmap[i], label=legend_labels_1[i]) for i in range(2)]
legend_colors_2 = [mpatches.Patch(color=cmap[i], label=legend_labels_2[i]) for i in range(3)]

fig, ax = plt.subplots(2, 2, figsize=(6.4*2, 4.8*2))

axis = ax.ravel()

axis[0].scatter(results_df["Y_C"], results_df["X_C"], s=2, color=results_df[model].map(cmap))
axis[1].scatter(results_df_subset["Y_C"], results_df_subset["X_C"], s=2, color=results_df_subset[model].map(cmap))
axis[2].scatter(inserted_df["Y_C"], inserted_df["X_C"], s=2, color=inserted_df[model].map(cmap))
axis[3].scatter(changes_df["Y_C"], changes_df["X_C"], s=2, color=changes_df[model].map(cmap))

axis[0].set_title("Predictions for the entire dataset")
axis[1].set_title("Predictions for the subset")
axis[2].set_title("Predictions for the entire dataset with inserted subset")
axis[3].set_title("Changes after inserting the subset")

for i in range(len(axis)):
    axis[i].set_xlabel("Y_C")
    axis[i].set_ylabel("X_C")
    axis[i].tick_params(axis='x', rotation=45)
    axis[i].axhline(y=X_C_LOWER, color='k', linestyle='--', linewidth=0.5)
    axis[i].axhline(y=X_C_UPPER, color='k', linestyle='--', linewidth=0.5)
    axis[i].axvline(x=Y_C_LOWER, color='k', linestyle='--', linewidth=0.5)
    axis[i].axvline(x=Y_C_UPPER, color='k', linestyle='--', linewidth=0.5)
    axis[i].legend(handles=legend_colors_1 if i != 3 else legend_colors_2, loc='upper right', fontsize='small', title='classes')

    rect = mpatches.Rectangle(
        (Y_C_LOWER, X_C_LOWER),
        Y_C_UPPER - Y_C_LOWER,
        X_C_UPPER - X_C_LOWER,
        linewidth=1.5,
        edgecolor='r',
        facecolor='none'
    )
    axis[i].add_patch(rect)


center_y = (Y_C_LOWER + Y_C_UPPER) / 2
center_x = (X_C_LOWER + X_C_UPPER) / 2

ax_start = axis[1]
ax_end = axis[2]

arrow = mpatches.ConnectionPatch(
    xyA=(center_y, center_x),
    xyB=(center_y, center_x),
    coordsA="data",
    coordsB="data",
    axesA=ax_start,
    axesB=ax_end,
    color="red",
    arrowstyle="->,head_length=4,head_width=3",
    linewidth=2
)
fig.add_artist(arrow)
fig.suptitle(f"{model} predictions, StandardScaler method: {SS_MODE}")

plt.tight_layout()

fig.savefig(f"ordered_split_scaling_method_{SS_MODE}_arrow.svg", format="svg", dpi=1200)