In [None]:
# Import dependencies
import os
import sys
from pathlib import Path

try:
    # Get the current working directory
    current_dir = os.getcwd()

    # Set the root directory to the parent of the current directory
    root_dir = Path(current_dir).parent

    # Add the root directory to sys.path so Python can find the utils module
    # (only once, so re-running this cell does not pile up duplicate entries)
    if str(root_dir) not in sys.path:
        sys.path.append(str(root_dir))
        print(f"Added {root_dir} to Python path")

    # Standard libraries
    import itertools

    import numpy as np
    import pandas as pd
    import h5py

    # Data processing and visualization
    import matplotlib.pyplot as plt
    import seaborn as sns
    from scipy import signal, stats
    import pywt
    from tqdm import tqdm

    # Machine learning
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.svm import OneClassSVM
    from sklearn.metrics import (
        classification_report,
        confusion_matrix,
        ConfusionMatrixDisplay,
        f1_score,
    )
    from imblearn.over_sampling import SMOTE, SMOTENC
    from imblearn.under_sampling import RandomUnderSampler
    import lightgbm as lgb

    # Custom utilities (resolved through the root_dir entry added to sys.path above)
    from utils import data_loader_utils
    from utils.feature_extraction import transform_data
    from utils.model_validation import perform_cross_validation

    print("Dependencies loaded successfully ✅")
except Exception as e:
    print(f"Error loading dependencies: {e}")
    # Re-raise: every later cell needs these names, so continuing after a failed
    # import would only produce NameErrors far away from the real cause.
    raise


In [None]:
def load_data(exclude_processes=None):
    """
    Collect the samples of every process / machine / label folder under ``<root_dir>/data``.

    Args:
        exclude_processes (list, optional): Process names (e.g. "OP03") to leave out.
            A falsy value keeps all processes.

    Returns:
        tuple: (X_data, y_data, y_binary) - the loaded samples, the label strings
        returned by the loader, and a 0/1 list derived from them (0 when the part
        after the last "_" is "good", 1 otherwise).

    Note:
        If anything raises while loading, the error is printed and whatever was
        collected up to that point is returned.
    """
    machine_ids = ["M01", "M02", "M03"]
    quality_labels = ["good", "bad"]
    # OP00 ... OP14
    selected_processes = [f"OP{idx:02d}" for idx in range(15)]

    # Drop the processes the caller asked to skip
    if exclude_processes:
        selected_processes = [name for name in selected_processes if name not in exclude_processes]

    dataset_root = os.path.join(root_dir, "data")

    X_data, y_data = [], []

    try:
        # Every (process, machine, label) folder, in the order they are visited
        combos = list(itertools.product(selected_processes, machine_ids, quality_labels))

        # One progress-bar tick per folder
        with tqdm(total=len(combos), desc="Loading data") as progress:
            for process_name, machine, label in combos:
                folder = os.path.join(dataset_root, machine, process_name, label)
                samples, sample_labels = data_loader_utils.load_tool_research_data(folder, label=label)
                X_data.extend(samples)
                y_data.extend(sample_labels)
                progress.update(1)
                progress.set_postfix({"Samples": len(X_data)})

        print(f"Data loaded successfully ✅ - {len(X_data)} samples")
    except Exception as err:
        print(f"Error loading data: {err}")

    # Binary labels: the last "_"-separated token of each label string decides the class
    y_binary = []
    for label_str in y_data:
        is_good = label_str.rsplit("_", 1)[-1] == "good"
        y_binary.append(0 if is_good else 1)

    return X_data, y_data, y_binary

import pandas as pd
from sklearn.metrics import f1_score, confusion_matrix

