In [None]:
import warnings

import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dropout

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score,cohen_kappa_score,f1_score

from scipy.fft import fft, fftfreq, rfft, rfftfreq, irfft

warnings.simplefilter(action='ignore', category=FutureWarning)


In [1]:
def train_test_set(set_tt, segment_length, col_nb):
    """Cut a long-format table into fixed-length sequences with one-hot labels.

    Columns ``3:col_nb`` of ``set_tt`` are used as features and column 1 as the
    integer class label. Every run of ``segment_length`` consecutive rows forms
    one sample; a sample's label is the maximum label found among its rows.

    Parameters
    ----------
    set_tt : pandas.DataFrame
        Table whose row count is a multiple of ``segment_length``.
    segment_length : int
        Number of consecutive rows (time steps) per sample.
    col_nb : int
        Exclusive end of the feature column range; ``col_nb - 3`` features
        are used.

    Returns
    -------
    tSamples : numpy.ndarray
        Shape ``(n_samples, segment_length, col_nb - 3)``.
    tLabels : numpy.ndarray
        One-hot matrix from ``to_categorical``. The classes are re-encoded
        from the labels present in *this* table only, so two tables with
        different label sets produce differently laid-out one-hot columns.

    Raises
    ------
    ValueError
        If ``segment_length`` is not positive, ``col_nb`` does not select at
        least one existing feature column, or the row count is zero or not a
        multiple of ``segment_length``.
    """
    n_rows, n_cols = set_tt.shape

    if segment_length <= 0:
        raise ValueError(f"segment_length must be positive, got {segment_length}")
    # iloc silently truncates an out-of-range slice; without this check the
    # reshape below could succeed with features interleaved across time steps.
    if not 3 < col_nb <= n_cols:
        raise ValueError(
            f"col_nb must be in the range 4..{n_cols} (table has {n_cols} columns), got {col_nb}"
        )
    if n_rows == 0 or n_rows % segment_length != 0:
        raise ValueError(
            f"row count {n_rows} is not a positive multiple of segment_length {segment_length}"
        )

    n_features = col_nb - 3

    feature_values = set_tt.iloc[:, 3:col_nb].to_numpy()
    row_labels = set_tt.iloc[:, 1].to_numpy().astype(int)

    samples = feature_values.reshape(-1, segment_length, n_features)
    # One label per segment: the largest row label inside the segment.
    segment_labels = np.max(row_labels.reshape(-1, segment_length), axis=1)

    # Map the labels to 0..k-1 before one-hot encoding.
    int_labels = LabelEncoder().fit_transform(segment_labels)
    numeric_labels = tf.keras.utils.to_categorical(int_labels)

    return samples, numeric_labels
    

In [None]:
# Velocity calculation
def eye_v_f(eye_subset, series_length):
    """Return scaled first differences of a position recording.

    For every pair of consecutive entries among the first ``series_length``
    positions of ``eye_subset`` (read with ``.iloc``), the difference
    ``next - current`` is multiplied by 40.

    The original note gives a 304-point x-axis eye-position recording
    (``series_length=304``) as the intended input.
    NOTE(review): the factor 40 is presumably the sampling rate, which would
    turn per-sample differences into per-second velocity -- confirm.

    Returns a list with ``series_length - 1`` entries (empty when
    ``series_length`` is 1 or less).
    """
    return [
        (eye_subset.iloc[idx + 1] - eye_subset.iloc[idx]) * 40
        for idx in range(series_length - 1)
    ]

In [None]:
# Acceleration calculation
def eye_a_f(v_subset, series_length):
    """Return first differences of the first column of a velocity table.

    ``v_subset`` is indexed as ``.iloc[row, 0]``, so it must be a
    two-dimensional pandas object; only its first column is read. The
    original note gives 303 x-axis velocities (``series_length=303``) as the
    intended input.

    Unlike ``eye_v_f`` no scaling factor is applied to the differences.

    Returns a list with ``series_length - 1`` entries.
    """
    return [
        v_subset.iloc[row + 1, 0] - v_subset.iloc[row, 0]
        for row in range(series_length - 1)
    ]

In [None]:
# Jerk calculation
def eye_j_f(a_subset, series_length):
    """Return first differences of the first column of an acceleration table.

    ``a_subset`` is a two-dimensional pandas object holding x-axis eye-movement
    accelerations in its first column (read as ``.iloc[row, 0]``). The
    difference between each entry and its predecessor is taken for the first
    ``series_length`` rows, without any scaling.

    Returns a list with ``series_length - 1`` entries.
    """
    steps = range(series_length - 1)
    return [a_subset.iloc[k + 1, 0] - a_subset.iloc[k, 0] for k in steps]

