# To Begin

This is the practical part of your ASR-TTS course. In total you will have five labs: three focused on Automatic Speech Recognition and two on Text-to-Speech models. Each lab lasts two hours and consists of two parts:
* Reading Part
* Coding Part 

In each part you might find questions or tasks/activities to complete. The grading of the labs is explained below.

LAB 2/5

# What will you learn in LAB 2?

* How feature extraction works in practice, including time-frequency representations and their parameters.
* How different decoding strategies (e.g. greedy decoding, beam search) affect ASR outputs.
* How ASR errors can be analyzed beyond a single score using detailed WER breakdowns.
* How tensor shapes propagate through an ASR pipeline (waveforms, features, batches).
* How to identify and fix common tensor shape and data type errors in ASR systems.



# Automatic Speech Recognition: Feature Extraction 

## Question:

Read the following, remember and explain how feature extraction works: 

https://thesai.org/Downloads/Volume12No8/Paper_21-Automatic_Speech_Recognition_Features_Extraction_Techniques.pdf

In [None]:
## Answer

Raw audio is a 1D waveform: amplitude over time. But ASR models usually don’t work directly on raw samples; they rely on features that make speech patterns easier to learn. Today we’ll build the most common ones: spectrograms, log-mel spectrograms, and MFCCs.

## Waveforms

An audio waveform is the most basic representation of sound in a computer.
When we record speech, the microphone converts air pressure changes into an electrical signal, and the computer stores this signal as a sequence of numbers over time.

Each number represents: how much the air pressure deviates from silence at a specific moment in time

So a waveform is simply: amplitude (loudness) as a function of time

Unlike humans, a computer does not listen or understand meaning.
It only sees numbers. A waveform looks like this to a model:

[0.002, 0.01, -0.03, -0.01, 0.0, 0.02, ...]

## Sampling Rate

Speech is stored by sampling the signal many times per second. For example, at 16 kHz the signal is sampled 16,000 times per second, so every second of speech becomes 16,000 numbers. A higher sampling rate means more detail, but also larger files and more computation. Most ASR systems use 16 kHz because it captures speech frequencies well while remaining efficient.

While waveforms contain all the information, they are not convenient for learning speech patterns directly:

* speech information is spread over time
* frequency content (pitch, formants) is hidden
* small shifts in time change the signal a lot

That’s why ASR systems transform waveforms into time–frequency features, such as spectrograms, Mel spectrograms, MFCCs.

➡️ This is the goal of feature extraction.
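
To make the sampling-rate arithmetic above concrete, here is a minimal sketch in plain Python (no audio library needed). The helper names `num_samples` and `sine_wave` are made up for this illustration; the 440 Hz tone is an arbitrary choice.

```python
import math

SAMPLE_RATE = 16_000  # samples per second (16 kHz)

def num_samples(duration_s, sample_rate=SAMPLE_RATE):
    """Number of stored values for a recording of the given duration."""
    return int(round(duration_s * sample_rate))

def sine_wave(freq_hz, duration_s, sample_rate=SAMPLE_RATE):
    """Sample a pure tone: one amplitude value per sampling instant."""
    n = num_samples(duration_s, sample_rate)
    return [math.sin(2 * math.pi * freq_hz * i / sample_rate) for i in range(n)]

tone = sine_wave(freq_hz=440.0, duration_s=0.5)
print(len(tone))   # half a second at 16 kHz -> 8000 values
print(tone[:4])    # the first few amplitude values, as a model would see them
```

Note that a signal sampled at 16 kHz can only represent frequencies up to 8 kHz (half the sampling rate).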

In [None]:
import torch
import torchaudio
import torchaudio.transforms as T
import matplotlib.pyplot as plt
import scipy.io.wavfile as wavfile
import numpy as np

In [None]:
# Using the code from Lab 1, load one audio file and plot its waveform

## Short-Time Fourier Transform (STFT) Spectrogram — Linear Frequency Representation

Speech changes very quickly over time, so instead of analyzing the whole sound at once, we analyze it little by little. The audio signal is first cut into short pieces called frames, each lasting only a few milliseconds (typically about 25 ms), which is short enough that speech is almost stable inside each frame. These frames overlap (with 25 ms frames, a new frame typically starts every 10 ms) so that no information is lost at the borders and transitions remain smooth. Before analyzing each frame, we apply a window function, most commonly the Hann window, which gently fades the signal at the beginning and end of the frame to avoid artificial frequencies caused by sharp cuts. Once a frame is short, overlapping, and smoothly windowed, the computer analyzes which frequencies are present and how strong they are. By stacking the frequency information from all frames over time, we obtain a spectrogram, where:
* the horizontal axis represents time,
* the vertical axis represents frequency (evenly spaced in Hz, called linear frequency),
* and the color represents energy.

This process is called the Short-Time Fourier Transform (STFT) because we transform short pieces of sound into frequencies, revealing how speech frequencies evolve over time.
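
The steps above (framing, windowing, frequency analysis) can be sketched in plain Python. This is only an illustration under simplifying assumptions: it uses a slow direct DFT instead of an FFT, it does not pad the signal at the edges, and the helper names `hann` and `frame_power_spectrum` are made up for this example. The parameter values assume 16 kHz audio.

```python
import cmath
import math

def hann(n):
    """Periodic Hann window of length n: fades the frame in and out."""
    return [0.5 - 0.5 * math.cos(2 * math.pi * i / n) for i in range(n)]

def frame_power_spectrum(frame, n_fft):
    """Window one frame, zero-pad it to n_fft, and return the power per frequency bin."""
    window = hann(len(frame))
    padded = [x * w for x, w in zip(frame, window)] + [0.0] * (n_fft - len(frame))
    power = []
    for k in range(n_fft // 2 + 1):  # keep only the non-negative frequencies
        coeff = sum(x * cmath.exp(-2j * math.pi * k * n / n_fft) for n, x in enumerate(padded))
        power.append(abs(coeff) ** 2)
    return power

sample_rate, win_length, hop_length, n_fft = 16_000, 400, 160, 512

# One second of a 1 kHz tone
signal = [math.sin(2 * math.pi * 1000 * i / sample_rate) for i in range(sample_rate)]

# Cut the signal into overlapping frames: 25 ms long, one every 10 ms
starts = range(0, len(signal) - win_length + 1, hop_length)
frames = [signal[s:s + win_length] for s in starts]

power = frame_power_spectrum(frames[0], n_fft)
peak_bin = max(range(len(power)), key=power.__getitem__)
peak_hz = peak_bin * sample_rate / n_fft  # each bin spans sample_rate / n_fft Hz

print(len(frames), len(power))  # 98 frames, 257 frequency bins
print(peak_bin, peak_hz)        # 32 1000.0
```

Stacking the power spectra of all frames side by side gives the spectrogram. Library implementations usually pad the signal so that frames are centered, which changes the frame count slightly compared with this sketch.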

In [None]:
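# STFT transformation (the durations below assume 16 kHz audio)

win_length = 400   # frame length in samples: 25 ms at 16 kHz
hop_length = 160   # step between frames in samples: 10 ms at 16 kHz
n_fft = 512        # FFT size (frames are zero-padded to 512), giving 512 // 2 + 1 = 257 frequency bins

# Create STFT transform
stft = T.Spectrogram(
    n_fft=n_fft,
    win_length=win_length,
    hop_length=hop_length,
    window_fn=torch.hann_window,
    power=2.0,  # power spectrogram (squared magnitude)
)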


In [None]:
WAV_PATH = "/home/aine/Teaching/ASR-TTS-Course/asr-dataset/speech/61-70968-0000.wav"  # TODO

waveform, sr = torchaudio.load(WAV_PATH)   # waveform: [channels, time]
print("waveform shape:", waveform.shape)
print("sample rate:", sr)


def plot_spectrogram(spec, title="Spectrogram", ylabel="Frequency bins"):
    # spec expected shape: [freq, time]
    spec_db = 10 * torch.log10(spec + 1e-10)
    plt.figure(figsize=(10, 4))
    plt.imshow(spec_db.detach().cpu(), origin="lower", aspect="auto")
    plt.title(title)
    plt.xlabel("Frames")
    plt.ylabel(ylabel)
    plt.colorbar(label="dB")
    plt.tight_layout()
    plt.show()

n_fft = 1024
hop_length = 256
win_length = n_fft

spec_transform = T.Spectrogram(
    n_fft=n_fft,
    hop_length=hop_length,
    win_length=win_length,
    window_fn=torch.hann_window,
    power=2.0,  # power spectrogram
)

spec = spec_transform(waveform)  # [channel, freq, time]
print("spectrogram shape:", spec.shape)

plot_spectrogram(spec[0], title=f"Linear Spectrogram (n_fft={n_fft}, hop={hop_length})")


# Automatic Speech Recognition: Decoders

**Name**:

The first lab introduced the building blocks of an ASR system, including feature extraction and classification with an acoustic model (wav2vec2), which produced an *emission* matrix (probability for each character at each time step). From this emission matrix, we could compute the most likely character at each time step using a naïve *greedy* decoder. The drawback of such an approach is the lack of context, which can produce sequences of characters that do not correspond to actual words, and/or sequences of words that are incorrect / do not correspond to any language rules.

In this lab, we introduce and compare multiple decoding techniques. One of the more advanced techniques is based on [connectionist temporal classification](https://towardsdatascience.com/intuitively-understanding-connectionist-temporal-classification-3797e43a86c) (CTC). The general idea of such a decoder is to consider some context (sequences of characters, possible words, and possible sequences of words), in order to yield more likely / realistic outputs than those given by the greedy decoder.

<center><a href="https://gab41.lab41.org/speech-recognition-you-down-with-ctc-8d3b558943f0">
    <img src="https://miro.medium.com/v2/resize:fit:640/format:webp/1*XbIp4pn38rxL_DwzSeYVxQ.png" width="400"></a></center>

To do so, the CTC decoder relies on three main components:
    
- A **beam search**, which is an algorithm to efficiently find the *best path* from the emission matrix, that is, the sequence of characters with highest probability (rather than the sequence of individually most likely characters).
- A **lexicon**, which is a mapping between token sequences (list of characters) and words. It is used to restrict the search space of the decoder to words that only belong to this dictionary (e.g., the word "azfpojazflj" does not exist in the English vocabulary).
- A **language model**, which specifies sequences of words that are more likely to occur than others. A common choice of language model is an $n$-gram, which is a statistical model for the probability of occurrence of a given word based on the previous $n-1$ ones (for instance, the sequence "the sky is" is more likely to be followed by "blue" than by "trumpet").
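
As an illustration of the language-model idea, here is a minimal sketch of a bigram model (an $n$-gram with $n=2$, which conditions on a single previous word) estimated from a tiny made-up corpus. The corpus and the helper `bigram_prob` are invented for this example; real $n$-gram models are estimated on large corpora and use smoothing so that unseen word pairs do not get zero probability.

```python
from collections import Counter

# Tiny made-up corpus, for illustration only
corpus = [
    "the sky is blue",
    "the sea is blue",
    "the sky is grey",
    "the trumpet is loud",
]

bigram_counts = Counter()   # how often the pair (previous word, next word) occurs
context_counts = Counter()  # how often a word is followed by any word
for sentence in corpus:
    words = sentence.split()
    for prev, nxt in zip(words, words[1:]):
        bigram_counts[(prev, nxt)] += 1
        context_counts[prev] += 1

def bigram_prob(prev, nxt):
    """Maximum-likelihood estimate of P(nxt | prev); 0.0 for unseen contexts."""
    if context_counts[prev] == 0:
        return 0.0
    return bigram_counts[(prev, nxt)] / context_counts[prev]

print(bigram_prob("is", "blue"))     # 0.5: "blue" follows "is" in 2 of 4 cases
print(bigram_prob("is", "trumpet"))  # 0.0: never seen after "is"
```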

The CTC decoder combines these ingredients to compute the score of several word sequences (or *hypotheses*), in order to find the best possible transcript. In this lab, we study the influence of the lexicon, language model, and the beam search size on ASR performance from a practical perspective, without going into the technical details of the [beam search algorithm](https://www.width.ai/post/what-is-beam-search) or the [CTC loss](https://distill.pub/2017/ctc/), which can also be used for training the network (as we will see in lab 3).

**Note**: This lab is based on this [tutorial](https://pytorch.org/audio/main/tutorials/asr_inference_with_ctc_decoder_tutorial.html), which you can check for more details on CTC decoder parameters in torchaudio.

In [None]:
import torch
import torchaudio
from torchaudio.models.decoder import ctc_decoder, download_pretrained_files
import IPython
import os
import fnmatch
import matplotlib.pyplot as plt
import time
torch.random.manual_seed(0)
MAX_FILES = 2 # lower this number for processing a subset of the dataset

In [None]:
# Main dataset path - If needed, you can change it HERE but NOWHERE ELSE in the notebook!
data_dir = "../dataset/"

In [None]:
# Speech and transcripts sub-directories paths
data_speech_dir = os.path.join(data_dir, 'speech')
data_transc_dir = os.path.join(data_dir, 'transcription')

## Preparation

As in the previous lab, we first load an example speech signal, and we display it. We also provide the function to get the true transcript and compute the WER. Finally, we load the wav2vec2 acoustic model.

In [None]:
# Example file
audio_file = '61-70968-0001.wav'
audio_file_path = os.path.join(data_speech_dir, audio_file)
print(f"Audio file path: {audio_file_path}")

waveform, sr = torchaudio.load(audio_file_path, channels_first=True)
IPython.display.Audio(data=waveform, rate=sr)

In [None]:
# We provide the function for loading the true transcript and computing the WER
def get_true_transcript(transc_file_path):
    with open(transc_file_path, "r") as f:
        true_transcript = f.read()
    true_transcript = true_transcript.lower().split()
    return true_transcript

def get_wer(true_transcript, est_transcript):
    wer = torchaudio.functional.edit_distance(true_transcript, est_transcript) / len(true_transcript)
    return wer
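
`get_wer` above returns a single number. To analyze errors in more detail, the edit distance can be split into substitutions, deletions and insertions. The following is a minimal sketch in plain Python; the function name `wer_breakdown` and the example sentences are made up for this illustration, and when several alignments have the same number of edits, the split it reports is only one of the possible ones.

```python
def wer_breakdown(reference, hypothesis):
    """Return (substitutions, deletions, insertions) of a minimal word alignment."""
    n, m = len(reference), len(hypothesis)
    # cost[i][j] = (edits, subs, dels, ins) to turn reference[:i] into hypothesis[:j]
    cost = [[None] * (m + 1) for _ in range(n + 1)]
    cost[0][0] = (0, 0, 0, 0)
    for i in range(1, n + 1):
        cost[i][0] = (i, 0, i, 0)  # delete every reference word
    for j in range(1, m + 1):
        cost[0][j] = (j, 0, 0, j)  # insert every hypothesis word
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            if reference[i - 1] == hypothesis[j - 1]:
                cost[i][j] = cost[i - 1][j - 1]  # words match: no edit
                continue
            e, s, d, ins = cost[i - 1][j - 1]
            substitution = (e + 1, s + 1, d, ins)
            e, s, d, ins = cost[i - 1][j]
            deletion = (e + 1, s, d + 1, ins)
            e, s, d, ins = cost[i][j - 1]
            insertion = (e + 1, s, d, ins + 1)
            cost[i][j] = min(substitution, deletion, insertion)
    _, subs, dels, inss = cost[n][m]
    return subs, dels, inss

ref = "the cat sat on the mat".split()
hyp = "the cat sit on mat".split()
subs, dels, inss = wer_breakdown(ref, hyp)
print(subs, dels, inss)                                 # 1 1 0
print(f"WER: {(subs + dels + inss) / len(ref):.3f}")    # WER: 0.333
```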
    

In [None]:
# Load and display the true transcription
transc_file_path = os.path.join(data_transc_dir, audio_file.replace('wav', 'txt'))
true_transcript = get_true_transcript(transc_file_path)
print(true_transcript)

In [None]:
# Load the acoustic model
model_name = 'WAV2VEC2_ASR_BASE_100H'
bundle = getattr(torchaudio.pipelines, model_name)
acoustic_model = bundle.get_model()
labels = bundle.get_labels()

# Apply the model to the waveform to get the emission tensor
with torch.inference_mode():
    emission, _ = acoustic_model(waveform)
    num_time_steps = emission.shape[1]

## CTC Decoder

The CTC decoder can be constructed directly by using the `ctc_decoder` function in torchaudio. In addition to the parameters related to the beam search (we will study them later on), it takes as inputs:
- the list of tokens, in order to map emissions to characters in the classifier.
- the path to the lexicon, expected as a .txt file containing, on each line, a word followed by its space-split tokens (and the special end-of-word token `|`):

```
# lexicon.txt
a     a |
able  a b l e |
about a b o u t |
...
```
- the path to the language model, expected as a .bin file.

All these are assembled in pretrained files that can be downloaded using the `download_pretrained_files` function (this might take some time as the language model can be large), and then used to construct the decoder.
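
To make the lexicon format concrete, here is a minimal sketch that parses lines such as the ones shown above into a word-to-tokens mapping. The helper `parse_lexicon` is made up for this illustration and is not part of torchaudio; the example lines assume a tab between the word and its tokens.

```python
def parse_lexicon(lines):
    """Map each word to the list of its entries (each entry is a list of tokens)."""
    lexicon = {}
    for line in lines:
        parts = line.split()
        if not parts:
            continue  # skip empty lines
        word, tokens = parts[0], parts[1:]
        lexicon.setdefault(word, []).append(tokens)
    return lexicon

example_lines = [
    "a\ta |",
    "able\ta b l e |",
    "about\ta b o u t |",
]
lexicon = parse_lexicon(example_lines)
print(lexicon["able"])  # [['a', 'b', 'l', 'e', '|']]
```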

In [None]:
# Download the files corresponding to the (pretrained) 4-gram language model, which comes with a lexicon and tokens
files = download_pretrained_files("librispeech-4-gram")
path_lm_tokens = files.tokens
path_lm_lexicon = files.lexicon
path_lm = files.lm

print(path_lm_tokens)
print(path_lm_lexicon)
print(path_lm)

In [None]:
# Visualize the first 10 tokens (includes the blank and end-of-word token)
with open(path_lm_tokens, 'r') as f:
    tok = f.read().splitlines()
print("\n".join(tok[:10]))

In [None]:
# Visualize the lexicon content (first 10 entries)
with open(path_lm_lexicon, 'r') as f:
    lex = f.read().splitlines()
print("\n".join(lex[:10]))

In [None]:
# To obtain the line(s) corresponding to a word in the lexicon, you can use the following:
w = 'hello'
with open(path_lm_lexicon) as f:
    lex_w = [line for line in f if line.startswith(w + '\t')]  # it's a list since the same word can have several entries
print(lex_w)

In [None]:
# Instantiate the CTC decoder
decoder = ctc_decoder(
    lexicon=path_lm_lexicon,
    tokens=path_lm_tokens,
    lm=path_lm,
)

# Apply the decoder to the `emission` tensor, and get the first element (batch_size=1) and best hypothesis
ctc_decoder_result = decoder(emission)[0][0]

The decoder output `ctc_decoder_result` contains many fields, including the predicted token IDs, and a `.words` field that contains the transcript as a list of strings.

In [None]:
# Get the token IDs using the .tokens field
ctc_decoder_indices = ctc_decoder_result.tokens
print(f"Token indices: {ctc_decoder_indices}")

# Display the transcript using the .words field
print(f"Words: {ctc_decoder_result.words}")

### Greedy decoder

The greedy decoder we have seen in the first lab is a particular case of the CTC decoder when no language model / lexicon is provided. It can be constructed by simply passing `None` as the corresponding input arguments in the `ctc_decoder` function.
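
As a reminder of what greedy decoding does, here is a minimal sketch in plain Python: take the most likely label at each time step, merge consecutive repeats, drop the blank token, and split words on `|`. The function name `greedy_ctc_decode`, the label set, and the toy emission values are made up for this illustration.

```python
def greedy_ctc_decode(emission, labels, blank="-"):
    """Greedy CTC decoding of a [time, label] score matrix into a list of words."""
    # 1. Most likely label index at each time step
    best = [max(range(len(frame)), key=frame.__getitem__) for frame in emission]
    # 2. Merge consecutive repeats
    collapsed = [idx for i, idx in enumerate(best) if i == 0 or idx != best[i - 1]]
    # 3. Drop blanks, then use '|' as the word separator
    chars = [labels[idx] for idx in collapsed if labels[idx] != blank]
    return "".join(chars).replace("|", " ").strip().split()

labels = ["-", "|", "h", "e", "l", "o"]

def toy_frame(label):
    """Toy emission frame: high score for `label`, low score for the others."""
    return [0.9 if l == label else 0.02 for l in labels]

# The blank between the two runs of 'l' is what keeps the double 'l' in "hello"
emission = [toy_frame(c) for c in "hell-lo|"]
print(greedy_ctc_decode(emission, labels))  # ['hello']
```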

In [None]:
# TO DO: Construct a greedy decoder (no LM / lexicon), and apply it to the emission matrix
decoder_nolm = ctc_decoder(
    lexicon=None,
    tokens=path_lm_tokens,
    lm=None
)

ctc_nolm_result = decoder_nolm(emission)[0][0]

In [None]:
# Since no lexicon is provided, the .words field returns an empty list
print(ctc_nolm_result.words)

# Then we have to manually convert token IDs to tokens using the idxs_to_tokens method of the decoder
# (+ a bit of postprocessing)
ctc_nolm_tokens = decoder_nolm.idxs_to_tokens(ctc_nolm_result.tokens)
ctc_nolm_transcript = "".join(ctc_nolm_tokens).replace("|", " ").strip().split() 
ctc_nolm_transcript = [w.lower() for w in ctc_nolm_transcript]

print(f"No LM Transcript: {ctc_nolm_transcript}")

## Influence of the language model

The language model is expected to have a strong impact on performance, since it guides the decoder towards more likely word sequences.
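
The following toy sketch illustrates how a language model can change the winning hypothesis. All numbers are made up, and the scoring rule (acoustic log-probability plus a weighted language-model log-probability) is a simplification: the actual decoder score contains additional terms.

```python
import math

# Hypothetical acoustic log-probabilities for two competing transcripts
acoustic_logprob = {
    "the sky is blue": math.log(0.30),
    "the sky is blew": math.log(0.35),  # acoustically slightly preferred
}
# Hypothetical language-model log-probabilities for the same transcripts
lm_logprob = {
    "the sky is blue": math.log(0.02),
    "the sky is blew": math.log(0.0001),
}

def best_hypothesis(lm_weight):
    """Pick the transcript with the highest combined score."""
    def score(text):
        return acoustic_logprob[text] + lm_weight * lm_logprob[text]
    return max(acoustic_logprob, key=score)

print(best_hypothesis(lm_weight=0.0))  # the sky is blew (acoustic score only)
print(best_hypothesis(lm_weight=1.0))  # the sky is blue (the language model corrects it)
```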

<span style="color:red"> **Exercise 1**</span>. Compare the CTC decoder using `librispeech-4-gram` files (lexicon, token, and language model downloaded above in this script) and the greedy decoder. To that end, for each decoder, perform ASR on the whole dataset (feel free to reuse/adapt code from the previous lab) and compute the mean WER. Can you interpret the results?

In [None]:
def find_files(directory, pattern='*.wav'):
    """Recursively finds all files matching the pattern."""
    files = []
    for root, _, filenames in os.walk(directory):
        for filename in fnmatch.filter(filenames, pattern):
            files.append(filename)
    files = sorted(files)
    return files


def process_folder(data_speech_dir, data_transc_dir, acoustic_model, decoder, verbose=True, max_files=None):

    # Get the list of files in the dataset folder
    audio_files = find_files(data_speech_dir)

    # Take a subset of files
    nfiles = len(audio_files)
    if max_files:
        nfiles = min(nfiles, max_files)
    audio_files = audio_files[:nfiles]

    # Initialize lists containing true and estimated transcripts, as well as WER
    true_transcript_all = []
    est_transcript_all = []
    wer_all = []

    for iaf, audio_file in enumerate(audio_files):
        
        # Get files path
        audio_file_path = os.path.join(data_speech_dir, audio_file)
        transc_file_path = os.path.join(data_transc_dir, audio_file.replace('wav', 'txt'))

        # Load an audio signal
        waveform, sr = torchaudio.load(audio_file_path, channels_first=True)

        # Apply acoustic model and decoder
        with torch.inference_mode():
            emission, _ = acoustic_model(waveform)
            ctc_decoder_results = decoder(emission)[0][0]

        ctc_tokens = decoder.idxs_to_tokens(ctc_decoder_results.tokens)
        est_transcript = "".join(ctc_tokens).replace("|", " ").strip().split()
        est_transcript = [w.lower() for w in est_transcript]

        # Load the true transcription
        true_transcript = get_true_transcript(transc_file_path)
        
        # Compute WER
        wer = get_wer(true_transcript, est_transcript)
        wer_all.append(wer)

        est_transcript_all.append(est_transcript)
        true_transcript_all.append(true_transcript)
        
        # Display results
        if verbose:
            print(f"File {iaf+1} / {nfiles}")
            print('Estimated transcript: ', est_transcript)
            print('True transcript: ', true_transcript)
            print(f"WER: {wer*100:.1f} %")

    wer_mean = torch.FloatTensor(wer_all).mean()

    return wer_mean, est_transcript_all, true_transcript_all

In [None]:
# CTC decoder with the 4-gram language model
decoder = ctc_decoder(
    lexicon=path_lm_lexicon,
    tokens=path_lm_tokens,
    lm=path_lm,
)
wer_mean = process_folder(data_speech_dir, data_transc_dir, acoustic_model, decoder, verbose=False, max_files=MAX_FILES)[0]
print(f"LM: --- WER: {wer_mean*100:.1f} %")

# Greedy decoder
decoder = ctc_decoder(
    lexicon=None,
    tokens=labels,
    lm=None,
)
wer_mean = process_folder(data_speech_dir, data_transc_dir, acoustic_model, decoder, verbose=False, max_files=MAX_FILES)[0]
print(f"Greedy Decoder --- WER: {wer_mean*100:.1f} %")

## Influence of the lexicon

The lexicon is expected to have a strong influence on ASR performance, since it constrains the decoder to produce only words that belong to it, which avoids producing words that do not exist in a language or given corpus. Here, we propose to use a custom lexicon that only contains words that are in the dataset.

<span style="color:red"> **Exercise 2**</span>. Perform ASR using such a custom lexicon. To that end:
- Build a vocabulary (list of words) from the transcript files in the dataset (load the transcripts and remove duplicates).
- Filter the downloaded lexicon by only keeping words from the vocabulary. Write this custom lexicon as a `.txt` file.
- Create a decoder that uses the 4-gram language model and this custom lexicon. Perform ASR on the dataset / compute the WER. What do you observe?

In [None]:
# Function to get a flat list from a list of lists
def flatten_list(list_of_lists):
    return [x for L in list_of_lists for x in L]

# Read all the transcripts in the dataset
transcr_files = find_files(data_transc_dir, pattern='*.txt')
true_transcript = []
for f in transcr_files:
    transc_file_path = os.path.join(data_transc_dir, f)
    true_transcript.append(get_true_transcript(transc_file_path))

# Flatten the list and remove duplicates
vocab_dataset = list(set(flatten_list(true_transcript)))
print('Words in the dataset:', len(vocab_dataset))

# Filter the provided lexicon by keeping only words in our dataset (the lexicon file is read only once)
vocab_set = set(vocab_dataset)
with open(path_lm_lexicon) as f:
    custom_lexicon = [line for line in f if line.split('\t')[0] in vocab_set]

# There are fewer entries in the lexicon than in our list of words from the dataset: several words are indeed not registered in the lexicon (we could add them manually)
print('Entries in the custom lexicon: ', len(custom_lexicon))

# Record this lexicon
with open("mylexicon.txt", "w") as f:
    f.writelines(custom_lexicon)

In [None]:
# CTC decoder with the language model + a custom lexicon
decoder = ctc_decoder(
    lexicon="mylexicon.txt",
    tokens=path_lm_tokens,
    lm=path_lm,
)
wer_mean = process_folder(data_speech_dir, data_transc_dir, acoustic_model, decoder, verbose=False, max_files=MAX_FILES)[0]
print(f"Custom lexicon --- WER: {wer_mean*100:.1f} %")

**Comment**: This custom lexicon produces a better transcript (lower WER) than the more general lexicon. This makes sense since our custom lexicon is specifically tailored for this dataset, so words outside the dataset cannot be predicted. Nonetheless, there are still some errors: besides the acoustic model, these can be due to several words in the dataset being absent from the original downloaded lexicon, which the decoder therefore cannot output.
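
To check which dataset words are missing from a lexicon, a set difference is enough. Here is a minimal sketch on made-up data; the variable names and the three example lexicon lines are hypothetical, and the lines assume a tab between the word and its tokens.

```python
# Hypothetical dataset vocabulary and lexicon lines, for illustration only
dataset_vocab = {"hello", "world", "azfpojazflj"}
lexicon_lines = [
    "hello\th e l l o |",
    "help\th e l p |",
    "world\tw o r l d |",
]

# Words that have at least one entry in the lexicon
lexicon_words = {line.split()[0] for line in lexicon_lines if line.strip()}

# Lexicon restricted to the dataset vocabulary
custom_lexicon = [line for line in lexicon_lines if line.split()[0] in dataset_vocab]

# Dataset words that the lexicon-constrained decoder can never output
missing = sorted(dataset_vocab - lexicon_words)

print(len(custom_lexicon))  # 2
print(missing)              # ['azfpojazflj']
```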

## Beam search parameters

The beam search algorithm used in the CTC decoder depends on other parameters, such as `nbest`, which determines the number of hypotheses to return, or `lm_weight`, which adjusts the relative importance of the language model vs. the acoustic model predictions. Here we only focus on `beam_size`, which determines the maximum number of best hypotheses to hold after each decoding step. Larger beam sizes explore a wider range of hypotheses and can therefore find hypotheses with higher scores, which really is [the core](https://distill.pub/2017/ctc/) of the beam search algorithm.
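
To give an intuition for the role of the beam size, here is a toy sketch of a CTC prefix beam search without lexicon or language model. It is not the torchaudio implementation: the function name, the two-label vocabulary, and the emission values are made up for this illustration. With a beam of size 1 it returns the empty transcript, whereas a larger beam finds the transcript "a", whose total probability (summed over all its alignments) is higher.

```python
from collections import defaultdict

def ctc_prefix_beam_search(emission, labels, beam_size, blank=0):
    """Toy CTC prefix beam search over a [time, label] probability matrix."""
    # Each prefix stores (prob. of ending in blank, prob. of ending in non-blank)
    beams = {(): (1.0, 0.0)}
    for probs in emission:
        next_beams = defaultdict(lambda: [0.0, 0.0])
        for prefix, (p_b, p_nb) in beams.items():
            for idx, p in enumerate(probs):
                if idx == blank:
                    # A blank keeps the prefix unchanged
                    next_beams[prefix][0] += (p_b + p_nb) * p
                elif prefix and prefix[-1] == idx:
                    # Repeated label: merged into the prefix unless a blank came in between
                    next_beams[prefix][1] += p_nb * p
                    next_beams[prefix + (idx,)][1] += p_b * p
                else:
                    next_beams[prefix + (idx,)][1] += (p_b + p_nb) * p
        # Keep only the `beam_size` most probable prefixes
        kept = sorted(next_beams.items(), key=lambda kv: sum(kv[1]), reverse=True)[:beam_size]
        beams = {prefix: tuple(ps) for prefix, ps in kept}
    best_prefix, (p_b, p_nb) = max(beams.items(), key=lambda kv: sum(kv[1]))
    return "".join(labels[i] for i in best_prefix), p_b + p_nb

labels = ["-", "a"]                   # index 0 is the blank
emission = [[0.6, 0.4], [0.6, 0.4]]   # the blank is the most likely label at both steps

text_1, prob_1 = ctc_prefix_beam_search(emission, labels, beam_size=1)
text_2, prob_2 = ctc_prefix_beam_search(emission, labels, beam_size=2)
print(repr(text_1), round(prob_1, 2))  # '' 0.36
print(repr(text_2), round(prob_2, 2))  # 'a' 0.64
```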

<span style="color:red"> **Exercise 3**</span>. Perform ASR on the whole dataset folder for several values of the beam search size parameter: `beam_size` $\in \{1, 10, 100\}$ (use the original downloaded lexicon, not the custom one). Compute the WER and the computation time (e.g., via the [time](https://docs.python.org/3/library/time.html#time.time) module). What do you observe?

In [None]:
beam_sizes = [1, 10, 100]

for beam_size in beam_sizes:
    decoder = ctc_decoder(
        lexicon=path_lm_lexicon,
        tokens=path_lm_tokens,
        lm=path_lm,
        beam_size=beam_size,
    )

    time_start = time.time()
    wer_mean = process_folder(data_speech_dir, data_transc_dir, acoustic_model, decoder, verbose=False, max_files=MAX_FILES)[0]
    time_elapsed = time.time() - time_start

    print(f"Beam search size: {beam_size} --- WER: {wer_mean*100:.1f} % --- Time: {time_elapsed:.1f} s")

**Comment**: Using a beam search size that is too large (i.e., building too many hypotheses when decoding) does not improve performance compared to a lower value, but increases the computational cost. Thus, it is recommended to adjust this value in order to find a tradeoff between accuracy and speed.

## Segmentation / alignment

**Unfinished / not working: can be changed to add an extra exercise if needed**

In [None]:
print(ctc_decoder_result.timesteps)
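
The `timesteps` printed above are expected to be emission-frame indices, one per decoded token. Here is a minimal sketch of how they could be grouped into word segments using `|` as the word boundary, and converted to seconds. The function name, the tokens, the timesteps, and the signal length are all made up for this illustration.

```python
def word_segments_from_tokens(tokens, timesteps):
    """Group character tokens into words, using '|' as the word boundary.

    Returns dictionaries with 'label', 'start' and 'end' (emission-frame indices).
    """
    segments, chars, start = [], [], None
    for token, step in zip(tokens, timesteps):
        if token == "|":
            if chars:  # a word just ended at this boundary
                segments.append({"label": "".join(chars), "start": start, "end": step})
            chars, start = [], None
        else:
            if start is None:
                start = step
            chars.append(token)
    if chars:  # last word not followed by '|'
        segments.append({"label": "".join(chars), "start": start, "end": timesteps[-1]})
    return segments

# Hypothetical values: 32,000 samples at 16 kHz mapped to 100 emission frames
num_samples, num_frames, sample_rate = 32_000, 100, 16_000
seconds_per_frame = num_samples / num_frames / sample_rate  # 0.02 s per frame

tokens = ["|", "h", "i", "|", "y", "o", "u", "|"]
timesteps = [0, 10, 14, 20, 30, 33, 37, 45]

segments = word_segments_from_tokens(tokens, timesteps)
for seg in segments:
    start_s = round(seg["start"] * seconds_per_frame, 3)
    end_s = round(seg["end"] * seconds_per_frame, 3)
    print(seg["label"], start_s, end_s)
```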

In [None]:
word = {}
word['start'] = 0
word['end'] = 10
word['label'] = 'l'
word_segments = []
word_segments.append(word)

In [None]:
# TODO: have the students code the end time (using '|')

In [None]:
def plot_alignments_wav(waveform, emission, tokens, timesteps, sample_rate):
    waveform = waveform / waveform.abs().max()  # normalize without modifying the caller's tensor in place
    t = torch.arange(waveform.size(0)) / sample_rate
    ratio = waveform.size(0) / emission.size(1) / sample_rate

    chars = []
    words = []
    word_start = None
    for token, timestep in zip(tokens, timesteps * ratio):
        if token == "|":
            if word_start is not None:
                words.append((word_start, timestep))
            word_start = None
        else:
            chars.append((token, timestep))
            if word_start is None:
                word_start = timestep

    fig = plt.figure(figsize=(12,4))
    plt.plot(t, waveform)
    for token, timestep in chars:
        plt.annotate(token, (timestep, 1))
    for word_start, word_end in words:
        plt.axvspan(word_start, word_end, alpha=0.1, color="red")
    plt.yticks([])
    plt.xlabel("Time (s)")
    plt.tight_layout()
    plt.show()


# Tokens and timesteps must come from the same decoding result
ctc_nolm_tokens = decoder_nolm.idxs_to_tokens(ctc_nolm_result.tokens)
plot_alignments_wav(waveform[0], emission, ctc_nolm_tokens, ctc_nolm_result.timesteps, sr)

In [None]:
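def plot_alignments(segments, word_segments, waveform, num_time_steps, sample_rate=bundle.sample_rate):

    # Duration (in seconds) of one emission time step
    ratio = waveform.size(0) / sample_rate / num_time_steps

    # Spectrogram of the original waveform
    fig, ax2 = plt.subplots(figsize=(12, 4))
    ax2.specgram(waveform.numpy(), Fs=sample_rate)

    # Segments are dictionaries with 'start', 'end' (in time steps) and 'label'
    for word in word_segments:
        x0 = ratio * word['start']
        x1 = ratio * word['end']
        ax2.axvspan(x0, x1, facecolor="none", edgecolor="white", hatch="/")
        ax2.annotate(word['label'], (x0, sample_rate * 0.51), annotation_clip=False)

    for seg in segments:
        if seg['label'] != "|":
            ax2.annotate(seg['label'], (seg['start'] * ratio, sample_rate * 0.55), annotation_clip=False)
    ax2.set_xlabel("time [second]")
    ax2.set_yticks([])
    fig.tight_layout()


# Character-level `segments` are not built in this notebook yet: the word segments are reused as a placeholder
plot_alignments(
    word_segments,
    word_segments,
    waveform[0],
    num_time_steps,
)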

In [None]:
def display_segment(waveform, word, num_time_steps, sr=16000):
    ratio = waveform.shape[-1] / num_time_steps
    x0 = int(ratio * word['start'])
    x1 = int(ratio * word['end'])
    print(f"Character: {word['label']}")
    segment = waveform[:, x0:x1]
    return IPython.display.Audio(segment, rate=sr)

In [None]:
word = word_segments[0]
display_segment(waveform, word, num_time_steps, sr=16000)