In [None]:
# Imports
import pandas as pd
import numpy as np
from pydub import AudioSegment
import noisereduce as nr
import soundfile as sf

import matplotlib.pyplot as plt

## Audio loading and preprocessing

In [None]:
# Run the project's preprocessing pipeline (implemented in the local
# preprocess_data module, which is not part of this notebook).
from preprocess_data import preprocess_data

# The call yields three things:
#   features       - feature table (used below via .shape, .head(), .iterrows())
#   labels         - per-row labels (used below via .unique())
#   labels_ohe     - presumably the one-hot encoded labels, judging by the name - TODO confirm
#   transformers   - mapping of transformers; later cells look up transformers['encoder']
features, (labels, labels_ohe), transformers = preprocess_data()

In [None]:
# Show the distinct label values present in the dataset
labels.unique()

In [None]:
# Dimensions of the feature table
print(features.shape)

## K-Means Clustering and t-SNE for data visualization

In [None]:
# Perform clustering on the keystrokes
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from sklearn.cluster import SpectralClustering
from sklearn.metrics import silhouette_score

# Set k to the number of unique labels logged
cluster_k = len(labels.unique())
kmeans = KMeans(n_clusters=cluster_k, random_state=0, n_init="auto").fit(features)

print(f"{cluster_k} clusters")
print('K-Means Clustering')
print(f"\tSilhouette score: {silhouette_score(features, kmeans.labels_) :.2f}")
print('')

# Alternative clustering algorithms, kept disabled for reference.
#gmm = GaussianMixture(n_components=cluster_k, random_state=0)
#gmm_predicted = gmm.fit_predict(features)
#
#print("Gaussian Mixture Model")
#print(f"\tSilhouette score: {silhouette_score(features, gmm_predicted) :.2f}")
#print('')
#
#spectral = SpectralClustering(n_clusters=cluster_k, random_state=0).fit(features)
#
#print("Spectral Clustering")
#print(f"\tSilhouette score: {silhouette_score(features, spectral.labels_) :.2f}")
#print('')

# Get cluster predictions for each row.
# A single vectorised call replaces the former row-by-row loop: it returns the
# same 1-D array of cluster ids, avoids the Python-level iteration, and passes
# the data in the same form the model was fitted on.
predicted_clusters = kmeans.predict(features)

In [None]:
# Plot the data with t-SNE dimensionality reduction to determine if the keystrokes make clusters

from sklearn.manifold import TSNE
import seaborn as sn

def tsne_plot(data, labels=None):
    """Scatter-plot a 2-D t-SNE embedding of ``data``.

    Parameters
    ----------
    data : array-like of shape (n_samples, n_features)
        Feature matrix to embed.
    labels : array-like of length n_samples, optional
        One label per row of ``data``. When given, points are coloured by
        label and a legend is drawn next to the plot.

    Returns
    -------
    None
        The figure is shown with ``plt.show()``.
    """
    # Two output dimensions with a fixed seed; perplexity, learning rate and
    # iteration count are left at scikit-learn's defaults.
    model = TSNE(n_components=2, random_state=1)
    tsne_data = model.fit_transform(data)

    tsne_df = pd.DataFrame(data=tsne_data, columns=("Dim_1", "Dim_2"))

    # Colors points based on their label, if they have any
    if labels is not None:
        # Add the labels as their own column instead of stacking them into the
        # coordinate array: stacking forces a single dtype, which turns the
        # coordinates into strings whenever the labels are not numeric.
        # np.asarray drops any pandas index so rows are matched by position.
        tsne_df["label"] = np.asarray(labels)

        sn.scatterplot(data=tsne_df, x='Dim_1', y='Dim_2',
                       hue='label', palette="bright")
        plt.legend(bbox_to_anchor=(1.05, 1.0), loc='upper left')
    else:
        # No hue, so neither a palette nor a legend applies here.
        sn.scatterplot(data=tsne_df, x='Dim_1', y='Dim_2')

    plt.title("t-SNE keystrokes")
    plt.show()

# Embed the features in 2-D and colour every point by its label
tsne_plot(features, labels)

## Deep Learning

In [None]:
# Preview the first rows of the feature table
features.head()

In [None]:
from imblearn.over_sampling import RandomOverSampler

# Balance dataset with oversampling
def oversample_datset(X, y):
    """Balance the classes of ``(X, y)`` by random oversampling.

    A fresh ``RandomOverSampler`` with default settings is fitted on the
    inputs, and the resampled features and targets are returned as a pair.
    """
    return RandomOverSampler().fit_resample(X, y)

In [None]:
# Count number of each class
unique_elements, counts = np.unique(np.array(labels), return_counts=True)
total_elements = counts.sum()