def create_results_df():
    """
    Build the empty results table, one column per recorded quantity:
      - M01_pct, M02_pct, M03_pct: fractions used in the train split
      - train_normals, train_anomalies: counts before SMOTE
      - train_resampled_normals, train_resampled_anomalies: counts after SMOTE
      - test_normals, test_anomalies: counts in the test set
      - f1_score: F1 on the test set
      - tn, fp, fn, tp: confusion matrix entries
      - confusion_matrix: the confusion matrix object itself
      - experiment_id: free-form tag of the experiment the row belongs to

    Returns:
        pd.DataFrame: a frame with these columns and no rows.
    """
    fraction_cols = [f'{machine}_pct' for machine in ('M01', 'M02', 'M03')]
    count_cols = [
        'train_normals', 'train_anomalies',
        'train_resampled_normals', 'train_resampled_anomalies',
        'test_normals', 'test_anomalies',
    ]
    metric_cols = ['f1_score', 'tn', 'fp', 'fn', 'tp']
    extra_cols = ['confusion_matrix', 'experiment_id']

    all_cols = fraction_cols + count_cols + metric_cols + extra_cols
    return pd.DataFrame(columns=all_cols)

def record_result(
    df,
    m01_pct, m02_pct, m03_pct,
    trainy, trainy_resampled,
    testy, f1, confusion_matrix,
    experiment_id=None
):
    """
    Append one experiment outcome as a new row of ``df`` and return ``df``.

    Args:
        df: results table; the row is written in place at label ``len(df)``.
        m01_pct, m02_pct, m03_pct: per-machine training fractions to store.
        trainy: training labels before resampling (entries equal to 0 / 1 are counted).
        trainy_resampled: training labels after resampling (counted the same way).
        testy: test labels (counted the same way).
        f1: F1 score to store.
        confusion_matrix: object whose ``ravel()`` yields exactly four values, read
            as tn, fp, fn, tp; the object itself is stored as well.
        experiment_id: optional tag stored in the last column.

    Returns:
        The same ``df`` object that was passed in.
    """
    def _tally(labels):
        # [number of entries equal to 0, number of entries equal to 1]
        zeros = len([value for value in labels if value == 0])
        ones = len([value for value in labels if value == 1])
        return [zeros, ones]

    true_neg, false_pos, false_neg, true_pos = confusion_matrix.ravel()

    new_row = [m01_pct, m02_pct, m03_pct]
    new_row += _tally(trainy)
    new_row += _tally(trainy_resampled)
    new_row += _tally(testy)
    new_row += [f1, true_neg, false_pos, false_neg, true_pos]
    new_row += [confusion_matrix, experiment_id]

    # Setting a not-yet-existing label appends the row
    df.loc[len(df)] = new_row
    return df

In [None]:
# Binary label convention: 0 == good | 1 == bad
X, y, y_binary = load_data()

# Stratified hold-out split on the binary labels (20% test, fixed seed)
Xtrain, Xtest, ytrain, ytest = train_test_split(
    X,
    y_binary,
    test_size=0.2,
    stratify=y_binary,
    random_state=42,
)



In [None]:
# Baseline experiment: random stratified split, no per-machine selection.
trainX, testX, trainy, testy = train_test_split(X, y_binary, test_size=0.2, random_state=42, stratify=y_binary)

print(f"Train set size: {len(trainX)} samples")
print(f"Test set size: {len(testX)} samples")

# The results table is created by a later cell; make sure it exists here so this
# cell also runs on a fresh kernel (Restart & Run All) instead of raising NameError.
if 'result_df_RF' not in globals():
    result_df_RF = create_results_df()

# transform and resample
trainX_tr, trainy_tr = transform_data(trainX, trainy, label_type='binary')

smote = SMOTE(k_neighbors=5, random_state=42)

# Undersample first (sampling_strategy=0.5), then oversample with SMOTE
rus = RandomUnderSampler(sampling_strategy=0.5, random_state=42)
trainX_tr, trainy_tr = rus.fit_resample(trainX_tr, trainy_tr)
trainX_tr_resampled, trainy_tr_resampled = smote.fit_resample(trainX_tr, trainy_tr)
testX_tr, testy_tr = transform_data(testX, testy, label_type='binary')

