In [1]:
#import libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, welch
from scipy.stats import skew, kurtosis, entropy
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

In [2]:
# For loading .arff files
from scipy.io import arff

h_train_data, h_train_meta = arff.loadarff('../data/EOGHorizontalSignal_TRAIN.arff')
hEOG_train = pd.DataFrame(h_train_data)

h_test_data, h_test_meta = arff.loadarff('../data/EOGHorizontalSignal_TEST.arff')
hEOG_test = pd.DataFrame(h_test_data)

v_train_data, v_train_meta = arff.loadarff('../data/EOGVerticalSignal_TRAIN.arff')
vEOG_train = pd.DataFrame(v_train_data)

v_test_data, v_test_meta = arff.loadarff('../data/EOGVerticalSignal_TEST.arff')
vEOG_test = pd.DataFrame(v_test_data)

In [3]:
#convert target column into integer
hEOG_train['target'] = hEOG_train['target'].apply(lambda x: int(x.decode('utf-8')))
vEOG_train['target'] = vEOG_train['target'].apply(lambda x: int(x.decode('utf-8')))
hEOG_test['target'] = hEOG_test['target'].apply(lambda x: int(x.decode('utf-8')))
vEOG_test['target'] = vEOG_test['target'].apply(lambda x: int(x.decode('utf-8')))

In [4]:
# Assuming both have the same target
y_train = hEOG_train['target'].copy()
y_test = hEOG_test['target'].copy()

# Optionally, concatenate the horizontal and vertical data along columns (axis=1)
X_train_combined = pd.concat([hEOG_train.drop(columns=['target']), vEOG_train.drop(columns=['target'])], axis=1)
X_test_combined = pd.concat([hEOG_test.drop(columns=['target']), vEOG_test.drop(columns=['target'])], axis=1)

In [5]:
#signal de-noising

from scipy.signal import butter, filtfilt

def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a

def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = filtfilt(b, a, data)
    return y

# Parameters for filtering
fs = 1000  # Sampling rate (1 kHz)
lowcut = 0.05  # Low frequency cut (for removing drift)
highcut = 30.0  # High frequency cut (for noise reduction)

# Apply filtering on the train EOG signal
hEOG_train_filtered = hEOG_train.drop(columns=['target']).apply(lambda x: butter_bandpass_filter(x, lowcut, highcut, fs))
vEOG_train_filtered = vEOG_train.drop(columns=['target']).apply(lambda x: butter_bandpass_filter(x, lowcut, highcut, fs))
# Apply filtering on the test EOG signal
hEOG_test_filtered = hEOG_test.drop(columns=['target']).apply(lambda x: butter_bandpass_filter(x, lowcut, highcut, fs))
vEOG_test_filtered = vEOG_test.drop(columns=['target']).apply(lambda x: butter_bandpass_filter(x, lowcut, highcut, fs))


In [10]:
#extract time domain & statistical features

def extract_time_domain_features(signal):
    features = {
        'signal_variance': np.var(signal),
        'signal_iqr': np.percentile(signal, 75) - np.percentile(signal, 25),
        'mean': np.mean(signal),
        'std': np.std(signal),
        'max': np.max(signal),
        'min': np.min(signal),
        'skew': pd.Series(signal).skew(),
        'kurtosis': pd.Series(signal).kurtosis(),
        'energy': np.sum(np.square(signal)),
        'median': np.median(signal),
        'entropy':entropy(np.abs(signal))
    }
    
    # Calculate the slope of the signal
    if len(signal) > 1:
        signal_slope = np.diff(signal)  # First derivative (slope)
        features['mean_slope'] = np.mean(signal_slope)
        features['std_slope'] = np.std(signal_slope)
    else:
        features['mean_slope'] = 0
        features['std_slope'] = 0
        
        
    return pd.Series(features)

# Apply feature extraction to each row of the dataset
hEOG_train_time_features = hEOG_train_filtered.apply(extract_time_domain_features, axis=1)
vEOG_train_time_features = vEOG_train_filtered.apply(extract_time_domain_features, axis=1)
hEOG_test_time_features = hEOG_test_filtered.apply(extract_time_domain_features, axis=1)
vEOG_test_time_features = vEOG_test_filtered.apply(extract_time_domain_features, axis=1)

In [11]:
# Rename columns in horizontal features
hEOG_train_time_features.columns = ['h_' + col for col in hEOG_train_time_features.columns]
hEOG_test_time_features.columns = ['h_' + col for col in hEOG_test_time_features.columns]

