# Import 

In [1]:
from scipy.io import loadmat
import numpy as np
from scipy.signal import welch, find_peaks, periodogram
from scipy.signal import lfilter
from scipy.linalg import toeplitz, solve_toeplitz
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
import pandas as pd
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn.cluster import KMeans
from sklearn.pipeline import make_pipeline
import pyswarms as ps
from tqdm.notebook import  tqdm
import pickle


# Load Data

In [2]:
data = loadmat("Project_data.mat")


In [3]:
data.keys()


dict_keys(['__header__', '__version__', '__globals__', 'Channels', 'TestData', 'TrainData', 'TrainLabels', 'fs'])

In [4]:

print(data['TrainData'].size )

print(len(data['TrainData']))

print(len(data['TrainData'][1]) )

print(data['TrainData'][1][1].size)


162250000
59
5000
550


In [5]:
data['TrainLabels']

array([[ 1,  1, -1,  1, -1, -1,  1,  1,  1,  1,  1,  1, -1,  1, -1,  1,
         1, -1, -1,  1,  1,  1,  1, -1, -1, -1,  1,  1, -1, -1,  1,  1,
        -1, -1,  1,  1,  1,  1,  1, -1, -1,  1,  1, -1, -1, -1, -1,  1,
        -1,  1, -1, -1, -1,  1,  1, -1, -1, -1,  1,  1, -1,  1,  1,  1,
         1,  1, -1, -1, -1, -1, -1,  1,  1,  1,  1, -1, -1, -1,  1, -1,
        -1,  1,  1, -1,  1, -1, -1, -1,  1, -1,  1, -1, -1,  1,  1, -1,
         1,  1,  1,  1, -1,  1,  1, -1, -1, -1, -1, -1, -1,  1,  1, -1,
        -1,  1, -1,  1,  1,  1, -1, -1,  1, -1,  1,  1, -1,  1, -1,  1,
         1, -1, -1, -1,  1, -1,  1,  1,  1,  1,  1, -1, -1, -1,  1,  1,
         1, -1,  1, -1,  1, -1,  1,  1,  1, -1, -1, -1, -1,  1,  1, -1,
        -1,  1,  1, -1, -1, -1,  1, -1,  1,  1,  1,  1,  1,  1,  1, -1,
        -1, -1, -1,  1, -1, -1,  1,  1,  1, -1,  1, -1,  1, -1, -1,  1,
         1, -1, -1, -1,  1, -1, -1, -1,  1,  1,  1, -1, -1, -1,  1, -1,
        -1,  1, -1, -1,  1, -1, -1, -1, -1, -1, -1,  1, -1, -1, 

# Checking for balance

In [6]:
array = data['TrainLabels']

shape = array.shape
size = array.size
data_type = array.dtype
mean = array.mean()
std_dev = array.std()
min_value = array.min()
max_value = array.max()

print(f"Array Shape: {shape}")
print(f"Total Number of Elements: {size}")
print(f"Data Type of Elements: {data_type}")
print(f"Mean: {mean}")
print(f"Standard Deviation: {std_dev}")

count_neg_1 = np.sum(array == -1)
count_pos_1 = np.sum(array == 1)

print("Count of -1:", count_neg_1)
print("Count of  1:", count_pos_1)



Array Shape: (1, 550)
Total Number of Elements: 550
Data Type of Elements: int16
Mean: 0.007272727272727273
Standard Deviation: 0.9999735533692962
Count of -1: 273
Count of  1: 277


# Functions

In [7]:

def calculate_variance(signal):
    return np.var(signal)

def calculate_dominant_frequency_histogram(signal, fs):
    frequencies, power = periodogram(signal, fs)
    peaks, _ = find_peaks(power)
    dominant_frequencies = frequencies[peaks]
    histogram, bin_edges = np.histogram(dominant_frequencies, bins=10, range=(0, fs/2))
    return histogram

def calculate_standard_histogram(signal, bin_count=10):
    histogram, bin_edges = np.histogram(signal, bins=bin_count)
    return histogram

def calculate_ar_model_coefficients(signal, order=4):
    R = np.correlate(signal, signal, mode='full')[len(signal)-1:]
    R /= R[0]
    ar_coeffs = solve_toeplitz((R[:order], R[:order]), R[1:order+1])
    return ar_coeffs

def calculate_form_factor(signal):
    rms = np.sqrt(np.mean(signal**2))
    mean_abs = np.mean(np.abs(signal))
    return rms / mean_abs

def calculate_covariance_between_channels(signal1, signal2):
    return np.cov(signal1, signal2)[0, 1]

def calculate_correlation_between_channels(signal1, signal2):
    return np.corrcoef(signal1, signal2)[0, 1]

def calculate_frequency_features(signal, fs):
    freqs, power = periodogram(signal, fs)
    max_freq = freqs[np.argmax(power)]
    mean_freq = np.sum(freqs * power) / np.sum(power)
    median_freq = freqs[np.searchsorted(np.cumsum(power), np.sum(power)/2)]
    return max_freq, mean_freq, median_freq

def calculate_relative_band_power(signal, fs):
    bands = {'Delta': (0.5, 4), 'Theta': (4, 8), 'Alpha': (8, 12), 'Beta': (12, 30), 'Gamma': (30, 45)}
    freqs, power = periodogram(signal, fs)
    total_power = np.sum(power)
    relative_powers = {}
    for band in bands:
        band_freqs = bands[band]
        idx_band = np.logical_and(freqs >= band_freqs[0], freqs <= band_freqs[1])
        relative_powers[band] = np.sum(power[idx_band]) / total_power
    return relative_powers



# channel_num = signal.shape[0]
# train_num = signal.shape[2]

# for i in train_num
# variance = calculate_variance(signal)
# dom_freq_hist = calculate_dominant_frequency_histogram(signal, fs)
# std_hist = calculate_standard_histogram(signal)
# ar_coeffs = calculate_ar_model_coefficients(signal)
# form_factor = calculate_form_factor(signal)
# covariance = calculate_covariance_between_channels(signal, data['TrainData'][1][0])
# correlation = calculate_correlation_between_channels(signal, data['TrainData'][1][0])
# max_freq, mean_freq, median_freq = calculate_frequency_features(signal, fs)
# relative_band_power = calculate_relative_band_power(signal, fs)

def calculate_covariance_between_channels_1(chnl_signal, exp_signal):
    cov_list = []
    for i, signal in enumerate(exp_signal):
        cov_list.append(calculate_covariance_between_channels(chnl_signal, signal))
    return cov_list

def calculate_correlation_between_channels_1(chnl_signal, exp_signal):
    cor_list = []
    for i, signal in enumerate(exp_signal):
        cor_list.append(calculate_correlation_between_channels(chnl_signal, signal))
    return cor_list

# Define feature function

In [11]:
def extract_feature(signal, set_name='features'):
    channel_num = signal.shape[0]
    train_num = signal.shape[2]
    all_experiments_features = [] 
    
    for exp_idx in tqdm(range(train_num)):#train_num
        experiment_signal = signal[:,:, exp_idx]
        # print(experiment_signal.shape)
        experiment_features = []  
        
        for i, channel_signal in enumerate(experiment_signal):  
            covariance = calculate_covariance_between_channels_1(channel_signal, experiment_signal)
            correlation = calculate_correlation_between_channels_1(channel_signal, experiment_signal)
            max_freq, mean_freq, median_freq = calculate_frequency_features(channel_signal, fs)    
            relative_band_power = calculate_relative_band_power(channel_signal, fs)
            variance = calculate_variance(channel_signal)
            dom_freq_hist = calculate_dominant_frequency_histogram(channel_signal, fs)
            std_hist = calculate_standard_histogram(channel_signal)
            ar_coeffs = calculate_ar_model_coefficients(channel_signal)
            form_factor = calculate_form_factor(channel_signal)
            # print(form_factor)
        
            channel_features = [
                variance, *dom_freq_hist, *std_hist, *ar_coeffs, form_factor, *covariance, *correlation,
                max_freq, mean_freq, median_freq, *relative_band_power.values()
            ]
            # print(len(channel_features))
            experiment_features.extend(channel_features)
    
        all_experiments_features.append(experiment_features)
        
    with open(f'{set_name}.pkl', 'wb') as f:
        pickle.dump(all_experiments_features, f)
        
    return all_experiments_features


In [9]:
# len(all_experiments_features)/550
# all_experiments_features_ = []
# for i in range(550):
#     list_ = all_experiments_features[8968*i:8968*(i+1)]
#     all_experiments_features_.append(list_)
# len(all_experiments_features_[0])

In [12]:
def normalize_feature(all_experiments_features, scaler=None):
    imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
    all_experiments_features_imputed = imputer.fit_transform(all_experiments_features)

    if not scaler:
        # print("Train data Normalization")
        scaler = StandardScaler()
    all_experiments_features_normalized = scaler.fit_transform(all_experiments_features_imputed)
    
    all_experiments_features_non_negative = all_experiments_features_normalized - np.min(all_experiments_features_normalized)

    return all_experiments_features_non_negative, scaler


In [13]:
def select_feature(all_experiments_features, labels, k=50):
    selector = SelectKBest(chi2, k=k)
    selected_features = selector.fit_transform(all_experiments_features, labels)
    
    selected_indices = selector.get_support(indices=True)
    return selected_features, selected_indices

# Train Data and Test Data

In [14]:
fs = 1000 

train_signal = data['TrainData']
train_labels = np.array(data['TrainLabels']).flatten()
train_labels[train_labels == -1] = 0

# valid_train_signal = data['TrainData']
# valid_train_labels = np.array(data['TrainLabels']).flatten()
# valid_train_labels[valid_train_labels == -1] = 0
# random_nums = np.random.permutation(valid_train_signal.shape[2])
# threshold = int(0.1 * valid_train_signal.shape[2])


# train_signal = valid_train_signal[:,:, random_nums[threshold:]]
# train_labels = valid_train_labels[random_nums[threshold:]]
# valid_signal = valid_train_signal[:,:, random_nums[:threshold]]
# valid_labels = valid_train_labels[random_nums[:threshold]]



test_signal = data['TestData']

# print(test_signal.shape)


##-----------Train data-------------##
try:
    with open('train.pkl', 'rb') as f:
        # valid_train_experiments_features = pickle.load(f)
        train_experiments_features = pickle.load(f)
except:
    # valid_train_experiments_features = extract_feature(valid_train_signal, 'train')
    train_experiments_features = extract_feature(train_signal, 'train')
    
# train_experiments_features = [valid_train_experiments_features[i] for i in random_nums[threshold:]]
# valid_experiments_features = [valid_train_experiments_features[i] for i in random_nums[:threshold]]

train_experiments_features_normalized, train_normalizer = normalize_feature(train_experiments_features)
train_selected_features, train_selected_indices = select_feature(train_experiments_features_normalized, train_labels)
print(f"Train set size:      {train_signal.shape} -> {train_selected_features.shape}")

##-----------Validation data-------------##


# valid_experiments_features_normalized, _ = normalize_feature(valid_experiments_features, train_normalizer)
# valid_selected_features = valid_experiments_features_normalized[:, train_selected_indices]
# print(f"Validation set size: {valid_signal.shape}  -> {valid_selected_features.shape}")

##-----------Test data-------------##

try:
    with open('test.pkl', 'rb') as f:
         test_experiments_features = pickle.load(f)
except:
    test_experiments_features = extract_feature(test_signal, 'test')
test_experiments_features_normalized, _ = normalize_feature(test_experiments_features, train_normalizer)
test_selected_features = test_experiments_features_normalized[:, train_selected_indices]
print(f"Test set size:       {test_signal.shape} -> {test_selected_features.shape}")




  0%|          | 0/550 [00:00<?, ?it/s]

Train set size:      (59, 5000, 550) -> (550, 50)


  0%|          | 0/159 [00:00<?, ?it/s]

Test set size:       (59, 5000, 159) -> (159, 50)


In [15]:
test_experiments_features_normalized

array([[ 7.60080955,  6.42968057,  9.31057258, ...,  8.15404857,
         7.01968903,  8.2802208 ],
       [ 8.13949418,  8.6041973 ,  7.97247892, ..., 10.35589971,
         7.92661266,  7.49003864],
       [ 7.91629466,  9.53613304,  9.20764229, ...,  7.62805576,
         8.85769378,  9.41511561],
       ...,
       [ 7.59240289,  8.91484254,  8.48713033, ...,  7.82971239,
         7.2990854 ,  8.14991256],
       [10.63672799,  8.91484254,  5.81094303, ...,  7.70551414,
        10.2784142 ,  8.22882529],
       [ 7.8016933 ,  7.67226156,  9.61936342, ...,  7.47161765,
         8.4710336 ,  9.54921009]])

# feature selection

There is a [sourse on Kaggle](https:/https://www.kaggle.com/code/prashant111/comprehensive-guide-on-featur/)  that mentions the implementation of the chi-square test as an equivalent to Fisher's score for feature selection. This approach is often used in statistical analysis to determine the independence of two events. In the context of feature selection, it's used to evaluate whether the occurrence of a specific feature and the occurrence of a specific class are independent. This method can be particularly useful in categorical data analysis.






we can also use j fisher method 

In [16]:


# scaler = StandardScaler()
# all_experiments_features_normalized = scaler.fit_transform(all_experiments_features)


# def calculate_fishers_score(features, labels):
#     features_class_1 = features[labels == 1]
#     features_class_0 = features[labels == 0]

#     mean_1 = np.mean(features_class_1, axis=0)
#     mean_0 = np.mean(features_class_0, axis=0)
#     var_1 = np.var(features_class_1, axis=0)
#     var_0 = np.var(features_class_0, axis=0)

#     fishers_score = (mean_1 - mean_0) ** 2 / (var_1 + var_0)

#     return fishers_score

# fishers_scores = calculate_fishers_score(all_experiments_features_normalized, labels)
# num_top_features = 50 
# top_feature_indices = np.argsort(fishers_scores)[-num_top_features:]

# selected_features_J = all_experiments_features_normalized[:, top_feature_indices]


In [17]:
# selected_features_df = pd.DataFrame(selected_features_J)
# selected_features_df.to_csv('selected_features.csv', index=False)
# selcted_features_count=pd.read_csv("selcted_features_count.csv")
# selcted_features_count.to_numpy().shape


# find best MLP Modele

In [18]:
scaler = StandardScaler()
selected_features_normalized = scaler.fit_transform(train_selected_features)

hidden_layer_sizes_list = [ (100, 100), (150, 100), (160, 100)]  
activation_functions = ['relu', 'logistic', 'tanh']  

best_accuracy = 0
best_params = {}

for hidden_layer_sizes in hidden_layer_sizes_list:
    for activation in activation_functions:
       
        mlp = MLPClassifier(hidden_layer_sizes=hidden_layer_sizes, activation=activation, max_iter=1000)

       
        kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        accuracies = cross_val_score(mlp, selected_features_normalized, train_labels, cv=kfold, scoring='accuracy')

 
        avg_accuracy = np.mean(accuracies)

        print(f"Hidden Layer Sizes: {hidden_layer_sizes}, Activation: {activation}")
        print(f"Average Accuracy: {avg_accuracy}")

        
        if avg_accuracy > best_accuracy:
            best_accuracy = avg_accuracy
            best_params['hidden_layer_sizes'] = hidden_layer_sizes
            best_params['activation'] = activation

best_mlp = MLPClassifier(hidden_layer_sizes=best_params['hidden_layer_sizes'],
                         activation=best_params['activation'], max_iter=1000)
best_mlp.fit(selected_features_normalized, train_labels)

# best_mlp.predict()

Hidden Layer Sizes: (100, 100), Activation: relu
Average Accuracy: 0.7872727272727273
Hidden Layer Sizes: (100, 100), Activation: logistic
Average Accuracy: 0.7745454545454546
Hidden Layer Sizes: (100, 100), Activation: tanh
Average Accuracy: 0.7890909090909091
Hidden Layer Sizes: (150, 100), Activation: relu
Average Accuracy: 0.8018181818181818
Hidden Layer Sizes: (150, 100), Activation: logistic
Average Accuracy: 0.7836363636363637
Hidden Layer Sizes: (150, 100), Activation: tanh
Average Accuracy: 0.7890909090909092
Hidden Layer Sizes: (160, 100), Activation: relu
Average Accuracy: 0.801818181818182
Hidden Layer Sizes: (160, 100), Activation: logistic
Average Accuracy: 0.7763636363636364
Hidden Layer Sizes: (160, 100), Activation: tanh
Average Accuracy: 0.7745454545454545


# Find feature for test data

In [19]:
scaler = StandardScaler()
selected_features_normalized = scaler.fit_transform(test_selected_features)
best_mlp.predict(selected_features_normalized)

array([0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0,
       1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1,
       0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0,
       1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0,
       1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 1, 0,
       1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1,
       1, 0, 1, 1, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 0,
       1, 0, 0, 1, 0], dtype=int16)

In [20]:
from scipy.io import savemat
predicted_labels = best_mlp.predict(selected_features_normalized)
predicted_labels[predicted_labels == 0] = -1

savemat('mlp_test.mat', {'predicted_labels': predicted_labels})

#  find best RBF Model

In [21]:

scaler = StandardScaler()
selected_features_normalized = scaler.fit_transform(train_selected_features)

hidden_layer_sizes_rbf = (5, 4)  
n_clusters = 10 

best_accuracy_rbf = 0
best_params_rbf = {}

for n_clusters in [25, 10, 15,20]:  
    rbf_pipeline = make_pipeline(
        KMeans(n_clusters=n_clusters),
        MLPClassifier(hidden_layer_sizes=hidden_layer_sizes_rbf, activation='identity', max_iter=1000)
    )

    kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    accuracies_rbf = cross_val_score(rbf_pipeline, selected_features_normalized, train_labels, cv=kfold, scoring='accuracy')

    avg_accuracy_rbf = np.mean(accuracies_rbf)

    print(f"Number of Clusters: {n_clusters}")
    print(f"Average Accuracy (RBF Network): {avg_accuracy_rbf}")

    if avg_accuracy_rbf > best_accuracy_rbf:
        best_accuracy_rbf = avg_accuracy_rbf
        best_params_rbf['n_clusters'] = n_clusters

best_rbf_pipeline = make_pipeline(
    KMeans(n_clusters=best_params_rbf['n_clusters']),
    MLPClassifier(hidden_layer_sizes=hidden_layer_sizes_rbf, activation='identity', max_iter=1000)
)
best_rbf_pipeline.fit(selected_features_normalized, train_labels)

best_rbf = best_rbf_pipeline



  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)


Number of Clusters: 25
Average Accuracy (RBF Network): 0.6927272727272726


  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)


Number of Clusters: 10
Average Accuracy (RBF Network): 0.7


  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)


Number of Clusters: 15
Average Accuracy (RBF Network): 0.709090909090909


  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)


