In [15]:
import mne
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import glob
from typing import Tuple
from pathlib import Path

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import StratifiedKFold
from lightgbm import LGBMClassifier

import sklearn.metrics as m

from mne import make_fixed_length_epochs
from mne_icalabel import label_components

matplotlib.use('Qt5Agg')
mne.set_config('MNE_BROWSE_RAW_SIZE','20,20')
%matplotlib qt
mne.viz.set_3d_backend("notebook")
plt.rcParams["figure.figsize"] = [20,20]

from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [106]:
raw2 = mne.io.read_raw_eeglab('PRE_ICA_new_analyses/faulty/30CK_01-100.set', preload=True)

Reading D:\Pain_Detection_Dataset\PRE_ICA_new_analyses\faulty\30CK_01-100.fdt
Reading 0 ... 1331859  =      0.000 ...  2663.718 secs...


In [17]:
def extract_data_categories(raw: mne.io.eeglab.eeglab.RawEEGLAB) -> mne.io.eeglab.eeglab.RawEEGLAB:
    """
    Extract the data of different categories from the raw file and return the segregated raw data files.
    :param raw:
    :return: eyes_open, eyes_close, warm_feel, hot_feel, noise_data
    """
    for each in range(0, 101):

        if raw._annotations[each]['description'] == '5':
            eyes_open = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                        tmax=raw._annotations[each]['onset'] + 150)
        if raw._annotations[each]['description'] == '4':
            eyes_close = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                         tmax=raw._annotations[each]['onset'] + 150)
        if raw._annotations[each]['description'] == '41':
            warm_feel = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                        tmax=raw._annotations[each]['onset'] + 300)
        if raw._annotations[each]['description'] == '11':
            hot_feel = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                       tmax=raw._annotations[each]['onset'] + 300)
        if raw._annotations[each]['description'] == '71':
            noise_data = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                         tmax=raw._annotations[each]['onset'] + 300)

    return eyes_open, eyes_close, warm_feel, hot_feel, noise_data

def filter_data(raw: mne.io.eeglab.eeglab.RawEEGLAB) -> mne.io.eeglab.eeglab.RawEEGLAB:
    """
    Use the band pass filter to filter out the noise and unwanted frequency data from the actual dataset.
    :param raw: raw object of the data which needs to be filtered.
    :return: raw object with filtered data.
    """
    filtered_data = mne.io.Raw.filter(raw, l_freq=0.5, h_freq=30)

    return filtered_data


In [25]:
# %%capture
p=0

for each in glob.glob('D://Pain_Detection_Dataset//PRE_ICA_new_analyses/new_faulty///*.set'):
    
    raw2 = mne.io.read_raw_eeglab(each, preload=True)

    raw = filter_data(raw2)

    # making the copy of the data
    # applying ica on the copy of the raw object
    raw_copy = raw.copy()
    ica = mne.preprocessing.ICA(n_components=0.99, random_state=42)
    ica.fit(raw_copy)

    ic_labels = label_components(raw_copy, ica, method="iclabel")

    # ICA0 was correctly identified as an eye blink, whereas ICA12 was
    # also classified as a muscle artifact


    exclude_index = []
    for i, each in enumerate(ic_labels['labels']):
        if (each=='eye blink') or (each.endswith('noise')):
            exclude_index.append(i)

    ica.apply(raw_copy, exclude = exclude_index)

    eyes_open, eyes_close, warm_feel, hot_feel, noise_data = extract_data_categories(raw=raw_copy)

    warm_feel_epochs = make_fixed_length_epochs(warm_feel, duration=1, overlap=0.5)

    wf_psd_data, wf_freqs = mne.time_frequency.psd_welch(warm_feel_epochs, fmin=0.5, fmax=50, average='median')

    warm_feel_data = pd.DataFrame(wf_psd_data.reshape(wf_psd_data.shape[0],-1))

    hot_feel_epochs = make_fixed_length_epochs(hot_feel, duration=1, overlap=0.5)

    hf_psd_data, hf_freqs = mne.time_frequency.psd_welch(hot_feel_epochs, fmin=0.5, fmax=50, average='median')

    hot_feel_data = pd.DataFrame(hf_psd_data.reshape(hf_psd_data.shape[0],-1))

    warm_feel_data['target'] = np.zeros(shape=warm_feel_data.shape[0], dtype='int')

    hot_feel_data['target'] = np.full(hot_feel_data.shape[0],1, dtype='int')

    merged_data = pd.concat([warm_feel_data,hot_feel_data])

    X = merged_data.drop(columns=['target']).values
    y = merged_data['target'].values

    X_train, X_test, y_train, y_test = train_test_split(merged_data.drop(columns=['target']), merged_data['target'], 
                                                        test_size=0.2, random_state=42, stratify=merged_data['target'])
    
    lg_clf = LGBMClassifier(device="gpu", colsample_bytree=0.6798358579930577,
                        learning_rate=0.00919045768056308, n_estimators=600,
                        num_leaves=10, objective='binary')
    lda_clf = LinearDiscriminantAnalysis(shrinkage=0.21797193476378346, solver='lsqr')
    knn_clf = KNeighborsClassifier(metric='manhattan', n_neighbors=8)

    res_dict = warm_vs_hot(warm_feel_data, hot_feel_data, participant_number=p)
    p=p+1
    file_name = each.split('.')[0][57:]
    pd.DataFrame(res_dict).to_csv(f'wf_hf/{p}.csv')

