# Import Libraries and Dataset (add `INDEX`)

In [None]:
# Mount Google Drive (Colab only); the dataset and training-log paths used
# further down all live under /content/drive/.
from google.colab import drive
drive.mount('/content/drive/')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import re
import os
import datetime

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.preprocessing import OneHotEncoder

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers, models

Mounted at /content/drive/


In [None]:
def processing_folder(folder_path, train_size=0.8):
    """Load every keystroke file in ``folder_path`` and split the rows into train/test.

    Files are visited in ascending order of the first integer in their file
    name.  Each file is read as tab-separated text: the first line is the
    header, every following line is one keystroke row.  The last field of a
    line is reduced to its first word/number token (this strips the trailing
    newline); lines whose last field holds no such token are dropped.

    An extra integer ``INDEX`` value is appended to every row: it counts rows
    from 0 and restarts whenever the second field of the row changes.

    The working directory is left untouched (files are opened through their
    joined path instead of ``os.chdir``).

    Args:
        folder_path: directory containing the sample files.  Sub-directories
            and entries without a number in their name are ignored.
        train_size: fraction of each file's rows, taken from the top of the
            file, that goes to the training set; the rest goes to the test set.

    Returns:
        ``(df_all, df_train, df_test)`` - DataFrames holding the raw string
        fields plus the ``INDEX`` column.

    Raises:
        ValueError: if no file provides a usable header line.
    """
    token = re.compile(r'\w+|\d+')

    ## collect (number, name, path) so the order is numeric and deterministic
    numbered = []
    for name in os.listdir(folder_path):
        digits = re.findall(r'\d+', name)
        full_path = os.path.join(folder_path, name)
        if digits and os.path.isfile(full_path):
            numbered.append((int(digits[0]), name, full_path))
    numbered.sort()

    train_samples = []
    test_samples = []
    samples = []
    cols = []
    for _, _, path in numbered:
        with open(path, encoding='utf-8',           ##https://stackoverflow.com/questions/12468179/unicodedecodeerror-utf8-codec-cant-decode-byte-0x9c
                  errors='ignore') as f:
            lines = f.readlines()
        if not lines:
            continue
        ## extract column names (once): from the first file with a usable header
        if not cols:
            header = lines[0].split('\t')
            found = token.findall(header[-1])
            if found:
                header[-1] = found[0]
                cols = header
        ## extracting all samples from the current file
        sample = []
        curr_text_id = ''
        curr_index = -1
        for line in lines[1:]:
            ls = line.split('\t')
            found = token.findall(ls[-1])
            if not found or len(ls) < 2:
                continue
            ls[-1] = found[0]
            if ls[1] != curr_text_id:
                ## a new value in the second field restarts the running index
                curr_index = 0
                curr_text_id = ls[1]
            else:
                curr_index += 1
            ls.append(curr_index)
            sample.append(ls)
        ##  split the current data into train-test-sets
        split_index = int(train_size * len(sample))
        train_samples.extend(sample[:split_index])
        test_samples.extend(sample[split_index:])
        samples.extend(sample)

    if not cols:
        raise ValueError(f"no usable header line found in any file of {folder_path!r}")

    ## forming dataframes (column names are given up front so that an empty
    ## split still yields a correctly-labelled frame)
    cols = cols + ['INDEX']
    df_all = pd.DataFrame(samples, columns=cols)
    df_train = pd.DataFrame(train_samples, columns=cols)
    df_test = pd.DataFrame(test_samples, columns=cols)
    return df_all, df_train, df_test

In [None]:
## full dataset ("--> 7001": presumably the number of sample files - TODO confirm)
folder_path = "/content/drive/MyDrive/COMP576/keystroke-samples"
## baby sample: this second assignment overrides the path above, so only the
## smaller train-dev-test folder is actually loaded
folder_path = "/content/drive/MyDrive/COMP576/train-dev-test"
## data = all rows; train_data / test_data = per-file 80/20 split (default train_size)
data, train_data, test_data = processing_folder(folder_path)

