In [3]:
########---IMPORTS---#################

In [4]:
# Colab-only step: mount Google Drive at /content/drive so the dataset folders
# configured further down (all under /content/drive/MyDrive) can be read.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
!pip install scikeras

Collecting scikeras
  Downloading scikeras-0.13.0-py3-none-any.whl.metadata (3.1 kB)
Downloading scikeras-0.13.0-py3-none-any.whl (26 kB)
Installing collected packages: scikeras
Successfully installed scikeras-0.13.0


In [6]:
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import make_scorer, accuracy_score, f1_score, mean_absolute_percentage_error
import numpy as np
from sklearn.model_selection import KFold
from scipy.stats import pearsonr
from sklearn.dummy import DummyRegressor, DummyClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LinearRegression
from scikeras.wrappers import KerasClassifier, KerasRegressor
import pandas as pd
import glob
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dropout, Dense, LSTM
from joblib import Parallel, delayed

In [7]:
#########---PATHS---#############
# Dataset folders on the mounted Drive. X_set() reads the "data*" files and
# y_set() reads the "meta*" files found in each of them.
# NOTE(review): the name `random` coincides with the standard-library module of
# the same name; importing that module later would rebind this variable.
random = '/content/drive/MyDrive/data_driven_shm/random_data'
balanced = '/content/drive/MyDrive/data_driven_shm/Balanced_data'
test = '/content/drive/MyDrive/data_driven_shm/test_classification'


#########---INPUTS---#############

# Number of leading samples kept from every sensor signal (X_set's n_points).
n_points_list = [375, 460, 750]
# Feature representations understood by X_set().
transformation_list = ['none', 'fourier']
# Values passed to X_set() as noise_percent.
noise_levels = [2, 5, 10]
# Despite the name, these are passed to data_mixer() as the fraction of each
# dataset that is kept (1 keeps everything).
damage_percentage = [0.3 , 0.5 ,1]

In [8]:
####---FOURIER---######
def fourier(sample_sensor):
    '''
    Compute the log-amplitude spectrum of a signal.

    Parameters
    ----------
    sample_sensor : numpy.ndarray
        Signal samples. The ``.size`` attribute is used, so a NumPy array is
        required.

    Returns
    -------
    power_spectrum : numpy.ndarray
        Natural logarithm of the FFT magnitude, one value per FFT bin.
    freqs : numpy.ndarray
        Frequency of every bin as returned by ``np.fft.fftfreq`` for a sample
        spacing of 1/1000.
    '''
    import numpy as np

    # This value is handed to fftfreq as ``d``: it is the spacing between
    # samples, not a sampling rate.
    sample_spacing = 1 / 1000
    spectrum = np.fft.fft(sample_sensor)
    freqs = np.fft.fftfreq(sample_sensor.size, d=sample_spacing)
    magnitude = np.abs(spectrum)

    # A bin whose magnitude is exactly zero would turn into -inf under the log
    # (and emit a RuntimeWarning); non-finite features make the downstream
    # scaler fail. Flooring at the smallest positive normal float keeps every
    # non-zero bin unchanged while guaranteeing a finite result.
    magnitude = np.maximum(magnitude, np.finfo(float).tiny)
    power_spectrum = np.log(magnitude)

    return power_spectrum, freqs

In [9]:
def random_forest_reg():
    '''Return an unfitted RandomForestRegressor built with scikit-learn's defaults.'''
    from sklearn.ensemble import RandomForestRegressor
    return RandomForestRegressor()



def linear_regression():
    '''Return an unfitted LinearRegression estimator built with scikit-learn's defaults.'''
    from sklearn.linear_model import LinearRegression
    return LinearRegression()



def svc():
    '''Return an unfitted RBF-kernel support vector classifier (C=100, gamma=0.001).'''
    from sklearn.svm import SVC
    return SVC(kernel='rbf', gamma=0.001, C=100)