Reading D:\Pain_Detection_Dataset\PRE_ICA_new_analyses\new_faulty\29BE_01-100.fdt
Reading 0 ... 1221739  =      0.000 ...  2443.478 secs...


  raw2 = mne.io.read_raw_eeglab(each, preload=True)


Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 30.00 Hz
- Upper transition bandwidth: 7.50 Hz (-6 dB cutoff frequency: 33.75 Hz)
- Filter length: 3301 samples (6.602 sec)



  raw2 = mne.io.read_raw_eeglab(each, preload=True)
[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   3 out of   3 | elapsed:    0.1s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   4 out of   4 | elapsed:    0.1s remaining:    0.0s
[Parallel(n_jobs=1)]: Done  63 out of  63 | elapsed:    3.8s finished


Fitting ICA to data using 63 channels (please be patient, this may take a while)
Selecting by explained variance: 3 components
Fitting ICA took 14.2s.


  ic_labels = label_components(raw_copy, ica, method="iclabel")
  ic_labels = label_components(raw_copy, ica, method="iclabel")
  ic_labels = label_components(raw_copy, ica, method="iclabel")


Applying ICA to Raw instance
    Transforming to ICA space (3 components)
    Zeroing out 2 ICA components
    Projecting back using 63 PCA components
Not setting metadata
599 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 599 events and 500 original time points ...
0 bad epochs dropped
Effective window size : 0.512 (s)


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


Not setting metadata
599 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 599 events and 500 original time points ...


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:   28.9s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:   28.9s finished


0 bad epochs dropped
Effective window size : 0.512 (s)


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:   29.4s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:   29.4s finished


Reading D:\Pain_Detection_Dataset\PRE_ICA_new_analyses\new_faulty\30CK_01-100.fdt
Reading 0 ... 1331859  =      0.000 ...  2663.718 secs...
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 30.00 Hz
- Upper transition bandwidth: 7.50 Hz (-6 dB cutoff frequency: 33.75 Hz)
- Filter length: 3301 samples (6.602 sec)



[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    0.1s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   3 out of   3 | elapsed:    0.2s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   4 out of   4 | elapsed:    0.4s remaining:    0.0s
[Parallel(n_jobs=1)]: Done  63 out of  63 | elapsed:    6.8s finished


Fitting ICA to data using 63 channels (please be patient, this may take a while)
Selecting by explained variance: 3 components
Fitting ICA took 11.9s.


  ic_labels = label_components(raw_copy, ica, method="iclabel")
  ic_labels = label_components(raw_copy, ica, method="iclabel")
  ic_labels = label_components(raw_copy, ica, method="iclabel")


Applying ICA to Raw instance
    Transforming to ICA space (3 components)
    Zeroing out 2 ICA components
    Projecting back using 63 PCA components
Not setting metadata
599 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 599 events and 500 original time points ...
0 bad epochs dropped
Effective window size : 0.512 (s)


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


Not setting metadata
599 matching events found
No baseline correction applied
0 projection items activated
Using data from preloaded Raw for 599 events and 500 original time points ...


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:   47.7s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:   47.7s finished


0 bad epochs dropped
Effective window size : 0.512 (s)


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:   50.5s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:   50.5s finished


In [109]:
lg_clf = LGBMClassifier(device="gpu", colsample_bytree=0.6798358579930577,
                        learning_rate=0.00919045768056308, n_estimators=600,
                        num_leaves=10, objective='binary')
lda_clf = LinearDiscriminantAnalysis(shrinkage=0.21797193476378346, solver='lsqr')
knn_clf = KNeighborsClassifier(metric='manhattan', n_neighbors=8)

res_dict = warm_vs_hot(warm_feel_data, hot_feel_data, participant_number=1)

pd.DataFrame(res_dict).to_csv('wf_hf/30CK_01-100.csv')

In [19]:
import numpy as np
import glob
from pathlib import Path
from typing import Tuple, Any

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import StratifiedKFold


def check_file_exists(file_path: str) -> bool:
    """Check if the file exists in a particular path.
    :param:: file_path: file path in string format.
    :return: boolean
    """
    file_path = Path(file_path)
    check_file_exist = file_path.is_file()

    return check_file_exist


def get_file_list_from_path(file_path: Path) -> Tuple[list, list]:
    """
    Get the list of files in the directory with particular format
    :param:: Path object of the file directory path.
    :return list of set files and list of fdt files.
    """
    list_of_fdt = glob.glob(f'{file_path}/*.fdt')
    list_of_set = glob.glob(f'{file_path}/*.set')

    return list_of_fdt, list_of_set


def compute_accuracy(y_actual: np.ndarray, y_predicted: np.ndarray) -> float:
    """
    Compute the accuracy for the given true and predicted values.
    :param: y_actual: The actual value of y variable or target.
    :param: y_predicted: The predicted value of y variable or target.
    :return: accuracy percentage score of the values.
    """
    accuracy = accuracy_score(y_actual, y_predicted)

    return accuracy


def compute_classification_report(y_actual: np.ndarray, y_predicted: np.ndarray):
    """
    Compute the classification report for the given true and predicted values.
    :param: y_actual: The actual value of y variable or target.
    :param: y_predicted: The predicted value of y variable or target.
    :return: the classification report with precision and recall.
    """

    cf_report = classification_report(y_actual, y_predicted)

    return cf_report


def compute_sensitivity_specificity(y_actual: np.ndarray, y_predicted: np.ndarray) -> Tuple[int, int]:
    """
    Compute the sensitivity and specificity using actual and predicted target values
    :param: y_actual: The actual value of y variable or target.
    :param: y_predicted: The predicted value of y variable or target.
    :return: sensitivity and specificity
    """
    pass


def apply_stratified_cv(model_object, x: np.ndarray, y: np.ndarray):
    """
    Divide the data into pre-defined splits and used the passed model_object to train and compute accuracy and
    classification_report.
    :param: model_object: model object of particular model applied on the dataset.
    :param: X: value or the features
    :param: y: value or the targets
    :return: list of accuracy value in decimals for all the splits.
    """
    accuracy_value = []
    f1_score_list = []
    sensitivity_list = []
    specificity_list = []
    sfk = StratifiedKFold(n_splits=10)
    for train_index, test_index in sfk.split(x, y):
        x_train, x_test = x[train_index], x[test_index]
        y_train, y_test = y[train_index], y[test_index]
        model_object.fit(x_train, y_train)
        predictions = model_object.predict(x_test)
        accuracy_value.append(compute_accuracy(y_test, predictions))
        f1_score, sensitivity, specificity = accuracy_metrics(y_test, predictions)
        f1_score_list.append(f1_score)
        sensitivity_list.append(sensitivity)
        specificity_list.append(specificity)

    return accuracy_value, f1_score_list, sensitivity_list, specificity_list


def accuracy_metrics(y_test, predictions):
    conf_matrix = confusion_matrix(y_test, predictions)

    TP = conf_matrix[1][1]
    TN = conf_matrix[0][0]
    FP = conf_matrix[0][1]
    FN = conf_matrix[1][0]

    # calculate accuracy
    conf_accuracy = (float(TP + TN) / float(TP + TN + FP + FN))

    # calculate mis-classification
    conf_misclassification = 1 - conf_accuracy

    # calculate the sensitivity
    conf_sensitivity = (TP / float(TP + FN))
    # calculate the specificity
    conf_specificity = (TN / float(TN + FP))

    # calculate precision
    conf_precision = (TN / float(TN + FP))
    # calculate f_1 score
    conf_f1 = 2 * ((conf_precision * conf_sensitivity) / (conf_precision + conf_sensitivity))

    return conf_f1, conf_sensitivity, conf_specificity


In [21]:
import mne
import numpy as np
import pandas as pd
from mne_icalabel import label_components
from mne import make_fixed_length_epochs
from typing import Tuple


def read_file(path: str) -> mne.io.eeglab.eeglab.RawEEGLAB:
    """
    Read the EEG data from the given path of fdt and set.
    :param: path: path of the data file in string format.
    :return: raw object of type mne.io.eeglab.eeglab.RawEEGLAB
    """
    raw = mne.io.read_raw_eeglab(path, preload=True)

    return raw


def extract_data_categories(raw: mne.io.eeglab.eeglab.RawEEGLAB):
    """
    Extract the data of different categories from the raw file and return the segregated raw data files.
    :param raw:
    :return: eyes_open, eyes_close, warm_feel, hot_feel, noise_data
    """
    global eyes_open, eyes_close, warm_feel, hot_feel, noise_data
    for each in range(0, 101):

        if raw._annotations[each]['description'] == '5':
            eyes_open = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                        tmax=raw._annotations[each]['onset'] + 150)
        if raw._annotations[each]['description'] == '4':
            eyes_close = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                         tmax=raw._annotations[each]['onset'] + 150)
        if raw._annotations[each]['description'] == '41':
            warm_feel = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                        tmax=raw._annotations[each]['onset'] + 300)
        if raw._annotations[each]['description'] == '11':
            hot_feel = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                       tmax=raw._annotations[each]['onset'] + 300)
        if raw._annotations[each]['description'] == '71':
            noise_data = raw.copy().crop(tmin=raw._annotations[each]['onset'],
                                         tmax=raw._annotations[each]['onset'] + 300)

    return eyes_open, eyes_close, warm_feel, hot_feel, noise_data