# Keyboard Layout Encoding (QWERTY)

In [None]:
## Physical layout of a full-size QWERTY keyboard as a 7 x 23 grid of keycodes.
## 0 marks an empty slot; a key that spans several slots repeats its keycode.
first_row = [27, 27, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 0, 0, 145, 126, 0, 0, 0, 0, 0]
space = [0] * 23    ## empty spacer row between the function keys and the number row
second_row = [192, 49, 50, 51, 52, 53, 54, 55, 56, 57, 48, 189, 187, 8, 0, 45, 36, 33, 0, 144, 111, 106, 109]
third_row = [9, 81, 87, 69, 82, 84, 89, 85, 73, 79, 80, 219, 221, 220, 0, 46, 35, 34, 0, 103, 104, 105, 107]
fourth_row = [20, 65, 83, 68, 70, 71, 72, 74, 75, 76, 186, 222, 13, 13, 0, 0, 0, 0, 0, 100, 101, 102, 107]
fifth_row = [16, 16, 90, 88, 67, 86, 66, 78, 77, 188, 190, 191, 16, 16, 0, 0, 38, 0, 0, 97, 98, 99, 13]
## the slot between Ctrl (17) and Alt (18) is the left Windows key, keycode 91;
## it used to hold 191 ('/'), which already lives on the fifth row and therefore
## gave '/' a bogus second position for the shortest-distance lookup
sixth_row = [17, 17, 91, 18, 32, 32, 32, 32, 32, 18, 92, 93, 17, 17, 0, 37, 40, 39, 0, 96, 96, 110, 13]
qwerty_keyboard = pd.DataFrame({'1st': first_row,
                                'space': space,
                                '2nd': second_row,
                                '3rd': third_row,
                                '4th': fourth_row,
                                '5th': fifth_row,
                                '6th': sixth_row}).transpose()
## integer row labels 0..6 so that a row label doubles as a row position
qwerty_keyboard.index = list(range(7))
qwerty_keyboard

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,13,14,15,16,17,18,19,20,21,22
0,27,27,112,113,114,115,116,117,118,119,...,123,0,0,145,126,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,192,49,50,51,52,53,54,55,56,57,...,8,0,45,36,33,0,144,111,106,109
3,9,81,87,69,82,84,89,85,73,79,...,220,0,46,35,34,0,103,104,105,107
4,20,65,83,68,70,71,72,74,75,76,...,13,0,0,0,0,0,100,101,102,107
5,16,16,90,88,67,86,66,78,77,188,...,16,0,0,38,0,0,97,98,99,13
6,17,17,191,18,32,32,32,32,32,18,...,17,0,37,40,39,0,96,96,110,13


In [None]:
## function turning the pandas dataframe keyboard into dictionary
def keycode_position(keyboard):
    """Index a keyboard grid by keycode.

    Walks the grid row by row (row labels are used as row positions) and
    returns a dict mapping each keycode to the list of ``[row, col]`` slots
    it occupies, in the order they are encountered.
    """
    positions = {}
    for row in keyboard.index:
        cells = keyboard.iloc[row, :]
        for col, code in enumerate(cells):
            ## setdefault creates the list on first sight of a keycode
            positions.setdefault(code, []).append([row, col])
    return positions

## function determining the keycode distances (if multiple entry exists, assumes the shortest one)
def keycode_distance(keyboard_pos, keycode1, keycode2):
    """Manhattan distance between two keys, capped at 5.

    ``keyboard_pos`` maps a keycode to its list of ``[row, col]`` slots.  When
    a key occupies several slots the closest pair is used.  The result is 5
    whenever either keycode is unknown or the closest pair is 5 or more apart.
    """
    cap = 5
    if keycode1 not in keyboard_pos or keycode2 not in keyboard_pos:
        return cap
    closest = min(
        (abs(first[0] - second[0]) + abs(first[1] - second[1])
         for first in keyboard_pos[keycode1]
         for second in keyboard_pos[keycode2]),
        default=cap,    ## a key with no recorded slot counts as "far"
    )
    return closest if closest < cap else cap

