# Mount Drive and Set File Directories

In [None]:
# Mount Google Drive at /content/drive so the Drive paths used below resolve.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


You will want to change these values to your own model and the respective paths.

In [None]:
# Model identifier passed to from_pretrained for both the model and its processor.
model_path = 'facebook/wav2vec2-large-960h-lv60' 
# NOTE(review): output_filename / output_filepath are not referenced in the cells shown here — confirm they are still needed.
output_filename = "DEMO-output.txt"
output_filepath = '/content/drive/MyDrive/path/to/output/folder'
# Language-model path handed to build_ctcdecoder.
path_to_kenlm   = '/content/drive/MyDrive/path/to/arpa/file'
# Tab-separated hotwords file read by the "process hotwords" cell.
path_to_hotwords = '/content/drive/MyDrive/path/to/hotwords_formatted.txt'

# Installs

Run the following cells in order:

In [None]:
!pip install datasets
!pip install transformers
!pip install soundfile
!pip install jiwer
!pip install torch==1.10.0+cu102 torchaudio===0.10.0+cu102 -f https://download.pytorch.org/whl/cu102/torch_stable.html

In [None]:
!pip install git+https://github.com/huggingface/transformers

In [None]:
from jiwer import wer

# If Using KenLM:

Run the following cells in order:

In [None]:
!pip install https://github.com/kpu/kenlm/archive/master.zip

Collecting https://github.com/kpu/kenlm/archive/master.zip
  Downloading https://github.com/kpu/kenlm/archive/master.zip
