In [None]:
import os
import sys
import librosa
import matplotlib.pyplot as plt
import librosa.display
import IPython.display as ipd
import numpy as np
import mysql.connector as mysql
import bokeh.plotting
import bokeh.models
import bokeh.io
import datetime
import pandas as pd
import math
import re
import statistics
from scipy.signal import find_peaks
import sklearn.cluster
from itertools import groupby
from sklearn.preprocessing import scale
from sklearn.metrics.cluster import homogeneity_score, normalized_mutual_info_score, homogeneity_completeness_v_measure
from sklearn.metrics import silhouette_score
import sklearn.mixture
from tqdm import tqdm

# Directory under which each recording has its own sub-directory of .wav parts.
AUDIO_FILES_DIR = '../web/data/videos'

# The same day-month + millisecond-resolution timestamp is used at every zoom
# level, so tick labels keep one layout regardless of the visible time range.
_TICK_FORMAT = '%d-%m    %H:%M:%S.%3N'
_TICK_SCALES = ('microseconds', 'milliseconds', 'seconds', 'minsec', 'minutes',
                'hourmin', 'hours', 'days', 'months', 'years')
datetimeTickFormatter = bokeh.models.DatetimeTickFormatter(
    **{tick_scale: [_TICK_FORMAT] for tick_scale in _TICK_SCALES})

def getCombinedWavs(directory, use_directory_starttime=True):
    """Load all .wav parts of a recording and join them into one signal.

    Parts are read from ``AUDIO_FILES_DIR/<directory>`` in sorted filename
    order. A part's filename must look like ``<digits>-<end>.wav`` where
    ``<end>`` is the unix time (ms) at which the part ends; files that do not
    match this pattern are ignored.

    Args:
        directory: recording directory relative to AUDIO_FILES_DIR. Its name
            must end in ``-<unixtime>`` when use_directory_starttime is True.
        use_directory_starttime: take the recording start time from the
            directory name; otherwise derive it from the first part (its end
            time minus its duration).

    Returns:
        (y, sr, unixtime_start): the concatenated samples, the sampling rate
        and the start of the recording as unix time in milliseconds.

    Raises:
        ValueError: if the directory contains no matching .wav parts.
        Exception: if the parts do not all share one sampling rate.
    """
    recording_dir = f'{AUDIO_FILES_DIR}/{directory}'
    part_pattern = re.compile(r"^\d*-(\d*)\.wav")
    files = sorted(name for name in os.listdir(recording_dir) if part_pattern.search(name))
    if not files:
        raise ValueError(f'No .wav parts found in {recording_dir}')

    parts = []
    sr = None
    unixtime_start = None
    last_endtime = None
    total_gap = 0
    for nr, filename in enumerate(files, start=1):
        y_part, sr_part = librosa.load(f'{recording_dir}/{filename}', sr=None)

        file_end = int(part_pattern.search(filename).group(1))
        file_start = file_end - int(len(y_part) * 1000 / sr_part)
        if sr is None:
            # The first part fixes the sampling rate and the recording start time.
            sr = sr_part
            if use_directory_starttime:
                unixtime_start = int(re.search(r".*-(\d*)", directory).group(1))
            else:
                unixtime_start = file_start
        elif sr != sr_part:
            raise Exception("Sampling rate mismatch")
        parts.append(y_part)

        if last_endtime is None:
            print (f'part {"%3d" % nr}, {"%7d" % len(y_part)} samples from unixtime {file_start} to {file_end}.')
        else:
            # Gap between where the previous part ended and where this one starts.
            gap = file_start - last_endtime
            total_gap += gap
            print (f'part {"%3d" % nr}, {"%7d" % len(y_part)} samples from unixtime {file_start} to {file_end}. Gap: {gap}. Total gap: {total_gap}')
        last_endtime = file_end

    # Join once at the end instead of re-copying the growing buffer for every part.
    y = np.concatenate(parts)
    nr = len(parts)

    unixtime_end = int(unixtime_start + len(y) * 1000 / sr)
    print (f'total:    {"%7d" % len(y)} samples from unixtime {unixtime_start} to {unixtime_end}')
    print (f'Read {nr} parts from {directory}. {y.shape[0]} samples: {"%.1f" % (y.shape[0]/sr)} seconds at {sr} samples/sec, starting at unixtime {unixtime_start}')
    return y, sr, unixtime_start

def queryDb(query, args=()):
    """Run a query against the local 'imagedescription' database and return all rows.

    Args:
        query: SQL text; use ``%s`` placeholders for values.
        args: values bound to the placeholders.

    Returns:
        A list of row tuples.

    A failure to connect is printed and ends execution via ``sys.exit(1)``.
    """
    # NOTE(review): connection settings and credentials are hard-coded here;
    # consider moving them to configuration / environment variables.
    try:
        conn = mysql.connect(user="root",
                             password="1234",
                             host="localhost",
                             port=3306,
                             database="imagedescription")
    except mysql.Error as e:
        print(f"Error connecting to MariaDB Platform: {e}")
        sys.exit(1)

    # Always release the cursor and the connection, also when the query fails.
    try:
        cur = conn.cursor()
        try:
            cur.execute(query, args)
            return cur.fetchall()
        finally:
            cur.close()
    finally:
        conn.close()

def getSteps(s):
    """Return the difference between each element of s and its predecessor.

    The first element has no predecessor and is compared with itself, so the
    first step is always zero. An empty input gives an empty list.
    """
    values = list(s)
    if not values:
        return []
    previous = [values[0]] + values[:-1]
    return [cur - prev for (cur, prev) in zip(values, previous)]

