In [1]:
import os
import pandas as pd
import librosa
import librosa.display
import glob
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
import scipy.stats as stats
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import tensorflow.keras as keras
from tensorflow.keras.utils import to_categorical

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [2]:
path = 'sensor_logger'
start_time = 15
end_time = 55
activity_list = []
wav_list = []
stft_list = []
stft_mean_list = []
stft_std_list = []
zcr_list = []
folder_list = []
N_FFT = 4096
HOP_SIZE = 1024
for root, dirs, files in os.walk(path, topdown=False):
    for name in files:
        if name == 'Microphone.caf':
            mic_path = os.path.join(root, name)    
            if ('scroll' in root) | ('scrolling' in root):
                wav_data, sr = librosa.load(mic_path, sr=None, mono=True)
                stft = librosa.amplitude_to_db(np.abs(librosa.stft(wav_data[start_time*sr:end_time*sr], n_fft=N_FFT, hop_length=HOP_SIZE, win_length=N_FFT,
                                                                        window='hann', center=True, pad_mode='constant' )), ref=np.max)
                                                                        # N_FFT/2 number of frequency bins x number of frames t
                folder = mic_path.split('/')[1]
                folder_list.append(folder)
                activity_list.append('scroll')
                wav_list.append(wav_data[sr*start_time:sr*end_time])
                stft_list.append(stft)
                stft_mean_list.append(np.mean(stft, axis=0)) # axis=0 -> average across time bins, axis=1 -> average across frequency bins
                stft_std_list.append(np.std(stft, axis=0))
            elif ('swipe' in root) | ('swiping' in root):
                wav_data, sr = librosa.load(mic_path, sr=None, mono=True)
                stft = librosa.amplitude_to_db(np.abs(librosa.stft(wav_data[start_time*sr:end_time*sr], n_fft=N_FFT, hop_length=HOP_SIZE, win_length=N_FFT,
                                                                        window='hann', center=True, pad_mode='constant' )), ref=np.max)
                                                                        # N_FFT/2 number of frequency bins x number of frames t
                folder = mic_path.split('/')[1]
                folder_list.append(folder)
                activity_list.append('swipe')
                wav_list.append(wav_data[sr*start_time:sr*end_time])
                stft_list.append(stft)
                stft_mean_list.append(np.mean(stft, axis=0)) # axis=0 -> average across time bins, axis=1 -> average across frequency bins
                stft_std_list.append(np.std(stft, axis=0))
            
            #elif ('tap' in root) | ('tapping' in root):
            #    wav_data, sr = librosa.load(mic_path, sr=None, mono=True)
            #    stft = librosa.amplitude_to_db(np.abs(librosa.stft(wav_data[start_time*sr:end_time*sr], n_fft=N_FFT, hop_length=HOP_SIZE, win_length=N_FFT,
            #                                                            window='hann', center=True, pad_mode='constant' )), ref=np.max)
            #                                                            # N_FFT/2 number of frequency bins x number of frames t
            #    folder = mic_path.split('/')[1]
            #    folder_list.append(folder)
            #    activity_list.append('tap')
            #    wav_list.append(wav_data[sr*start_time:sr*end_time])
            #    stft_list.append(stft)
            #    stft_mean_list.append(np.mean(stft, axis=0)) # axis=0 -> average across time bins, axis=1 -> average across frequency bins
            #    stft_std_list.append(np.std(stft, axis=0))
# 18 swipes, 13 taps, 9 scrolls 

In [3]:
# Encode the activity names as integer class ids.
label = LabelEncoder()
y = label.fit_transform(activity_list)

# One row per clip; the index labels are positions in the per-recording lists.
df = pd.DataFrame()
df['classes'] = activity_list
df['label'] = y
df['wav_data'] = wav_list
df['stft'] = stft_list

# Randomly undersample the 'swipe' class down to the size of the 'scroll' class
# to remove the class imbalance (18 swipe / 9 scroll -> 9 / 9 on the recorded data).
# A seeded generator keeps the selection identical across Restart & Run All.
UNDERSAMPLE_SEED = 42
undersample_rng = np.random.default_rng(UNDERSAMPLE_SEED)

swipe_index = df.index[df['classes'] == 'swipe']
scroll_count = int((df['classes'] == 'scroll').sum())
# Never negative: when swipe is not the larger class nothing is removed.
remove_n = max(len(swipe_index) - scroll_count, 0)
drop_indices = undersample_rng.choice(swipe_index, remove_n, replace=False)
# Index labels are deliberately not reset, so they still point back into the lists above.
df = df.drop(drop_indices)