# Pair every class with its count, most frequent class first
count_of_elements = sorted(
    zip(unique_elements, counts),
    key=lambda pair: pair[1],
    reverse=True,
)

print("Element: frequency of element")
for element, count in count_of_elements:
    share = count / total_elements * 100
    print(f"{element}:\t{share:.2f}%")

In [None]:
# Separate data into training and test sets
from sklearn.model_selection import train_test_split


def _one_hot(label_values):
    """One-hot encode label values with the encoder from ``transformers``."""
    return transformers['encoder'].transform(np.array(label_values).reshape(-1, 1)).toarray()


# Split BEFORE oversampling. The oversampler duplicates rows, so resampling
# the whole dataset first would place copies of the same keystroke in both
# the training and the test set and inflate every test metric.
X_train, X_test, y_train_labels, y_test_labels = train_test_split(
    features, labels, test_size=0.2, random_state=1
)

# Balance the training portion only; the test set keeps its natural class mix.
# NOTE(review): the split is not stratified, so a very rare class can end up
# entirely in the test set - consider stratify=labels if every class has at
# least two samples.
X_train, y_train_labels = oversample_datset(X_train, y_train_labels)

y_train = _one_hot(y_train_labels)
y_test = _one_hot(y_test_labels)

# Full dataset in model-ready form; later cells index X[:n] and y[:n].
X = features
y = _one_hot(labels)

In [None]:
# Replace the row labels carried over from the split with a fresh 0..n-1 index
X_train, X_test = (
    split_frame.reset_index(drop=True) for split_frame in (X_train, X_test)
)

In [None]:
import torch
import torch.nn as nn

In [None]:
# Uses GPU if available, otherwise uses CPU
device_name = "cuda:0" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)

In [None]:
# Convert data to tensors
# Features are DataFrames and targets are NumPy arrays at this point; all four
# become float32 tensors on the selected device.
# NOTE: this cell rebinds its own inputs, so it can only be run once per split.
X_train_np = X_train.to_numpy().astype(np.float32)
X_test_np = X_test.to_numpy().astype(np.float32)
y_train_np = y_train.astype(np.float32)
y_test_np = y_test.astype(np.float32)

X_train = torch.tensor(X_train_np).to(device)
y_train = torch.tensor(y_train_np).to(device)
X_test = torch.from_numpy(X_test_np).to(device)
y_test = torch.from_numpy(y_test_np).to(device)

In [None]:
# Network dimensions: input/output widths come from the data, the hidden
# widths are chosen by hand.
hidden_size = [256, 128, 64] # Hidden layer sizes
input_size, output_size = X_train.shape[1], y_train.shape[1]
print(f'Input size: {input_size}\nOutput size: {output_size}')