def getAllTraceEvents(directory):
    """Fetch all logged trace events belonging to the recording in `directory`.

    The recording is identified by the number after the last '-' in the
    directory name: the session (uuid) is the one whose 'startaudio' event has
    data ending in that number.

    Returns:
        Rows of (time, unix_time, type, data), ordered by id.
    """
    starttime = int(re.search(r".*-(\d*)", directory).group(1))
    # Bind the LIKE pattern as a parameter instead of formatting it into the SQL text.
    query = """select time, unix_time, type, data from logger_traces where uuid = (select uuid from logger_traces lt where type='startaudio' and data like %s) order by id"""
    return queryDb(query, (f'%{starttime}',))

def getData(directory, use_directory_starttime=True):
    """Load the audio of a recording together with its logged trace events.

    Returns a dict holding the samples ('wav'), the sampling rate ('sr'),
    several estimates of the recording start time, the key up/down and
    recorder-mark events, and the full event list ('events').
    """
    wav, sr, starttime_file = getCombinedWavs(directory, use_directory_starttime)
    audio_length_ms = int(len(wav) / sr * 1000)

    print (f'Recording starts at unix time {starttime_file} and lasts until {starttime_file + audio_length_ms}')

    events = getAllTraceEvents(directory)

    def eventsOfType(event_type):
        # Each event row is (time, unix_time, type, data); select on the type column.
        return [event for event in events if event[2] == event_type]

    up_events = eventsOfType('keyup')
    down_events = eventsOfType('keydown')
    recorder_mark_events = eventsOfType('recorder-mark')

    # Start time implied by the first mark: its time minus the duration of the
    # samples reported with it.
    first_mark = recorder_mark_events[0]
    first_mark_at, first_mark_samples = first_mark[1], first_mark[3]
    starttime_recorder_mark_event = int(first_mark_at-(first_mark_samples*1000/sr))
    starttime_recorder_start_event = eventsOfType('recorder-start')[0][1]
    starttime_startaudio_event = eventsOfType('startaudio')[0][1]

    print(f'Read {len(up_events)} keyup events, {len(down_events)} keydown events, and {len(recorder_mark_events)} recorder-mark events.')

    result = {}
    result['wav'] = wav
    result['sr'] = sr
    result['starttime_file'] = starttime_file
    result['starttime_recorder_mark_event'] = starttime_recorder_mark_event
    result['starttime_recorder_start_event'] = starttime_recorder_start_event
    result['starttime_startaudio_event'] = starttime_startaudio_event
    result['up_events'] = up_events
    result['down_events'] = down_events
    result['recorder_mark_events'] = recorder_mark_events
    result['events'] = events
    return result

def plotMatplotlib(data):
    """Plot the waveform with key-down (y=0) and key-up (y=0.01) event markers.

    Event positions are given in seconds relative to data['starttime_file'].
    """
    wav, sr = data['wav'], data['sr']
    starttime_file = data['starttime_file']
    up_events, down_events = data['up_events'], data['down_events']

    def secondsIntoRecording(events):
        return [(event[1] - starttime_file) / 1000 for event in events]

    plt.figure(figsize=(12, 4))
    plt.ylim((0, 0.02))
    librosa.display.waveplot(wav, sr=sr, alpha=0.1)
    plt.scatter(secondsIntoRecording(down_events), [0] * len(down_events))
    plt.scatter(secondsIntoRecording(up_events), [0.01] * len(up_events))

def plotBokeh(data, adjust_ms=0, whichWav='wav'):
    """Show an interactive Bokeh plot of the audio with key events overlaid.

    Args:
        data: dict with 'sr', 'starttime_file', 'up_events', 'down_events' and
            the signal stored under `whichWav`.
        adjust_ms: offset in milliseconds added to every event time before plotting.
        whichWav: key of the signal to plot (e.g. 'wav' or 'adjusted_wav').

    Key-down events are drawn in green and key-up events in red, labelled with
    the event data (a leading 'Key' is stripped from the label).
    """
    wav=data[whichWav]
    sr=data['sr']
    starttime_file=data['starttime_file']
    up_events=data['up_events']
    down_events=data['down_events']

    # Compute the per-sample unix time (ms) in one vectorised step. The 'time'
    # column is derived from that unix time; it was previously derived from the
    # bare sample index, which produced timestamps near 1970.
    sample_index = np.arange(len(wav))
    unixtime = starttime_file + sample_index * 1000 / sr
    df_sound = pd.DataFrame({
        'Index': sample_index,
        'wav': wav,
        'unixtime': unixtime,
        'time': pd.to_datetime(unixtime, unit='ms'),
    })

    bokeh.io.output_notebook()
    p = bokeh.plotting.figure()
    p.xaxis.formatter = datetimeTickFormatter
    p.xaxis.major_label_orientation = math.pi/2
    p.line(x='unixtime', y='wav', source=df_sound)

    def plotEvents(events, colour="black", y_offset=0):
        # All markers sit at one fixed height; the labels are rotated 90 degrees
        # and shifted by y_offset so up and down labels do not overlap.
        df = pd.DataFrame()
        df['x'] = [e[1]+adjust_ms for e  in events]
        df['y'] = [0.015]*len(events)
        df['text'] = [e[3][3:] if e[3].startswith('Key') else e[3] for e in events]
        source = bokeh.models.ColumnDataSource(df)

        p.scatter(x='x', y='y', source=source, size=10, color=colour, alpha=0.5)
        p.add_layout(bokeh.models.LabelSet(x='x', y='y', text='text', source=source, x_offset=5, y_offset=y_offset, render_mode='canvas', angle=math.pi/2, text_color=colour))

    plotEvents(down_events, "green", 20)
    plotEvents(up_events, "red", -120)

    bokeh.plotting.show(p)
    
