In [None]:
import glob
import os
import re
import json
import itertools
import warnings

import pandas as pd
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import plotly.express as px
import plotly.graph_objects as go
import plotly.figure_factory as ff
from plotly.offline import init_notebook_mode

from scipy import stats as st

from pathlib import Path

import polars as pl
from polars.exceptions import ColumnNotFoundError, ComputeError

from typing import Union, List

from helpers import *

# Silence every warning for the rest of the session. NOTE(review): this also hides
# deprecation notices from the libraries imported above — consider narrowing the filter.
warnings.filterwarnings("ignore")
# Initialise plotly's offline notebook mode (imported from plotly.offline above).
init_notebook_mode()

In [None]:
# Inject a CSS rule that sets `.container` elements to 95% of the page width.
# `display` and `HTML` come from the public `IPython.display` module; importing them
# from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

## Check DATA

The metadata_participants.txt contains demographics and derived metrics for each participant.
The column names are described as follows:
<table>
    <tr><td>PARTICIPANT_ID</td><td>Unique ID of participant</td></tr>
    <tr><td>AGE</td><td>Participant's age</td></tr>
    <tr><td>GENDER</td><td>Participant's gender</td></tr>
    <tr><td>HAS_TAKEN_TYPING_COURSE</td><td>Whether the participant has taken a typing course (1) or not (0)</td></tr>
    <tr><td>COUNTRY</td><td>Country from which the participant has taken the study</td></tr>
    <tr><td>KEYBOARD_LAYOUT</td><td>QWERTY, AZERTY, QWERTZ or other layout</td></tr>
    <tr><td>NATIVE_LANGUAGE</td><td>Native language of participant</td></tr>
    <tr><td>FINGERS</td><td>choice between 1-2, 3-4, 5-6, 7-8 and 9-10 fingers used</td></tr>
    <tr><td>TIME_SPENT_TYPING</td><td>Number of hours spent typing per day</td></tr>
    <tr><td>KEYBOARD_TYPE</td><td>full (desktop), laptop, small (physical) or (on-screen) touch keyboard</td></tr>
    <tr><td>ERROR_RATE(%)</td><td>Average error rate</td></tr>
    <tr><td>AVG_WPM</td><td>Average words per minute</td></tr>
    <tr><td>AVG_IKI</td><td>Average inter-key interval</td></tr>
    <tr><td>ROLLOVER</td><td>Average rollover ratio</td></tr>
    <tr><td>KSPC</td><td>Average Keystrokes per Character</td></tr>
    <tr><td>AVG_KEYPRESS</td><td>Average keypress duration</td></tr>
</table>

Files named number_keystrokes.txt are the keystroke-by-keystroke logs for all test sentences transcribed by the participant ID = number.<br>
The column names are described as follows: <br>
    
 <table>
    <tr><td>PARTICIPANT_ID</td><td>Unique ID of participant</td></tr>
    <tr><td>TEST_SECTION_ID</td><td>Unique ID of sentence within a participant's test</td></tr>
    <tr><td>SENTENCE</td><td>Presented sentence</td></tr>
    <tr><td>USER_INPUT</td><td>Transcribed sentence</td></tr>
    <tr><td>KEYSTROKE_ID</td><td>Unique keystroke id (across all participants)</td></tr>
    <tr><td>PRESS_TIME</td><td>Timestamp when the key was pressed</td></tr>
    <tr><td>RELEASE_TIME</td><td>Timestamp when the key was released</td></tr>
    <tr><td>LETTER</td><td>String representation of the pressed key</td></tr>
    <tr><td>KEYCODE</td><td>JavaScript keycode of the pressed key</td></tr>
</table>

For some users, the typed letter was not logged correctly. Instead, the corresponding javascript keycode can be used. <br>

### Features to calculate 
Link: https://towardsdatascience.com/keystroke-dynamics-analysis-and-prediction-part-1-eda-3fe2d25bac04 <br>
- **Hold Time (aka Dwell Time)**: Time the key is pressed
- **Press-Press Time**: Time between the presses of 2 consecutive keys
- **Release-Press Time (aka Flight Time or Inter-Key Interval or IKI)**: Time to 'fly' from one key to another (negative in case of rollover)
- **Release-Release Time**: Time between the releases of 2 consecutive keys  

**ROLLOVER** - RELEASE_PRESS_TIME IS NEGATIVE!!!

In [None]:
# Load the keystroke data of one example participant (ID 145007) and preview the first
# five rows. read_data_for_participant is not defined in this notebook — presumably it
# comes from `from helpers import *`.
df = read_data_for_participant(145007)
df.head(5)

