# CS448 - Lab 10: Keyword Recognition

## Part 1: Making a Digit Recognizer

In this section we will design a simple spoken digit recognizer, based on Dynamic Time Warping (DTW). In order to make such a system we need to first collect some data, and then design a DTW routine that can compare new inputs with templates for each digit.

To start, build the data set used in this part. Make a dozen or so recordings of yourself speaking each of the ten digits (0 to 9). We will use one recording of each digit as the template and the rest as testing data. To avoid spending too much time collecting data, record all the utterances in a single (long) sound file, then use your voice activity detector to split that file into the individual spoken digits.

In order to design a digit recognizer we will take a spoken input of a digit and compare it to each digit’s template. By finding which template is the most similar we can classify the input as belonging to that template’s digit. In order to measure the distance between the two sequences we have to use DTW on an appropriate feature space.
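The classification rule described above is a nearest-template search. The following is a minimal sketch of that idea only; `classify` and `toy_distance` are hypothetical names introduced for illustration, and the toy distance is a stand-in (mean absolute difference), not DTW.

```python
import numpy as np

def classify(features, templates, distance):
    """Return the label of the template closest to `features`.

    `templates` maps a label to that label's template features, and
    `distance` is any function returning a scalar dissimilarity.
    """
    labels = list(templates)
    scores = [distance(templates[label], features) for label in labels]
    return labels[int(np.argmin(scores))]

def toy_distance(a, b):
    # Stand-in distance for illustration only; the lab uses DTW here.
    return float(np.mean(np.abs(a - b)))

# Two made-up "templates" and one made-up input.
templates = {0: np.array([0.0, 0.0]), 1: np.array([1.0, 1.0])}
print(classify(np.array([0.9, 1.2]), templates, toy_distance))  # prints 1
```

In the recognizer itself, `distance` would be the DTW cost between the template and input feature sequences.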

Decide which feature to use to represent your speech signals. It can be any feature that we used in the past (e.g. some type of STFT, MFCCs, etc.). When comparing a template with a new input you need to perform the following steps:

1. Compute the distance matrix between the frames of the two sequences. For a template with $M$ frames and an input with $N$ frames this is an $M$ by $N$ matrix whose $(i, j)$ element is the distance between the $i$-th frame of the template and the $j$-th frame of the input. We will use the cosine distance (one minus the cosine similarity), defined as:

$$D(\mathbf{a},\mathbf{b}) = 1 - \frac{\sum a_i b_i}{\sqrt{\sum a_i^2}\sqrt{\sum b_i^2}}$$

2. Once you have the distance matrix, compute the cost matrix, which encodes the cost of passing through a node given the optimal path leading to it. We will use the local constraint that node $(i, j)$ can be reached only from nodes $(i-1, j-1)$, $(i, j-1)$ or $(i-1, j)$.

3. Starting from the first element of the matrix, $(1, 1)$, perform the following for each element of the cost matrix. For node $(i, j)$, examine the nodes from which you can reach it (nodes $(i-1, j-1)$, $(i, j-1)$ and $(i-1, j)$) and see which one has the lowest cost. The cost of reaching node $(i, j)$ along the optimal path is then the cost of the optimal preceding node plus the distance at node $(i, j)$. Iterate until you have calculated the cost of passing through every node. As you do that, keep track for each node of which of the three preceding nodes was the optimal one.

4. Now you can backtrack and find the optimal path. Start from the final point of the cost matrix and find the node from which you arrived there (it will be the same one that had the lowest cost above). Once you get to that node, repeat this process until you reach the beginning indexes of the two sequences. The path that you took in this process will be the optimal path that aligns the two sequences.

5. The distance between the two sequences will be the cost of being at the final node. Use this to perform the digit classification.
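The five steps above can be sketched compactly as follows. This is an illustrative sketch under stated assumptions, not the notebook's implementation: `cosine_distance_matrix` and `dtw` are hypothetical names, features are assumed to be stored as columns (one frame per column), and the backpointer encoding (0 = diagonal, 1 = left, 2 = up) is chosen here for convenience.

```python
import numpy as np

def cosine_distance_matrix(T, X, eps=1e-10):
    """T: (features, M) template, X: (features, N) input -> (M, N) distances."""
    Tn = T / (np.linalg.norm(T, axis=0, keepdims=True) + eps)
    Xn = X / (np.linalg.norm(X, axis=0, keepdims=True) + eps)
    return 1.0 - Tn.T @ Xn

def dtw(D):
    """Accumulate costs over distance matrix D and backtrack the optimal path."""
    M, N = D.shape
    steps = [(-1, -1), (0, -1), (-1, 0)]   # diagonal, left, up
    C = np.full((M, N), np.inf)            # accumulated cost
    B = np.zeros((M, N), dtype=int)        # index into `steps`
    C[0, 0] = D[0, 0]
    for i in range(M):
        for j in range(N):
            if i == 0 and j == 0:
                continue
            # Cost of each allowed predecessor; out-of-range ones are infinite.
            prev = [C[i + di, j + dj] if i + di >= 0 and j + dj >= 0 else np.inf
                    for di, dj in steps]
            B[i, j] = int(np.argmin(prev))
            C[i, j] = prev[B[i, j]] + D[i, j]
    # Backtrack from the final node to (0, 0) using the stored predecessors.
    i, j = M - 1, N - 1
    path = [(i, j)]
    while (i, j) != (0, 0):
        di, dj = steps[B[i, j]]
        i, j = i + di, j + dj
        path.append((i, j))
    return C[-1, -1], path[::-1]

# Made-up example: aligning a sequence with itself gives the diagonal path.
T = np.eye(4)[:, :3]
cost, path = dtw(cosine_distance_matrix(T, T))
print(path)  # [(0, 0), (1, 1), (2, 2)]
```

Note that the code uses zero-based indices, so the first node is `(0, 0)` rather than $(1, 1)$. The returned cost is the accumulated cost at the final node, which is the quantity step 5 uses for classification.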

In [1]:
import matplotlib.pyplot as plt
from scipy.io import wavfile
from scipy import signal
import numpy as np
import sounddevice as sd
from collections import defaultdict
from copy import deepcopy

In [2]:
def sound(x, rate=8000, label=''):
    from IPython.display import display, Audio, HTML
    if label == '':
        display( Audio( x, rate=rate))
    else:
        display( HTML( 
        '<style> table, th, td {border: 0px; }</style> <table><tr><td>' + label + 
        '</td><td>' + Audio( x, rate=rate)._repr_html_()[3:] + '</td></tr></table>'
        ))

def stft( input_sound, dft_size, hop_size, zero_pad, window):
    zero_padding = np.zeros(dft_size)
    x = np.append(zero_padding, input_sound)
    x = np.append(x, zero_padding)

    frames = []
    for i in range(0, len(x)-dft_size, hop_size):
        frames.append(x[i:i+dft_size] * window)

    f = []
    for frame in frames:
        freq_vec = np.reshape(np.fft.rfft(frame, dft_size+zero_pad), (-1, 1))
        f.append(freq_vec)

    return np.hstack(f)

def istft( stft_output, dft_size, hop_size, zero_pad, window):
    t = []
    for f in stft_output.T:
        t.append(np.fft.irfft(f, dft_size+zero_pad))
    t = np.array(t)

    x = np.zeros(dft_size+hop_size*(len(t)-1))
    for i in range(len(t)):
        x[i*hop_size:i*hop_size+dft_size] += t[i][:dft_size] * window
    
    return x[dft_size:]

def VAD(x, fs, threshold, min_num_clip_frames, max_num_under_threshold_frames):
    x_sq = np.square(x)

    b, a = signal.butter(2, 1, 'lowpass', fs=fs)
    e = signal.lfilter(b, a, x_sq)
    e = e.clip(min=0)
    
    clips = []
    clipping = False
    num_under_threshold_frames = 0

    for i in range(x.shape[0]):
        if clipping and e[i] < threshold:
            num_under_threshold_frames += 1
            if num_under_threshold_frames < max_num_under_threshold_frames and i - start_index > min_num_clip_frames:
                clips.append(x[start_index:i])
                clipping = False
            elif num_under_threshold_frames > max_num_under_threshold_frames:
                clipping = False
        elif not clipping and e[i] > threshold:
            start_index = i
            num_under_threshold_frames = 0
            clipping = True
            
    return clips

def plot_spectrogram(x, X, fs, title='', add_value=np.e):
    time_axis = np.linspace(0, len(x)/fs, X.shape[1])
    freq_axis = np.linspace(0, fs/2, X.shape[0])
    freq_abs = np.absolute(X)
    plt.pcolormesh(np.arange(freq_abs.shape[1]), freq_axis, np.log(freq_abs + add_value), cmap='Blues')
    plt.title(title)
    plt.xlabel('Index')
    plt.ylabel('Freq (Hz)')
    plt.show()

In [3]:
class DigitRecognizer():
    def __init__(self, train_set, num_class):
        self.train_set = train_set
        self.num_class = num_class
        self.distance_matrix = {}
        self.cost_matrix = {}
        self.backtrack = {}
        
    def cosine_distance(self, a, b):
        return 1 - np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-10)
        
    def calculate_distance_matrix(self, X_input):
        for digit in range(self.num_class):
            template = self.train_set[digit]
            D = np.zeros((template.shape[1], X_input.shape[1]))
            for i in range(D.shape[0]):
                D[i, :] = [self.cosine_distance(template[:, i], X_input[:, j]) for j in range(X_input.shape[1])]
            self.distance_matrix[digit] = D
                    
    def calculate_cost_matrix(self):
        for digit in range(self.num_class):
            D = self.distance_matrix[digit]
            C = np.zeros_like(D)
            B = np.zeros_like(D, dtype=int)
            
            C[0, :] = np.cumsum(D[0, :])
            B[0, 1:] = 1
            C[:, 0] = np.cumsum(D[:, 0])
            B[1:, 0] = 3

            for i in range(1, C.shape[0]):
                for j in range(1, C.shape[1]):
                    choices = [C[i, j-1], C[i-1, j-1], C[i-1, j]]
                    B[i, j] = np.argmin(choices) + 1
                    C[i, j] = choices[B[i, j]-1] + D[i, j]

            self.cost_matrix[digit] = C
            self.backtrack[digit] = B
            
    def backtracking(self, class_index):
        i, j = self.backtrack[class_index].shape
        i -= 1
        j -= 1
        path = []
        
        while i > 0 or j > 0:
            path.append((i, j))
            direction = self.backtrack[class_index][i, j]
            if direction == 1:
                j -= 1
            elif direction == 2:
                i -= 1
                j -= 1
            elif direction == 3:
                i -= 1
        path.append((0, 0))
        path.reverse()
        return path
    
    def predict(self, X_input):
        self.calculate_distance_matrix(X_input)
        self.calculate_cost_matrix()

        scores = [self.cost_matrix[digit][-1, -1] for digit in range(self.num_class)]
        return np.argmin(scores)


In [4]:
# globals
dft_size = 1024
hop_size = 1024
zero_pad = 0
window = signal.windows.hann(dft_size, sym=False)

In [14]:
fs_digit, x_digit = wavfile.read('./digits.wav')
digit_clips = VAD(x_digit, fs_digit, threshold=75, min_num_clip_frames=10000, max_num_under_threshold_frames=500)

records = defaultdict(list)

for index, clip in enumerate(digit_clips):
    digit = index // 5
    records[digit].append(clip)

X = defaultdict(list)
for digit, clips in records.items():
    X[digit] = [np.abs(stft(clip, dft_size, hop_size, zero_pad, window)) for clip in clips]
        
X_train = {digit: clips[2] for digit, clips in X.items()}

X_test = deepcopy(X)
for clips in X_test.values():
    del clips[2]

DR = DigitRecognizer(X_train, 10)

In [15]:
cnt = 0
total = 0

res = {d: [] for d in X}

for digit, clips in X_test.items():
    for clip in clips:
        total += 1
        prediction = DR.predict(clip)
        cnt += (digit == prediction)
        res[digit].append(digit == prediction)

accuracy = cnt / total
print(f'Accuracy: {accuracy:.4f}')
for d in X:
    print(d, res[d])

Accuracy: 0.8750
0 [True, True, True, True]
1 [True, True, True, True]
2 [False, True, True, False]
3 [False, False, True, True]
4 [True, True, True, True]
5 [True, True, True, True]
6 [True, True, True, True]
7 [True, True, True, True]
8 [True, True, True, True]
9 [False, True, True, True]
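The per-digit lists above show which test clips were misclassified but not what they were mistaken for. One way to see that is a confusion matrix. The sketch below assumes the true and predicted labels are collected into two lists; `confusion_matrix` is a hypothetical helper, and the labels in the example are made up, not the results of this lab.

```python
import numpy as np

def confusion_matrix(true_labels, predicted_labels, num_classes):
    """Rows are true classes, columns are predicted classes."""
    cm = np.zeros((num_classes, num_classes), dtype=int)
    for t, p in zip(true_labels, predicted_labels):
        cm[t, p] += 1
    return cm

# Made-up labels for a three-class problem.
true_labels = [0, 0, 1, 1, 2, 2]
predicted_labels = [0, 0, 1, 2, 2, 1]

cm = confusion_matrix(true_labels, predicted_labels, 3)
per_class_accuracy = cm.diagonal() / cm.sum(axis=1)
print(cm)
print(per_class_accuracy)
```

Off-diagonal entries identify the pairs of classes that get confused with each other.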


## Part 2: Making a Voice-Driven Dialer

Suppose you just started working for a phone company and the first thing they ask you to do is make a hands-free interface for their phones so that people can dial their friends by voice. During setup, the users speak the name of a contact and then associate it with a number to call. Make a system for which you use the full names of 4-5 of your friends, so that when you speak a name the system recognizes it (and thus could subsequently call that person's number).

In [16]:
class NameRecognizer(DigitRecognizer):
    def __init__(self, train_set, num_class, names, contacts):
        super().__init__(train_set, num_class)
        self.names = names
        self.contacts = contacts

    def predict(self, X_input):
        index = super().predict(X_input)
        name = self.names[index]
        contact_info = self.contacts[name]
        return f'Calling to {name}: {contact_info}'

In [124]:
fs_name, x_name = wavfile.read('./names.wav')
sound(x_name, fs_name, 'names')

name1 = x_name[500:100000] # Sophia Bennett
name2 = x_name[150000:250000] # Lucas Graham
name3 = x_name[300000:400000] # Abigail Foster
name4 = x_name[450000:550000] # Ethan Wallace
name5 = x_name[600000:700000] # Lily Thomson

name_clips = [name1, name2, name3, name4, name5]



In [125]:
names = ['Sophia Bennett', 'Lucas Graham', 'Abigail Foster', 'Ethan Wallace', 'Lily Thomson']
contacts = {
    'Sophia Bennett': '111-111-1111',
    'Lucas Graham': '222-222-2222',
    'Abigail Foster': '333-333-3333',
    'Ethan Wallace': '444-444-4444',
    'Lily Thomson': '555-555-5555'
}

X_train = {}
for i, clip in enumerate(name_clips):
    Xi = stft(clip, dft_size, hop_size, zero_pad, window)
    X_train[i] = np.abs(Xi)

NR = NameRecognizer(X_train, 5, names, contacts)

In [129]:
test1 = name3.copy()
sound(test1, fs_name, 'Abigail Foster')
NR.predict(np.abs(stft(test1, dft_size, hop_size, zero_pad, window)))



'Calling to Abigail Foster: 333-333-3333'

Test1 passed

In [130]:
test2 = name5.copy()
sound(test2, fs_name, 'Lily Thomson')
NR.predict(np.abs(stft(test2, dft_size, hop_size, zero_pad, window)))



'Calling to Lily Thomson: 555-555-5555'

Test2 passed