def diffPeekAmplAndKeyDown(directory):
    """Print the time difference between the largest sample value and the first keydown event."""
    data = getData(directory)
    wav, sr = data['wav'], data['sr']
    first_down_row = data['down_events'][0]
    down_event = first_down_row[1]
    # Convert the index of the largest sample to unix time in milliseconds.
    max_index = np.argmax(wav)
    offset_ms = int(max_index * 1000 / sr)
    max_time = data['starttime_file'] + offset_ms
    print (f'{directory} max ampl at {max_time}, keydown at {down_event}, difference {max_time-down_event}')

def getDataSubset(data, offset_ms, length_ms=2000):
    """Cut a time window out of a recording.

    Args:
        data: dict with 'sr', 'starttime_file', 'wav', 'up_events',
            'down_events' and optionally 'adjusted_wav'.
        offset_ms: start of the window in ms after data['starttime_file'].
        length_ms: duration of the window in ms.

    Returns:
        A dict with the sliced 'wav' and 'adjusted_wav' (None when the input
        has none), the unchanged 'sr', the window start as 'starttime_file',
        and the up/down events lying strictly inside the window.
    """
    sr = data['sr']
    window_start = data['starttime_file'] + offset_ms
    window_end = window_start + length_ms
    first_sample = int(offset_ms*sr/1000)
    end_sample = int((offset_ms+length_ms)*sr/1000)

    def insideWindow(events):
        return [e for e in events if window_start < e[1] and e[1] < window_end]

    subset = {'wav': data['wav'][first_sample:end_sample]}
    if 'adjusted_wav' in data:
        subset['adjusted_wav'] = data['adjusted_wav'][first_sample:end_sample]
    else:
        subset['adjusted_wav'] = None
    subset['sr'] = sr
    subset['starttime_file'] = window_start
    subset['up_events'] = insideWindow(data['up_events'])
    subset['down_events'] = insideWindow(data['down_events'])
    return subset
    
def aap(x, adjust_ms=0):
    """Plot a recording twice: the raw waveform, then its summed STFT magnitude.

    Args:
        x: recording directory relative to AUDIO_FILES_DIR.
        adjust_ms: offset in ms applied to the event markers of the first plot.
    """
    # getData resolves the directory against AUDIO_FILES_DIR itself, so pass it
    # through unchanged; prefixing the videos directory here doubled the path.
    data=getData(x)
    plotBokeh(data, adjust_ms=adjust_ms)

    wav=data['wav']
    sr=data['sr']
    # A hop of one millisecond gives one STFT frame per ms, so the summed
    # spectrum can be plotted as a signal with a sampling rate of 1000.
    s=librosa.stft(y=wav, hop_length=int(sr/1000))
    data['wav'] = abs(s.sum(axis=0))
    data['sr'] = 1000
    plotBokeh(data)


def moving_ranges(data, context):
    """Return, for every element, the trailing window of itself and `context` predecessors.

    The start of the sequence is padded with copies of the first element so
    that every window has ``context + 1`` entries. An empty input gives an
    empty list.
    """
    values = list(data)
    if not values:
        return []
    padded = [values[0]] * context + values
    return [padded[i:i + 1 + context] for i in range(len(values))]
def moving_ranges_f(data, context, f):
    """Apply f to every trailing window produced by moving_ranges(data, context)."""
    return list(map(f, moving_ranges(data, context)))
def moving_average(x, context):
    """Mean of each trailing window of x (the element plus `context` predecessors)."""
    window_mean = statistics.mean
    return moving_ranges_f(x, context, window_mean)
def moving_var(x, context):
    """Spread (max minus min, not the statistical variance) of each trailing window of x."""
    def spread(window):
        return max(window) - min(window)
    return moving_ranges_f(x, context, spread)

## Calculate where to add dummy values to compensate for occasionally dropped samples and keep the audio in sync with the database

# The difference between the number of samples in the data and the expected number based on sampling rate and duration
# varies over time. Samples don't get delivered exactly on time, but usually the difference stays within a clear bound.
#
# Two things may occur that break this pattern:
#  1) occasionally some samples get queued up and delivered late, so we see a temporary increase in the difference which quickly gets corrected when the samples do arrive
#  2) on other occasions samples really get dropped and we see the difference increase permanently
#
# To keep keystrokes in sync, the first case doesn't matter since later we will only use the samples and don't care when they were received,
# but for the second case we want to insert some dummy samples to make sure the samples after that stay in sync.


def getDroppedSamplesTimeAndLengthForDirectory(directory, showGraph=False):
    """Load the recording in `directory` and return its dropped-sample segments."""
    data = getData(directory)
    return getDroppedSamplesTimeAndLength(data, showGraph)

