## References

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pprint import pprint
import constants
import os
from dotenv import load_dotenv
import matplotlib.cm as cm
import scipy.io as sp
import json
import pprint as pp
from scipy.signal import welch
from scipy.integrate import trapezoid

In [None]:
import sys
# threshold=sys.maxsize disables NumPy's "..." summarisation so arrays print in full;
# linewidth=300 allows wide rows; suppress=True prints small floats in fixed-point notation.
np.set_printoptions(threshold=sys.maxsize, linewidth=300, suppress=True)
# Let pandas show up to 500 characters per cell before truncating.
pd.set_option('display.max_colwidth', 500)

## Data Visualization and Preprocessing

### Read data

In [None]:
load_dotenv()
dataset_path = os.getenv('DATASET_PATH')

def get_dataset_files_and_user_ids(data_category = constants.GENUINE, data_type = constants.TRAIN):
    """Collect the CSV/MAT file paths of one data category plus all user IDs.

    Walks ``dataset_path`` recursively. Files are collected from every directory
    whose name equals ``data_category`` (only ``constants.GENUINE`` and
    ``constants.FORGED`` are recognised). Every other directory name, except the
    literal ``'SignEEGv1.0'``, is recorded as a user ID.

    Parameters
    ----------
    data_category : constants.GENUINE or constants.FORGED
        Which category folder to collect files from.
    data_type : constants.TRAIN or other split label
        Accepted for interface compatibility but NOT applied yet: no train/test
        filtering is performed on the returned files.
        TODO: filter with ``Identification_split.csv`` once the split is wired in.

    Returns
    -------
    (files_csv, files_mat, user_ids)
        Two lists of full paths sorted by the sample number embedded in the file
        name, and the list of user IDs in ``os.walk`` order.
    """
    user_ids = []
    files_csv = []
    files_mat = []

    known_categories = (constants.GENUINE, constants.FORGED)

    def _sample_number(path):
        # The sample number is the 4th '_'-separated token of the FILE NAME.
        # Using the base name (not the full path) keeps the ordering correct even
        # when a parent directory contains underscores; dropping anything after
        # the first '.' lets the same key serve both .csv and .mat files.
        return int(os.path.basename(path).split('_')[3].split('.')[0])

    for root, _dirs, files in os.walk(dataset_path):
        folder = os.path.basename(root)
        if folder == data_category and folder in known_categories:
            for file in files:
                if file.endswith('.csv'):
                    files_csv.append(os.path.join(root, file))
                elif file.endswith('.mat'):
                    files_mat.append(os.path.join(root, file))
        if folder not in known_categories and folder != 'SignEEGv1.0':
            user_ids.append(folder)

    files_csv = sorted(files_csv, key=_sample_number)
    files_mat = sorted(files_mat, key=_sample_number)
    return files_csv, files_mat, user_ids

# print("Genuine MAT files:")
# pprint(get_genuine_csv_mat_files())
# print("Forged MAT files:")
# pprint(get_forged_csv_mat_files())

### Get List of UserIDs from Dataset

In [None]:
# user_ids = []
# def get_list_of_user_ids():
#     for root, dir, files in os.walk(dataset_path):
#         if os.path.basename(root) != 'Genuine' and os.path.basename(root) != 'Forged' and os.path.basename(root) != 'SignEEGv1.0':
#             user_ids.append(os.path.basename(root))
#     # print(len(user_ids))
#     return user_ids

# print("User IDs:")
# pprint(get_list_of_user_ids())
    

### Clean up Signature CSV data and reset column names

In [None]:
def get_user_csv_sign_data_cleaned(user_sign_data_csv): #Provide file name of the csv file
    """Load a signature CSV and return its data rows under stripped column names.

    The first line of the file is skipped. The next line is taken as the header:
    its cells are stripped of surrounding whitespace and used as column names.
    The remaining rows are returned as read (the header row is removed, so the
    index starts at 1 and the values are not converted to numeric types here).

    Parameters
    ----------
    user_sign_data_csv : str or path-like
        Path of the CSV file to read.

    Returns
    -------
    pandas.DataFrame
        The data rows with cleaned column names.
    """
    content = pd.read_csv(user_sign_data_csv, skiprows=1, header=None)
    # Row 0 carries the column names, padded with spaces in the raw file.
    content.columns = [c.strip() for c in content.iloc[0]]
    # Drop the header row itself; everything after it is signature data.
    content = content.iloc[1:]
    return content