def random_forest_clf():
    '''Return an unfitted 500-tree random-forest classifier that splits on entropy.'''
    from sklearn.ensemble import RandomForestClassifier
    return RandomForestClassifier(criterion='entropy', n_estimators=500)



##################----------- NNs-----------##################
from scikeras.wrappers import KerasClassifier, KerasRegressor



def keras_mlp_regressor(input_shape):
    '''
    Build and compile a fully connected regression network.

    Author's note (translated): this model only works well on scaled inputs.

    The network stacks sigmoid Dense layers of 256, 128, 64 and 10 units and
    ends in a single linear output unit. It is compiled with the Adam
    optimizer and mean-absolute-error loss.

    Parameters
    ----------
    input_shape : tuple
        Forwarded as ``input_shape`` to the first Dense layer.

    Returns
    -------
    The compiled Keras ``Sequential`` model.
    '''
    import tensorflow as tf
    from tensorflow import keras
    from keras.models import Sequential
    from keras.layers import Flatten,Dense

    hidden_units = (256, 128, 64, 10)

    # Only the first layer carries the input shape; the rest are plain hidden layers.
    stack = [Dense(hidden_units[0], activation='sigmoid', input_shape=input_shape)]
    stack.extend(Dense(units, activation='sigmoid') for units in hidden_units[1:])
    # Single linear unit: the regression output.
    stack.append(Dense(1, activation='linear'))

    network = Sequential(stack)
    network.compile(optimizer="adam", loss="mean_absolute_error")
    return network