def filter_data(raw: mne.io.eeglab.eeglab.RawEEGLAB) -> mne.io.eeglab.eeglab.RawEEGLAB:
    """
    Use the band pass filter to filter out the noise and unwanted frequency data from the actual dataset.
    :param: raw: raw object of the data which needs to be filtered.
    :return: raw object with filtered data.
    """
    filtered_data = mne.io.Raw.filter(raw, l_freq=0.5, h_freq=30)

    return filtered_data


def standardize_raw_data(raw: mne.io.eeglab.eeglab.RawEEGLAB) -> mne.io.eeglab.eeglab.RawEEGLAB:
    """
    Use the standardization method to standardise the raw signals with z-score method.
    :param: raw: raw object of the data to be standardized.
    :return: raw object with standardized values.
    """
    raw = raw.apply_function(lambda x: (x - np.mean(x) / np.std(x)))

    return raw


def get_ica_labels(raw: mne.io.eeglab.eeglab.RawEEGLAB, ica: mne.preprocessing.ica.ICA) -> dict:
    """
    USe the raw data to get the labels from the ica obj
    :param: raw: get the labels of ICA fit raw object
    :param: ica: ica object of the fitted raw object.
    :return: dictionary containing the labels for ica components
    """
    ic_labels = label_components(raw, ica, method="iclabel")

    return ic_labels