### Plot signature

#### Uncomment the cell below if the signature images need to be regenerated

In [None]:

# csv_data, _ = get_genuine_csv_mat_files()
# print(csv_data)
# user_id = '000000001045402'
# user_match = [data for data in csv_data if user_id in data]
# print("Users matched with user_id '{}':".format(user_match))
# count=0
# for file in user_match:
#     count+=1
#     content = get_user_csv_sign_data_cleaned(file)
#     x, y, _, press, _, _ = normalize_sign_data(content)
#     # cmap = cm.Blues
#     # colors = cmap(press)
#     # # colors[:, 3] = press
#     # # plt.scatter(sign_coords['X'], sign_coords['Y'], c=colors, s=50)
#     # plt.scatter(x, y, c=colors, s=50)
#     # plt.title('Sign Coordinates')
#     # plt.xlabel('X')
#     # plt.ylabel('Y')
#     # plt.show()

#     min_linewidth = 0
#     max_linewidth = 5.0
#     linewidths = min_linewidth + press * (max_linewidth - min_linewidth)

#     plt.figure(figsize=(8, 4))
#     for i in range(len(x) - 1):
#         plt.plot(
#             x[i:i+2], y[i:i+2],
#             linewidth=linewidths[i],
#             color='black',
#             solid_capstyle='round'
#         )
#     # plt.axis('equal')  # Keep aspect ratio square
#     plt.axis('off')    # Hide axes for cleaner look
#     # plt.show() # uncomment only for debug processes, uncommenting will make plt.savefigure() save blank images

#     plt.savefig("SignImages\\"+user_id+"-"+str(count)+".jpeg", dpi=300, bbox_inches='tight', pad_inches=0)
#     plt.close()

### Signature Data Preprocessing

In [None]:
def normalize_sign_data(data):
    """Convert the signature columns to integer arrays and scale them by their maximum.

    Parameters
    ----------
    data : mapping of column name -> sequence
        Must provide 'X', 'Y', 'T', 'Pressure', 'Azimuth' and 'Altitude'
        (e.g. the DataFrame returned by ``get_user_csv_sign_data_cleaned``).

    Returns
    -------
    tuple of numpy.ndarray
        ``(norm_x, norm_y, t, norm_pressure, norm_azimuth, norm_altitude)``.
        ``t`` is returned as integers, un-normalised. Every other array is
        divided by its own maximum value.
    """
    def _as_int(column):
        return np.array(data[column]).astype(int)

    def _scale_by_max(values):
        peak = np.max(values)
        if peak == 0:
            # A channel whose maximum is 0 (e.g. pressure never registered) would
            # produce NaN/inf on division; return it unscaled as floats instead.
            return values.astype(float)
        return values / peak

    t = _as_int('T')
    norm_x = _scale_by_max(_as_int('X'))
    norm_y = _scale_by_max(_as_int('Y'))
    norm_pressure = _scale_by_max(_as_int('Pressure'))
    norm_azimuth = _scale_by_max(_as_int('Azimuth'))
    norm_altitude = _scale_by_max(_as_int('Altitude'))
    return norm_x, norm_y, t, norm_pressure, norm_azimuth, norm_altitude

### EEG Data Preprocessing

In [None]:
_, mat_data, user_ids = get_dataset_files_and_user_ids()
# print(mat_data)
def get_user_mat_data(user_id=None):
    """Return one user's MAT file paths, ordered by sample number.

    Reads the module-level ``mat_data`` (list of MAT paths) and ``user_ids``.

    Parameters
    ----------
    user_id : str, optional
        User whose files are wanted; a path is selected when it contains this
        string. Defaults to ``user_ids[0]``.

    Returns
    -------
    pandas.Series
        Matching paths sorted by the sample number in the file name, with a
        fresh 0..n-1 index.
    """
    if user_id is None:
        user_id = user_ids[0]  # Default to the first user if none specified

    # dtype=object keeps an empty selection well-defined.
    user_files = pd.Series([path for path in mat_data if user_id in path], dtype=object)

    def _sample_number(path):
        # 4th '_'-separated token of the file name (not of the whole path, which
        # may itself contain underscores), ignoring any extension.
        return int(os.path.basename(path).split('_')[3].split('.')[0])

    # sort_values returns a new Series; nothing is mutated in place.
    ordered = user_files.sort_values(key=lambda paths: paths.map(_sample_number))
    return ordered.reset_index(drop=True)