# Train Random Forest classifier (fixed hyperparameters and seed)
RF = RandomForestClassifier(max_features='log2',
                            n_estimators=150,
                            max_depth=15,
                            min_samples_leaf=1,
                            min_samples_split=2,
                            random_state=42)

RF.fit(trainX_tr_resampled, trainy_tr_resampled)

# Evaluate the model
yhat = RF.predict(testX_tr)
score = f1_score(testy_tr, yhat, pos_label=1, average='binary')
# labels=[0, 1] keeps the matrix 2x2 even if a class is missing from the predictions,
# which record_result relies on when unpacking tn, fp, fn, tp
cm = confusion_matrix(testy_tr, yhat, labels=[0, 1])
# record results
record_result(result_df_RF, 0, 0, 0, trainy, trainy_tr_resampled, testy, score, cm, experiment_id='exp0_no_machine_adoption_random_split')


In [None]:
# Create the shared results table once. The guard keeps rows that an earlier cell may
# already have recorded into it; run `del result_df_RF` first to start from an empty table.
if 'result_df_RF' not in globals():
    result_df_RF = create_results_df()

In [None]:
def run_experiment(m01, m02, m03):
    """Train and evaluate a Random Forest using a chosen share of each machine's training data.

    The globals ``X``, ``y`` and ``y_binary`` are split 80/20, stratified on the binary
    label. From the training part, the fraction ``m01`` / ``m02`` / ``m03`` of the
    samples of machine M01 / M02 / M03 is kept for training; the rest of the training
    part is left out. The model is evaluated on the random test part, and one row is
    appended to the global ``result_df_RF`` through ``record_result``.

    Args:
        m01 (float): Fraction (0..1) of M01's training samples to train on.
        m02 (float): Fraction (0..1) of M02's training samples to train on.
        m03 (float): Fraction (0..1) of M03's training samples to train on.
            A value <= 0 excludes the machine, a value >= 1 uses all of its samples.

    Returns:
        None

    Raises:
        ValueError: If the fractions select no training sample at all.
    """
    # Split on the full label strings (stratified on the binary labels): the machine id
    # is read from the label prefix below, and the 0/1 labels do not carry it.
    Xtrain, Xtest, ytrain, ytest = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y_binary
    )
    # Same "last token == good" rule that load_data uses for y_binary
    ytest_binary = [0 if label.split('_')[-1] == 'good' else 1 for label in ytest]

    machine_train_frac = {'M01': m01, 'M02': m02, 'M03': m03}
    trainX, trainy = [], []

    for machine, frac in machine_train_frac.items():
        # samples of this machine inside the training part, with their 0/1 labels
        X_m, y_m = [], []
        for x_i, label in zip(Xtrain, ytrain):
            if label.split('_')[0] == machine:
                X_m.append(x_i)
                y_m.append(0 if label.split('_')[-1] == 'good' else 1)

        if not X_m or frac <= 0.0:
            # nothing to take from this machine
            continue
        if frac >= 1.0:
            trainX.extend(X_m)
            trainy.extend(y_m)
        else:
            # keep a stratified share of this machine; the remainder is not used
            X_tr, _, y_tr, _ = train_test_split(
                X_m, y_m, train_size=frac, stratify=y_m, random_state=42
            )
            trainX.extend(X_tr)
            trainy.extend(y_tr)

    if not trainX:
        raise ValueError("No training samples selected; at least one machine fraction must be > 0.")

    # transform and resample
    trainX_tr, trainy_tr = transform_data(trainX, trainy, label_type='binary')
    testX_tr, testy_tr = transform_data(Xtest, ytest_binary, label_type='binary')
    # Print class distribution before resampling
    print(f"Class distribution before resampling: {pd.Series(trainy_tr).value_counts()}")

    # Apply RandomUnderSampler before SMOTE
    rus = RandomUnderSampler(random_state=42)
    trainX_tr, trainy_tr = rus.fit_resample(trainX_tr, trainy_tr)

    # Print class distribution after undersampling
    print(f"Class distribution after undersampling: {pd.Series(trainy_tr).value_counts()}")
    smote = SMOTE(random_state=42)
    trainX_tr_resampled, trainy_tr_resampled = smote.fit_resample(trainX_tr, trainy_tr)

    # Train Random Forest classifier (fixed hyperparameters and seed)
    RF = RandomForestClassifier(max_features='log2',
                                n_estimators=150,
                                max_depth=15,
                                min_samples_leaf=1,
                                min_samples_split=2,
                                random_state=42,
                                n_jobs=-1)

    RF.fit(trainX_tr_resampled, trainy_tr_resampled)
    # Evaluate the model
    yhat = RF.predict(testX_tr)
    score = f1_score(testy_tr, yhat, pos_label=1, average='binary')
    # labels=[0, 1] keeps the matrix 2x2, which record_result needs for tn, fp, fn, tp
    cm = confusion_matrix(testy_tr, yhat, labels=[0, 1])
    # record results; the test counts describe the set the F1 score was computed on
    # NOTE(review): assumes the random test split is the intended evaluation set - confirm
    record_result(result_df_RF, m01, m02, m03, trainy, trainy_tr_resampled, ytest_binary, score, cm)
    