def getDroppedSamplesTimeAndLength(data, showGraph=False):
    """Estimate when samples were permanently dropped and how many to re-insert.

    Args:
        data: dict with 'sr' and 'recorder_mark_events'. Of each mark event,
            index 1 is used as the event time in ms and index 3 as the sample
            count reported with it (presumably cumulative -- verify against
            the logger).
        showGraph: when True, print diagnostics and show a Bokeh plot of the
            raw, cleaned and adjusted dips and peaks.

    Returns:
        A DataFrame with one row per stable segment and the columns
        'cumulative_adjustment', 'adjustment', 'from', 'to', 'avg_peak' and
        'avg_dip'. 'from'/'to' are event times; 'adjustment' is the (never
        negative) number of samples found missing at the start of that segment.
    """
    # Size of the trailing window (in dips/peaks) used by moving_var.
    CONTEXT=4
       
    # Transform the list of mark events into a dataframe with a column, samplesDiff,
    # indicating the difference between the actual number of samples and the expected
    # number based on sampling rate and time since the start of the recording.
    timeAndBufferSize = [(x[1], x[3]) for x in data['recorder_mark_events']]
    starttime=timeAndBufferSize[0][0]
    startsize=timeAndBufferSize[0][1]
    times = [t for (t, s) in timeAndBufferSize]
    samplesDiff = [(s-startsize)-((t-starttime)*data['sr']/1000) for (t, s) in timeAndBufferSize]
    msSinceLastSample=getSteps(times)
    df = pd.DataFrame()
    df['time']=times
    df['samplesDiff']=samplesDiff
    df['msSinceLastSample']=msSinceLastSample    
    
    # First find the peaks and dips in the oscillating sample difference
    peaks = find_peaks(df['samplesDiff'])[0]
    # Dips are the peaks of the negated signal.
    dips = find_peaks([-x for x in df['samplesDiff']])[0]
    dips_and_peaks = np.concatenate((dips, peaks))
    dips_and_peaks.sort()
    df_dips_peaks = df.iloc[dips_and_peaks].copy()
    df_dips_peaks['dippeak'] = ['dip' if x in dips else 'peak' for x in df_dips_peaks.index]
    df_dips_peaks['samplesDiffMV'] = moving_var(df_dips_peaks['samplesDiff'], CONTEXT)

    # Filter out these outliers based on the max difference in the samplesDiff column over a trailing window of CONTEXT samples
    median_moving_var = statistics.median(moving_var(df_dips_peaks['samplesDiff'], CONTEXT))    
    # A point is kept when the step to the NEXT dip/peak stays below 1.5x the median spread.
    # The range stops one short, so the last dip/peak is never kept.
    not_outliers = [i for i in range(len(dips_and_peaks)-1)
                    if df_dips_peaks['samplesDiff'].iloc[i+1] - df_dips_peaks['samplesDiff'].iloc[i] < 1.5*median_moving_var]
    df_dips_peaks_clean = df_dips_peaks.iloc[not_outliers].copy()
    # recalculate this since it will have changed after removing the outliers
    df_dips_peaks_clean['samplesDiffMV'] = moving_var(df_dips_peaks_clean['samplesDiff'], CONTEXT)     

    # We now want to find the segments of stable data, and calculate the number samples lost by comparing the average value for peaks and dips in two blocks
    # The variation in sample difference is quite stable within a block, so we mark points where this variation exceed 110% of the median variance
    # Only the transition from "stable" to "unstable" is marked, i.e. the first point of each unstable run.
    samples_lost_at = [i for i in range(1, len(df_dips_peaks_clean))
                if ((df_dips_peaks_clean['samplesDiffMV'].iloc[i-1] < median_moving_var*1.1)
                and (df_dips_peaks_clean['samplesDiffMV'].iloc[i]   >= median_moving_var*1.1))]
    df_samples_lost_at = df_dips_peaks_clean.iloc[samples_lost_at].copy()
    
    # Determine a list of intervals where the dips and peaks are stable based on the points where samples are lost,
    # adding the first and last samples as endpoints
    firstTime = df['time'].iloc[0]
    # NOTE(review): lastTime is assigned but not used; the final segment ends at the
    # last cleaned dip/peak time instead.
    lastTime = df['time'].iloc[-1]
    segments = pd.DataFrame(zip([firstTime] + list(df_samples_lost_at['time']),
                                 list(df_samples_lost_at['time'])+[max(df_dips_peaks_clean['time'])]))
    segments.columns = ['from', 'to']

    # Calculate the Mean value for dips and peaks over each interval
    def getMeanDipsPeaksForInterval(row, dippeak):
        # Only stable points (spread below 110% of the median) of the requested kind count.
        samples =  df_dips_peaks_clean[df_dips_peaks_clean.time.between(row['from'], row['to']) 
                                        & (df_dips_peaks_clean.samplesDiffMV < median_moving_var*1.1)
                                        & (df_dips_peaks_clean.dippeak==dippeak)]['samplesDiff']
        # NOTE(review): the np.NaN alias is not available in NumPy 2.x (np.nan is) -- confirm the pinned NumPy version.
        return statistics.mean(samples) if len(samples) > 0 else np.NaN
    segments['avg_peak'] = segments.apply(lambda row: getMeanDipsPeaksForInterval(row, 'peak'), axis=1)
    segments['avg_dip'] = segments.apply(lambda row: getMeanDipsPeaksForInterval(row, 'dip'), axis=1)

    # Calculate the required adjustment based on the difference in mean peaks and dips for all but the first interval
    segments.insert(0, 'adjustment', 0)
    segments.insert(0, 'cumulative_adjustment', 0)
    adjustmentSoFar = 0
    for index, _ in segments.iterrows():
        # Drop of this segment's mean peak/dip relative to the first segment, minus what
        # earlier segments already accounted for.
        diffs = [segments['avg_peak'].iloc[0] - segments['avg_peak'][index] - adjustmentSoFar,
                segments['avg_dip'].iloc[0] - segments['avg_dip'][index] - adjustmentSoFar]
        diffs = [i for i in diffs if not math.isnan(i)]
        if len(diffs) > 0:
            adjustment = int(statistics.mean(diffs))
            # Only positive adjustments are recorded; samples are never removed.
            if adjustment > 0:
                segments.at[index, 'adjustment'] = adjustment
                adjustmentSoFar += adjustment
            segments.at[index, 'cumulative_adjustment'] = adjustmentSoFar


    if showGraph:
        print(f'sr: {data["sr"]}')
        print(f'median moving var: {median_moving_var}')
        bokeh.io.output_notebook()
        p = bokeh.plotting.figure()
        p.xaxis.formatter = datetimeTickFormatter
        p.xaxis.major_label_orientation = math.pi/2

        # Plot these points, which will contain some outlier for case 1) above
        p.scatter(x='time', y='samplesDiff', source=df_dips_peaks, color="yellow")
        p.line(x='time', y='samplesDiffMV', source=df_dips_peaks, color="yellow")
        
        # Then plot the dips and peaks again for the cleaned data
        p.scatter(x='time', y='samplesDiff', source=df_dips_peaks_clean, color="blue")
        p.line(x='time', y='samplesDiffMV', source=df_dips_peaks_clean, color="blue")
        
        # Plot the points where data is lost in red
        p.scatter(x='time', y='samplesDiffMV', source=df_samples_lost_at, color="red")
    
        # Create new dataframe for visualisation only to show the dips and peaks after adjustment
        df_adjusted = df_dips_peaks_clean.copy()
        df_adjusted['adjusted'] = df_dips_peaks_clean['samplesDiff'].copy()
        segment_index = 0
        for index, _ in df_adjusted.iterrows():
            # Advance to the next segment when this point is exactly that segment's start time.
            if segment_index + 1 < len(segments) and segments['from'].iloc[segment_index+1] == df_adjusted['time'].loc[index]:
                segment_index += 1
            df_adjusted.at[index, 'adjusted'] += segments['cumulative_adjustment'].iloc[segment_index]
        p.scatter(x='time', y='adjusted', source=df_adjusted, color="green")

        bokeh.plotting.show(p)