### Getting information on which hand is used to type a specific key (how-we-type dataset)

In [None]:
# Gather the (key_symbol, finger) columns from every file of the how-we-type dataset.
# NOTE(review): `howwetype` is not defined in this notebook; presumably it is a directory
# path exported by `helpers` — confirm.
hwt_frames = []
# sorted(): os.listdir returns entries in arbitrary order, which would otherwise make
# the row order of hwt_df differ between machines/runs.
for file_name in sorted(os.listdir(howwetype)):
    if file_name.startswith("."):
        continue  # skip hidden entries
    hwt_part = pl.read_csv(os.path.join(howwetype, file_name), sep="\t")
    hwt_frames.append(hwt_part[["key_symbol", "finger"]])

# Concatenate once instead of re-copying a growing frame on every iteration; this also
# avoids concatenating onto a zero-width placeholder frame.
hwt_df = pl.concat(hwt_frames) if hwt_frames else pl.DataFrame()

In [None]:
# Turn every (key_symbol, finger) row into (key_symbol, first "_"-separated token of finger).
# Assumes finger labels look like "<hand>_<finger>" — TODO confirm against the dataset.
finger_tokens = pl.col("finger").str.split("_")
hwt_tokenised = hwt_df.with_columns([finger_tokens])

# Row-wise apply: position 0 is the key symbol, position 1 the token list.
hwt = hwt_tokenised.apply(lambda row: (row[0], row[1][0]))
hwt.columns = ["key", "hand"]

In [None]:
# One row per key: keep the hand that was recorded most often for that key.
# The mode is computed with polars' own expression instead of a per-group Python
# callback around scipy.stats.mode, whose return shape (`.mode[0]`) and support for
# non-numeric data differ between SciPy releases.
# `.sort().first()` picks the smallest value when several hands are tied, matching
# scipy.stats.mode's documented tie-breaking.
hwt = hwt.groupby("key", maintain_order=True).agg([
    pl.col("hand").mode().sort().first().alias("hand"),
])

In [None]:
# Display the key -> hand lookup table.
hwt

## Creating Bigrams

**While creating the most common bigrams, space and shift won't be taken into consideration - only characters / symbols**

In [None]:
# Mapper, find_all_participants and MAIN_DIR are not defined in this notebook;
# presumably they come from `from helpers import *` — confirm.
mapper = Mapper()
participant_ids = find_all_participants(MAIN_DIR)

In [None]:
# Bigram demo for the third participant in the list. The key codes of "<SoS>", "shift"
# and "space" are passed as keys to ignore.
d = read_data_for_participant(participant_ids[2])
ignore_keys = [
    mapper.get_code_from_key(key_name)
    for key_name in ("<SoS>", "shift", "space")
]
create_bigrams(d, ignore_keys=ignore_keys)

## Big Data

In [None]:
# Directory holding the raw keystroke dataset. It can be overridden through the
# KEYSTROKES_DATA_DIR environment variable so the notebook is not tied to a single
# machine; the original absolute path remains the fallback.
BIG_DATA_DIR = os.environ.get(
    "KEYSTROKES_DATA_DIR",
    "/Users/ivanshamilov/Uni/Master's/S3/Masters-Thesis/data/big_data/Keystrokes/files",
)

# Tab-separated participant metadata.
big_metadata = pl.read_csv(os.path.join(BIG_DATA_DIR, "metadata_participants.txt"), sep="\t")
# NOTE(review): participant 3 is dropped without a stated reason — document why.
big_metadata = big_metadata.filter(pl.col("PARTICIPANT_ID") != 3)
print(big_metadata.shape)
big_metadata.head()

In [None]:
# Print the readme.txt stored in the dataset directory.
readme_path = Path(BIG_DATA_DIR) / "readme.txt"
content = readme_path.read_text()

print(content)

### Categorization of participants

For the sake of analysis, participants will be categorized into: slow / fast typists, trained / untrained typists.

A participant will be considered 'fast' if his/her WPM is higher than that of 80% of the other typists in the dataset. All others are considered 'slow'.

In [None]:
# 80th percentile of AVG_WPM_15: participants above it are labelled "fast".
fast_typist_threshold = big_metadata["AVG_WPM_15"].quantile(0.8)

# with_columns keeps all existing columns and replaces SPEED if it already exists, so
# re-running this cell does not fail on a duplicate column name.
# pl.lit marks "fast"/"slow" as string literals; newer polars versions interpret bare
# strings passed to then()/otherwise() as column names.
big_metadata = big_metadata.with_columns([
    pl.when(pl.col("AVG_WPM_15") > fast_typist_threshold)
    .then(pl.lit("fast"))
    .otherwise(pl.lit("slow"))
    .alias("SPEED")
])