def fit_ica(raw: mne.io.eeglab.eeglab.RawEEGLAB) -> mne.preprocessing.ica.ICA:
    """
    Fit the ica on the raw object to obtain the ica components.
    :param: raw: raw object containing the data.
    :return: ica object after it is fitted on the raw data.
    """
    raw = filter_data(raw=raw)
    ica = mne.preprocessing.ICA(n_components=0.99, random_state=42)
    ica.fit(raw)

    return ica


def get_label_index_to_exclude(ica_labels: dict) -> list:
    """
    Find the ICA components indexes which are to be excluded during artifact repair.
    :param: ica_labels:
    :return: exclude_index list
    """
    exclude_index = []
    for i, each in enumerate(ica_labels['labels']):
        if (each == 'eye blink') or (each.endswith('noise')):
            exclude_index.append(i)

    return exclude_index


def repair_ica_artifact(raw: mne.io.eeglab.eeglab.RawEEGLAB) -> mne.io.eeglab.eeglab.RawEEGLAB:
    """
    Repair the raw object artifact using the ica
    :param: raw: raw object to be repaired
    :return: repaired raw object with removed noises.
    """
    ica = fit_ica(raw=raw)
    ic_labels = get_ica_labels(raw=raw, ica=ica)
    exclude_index = get_label_index_to_exclude(ica_labels=ic_labels)
    raw = ica.apply(raw, exclude=exclude_index)

    return raw