Number of Clusters: 20
Average Accuracy (RBF Network): 0.7018181818181818


  super()._check_params_vs_input(X, default_n_init=10)


# Find feature for test data

In [22]:
scaler = StandardScaler()
selected_features_normalized = scaler.fit_transform(test_selected_features)
best_rbf_pipeline.predict(selected_features_normalized)

array([0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
       1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1,
       0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 1, 0,
       1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0,
       0, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0,
       1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0,
       1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0,
       1, 1, 1, 0, 0], dtype=int16)

In [23]:
from scipy.io import savemat
predicted_labels = best_rbf_pipeline.predict(selected_features_normalized)
predicted_labels[predicted_labels == 0] = -1

savemat('rbf_test.mat', {'predicted_labels': predicted_labels})

# Phase 2

**PSO function**

In [29]:

def calculate_variance(signal):
    return np.var(signal)

def calculate_dominant_frequency_histogram(signal, fs):
    frequencies, power = periodogram(signal, fs)
    peaks, _ = find_peaks(power)
    dominant_frequencies = frequencies[peaks]
    histogram, bin_edges = np.histogram(dominant_frequencies, bins=10, range=(0, fs/2))
    return histogram

def calculate_standard_histogram(signal, bin_count=10):
    histogram, bin_edges = np.histogram(signal, bins=bin_count)
    return histogram