## function determining the distance of a list of keys to the home keys (ASDF JKL;)
def home_distance(keyboard_pos, keycode_list):
    '''
    Mean distance of the keys in `keycode_list` to their nearest home key.

    The two home keys used are F and J (keycodes 70 and 74).  For every key
    the smaller of its two `keycode_distance` values is taken, and the
    values are averaged over the list.
    '''
    nearest_home = [
        min([keycode_distance(keyboard_pos, 70, key), keycode_distance(keyboard_pos, 74, key)])
        for key in keycode_list
    ]
    return sum(nearest_home) / len(keycode_list)


## function returning a dictionary containing `keycode`, `home`, and `pos`
def keyboard_dict():
    """Bundle the QWERTY helpers for `feature_extractor`.

    Returns a dict with the pairwise distance function under 'keycode', the
    home-row distance function under 'home', and the keycode -> positions
    lookup built from the global `qwerty_keyboard` under 'pos'.
    """
    return {
        'keycode': keycode_distance,
        'home': home_distance,
        'pos': keycode_position(qwerty_keyboard),
    }

In [None]:
# qwerty keyboard dictionary: keycode -> list of [row, col] positions
qwerty_keyboard_pos = keycode_position(qwerty_keyboard)

# test: capped distance between keycodes 85 and 117, and the mean
# home-key distance of the single keycode 72
keycode_distance(qwerty_keyboard_pos, 85, 117), home_distance(qwerty_keyboard_pos, [72])

(3, 1.0)

# Preprocessing

## Preprocessing: functions

In [None]:
def feature_extractor(data, keyboard=None, user_int=True, keycode_int=True):
    """Turn raw keystroke rows into features for consecutive key pairs.

    Every output row pairs a keystroke with the one in the next row:

    * ``USER``      - ``PARTICIPANT_ID`` (int64 when ``user_int``)
    * ``K1``/``K2`` - keycode of this / the next row (int64 when ``keycode_int``)
    * ``I1``/``I2`` - ``INDEX`` of this / the next row
    * ``HL1``       - ``RELEASE_TIME - PRESS_TIME`` of this row
    * ``IL``        - next row's ``PRESS_TIME`` minus this row's ``RELEASE_TIME``
    * ``HL2``       - ``HL1`` of the next row
    * ``KD``/``HD`` - only when ``keyboard`` is given: results of
      ``keyboard['keycode'](pos, K1, K2)`` and ``keyboard['home'](pos, [K1, K2])``

    The final row has no successor and is dropped.

    "Next row" is positional (``Series.shift(-1)``), so the result does not
    depend on ``data`` carrying a default 0..n-1 index.

    NOTE(review): the shift runs over the whole frame, so the last keystroke of
    one participant/section is paired with the first keystroke of the next one.

    Args:
        data: frame with ``PARTICIPANT_ID``, ``PRESS_TIME``, ``RELEASE_TIME``,
            ``KEYCODE`` and ``INDEX`` columns castable to float64.
        keyboard: optional dict with 'keycode', 'home' and 'pos' entries.
        user_int: cast the user id to int64.
        keycode_int: cast ``K1``/``K2`` to int64.

    Returns:
        A new DataFrame; ``data`` itself is not modified.
    """
    df = data[['PARTICIPANT_ID', 'PRESS_TIME', 'RELEASE_TIME', 'KEYCODE', 'INDEX']].astype('float64')
    if user_int:
        df['PARTICIPANT_ID'] = df['PARTICIPANT_ID'].astype('int64')
    df = df.rename(columns={'PARTICIPANT_ID': 'USER'})

    def next_row(series):
        ## value found one row below; the last row is padded with 0
        return series.shift(-1, fill_value=0)

    df['K1'] = df['KEYCODE']
    if keycode_int:
        df['K1'] = df['K1'].astype('int64')
    df['K2'] = next_row(df['KEYCODE'])
    if keycode_int:
        df['K2'] = df['K2'].astype('int64')
    df['I1'] = df['INDEX']
    df['I2'] = next_row(df['INDEX'])
    df['HL1'] = df['RELEASE_TIME'] - df['PRESS_TIME']
    df['IL'] = next_row(df['PRESS_TIME']) - df['RELEASE_TIME']
    df['HL2'] = next_row(df['HL1'])

    if keyboard:
        distance_fn = keyboard['keycode']
        home_fn = keyboard['home']
        positions = keyboard['pos']
        ## iterate positionally so duplicated index labels cannot break the lookup
        pairs = list(zip(df['K1'].to_numpy(), df['K2'].to_numpy()))
        df['KD'] = [distance_fn(positions, k1, k2) for k1, k2 in pairs]
        df['HD'] = [home_fn(positions, [k1, k2]) for k1, k2 in pairs]

    df = df.drop(columns=['PRESS_TIME', 'RELEASE_TIME', 'KEYCODE', 'INDEX'])
    ## the last row was paired with the 0-padding, not with a real keystroke
    df = df.iloc[:-1, :]
    return df