In [None]:
# Show the WPM cut-off used for the fast/slow split.
fast_typist_threshold

In [None]:
# Restrict the analysis to QWERTY keyboards; print the layout counts first for reference.
layout_counts = big_metadata["LAYOUT"].value_counts()
print(layout_counts)

is_qwerty = pl.col("LAYOUT") == "qwerty"
big_metadata = big_metadata.filter(is_qwerty)
big_metadata.shape

In [None]:
# Keep participants who used a full (desktop) or laptop keyboard; counts are printed
# before and after the filter.
allowed_keyboard_types = ["full", "laptop"]
print(big_metadata["KEYBOARD_TYPE"].value_counts())
big_metadata = big_metadata.filter(pl.col("KEYBOARD_TYPE").is_in(allowed_keyboard_types))
print(big_metadata["KEYBOARD_TYPE"].value_counts())

In [None]:
# Plausibility bounds for TIME_SPENT_TYPING: from 0 up to a regular office day (8)
# plus 4 additional hours. Both bounds are inclusive.
lower_bound_tst = 0
upper_bound_tst = 8 + 4

time_spent = pl.col("TIME_SPENT_TYPING")
within_bounds = (time_spent <= upper_bound_tst) & (time_spent >= lower_bound_tst)

print(big_metadata.shape)
big_metadata = big_metadata.filter(within_bounds)
print(big_metadata.shape)

In [None]:
# IDs of the participants that survived the filters above.
# NOTE(review): participant_ids is reassigned from find_all_participants(BIG_DATA_DIR)
# in a later cell, so this filtered list is not what the keypress loop iterates over.
participant_ids = big_metadata["PARTICIPANT_ID"].view().tolist()
len(participant_ids)

**Keypress values were missing in metadata**

In [None]:
# Rebinds participant_ids (also assigned in earlier cells) to whatever
# find_all_participants returns for BIG_DATA_DIR — presumably every participant with a
# keystroke file on disk; verify in helpers.
participant_ids = find_all_participants(BIG_DATA_DIR)

In [None]:
# Mean key-hold time (RELEASE_TIME - PRESS_TIME) per participant.
# Results are accumulated in plain lists and turned into a DataFrame once at the end,
# instead of concatenating a one-row frame onto a growing frame on every iteration.
keypress_pids = []
keypress_means = []

for i, pid in enumerate(participant_ids):
    if i % 1000 == 0:
        print(f"File {i:7d} of {len(participant_ids)}")
    try:
        keystrokes = pl.read_csv(
            os.path.join(BIG_DATA_DIR, f"{pid}_keystrokes.txt"),
            sep="\t",
            columns=["PRESS_TIME", "RELEASE_TIME"],
            infer_schema_length=10000,
        )
    except FileNotFoundError:
        # No keystroke log for this participant: leave it out of the result.
        continue
    hold_time = keystrokes["RELEASE_TIME"] - keystrokes["PRESS_TIME"]
    keypress_pids.append(int(pid))
    keypress_means.append(hold_time.mean())  # None for a log without rows

# Explicit dtypes keep the schema stable even when the lists are empty or a mean is None.
pid_keypress = pl.DataFrame([
    pl.Series("PARTICIPANT_ID", keypress_pids, dtype=pl.Int64),
    pl.Series("AVG_KEYPRESS", keypress_means, dtype=pl.Float64),
])

In [None]:
# Name the two columns of pid_keypress (participant id, mean hold time), then display it.
pid_keypress.columns = ["PARTICIPANT_ID", "AVG_KEYPRESS"]
pid_keypress

In [None]:
# Attach the per-participant mean hold time to the metadata. The left join keeps every
# metadata row; participants without a value end up with a null AVG_KEYPRESS.
big_metadata = big_metadata.join(pid_keypress, how="left", on="PARTICIPANT_ID")

n_missing_keypress = big_metadata["AVG_KEYPRESS"].null_count()
print(n_missing_keypress)

# Cache the preprocessed metadata on disk.
big_metadata.write_csv("data/preprocessed_big_metadata.csv")

In [None]:
# Reload the preprocessed metadata from the CSV written above — presumably so the
# analysis below can start here without repeating the keystroke-file scan.
big_metadata = pl.read_csv("data/preprocessed_big_metadata.csv")
big_metadata

In [None]:
# Split the metadata into the four cohorts compared below.
took_course = pl.col("HAS_TAKEN_TYPING_COURSE")
speed_label = pl.col("SPEED")