[K     / 541 kB 745 kB/s
[?25hBuilding wheels for collected packages: kenlm
  Building wheel for kenlm (setup.py) ... [?25l[?25hdone
  Created wheel for kenlm: filename=kenlm-0.0.0-cp37-cp37m-linux_x86_64.whl size=2332621 sha256=d6b448ae2f7732c2174261b9c0d99ee1a23a3760259a437b93a9e8af7724bcbc
  Stored in directory: /tmp/pip-ephem-wheel-cache-4lfzszav/wheels/3d/aa/02/7b4a2eab5d7a2a9391bd9680dbad6270808a147bc3b7047e4e
Successfully built kenlm
Installing collected packages: kenlm
Successfully installed kenlm-0.0.0


In [None]:
!pip install pyctcdecode

Collecting pyctcdecode
  Downloading pyctcdecode-0.2.0-py2.py3-none-any.whl (43 kB)
[K     |████████████████████████████████| 43 kB 572 kB/s 
[?25hCollecting hypothesis<7,>=6.14
  Downloading hypothesis-6.31.0-py3-none-any.whl (388 kB)
[K     |████████████████████████████████| 388 kB 3.6 MB/s 
[?25hCollecting pygtrie<3.0,>=2.1
  Downloading pygtrie-2.4.2.tar.gz (35 kB)
Building wheels for collected packages: pygtrie
  Building wheel for pygtrie (setup.py) ... [?25l[?25hdone
  Created wheel for pygtrie: filename=pygtrie-2.4.2-py3-none-any.whl size=19062 sha256=c64c1652f19709fa841889d8ab71b226d6df92fa88af031e02fc38a5b0bcaae5
  Stored in directory: /root/.cache/pip/wheels/d3/f8/ba/1d828b1603ea422686eb694253a43cb3a5901ea4696c1e0603
Successfully built pygtrie
Installing collected packages: pygtrie, hypothesis, pyctcdecode
Successfully installed hypothesis-6.31.0 pyctcdecode-0.2.0 pygtrie-2.4.2


The hotwords file is a .txt file with one entry per line. Each line contains a word and the count for that word, separated by a tab.

EXAMPLE:

apple\t745

cherry\t236

lime\t112

In [None]:
# process hotwords
# Each line of the file is "word<TAB>count"; only the word column is kept.
# The context manager guarantees the file is closed even if reading fails.
with open(path_to_hotwords, 'r', encoding='utf-8') as file:
  lines = file.readlines()

# NOTE(review): hotwords are upper-cased here, while the decoder labels are built
# from lower-cased vocabulary keys — confirm which casing the decoder/LM expects.
HOTWORDS = []
for line in lines:
  # strip() drops the trailing newline that survives when a line has no tab.
  word = line.split('\t')[0].strip()
  if word:  # ignore blank lines and rows with an empty word column
    HOTWORDS.append(word.upper())

# Load Dataset

You'll need to change the path here to your 'test' csv file. There **need** to be two columns in your csv: a 'file' column with the path to each audio file within your Google Drive and a 'text' column that contains the gold-standard transcript for that audio file.

In [None]:
from datasets import load_dataset, load_metric
# Load the evaluation CSV as the "test" split; replace the path with your own file.
dataset = load_dataset("csv", data_files={"test":  '/content/drive/MyDrive/path/to/your-csv-file.csv'})

In [None]:
# Show the loaded splits, their columns and row counts.
dataset

DatasetDict({
    test: Dataset({
        features: ['file', 'text'],
        num_rows: 40
    })
})

This function is useful for getting a peek at your data to see what it looks like. It is borrowed from Patrick von Platen's blog posts, which typically include it.

In [None]:
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    """Display `num_examples` distinct, randomly chosen rows of `dataset` as an HTML table.

    Args:
        dataset: an indexable dataset supporting `len()` and selection by a list of row indices.
        num_examples: how many distinct rows to show (default 10).

    Raises:
        AssertionError: if `num_examples` exceeds the number of rows.
    """
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    # random.sample draws without replacement, so the indices are unique without
    # needing a retry loop that re-draws on every collision.
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:
# Preview 5 random rows of the test split; the 'file' column is dropped from the preview.
show_random_elements(dataset["test"].remove_columns(["file"]), 5)

This section removes any unnecessary punctuation and converts audio files to arrays. If there are any additional punctuation symbols you wish to remove and/or keep, you will need to modify the *chars_to_ignore_regex* here.

In [None]:
import re
import torch
import torchaudio
import soundfile as sf
import os

# Punctuation stripped from the reference transcripts. Written as a raw string so the
# backslashes reach the regex engine instead of being parsed as (invalid) string escapes.
chars_to_ignore_regex = r'[\,\?\.\!\-\;\:\"]'

def remove_special_characters(batch):
    """Normalize the reference transcript of one example.

    Removes every character matched by `chars_to_ignore_regex` from batch["text"],
    lower-cases the result and appends a single trailing space.

    Args:
        batch: mapping with a "text" string entry; it is modified in place.

    Returns:
        The same `batch` object with the normalized "text".
    """
    batch["text"] = re.sub(chars_to_ignore_regex, '', batch["text"]).lower() + " "
    return batch

def speech_file_to_array_fn(batch):
    """Read the audio file named by batch["file"] and attach it to the example.

    Adds three entries to `batch`: "speech" (the samples returned by sf.read),
    "sampling_rate" (the rate returned by sf.read) and "target_text"
    (a copy of batch["text"]). Returns the same `batch` object.
    """
    audio, rate = sf.read(batch["file"])
    new_fields = (
        ("speech", audio),
        ("sampling_rate", rate),
        ("target_text", batch["text"]),
    )
    for key, value in new_fields:
        batch[key] = value
    return batch

In [None]:
# Normalize the reference text of every example with remove_special_characters.
dataset = dataset.map(remove_special_characters)

  0%|          | 0/40 [00:00<?, ?ex/s]

In [None]:
# Load the audio for every example with speech_file_to_array_fn (num_proc=4).
dataset = dataset.map(speech_file_to_array_fn, num_proc=4)

# Inferencing

In [None]:
# Imports
import torch
from transformers import AutoModelForCTC, Wav2Vec2Processor
import time
from pyctcdecode import build_ctcdecoder
import kenlm

Download the model and its saved processor from the path stored in *model_path*.

In [None]:
# Load the CTC model and its matching processor from model_path.
model = AutoModelForCTC.from_pretrained(model_path)
processor = Wav2Vec2Processor.from_pretrained(model_path)

Downloading:   0%|          | 0.00/841 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/1.18G [00:00<?, ?B/s]

Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at facebook/wav2vec2-large-960h-lv60 and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Downloading:   0%|          | 0.00/158 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/291 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/162 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

Get vocabulary from tokenizer and build decoder.

In [None]:
# Token -> id mapping from the tokenizer, re-keyed in lower case and ordered by id.
vocab_dict = processor.tokenizer.get_vocab()
sorted_dict = dict(
    (token.lower(), index)
    for token, index in sorted(vocab_dict.items(), key=lambda entry: entry[1])
)

In [None]:
# Build the decoder from the lower-cased vocabulary labels (in id order)
# and the language model located at path_to_kenlm.
decoder = build_ctcdecoder(
    list(sorted_dict.keys()),
    path_to_kenlm,
)

Functions for getting results without KenLM, with KenLM and with KenLM *and* hotwords.

In [None]:
def _logits_for(batch):
  """Run the acoustic model on one example and return its logits tensor.

  Reads batch["speech"] and batch["sampling_rate"]. Uses the GPU when one is
  available and falls back to the CPU otherwise, so the notebook does not crash
  on a runtime without CUDA.
  """
  device = "cuda" if torch.cuda.is_available() else "cpu"
  model.to(device)
  input_values = processor(
      batch["speech"],
      sampling_rate=batch["sampling_rate"],
      return_tensors="pt"
  ).input_values.to(device)

  # Inference only: no autograd graph is needed.
  with torch.no_grad():
    return model(input_values).logits


def map_to_wer_no_lm(batch):
  """Transcribe one example with greedy (argmax) decoding, no language model.

  Adds "pred_str" (the decoded prediction) and "ref_str" (a copy of
  batch["text"]) to `batch` and returns it.
  """
  pred_ids = torch.argmax(_logits_for(batch), -1)
  pred_str = processor.batch_decode(pred_ids)
  batch["pred_str"] = pred_str[0]
  batch["ref_str"] = batch["text"]
  return batch


def map_to_wer_with_lm(batch):
  """Transcribe one example with the language-model decoder.

  Adds "pred_str" (the output of decoder.decode) and "ref_str" (a copy of
  batch["text"]) to `batch` and returns it.
  """
  # The decoder works on a NumPy array for a single utterance, hence .cpu().numpy()[0].
  logits = _logits_for(batch).cpu().numpy()[0]

  batch["pred_str"] = decoder.decode(logits)
  batch["ref_str"] = batch["text"]
  return batch


def map_to_wer_with_lm_hotwords(batch):
  """Transcribe one example with the language-model decoder plus hotwords.

  Same as map_to_wer_with_lm, but passes HOTWORDS to decoder.decode with
  hotword_weight=10.0.
  """
  logits = _logits_for(batch).cpu().numpy()[0]

  batch["pred_str"] = decoder.decode(logits,hotwords=HOTWORDS,hotword_weight=10.0)
  batch["ref_str"] = batch["text"]
  return batch

Map test set to results:

In [None]:
# Transcribe the test split with plain argmax decoding (no language model).
result_with_no_lm = dataset["test"].map(map_to_wer_no_lm)

  0%|          | 0/40 [00:00<?, ?ex/s]

In [None]:
# Transcribe the test split with the language-model decoder.
result_with_lm = dataset["test"].map(map_to_wer_with_lm)

  0%|          | 0/40 [00:00<?, ?ex/s]

In [None]:
# Transcribe the test split with the language-model decoder and the HOTWORDS list.
result_with_lmhotwords = dataset["test"].map(map_to_wer_with_lm_hotwords)

  0%|          | 0/40 [00:00<?, ?ex/s]

Get the individual and average Word Error Rate for each result (without KenLM, with KenLM and with KenLM + hotwords)

In [None]:
# Per-example WERs collected by get_wer_breakdown; reset to [] before scoring a new result set.
AVG_WER = []

def get_wer_breakdown(batch):
  """Score one example and record its word error rate.

  Compares batch['target_text'] (reference) with batch['pred_str'] (hypothesis),
  stores the score in batch['individual_wer'], appends it to AVG_WER and
  returns `batch`.

  Both strings are lower-cased before scoring: the references are lower-cased
  during preprocessing, whereas a model's raw output may not be, and a pure
  casing difference should not be counted as a word error.
  """
  target = batch['target_text'].lower()
  predic = batch['pred_str'].lower()
  batch['individual_wer'] = wer(target, predic)
  # append() mutates the module-level list in place, so no `global` statement is needed.
  AVG_WER.append(batch['individual_wer'])
  return batch

In [None]:
# Add an 'individual_wer' column to the no-LM results (also fills AVG_WER).
result_reg = result_with_no_lm.map(get_wer_breakdown)

  0%|          | 0/40 [00:00<?, ?ex/s]

In [None]:
# Average the per-example WERs stored on the result itself rather than the global
# accumulator, so the value stays correct when cells are re-run or map() reuses cached results.
print(f"Average WER: {sum(result_reg['individual_wer'])/len(result_reg)}")

Average WER: 1.0295869408369407


In [None]:
# Clear the accumulator that get_wer_breakdown appends to before scoring the next result set.
AVG_WER = []

In [None]:
# Add an 'individual_wer' column to the language-model results (also fills AVG_WER).
result_lm  = result_with_lm.map(get_wer_breakdown)

  0%|          | 0/40 [00:00<?, ?ex/s]

In [None]:
# Average the per-example WERs stored on the result itself rather than the global
# accumulator, so the value stays correct when cells are re-run or map() reuses cached results.
print(f"Average WER: {sum(result_lm['individual_wer'])/len(result_lm)}")

Average WER: 0.9729750326625327


In [None]:
# Clear the accumulator that get_wer_breakdown appends to before scoring the next result set.
AVG_WER = []

In [None]:
# Add an 'individual_wer' column to the language-model + hotwords results (also fills AVG_WER).
result_lm_hotwords = result_with_lmhotwords.map(get_wer_breakdown)

  0%|          | 0/40 [00:00<?, ?ex/s]

In [None]:
# Average the per-example WERs stored on the result itself rather than the global
# accumulator, so the value stays correct when cells are re-run or map() reuses cached results.
print(f"Average WER: {sum(result_lm_hotwords['individual_wer'])/len(result_lm_hotwords)}")

Average WER: 0.9163631244881245


See Predictions:

In [None]:
# No KenLM: show 5 random scored examples from the argmax-decoded results.
show_random_elements(result_reg, 5)

In [None]:
# With KenLM: show 5 random scored examples from the language-model results.
show_random_elements(result_lm, 5)

In [None]:
# With KenLM + Hotwords: show 5 random scored examples from the hotword-boosted results.
show_random_elements(result_lm_hotwords, 5)