## https://towardsdatascience.com/do-you-use-apply-in-pandas-there-is-a-600x-faster-way-d2497facfa66
def extract_avg_pair(df, drop_origin=True, rename_avg=True, round_avg=True):
    """Attach the mean HL1 / IL / HL2 of every (K1, K2) key pair to each row.

    The means are taken over all rows of ``df`` that share the same
    ``(K1, K2)`` pair.  The input frame is left untouched; all work happens on
    a copy, so re-running the calling cell gives the same result.

    Args:
        df: frame with ``K1``, ``K2``, ``HL1``, ``IL`` and ``HL2`` columns.
        drop_origin: drop the per-row ``HL1``/``IL``/``HL2`` columns and the
            helper ``K1_K2`` tuple column from the result.
        rename_avg: together with ``drop_origin``, rename ``HL1_avg``,
            ``IL_avg``, ``HL2_avg`` back to ``HL1``, ``IL``, ``HL2``.
        round_avg: round the means to whole numbers.

    Returns:
        A new DataFrame with the averaged columns appended after the existing ones.
    """
    timing_cols = ['HL1', 'IL', 'HL2']
    out = df.copy()
    out['K1_K2'] = pd.Series(list(zip(out['K1'], out['K2'])), index=out.index, dtype=object)

    ## one grouped pass instead of one boolean mask per unique pair
    pair_means = out.groupby(['K1', 'K2'], sort=False)[timing_cols].transform('mean')
    if round_avg:
        pair_means = pair_means.round()
    for col in timing_cols:
        ## positional assignment: transform() keeps the row order of `out`
        out[col + '_avg'] = pair_means[col].to_numpy()

    if drop_origin:
        out = out.drop(columns=timing_cols + ['K1_K2'])
        if rename_avg:
            out = out.rename(columns={'HL1_avg': 'HL1', 'IL_avg': 'IL', 'HL2_avg': 'HL2'})
    return out


def generate_keycode_dict(top, data=None, add_UNK=True):
    '''
    generate dictionary for the most popular `top` many keycodes using training data

    Args:
        top: number of most frequent keycodes to keep.
        data: frame with a `KEYCODE` column castable to int32.  When omitted,
            the global `train_data` is looked up at call time (a default of
            `data=train_data` would be frozen when the `def` cell runs and
            would miss any later reload of the data).
        add_UNK: also map keycode 0 to the next free index, as the bucket for
            keycodes outside the top list.

    Returns:
        dict mapping keycode -> index, most frequent keycode first.
    '''
    if data is None:
        data = train_data
    counts = data['KEYCODE'].astype('int32').value_counts()
    keycode_dict = {keycode: i for i, keycode in enumerate(counts.index[:top].tolist())}
    if add_UNK:
        keycode_dict[0] = len(keycode_dict)
    return keycode_dict