#         print(segments)

    return segments

def adjustForMissingSamples(data, showGraph=False):
    """Insert silence where samples were dropped and store the result as data['adjusted_wav'].

    The segments come from getDroppedSamplesTimeAndLength; in front of every
    segment after the first, `adjustment` zero-valued samples are inserted.

    Args:
        data: recording dict as returned by getData; it is modified in place.
        showGraph: forwarded to getDroppedSamplesTimeAndLength.

    Returns:
        The same `data` dict, now containing 'adjusted_wav'.
    """
    segments = getDroppedSamplesTimeAndLength(data, showGraph)
    wav = np.asarray(data['wav'])
    first_segment_start = segments.iloc[0]['from']

    def getSampleIndex(time):
        # Sample position of an event time, counted from the start of the first segment.
        return int((time - first_segment_start) * data['sr'] / 1000)

    # Collect the pieces and join them once: repeated np.append copies the whole
    # buffer for every segment, and appending Python int lists silently turned
    # the audio into float64. The padding uses the dtype of the audio instead.
    pieces = [wav[0:getSampleIndex(segments.iloc[0]['to'])]]
    for i in range(1, len(segments)):
        segment = segments.iloc[i]
        pieces.append(np.zeros(int(segment['adjustment']), dtype=wav.dtype))
        pieces.append(wav[getSampleIndex(segment['from']):getSampleIndex(segment['to'])])
    # Whatever follows the last segment is kept unchanged.
    pieces.append(wav[getSampleIndex(segments.iloc[-1]['to']):])
    data['adjusted_wav'] = np.concatenate(pieces)
    return data

def getDataAndAdjustForMissingSamples(directory):
    """Load a recording and add its 'adjusted_wav' (audio padded for dropped samples)."""
    # adjustForMissingSamples updates the dict in place and hands the same dict back.
    return adjustForMissingSamples(getData(directory))

def getKeystrokes(data, sync_adjustment=0, sample_duration=0, min_peak_value=0.01):
    """Cut one audio clip per keydown event out of the adjusted recording.

    Args:
        data: recording dict as returned by getData (modified in place:
            'adjusted_wav' is (re)computed).
        sync_adjustment: offset in ms added to every keydown time.
        sample_duration: length of each clip in ms.
        min_peak_value: clips whose largest sample is below this are discarded.

    Returns:
        (labels, wavs): the event data of every retained keydown and the
        matching audio clips, in event order. Both are empty when there are
        no keydown events.
    """
    adjustForMissingSamples(data, True)
    starttime = data['starttime_recorder_start_event']
    down_event_times = [(e[1]-starttime+sync_adjustment, e[3]) for e in data['down_events']]
    print(f'Audio starts at {starttime}')
    if not down_event_times:
        print (f'Using 0 out of 0 keystrokes')
        return [], []

    firstKeydown = down_event_times[0][0]
    sr = data['sr']
    srms = sr/1000
    wav = data['adjusted_wav']

    print(f'First keydown event after {firstKeydown} ms')
    plotBokeh(getDataSubset(data, max(firstKeydown-500, 0), 2000), sync_adjustment, whichWav='adjusted_wav')

    def clip(time):
        # A keydown before the start of the audio has no clip; without this
        # guard the negative index would wrap around to the end of the recording.
        if time < 0:
            return wav[0:0]
        return wav[int(time*srms):int((time+sample_duration)*srms)]

    samples = [(key, clip(time)) for (time, key) in down_event_times]
    # Empty clips (zero duration, or an event past the end of the audio) cannot
    # have a peak and are discarded rather than crashing max().
    filtered_samples = [(key, w) for (key, w) in samples if len(w) > 0 and np.max(w) >= min_peak_value]
    print (f'Using {len(filtered_samples)} out of {len(samples)} keystrokes')

    return [k for (k, _) in filtered_samples], [w for (_, w) in filtered_samples]

def addKeystrokes(data, sync_adjustment, sample_duration, min_peak_value):
    """Store the keystroke labels and clips from getKeystrokes in `data`."""
    keystrokes = getKeystrokes(data, sync_adjustment, sample_duration, min_peak_value)
    data['keystroke_labels'], data['keystroke_wavs'] = keystrokes