In [None]:
# Percentage changes calculations
def eye_s_c(eye_subset, series_length):
    """Return the relative change between consecutive samples.

    Each entry of ``eye_subset`` is read with ``.iloc[i].item()``. For every
    consecutive pair the result is ``(current - previous) / previous``, a
    fraction that is not multiplied by 100.

    A previous value of exactly 0 is replaced by 0.000001 to avoid division
    by zero; the substitute is used in both the numerator and the denominator.

    Returns a list with ``series_length - 1`` entries.
    """
    changes = []
    for idx in range(1, series_length):
        current = eye_subset.iloc[idx].item()
        previous = eye_subset.iloc[idx - 1].item()
        baseline = previous if previous != 0 else 0.000001
        changes.append((current - baseline) / baseline)
    return changes

In [None]:
def point_feature_set(segment_length, v_subset, a_subset, j_subset, vc_subset, va_subset, vj_subset, n_segments=3):
    """Build per-point and per-segment feature tables from six signals.

    Each input signal is wrapped in a ``pandas.DataFrame`` and cut into
    ``n_segments`` consecutive windows of ``segment_length`` rows. For every
    window the magnitude of the FFT of the first velocity column is computed
    and min-max scaled to [0, 1].

    Parameters
    ----------
    segment_length : int
        Number of rows per window.
    v_subset, a_subset, j_subset : list-like
        Velocity, acceleration and jerk signals. Only their first column
        contributes to the statistics and the FFT.
    vc_subset, va_subset, vj_subset : list-like
        Further signals that are only copied into the per-point table.
    n_segments : int, default 3
        Number of windows to extract (the original code always used 3).

    Returns
    -------
    point_fs : pandas.DataFrame
        Per-point features: the windows of the six signals side by side in
        argument order, followed by the normalised FFT magnitude. Windows are
        stacked vertically with a fresh 0..N-1 index (the earlier
        implementation left duplicated index labels behind).
    point_fv : pandas.DataFrame
        One row per window with 16 statistics: min, max, mean and standard
        deviation of velocity, acceleration, jerk and the normalised spectrum.

    Notes
    -----
    The statistics keep the numerical conventions of the earlier code: min,
    max and standard deviation skip NaN values, whereas the v/a/j mean does
    not; the v/a/j standard deviation is the population value (``ddof=0``)
    while the spectrum uses the pandas default (``ddof=1``).
    """
    signals = [
        pd.DataFrame(signal)
        for signal in (v_subset, a_subset, j_subset, vc_subset, va_subset, vj_subset)
    ]

    segment_frames = []  # per-point feature block of each window
    stat_rows = []       # 16 summary statistics of each window

    for segment in range(n_segments):
        start_seg = segment * segment_length
        stop_seg = start_seg + segment_length

        windows = [
            frame.iloc[start_seg:stop_seg].reset_index(drop=True)
            for frame in signals
        ]
        v_segment, a_segment, j_segment = windows[:3]

        # Magnitude spectrum of the velocity window, rescaled to [0, 1].
        fft_magnitude = pd.DataFrame(np.abs(fft(v_segment.to_numpy()[:, 0])))
        fft_segment_norm = pd.DataFrame(MinMaxScaler().fit_transform(fft_magnitude))

        segment_frames.append(pd.concat(windows + [fft_segment_norm], axis=1))

        # Reduce explicit columns to scalars instead of calling float() on a
        # one-element Series, which pandas has deprecated.
        stats = []
        for frame in (v_segment, a_segment, j_segment):
            column = frame.iloc[:, 0]
            stats.extend([
                float(column.min()),
                float(column.max()),
                float(np.average(column.to_numpy())),
                float(column.std(ddof=0)),
            ])
        spectrum = fft_segment_norm.iloc[:, 0]
        stats.extend([
            float(spectrum.min()),
            float(spectrum.max()),
            float(spectrum.mean()),
            float(spectrum.std()),
        ])
        stat_rows.append(stats)

    if not segment_frames:
        # Nothing requested: match the empty tables the loop-free path yields.
        return pd.DataFrame(), pd.DataFrame()

    # Assemble once instead of growing the frames inside the loop.
    point_fs = pd.concat(segment_frames, axis=0, ignore_index=True)
    point_fv = pd.DataFrame(stat_rows)

    return point_fs, point_fv