In [4]:
TEST_FRACTION = 0.2   # share of the available clips held out as test data
SPLIT_SEED = 42       # fixed seed so the split is reproducible
split_rng = np.random.default_rng(SPLIT_SEED)

# 4 clips for the 18 clips left after undersampling
test_n = max(1, round(TEST_FRACTION * len(df)))

# Randomly pick the test clips by index label; every remaining row is training data.
test_indices = split_rng.choice(df.index, test_n, replace=False)
test_df = df.loc[test_indices]

# df keeps its original (non-contiguous) index labels, so the complement has to be
# taken against df.index rather than against range(len(df)).
train_indices = np.setdiff1d(df.index, test_indices)
train_df = df.drop(test_indices)

# Stack the per-clip spectrograms along a NEW leading axis -> (clips, freq_bins, frames).
# np.dstack followed by np.reshape must not be used here: dstack puts the clip axis
# last, and reshape only re-chunks the flat buffer, which interleaves values from
# different clips instead of moving that axis to the front.
train_data = np.stack(train_df['stft'].tolist(), axis=0)
X_train = train_data[..., np.newaxis]   # trailing channel axis for Conv2D

test_data = np.stack(test_df['stft'].tolist(), axis=0)
X_test = test_data[..., np.newaxis]

train_labels = train_df['label'].to_numpy()
test_labels = test_df['label'].to_numpy()

# One-hot encode the two classes.
y_train = to_categorical(train_labels, 2)
y_test = to_categorical(test_labels, 2)

print(X_train.shape, y_train.shape, X_test.shape, y_test.shape) # e.g. (14, 2049, 1723, 1) (14, 2) (4, 2049, 1723, 1) (4, 2)

(14, 2049, 1723, 1) (14, 2) (4, 2049, 1723, 1) (4, 2)


In [5]:
def create_model(input_shape=None, num_classes=2):
    """Build the uncompiled CNN used to classify spectrograms.

    Architecture: three Conv2D -> MaxPooling2D -> BatchNormalization blocks
    (32, 32 and 16 filters), a Flatten, two Dense + Dropout(0.3) stages
    (64 and 32 units) and a softmax output layer.

    Parameters
    ----------
    input_shape : tuple, optional
        Shape of a single input sample. Defaults to ``X_train[0].shape`` from
        the notebook's global ``X_train``, which is what a bare
        ``create_model()`` call has always used.
    num_classes : int, optional
        Number of units in the softmax output layer (default 2).

    Returns
    -------
    keras.Sequential
        The model, not yet compiled.
    """
    if input_shape is None:
        # Backward-compatible default: read the shape from the global training tensor.
        input_shape = X_train[0].shape

    model = keras.Sequential()

    # Convolutional feature extractor; only the first layer declares the input shape.
    for block_idx, filters in enumerate((32, 32, 16)):
        first_layer_kwargs = {'input_shape': input_shape} if block_idx == 0 else {}
        model.add(keras.layers.Conv2D(filters, (3, 3), activation='relu',
                                      padding='same', **first_layer_kwargs))
        model.add(keras.layers.MaxPooling2D((3, 3), strides=(2, 2), padding='same'))
        model.add(keras.layers.BatchNormalization())

    # Flatten the feature maps and feed them through the dense classifier head.
    model.add(keras.layers.Flatten())
    for units in (64, 32):
        model.add(keras.layers.Dense(units, activation='relu'))
        model.add(keras.layers.Dropout(0.3))

    # Output layer: one probability per class.
    model.add(keras.layers.Dense(num_classes, activation='softmax'))
    return model

In [6]:
# Precision is a higher-is-better metric, so the direction is stated explicitly:
# with the default mode='auto', tf.keras only treats names ending in
# 'acc' / 'accuracy' / 'auc' as "maximize" and would otherwise stop training
# once val_precision stops *decreasing*.
callback = keras.callbacks.EarlyStopping(monitor='val_precision', mode='max', patience=5)

model = create_model()
# Explicit metric names keep the history keys stable ('precision', 'val_precision', ...).
# Auto-generated names get a numeric suffix ('precision_1', ...) whenever this cell is
# re-run in the same kernel, which would leave the monitor above without a match.
model.compile(loss='categorical_crossentropy', optimizer='adam',
              metrics=[keras.metrics.Recall(name='recall'),
                       keras.metrics.Precision(name='precision')])

# NOTE(review): the held-out test split doubles as validation data here, so early
# stopping is tuned on the test set and the reported test metrics will be optimistic.
# Consider carving a separate validation split out of the training data.
history = model.fit(X_train, y_train, epochs = 10, validation_data=(X_test, y_test), callbacks=[callback], verbose=1)

Epoch 1/10


2023-03-25 17:40:21.150894: W tensorflow/tsl/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