# (m01, m02, m03) training fractions per experiment; M01 is always used in full.
experiment_grid = {
    # experiment 1: vary M02, M03 excluded
    'exp1_vary_M02': [(1.0, m02, 0.0) for m02 in np.arange(0, 0.51, 0.05)],
    # experiment 2: vary M03, M02 excluded
    'exp2_vary_M03': [(1.0, 0.0, m03) for m03 in np.arange(0.05, 0.51, 0.05)],
    # experiment 3: vary M02 and M03 together
    'exp3_vary_both': [(1.0, frac, frac) for frac in np.arange(0.05, 0.51, 0.05)],
}

for experiment_id, fraction_sets in experiment_grid.items():
    for m01, m02, m03 in fraction_sets:
        # run_experiment is the function defined in this notebook
        run_experiment(m01, m02, m03)
        # tag the row that run_experiment just appended
        result_df_RF.loc[result_df_RF.index[-1], 'experiment_id'] = experiment_id

# Display the compiled results
result_df_RF

In [None]:
# Write the results table to <parent of cwd>/export/results/result_df_RF_0.csv
path = os.path.join(os.path.dirname(os.getcwd()), 'export')
results_dir = os.path.join(path, 'results')
# to_csv does not create missing folders, so make sure the target directory exists
os.makedirs(results_dir, exist_ok=True)
result_df_RF.to_csv(os.path.join(results_dir, 'result_df_RF_0.csv'), index=False)

In [None]:
# Show the results table as this cell's output.
result_df_RF

In [None]:
    # NOTE(review): this cell repeats the per-machine selection loop from the body of
    # run_experiment. As a standalone cell it starts with an indented statement and
    # reads machine_train_frac, which no top-level cell shown here defines - it looks
    # like a leftover scratch copy; confirm and remove.
    for machine, frac in machine_train_frac.items():
        # filter samples for this machine
        # (machine id = first "_"-separated token of the label, class = last token)
        data = [(x_i, y_i) for x_i, y_i in zip(Xtrain, ytrain)]
        X_m = [d[0] for d in data if d[1].split('_')[0] == machine]
        y_m = [0 if d[1].split('_')[-1] == 'good' else 1 for d in data if d[1].split('_')[0] == machine]
        if frac == 1.0:
            # whole machine goes to the training lists
            trainX.extend(X_m); trainy.extend(y_m)
        elif frac == 0.0:
            # whole machine goes to the test lists
            testX.extend(X_m); testy.extend(y_m)
        else:
            # stratified split of this machine: `frac` to train, the rest to test
            X_tr, X_te, y_tr, y_te = train_test_split(
                X_m, y_m, train_size=frac, stratify=y_m, random_state=42
            )
            trainX.extend(X_tr); trainy.extend(y_tr)
            testX.extend(X_te); testy.extend(y_te)