<a href="https://colab.research.google.com/github/karank85/speech-recognition/blob/main/Project2_DL_Speech_Recognition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os

import numpy as np
from numpy import ndarray
import pandas as pd
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

import librosa

from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import IPython.display as ipd
import librosa.display
from tqdm import tqdm

import torch
from torch.utils.data import DataLoader, Dataset
from torch import nn

import glob




In [2]:
# Display the installed librosa version (useful when reproducing this run).
librosa.__version__

'0.10.1'

In [3]:
# Setup device-agnostic code: use the GPU when PyTorch can see one, else the CPU
if torch.cuda.is_available():
  device = "cuda"
else:
  device = "cpu"
device

'cuda'

In [4]:
# Mount Google Drive into the Colab filesystem at /content/drive (Colab-only).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
!pip install transformers datasets evaluate jiwer accelerate

Collecting datasets
  Downloading datasets-2.18.0-py3-none-any.whl (510 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m510.5/510.5 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting evaluate
  Downloading evaluate-0.4.1-py3-none-any.whl (84 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jiwer
  Downloading jiwer-3.0.3-py3-none-any.whl (21 kB)
Collecting accelerate
  Downloading accelerate-0.28.0-py3-none-any.whl (290 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m290.1/290.1 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
Collecting xxhash (from datasets)
  Downloading xxhash-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2

In [6]:
# Assumptions:
# - The transcription file is located in the same directory as the audio files.
class AudioDataset:
  """
  Class for loading and storing audio data.

  Rows are kept in ``self.df`` with the columns ``id`` (the first token of a
  transcription line), ``path`` (expected location of the matching ``.flac``
  file) and ``transcription`` (the text following the ID).
  """

  def __init__(self):
    self.df = pd.DataFrame(columns=['id', 'path', 'transcription'])

  def load_transcriptions(self, directory_path: str) -> bool:
    """
    Load all transcriptions from a given directory, including subdirectories.
    Returns False if no transcription files were found, or if any failed to load.

    Files are visited in sorted order so the resulting row order does not depend
    on how the filesystem happens to list them. Loading stops at the first file
    that fails; rows from files loaded before it stay in ``self.df``.
    """
    # os.path.join avoids a doubled separator for "dir/" and, for an empty
    # directory_path, keeps the pattern relative instead of anchoring it at "/".
    pattern = os.path.join(directory_path, "**", "*.trans.txt")
    transcription_paths = sorted(glob.glob(pattern, recursive=True))

    if not transcription_paths:
      return False

    # all() short-circuits, so nothing after the first failing file is read.
    return all(self.load_transcription_file(path) for path in transcription_paths)

  def load_transcription_file(self, file_path: str) -> bool:
    """
    Parse transcription file and records the audio ID - subtitle mapping.
    Returns False if the file could not be read.

    Each non-blank line is expected to look like ``<audio id> <transcription>``.
    The audio file is assumed to sit next to the transcription file and to be
    named ``<audio id>.flac``.
    """
    try:
      with open(file_path, "r", encoding="utf-8") as file:
        lines = file.read().split("\n")
    except (OSError, UnicodeDecodeError):
      # Honour the documented contract instead of letting the error propagate.
      return False

    file_directory = os.path.dirname(file_path)

    for raw_line in lines:
      line = raw_line.strip()
      if not line:
        continue
      # Everything before the first space is the ID; the remainder is the text.
      file_name, _, file_content = line.partition(" ")
      self.df.loc[len(self.df)] = {
          'id': file_name,
          'transcription': file_content,
          # join() yields "name.flac" (not "/name.flac") when file_directory is "".
          'path': os.path.join(file_directory, f'{file_name}.flac'),
      }
    return True

  def keys(self):
    """Return an iterator over the audio IDs loaded so far."""
    return iter(self.df['id'])

  def get(self, id: str):
    """
    Retrieve a dataframe row from ID.

    Returns a (possibly empty) DataFrame with every row whose ``id`` column
    equals the given value. IDs are stored as the strings read from the
    transcription files.
    """
    return self.df.loc[self.df['id'] == id]

In [7]:
# Start with an empty dataset; rows are added when transcriptions are loaded.
ds = AudioDataset()

In [8]:
# Recursively scan the mounted Drive for *.trans.txt files; displays True on success.
# NOTE(review): this walks all of MyDrive - consider pointing at the dataset folder.
ds.load_transcriptions("/content/drive/MyDrive/")

True

In [9]:
# Decode every referenced audio file at a 16 kHz target rate, then attach the
# waveform and its sampling rate to the dataframe as two new columns.
loaded_clips = [librosa.load(clip_path, sr=16_000) for clip_path in ds.df['path']]

audio_list = [waveform for waveform, _rate in loaded_clips]
sampling_freq_list = [rate for _waveform, rate in loaded_clips]

ds.df['audio'] = audio_list
ds.df['sampling_freq'] = sampling_freq_list

In [10]:
# NOTE(review): this import rebinds the name `Dataset`, which the first cell
# imported from torch.utils.data; from here on `Dataset` is the `datasets` class.
from datasets import Dataset, Features, Array3D, Value

# Convert the pandas dataframe (including its index) into a `datasets.Dataset`.
custom_dataset = Dataset.from_pandas(ds.df)
custom_dataset

Dataset({
    features: ['id', 'path', 'transcription', 'audio', 'sampling_freq', '__index_level_0__'],
    num_rows: 252
})

In [11]:
# Keep only the columns used for training; the file path, the utterance ID and
# the index column that appeared after from_pandas are dropped.
custom_dataset = custom_dataset.remove_columns(['path', 'id', '__index_level_0__'])
# Remaining columns - transcription: str, audio: list[float], sampling_freq: int
custom_dataset

Dataset({
    features: ['transcription', 'audio', 'sampling_freq'],
    num_rows: 252
})

In [12]:
# Hold out 20% of the utterances for evaluation. The seed is fixed so that
# re-running the notebook yields the same train/test partition.
custom_dataset = custom_dataset.train_test_split(test_size=0.2, seed=42)
custom_dataset

DatasetDict({
    train: Dataset({
        features: ['transcription', 'audio', 'sampling_freq'],
        num_rows: 201
    })
    test: Dataset({
        features: ['transcription', 'audio', 'sampling_freq'],
        num_rows: 51
    })
})

In [13]:
# The Wav2Vec tokenizer is only trained on uppercase characters, so confirm that
# every training transcription is uppercase and therefore matches its vocabulary.
all_upper = all(
    item["transcription"].isupper()
    for item in custom_dataset["train"]
)

all_upper

True

In [14]:
from transformers import AutoProcessor

# Load the processor published with the facebook/wav2vec2-base checkpoint; later
# cells apply it to both the audio arrays and the transcription text.
processor = AutoProcessor.from_pretrained("facebook/wav2vec2-base")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/159 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/163 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.84k [00:00<?, ?B/s]



vocab.json:   0%|          | 0.00/291 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

In [15]:
def apply_processor(batch):
  """
  Run the processor over one example and record the length of its input.

  `batch` provides "audio", "sampling_freq" and "transcription". The processor
  output is returned with an extra "input_length" entry holding the length of
  the first `input_values` item.
  """
  encoded = processor(
      batch["audio"],
      sampling_rate=batch["sampling_freq"],
      text=batch["transcription"],
  )
  first_input = encoded["input_values"][0]
  encoded["input_length"] = len(first_input)
  return encoded

# Encode both splits with four worker processes, dropping the raw columns.
encoded_datasets = custom_dataset.map(
    apply_processor,
    remove_columns=custom_dataset.column_names["train"],
    num_proc=4,
)

Map (num_proc=4):   0%|          | 0/201 [00:00<?, ? examples/s]

Map (num_proc=4):   0%|          | 0/51 [00:00<?, ? examples/s]

In [16]:
# Need to create a data collator to prepare batches of data suitable for training CTC loss-based models
# Pad text and labels to length of the longest element in its batch to make it uniform length

@dataclass
class DataCollatorCTCLossWithPadding:
  """
  Collate encoded examples into one padded batch for CTC training.

  Audio inputs and label ids are padded separately through `processor.pad`;
  label positions that are padding are then set to -100.
  """
  processor: AutoProcessor
  padding: Union[bool, str] = "longest" # pad to the longest sequence

  def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
    # Split every example into its audio part and its tokenized-label part.
    audio_inputs = []
    label_inputs = []
    for example in features:
      audio_inputs.append({"input_values": example["input_values"][0]})
      label_inputs.append({"input_ids": example["labels"]})

    pad_options = {"padding": self.padding, "return_tensors": "pt"}
    batch = self.processor.pad(audio_inputs, **pad_options)
    padded_labels = self.processor.pad(labels=label_inputs, **pad_options)

    # Positions whose attention mask is not 1 are padding; mark them with -100
    # so that they are ignored by the loss.
    is_padding = padded_labels.attention_mask.ne(1)
    batch["labels"] = padded_labels["input_ids"].masked_fill(is_padding, -100)
    return batch

In [17]:
# Collator instance later handed to the Trainer; padding="longest" pads to the longest sequence.
data_collator = DataCollatorCTCLossWithPadding(processor=processor, padding="longest")

## Wav2Vec 2.0

In [18]:
import evaluate

# Word error rate (WER) metric, used by compute_metrics during evaluation.
wer_metric = evaluate.load("wer")

Downloading builder script:   0%|          | 0.00/4.49k [00:00<?, ?B/s]

In [19]:
import numpy as np

def compute_metrics(pred):
    """
    Compute the word error rate for a set of evaluation predictions.

    `pred` must expose `predictions` (logits; the arg-max over the last axis is
    taken as the predicted token id) and `label_ids` (reference token ids in
    which -100 marks positions to ignore).

    Returns a dict with the single key "wer".
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)

    # Swap the -100 markers for the pad token so the labels can be decoded.
    # np.where builds a new array, so the caller's `pred.label_ids` is left
    # untouched (the previous in-place assignment overwrote it).
    label_ids = np.where(
        pred.label_ids == -100,
        processor.tokenizer.pad_token_id,
        pred.label_ids,
    )

    pred_str = processor.batch_decode(pred_ids)
    # References are decoded with group_tokens=False, unlike the predictions.
    label_str = processor.batch_decode(label_ids, group_tokens=False)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


In [20]:
from transformers import AutoModelForCTC

# Load the facebook/wav2vec2-base checkpoint as a CTC model. The loss is
# averaged ("mean") and the pad token id is taken from the processor's tokenizer.
# The log below reports that the `lm_head` weights are newly initialized.
model = AutoModelForCTC.from_pretrained(
    "facebook/wav2vec2-base",
    ctc_loss_reduction="mean",
    pad_token_id=processor.tokenizer.pad_token_id,
)



pytorch_model.bin:   0%|          | 0.00/380M [00:00<?, ?B/s]

Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-base and are newly initialized: ['lm_head.bias', 'lm_head.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [21]:
model.to(device)

# Freeze the feature encoder so its parameters are not updated during fine-tuning.
# `freeze_feature_extractor()` is the deprecated name of this same method.
model.freeze_feature_encoder()



In [None]:
from transformers import TrainingArguments, Trainer

# Training configuration: 2000 steps at batch size 16 per device, with evaluation
# and checkpointing every 500 steps and the first 500 steps used for warmup.
training_args = TrainingArguments(
    output_dir="wav2vec_model_v2",  # Output directory to save model checkpoints and logs
    group_by_length=True,  # Group samples of roughly the same length together to minimize padding
    per_device_train_batch_size=16,  # Batch size per GPU/device during training
    evaluation_strategy="steps",  # Evaluate every `eval_steps`
    fp16=True,  # Use mixed precision training with automatic mixed precision scaler
    save_steps=500,  # Save model checkpoint every `save_steps`
    eval_steps=500,  # Evaluate model every `eval_steps`
    logging_steps=100,  # Log training metrics every `logging_steps`
    learning_rate=1e-4,  # Learning rate
    weight_decay=0.01,  # Weight decay to prevent overfitting
    warmup_steps=500,  # Warmup steps for learning rate scheduler
    save_total_limit=2,  # Limit the total number of saved checkpoints
    max_steps=2000,  # Maximum number of training steps
)

# Define Trainer: wires together the model, both encoded splits, the padding
# collator and the WER metric function defined in the earlier cells.
trainer = Trainer(
    model=model,  # Model to be trained
    args=training_args,  # Training arguments
    train_dataset=encoded_datasets["train"],  # Training dataset
    eval_dataset=encoded_datasets["test"],  # Evaluation dataset
    tokenizer=processor,  # Tokenizer for preprocessing inputs
    data_collator=data_collator,  # Data collator for batching and padding
    compute_metrics=compute_metrics,  # Function to compute evaluation metrics
)

# Run fine-tuning; checkpoints and logs are written under `output_dir`.
trainer.train()

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


Step,Training Loss,Validation Loss


Step,Training Loss,Validation Loss,Wer
500,0.0244,0.101277,0.062655


Checkpoint destination directory wav2vec_model_v2/checkpoint-500 already exists and is non-empty. Saving will proceed but saved results may be invalid.


In [None]:
class ModelEngine:
  """
  Bundle the processor, data collator, CTC model and encoded datasets needed to
  fine-tune one pretrained checkpoint.
  """

  def __init__(self, model_name, custom_dataset):
    """
    model_name: checkpoint name or path handed to `from_pretrained`.
    custom_dataset: dataset dict with "train" and "test" splits whose rows hold
      "audio", "sampling_freq" and "transcription".
    """
    self.processor = AutoProcessor.from_pretrained(model_name)
    # Build the collator from this engine's own processor; the notebook-level
    # `processor` may belong to a different checkpoint.
    self.data_collator = DataCollatorCTCLossWithPadding(processor=self.processor, padding="longest")
    self.model = AutoModelForCTC.from_pretrained(
        model_name,
        ctc_loss_reduction="mean",
        pad_token_id=self.processor.tokenizer.pad_token_id,
    )
    self.encoded_datasets = custom_dataset.map(self.apply_processor, remove_columns=custom_dataset.column_names["train"], num_proc=4)
    # Load the metric once rather than on every evaluation pass. It is assigned
    # after `map` so the bound method handed to the workers carries the same
    # attributes as before.
    self.wer_metric = evaluate.load("wer")

  def apply_processor(self, batch):
    """
    Run the processor over one example and add an "input_length" entry holding
    the length of the first `input_values` item.
    """
    batch = self.processor(batch["audio"], sampling_rate=batch["sampling_freq"], text=batch["transcription"])
    batch["input_length"] = len(batch["input_values"][0])
    return batch

  def compute_metrics(self, pred):
    """
    Return {"wer": <word error rate>} for the given predictions.

    `pred.predictions` holds logits (arg-max over the last axis gives the token
    ids) and `pred.label_ids` holds reference ids where -100 marks positions to
    ignore. `pred` itself is not modified.
    """
    pred_ids = np.argmax(pred.predictions, axis=-1)

    # np.where returns a new array, so the caller's label_ids keep their -100s.
    label_ids = np.where(
        pred.label_ids == -100,
        self.processor.tokenizer.pad_token_id,
        pred.label_ids,
    )

    pred_str = self.processor.batch_decode(pred_ids)
    label_str = self.processor.batch_decode(label_ids, group_tokens=False)

    wer = self.wer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

  def train(self, output_dir, encoded_datasets=None):
    """
    Fine-tune the model, writing checkpoints and logs to `output_dir`.

    encoded_datasets: optional mapping with "train" and "test" splits; when
      omitted, the datasets encoded in `__init__` are used.
    """
    if encoded_datasets is None:
      encoded_datasets = self.encoded_datasets

    self.model.to(device)

    # `freeze_feature_extractor()` is the deprecated name of this method.
    self.model.freeze_feature_encoder()

    training_args = TrainingArguments(
        output_dir=output_dir,  # Honour the caller's directory (was hardcoded before)
        group_by_length=True,  # Group samples of roughly the same length together to minimize padding
        per_device_train_batch_size=16,  # Batch size per GPU/device during training
        evaluation_strategy="steps",  # Evaluate every `eval_steps`
        fp16=True,  # Use mixed precision training with automatic mixed precision scaler
        save_steps=500,  # Save model checkpoint every `save_steps`
        eval_steps=500,  # Evaluate model every `eval_steps`
        logging_steps=100,  # Log training metrics every `logging_steps`
        learning_rate=1e-4,  # Learning rate
        weight_decay=0.01,  # Weight decay to prevent overfitting
        warmup_steps=500,  # Warmup steps for learning rate scheduler
        save_total_limit=2,  # Limit the total number of saved checkpoints
        max_steps=2000,  # Maximum number of training steps
    )

    # Define Trainer
    trainer = Trainer(
        model=self.model,  # Model to be trained
        args=training_args,  # Training arguments
        train_dataset=encoded_datasets["train"],  # Training dataset
        eval_dataset=encoded_datasets["test"],  # Evaluation dataset
        tokenizer=self.processor,  # Tokenizer for preprocessing inputs
        data_collator=self.data_collator,  # Data collator for batching and padding
        compute_metrics=self.compute_metrics,  # Function to compute evaluation metrics
    )

    trainer.train()