def normaliseFeatures(features):
    """Standardise every MFCC coefficient over all keystrokes and frames.

    Args:
        features: array of shape (nr_keystrokes, nr_mfcc_features, nr_frames).

    Returns:
        An array of the same shape in which each coefficient has been scaled
        to zero mean and unit standard deviation across all keystrokes and frames.
    """
    nr_keystrokes, nr_mfcc_features, nr_frames = features.shape
    print (f'Normalising {nr_keystrokes} keystrokes, with {nr_mfcc_features} features and {nr_frames} frames per sample')

    # Bring the coefficient axis to the front and flatten the rest, so each row
    # holds every value observed for one coefficient.
    per_coefficient = features.transpose(1, 0, 2).reshape(nr_mfcc_features, nr_keystrokes*nr_frames)
    standardised = scale(per_coefficient, axis=1, with_mean=True, with_std=True, copy=True)
    # Undo the flattening and put the keystroke axis first again.
    unflattened = standardised.reshape(nr_mfcc_features, nr_keystrokes, nr_frames)
    return unflattened.transpose(1, 0, 2)

def filterLabels(labels, keep=['Space', 'Backspace', 'Enter']):
    """Replace every label that is not listed in `keep` by 'Other'."""
    filtered = []
    for label in labels:
        if label in keep:
            filtered.append(label)
        else:
            filtered.append('Other')
    return filtered

def printListGroupPercentages(l):
    """Summarise how often each value occurs in l as a string of percentages.

    Values are listed from most to least frequent; percentages are truncated
    to whole numbers. Example: ``'a' 75%, 'b' 25%``.
    """
    total = len(l)
    percentages = []
    for key, members in groupby(sorted(l)):
        count = sum(1 for _ in members)
        percentages.append((key, 100*count/total))
    # Stable sort: values with an equal share stay in ascending value order.
    percentages.sort(key=lambda entry: entry[1], reverse=True)
    return ', '.join(f"'{key}' {'%d' % percentage}%" for (key, percentage) in percentages)

def printClusteringResult(clustering, labels, method=None):
    """Print how well a clustering matches the true key labels.

    Prints the homogeneity/completeness/v-measure scores, then the label mix
    inside every cluster, then the cluster mix for every label.

    Args:
        clustering: cluster id per keystroke.
        labels: true label per keystroke, in the same order.
        method: optional name of the clustering method, printed as a heading.
    """
    if method is not None:
        print (f'Clustering results using {method}')
    print (f'homogeneity_completeness_v_measure: {homogeneity_completeness_v_measure(labels, clustering)}')

    def printBreakdown(group_position, member_position):
        # Pairs are (cluster, label); group on one component and summarise the other.
        def groupKey(pair):
            return pair[group_position]
        pairs = sorted(zip(clustering, labels), key=groupKey)
        for k, g in groupby(pairs, groupKey):
            group = sorted(pair[member_position] for pair in g)
            print (f'{k}: {printListGroupPercentages(group)}')

    print (f'by cluster')
    printBreakdown(0, 1)
    print (f'by key')
    printBreakdown(1, 0)

In [None]:
# Load each recording (audio plus logged events) and add its 'adjusted_wav',
# the audio padded to compensate for dropped samples.
logitech_step1_fast = getDataAndAdjustForMissingSamples('logitech-fast/niels-step_1-1617765899871')
logitech_step1_slow = getDataAndAdjustForMissingSamples('logitech-slow/niels-step_1-1617765658270')
lenovo_slow = getDataAndAdjustForMissingSamples('lenovo-slow-easy/niels-step_5-1617952576650')
lenovo_slow2 = getDataAndAdjustForMissingSamples('lenovo-slow-easy/niels-step_5-1618197371437')
lenovo_hard = getDataAndAdjustForMissingSamples('lenovo-hard/niels-step_1-1618199054005')

In [None]:
# Extract the keystroke clips for one recording. The arguments after the
# recording are sync_adjustment (ms), sample_duration (ms) and min_peak_value.
# Only the slow Logitech recording is processed; the other calls are disabled.
# addKeystrokes(lenovo_slow, 50, 100, 0.01)
# addKeystrokes(lenovo_slow2, 125, 100, 0.01)
# addKeystrokes(logitech_step1_fast, 50, 100, 0.01)
addKeystrokes(logitech_step1_slow, 60, 50, 0.01)
# addKeystrokes(lenovo_hard, 50, 100, 0.01)

In [None]:

# Inspect two windows of the adjusted audio (2 s starting at 1 s, and 10 s starting
# at 200 s), with the key event markers shifted by 50 ms.
plotBokeh(getDataSubset(logitech_step1_slow, 1000, 2000), 50, whichWav='adjusted_wav')
plotBokeh(getDataSubset(logitech_step1_slow, 200000, 10000), 50, whichWav='adjusted_wav')

In [None]:
# Length of the raw recording in seconds (sample count divided by sampling rate).
len(logitech_step1_slow['wav'])/logitech_step1_slow['sr']

In [None]:
# Number of samples in the first extracted keystroke clip.
len(logitech_step1_slow['keystroke_wavs'][0])

In [163]:
def addFeatures(data):
    """Compute MFCC-based features for every keystroke clip and store them in `data`.

    Reads data['keystroke_wavs'] and data['sr'] and adds:
        'mfcc_features': raw MFCCs, shape (keystrokes, 32, frames)
        'normalised_mfcc_features': the same, standardised per coefficient
        'mfcc_max', 'mfcc_mean', 'mfcc_stddev': per-keystroke summary of every
            normalised coefficient over the frames, shape (keystrokes, 32)

    All clips must have the same length so the MFCCs stack into one array.
    """
    sr = data['sr']
    wavs = data['keystroke_wavs']
    srms = int(sr/1000)

    # 10 ms analysis window, advanced in steps of 2.5 ms.
    hop_length = int(2.5*srms)
    window_length = int(10*srms)

    print('getFeatures: mfcc')
    # Pass the audio and sampling rate by keyword: recent librosa releases no
    # longer accept them positionally.
    data['mfcc_features'] = np.array([librosa.feature.mfcc(y=wav, sr=sr, n_mfcc=32, win_length=window_length, hop_length=hop_length) for wav in wavs])
    print('getFeatures: Normalise')
    data['normalised_mfcc_features'] = normaliseFeatures(data['mfcc_features'])
    normalised = data['normalised_mfcc_features']
    # Vectorised reductions over the frame axis replace per-row Python calls.
    print('getFeatures: Get max')
    data['mfcc_max'] = normalised.max(axis=2)
    print('getFeatures: Get mean')
    data['mfcc_mean'] = normalised.mean(axis=2)
    print('getFeatures: Get stddev')
    # ddof=1 gives the sample standard deviation, matching statistics.stdev.
    data['mfcc_stddev'] = normalised.std(axis=2, ddof=1)
    print('getFeatures: Done')

    
