In [1]:
# %pip install imblearn xgboost scikit-learn numpy pandas matplotlib tqdm

In [2]:
import pandas as pd
import numpy as np

from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, confusion_matrix, precision_score, f1_score, accuracy_score, roc_auc_score
from sklearn.pipeline import Pipeline

In [3]:
np.random.seed(142)

In [4]:
raw_data = pd.read_csv('filtered_keystroke.csv')

In [5]:
print(f"Number of users: {len(raw_data['PARTICIPANT_ID'].unique())}")
print(f"Number of samples: {len(raw_data)}")

Number of users: 168593
Number of samples: 2495152


In [6]:
raw_data.dropna(inplace=True)

In [7]:
print(f"Number of samples: {len(raw_data)}")

Number of samples: 2495095


In [8]:
users = np.random.choice(raw_data["PARTICIPANT_ID"].unique(), 4000, replace=False)
data = raw_data[raw_data["PARTICIPANT_ID"].isin(users)].copy()

In [9]:
print(f"Number of users: {len(data['PARTICIPANT_ID'].unique())}")
print(f"Number of samples: {len(data)}")

Number of users: 4000
Number of samples: 59183


In [10]:
data.head()

Unnamed: 0,PARTICIPANT_ID,TEST_SECTION_ID,mean_hold_time,mean_f1,mean_f2,mean_f3,mean_f4,capslock_usage,negative_uu,negative_ud
255,58.0,421,101.9375,131.68254,230.634921,233.84127,332.793651,0.0,0.0,0.0
256,58.0,435,109.914894,125.891304,233.630435,236.826087,344.565217,0.0,0.0,0.0
257,58.0,450,113.321429,87.222222,193.962963,201.518519,308.259259,0.0,0.0,0.0
258,58.0,452,125.326087,165.133333,287.844444,291.244444,413.955556,0.0,0.0,0.0
259,58.0,463,96.758621,102.245614,197.701754,199.491228,294.947368,0.0,0.0,0.0


In [11]:
normalized_columns = ['mean_f1', 'mean_f2', 'mean_f3', 'mean_f4', 'mean_hold_time']
for column in normalized_columns:
    data[column] = data[column] / 1000

## Description

### Data Description

This dataset is transformed from the BB-MAS keystrokes dataset. Each participant joined two session:
- Fixed-text: the participants typed the same texts
- Free-text: the participants typed a random long predefined texts

Their keystroke events is recorded with three columns:
- **key** - the name of the pressed key
- **direction** - indicates if the key is pressed or released (0 | 1)
- **timestamp** - the time that the event happens, it is UNIX timetamp in milisecons

Our datasets divided the events into samples with the duration 2 minutes for each samples. Some features are extracted from the keystroke events:
- **Hold time**: the duration when user pressed then released the key (also known as Dwell Time)
- **F1: Press-to-Press Time**: The time interval between the press of one key and the press of the next key.
- **F2: Release-to-Press Time**: The time interval between the release of one key and the press of the next key.
- **F3: Press-to-Release Time**: The time interval a key is held down (from press to release).
- **F4: Release-to-Release Time**: The time interval between the release of one key and the release of the next key.


### Experiement Description

- Interate through the user samples, 1 user is picked as legitimate, the remaining ones will be the imposters.
- Through 31 samples, we want to proved the efficicency of our method to dynamically train keystroke dynamics recognition models for each user.
- In real-world use cases, the data of imposter always more than the legitimate one (at enrollment time), so we will apply SMOTE for imbalance datasets top prevent the bias on majority class (imposter).
- Extreme Gradient Booster (XGB) is used in this experiement as it was proved as efficient models with high performance (comparing to Random Forest) in this area.
- We used GridSearch for retrieving the best performance model to use for prediction.

## Define needed utilities for the experiment

In [12]:
def data_augmentation(
    df,
    factor=5,
    perturbation_range=(-0.001, 0.001),
    columns=None,
):
    augmented_data = []
    columns = columns or df.columns

    # Generate augmented data
    for _ in range(factor):
        for _, row in df.iterrows():
            synthetic_sample = row.copy()  # Start with a copy of the original row

            for column in columns:
                if column in df.columns:
                    # Randomly perturb the column within the specified range
                    perturbation = np.random.uniform(*perturbation_range)
                    synthetic_sample[column] += perturbation

            augmented_data.append(synthetic_sample)

    # Create a DataFrame from the augmented data
    df_augmented = pd.DataFrame(augmented_data)

    # Combine original and augmented data
    df_combined = pd.concat([df, df_augmented], ignore_index=True)
    return df_combined, df_augmented

In [13]:
def create_pipeline():
    return Pipeline(
        [
            ("scaler", StandardScaler()),
            (
                "xgb",
                XGBClassifier(eval_metric="logloss"),
            ),
        ]
    )

