# Assignment 1: Dynamic Time Warping

---

## Task 4) Isolated Word Recognition

Because of the high sampling rate (e.g. 8 kHz, i.e. 8000 samples per second), performing [DTW](https://en.wikipedia.org/wiki/Dynamic_time_warping) on the raw audio signal is not advised (feel free to try!).
A better solution is to compute a set of features; here we will extract [mel-frequency cepstral coefficients](https://en.wikipedia.org/wiki/Mel-frequency_cepstrum) (MFCCs) over windows of 25 ms length, shifted by 10 ms.
The recommended implementation is [librosa](https://librosa.org/doc/main/generated/librosa.feature.mfcc.html).
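
To make the framing concrete, the following sketch converts the window length and shift from seconds into samples and counts how many full windows fit into a recording. The helper names `frame_params` and `n_frames` are made up for illustration, and the count ignores any padding a library may add at the signal edges.

```python
def frame_params(sr: int, win_s: float = 0.025, hop_s: float = 0.010):
    """Convert window length and shift from seconds to samples."""
    win_length = int(round(win_s * sr))
    hop_length = int(round(hop_s * sr))
    return win_length, hop_length


def n_frames(n_samples: int, win_length: int, hop_length: int) -> int:
    """Number of full windows that fit into the signal, without padding."""
    if n_samples < win_length:
        return 0
    return 1 + (n_samples - win_length) // hop_length


win, hop = frame_params(8000)
print(win, hop)                  # window and shift in samples at 8 kHz
print(n_frames(8000, win, hop))  # full frames in one second of audio
```

One second of audio thus shrinks from thousands of samples to roughly a hundred feature vectors, which is what makes the quadratic DTW table affordable.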

### Data

Download Zohar Jackson's [free spoken digit dataset](https://github.com/Jakobovski/free-spoken-digit-dataset).
There's no need to clone; feel free to download a tagged revision, such as [v1.0.10](https://github.com/Jakobovski/free-spoken-digit-dataset/archive/refs/tags/v1.0.10.tar.gz).
The file naming convention is simple (`{digitLabel}_{speakerName}_{index}.wav`); let's restrict ourselves to two speakers, e.g. `jackson` and `george`.
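
As a small illustration of the naming convention, the sketch below splits a file name into its three fields. The names `RecordingName` and `parse_recording_name` are hypothetical helpers, not part of the dataset or the assignment template.

```python
from typing import NamedTuple


class RecordingName(NamedTuple):
    digit: int
    speaker: str
    index: int


def parse_recording_name(filename: str) -> RecordingName:
    """Split '{digitLabel}_{speakerName}_{index}.wav' into its fields."""
    stem, ext = filename.rsplit(".", 1)
    if ext != "wav":
        raise ValueError(f"not a wav file: {filename}")
    digit, speaker, index = stem.split("_")
    return RecordingName(int(digit), speaker, int(index))


print(parse_recording_name("7_jackson_32.wav"))
```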

### Dynamic Time Warping

[DTW](https://en.wikipedia.org/wiki/Dynamic_time_warping) is closely related to the [Levenshtein distance](https://en.wikipedia.org/wiki/Levenshtein_distance) and the [Needleman-Wunsch algorithm](https://en.wikipedia.org/wiki/Needleman–Wunsch_algorithm).
The main rationale behind DTW is that the two sequences can be aligned, but their speed and exact realization may vary.
Consequently, the cost does not depend on the edit operation but on the difference between the aligned observations.
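
The relationship can be seen by putting both recurrences side by side. The following is a minimal sketch on toy data, assuming scalar observations and the absolute difference as local cost; `levenshtein` and `dtw_1d` are illustrative names and not the functions you are asked to implement below.

```python
def levenshtein(a: str, b: str) -> int:
    """Edit distance: each insertion, deletion or substitution costs 1."""
    m, n = len(a), len(b)
    D = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(m + 1):
        D[i][0] = i
    for j in range(n + 1):
        D[0][j] = j
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            sub = 0 if a[i - 1] == b[j - 1] else 1
            D[i][j] = min(D[i - 1][j] + 1,        # deletion
                          D[i][j - 1] + 1,        # insertion
                          D[i - 1][j - 1] + sub)  # match or substitution
    return D[m][n]


def dtw_1d(x, y) -> float:
    """DTW on scalar sequences: every step pays |x_i - y_j|."""
    inf = float("inf")
    m, n = len(x), len(y)
    D = [[inf] * (n + 1) for _ in range(m + 1)]
    D[0][0] = 0.0
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            cost = abs(x[i - 1] - y[j - 1])
            D[i][j] = cost + min(D[i - 1][j], D[i][j - 1], D[i - 1][j - 1])
    return D[m][n]


slow = [1, 1, 2, 2, 3, 3]
fast = [1, 2, 3]
print(dtw_1d(slow, fast))            # same shape at half the speed
print(levenshtein("112233", "123"))  # the edit distance charges for every extra symbol
```

Both functions fill the same kind of table with the same three predecessors; they differ only in what a step costs. DTW repeats an observation for free as long as the aligned values agree, whereas the edit distance pays per operation.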

---

### Preparation

In [None]:
import numpy as np
import librosa as lr
import os
from typing import List, Tuple, TypedDict

In [None]:
### TODO: Read in files, compute MFCC, and organize
### Notice: You can restrict the number to a few files for each speaker-digit

class Audio(TypedDict):
    digitLabel: int
    speakerName: str
    index: int
    mfccs: List[Tuple[float]]

audios: List[Audio] = []

speakers = ["george", "jackson", "yweweler"]

### YOUR CODE HERE
FOLDER = 'data/recordings'

for file in os.listdir(FOLDER):
    if not file.endswith('.wav'):
        continue

    parts = file.split('_')
    digit, name, index = int(parts[0]), parts[1], int(parts[2].split('.')[0])

    if name not in speakers or index > 9:
        continue

    y, sr = lr.load(FOLDER+os.sep+file)

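    # librosa returns shape (n_mfcc, n_frames); transpose so that each row is
    # one frame, which is the unit dtw() iterates over
    mfccs = lr.feature.mfcc(y=y, sr=sr, n_mfcc=10, hop_length=int(0.010 * sr), win_length=int(0.025 * sr)).T

    audios.append(Audio(
        digitLabel = digit,
        speakerName = name,
        index = index,
        mfccs = mfccs
    ))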
### END YOUR CODE

### Implement Dynamic Time Warping

In [None]:
def dist(x: Tuple[float], y: Tuple[float]) -> float:
    """
    Compute the distance between two samples.

    Arguments:
    x: MFCCs of first sample.
    y: MFCCs of second sample.

    Returns the distance as float
    """
    ### YOUR CODE HERE
    max_len = max(len(x), len(y))
    x, y = np.pad(x, (0, max_len - len(x)), 'constant'), np.pad(y, (0, max_len - len(y)), 'constant')
    return np.linalg.norm(x - y)
    ### END YOUR CODE


def dtw(obs1: list, obs2: list, dist_fn) -> float:
    """
    Compute the dynamic time warping score between two observations.
    
    Arguments:
    obs1: List of first observations.
    obs2: List of second observations.
    dist_fn: Distance function to use (lower means more similar).

    Returns the score as float.
    """
    ### YOUR CODE HERE
    m, n = len(obs1), len(obs2)
    D = np.full((m + 1, n + 1), np.inf, dtype = float)
    D[0, 0] = 0

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            cost = dist_fn(obs1[i - 1], obs2[j - 1])
            D[i, j] = cost + min(D[i - 1, j],
                                D[i, j - 1],
                                D[i - 1, j - 1])

    return D[m, n]
    ### END YOUR CODE

### Experiment 1: DTW scores

For each speaker and digit, select one recording as an observation (obs1) and the others as tests (obs2). How do scores change across speakers and across digits?

In [None]:
### YOUR CODE HERE
for speaker in speakers:
    for digit in range(10):
        filtered = [a for a in audios if a['digitLabel'] == digit and a['speakerName'] == speaker]
        obs1 = filtered[0]

        for i in range(1, len(filtered)):
            print("DTW score {} for {} saying '{}' compared to the first sample.".format(
                dtw(obs1['mfccs'], filtered[i]['mfccs'], dist_fn = dist), speaker.capitalize(), digit
            ))
### END YOUR CODE

### Implement a DTW-based Isolated Word Recognizer

In [None]:
### TODO: Classify recording into digit label based on reference audio recordings

def recognize(obs: List[Tuple[float]], refs: List[Audio]) -> str:
    """
    Classify the input based on a reference list (train recordings).
    
    Arguments:
    obs: List of input observations (MFCCs).
    refs: List of audio items (train recordings).
    
    Returns the class name of the reference with the minimum DTW distance.
    """
    ### YOUR CODE HERE
    min_distance = np.inf
    best_label = None
    
    obs = np.array(obs)
    
    for ref in refs:      
        distance = dtw(obs, np.array(ref['mfccs']), dist_fn = dist)
        
        if distance < min_distance:
            min_distance = distance
            best_label = ref['digitLabel']
    
    return str(best_label)
    ### END YOUR CODE

### Experiment 2: Speaker-Dependent IWR

Select training recordings from one speaker $S_i$ and disjoint test recordings from the same speaker $S_i$. Compute the Precision, Recall, and F1 metrics, and plot the confusion matrix.
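
For reference, the macro-averaged metrics can also be computed by hand from per-label counts. The sketch below is an illustrative re-implementation (the name `macro_prf` is made up) that averages over all labels seen in either list and treats an undefined ratio as 0; in practice the scikit-learn functions are the more convenient choice.

```python
from collections import Counter


def macro_prf(true_labels, predictions):
    """Macro-averaged precision, recall and F1 over all observed labels."""
    labels = sorted(set(true_labels) | set(predictions))
    tp, fp, fn = Counter(), Counter(), Counter()
    for t, p in zip(true_labels, predictions):
        if t == p:
            tp[t] += 1
        else:
            fp[p] += 1  # p was predicted but is wrong
            fn[t] += 1  # t was missed

    def safe_div(a, b):
        return a / b if b else 0.0

    precisions = [safe_div(tp[l], tp[l] + fp[l]) for l in labels]
    recalls = [safe_div(tp[l], tp[l] + fn[l]) for l in labels]
    f1s = [safe_div(2 * p * r, p + r) for p, r in zip(precisions, recalls)]
    k = len(labels)
    return sum(precisions) / k, sum(recalls) / k, sum(f1s) / k


# Toy example with two classes; one "0" is misrecognized as "1".
print(macro_prf(["0", "0", "1", "1"], ["0", "1", "1", "1"]))
```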

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

In [None]:
### YOUR CODE HERE
data = [a for a in audios if a['speakerName'] == 'george']

np.random.shuffle(data)
train_set, test_set = train_test_split(data)

predictions = [recognize(np.array(test['mfccs']), train_set) for test in test_set]
true_labels = [str(test['digitLabel']) for test in test_set]

precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predictions, average='macro')

print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1}")

cm = confusion_matrix(true_labels, predictions)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap=plt.cm.Blues)
plt.show()
### END YOUR CODE

### Experiment 3: Speaker-Independent IWR

Select training recordings from one speaker $S_i$ and test recordings from another speaker $S_j$. Compute the Precision, Recall, and F1 metrics, and plot the confusion matrix.

In [None]:
### YOUR CODE HERE
train_set = [a for a in audios if a['speakerName'] == 'george']
test_set = [a for a in audios if a['speakerName'] == 'jackson']

predictions = [recognize(np.array(test['mfccs']), train_set) for test in test_set]
true_labels = [str(test['digitLabel']) for test in test_set]

precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predictions, average='macro')

print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1}")

cm = confusion_matrix(true_labels, predictions) 
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap=plt.cm.Blues)
plt.show()
### END YOUR CODE

### Food for Thought

- What are inherent issues of this approach?
- How does this algorithm scale with a larger vocabulary, and how can it be improved?
- How can you extend this idea to continuous speech, ie. ?
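
As a rough starting point for the scaling question, the sketch below counts how many local distance evaluations the nearest-reference recognizer performs for a single test recording, given that it fills one full DTW table per reference. The function name and all numbers are hypothetical and only meant to show the growth.

```python
def dist_evaluations(obs_len: int, ref_lens) -> int:
    """Count local distance evaluations for one recognition pass.

    Filling the full DTW table for an observation of obs_len frames and a
    reference of ref_len frames touches obs_len * ref_len cells.
    """
    return sum(obs_len * ref_len for ref_len in ref_lens)


# Hypothetical setup: 50-frame recordings, 10 references per word.
for vocab_size in (10, 100, 1000):
    refs = [50] * (10 * vocab_size)
    print(vocab_size, dist_evaluations(50, refs))
```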