def keras_mlp_classifier(input_shape, meta=None):
    '''
    Build and compile a fully connected classification network.

    Three sigmoid Dense layers (256, 128, 64 units) feed a Dense output layer
    that emits raw logits, trained with sparse categorical cross-entropy and
    the Adam optimizer.

    Parameters
    ----------
    input_shape : tuple
        Forwarded as ``input_shape`` to the first Dense layer.
    meta : dict, optional
        Metadata dictionary that SciKeras hands to model-building functions
        that declare a ``meta`` parameter. When it provides ``"n_classes_"``
        the output layer gets that many units, so the network always matches
        the number of classes seen during ``fit``. Without it the previous
        fixed size of 4 is used.

    Returns
    -------
    The compiled Keras ``Sequential`` model.
    '''
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense

    # y_set() can produce five distinct defect labels, so a fixed 4-unit output
    # fails as soon as all of them occur; prefer the class count from SciKeras.
    n_classes = 4
    if meta is not None and meta.get("n_classes_"):
        n_classes = int(meta["n_classes_"])

    model = Sequential()
    model.add(Dense(256, activation='sigmoid', input_shape=input_shape))
    model.add(Dense(128, activation='sigmoid'))
    model.add(Dense(64, activation='sigmoid'))
    model.add(Dense(n_classes))  # No activation: the loss expects logits
    model.compile(optimizer='adam',
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    return model


def keras_cnn_regressor(input_shape):
    '''
    Build and compile a 1-D convolutional regression network.

    Three Conv1D(kernel 3, 'same' padding, ReLU) + MaxPooling1D stages with
    16, 32 and 64 filters are followed by Flatten, Dropout(0.2) and a single
    output unit. Compiled with Adam and mean-absolute-error loss.

    Parameters
    ----------
    input_shape : tuple
        Forwarded as ``input_shape`` to the first Conv1D layer.

    Returns
    -------
    The compiled Keras ``Sequential`` model.
    '''
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dropout, Dense

    net = Sequential()

    # Convolution / pooling stages
    net.add(Conv1D(16, 3, padding='same', activation='relu', input_shape=input_shape))
    net.add(MaxPooling1D())
    for n_filters in (32, 64):
        net.add(Conv1D(n_filters, 3, padding='same', activation='relu'))
        net.add(MaxPooling1D())

    # Regression head
    net.add(Flatten())
    net.add(Dropout(0.2))
    net.add(Dense(1))

    net.compile(loss="mean_absolute_error", optimizer='adam')
    return net


def keras_cnn_classifier(input_shape, meta=None):
    '''
    Build and compile a 1-D convolutional classification network.

    Three Conv1D(kernel 3, 'same' padding, ReLU) + MaxPooling1D stages with
    16, 32 and 64 filters are followed by Flatten, Dropout(0.2) and a Dense
    output layer that emits raw logits. The loss is sparse categorical
    cross-entropy.

    Parameters
    ----------
    input_shape : tuple
        Forwarded as ``input_shape`` to the first Conv1D layer.
    meta : dict, optional
        Metadata dictionary that SciKeras hands to model-building functions
        that declare a ``meta`` parameter. When it provides ``"n_classes_"``
        the output layer gets that many units; otherwise the previous fixed
        size of 4 is used.

    Returns
    -------
    The compiled Keras ``Sequential`` model.
    '''
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dropout, Dense

    # y_set() can produce five distinct defect labels, so a fixed 4-unit output
    # fails as soon as all of them occur; prefer the class count from SciKeras.
    n_classes = 4
    if meta is not None and meta.get("n_classes_"):
        n_classes = int(meta["n_classes_"])

    model = Sequential([
        Conv1D(16, 3, padding='same', activation='relu', input_shape=input_shape),
        MaxPooling1D(),
        Conv1D(32, 3, padding='same', activation='relu'),
        MaxPooling1D(),
        Conv1D(64, 3, padding='same', activation='relu'),
        MaxPooling1D(),
        Flatten(),
        Dropout(0.2),
        Dense(n_classes)  # logits
    ])
    # The optimizer used to be omitted, which makes Keras fall back to its
    # default ('rmsprop'). It is spelled out here so the choice is visible.
    # NOTE(review): the MLP classifier and the regressors use Adam - confirm
    # that RMSprop is intended for this model.
    model.compile(optimizer='rmsprop',
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    return model


def keras_lstm_regressor(input_shape):
    '''
    Build and compile a stacked-LSTM regression network.

    LSTM(100, returning sequences) -> Dropout(0.3) -> LSTM(50) -> Dropout(0.3)
    -> Dense(50, ReLU) -> Dense(1). Compiled with Adam, mean-squared-error
    loss and an MAE metric.

    Parameters
    ----------
    input_shape : tuple
        Forwarded as ``input_shape`` to the first LSTM layer.

    Returns
    -------
    The compiled Keras ``Sequential`` model.
    '''
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dropout, Dense, LSTM

    net = Sequential()

    # Recurrent encoder: the first LSTM keeps the full sequence for the second one.
    net.add(LSTM(100, return_sequences=True, input_shape=input_shape))
    net.add(Dropout(0.3))
    net.add(LSTM(50))
    net.add(Dropout(0.3))

    # Regression head
    net.add(Dense(50, activation="relu"))
    net.add(Dense(1))

    net.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
    return net


def keras_lstm_classifier(input_shape, meta=None):
    '''
    Build and compile a stacked-LSTM classification network.

    LSTM(100, returning sequences) -> Dropout(0.3) -> LSTM(50) -> Dropout(0.3)
    -> Dense(50, ReLU) -> softmax output layer. The loss is sparse categorical
    cross-entropy on probabilities (``from_logits=False``).

    Parameters
    ----------
    input_shape : tuple
        Forwarded as ``input_shape`` to the first LSTM layer.
    meta : dict, optional
        Metadata dictionary that SciKeras hands to model-building functions
        that declare a ``meta`` parameter. When it provides ``"n_classes_"``
        the output layer gets that many units; otherwise the previous fixed
        size of 4 is used.

    Returns
    -------
    The compiled Keras ``Sequential`` model.
    '''
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dropout, Dense, LSTM

    # y_set() can produce five distinct defect labels, so a fixed 4-unit output
    # fails as soon as all of them occur; prefer the class count from SciKeras.
    n_classes = 4
    if meta is not None and meta.get("n_classes_"):
        n_classes = int(meta["n_classes_"])

    model = Sequential([
        LSTM(100, return_sequences=True, input_shape=input_shape),
        Dropout(0.3),
        LSTM(50),
        Dropout(0.3),
        Dense(50, activation="relu"),
        Dense(n_classes, activation="softmax")
    ])
    # The optimizer used to be omitted, which makes Keras fall back to its
    # default ('rmsprop'). It is spelled out here so the choice is visible.
    # NOTE(review): the MLP classifier and the regressors use Adam - confirm
    # that RMSprop is intended for this model.
    model.compile(optimizer='rmsprop',
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                  metrics=['accuracy'])

    return model

In [10]:
########---- X AND Y SET---########


def X_set(path, transformation, n_points, noise_percent=None):
    '''
    Build the feature matrix from the ``data*`` CSV files found in ``path``.

    Files are ordered by the integer that follows the last underscore of
    their name. For every file the ``s2``, ``s3`` and ``s4`` columns are
    truncated to their first ``n_points`` rows, optionally perturbed with
    Gaussian noise and optionally replaced by their log-amplitude spectrum.

    Parameters
    ----------
    path : str
        Directory that contains the ``data*`` files.
    transformation : {'none', 'fourier'}
        'none' keeps the (possibly noisy) truncated time series; 'fourier'
        replaces it by the output of ``fourier()``.
    n_points : int
        Number of leading samples kept from each sensor column.
    noise_percent : float, optional
        When given, noise is added to every truncated signal through
        ``add_noiz`` before any transformation.

    Returns
    -------
    X : numpy.ndarray
        One row per file: the s2, s3 and s4 vectors concatenated in that order.
    sensor2_vector, sensor3_vector, sensor4_vector : list of numpy.ndarray
        The per-file vectors of each sensor, in file order.
    freq_list : list of numpy.ndarray
        FFT bin frequencies, one entry per file (filled for 'fourier' only).

    Raises
    ------
    ValueError
        If ``transformation`` is not one of the supported values.
    FileNotFoundError
        If no ``data*`` file exists in ``path``.
    '''
    import os
    import glob
    import numpy as np
    import pandas as pd

    # Fail early: an unknown value used to surface as an UnboundLocalError (or
    # silently reuse the vector of the previous iteration).
    if transformation not in ('none', 'fourier'):
        raise ValueError(
            f"unknown transformation {transformation!r}; expected 'none' or 'fourier'"
        )

    stems = [
        name.removesuffix('.csv')
        for name in sorted(glob.glob(os.path.join(path, "data*")))
    ]
    if not stems:
        raise FileNotFoundError(f"no 'data*' files found in {path!r}")

    # Numeric ordering by the trailing index (data_2 before data_10).
    stems.sort(key=lambda stem: int(stem.split('_')[-1]))

    frames = [
        pd.read_csv(stem + '.csv', sep=' |,', engine='python').dropna()
        for stem in stems
    ]

    freq_list = []
    vectors_by_sensor = {}

    for sensor in ('s2', 's3', 's4'):
        vectors = []
        for frame in frames:
            signal = frame[sensor].values[:n_points]

            if noise_percent is not None:
                # Pass the signal as a one-element list so that add_noiz scales
                # the noise by the standard deviation of the signal itself.
                # Passing it reshaped to (1, n) made add_noiz take the std over
                # a single row, which is 0 everywhere, so no noise was added.
                signal = add_noiz([signal], noise_percent)[0]

            if transformation == 'fourier':
                amp, freq = fourier(signal)
                signal = amp[:n_points]
                if sensor == 's2':  # frequencies are stored once per file
                    freq_list.append(freq[:n_points])

            vectors.append(signal)
        vectors_by_sensor[sensor] = vectors

    sensor2_vector = vectors_by_sensor['s2']
    sensor3_vector = vectors_by_sensor['s3']
    sensor4_vector = vectors_by_sensor['s4']

    X = np.concatenate((sensor2_vector, sensor3_vector, sensor4_vector), axis=1)

    return X, sensor2_vector, sensor3_vector, sensor4_vector, freq_list



def y_set(path):
    '''
    Collect the targets stored in the ``meta*`` files of ``path``.

    Returns a DataFrame sorted by ``dmg_index_number`` (the integer after the
    last underscore of the file name) with the columns:

    - ``dmg``: first ``Damage_percentage`` value of the file (regression target)
    - ``damage_file_name``: file path without the ``.csv`` suffix
    - ``caseStudey``: first ``caseStudey`` value of the file
    - ``defect``: one of 'clean', 'df', 'dm', 'dd' or 'all defect modes'
      (classification target)
    - ``dmg_index_number``
    '''
    import numpy as np
    import pandas as pd
    import os
    import glob

    # Label when exactly one (or none) of the three defect indicators is set,
    # keyed by (df active, dm active, dd active). Every other combination is
    # reported as 'all defect modes'.
    single_mode_labels = {
        (False, False, False): 'clean',
        (True, False, False): 'df',
        (False, True, False): 'dm',
        (False, False, True): 'dd',
    }

    rows = []
    for meta_file in glob.glob(os.path.join(path , "meta*")):
        meta = pd.read_csv(meta_file,sep=' |,', engine='python')
        damage_column = meta['Damage_percentage']
        case = meta['caseStudey'][0]
        damage = damage_column[0]

        # Odd layers: row 0 feeds the 'df' indicator, row 1 the 'dm' indicator.
        df_defect = meta['DamageLayer1'][0] + meta['DamageLayer3'][0] + meta['DamageLayer5'][0]
        dm_defect = meta['DamageLayer1'][1] + meta['DamageLayer3'][1] + meta['DamageLayer5'][1]
        # Even layers, row 0: the 'dd' indicator.
        dd_defect = meta['DamageLayer2'][0] + meta['DamageLayer4'][0]

        active = (bool(df_defect != 0), bool(dm_defect != 0), bool(dd_defect != 0))
        label = single_mode_labels.get(active, 'all defect modes')

        rows.append((damage, meta_file.removesuffix('.csv'), case, label))

    dmg_data = pd.DataFrame({
        'dmg': [row[0] for row in rows],
        'damage_file_name': [row[1] for row in rows],
        'caseStudey': [row[2] for row in rows],
        'defect': [row[3] for row in rows],
    })
    dmg_data['dmg_index_number'] = [
        int(name.split('_')[-1]) for name in dmg_data['damage_file_name']
    ]
    return dmg_data.sort_values(by=['dmg_index_number'])

In [11]:
#### NOISE#####


def add_noiz(X, noise_percent):
    '''
    Add zero-mean Gaussian noise to X.

    The noise standard deviation is ``noise_percent`` percent of the standard
    deviation of the data it is added to.

    Parameters
    ----------
    X : numpy.ndarray, list or tuple
        - ndarray: the standard deviation is taken along axis 0 (per column of
          a samples x features matrix). A 2-D array with a single row has no
          spread along axis 0, so the standard deviation of that row is used
          instead.
        - list / tuple of array-likes: every element is treated independently
          and scaled by its own standard deviation.
    noise_percent : float
        Noise level in percent.

    Returns
    -------
    numpy.ndarray or list of numpy.ndarray
        Noisy copy of the input (a list for list/tuple input).

    Raises
    ------
    TypeError
        If X is none of the supported container types.
    '''
    import numpy as np

    if isinstance(X, np.ndarray):
        if X.ndim == 2 and X.shape[0] == 1:
            # The column-wise std of one row is all zeros, which silently
            # produced no noise at all; fall back to the spread of the row.
            std_dev = np.std(X)
        else:
            std_dev = np.std(X, axis=0)
        noise = np.random.randn(*X.shape) * (noise_percent / 100.0) * std_dev
        return X + noise

    if isinstance(X, (list, tuple)):
        X_noisy = []
        for sample in X:
            sample = np.asarray(sample)
            std_dev = np.std(sample)
            noise = np.random.randn(*sample.shape) * (noise_percent / 100.0) * std_dev
            X_noisy.append(sample + noise)
        return X_noisy

    # Previously any other type fell through and returned None.
    raise TypeError(
        f"add_noiz expects a numpy array, list or tuple, got {type(X).__name__}"
    )

In [12]:
######---PEARSON CORRELATION---##########

def p_val(y_true, y_pred):
    '''Return the p-value of the Pearson correlation between y_true and y_pred.'''
    _statistic, p_value = pearsonr(y_true, y_pred)
    return p_value

In [13]:
#### --- DATA MIXER---####

def data_mixer(X_1,y_1,X_2,y_2,first_percentage,second_percentage):
    '''
    Combine a random fraction of two datasets into one training set.

    ``first_percentage`` / ``second_percentage`` are the fractions of the first
    and second dataset that are kept; a value of 1 keeps the dataset as is
    (no shuffling). Any other value draws a shuffled subset through
    ``train_test_split``, which is not seeded here.

    Returns the row-wise concatenation ``(X_train, y_train)`` with the kept
    part of the first dataset followed by the kept part of the second.
    '''
    from sklearn.model_selection import train_test_split
    import numpy as np

    def _keep_fraction(features, targets, fraction):
        # A fraction of exactly 1 bypasses the split and returns the inputs untouched.
        if fraction == 1:
            return features, targets
        kept_X, _dropped_X, kept_y, _dropped_y = train_test_split(
            features, targets, test_size=1-fraction, shuffle=True
        )
        return kept_X, kept_y

    first_X, first_y = _keep_fraction(X_1, y_1, first_percentage)
    second_X, second_y = _keep_fraction(X_2, y_2, second_percentage)

    X_train = np.concatenate((first_X, second_X), axis=0)
    y_train = np.concatenate((first_y, second_y), axis=0)
    return X_train,y_train


In [14]:
import tensorflow as tf
# --- Regression experiment ---
def run_regression_fold(model_fn, X, X_dl, y, train_idx, test_idx, is_dl):
    '''
    Fit and score one cross-validation fold of a regression model.

    ``model_fn`` is called without arguments to create a fresh estimator.
    The estimator is trained and evaluated on ``X_dl`` when ``is_dl`` is true
    and on ``X`` otherwise.

    Returns ``(mape, pval, preds, y_true)`` for the test indices: the mean
    absolute percentage error, the Pearson p-value, the predictions and the
    matching targets.
    '''
    features = X_dl if is_dl else X
    y_true = y[test_idx]

    estimator = model_fn()
    estimator.fit(features[train_idx], y[train_idx])
    y_hat = estimator.predict(features[test_idx])

    return (
        mean_absolute_percentage_error(y_true, y_hat),
        p_val(y_true, y_hat),
        y_hat,
        y_true,
    )


def regression_experiment_run():
    '''
    Run the regression benchmark over every configured setting.

    For each combination of ``n_points_list``, ``transformation_list``,
    ``noise_levels`` and ``damage_percentage`` the balanced and random
    datasets are mixed, standardised and evaluated with 5-fold
    cross-validation for LinearRegression, RandomForest and the Keras MLP,
    CNN and LSTM regressors. One row per (setting, model) is collected and
    written to ``regression_results_n{n_points}.csv``.

    NOTE(review): the scaler is fitted on the complete mixed dataset before
    the folds are created, so the held-out folds contribute to the scaling
    statistics.
    '''
    # The targets come from the metadata files only; they do not depend on
    # n_points, the transformation or the noise level, so read them once
    # instead of once per transformation.
    y_random = y_set(random)['dmg']
    y_data = y_set(balanced)['dmg']

    for n_points in n_points_list:
        all_results = []

        for transformation in transformation_list:
            for noise_percent in noise_levels:
                X_random = X_set(random, transformation, n_points, noise_percent=noise_percent)[0]
                X_data = X_set(balanced, transformation, n_points, noise_percent=noise_percent)[0]

                for perc in damage_percentage:
                    X_mixed, y_mixed = data_mixer(X_data, y_data, X_random, y_random, perc, perc)

                    scaler = StandardScaler()
                    X = scaler.fit_transform(X_mixed)
                    y = y_mixed

                    # Same data with a trailing channel axis for Conv1D / LSTM.
                    X_dl = np.expand_dims(X, axis=-1)
                    input_shape = X.shape[1]

                    model_fns = {
                        'LinearRegression': lambda: linear_regression(),
                        'RandomForest': lambda: random_forest_reg(),
                        'MLP': lambda: KerasRegressor(model=keras_mlp_regressor, model__input_shape=(input_shape,), epochs=150, batch_size=64, verbose=0),
                        'CNN': lambda: KerasRegressor(model=keras_cnn_regressor, model__input_shape=(input_shape, 1), epochs=150, batch_size=64, verbose=0),
                        'LSTM': lambda: KerasRegressor(model=keras_lstm_regressor, model__input_shape=(input_shape, 1), epochs=150, batch_size=64, verbose=0),
                    }

                    cv = KFold(n_splits=5, shuffle=True, random_state=1)

                    for name, model_fn in model_fns.items():
                        # Only the convolutional / recurrent models take the
                        # 3-D array. The MLP is declared with a 2-D input shape,
                        # so it gets the 2-D matrix instead of relying on Keras
                        # squeezing a trailing axis of size 1.
                        needs_channel_axis = name in ('CNN', 'LSTM')
                        tasks = [
                            delayed(run_regression_fold)(model_fn, X, X_dl, y, train_idx, test_idx, needs_channel_axis)
                            for train_idx, test_idx in cv.split(X)
                        ]
                        results_fold = Parallel(n_jobs=2, backend='loky')(tasks)
                        mape_scores, pval_scores, preds_list, y_true_list = zip(*results_fold)

                        all_results.append({
                            'n_points': n_points,
                            'transformation': transformation,
                            'noise_percent': noise_percent,
                            'data_percentage': perc,
                            'model': name,
                            'mean_mape': np.mean(mape_scores),
                            'std_mape': np.std(mape_scores),
                            'pval': np.mean(pval_scores),
                            'last_fold_preds': preds_list[-1].tolist(),
                            'last_fold_true': y_true_list[-1].tolist()
                        })

            # Written after every transformation, so the file on disk always
            # holds the results gathered so far for this n_points.
            df = pd.DataFrame(all_results)
            df.to_csv(f'regression_results_n{n_points}.csv', index=False)



In [15]:
# --- Classification experiment ---
def run_classification_fold(model_fn, X, X_dl, y, train_idx, test_idx, is_dl):
    '''
    Fit and score one cross-validation fold of a classification model.

    ``model_fn`` is called without arguments to create a fresh estimator.
    The estimator is trained and evaluated on ``X_dl`` when ``is_dl`` is true
    and on ``X`` otherwise.

    Returns ``(acc, f1, preds, y_true)`` for the test indices: the accuracy,
    the macro-averaged F1 score, the predictions and the matching labels.
    '''
    features = X_dl if is_dl else X
    y_true = y[test_idx]

    estimator = model_fn()
    estimator.fit(features[train_idx], y[train_idx])
    y_hat = estimator.predict(features[test_idx])

    return (
        accuracy_score(y_true, y_hat),
        f1_score(y_true, y_hat, average='macro'),
        y_hat,
        y_true,
    )



def classification_experiment_run():
    '''
    Run the classification benchmark over every configured setting.

    For each combination of ``n_points_list``, ``transformation_list``,
    ``noise_levels`` and ``damage_percentage`` the balanced and test datasets
    are mixed, standardised and evaluated with 5-fold cross-validation for
    SVC, RandomForest and the Keras MLP, CNN and LSTM classifiers. One row per
    (setting, model) is collected and written to
    ``classification_results_n{n_points}.csv``.

    Defect labels are encoded as integers following the alphabetical order of
    the label strings.

    NOTE(review): the scaler is fitted on the complete mixed dataset before
    the folds are created, so the held-out folds contribute to the scaling
    statistics.
    '''
    # The labels come from the metadata files only; they do not depend on
    # n_points, the transformation or the noise level, so read and encode them
    # once instead of once per transformation.
    labels_data = y_set(balanced)['defect']
    labels_test = y_set(test)['defect']

    # Sort before enumerating: iterating a set of strings has no stable order
    # across interpreter sessions, which made the integer codes stored in the
    # result files differ from run to run.
    label_map = {
        label: i
        for i, label in enumerate(sorted(set(labels_data) | set(labels_test)))
    }
    y_data = np.array([label_map[label] for label in labels_data])
    y_test = np.array([label_map[label] for label in labels_test])

    for n_points in n_points_list:
        all_results = []

        for transformation in transformation_list:
            for noise_percent in noise_levels:
                X_data = X_set(balanced, transformation, n_points, noise_percent=noise_percent)[0]
                X_test = X_set(test, transformation, n_points, noise_percent=noise_percent)[0]

                for perc in damage_percentage:
                    X_mixed, y_mixed = data_mixer(X_data, y_data, X_test, y_test, perc, perc)

                    scaler = StandardScaler()
                    X = scaler.fit_transform(X_mixed)
                    y = y_mixed

                    # Same data with a trailing channel axis for Conv1D / LSTM.
                    X_dl = np.expand_dims(X, axis=-1)
                    input_shape = X.shape[1]

                    model_fns = {
                        'SVC': lambda: svc(),
                        'RandomForest': lambda: random_forest_clf(),
                        'MLP': lambda: KerasClassifier(model=keras_mlp_classifier, model__input_shape=(input_shape,), epochs=150, batch_size=64, verbose=0),
                        'CNN': lambda: KerasClassifier(model=keras_cnn_classifier, model__input_shape=(input_shape, 1), epochs=150, batch_size=64, verbose=0),
                        'LSTM': lambda: KerasClassifier(model=keras_lstm_classifier, model__input_shape=(input_shape, 1), epochs=150, batch_size=64, verbose=0),
                    }

                    cv = KFold(n_splits=5, shuffle=True, random_state=1)

                    for name, model_fn in model_fns.items():
                        # Only the convolutional / recurrent models take the
                        # 3-D array. The MLP is declared with a 2-D input shape,
                        # so it gets the 2-D matrix instead of relying on Keras
                        # squeezing a trailing axis of size 1.
                        needs_channel_axis = name in ('CNN', 'LSTM')
                        tasks = [
                            delayed(run_classification_fold)(model_fn, X, X_dl, y, train_idx, test_idx, needs_channel_axis)
                            for train_idx, test_idx in cv.split(X)
                        ]
                        results_fold = Parallel(n_jobs=2, backend='loky')(tasks)
                        acc_scores, f1_scores, preds_list, y_true_list = zip(*results_fold)

                        all_results.append({
                            'n_points': n_points,
                            'transformation': transformation,
                            'noise_percent': noise_percent,
                            'data_percentage': perc,
                            'model': name,
                            'mean_acc': np.mean(acc_scores),
                            'std_acc': np.std(acc_scores),
                            'f1_macro': np.mean(f1_scores),
                            'last_fold_preds': preds_list[-1].tolist(),
                            'last_fold_true': y_true_list[-1].tolist()
                        })

            # Written after every transformation, so the file on disk always
            # holds the results gathered so far for this n_points.
            df = pd.DataFrame(all_results)
            df.to_csv(f'classification_results_n{n_points}.csv', index=False)


In [None]:
# Run both benchmarks. Each one loops over every n_points / transformation /
# noise / data-fraction combination, trains the Keras models for 150 epochs per
# fold, and writes one results CSV per n_points into the working directory.
regression_experiment_run()
classification_experiment_run()