def single_kdi_image(curr_chunk, mat_length, keycode_dict):
    """Build one image and its prediction target from a chunk of key-pair rows.

    All rows but the last are accumulated into six ``mat_length x mat_length``
    matrices (KD, HD, I1, HL1, IL, HL2), addressed by the dictionary indices of
    ``K1`` (row) and ``K2`` (column); keycodes missing from ``keycode_dict``
    fall back to the entry stored under keycode 0.  Each cell holds the mean of
    the values that landed in it, and 0 where nothing landed.

    The last row of the chunk supplies the target.

    Returns:
        ``(image, keycode, target)``: the ``(mat_length, mat_length, 6)`` image,
        the last row's ``K2``, and ``np.array([IL, HL2])`` of the last row.
    """
    channels = ('KD', 'HD', 'I1', 'HL1', 'IL', 'HL2')
    grid = (mat_length, mat_length)
    totals = {name: np.zeros(grid) for name in channels}
    hits = np.zeros(grid)

    def slot(code):
        ## unknown keycodes share the bucket registered under keycode 0
        return keycode_dict[code] if code in keycode_dict else keycode_dict[0]

    labels = curr_chunk.index

    ## form input image
    for label in labels[:-1]:
        r = slot(curr_chunk['K1'][label])
        c = slot(curr_chunk['K2'][label])
        for name in channels:
            totals[name][r, c] += curr_chunk[name][label]
        hits[r, c] += 1

    ## average per cell; cells that were never hit stay at 0
    filled = hits != 0
    layers = [
        np.divide(totals[name], hits, out=np.zeros_like(totals[name]), where=filled)
        for name in channels
    ]
    kdi_image = np.stack(layers, axis=-1)

    ## form output vector and get last keycode
    last = labels[-1]
    target = np.array([curr_chunk['IL'][last], curr_chunk['HL2'][last]])
    return kdi_image, curr_chunk['K2'][last], target


def generate_kdi_images(data, mat_length, window, shift):
    """Slide a window over each user's rows and turn every window into an image.

    For each distinct ``USER`` the rows are cut into chunks of ``window + 1``
    rows whose start advances by ``shift``; a final shorter chunk is emitted
    when at least two rows remain after the last start position.  Every chunk
    is converted with ``single_kdi_image`` using the keycode dictionary of the
    ``mat_length - 1`` most frequent keycodes (plus the unknown bucket).

    Returns:
        ``(images, keycodes, targets)`` stacked along a new first axis.
    """
    keycode_dict = generate_keycode_dict(top=mat_length-1)    ## -1 leaves one slot for the unknown bucket
    span = window + 1
    images, keycodes, targets = [], [], []

    def emit(chunk):
        ## convert one chunk and file its three parts
        image, keycode, target = single_kdi_image(chunk, mat_length, keycode_dict)
        images.append(image)
        keycodes.append(keycode)
        targets.append(target)

    for user in data['USER'].unique():
        user_rows = data[data['USER'] == user]
        labels = user_rows.index
        n_rows = len(labels)
        start = 0
        while start + span < n_rows:
            emit(user_rows.loc[labels[start:start + span]])
            start = start + shift
        ## leftover rows (needs at least one input row plus the target row)
        if start < n_rows - 1:
            emit(user_rows.loc[labels[start:]])

    return np.stack(images, axis=0), np.stack(keycodes, axis=0), np.stack(targets, axis=0)

## Preprocess