big_metadata_trained = big_metadata.filter(took_course == 1)
big_metadata_untrained = big_metadata.filter(took_course == 0)

big_metadata_fast = big_metadata.filter(speed_label == "fast")
big_metadata_slow = big_metadata.filter(speed_label == "slow")

# Shapes in the order: trained, untrained, fast, slow.
tuple(
    cohort.shape
    for cohort in (
        big_metadata_trained,
        big_metadata_untrained,
        big_metadata_fast,
        big_metadata_slow,
    )
)

## Data Analysis

## Metrics of Effectiveness

In [None]:
def statistics(data):
    """Print descriptive statistics for ``data``.

    Writes three things to stdout, in order: the result of ``data.describe()``,
    the value of ``data.kurtosis()`` labelled as Fisher's kurtosis, and the
    value of ``data.skew()``.

    Args:
        data: Any object providing ``describe()``, ``kurtosis()`` and ``skew()``
            methods (for example a pandas or polars Series).

    Returns:
        None.
    """
    print(data.describe())
    labelled_metrics = (
        ("Fisher's kurtosis: ", data.kurtosis),
        ("Skewness: ", data.skew),
    )
    for label, compute in labelled_metrics:
        print(label, compute())

### WPM distribution

In [None]:
# Distribution of AVG_WPM_15 over all participants (bin size 3, no rug plot).
wpm_values = big_metadata["AVG_WPM_15"].view().tolist()

fig = ff.create_distplot(
    [wpm_values],
    group_labels=["AVG_WPM_15"],
    bin_size=3,
    show_rug=False,
    colors=["orange"],
)
fig.update_layout(
    template="none",
    showlegend=False,
    width=1000,
    height=800,
    xaxis={"dtick": 10},
    font={"size": 16},
)
fig.update_xaxes(showgrid=True)
fig.show()

### WPM Trained/Untrained

In [None]:
# AVG_WPM_15 distributions: trained vs. untrained participants.
series_by_group = [
    cohort["AVG_WPM_15"].view().tolist()
    for cohort in (big_metadata_trained, big_metadata_untrained)
]
group_colors = ["rgba(255, 165, 0, 0.9)", "rgba(0, 0, 255, 0.3)"]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_group,
    group_labels=["Trained", "Untrained"],
    bin_size=3,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 10},
    font={"size": 18},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True)
fig.show()

### WPM Fast/Slow

In [None]:
# AVG_WPM_15 distributions: fast vs. slow participants (bin size 1).
series_by_group = [
    cohort["AVG_WPM_15"].view().tolist()
    for cohort in (big_metadata_fast, big_metadata_slow)
]
group_colors = ["rgba(255, 165, 0, 0.9)", "rgba(0, 0, 255, 0.3)"]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_group,
    group_labels=["Fast", "Slow"],
    bin_size=1,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 10},
    font={"size": 18},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True)
fig.show()

### Inter-Key Interval Trained/Untrained

In [None]:
# AVG_IKI distributions: trained vs. untrained participants; x axis limited to [0, 1000].
series_by_group = [
    cohort["AVG_IKI"].view().tolist()
    for cohort in (big_metadata_trained, big_metadata_untrained)
]
group_colors = ["rgba(255, 165, 0, 0.9)", "rgba(0, 0, 255, 0.3)"]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_group,
    group_labels=["Trained", "Untrained"],
    bin_size=15,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 100},
    font={"size": 16},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True, range=[0, 1000])
fig.show()

### Inter-Key Interval Fast/Slow

In [None]:
# AVG_IKI distributions: fast vs. slow participants; x axis limited to [0, 1000].
series_by_group = [
    cohort["AVG_IKI"].view().tolist()
    for cohort in (big_metadata_fast, big_metadata_slow)
]
group_colors = ["rgba(255, 165, 0, 0.9)", "rgba(0, 0, 255, 0.3)"]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_group,
    group_labels=["Fast", "Slow"],
    bin_size=5,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 100},
    font={"size": 16},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True, range=[0, 1000])
fig.show()

In [None]:
# KSPC and ECPC distributions over all participants; x axis limited to [0, 1.6].
metric_names = ["KSPC", "ECPC"]
series_by_metric = [big_metadata[metric].view().tolist() for metric in metric_names]
group_colors = ["rgba(255, 165, 0, 0.9)", "rgba(0, 0, 255, 0.3)"]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_metric,
    group_labels=metric_names,
    bin_size=0.008,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 0.1},
    font={"size": 18},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True, range=[0, 1.6])
fig.show()