def calculate_ar_model_coefficients(signal, order=4):
    R = np.correlate(signal, signal, mode='full')[len(signal)-1:]
    R /= R[0]
    ar_coeffs = solve_toeplitz((R[:order], R[:order]), R[1:order+1])
    return ar_coeffs

def calculate_form_factor(signal):
    rms = np.sqrt(np.mean(signal**2))
    mean_abs = np.mean(np.abs(signal))
    return rms / mean_abs

def calculate_covariance_between_channels(signal1, signal2):
    return np.cov(signal1, signal2)[0, 1]

def calculate_correlation_between_channels(signal1, signal2):
    return np.corrcoef(signal1, signal2)[0, 1]

def calculate_frequency_features(signal, fs):
    freqs, power = periodogram(signal, fs)
    max_freq = freqs[np.argmax(power)]
    mean_freq = np.sum(freqs * power) / np.sum(power)
    median_freq = freqs[np.searchsorted(np.cumsum(power), np.sum(power)/2)]
    return max_freq, mean_freq, median_freq

def calculate_relative_band_power(signal, fs):
    bands = {'Delta': (0.5, 4), 'Theta': (4, 8), 'Alpha': (8, 12), 'Beta': (12, 30), 'Gamma': (30, 45)}
    freqs, power = periodogram(signal, fs)
    total_power = np.sum(power)
    relative_powers = {}
    for band in bands:
        band_freqs = bands[band]
        idx_band = np.logical_and(freqs >= band_freqs[0], freqs <= band_freqs[1])
        relative_powers[band] = np.sum(power[idx_band]) / total_power
    return relative_powers