# Rename columns in vertical features
vEOG_train_time_features.columns = ['v_' + col for col in vEOG_train_time_features.columns]
vEOG_test_time_features.columns = ['v_' + col for col in vEOG_test_time_features.columns]

# Combine horizontal and vertical features
combined_train_time_features = pd.concat([hEOG_train_time_features, vEOG_train_time_features], axis=1)
combined_test_time_features = pd.concat([hEOG_test_time_features, vEOG_test_time_features], axis=1)

# Display the new column names
print("Training Time Features Columns:")
print(combined_train_time_features.columns)

print("\nTest Time Features Columns:")
print(combined_test_time_features.columns)


Training Time Features Columns:
Index(['h_signal_variance', 'h_signal_iqr', 'h_mean', 'h_std', 'h_max',
       'h_min', 'h_skew', 'h_kurtosis', 'h_energy', 'h_median', 'h_entropy',
       'h_mean_slope', 'h_std_slope', 'v_signal_variance', 'v_signal_iqr',
       'v_mean', 'v_std', 'v_max', 'v_min', 'v_skew', 'v_kurtosis', 'v_energy',
       'v_median', 'v_entropy', 'v_mean_slope', 'v_std_slope'],
      dtype='object')

Test Time Features Columns:
Index(['h_signal_variance', 'h_signal_iqr', 'h_mean', 'h_std', 'h_max',
       'h_min', 'h_skew', 'h_kurtosis', 'h_energy', 'h_median', 'h_entropy',
       'h_mean_slope', 'h_std_slope', 'v_signal_variance', 'v_signal_iqr',
       'v_mean', 'v_std', 'v_max', 'v_min', 'v_skew', 'v_kurtosis', 'v_energy',
       'v_median', 'v_entropy', 'v_mean_slope', 'v_std_slope'],
      dtype='object')


In [8]:
#extract frequency domain features
from scipy.fft import fft

def extract_frequency_domain_features(signal):
    # Ensure that signal is a 1D numpy array
    signal = np.array(signal).flatten()
    
    # Compute FFT
    fft_vals = fft(signal)
    
    # Compute magnitude and power
    fft_magnitude = np.abs(fft_vals)
    power = np.square(fft_magnitude)
    
    # Extract frequency domain features
    features = {
        'fft_max_freq': np.argmax(fft_magnitude),
        'fft_mean_power': np.mean(power),
        'fft_peak_power': np.max(power),
    }
    return pd.Series(features)

# Apply FFT feature extraction to each row of the dataset
hEOG_train_freq_features = hEOG_train_filtered.apply(lambda row: extract_frequency_domain_features(row.values), axis=1)
vEOG_train_freq_features = vEOG_train_filtered.apply(lambda row: extract_frequency_domain_features(row.values), axis=1)

hEOG_test_freq_features = hEOG_test_filtered.apply(lambda row: extract_frequency_domain_features(row.values), axis=1)
vEOG_test_freq_features = vEOG_test_filtered.apply(lambda row: extract_frequency_domain_features(row.values), axis=1)

In [9]:
# Rename columns in horizontal frequency features
hEOG_train_freq_features.columns = ['h_' + col for col in hEOG_train_freq_features.columns]
hEOG_test_freq_features.columns = ['h_' + col for col in hEOG_test_freq_features.columns]

# Rename columns in vertical frequency features
vEOG_train_freq_features.columns = ['v_' + col for col in vEOG_train_freq_features.columns]
vEOG_test_freq_features.columns = ['v_' + col for col in vEOG_test_freq_features.columns]

# Combine horizontal and vertical frequency domain features
combined_train_freq_features = pd.concat([hEOG_train_freq_features, vEOG_train_freq_features], axis=1)
combined_test_freq_features = pd.concat([hEOG_test_freq_features, vEOG_test_freq_features], axis=1)

# Display the new column names
print("Training Frequency Features Columns:")
print(combined_train_freq_features.columns)

print("\nTest Frequency Features Columns:")
print(combined_test_freq_features.columns)


Training Frequency Features Columns:
Index(['h_fft_max_freq', 'h_fft_mean_power', 'h_fft_peak_power',
       'v_fft_max_freq', 'v_fft_mean_power', 'v_fft_peak_power'],
      dtype='object')

Test Frequency Features Columns:
Index(['h_fft_max_freq', 'h_fft_mean_power', 'h_fft_peak_power',
       'v_fft_max_freq', 'v_fft_mean_power', 'v_fft_peak_power'],
      dtype='object')