### KSPC & ECPC for Trained / Untrained

In [None]:
# KSPC and ECPC distributions for trained vs. untrained participants.
# Series order: trained KSPC, untrained KSPC, trained ECPC, untrained ECPC.
series_by_group = [
    cohort[metric].view().tolist()
    for metric in ("KSPC", "ECPC")
    for cohort in (big_metadata_trained, big_metadata_untrained)
]
group_colors = [
    "rgba(255, 165, 0, 0.9)",
    "rgba(0, 0, 255, 0.3)",
    "rgba(255, 0, 0, 0.5)",
    "rgba(127, 0, 255, 0.6)",
]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_group,
    group_labels=["Trained KSPC", "Untrained KSPC", "Trained ECPC", "Untrained ECPC"],
    bin_size=0.008,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 0.1},
    font={"size": 18},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True, range=[0, 1.6])
fig.show()

### KSPC & ECPC for Fast / Slow

In [None]:
# KSPC and ECPC distributions for fast vs. slow participants.
# Series order: fast KSPC, slow KSPC, fast ECPC, slow ECPC.
series_by_group = [
    cohort[metric].view().tolist()
    for metric in ("KSPC", "ECPC")
    for cohort in (big_metadata_fast, big_metadata_slow)
]
group_colors = [
    "rgba(255, 165, 0, 0.9)",
    "rgba(0, 0, 255, 0.3)",
    "rgba(255, 0, 0, 0.5)",
    "rgba(127, 0, 255, 0.6)",
]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_group,
    group_labels=["Fast KSPC", "Slow KSPC", "Fast ECPC", "Slow ECPC"],
    bin_size=0.008,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 0.1},
    font={"size": 18},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True, range=[0, 1.6])
fig.show()

### Inter-Key Interval & Keypress Distributions

In [None]:
# AVG_KEYPRESS and AVG_IKI distributions over all participants; x axis limited to [0, 500].
# NOTE(review): assumes AVG_KEYPRESS has no nulls after the left join — TODO confirm.
series_by_metric = [
    big_metadata[column].view().tolist()
    for column in ("AVG_KEYPRESS", "AVG_IKI")
]
group_colors = ["rgba(255, 165, 0, 0.9)", "rgba(0, 0, 255, 0.3)"]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_metric,
    group_labels=["KeyPress", "Inter-Key Interval"],
    bin_size=3,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 50},
    font={"size": 16},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True, range=[0, 500])
fig.show()

### Key Press for trained / untrained

In [None]:
# AVG_KEYPRESS distributions: trained vs. untrained participants; x axis limited to [0, 400].
series_by_group = [
    cohort["AVG_KEYPRESS"].view().tolist()
    for cohort in (big_metadata_trained, big_metadata_untrained)
]
group_colors = ["rgba(255, 165, 0, 0.9)", "rgba(0, 0, 255, 0.3)"]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_group,
    group_labels=["Trained", "Untrained"],
    bin_size=5,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 50},
    font={"size": 16},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True, range=[0, 400])
fig.show()

### Key Press for fast / slow

In [None]:
# AVG_KEYPRESS distributions: fast vs. slow participants; x axis limited to [0, 400].
series_by_group = [
    cohort["AVG_KEYPRESS"].view().tolist()
    for cohort in (big_metadata_fast, big_metadata_slow)
]
group_colors = ["rgba(255, 165, 0, 0.9)", "rgba(0, 0, 255, 0.3)"]
# Legend anchored inside the plot area, near the top right.
legend_style = {"yanchor": "top", "y": 0.99, "xanchor": "left", "x": 0.8, "font": {"size": 30}}

fig = ff.create_distplot(
    series_by_group,
    group_labels=["Fast", "Slow"],
    bin_size=5,
    show_rug=False,
    colors=group_colors,
)
fig.update_layout(
    template="none",
    width=1200,
    height=1000,
    xaxis={"dtick": 50},
    font={"size": 18},
    legend=legend_style,
)
fig.update_xaxes(showgrid=True, range=[0, 400])
fig.show()

### Uncorrected error rate

In [None]:
# Distribution of ERROR_RATE over all participants (bin size 0.2, no rug plot).
error_rates = big_metadata["ERROR_RATE"].view().tolist()

fig = ff.create_distplot(
    [error_rates],
    group_labels=["ERROR_RATE"],
    bin_size=0.2,
    show_rug=False,
    colors=["orange"],
)
fig.update_layout(
    template="none",
    showlegend=False,
    width=1000,
    height=800,
    font={"size": 16},
)
fig.update_xaxes(showgrid=True)
fig.show()