In [None]:
mat_length = 40         ## side length of each KDI matrix; the mat_length-1 most frequent keycodes get their own index, the rest share one
window = 30             ## each chunk holds window+1 key-pair rows (the last row is the prediction target)
shift = 5               ## step, in rows, between the starts of consecutive chunks
batch_size = 32         ## batch size of the tf.data datasets built by structure_KDI
unit_time_depth = 82    ## width of the `keycode` model input; NOTE(review): must equal the number of one-hot categories the encoder produces - confirm

## distance helpers + keycode positions, consumed by feature_extractor
keyboard = keyboard_dict()

In [None]:
## build key-pair features (including the keyboard distances KD / HD) for each split
train_df = feature_extractor(train_data, keyboard)
test_df = feature_extractor(test_data, keyboard)
## features of the full data; used further down only to fit the one-hot encoder
data_df = feature_extractor(data, keyboard)

In [None]:
## preview the per-row key-pair features
train_df.head(3)

Unnamed: 0,USER,K1,K2,I1,I2,HL1,IL,HL2,KD,HD
0,2004,16,73,0.0,1.0,550.0,-112.0,127.0,5,3.0
1,2004,73,32,1.0,2.0,127.0,32.0,123.0,3,2.0
2,2004,32,84,2.0,3.0,123.0,0.0,196.0,3,2.0


In [None]:
## replace HL1 / IL / HL2 by their rounded per-(K1, K2) means;
## the means are computed separately within each split
train_df_avg = extract_avg_pair(train_df)
test_df_avg = extract_avg_pair(test_df)

In [None]:
## preview the averaged features
train_df_avg.head(3)

Unnamed: 0,USER,K1,K2,I1,I2,KD,HD,HL1,IL,HL2
0,2004,16,73,0.0,1.0,5,3.0,340.0,-96.0,107.0
1,2004,73,32,1.0,2.0,3,2.0,102.0,94.0,102.0
2,2004,32,84,2.0,3.0,3,2.0,104.0,187.0,101.0


In [None]:
## look at train_df again to check whether the averaging step changed the input frame
train_df.head(3)

Unnamed: 0,USER,K1,K2,I1,I2,HL1,IL,HL2,KD,HD,K1_K2,HL1_avg,IL_avg,HL2_avg
0,2004,16,73,0.0,1.0,550.0,-112.0,127.0,5,3.0,"(16, 73)",340.0,-96.0,107.0
1,2004,73,32,1.0,2.0,127.0,32.0,123.0,3,2.0,"(73, 32)",102.0,94.0,102.0
2,2004,32,84,2.0,3.0,123.0,0.0,196.0,3,2.0,"(32, 84)",104.0,187.0,101.0


In [None]:
## windowed images, the keycode of each window's target row, and the [IL, HL2] targets
train_kdi, train_keycode, train_output = generate_kdi_images(train_df_avg, mat_length, window, shift)
test_kdi, test_keycode, test_output = generate_kdi_images(test_df_avg, mat_length, window, shift)

## encoder: fitted on the K1 keycodes of the full data (train + test), cast to strings
## NOTE(review): the keycodes encoded later come from K2; a keycode that never occurs
## as K1 would be an unseen category for this encoder - confirm this cannot happen
onehot_encoder = OneHotEncoder().fit(data_df[['K1']].astype(str))    ## the 2nd column is K1

In [None]:
## shapes: (samples, mat_length, mat_length, 6 channels), (samples,), (samples, 2 targets)
train_kdi.shape, train_keycode.shape, train_output.shape

((7799, 40, 40, 6), (7799,), (7799, 2))

In [None]:
## sanity check: one-hot encode the training keycodes (as a column of strings) and
## inspect the shape; NOTE(review): the second dimension is presumably what
## unit_time_depth has to match - confirm
code = train_keycode.reshape([train_keycode.shape[0], 1]).astype(str)
onehot_encoder.transform(code).toarray().shape