# Fetch matlab data
mat_files_sorted = get_user_mat_data()
mat_content = sp.loadmat(mat_files_sorted[0])

In [None]:
# normalizing using z-score

def normalize_eeg_data(eeg_input):
    """Z-score every row of ``eeg_input`` independently.

    Parameters
    ----------
    eeg_input : iterable of 1-D sequences
        One sequence of samples per row (presumably one EEG channel per row;
        confirm against the caller).

    Returns
    -------
    numpy.ndarray
        Array built from the normalised rows, each with zero mean and unit
        (population) standard deviation. A row with zero standard deviation is
        only mean-centred, so it becomes all zeros instead of NaN.
    """
    norm_eeg_data = []
    for channel in eeg_input:
        samples = np.asarray(channel, dtype=float)
        centered = samples - np.mean(samples)
        std = np.std(samples)
        # Guard against flat rows: dividing by a zero std would yield NaN and
        # poison every feature computed downstream.
        norm_eeg_data.append(centered / std if std > 0 else centered)
    return np.array(norm_eeg_data)

# normalize_eeg_data(eeg_data_list)

### EEG Data Visualization

In [None]:
def plot_eeg_data(eeg_data, columns=None):
    """Plot several EEG traces on one figure, each shifted vertically by a fixed offset.

    Parameters
    ----------
    eeg_data : pandas.DataFrame or indexable container
        Object for which ``eeg_data[col]`` yields the samples of one trace.
    columns : iterable, optional
        Keys of the traces to draw, in plotting order. When omitted,
        ``eeg_data.columns`` is used if present, otherwise the integer positions
        ``0 .. len(eeg_data) - 1``. Previously the function read a global
        ``eeg_columns`` that this notebook never defines.
    """
    if columns is None:
        columns = getattr(eeg_data, 'columns', None)
        if columns is None:
            columns = range(len(eeg_data))

    plt.figure(figsize=(15, 6))
    offset = 500  # vertical gap between consecutive traces so they do not overlap
    colors = ['b', 'g', 'r', 'c', 'y']
    for idx, col in enumerate(columns):
        # Colours cycle when there are more traces than entries in `colors`.
        plt.plot(eeg_data[col] + idx * offset, color=colors[idx % len(colors)], label=col)
    plt.title('EEG Signal Data (with vertical offset)')
    plt.xlabel('Time')
    plt.ylabel('Amplitude + Offset')
    plt.legend(loc='upper right')
    plt.show()

## Feature Extraction

### Signature data features

In [None]:
# sign_data = get_user_csv_sign_data_cleaned('D:\\KCL Final Year Individual Project\\Implementation\\Project Implementation\\Dataset\\SignEEGv1.0\\SignEEGv1.0\\000000000200894\\Genuine\\000000000200894_Genuine_000000000200894_1.csv')
# sign_data

In [None]:
csv_data, mat_data, user_ids = get_dataset_files_and_user_ids()

def _stroke_durations(t, is_pen_down):
    """Return the duration (last timestamp minus first) of every run of pen-down samples."""
    durations = []
    start = None
    for i, down in enumerate(is_pen_down):
        if down:
            if start is None:
                start = i
        elif start is not None:
            durations.append(int(t[i - 1] - t[start]))
            start = None
    # A stroke still open at the last sample ends there.
    if start is not None:
        durations.append(int(t[-1] - t[start]))
    return durations