fs = 1000 
signal = data['TrainData']
print(signal.shape)

channel_num = signal.shape[0]
train_num = signal.shape[2]


def calculate_covariance_between_channels_1(chnl_signal, exp_signal):
    cov_list = []
    for i, signal in enumerate(exp_signal):
        cov_list.append(calculate_covariance_between_channels(chnl_signal, signal))
    return cov_list

def calculate_correlation_between_channels_1(chnl_signal, exp_signal):
    cor_list = []
    for i, signal in enumerate(exp_signal):
        cor_list.append(calculate_correlation_between_channels(chnl_signal, signal))
    return cor_list
channel_num = signal.shape[0]
train_num = signal.shape[2]

all_experiments_features = [] 

for exp_idx in tqdm(range(train_num)):#train_num
    experiment_signal = signal[:,:, exp_idx]
    # print(experiment_signal.shape)
    experiment_features = []  
    
    for i, channel_signal in enumerate(experiment_signal):  
        covariance = calculate_covariance_between_channels_1(channel_signal, experiment_signal)
        correlation = calculate_correlation_between_channels_1(channel_signal, experiment_signal)
        max_freq, mean_freq, median_freq = calculate_frequency_features(channel_signal, fs)    
        relative_band_power = calculate_relative_band_power(channel_signal, fs)
        variance = calculate_variance(channel_signal)
        dom_freq_hist = calculate_dominant_frequency_histogram(channel_signal, fs)
        std_hist = calculate_standard_histogram(channel_signal)
        ar_coeffs = calculate_ar_model_coefficients(channel_signal)
        form_factor = calculate_form_factor(channel_signal)
        # print(form_factor)
    
        channel_features = [
            variance, *dom_freq_hist, *std_hist, *ar_coeffs, form_factor, *covariance, *correlation,
            max_freq, mean_freq, median_freq, *relative_band_power.values()
        ]
        # print(len(channel_features))
        experiment_features.extend(channel_features)

    all_experiments_features.append(experiment_features)