In [None]:
def structure_KDI(kdi, keycode, out, encoder):
    """Pack images, one-hot keycodes and targets into a batched tf.data.Dataset.

    ``keycode`` is reshaped to a single column of strings and one-hot encoded
    with ``encoder``.  Each dataset element is ``({'input_kdi': ..., 'keycode': ...}, out)``;
    the dict keys match the names of the model's two input layers.  Batches
    have the global ``batch_size``.
    """
    keycode_column = keycode.reshape([keycode.shape[0], 1]).astype(str)
    features = {
        'input_kdi': kdi,
        'keycode': encoder.transform(keycode_column).toarray(),
    }
    return tf.data.Dataset.from_tensor_slices((features, out)).batch(batch_size)

In [None]:
## batched datasets fed to model.fit / model.predict below
trainset = structure_KDI(train_kdi, train_keycode, train_output, onehot_encoder)
testset = structure_KDI(test_kdi, test_keycode, test_output, onehot_encoder)

# Model

## CNN Model

In [None]:
## Two-input model: the KDI image goes through a conv stack and a GRU, the one-hot
## keycode of the target keystroke is concatenated afterwards, and a dense head
## regresses the two timing targets.
## The input names must match the dict keys produced by structure_KDI.
input_1 = keras.layers.Input(shape=[mat_length, mat_length, 6], name='input_kdi')
input_2 = keras.layers.Input(shape=[unit_time_depth], name='keycode')

## NOTE(review): no `activation` is passed to any Conv2D / Dense layer here, so they
## all use the Keras default (linear) - confirm this is intended.
conv2d_1 = keras.layers.Conv2D(64, (3, 3))(input_1)
conv2d_2 = keras.layers.Conv2D(64, (3, 3))(conv2d_1)
maxpool_1 = keras.layers.MaxPooling2D((2, 2))(conv2d_2)
conv2d_3 = keras.layers.Conv2D(128, (3, 3))(maxpool_1)
conv2d_4 = keras.layers.Conv2D(128, (3, 3))(conv2d_3)
maxpool_2 = keras.layers.MaxPooling2D((2, 2))(conv2d_4)
## flatten the spatial grid into a sequence of 128-dim vectors for the GRU
reshape_1 = keras.layers.Reshape((-1, 128))(maxpool_2)
gru_1 = keras.layers.GRU(128, recurrent_dropout=0.2)(reshape_1)
## activity_regularizer penalises the layer's outputs (not its weights)
dense_1 = keras.layers.Dense(64, activity_regularizer=tf.keras.regularizers.L2(0.001))(gru_1)

## join the image embedding with the one-hot keycode
concat = keras.layers.concatenate([dense_1, input_2])

dense_3 = keras.layers.Dense(64, activity_regularizer=tf.keras.regularizers.L2(0.001))(concat)
## two outputs, matching the [IL, HL2] target vector
output = keras.layers.Dense(2)(dense_3)

model_base = keras.Model(inputs=[input_1, input_2], outputs=[output])

model_base.summary()



Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_kdi (InputLayer)         [(None, 40, 40, 6)]  0           []                               
                                                                                                  
 conv2d (Conv2D)                (None, 38, 38, 64)   3520        ['input_kdi[0][0]']              
                                                                                                  
 conv2d_1 (Conv2D)              (None, 36, 36, 64)   36928       ['conv2d[0][0]']                 
                                                                                                  
 max_pooling2d (MaxPooling2D)   (None, 18, 18, 64)   0           ['conv2d_1[0][0]']               
                                                                                              

In [None]:
## functionalize callbacks

def create_checkpoint_callback(experiment_name, 
                               save_weights_only=True, 
                               monitor='val_loss', 
                               mode='min', 
                               save_best_only=True):
    """Create a ModelCheckpoint callback writing to a time-stamped Drive path.

    The target is ``<training-logs>/checkpoints/<experiment_name>/<YYYYmmdd-HHMMSS>``;
    it is printed before the callback is returned.  The remaining arguments
    are handed to ``tf.keras.callbacks.ModelCheckpoint`` unchanged.
    """
    stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    checkpoint_filepath = "/".join(
        ['/content/drive/MyDrive/COMP576/training-logs', "checkpoints", experiment_name, stamp]
    )
    callback_options = dict(
        filepath=checkpoint_filepath,
        save_weights_only=save_weights_only,
        monitor=monitor,
        mode=mode,
        save_best_only=save_best_only,
    )
    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(**callback_options)
    print(f"Saving Model Checkpoint files to :{checkpoint_filepath}")
    return checkpoint_callback