In [None]:
# Define model
class Model(torch.nn.Module):
    """Fully connected classifier with three hidden layers.

    Each hidden layer is ``Linear -> ReLU -> Dropout(0.3)``. The final layer
    is a plain ``Linear`` that emits one unnormalised score (logit) per class.

    No softmax is applied inside the network: ``nn.CrossEntropyLoss`` performs
    log-softmax itself, so a softmax here would be applied twice and flatten
    the gradients. ``argmax`` over the logits selects the same class as it
    would over probabilities; use ``predict_proba`` when actual probabilities
    are needed.

    Parameters
    ----------
    input_size : int
        Number of input features.
    hidden_size : sequence of int
        Widths of the three hidden layers (indices 0, 1 and 2 are used).
    output_size : int
        Number of classes.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.linear_sequential_stack = nn.Sequential(
            nn.Linear(input_size, hidden_size[0]),
            nn.ReLU(),
            nn.Dropout(0.3),

            nn.Linear(hidden_size[0], hidden_size[1]),
            nn.ReLU(),
            nn.Dropout(0.3),

            nn.Linear(hidden_size[1], hidden_size[2]),
            nn.ReLU(),
            nn.Dropout(0.3),

            # Output layer: raw logits, see the class docstring.
            nn.Linear(hidden_size[2], output_size),
        )

    def forward(self, x):
        """Return the class logits for a batch ``x`` of shape (batch, input_size)."""
        logits = self.linear_sequential_stack(x)
        return logits

    def predict_proba(self, x):
        """Return class probabilities (softmax over the logits) for batch ``x``."""
        return torch.softmax(self.forward(x), dim=1)

# Build the network on the selected device together with its loss and optimiser.
# NOTE(review): the weights are initialised here, before the training cell
# calls torch.manual_seed, so the initialisation itself is not seeded.
model = Model(input_size, hidden_size, output_size)
model = model.to(device)
l = nn.CrossEntropyLoss()
optimizer = torch.optim.RMSprop(params=model.parameters(), lr=0.001, momentum=0.9)

In [None]:
# Get the F1 score
from sklearn.metrics import f1_score
import numpy as np
def getF1(pred_y: torch.Tensor, true_y: torch.Tensor):
    """Macro-averaged F1 score between predicted and true classes.

    Both arguments are 2-D tensors with one row per sample and one column per
    class; the class of a row is the index of its largest entry.
    """
    predicted_classes = pred_y.detach().cpu().numpy().argmax(axis=1)
    actual_classes = true_y.detach().cpu().numpy().argmax(axis=1)
    return f1_score(actual_classes, predicted_classes, average='macro')

# Get accuracy
def getAcc(pred_y: torch.Tensor, true_y: torch.Tensor):
    """Fraction of rows whose predicted class equals the true class.

    Both arguments are 2-D tensors with one row per sample and one column per
    class; the class of a row is the index of its largest entry.
    """
    predicted_classes = pred_y.detach().cpu().numpy().argmax(axis=1)
    actual_classes = true_y.detach().cpu().numpy().argmax(axis=1)

    hits = (predicted_classes == actual_classes).sum()
    sample_count = np.float32(predicted_classes.shape[0])
    return hits / sample_count

In [None]:
# Seed the generators used during training (dropout masks, etc.)
torch.manual_seed(1112)
np.random.seed(1112)

num_epochs = 10_000

# Per-epoch history, filled in as training progresses
train_loss = [None]*num_epochs
val_loss = [None]*num_epochs

train_acc = [None]*num_epochs
val_acc = [None]*num_epochs

for epoch in range(num_epochs):
    # ---- Training step on the full training set (one batch per epoch) ----
    model.train()

    pred = model(X_train)
    loss = l(pred, y_train)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    tr_loss = loss.item()
    tr_acc = getAcc(pred, y_train)

    # ---- Evaluation on the held-out set ----
    # eval() switches dropout off; no_grad() skips building the autograd
    # graph, which is never used here and only costs memory and time.
    model.eval()
    with torch.no_grad():
        pred = model(X_test)
        te_loss = l(pred, y_test).item()
    te_acc = getAcc(pred, y_test)

    train_loss[epoch] = tr_loss
    val_loss[epoch] = te_loss
    train_acc[epoch] = tr_acc
    val_acc[epoch] = te_acc

    # Report on the first epoch and every tenth one after that
    if (epoch+1) % 10 == 0 or epoch == 0:
        print(f'Epoch {epoch+1} - train loss: {tr_loss :.4f} - val loss: {te_loss :.4f} - val acc: {te_acc:.4f}')

In [None]:
# Gather the per-epoch metrics into one table, one row per epoch
history_columns = dict(
    train_loss=train_loss,
    train_acc=train_acc,
    val_loss=val_loss,
    val_acc=val_acc,
    epoch=np.arange(num_epochs),
)
pt_trainhist = pd.DataFrame(history_columns)
pt_trainhist.tail()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Loss on the top panel, accuracy on the bottom one.
fig, (loss_ax, acc_ax) = plt.subplots(2, 1)

for ax, title, metric in ((loss_ax, "Model Loss", "loss"),
                          (acc_ax, "Model Accuracy", "acc")):
    for split in ("train", "val"):
        column = f"{split}_{metric}"
        # Label every line explicitly: a legend built from a bare list of
        # labels pairs them with artists by order and is easily mismatched.
        # estimator=None draws the values as-is (there is one per epoch), so
        # no aggregation or error band is computed.
        sns.lineplot(x='epoch', y=column, data=pt_trainhist,
                     label=column, estimator=None, ax=ax)
    ax.set_title(title)
    ax.set_ylabel(metric)
    ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# Check model predictions
pred_idx_end = 50

sample_inputs = torch.tensor(X[:pred_idx_end].to_numpy().astype(np.float32)).to(device)

# Put the network in inference mode explicitly rather than relying on the
# state left behind by the training cell, and skip gradient tracking.
model.eval()
with torch.no_grad():
    predictions = model(sample_inputs).cpu().numpy()

# inverse_transform returns one column; ravel() keeps the result 1-D even
# when only a single row is available.
pred_y = transformers['encoder'].inverse_transform(predictions).ravel()
true_y = transformers['encoder'].inverse_transform(y[:pred_idx_end]).ravel()

print("Predicted:\tActual:")
# Iterate over what was actually predicted, so fewer than pred_idx_end rows
# does not raise an IndexError.
for predicted_label, actual_label in zip(pred_y, true_y):
    print(f"{predicted_label}\t\t{actual_label}")

In [None]:
# Network output for the first row of X
first_sample = torch.tensor(X[0:1].to_numpy().astype(np.float32)).to(device)
first_output = model(first_sample)
print(first_output.cpu().detach().numpy())