(59, 5000, 550)


  0%|          | 0/550 [00:00<?, ?it/s]

In [43]:
def PSO_feature_selection(all_features, n_features):
    n_particles = 20
    x_swarm = np.zeros((n_particles, n_features), dtype=int)

    n_all_features = all_features.shape[2]

    for i in range(n_particles):
        x_swarm[i, :] = np.random.permutation(n_all_features)[:n_features]

    all_j_scores = []
    current_j_score = np.zeros(n_particles)

    for i in range(n_particles):
        group_1 = all_features[0, :, x_swarm[i, :]]
        group_2 = all_features[1, :, x_swarm[i, :]]
        both_groups = np.concatenate((group_1, group_2))
        current_j_score[i] = j_score_cal_function(group_1, group_2, both_groups)

    all_j_scores.append(current_j_score)

    x_best_local = np.copy(x_swarm)
    x_best_global = x_swarm[np.argmax(current_j_score)]
    max_j = np.max(current_j_score)
    local_max_j = np.copy(current_j_score)

    alpha = 1
    t = 1
    b1 = np.random.rand()
    b2 = np.random.rand()
    v = np.zeros((n_particles, n_features, 2), dtype=float)
    J_thresh = 0.08

    while (np.sum(all_j_scores[-1] >= J_thresh) < n_particles) or (t < 100):
        for i in range(n_particles):
            v[i, :, 1] = alpha * v[i, :, 0] + b1 * (x_best_local[i, :] - x_swarm[i, :]) + b2 * (x_best_global - x_swarm[i, :])
            x_swarm[i, :] = np.round(x_swarm[i, :] + v[i, :, 1])

            if len(np.unique(x_swarm[i, :])) != n_features:
                x_swarm[i, :] = np.round(x_swarm[i, :] - 0.5 * np.random.rand())

            n_all_features = all_features.shape[2]
            upper_bound = x_swarm[i, :] > n_all_features
            x_swarm[i, upper_bound] = n_all_features
            lower_bound = x_swarm[i, :] < 1
            x_swarm[i, lower_bound] = 1

            group_1 = all_features[0, :, x_swarm[i, :]]
            group_2 = all_features[1, :, x_swarm[i, :]]
            both_groups = np.concatenate((group_1, group_2))
            prev_j_score = current_j_score[i]
            current_j_score[i] = j_score_cal_function(group_1, group_2, both_groups)

            if current_j_score[i] >= local_max_j[i]:
                x_best_local[i, :] = x_swarm[i, :]
                local_max_j[i] = current_j_score[i]

            if np.max(current_j_score) >= max_j:
                max_j = np.max(current_j_score)
                x_best_global = x_swarm[np.argmax(current_j_score), :]

            t += 1
            alpha = 1 / t
            b1 = np.random.rand()
            b2 = np.random.rand()

        all_j_scores.append(current_j_score)

    return x_best_global