def convert_epoch_to_frequency_vs_time(epoch_object: mne.epochs.Epochs) -> np.ndarray:
    """
    Convert the epochs into times vs frequency using psd_welch method.
    :param epoch_object:
    :return: numpy array of 3 dimensions with (n_epochs, n_channels, n_freq)
    """

    epoch_array, _frequencies = mne.time_frequency.psd_welch(epoch_object, fmin=0.5, fmax=30, average='median')

    return epoch_array


def create_dataframe_from_epoch(epoch_data: np.ndarray, label_value: int):
    """
    Convert the numpy data into dataframe and add target label to it.
    :param: epoch_data: numpy array obtained from the epoch conversion of raw data.
    :param: label_value: label value to be used for the particular type of data
    :return: pandas dataframe containing the feature and target value.
    """

    data_set = pd.DataFrame(epoch_data.reshape(epoch_data.shape[0], -1))
    data_set['target'] = np.full(data_set.shape[0], label_value, dtype='int')

    return data_set


def create_epoch_data(raw: mne.io.eeglab.eeglab.RawEEGLAB, duration: int) -> np.ndarray:
    """
    Get the epoch of the raw object using the fixed length epoch method.
    :param: raw: raw object containing the data.
    :param: duration: duration of the epoch to be used.
    :return: 3 dimensional array containing the data of the raw object.
    """
    epoch_data = make_fixed_length_epochs(raw, duration=duration, preload=True, overlap=float(duration / 2))
    epoch_frequency_data = convert_epoch_to_frequency_vs_time(epoch_data)

    return epoch_frequency_data


def combine_dataframes(dataframe1, dataframe2):
    """
    Merge the two dataframes into one
    :param: dataframe1: dataframe of one type with label.
    :param: dataframe2: dataframe of another type with label.
    """
    merged_dataframe = pd.concat([dataframe1, dataframe2])

    return merged_dataframe


def calculate_duration(raw: mne.io.eeglab.eeglab.RawEEGLAB) -> float:
    """
    Calculate the duration of the raw object reading.
    :param: raw object
    """
    scan_duration = raw._data.shape[1] / raw.info['sfreq']

    return scan_duration


In [23]:
metrics_list_wf_hf = []

lg_clf = LGBMClassifier(device="gpu", colsample_bytree=0.6798358579930577,
                        learning_rate=0.00919045768056308, n_estimators=600,
                        num_leaves=10, objective='binary')
lda_clf = LinearDiscriminantAnalysis(shrinkage=0.21797193476378346, solver='lsqr')
knn_clf = KNeighborsClassifier(metric='manhattan', n_neighbors=8)

PATH = 'D:\\Pain_Detection_Dataset\\PRE_ICA_new_analyses\\faulty\\'