In [None]:
def model_LSTM(label_nb, colNb, time_range=100):
    """Build and compile a two-layer LSTM classifier for sequence input.

    Parameters
    ----------
    label_nb : int
        Number of output classes (width of the softmax layer).
    colNb : int
        Column count of the source table; the model expects ``colNb - 3``
        features per time step.
    time_range : int, default 100
        Number of time steps per input sequence. It should equal the
        ``segment_length`` used when the sequences were built.

    Returns
    -------
    A compiled ``Sequential`` model: LSTM(64, returning sequences) ->
    Dropout(0.5) -> LSTM(64) -> Dense(64) -> Dense(label_nb, softmax),
    trained with categorical cross-entropy and the Adam optimizer.

    Notes
    -----
    Calling this function resets TensorFlow's global random seed to 2.
    """
    tf.random.set_seed(2)

    model = Sequential()
    # Explicit Input layer, as in model_flat, instead of input_shape= on the
    # first LSTM.
    model.add(Input(shape=(time_range, colNb - 3)))
    model.add(LSTM(64, return_sequences=True))
    model.add(Dropout(0.5))
    model.add(LSTM(64))
    # No activation is given, so this layer is linear.
    model.add(Dense(64))
    model.add(Dense(label_nb, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])
    return model

In [None]:
def model_flat(label_nb, colNb, layers):
    """Build and compile a fully connected classifier for flat feature vectors.

    The network takes ``colNb - 3`` inputs and ends in a softmax layer of
    ``label_nb`` units. All hidden layers use sigmoid activations; their
    widths depend on ``layers``:

    * ``layers < 3``  -> 100, 50
    * ``layers == 3`` -> 100, 75, 50
    * ``layers == 4`` -> 100, 75, 55, 50

    NOTE(review): any value above 4 yields the same network as 3, because the
    55-unit layer is added only when ``layers`` is exactly 4 -- confirm that
    this is intended.

    The model is compiled with categorical cross-entropy, the Adam optimizer
    and the accuracy metric.
    """
    hidden_widths = [100]
    if layers >= 3:
        hidden_widths.append(75)
        if layers == 4:
            hidden_widths.append(55)
    hidden_widths.append(50)

    model = Sequential()
    model.add(Input(shape=(colNb - 3,)))
    for width in hidden_widths:
        model.add(Dense(width, activation='sigmoid'))
    model.add(Dense(label_nb, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])
    return model

In [None]:
def train_test_set_flat(set_tt, col_nb):
    """Split a table of per-segment statistics into features and one-hot labels.

    Columns ``3:col_nb`` of ``set_tt`` are used as features and column 1 as
    the integer class label; every row is one sample.

    Parameters
    ----------
    set_tt : pandas.DataFrame
        One row per sample.
    col_nb : int
        Exclusive end of the feature column range.

    Returns
    -------
    tSamples : numpy.ndarray
        Shape ``(n_rows, n_selected_columns)``.
    tLabels : numpy.ndarray
        One-hot matrix from ``to_categorical``. The classes are re-encoded
        from the labels present in *this* table only.
    """
    # Slice whole columns at once: this avoids one .iloc lookup per row and
    # keeps the dtype of the feature columns independent of the other columns.
    samples = set_tt.iloc[:, 3:col_nb].to_numpy()
    labels = set_tt.iloc[:, 1].to_numpy().astype(int)

    # Map the labels to 0..k-1 before one-hot encoding.
    int_labels = LabelEncoder().fit_transform(labels)
    numeric_labels = tf.keras.utils.to_categorical(int_labels)

    return samples, numeric_labels

In [None]:
def calc_metrics(modelResults, testLabels):
    """Score class-level predictions obtained by averaging per-sample outputs.

    ``testLabels`` is a one-hot matrix and ``modelResults`` the matching matrix
    of per-sample class scores. For every class that occurs in ``testLabels``
    the score rows of its samples are averaged, and the arg-max of that mean is
    taken as the prediction for the whole class.

    The metrics therefore compare one predicted class per true class, not one
    prediction per sample.

    Returns
    -------
    (ACC, F1, CK) : accuracy, macro-averaged F1 and Cohen's kappa between the
    true classes and their averaged predictions.
    """
    true_classes = testLabels.argmax(axis=1)
    present_classes = np.unique(true_classes)

    # Float array, one averaged-vote prediction per class that is present.
    voted_classes = np.zeros(len(present_classes))
    for pos, label in enumerate(present_classes):
        class_scores = modelResults[true_classes == label]
        voted_classes[pos] = np.mean(class_scores, axis=0).argmax()

    ACC = accuracy_score(present_classes, voted_classes)
    F1 = f1_score(present_classes, voted_classes, average='macro')
    CK = cohen_kappa_score(present_classes, voted_classes)

    return ACC, F1, CK