def j_score_cal_function(group_1, group_2, both_groups):

    mean_group_1 = np.mean(group_1)
    mean_group_2 = np.mean(group_2)
    mean_both_groups = np.mean(both_groups)

    var_between = len(group_1) * ((mean_group_1 - mean_both_groups) ** 2 + (mean_group_2 - mean_both_groups) ** 2)
    var_within = np.sum((group_1 - mean_group_1) ** 2) + np.sum((group_2 - mean_group_2) ** 2)

    j_score = var_between / (var_between + var_within)

    return j_score


In [44]:
scaler = StandardScaler()
all_experiments_features_normalized = scaler.fit_transform(all_experiments_features)
all_experiments_features_normalized.shape

labels = np.array(data['TrainLabels'])
labels[labels == -1] = 0
labels.shape


# def fitness_function(position):
#     selected_features_bool = np.clip(position, 0, 1).astype(bool)
    
#     if len(selected_features_bool) == all_experiments_features_normalized.shape[1]:
#         selected_features = all_experiments_features_normalized[:, selected_features_bool]
#         classifier = SVC()
#         scores = cross_val_score(classifier, selected_features, labels, cv=5)
#         return 1 - scores.mean() 
#     else:
#         return 1


# options = {'c1': 1.5, 'c2': 1.5, 'w':0.9}