def create_tensorboard_callback(experiment_name):
    """Create a TensorBoard callback logging to a time-stamped Drive directory.

    The directory is ``<training-logs>/tensorboard/<experiment_name>/<YYYYmmdd-HHMMSS>``;
    it is printed before the callback is returned.
    """
    stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    log_dir = "/".join(
        ['/content/drive/MyDrive/COMP576/training-logs', "tensorboard", experiment_name, stamp]
    )
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir)
    print(f"Saving TensorBoard log files to :{log_dir}")
    return tensorboard_callback

def create_earlystopping_callback(monitor='val_loss',
                                  patience=5):
    """Create an EarlyStopping callback for the given metric and patience."""
    stopper = tf.keras.callbacks.EarlyStopping(monitor=monitor, patience=patience)
    return stopper

def get_callbacks(experiment_name):
    """Return the callbacks for one run: early stopping, checkpointing, TensorBoard.

    The checkpoint and TensorBoard callbacks are created (and print their
    paths) in that order, both under ``experiment_name``.
    """
    return [
        create_earlystopping_callback(),
        create_checkpoint_callback(experiment_name=experiment_name),
        create_tensorboard_callback(experiment_name=experiment_name),
    ]

In [None]:
## train with mean absolute error as both loss and reported metric
model_base.compile(optimizer='adam',
              loss='mae',
              metrics=['mae'])
## NOTE(review): the test set doubles as validation data, so early stopping and
## best-checkpoint selection (both monitoring val_loss) are driven by the test set.
history = model_base.fit(trainset, epochs=20,
                     validation_data=testset,
                     callbacks=get_callbacks('CNN_base_mae'))

Saving Model Checkpoint files to :/content/drive/MyDrive/COMP576/training-logs/checkpoints/CNN_base_mae/20221203-150154
Saving TensorBoard log files to :/content/drive/MyDrive/COMP576/training-logs/tensorboard/CNN_base_mae/20221203-150154
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20


In [None]:
## predictions for the first batch of the test set; each row has two values, matching the [IL, HL2] targets
model_base.predict(testset.take(1))



array([[-81.7737  , 328.83102 ],
       [ 77.880974, 115.53378 ],
       [ 80.69356 , 112.75079 ],
       [ 67.3368  , 111.21024 ],
       [ 67.3368  , 111.21024 ],
       [ 77.880974, 115.53378 ],
       [ 80.69356 , 112.75079 ],
       [110.48561 , 104.02603 ],
       [ 96.15524 , 113.04329 ],
       [ 96.15524 , 113.04329 ],
       [ 77.75593 , 107.93945 ],
       [112.72809 , 120.50877 ],
       [ 77.880974, 115.53378 ],
       [199.85643 , 115.35266 ],
       [121.86208 ,  99.81682 ],
       [ 77.880974, 115.53378 ],
       [ 77.880974, 115.53378 ],
       [ 67.3368  , 111.21024 ],
       [-81.7737  , 328.83102 ],
       [ 77.75593 , 107.93945 ],
       [112.62024 , 119.8502  ],
       [127.28073 , 109.127   ],
       [ 67.3368  , 111.21024 ],
       [-81.7737  , 328.83102 ],
       [ 77.75593 , 107.93945 ],
       [ 94.02034 , 107.8581  ],
       [264.9743  , 100.9535  ],
       [ 67.3368  , 111.21024 ],
       [ 49.000973, 101.973755],
       [ 77.75593 , 107.93945 ],
       [11