def get_signature_feature_vector(path, sample_interval=4 / 1000):
    """Build a flat feature vector from one signature CSV file.

    Parameters
    ----------
    path : str
        Path of the signature CSV file.
    sample_interval : float, optional
        Spacing between consecutive samples handed to ``np.gradient``. The
        default 4/1000 assumes a 4 ms sampling period (taken from the constant
        this notebook used; TODO confirm against the dataset documentation).

    Returns
    -------
    numpy.ndarray
        Concatenation, in this order, of: normalised x, y, pressure, azimuth,
        altitude; speed; acceleration magnitude; the individual stroke
        durations; the pen-down centroid (x, y); and
        ``[pen_lifts, avg_stroke_duration, stroke_count]``.
        The length therefore depends on the number of samples and strokes.
    """
    sign_data = get_user_csv_sign_data_cleaned(path)
    x, y, t, pressure, azimuth, altitude = normalize_sign_data(sign_data)

    # Pen velocity. np.gradient's second argument is the spacing between samples
    # (a time step), not the sampling rate: passing 1 / (4 / 1000) = 250 here
    # would mis-scale every velocity and acceleration value.
    vx = np.gradient(x, sample_interval)
    vy = np.gradient(y, sample_interval)
    v = np.sqrt(vx**2 + vy**2)

    # Pen acceleration
    ax = np.gradient(vx, sample_interval)
    ay = np.gradient(vy, sample_interval)
    a = np.sqrt(ax**2 + ay**2)

    is_pen_down = pressure > 0

    # Pen lifts: samples where the pen is down (pressure > 0) and the very next
    # sample has pressure == 0.
    pen_lifts = np.sum((pressure[:-1] > 0) & (pressure[1:] == 0))

    # Stroke statistics
    stroke_durations = _stroke_durations(t, is_pen_down)
    stroke_count = len(stroke_durations)
    # Averaging an empty list yields NaN; report 0.0 when there is no stroke.
    avg_stroke_duration = np.average(stroke_durations) if stroke_durations else 0.0

    # Centroid of the pen-down samples (0, 0 when the pen never touched down,
    # instead of the NaN that the mean of an empty selection would give).
    if is_pen_down.any():
        sign_centroid = [np.mean(x[is_pen_down]), np.mean(y[is_pen_down])]
    else:
        sign_centroid = [0.0, 0.0]

    sign_feature_data = np.concatenate([x, y, pressure, azimuth, altitude, v, a, stroke_durations, sign_centroid, [pen_lifts, avg_stroke_duration, stroke_count]])

    return sign_feature_data

In [None]:
# Build the signature feature vector for the first file in csv_data.
sign_feature_vector = get_signature_feature_vector(csv_data[0])

### Misc - for debugging

In [None]:
# eeg_data_roi = eeg_input[roi_idx[0]:roi_idx[1]]

In [None]:
# len(eeg_data_roi)

In [None]:
# print(to_print)

In [None]:
# print(mat_files_sorted)

In [None]:
# plot_eeg_data(eeg_data_roi)

In [None]:

# pp.pprint(eeg_data)
# Side note: Can be used for sign data as well, reduces dependency on CSV data
# plot_eeg_data(eeg_input)

### Extract EEG Frequency Weighted Power Features

In [None]:
# def calculate_power_spectral_density(norm_eeg_signal, sampling_freq):
#     freqs, psd = welch(norm_eeg_signal, fs = sampling_freq, nperseg = sampling_freq * 2) # window = 'hann' by default
#     # print("Frequencies: ", freqs)
#     # print("Power distribution: ", psd)

#     band_psd = {}
#     for band, [low, high] in freq_bands.items():
#         idx_band = np.logical_and(freqs >= low, freqs <= high)
#         band_psd[band] = np.mean(psd[:, idx_band], axis = 1) if psd[:, idx_band].size > 0 else np.zeros(norm_eeg_signal.shape[0])
#     print("Band Powers: ", band_psd)
#     return band_psd
# psd_data = calculate_power_spectral_density(norm_eeg_data, 128)