# optimizer = ps.single.GlobalBestPSO(n_particles=50, dimensions=all_experiments_features_normalized.shape[1], options=options)

# cost, pos = optimizer.optimize(fitness_function, iters=100)

# selected_features_bool = np.clip(pos, 0, 1).astype(bool)

# if len(selected_features_bool) == all_experiments_features_normalized.shape[1]:
#     try:
#         selected_features = all_experiments_features_normalized[:, selected_features_bool]
#         print("Selected features shape:", selected_features)
#     except IndexError as e:
#         print("IndexError:", e)
#         print("Further diagnostic needed.")
# else:
#     print("Mismatch in dimensions! Boolean mask length does not match the number of features.")



# ... (your feature calculation functions)

# Assuming 'all_experiments_features_normalized' is the array containing your features
# and 'labels' is your array of labels

def fitness_function(position):
    selected_features_bool = np.clip(position, 0, 1).astype(bool)
    
    if len(selected_features_bool) == len(all_experiments_features_normalized[0]):
        selected_features = all_experiments_features_normalized[:, selected_features_bool]
        
        # Initialize your classifier (e.g., SVC)
        classifier = SVC()
        
        # Perform cross-validation
        scores = cross_val_score(classifier, selected_features, labels, cv=5)
        
        # Return 1 - mean accuracy as PSO minimizes the objective function
        return 1 - scores.mean() 
    else:
        return 1

# Set dimensions to the number of features before selection
options = {'c1': 1.5, 'c2': 1.5, 'w': 0.9}
optimizer = ps.single.GlobalBestPSO(n_particles=50, dimensions=len(all_experiments_features_normalized[0]), options=options)

# Run PSO optimization
cost, pos = optimizer.optimize(fitness_function, iters=100)

# Extract selected features
selected_features_bool = np.clip(pos, 0, 1).astype(bool)
selected_features_PSO = all_experiments_features_normalized[:, selected_features_bool]
print("Selected features shape:", selected_features_PSO)



2024-02-04 02:15:02,039 - pyswarms.single.global_best - INFO - Optimize for 100 iters with {'c1': 1.5, 'c2': 1.5, 'w': 0.9}
pyswarms.single.global_best: 100%|██████████|100/100, best_cost=1
2024-02-04 02:15:05,848 - pyswarms.single.global_best - INFO - Optimization finished | best cost: 1.0, best pos: [0.76658365 0.99887434 0.15989154 ... 0.77948393 0.19230361 0.52642582]


Selected features shape: [[ 0.03437026  0.1838853  -1.69796564 ... -0.40576522  2.44438493
   1.16489373]
 [-0.33033712 -0.68798465  1.02707378 ...  3.68548989 -2.48698859
  -1.79009606]
 [-0.64094116 -2.43172455 -1.79889303 ... -0.51550472 -0.75709358
   0.7974495 ]
 ...
 [ 2.34645109  0.76513193 -1.59703825 ... -0.46981193 -0.12589
   0.21626472]
 [-0.28826638  0.1838853   0.52243685 ... -0.54895285  1.31773552
   0.40280457]
 [ 2.02364041 -0.97860797 -1.79889303 ...  0.04226653  1.09117406
  -0.07768295]]