def warm_vs_hot(warm_dataframe, hot_dataframe, participant_number: int):
    """
    Compare warm vs hot condition by fitting machine learning algorithm and computing accuracy and other measures for
    each subject.
    :return: results of each subject in dictionary
    """
    result_dict = {'participant_id': participant_number, 'lightgbm_accuracy': [], 'lda_accuracy': [],
                   'knn_accuracy': [], 'lightgbm_f1_score': [], 'lda_f1_score': [], 'knn_f1_score': [],
                   'lightgbm_sensitivity': [], 'lda_sensitivity': [], 'knn_sensitivity': [], 'lightgbm_specificity': [],
                   'lda_specificity': [], 'knn_specificity': []}
    data = combine_dataframes(warm_dataframe, hot_dataframe)
    x = data.drop(columns=['target']).values
    y = data['target'].values
    for each in [lg_clf, lda_clf, knn_clf]:
        accuracy_list, f1_score_list, sensitivity_list, specificity_list = apply_stratified_cv(each, x=x, y=y)
        if type(each).__name__ == 'LGBMClassifier':
            result_dict['lightgbm_accuracy'] = accuracy_list
            result_dict['lightgbm_f1_score'] = f1_score_list
            result_dict['lightgbm_sensitivity'] = sensitivity_list
            result_dict['lightgbm_specificity'] = specificity_list
        if type(each).__name__ == 'LinearDiscriminantAnalysis':
            result_dict['lda_accuracy'] = accuracy_list
            result_dict['lda_f1_score'] = f1_score_list
            result_dict['lda_sensitivity'] = sensitivity_list
            result_dict['lda_specificity'] = specificity_list
        if type(each).__name__ == 'KNeighborsClassifier':
            result_dict['knn_accuracy'] = accuracy_list
            result_dict['knn_f1_score'] = f1_score_list
            result_dict['knn_sensitivity'] = sensitivity_list
            result_dict['knn_specificity'] = specificity_list
    return result_dict



def main(file_path: Path):
    """
    Run all the preprocessing and epoch conversion. Convert the epochs to dataframe and then use it for Machine Learning
    :param: file_path: Path of the files where the data is stored.
    """
    list_of_fdt_file_path, list_of_set_file_path = get_file_list_from_path(file_path=file_path)
    participant = 0
    if list_of_fdt_file_path and list_of_set_file_path:
        try:
            if len(list_of_fdt_file_path) == len(list_of_set_file_path):
                for fdt_file, set_file in zip(list_of_fdt_file_path, list_of_set_file_path):
                    if (check_file_exists(fdt_file)) & (check_file_exists(set_file)):
                        raw = read_file(set_file)
                        if calculate_duration(raw) < 2200:
                            continue
                        else:
                            repaired_raw = repair_ica_artifact(raw=raw)
                            eyes_open_raw, eyes_close_raw, warm_feel_raw, hot_feel_raw, sound_feel_raw = \
                                extract_data_categories(repaired_raw)
                            hot_feel_epoch = create_epoch_data(raw=hot_feel_raw, duration=1)
                            warm_feel_epoch = create_epoch_data(raw=warm_feel_raw, duration=1)
                            metrics_list_wf_hf.append(
                                warm_vs_hot(create_dataframe_from_epoch(warm_feel_epoch, label_value=0),
                                            create_dataframe_from_epoch(hot_feel_epoch, label_value=1),
                                            participant_number=participant))
                            print(f'Participant #: {participant + 1}')
                            participant += 1
                            accuracy_wf_hf = pd.DataFrame(metrics_list_wf_hf)
                            accuracy_wf_hf.to_csv(Path(f'{PATH}\\hf_wf\\{participant}_accuracy_wf_hf.csv'))
            else:
                raise SizeNotMatched({
                    f"The number of FDT and SET files is not same fdt is {len(list_of_fdt_file_path)} \
                     != set is {len(list_of_set_file_path)}"})
        except FileNotFoundError as fe:
            fe.strerror = f"Either fdt_file or set_file doesn't exist"
            raise fe

In [28]:
main('D:\\Pain_Detection_Dataset\\PRE_ICA_new_analyses\\faulty')

Reading D:\Pain_Detection_Dataset\PRE_ICA_new_analyses\faulty\23HH_2_01-100.fdt
Reading 0 ... 1203879  =      0.000 ...  2407.758 secs...
Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 30 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 30.00 Hz
- Upper transition bandwidth: 7.50 Hz (-6 dB cutoff frequency: 33.75 Hz)
- Filter length: 3301 samples (6.602 sec)