In [14]:
def trainer(user_id):
    features = [
        'mean_f1', 'mean_f2', 'mean_f3', 'mean_f4', 
        'mean_hold_time', 'capslock_usage', 'negative_uu', 'negative_ud'
    ]

    # Separate legitimate and imposter data
    imposter_data = data[data["PARTICIPANT_ID"] != user_id].copy()[features].sample(4000)
    legitimate_data = data[data["PARTICIPANT_ID"] == user_id].copy()[features]

    # Split legitimate data into train and test
    legitimate_data_for_train = legitimate_data.sample(n=5)
    legitimate_data_for_test = legitimate_data.drop(legitimate_data_for_train.index)

    # Perform data augmentation on legitimate training data
    legitimate_data, _ = data_augmentation(
        legitimate_data_for_train,
        factor=4,
        columns=[
            "mean_hold_time", "mean_f1", "mean_f2", "mean_f3", "mean_f4",
        ],
        perturbation_range=(-0.002, 0.002),
    )

    imposter_data.loc[:, 'label'] = 0
    legitimate_data.loc[:, 'label'] = 1
    legitimate_data_for_test.loc[:, 'label'] = 1

    train_legitimate, test_legitimate = train_test_split(
        legitimate_data, test_size=0.2
    )
    train_imposter, test_imposter = train_test_split(
        imposter_data, test_size=0.2
    )


    # Combine training and testing datasets
    train_set = pd.concat([train_legitimate, train_imposter])
    test_set = pd.concat([test_imposter, test_legitimate, legitimate_data_for_test])

    # Shuffle datasets
    train_set = train_set.sample(frac=1, random_state=142).reset_index(drop=True)
    test_set = test_set.sample(frac=1, random_state=142).reset_index(drop=True)

    # Separate features and labels
    x_train = train_set.drop(columns=["label"])
    x_test = test_set.drop(columns=["label"])
    y_train = train_set["label"]
    y_test = test_set["label"]

    # Handle class imbalance using SMOTE
    smote = SMOTE(random_state=142, k_neighbors=2)
    x_train_balanced, y_train_balanced = smote.fit_resample(x_train, y_train)

    # Define parameter grid for GridSearch
    param_grid = {
        "xgb__n_estimators": [50, 100, 200],
        "xgb__max_depth": [3, 5, 7],
        "xgb__learning_rate": [0.01, 0.1, 0.2],
        "xgb__subsample": [0.8, 1.0],
        "xgb__scale_pos_weight": [25, 50, 75, 99, 100],
    }

    # Create a pipeline and perform GridSearch
    pipeline = create_pipeline()
    grid_search = GridSearchCV(
        estimator=pipeline,
        param_grid=param_grid,
        cv=5,
        scoring="accuracy",
        verbose=0,
        n_jobs=-1,
    )
    grid_search.fit(x_train_balanced, y_train_balanced)

    # Get the best model and predictions
    best_model = grid_search.best_estimator_
    y_prob = best_model.predict_proba(x_test)[:, 1]  # Probabilities for the positive class
    y_pred = best_model.predict(x_test)

    # Calculate ROC curve
    fpr, tpr, thresholds = roc_curve(y_test, y_prob)

    # Calculate EER (point where FAR == FRR)
    fnr = 1 - tpr  # False Negative Rate
    eer_threshold = thresholds[np.nanargmin(np.abs(fpr - fnr))]
    eer = fpr[np.nanargmin(np.abs(fpr - fnr))]

    # Calculate AUC-ROC
    auc_roc = roc_auc_score(y_test, y_prob)

    # Return all metrics
    return {
        "best_model": best_model,
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred),
        "confusion_matrix": confusion_matrix(y_test, y_pred),
        "eer_threshold": eer_threshold,
        "eer": eer,
        "auc_roc": auc_roc,
    }


In [15]:
history = {}

# Use tqdm to show progress bar
for user_id in tqdm(data['PARTICIPANT_ID'].unique(), desc="Processing Users"):
    try:
        results = trainer(user_id)
        
        history[user_id] = {
            'accuracy': results['accuracy'],
            'f1': results['f1_score'],
            'precision': results['precision'],
            'confusion_matrix': results['confusion_matrix'],
            'eer': results['eer'],
            'eer_threshold': results['eer_threshold'],
            'auc_roc': results['auc_roc']
        }
    except Exception as e:
        pass

  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(data, dtype=dtype, copy=copy,
  _data = np.array(d

In [16]:
import pickle

with open('alto_histories.pkl', 'wb') as file:
    pickle.dump(history, file)

In [17]:
# Extract the metrics
accuracies = [data['accuracy'] for data in history.values()]
f1_scores = [data['f1'] for data in history.values()]
precisions = [data['precision'] for data in history.values()]

# Calculate mean and standard deviation
mean_accuracy = np.mean(accuracies)
std_accuracy = np.std(accuracies)

mean_f1 = np.mean(f1_scores)
std_f1 = np.std(f1_scores)

mean_precision = np.mean(precisions)
std_precision = np.std(precisions)

# Output the results
print(f"Mean Accuracy: {mean_accuracy:.3f}, Std Accuracy: {std_accuracy:.3f}")
print(f"Mean F1: {mean_f1:.3f}, Std F1: {std_f1:.3f}")
print(f"Mean Precision: {mean_precision:.3f}, Std Precision: {std_precision:.3f}")

Mean Accuracy: 0.984, Std Accuracy: 0.004
Mean F1: 0.440, Std F1: 0.138
Mean Precision: 0.655, Std Precision: 0.188