In [38]:
print("Shape of selected_features_normalized:", selected_features_normalized.shape)
print("Shape of labels:", labels.shape)  # Add this line to check the shape of labels


Shape of selected_features_normalized: (550, 8968)
Shape of labels: (1, 550)


In [40]:
scaler = StandardScaler()
selected_features_normalized = scaler.fit_transform(selected_features_PSO)

hidden_layer_sizes_list = [ (100, 100), (150, 100), (160, 100)]  
activation_functions = ['relu', 'logistic', 'tanh']  
labels = np.ravel(labels)


best_accuracy = 0
best_params = {}

for hidden_layer_sizes in hidden_layer_sizes_list:
    for activation in activation_functions:
       
        mlp = MLPClassifier(hidden_layer_sizes=hidden_layer_sizes, activation=activation, max_iter=1000)

       
        kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        accuracies = cross_val_score(mlp, selected_features_normalized, labels, cv=kfold, scoring='accuracy')

 
        avg_accuracy = np.mean(accuracies)

        print(f"Hidden Layer Sizes: {hidden_layer_sizes}, Activation: {activation}")
        print(f"Average Accuracy: {avg_accuracy}")

        
        if avg_accuracy > best_accuracy:
            best_accuracy = avg_accuracy
            best_params['hidden_layer_sizes'] = hidden_layer_sizes
            best_params['activation'] = activation

best_mlp = MLPClassifier(hidden_layer_sizes=best_params['hidden_layer_sizes'],
                         activation=best_params['activation'], max_iter=1000)
best_mlp.fit(selected_features_normalized, labels)



Hidden Layer Sizes: (100, 100), Activation: relu
Average Accuracy: 0.8509090909090908
Hidden Layer Sizes: (100, 100), Activation: logistic
Average Accuracy: 0.8418181818181818
Hidden Layer Sizes: (100, 100), Activation: tanh
Average Accuracy: 0.8618181818181817
Hidden Layer Sizes: (150, 100), Activation: relu
Average Accuracy: 0.8527272727272728
Hidden Layer Sizes: (150, 100), Activation: logistic
Average Accuracy: 0.8509090909090908
Hidden Layer Sizes: (150, 100), Activation: tanh
Average Accuracy: 0.8727272727272728
Hidden Layer Sizes: (160, 100), Activation: relu
Average Accuracy: 0.8545454545454547
Hidden Layer Sizes: (160, 100), Activation: logistic
Average Accuracy: 0.841818181818182
Hidden Layer Sizes: (160, 100), Activation: tanh
Average Accuracy: 0.850909090909091


In [41]:

scaler = StandardScaler()
selected_features_normalized = scaler.fit_transform(selected_features_PSO)

hidden_layer_sizes_rbf = (5, 4)  
n_clusters = 10 

best_accuracy_rbf = 0
best_params_rbf = {}

for n_clusters in [25, 10, 15,20]:  
    rbf_pipeline = make_pipeline(
        KMeans(n_clusters=n_clusters),
        MLPClassifier(hidden_layer_sizes=hidden_layer_sizes_rbf, activation='identity', max_iter=1000)
    )

    kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    accuracies_rbf = cross_val_score(rbf_pipeline, selected_features_normalized, train_labels, cv=kfold, scoring='accuracy')

    avg_accuracy_rbf = np.mean(accuracies_rbf)

    print(f"Number of Clusters: {n_clusters}")
    print(f"Average Accuracy (RBF Network): {avg_accuracy_rbf}")

    if avg_accuracy_rbf > best_accuracy_rbf:
        best_accuracy_rbf = avg_accuracy_rbf
        best_params_rbf['n_clusters'] = n_clusters

best_rbf_pipeline = make_pipeline(
    KMeans(n_clusters=best_params_rbf['n_clusters']),
    MLPClassifier(hidden_layer_sizes=hidden_layer_sizes_rbf, activation='identity', max_iter=1000)
)
best_rbf_pipeline.fit(selected_features_normalized, train_labels)

best_rbf = best_rbf_pipeline



  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)


Number of Clusters: 25
Average Accuracy (RBF Network): 0.49818181818181817


  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)


Number of Clusters: 10
Average Accuracy (RBF Network): 0.5218181818181817


  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)


Number of Clusters: 15
Average Accuracy (RBF Network): 0.48


  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)
  super()._check_params_vs_input(X, default_n_init=10)


Number of Clusters: 20
Average Accuracy (RBF Network): 0.5145454545454545


  super()._check_params_vs_input(X, default_n_init=10)