[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   2 out of   2 | elapsed:    0.1s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   3 out of   3 | elapsed:    0.2s remaining:    0.0s
[Parallel(n_jobs=1)]: Done   4 out of   4 | elapsed:    0.3s remaining:    0.0s


Fitting ICA to data using 63 channels (please be patient, this may take a while)


[Parallel(n_jobs=1)]: Done  63 out of  63 | elapsed:    5.9s finished


Selecting by explained variance: 12 components
Fitting ICA took 28.0s.


  ic_labels = label_components(raw, ica, method="iclabel")
  ic_labels = label_components(raw, ica, method="iclabel")
  ic_labels = label_components(raw, ica, method="iclabel")


Applying ICA to Raw instance
    Transforming to ICA space (12 components)
    Zeroing out 4 ICA components
    Projecting back using 63 PCA components


IndexError: index 98 is out of bounds for axis 0 with size 98

In [None]:
gridParams = {
    'learning_rate': np.random.uniform(0.005, 0.01, 5),
    'n_estimators': range(300, 900, 300),
    'num_leaves': range(9,12), # large num_leaves helps improve accuracy but might lead to over-fitting
    'boosting_type' : ['gbdt', 'dart'], # for better accuracy -> try dart
    'objective' : ['binary'],
    'colsample_bytree' : np.random.uniform(0.65, 0.7, 3)
    }

gs_lgbm = GridSearchCV(estimator=lg_clf, param_grid=gridParams, cv=sfk)
gs_lgbm.fit(X,y)

gs_lgbm.best_estimator_

In [None]:
param_grid = {'n_neighbors': range(1,16),
              'weights': ['uniform', 'distance'],
              'metric': ['euclidean', 'manhattan']              
             }

gs_knn = GridSearchCV(estimator=knn, param_grid=param_grid, cv=sfk)
gs_knn.fit(X,y)

gs_knn.best_estimator_

In [None]:
lda_params = {'shrinkage': np.random.uniform(0.1,1,10),'solver':['svd', 'lsqr', 'eigen']}

gs_lda = GridSearchCV(estimator=lda, param_grid=lda_params, cv=sfk, scoring='accuracy')
gs_lda.fit(X,y)

gs_lda.best_estimator_

gs_lda.best_estimator_.get_params()

In [9]:
knn = KNeighborsClassifier()
knn.fit(X_train, y_train)
knn_pred = knn.predict(X_test)

print(m.classification_report(y_test, knn_pred))

              precision    recall  f1-score   support

           0       0.80      0.63      0.71       120
           1       0.70      0.84      0.76       120

    accuracy                           0.74       240
   macro avg       0.75      0.74      0.73       240
weighted avg       0.75      0.74      0.73       240



In [10]:
lda = LinearDiscriminantAnalysis()
lda.fit(X_train, y_train)
lda_pred = lda.predict(X_test)

print(m.classification_report(y_test, lda_pred))

              precision    recall  f1-score   support

           0       0.91      0.87      0.89       120
           1       0.87      0.92      0.89       120

    accuracy                           0.89       240
   macro avg       0.89      0.89      0.89       240
weighted avg       0.89      0.89      0.89       240



In [11]:
lg_clf = lg.LGBMClassifier(device="gpu")
lg_clf.fit(X_train, y_train)
pred = lg_clf.predict(X_test)
print(m.classification_report(y_test, pred))

              precision    recall  f1-score   support

           0       0.94      0.97      0.95       120
           1       0.97      0.94      0.95       120

    accuracy                           0.95       240
   macro avg       0.95      0.95      0.95       240
weighted avg       0.95      0.95      0.95       240



In [None]:
sfk = StratifiedKFold(n_splits=5)
lg_clf = lg.LGBMClassifier(device="gpu")
lda = LinearDiscriminantAnalysis()
knn = KNeighborsClassifier()

accuracy_dictonary_hf_wf = {'lightgbm':[], 'lda':[], 'knn':[]}
for train_index, test_index in sfk.split(X,y):
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]
    lg_clf.fit(X_train, y_train)
    lg_pred = lg_clf.predict(X_test)
    accuracy_dictonary_hf_wf['lightgbm'].append(m.accuracy_score(y_test, lg_pred))
    lda.fit(X_train, y_train)
    lda_pred = lda.predict(X_test)
    accuracy_dictonary_hf_wf['lda'].append(m.accuracy_score(y_test, lda_pred))
    knn.fit(X_train, y_train)
    knn_pred = knn.predict(X_test)
    accuracy_dictonary_hf_wf['knn'].append(m.accuracy_score(y_test, knn_pred))