def compute_freq_weighted_power_per_channel(channel, samp_freq, band):
    """Return the power-weighted mean frequency of ``channel`` inside ``band``.

    A Welch PSD is estimated with a single segment spanning the whole input
    (``nperseg=len(channel)``). Only frequency bins with
    ``band[0] <= f <= band[1]`` are kept, and the result is
    ``sum(f * psd) / sum(psd)`` over those bins. When the in-band power is not
    positive, 0 is returned.
    """
    low, high = band[0], band[1]
    all_freqs, all_psd = welch(channel, fs=samp_freq, nperseg=len(channel))

    in_band = (all_freqs >= low) & (all_freqs <= high)
    band_freqs = all_freqs[in_band]
    band_psd = all_psd[in_band]

    total_power = np.sum(band_psd)
    if total_power > 0:
        return np.sum(band_freqs * band_psd) / total_power
    return 0

def get_freq_weighted_feature(signal, samp_freq, window = 2, overlap = 1, normalize = False):
    """Extract per-window band powers and frequency-weighted power for every channel.

    The signal is cut into sliding windows of ``window`` seconds that overlap by
    ``overlap`` seconds. For each window and channel a Welch PSD (Hann window,
    the SciPy default) is computed, from which six values are derived: the
    integrated power in the delta (0.5-4 Hz), theta (4-8 Hz), alpha (8-13 Hz),
    beta (13-20 Hz) and gamma (20-50 Hz) bands, followed by the power-weighted
    mean frequency over 0.5-50 Hz.

    Parameters
    ----------
    signal : numpy.ndarray, shape (n_channels, n_samples)
    samp_freq : int or float
        Sampling frequency in Hz.
    window : float
        Window length in seconds.
    overlap : float
        Overlap between consecutive windows in seconds; must be smaller than
        ``window``. The hop between windows is ``window - overlap``.
    normalize : bool
        When True, z-score every feature column across windows (columns with
        zero standard deviation are divided by 1e-6 instead).

    Returns
    -------
    numpy.ndarray, shape (n_windows, n_channels * 6)
        One row per window; an array with zero rows when the signal is shorter
        than one window.

    Raises
    ------
    ValueError
        If ``overlap`` is not smaller than ``window``.
    """
    freq_bands = {
        'delta': [0.5, 4],
        'theta': [4, 8],
        'alpha': [8, 13],
        'beta': [13, 20],   # was [113, 20], which selected no bins, so beta power was always 0
        'gamma': [20, 50]
    }
    broadband_low, broadband_high = 0.5, 50

    signal = np.asarray(signal)
    n_channels, n_samples = signal.shape
    window_len = int(samp_freq * window)
    # Hop length: consecutive windows share `overlap` seconds, so they start
    # `window - overlap` seconds apart (identical to the old behaviour for the
    # default window=2, overlap=1).
    step = int(samp_freq * (window - overlap))
    if step <= 0:
        raise ValueError("overlap must be smaller than window")

    n_features = n_channels * (len(freq_bands) + 1)
    n_windows = (n_samples - window_len) // step + 1
    if n_windows <= 0:
        # Signal shorter than one window: nothing to extract.
        return np.empty((0, n_features))

    features = []
    for w in range(n_windows):
        start = w * step
        end = start + window_len
        window_features = []
        for channel in range(n_channels):
            segment = signal[channel, start:end]
            freqs, psd = welch(segment, fs = samp_freq, nperseg = window_len)

            # Band power: area under the PSD inside each band.
            for low, high in freq_bands.values():
                idx = (freqs >= low) & (freqs <= high)
                window_features.append(trapezoid(psd[idx], freqs[idx]))

            # Frequency-weighted power over the broadband range. Same formula as
            # compute_freq_weighted_power_per_channel, but computed from the PSD
            # already available (the segment is exactly window_len long) to
            # avoid a second, identical Welch estimate.
            wide = (freqs >= broadband_low) & (freqs <= broadband_high)
            total_power = np.sum(psd[wide])
            fwp = np.sum(freqs[wide] * psd[wide]) / total_power if total_power > 0 else 0
            window_features.append(fwp)
        features.append(window_features)
    features = np.array(features)

    if normalize:
        mean = np.mean(features, axis = 0)
        std = np.std(features, axis = 0)
        std[std == 0] = 1e-6  # avoid division by zero for constant feature columns
        features = (features - mean) / std
    return features

# print(get_freq_weighted_feature(norm_eeg_data, 128, normalize = True))

In [None]:

def get_eeg_features(mat_file):
    """Load one MAT recording and return its normalised EEG feature matrix.

    Steps: read ``mat_file`` with ``scipy.io.loadmat``, take
    ``['subject']['ICA_EEG'][0][0]``, convert each of its rows to a Python list
    (presumably one row per EEG channel; verify against the dataset), z-score
    the rows with ``normalize_eeg_data`` and pass the result to
    ``get_freq_weighted_feature`` with a sampling frequency of 128 and
    ``normalize=True``.
    """
    loaded = sp.loadmat(mat_file)
    ica_eeg = loaded['subject']['ICA_EEG'][0][0]

    # Plain nested lists are what normalize_eeg_data iterates over.
    channel_rows = []
    for row in ica_eeg:
        channel_rows.append(row.tolist())

    normalized = normalize_eeg_data(channel_rows)
    return get_freq_weighted_feature(normalized, 128, normalize = True)


In [None]:
# get_eeg_features requires the path of a MAT file; use the first one in
# mat_data, mirroring how the signature vector is built from csv_data[0].
eeg_feature_vector = get_eeg_features(mat_data[0])

In [None]:
# Print the signature feature vector computed for csv_data[0].
print(sign_feature_vector)

In [None]:
# Print the EEG feature matrix returned by get_eeg_features.
print(eeg_feature_vector)

In [None]:
def get_feature_vectors_for_all_users():
    """Extract signature and EEG features for every file of every user.

    Files and user IDs come from ``get_dataset_files_and_user_ids()`` with its
    default arguments. A file belongs to a user when the user ID occurs in its
    path.

    Returns
    -------
    (sign_features_for_all_users, eeg_features_for_all_users)
        Two dicts keyed by user ID. Each value is a list with one entry per
        file: the output of ``get_signature_feature_vector`` or
        ``get_eeg_features`` respectively.
    """
    csv_files, mat_files, user_ids = get_dataset_files_and_user_ids()
    sign_features_for_all_users = {}
    eeg_features_for_all_users = {}
    for user in user_ids:
        sign_features_for_all_users[user] = []
        eeg_features_for_all_users[user] = []
        user_csv_raw = [file for file in csv_files if user in file]
        user_mat_raw = [file for file in mat_files if user in file]

        print("------------------------------------------------------------------------------------------------------------------------------------------------------------------------------")
        print("User ID: ", user)

        # Progress is logged BEFORE each extraction so that, if a file fails,
        # the last line printed names the file that caused the failure.
        for csv_file in user_csv_raw:
            print("Extracting sign features for file: ", csv_file)
            sign_features_for_all_users[user].append(get_signature_feature_vector(csv_file))

        for mat_file in user_mat_raw:
            print("Extracting EEG features for file: ", mat_file)
            eeg_features_for_all_users[user].append(get_eeg_features(mat_file))
    return sign_features_for_all_users, eeg_features_for_all_users


In [None]:
# Run the full extraction; both results are dicts keyed by user ID.
sign_features_final, eeg_features_final = get_feature_vectors_for_all_users()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
User ID:  000000000200894
Extracting sign features for file:  D:\KCL Final Year Individual Project\Implementation\Project Implementation\Dataset\SignEEGv1.0\SignEEGv1.0\000000000200894\Genuine\000000000200894_Genuine_000000000200894_1.csv
Extracting sign features for file:  D:\KCL Final Year Individual Project\Implementation\Project Implementation\Dataset\SignEEGv1.0\SignEEGv1.0\000000000200894\Genuine\000000000200894_Genuine_000000000200894_2.csv
Extracting sign features for file:  D:\KCL Final Year Individual Project\Implementation\Project Implementation\Dataset\SignEEGv1.0\SignEEGv1.0\000000000200894\Genuine\000000000200894_Genuine_000000000200894_3.csv
Extracting sign features for file:  D:\KCL Final Year Individual Project\Implementation\Project Implementation\Dataset\SignEEGv1.0\SignEEGv1.0\000000000200894\G

KeyError: 0

In [75]:
# Shape of the first EEG feature matrix stored for user 000000000200894.
print(eeg_features_final["000000000200894"][0].shape)

(55, 30)


In [77]:
import torch
import torch.nn as nn
import torch.nn.functional as F