def getConcatenatedFeatures(data, features):
    """Join the named per-keystroke feature matrices side by side.

    Args:
        data: recording dict; when any requested feature is missing,
            addFeatures(data) is called first to compute the features.
        features: names of the feature matrices to concatenate along axis 1.

    Returns:
        One array with a row per keystroke and all feature columns.
    """
    missing_features = [name for name in features if name not in data]
    if missing_features:
        print (f'Missing features: {", ".join(missing_features)}. Calling addFeatures to add them.')
        addFeatures(data)
    print (f'Concatenating these features: {features}')
    blocks = [data[name] for name in features]
    c = np.concatenate(blocks, axis=1)
    print (f'Resulting shape: {c.shape}')
    return c

# Compute the MFCC features for the slow Logitech recording and put the three
# per-coefficient summaries (max, mean, standard deviation) side by side.
addFeatures(logitech_step1_slow)
features = getConcatenatedFeatures(logitech_step1_slow, ['mfcc_max', 'mfcc_mean', 'mfcc_stddev'])

getFeatures: mfcc
getFeatures: Normalise
Normalising 424 keystrokes, with 32 features and 21 frames per sample
getFeatures: Get max
getFeatures: Get mean
getFeatures: Get stddev
getFeatures: Done
Concatenating these features: ['mfcc_max', 'mfcc_mean', 'mfcc_stddev']
Resulting shape: (424, 96)


In [None]:
# Shape of the per-keystroke maximum-MFCC matrix.
logitech_step1_slow['mfcc_max'].shape

In [None]:
def findNumberOfClusters(features, cluster_counts=range(2, 40), random_state=None):
    """Score k-means and Gaussian-mixture fits for a range of cluster counts.

    Args:
        features: array with one row per keystroke.
        cluster_counts: candidate numbers of clusters. Counts that are not
            smaller than the number of rows are skipped, since the silhouette
            score is undefined for them.
        random_state: seed passed to both models; set it for reproducible scores.

    Returns:
        (x, scores): the evaluated cluster counts and, for each, a tuple
        (kmeans_score, kmeans_silhouette_score, gm_score, gm_bic_score).
    """
    x = [n for n in cluster_counts if n < len(features)]
    scores = []
    for n_clusters in tqdm(x):
        kmeans = sklearn.cluster.KMeans(n_clusters, random_state=random_state)
        kmeans.fit(features)
        kmeans_score = kmeans.score(features)
        clustering = kmeans.predict(features)
        kmeans_silhouette_score = silhouette_score(features, clustering)

        gm = sklearn.mixture.GaussianMixture(n_clusters, random_state=random_state)
        gm.fit(features)
        gm_score = gm.score(features)
        gm_bic_score = gm.bic(features)

        scores.append((kmeans_score, kmeans_silhouette_score, gm_score, gm_bic_score))
    return x, scores
# Score every candidate cluster count on the concatenated summary features.
number_of_clusters,scores = findNumberOfClusters(getConcatenatedFeatures(logitech_step1_slow, ['mfcc_max', 'mfcc_mean', 'mfcc_stddev']))

# One scatter plot per score: cluster count on the x axis, score on the y axis.
for (i, s) in enumerate(['kmeans_score', 'kmeans_silhouette_score', 'gm_score', 'gm_bic_score']):
    plt.figure()
    plt.title(s)
    plt.scatter(number_of_clusters, [x[i] for x in scores])
    plt.show()

In [172]:
def tmp(data):
    """Cluster the keystrokes with three methods and print how each matches the key labels.

    Labels other than Space, Backspace and KeyE are merged into 'Other'; the
    number of clusters is the number of distinct remaining labels plus one.
    """
    labels = data['keystroke_labels']
    print(len(labels))
    features = getConcatenatedFeatures(data, ['mfcc_mean', 'mfcc_stddev'])
    filtered_labels = filterLabels(labels, keep = ['Space', 'Backspace', 'KeyE'])
    n_clusters = len(set(filtered_labels)) + 1

    # Each entry pairs the printed method name with a function producing the
    # cluster assignment; they run in this order.
    methods = [
        ('SpectralClustering',
         lambda: sklearn.cluster.SpectralClustering(n_clusters).fit(features).labels_),
        ('k_means',
         lambda: sklearn.cluster.k_means(features, n_clusters=n_clusters)[1]),
        ('GaussianMixture',
         lambda: sklearn.mixture.GaussianMixture(n_clusters).fit(features).predict(features)),
    ]
    for method_name, run_clustering in methods:
        clustering = run_clustering()
        printClusteringResult(clustering, filtered_labels, method_name)
# Compare the three clustering methods on the slow Logitech recording.
tmp(logitech_step1_slow)