In [13]:
# Combine the time domain and frequency domain features for the training set
combined_train_features = pd.concat([combined_train_time_features, combined_train_freq_features], axis=1)

# Similarly, combine the test set features
combined_test_features = pd.concat([combined_test_time_features, combined_test_freq_features], axis=1)

# Display the shae of the final combined training features
print("Final Combined Training Features Shape:", combined_train_features.shape)

# Display the shape of the final combined test features
print("Final Combined Test Features Shape:", combined_test_features.shape)


Final Combined Training Features Shape: (362, 32)
Final Combined Test Features Shape: (362, 32)


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Initialize the Random Forest Classifier
rf_classifier = RandomForestClassifier(n_estimators=10, random_state=42)  # n_estimators is the number of trees

# Train the model
rf_classifier.fit(combined_train_features, y_train)

# Make predictions on the test set
y_pred = rf_classifier.predict(combined_test_features)

# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy * 100:.2f}%")

# Classification report for detailed metrics
print("Classification Report:\n", classification_report(y_test, y_pred))

# Confusion Matrix to see detailed performance
print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))


In [37]:
# Assuming 'final_combined_train_features' and 'final_combined_test_features' are your feature DataFrames
X_train = combined_train_features.values
X_test = combined_test_features.values

# Standardize the data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Reshape data to 3D for GRU input: (samples, timesteps, features)
# Here, timesteps = 1, and features = number of features
X_train_reshaped = X_train_scaled.reshape(X_train_scaled.shape[0], 1, X_train_scaled.shape[1])
X_test_reshaped = X_test_scaled.reshape(X_test_scaled.shape[0], 1, X_test_scaled.shape[1])

# Assuming you have 'y_train' and 'y_test' as target labels, convert them to numpy arrays
y_train = np.array(y_train)
y_test = np.array(y_test)


In [39]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GRU, Dense, Dropout


In [59]:
# Reshape the data to fit Conv1D and GRU layers
X_train_combined_reshaped = X_train_combined.values.reshape(X_train_combined.values.shape[0], 1, X_train_combined.values.shape[1])
X_test_combined_reshaped = X_test_combined.values.reshape(X_test_combined.values.shape[0], 1, X_test_combined.values.shape[1])


In [61]:
X_train_combined_reshaped.shape

(362, 1, 2500)

In [62]:
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Bidirectional, GRU, Dense, Dropout

# Define the CNN-GRU model
model = Sequential()

# Add a 1D Convolutional Layer with 'same' padding to preserve input size
model.add(Conv1D(filters=64, kernel_size=2, activation='relu', padding='same', input_shape=(1, 2500)))

# Another Convolutional Layer with 'same' padding
model.add(Conv1D(filters=32, kernel_size=2, activation='relu', padding='same'))

# Optional: Add MaxPooling, but keep pool size as 1 to avoid further downsampling
model.add(MaxPooling1D(pool_size=1))

# Add a Bidirectional GRU layer
model.add(Bidirectional(GRU(units=64, return_sequences=True)))
model.add(Dropout(0.2))

# Add another GRU layer
model.add(GRU(units=32))
model.add(Dropout(0.2))

# Flatten the output before Dense layers
model.add(Flatten())

# Add a Dense layer for additional learning
model.add(Dense(units=16, activation='relu'))

# Output layer for multi-class classification
model.add(Dense(units=12, activation='softmax'))  # Assuming 12 classes

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Summary of the model
model.summary()


Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_5 (Conv1D)           (None, 1, 64)             320064    
                                                                 
 conv1d_6 (Conv1D)           (None, 1, 32)             4128      
                                                                 
 max_pooling1d_1 (MaxPooling  (None, 1, 32)            0         
 1D)                                                             
                                                                 
 bidirectional_1 (Bidirectio  (None, 1, 128)           37632     
 nal)                                                            
                                                                 
 dropout_4 (Dropout)         (None, 1, 128)            0         
                                                                 
 gru_5 (GRU)                 (None, 32)               

In [42]:
# Adjust labels to be zero-indexed
y_train_zero_indexed = y_train - 1  # Subtract 1 from each label in training set
y_test_zero_indexed = y_test - 1      # Subtract 1 from each label in test set

In [None]:
# Train the model
history = model.fit(X_train_combined_reshaped, y_train_zero_indexed,
                    epochs=10,                # You can adjust the number of epochs
                    batch_size=32,             # Batch size
                    validation_data=(X_test_combined_reshaped, y_test_zero_indexed),  # Validation set
                    verbose=1)