424
Concatenating these features: ['mfcc_mean', 'mfcc_stddev']
Resulting shape: (424, 64)
Clustering results using SpectralClustering
homogeneity_completeness_v_measure: (0.05687324609258826, 0.07722676040043476, 0.06550538906074985)
by cluster
0: 'Other' 70%, 'Space' 15%, 'KeyE' 11%, 'Backspace' 1%
1: 'Space' 90%, 'Other' 9%
2: 'Other' 100%
3: 'Other' 100%
4: 'Other' 69%, 'Space' 25%, 'KeyE' 4%
by key
Backspace: '0' 100%
KeyE: '0' 93%, '4' 6%
Other: '0' 82%, '4' 14%, '2' 2%, '3' 0%, '1' 0%
Space: '0' 67%, '4' 20%, '1' 12%
Clustering results using k_means
homogeneity_completeness_v_measure: (0.4098899032549833, 0.22195628580553553, 0.28797401041838827)
by cluster
0: 'Other' 82%, 'KeyE' 16%, 'Backspace' 1%
1: 'Other' 86%, 'KeyE' 11%, 'Backspace' 2%
2: 'Other' 86%, 'KeyE' 6%, 'Backspace' 4%, 'Space' 1%
3: 'Space' 93%, 'Other' 6%
4: 'Other' 77%, 'KeyE' 12%, 'Space' 10%
by key
Backspace: '2' 50%, '1' 33%, '0' 16%
KeyE: '0' 34%, '4' 32%, '1' 23%, '2' 9%
Other: '4' 29%, '0' 25%, '1' 25%, '2'

In [None]:
# Feature set used for the DTW comparison below.
f = 'normalised_mfcc_features'

# Split the per-keystroke feature matrices by label: space bar versus the E key.
spaces = [w for (w, l) in zip(logitech_step1_slow[f], logitech_step1_slow['keystroke_labels']) if l=='Space']
keyEs = [w for (w, l) in zip(logitech_step1_slow[f], logitech_step1_slow['keystroke_labels']) if l=='KeyE']
print(f'number of spaces: {len(spaces)}, number of Es: {len(keyEs)}')


def dtw_distance(key1, key2):
    """Return the total dynamic-time-warping cost of aligning two feature sequences.

    Args:
        key1, key2: feature matrices of shape (features, frames).

    Returns:
        The accumulated cost of the optimal alignment of the full sequences.
    """
    # The accumulated cost of aligning both sequences completely is the last
    # cell of the cost matrix. The warping path is returned from the end back to
    # the start, so indexing D with its last row (as was done before) yields the
    # cost of the first frame pair only. The path itself is not needed.
    D = librosa.sequence.dtw(key1, key2, backtrack=False)
    best_cost = D[-1, -1]
    return best_cost


def _distances_within(keystrokes):
    # DTW distance for every unordered pair of distinct keystrokes in one group.
    return [dtw_distance(keystrokes[i], keystrokes[j])
            for i in range(len(keystrokes))
            for j in range(i+1, len(keystrokes))]

# Distances inside each group, and between every space / E combination.
dist_spaces = _distances_within(spaces)
dist_keyEs = _distances_within(keyEs)
dist_spacesToKeyEs = [dtw_distance(space, keyE) for space in spaces for keyE in keyEs]

# Mean distance within spaces, within Es, and between the two groups.
for distances in (dist_spaces, dist_keyEs, dist_spacesToKeyEs):
    print(np.mean(distances))


In [None]:

# Toy example: two 2-row sequences whose only non-zero column is shifted by one
# position, to inspect what dtw returns (including the steps taken).
librosa.sequence.dtw([[0,1,0,0,0,0],
                      [0,1,0,0,0,0]], 
                     [[0,0,1,0,0,0],
                      [0,0,1,0,0,0]], return_steps=True)



In [None]:
# Try a few other librosa features on the first keystroke clip and print their shapes.
# NOTE(review): the sampling rate is hard-coded as 48000 here -- confirm it matches
# logitech_step1_slow['sr'].
wav = logitech_step1_slow['keystroke_wavs'][0]
print( librosa.feature.spectral_centroid(wav, 48000, win_length=1000, hop_length=2).shape )
print( librosa.feature.chroma_stft(wav, 48000, win_length=1000, hop_length=2).shape )
print( librosa.feature.zero_crossing_rate(wav, frame_length=1000, hop_length=2).shape )
# With a frame of two samples, print the zero-crossing rate values themselves.
print( list(librosa.feature.zero_crossing_rate(wav, frame_length=2, hop_length=2)[0]) )

In [None]:
# getDroppedSamplesTimeAndLength(logitech_step1_fast, True)
# getDroppedSamplesTimeAndLength(logitech_step1_slow, True)
# plotBokeh(getDataSubset(logitech_step1_slow, 1800, 3000), 50)
# plotBokeh(getDataSubset(logitech_step1_slow, 142000, 2000), 50, whichWav='wav')
# plotBokeh(getDataSubset(logitech_step1_slow, 142000, 2000), 50, whichWav='adjusted_wav')
# plotBokeh(getDataSubset(logitech_step1_slow, 182000, 2000), 50, whichWav='wav')
# plotBokeh(getDataSubset(logitech_step1_slow, 182000, 2000), 50, whichWav='adjusted_wav')
# plotBokeh(getDataSubset(logitech_step1_slow, 0, 2000), 50, whichWav='wav')
# getDroppedSamplesTimeAndLength(lenovo_slow, True)

def printKeyStroke(key, wav, sr):
    """Plot the waveform of one keystroke clip, titled with its key label."""
    figure_size = (12, 4)
    amplitude_limits = (-0.4, 0.4)
    plt.figure(figsize=figure_size)
    plt.ylim(amplitude_limits)
    librosa.display.waveplot(wav, sr=sr, alpha=0.1)
    plt.title(key)
# Plot ten consecutive keystroke clips (160-169), using the recording's own
# sampling rate rather than a hard-coded 48000.
for (key, wav) in zip(logitech_step1_slow['keystroke_labels'][160:170], logitech_step1_slow['keystroke_wavs'][160:170]):
    printKeyStroke(key, wav, logitech_step1_slow['sr'])