In [None]:
# This mounts your Google Drive to the Colab VM.
from google.colab import drive
drive.mount('/content/drive')

# TODO: Enter the foldername in your Drive where you have saved the unzipped
# assignment folder, e.g. 'cs231/assignment1'
FOLDERNAME = 'Introduction to Speech Processing/'
assert FOLDERNAME is not None, "[!] Enter the folername."

# Now that we've mounted your Drive, this ensures that
# the Python interpreter of the Colab VM can load
# python files from within it
import sys
sys.path.append('/content/drive/MyDrive/{}'.format(FOLDERNAME))

%cd /content/drive/MyDrive/$FOLDERNAME

Mounted at /content/drive
/content/drive/MyDrive/Introduction to Speech Processing


In [None]:
!pip install hmmlearn
!pip install jiwer

Collecting hmmlearn
  Downloading hmmlearn-0.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (160 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m160.4/160.4 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: hmmlearn
Successfully installed hmmlearn-0.3.0
Collecting jiwer
  Downloading jiwer-3.0.2-py3-none-any.whl (21 kB)
Collecting rapidfuzz==2.13.7 (from jiwer)
  Downloading rapidfuzz-2.13.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-3.0.2 rapidfuzz-2.13.7


In [None]:
import os
import torch
import heapq
import librosa
import operator

import numpy as np
import pandas as pd

from jiwer import wer, cer
from hmmlearn import hmm
from statistics import mean

In [None]:
# Shared constants: the token separator and the names of the dataset splits.
WHITESPACE = " "
TRAIN, TEST, VAL = 'train', 'test', 'val'

# Transcript file names found in each split's txt directory.
train_txt = os.listdir(f'./an4/{TRAIN}/an4/txt')
val_txt = os.listdir(f'./an4/{VAL}/an4/txt')
test_txt = os.listdir(f'./an4/{TEST}/an4/txt')

# Matching audio file names: the part of the transcript name before its first
# '.', followed by '.wav'. Each list keeps the order of its transcript list.
train_wav = [name.split(".")[0] + '.wav' for name in train_txt]
val_wav = [name.split(".")[0] + '.wav' for name in val_txt]
test_wav = [name.split(".")[0] + '.wav' for name in test_txt]

In [None]:
def preprocess_audio(file, directory):
  """Load an audio file and drop the parts librosa classifies as silence.

  Args:
    file: Name of the audio file inside `directory`.
    directory: Folder that contains the file.

  Returns:
    A tuple `(audio, intervals)`. `intervals` is the result of
    `librosa.effects.split(..., top_db=5)` on the signal as loaded, so its
    sample indices refer to the untrimmed signal. `audio` is the
    concatenation of those intervals when more than one was found, otherwise
    the signal as loaded; in both cases it is passed through
    `librosa.to_mono` before being returned.
  """
  # sr=None keeps the file's own sampling rate; the rate itself is not used.
  signal, _ = librosa.load(f'{directory}/{file}', sr=None)
  intervals = librosa.effects.split(signal, top_db=5)

  # Only stitch pieces together when the signal was actually split in several.
  if intervals.shape[0] > 1:
    signal = np.concatenate([signal[begin:finish] for begin, finish in intervals])

  return librosa.to_mono(signal), intervals

In [None]:
# Per-split containers: processed audio (X_*), the intervals returned by
# preprocess_audio (interval_*) and the tokenised transcripts (y_*).
X_train, interval_train, y_train = [], [], []
X_val, interval_val, y_val = [], [], []
X_test, interval_test, y_test = [], [], []

# One row per split: its name, its file lists and the containers to fill.
splits = (
  (TRAIN, train_wav, train_txt, X_train, interval_train, y_train),
  (VAL, val_wav, val_txt, X_val, interval_val, y_val),
  (TEST, test_wav, test_txt, X_test, interval_test, y_test),
)

for split, wav_names, txt_names, X_split, interval_split, y_split in splits:
  # Audio first, then transcripts, in the order of the file lists.
  for wav in wav_names:
    audio, intervals = preprocess_audio(wav, f'./an4/{split}/an4/wav/')
    X_split.append(audio)
    interval_split.append(intervals)

  for txt in txt_names:
    with open(f'./an4/{split}/an4/txt/{txt}') as file:
      # A transcript becomes the list of its single-space separated tokens.
      y_split.append(file.read().split(" "))

In [None]:
class LanguageModel:
  """Maximum-likelihood n-gram model over space-separated transcript tokens.

  For every context (the tuple of the first `n_gram - 1` tokens of an n-gram)
  the model stores the relative frequency of each token that followed that
  context in the training transcripts. N-grams never span two files.
  """

  def __init__(self, n_gram=1):
    """Create an empty model.

    Args:
      n_gram: Number of tokens per n-gram (1 = unigram, 2 = bigram, ...).
    """
    # probabilities[context][word] = P(word | context)
    self.probabilities = {}
    self.n_gram = n_gram
    # Distinct n-grams (as tuples) in the order they were first seen.
    self.instances = []
    # counts[context][word] = raw number of occurrences. Kept separately from
    # the probabilities so that the method below can be called more than once.
    self.counts = {}

  def calculate_probabilities(self, train_txt, directory):
    """Count the n-grams of the given transcripts and refresh the probabilities.

    The method may be called repeatedly; counts accumulate over calls and the
    probabilities are always recomputed from the accumulated counts.

    Args:
      train_txt: Iterable of transcript file names.
      directory: Folder that contains those files.
    """
    for file in train_txt:
      with open(f'{directory}/{file}') as txt:
        tokens = txt.read().split(" ")

      # Slide a window of n_gram tokens over the transcript; the range stops
      # early enough that only complete n-grams are produced.
      for i in range(len(tokens) - self.n_gram + 1):
        ngram = tuple(tokens[i:i + self.n_gram])
        context, word = ngram[:-1], ngram[-1]
        followers = self.counts.setdefault(context, {})
        if word not in followers:
          followers[word] = 0
          self.instances.append(ngram)
        followers[word] += 1

    # Normalise per context so that the probabilities of each context sum to 1.
    for context, followers in self.counts.items():
      total = sum(followers.values())
      self.probabilities[context] = {
        word: count / total for word, count in followers.items()
      }

  def get_probability(self, ngram):
    """Return P(last token | preceding tokens) for `ngram`.

    Args:
      ngram: Non-empty sequence (tuple or list) of tokens.

    Returns:
      The estimated probability, or 0.0 when the context or the word was
      never observed during training.
    """
    ngram = tuple(ngram)  # contexts are stored as tuples; lists are not hashable
    return self.probabilities.get(ngram[:-1], {}).get(ngram[-1], 0.0)

  def get_instances(self):
    """Return the distinct n-grams seen so far, in first-seen order."""
    return self.instances


In [None]:
def get_mfcc_features(audio, sr=16000):
  """Compute MFCCs plus their first- and second-order deltas for a signal.

  Args:
    audio: Audio samples handed to `librosa.feature.mfcc`.
    sr: Sampling rate of `audio` (defaults to 16000).

  Returns:
    The transpose of the matrix obtained by stacking the 13 MFCCs, their
    deltas and their second-order deltas along the first axis.
  """
  static = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13, n_mels=40, n_fft=512,
                                hop_length=128, fmin=0, fmax=None, htk=False)
  # Both delta orders are derived from the static coefficients.
  first_order = librosa.feature.delta(static, mode='nearest')
  second_order = librosa.feature.delta(static, order=2, mode='nearest')

  stacked = np.concatenate((static, first_order, second_order), axis=0)
  return stacked.T

In [None]:
class AcousticModel:
  """Collection of GMM-HMMs, one per word n-gram seen during training."""

  def __init__(self):
    # Maps an n-gram (tuple of words) to the GMMHMM fitted for it.
    self.models_dictionary = {}

  def train(self, X, y, intervals, n_gram, n_mix, n_iter):
    """Fit one GMM-HMM per n-gram on slices of the training audio.

    Args:
      X: Sequence of audio arrays, one per utterance.
      y: Sequence of word lists, aligned with `X`.
      intervals: Sequence of (start, end) interval arrays, aligned with `X`.
      n_gram: Number of words covered by each model.
      n_mix: Number of mixture components passed to `hmm.GMMHMM`.
      n_iter: Iteration count passed to `hmm.GMMHMM`.

    Returns:
      The dictionary that maps n-gram tuples to their models.
    """
    for idx, audio in enumerate(X):
      words = y[idx]
      spans = intervals[idx]

      # Utterances with fewer intervals than words per n-gram are skipped.
      if len(spans) < n_gram:
        continue

      cursor = 0            # start of the next slice inside `audio`
      last_features = None  # features of the previous slice, reused if a slice is empty
      last_start = len(spans) - n_gram
      for pos in range(len(words) - n_gram + 1):
        # Clamp so that `first + n_gram - 1` never runs past the last interval.
        first = min(pos, last_start)
        span_length = spans[first + n_gram - 1][1] - spans[first][0]

        key = tuple(words[pos:pos + n_gram])
        if key in self.models_dictionary:
          model = self.models_dictionary[key]
        else:
          # NOTE(review): per the hmmlearn documentation, `params` selects the
          # parameters updated during training; with params="" the repeated
          # fit() calls below presumably leave the model at its initial
          # estimate - confirm this is intended.
          model = hmm.GMMHMM(n_components=3, n_mix=n_mix, n_iter=n_iter,
                             init_params="", params="")

        segment = audio[cursor:cursor + span_length]
        # The cursor advances by the length of the first interval only.
        cursor += spans[first][1] - spans[first][0]

        if segment.shape[0] == 0:
          features = last_features
        else:
          features = get_mfcc_features(segment)

        model.fit(features)
        self.models_dictionary[key] = model
        last_features = features

    return self.models_dictionary

  def get_model(self, n_gram):
    """Return the model stored for the n-gram tuple `n_gram`."""
    return self.models_dictionary[n_gram]

  def get_words(self):
    """Return a view of all n-gram tuples that have a model."""
    return self.models_dictionary.keys()


In [None]:
def predict(X, intervals, language_model, acoustic_model, window_size, beam_width=5):
  """Decode every utterance with a beam search over the trained n-gram models.

  For each window of `window_size` consecutive intervals, every acoustic model
  scores the window's features. Each hypothesis kept so far is extended by the
  last word of every n-gram, weighted by the acoustic score and the
  language-model probability of that n-gram; the `beam_width` best hypotheses
  survive to the next window.

  Args:
    X: Sequence of audio arrays, one per utterance.
    intervals: Sequence of (start, end) interval arrays, aligned with `X`.
    language_model: Object exposing `get_probability(ngram)`.
    acoustic_model: Object exposing `models_dictionary` and `get_model(ngram)`.
    window_size: Number of consecutive intervals scored together.
    beam_width: Number of hypotheses kept after each window (default 5).

  Returns:
    A list with one predicted word list per utterance. Utterances without any
    interval, or for which nothing could be scored, yield an empty list.
  """
  y_pred = []
  for i in range(len(X)):
    spans = intervals[i]
    if len(spans) == 0:
      # Nothing to decode; indexing the intervals below would fail.
      y_pred.append([])
      continue

    offset = 0
    prev_mfccs = None
    sample_scores = {(): 1}
    best_text = ()
    last_start = len(spans) - window_size
    for j in range(max(last_start + 1, 1)):
      # Clamp both ends so utterances shorter than the window are still scored.
      interval_index = max(0, min(j, last_start))
      last_index = min(len(spans) - 1, interval_index + window_size - 1)
      interval = spans[last_index][1] - spans[interval_index][0]
      start = offset
      end = start + interval
      offset += spans[interval_index][1] - spans[interval_index][0]
      section = X[i][start:end]

      # An empty slice falls back to the previous window's features.
      mfccs = get_mfcc_features(section) if section.shape[0] > 0 else prev_mfccs
      if mfccs is None:
        # Empty slice and no earlier features to reuse: skip this window.
        continue
      prev_mfccs = mfccs

      # Score the window once per model, on the features selected above.
      # NOTE(review): 1 / abs(score) is the original heuristic for turning the
      # model score into a weight; it is kept unchanged.
      interval_scores = {}
      for n_gram in acoustic_model.models_dictionary.keys():
        hmm_gmm = acoustic_model.get_model(n_gram)
        interval_scores[n_gram] = (1 / abs(hmm_gmm.score(mfccs)),
                                   language_model.get_probability(n_gram))

      # Extend every surviving hypothesis. Several n-grams can end in the same
      # word and therefore produce the same extended hypothesis; keep the best
      # score instead of whichever n-gram happened to be visited last.
      scores_update = {}
      for sequence, sequence_score in sample_scores.items():
        for n_gram, (acoustic_score, lm_probability) in interval_scores.items():
          candidate = sequence + (n_gram[-1],)
          score = sequence_score * acoustic_score * lm_probability
          if candidate not in scores_update or score > scores_update[candidate]:
            scores_update[candidate] = score

      if not scores_update:
        # No models available: keep the current hypotheses untouched.
        continue

      top_keys = heapq.nlargest(beam_width, scores_update, key=scores_update.get)
      sample_scores = {key: scores_update[key] for key in top_keys}
      best_text = top_keys[0]

    y_pred.append(list(best_text))

  return y_pred


In [None]:
# Train one (language model, acoustic model) pair for each n-gram order from
# 1 to 5 and keep the pairs in `models`, keyed by the order.
models = {}
for order in (1, 2, 3, 4, 5):
  language_model = LanguageModel(n_gram=order)
  language_model.calculate_probabilities(train_txt, f'./an4/{TRAIN}/an4/txt')

  acoustic_model = AcousticModel()
  acoustic_model.train(X_train, y_train, interval_train,
                       n_gram=order, n_mix=5, n_iter=100)

  models[order] = (language_model, acoustic_model)

In [None]:
# Choose the window size (n-gram order) with the lowest mean word error rate
# on the validation split. Error rates can exceed 1 when the hypothesis
# contains insertions, so the running minimum starts at infinity rather
# than 1; otherwise a run in which every candidate scores >= 1 would silently
# keep the default window size.
best_wer = float('inf')
best_window_size = 1
for i in range(1, 6):
  y_pred = predict(X_val, interval_val, models[i][0], models[i][1], i)

  # Per-utterance error rates between the reference and predicted transcripts.
  w_error = []
  c_error = []
  for j in range(len(y_val)):
    w_error.append(wer(WHITESPACE.join(y_val[j]), WHITESPACE.join(y_pred[j])))
    c_error.append(cer(WHITESPACE.join(y_val[j]), WHITESPACE.join(y_pred[j])))

  mean_w_error = mean(w_error)
  mean_c_error = mean(c_error)
  print(f'Size of window: {i}, WER: {mean_w_error}')
  print(f'Size of window: {i}, CER: {mean_c_error}')
  if mean_w_error < best_wer:
    best_wer = mean_w_error
    best_window_size = i

print(f'Best window size: {best_window_size}')

Size of window: 1, WER: 0.9770933744617956
Size of window: 1, CER: 0.871974895888593
Size of window: 2, WER: 1.0
Size of window: 2, CER: 1.5460651132294168
Size of window: 3, WER: 0.9837976351134246
Size of window: 3, CER: 0.9816716159861058
Size of window: 4, WER: 0.9709070753807596
Size of window: 4, CER: 0.97680954313214
Size of window: 5, WER: 0.9726614613456719
Size of window: 5, CER: 0.9711317780237775
Best window size: 4


In [None]:
# Decode the test split with the model pair selected on the validation split.
best_lm, best_am = models[best_window_size]
y_pred = predict(X_test, interval_test, best_lm, best_am, best_window_size)

# Per-utterance word and character error rates against the reference text.
w_error = [
  wer(WHITESPACE.join(y_test[k]), WHITESPACE.join(y_pred[k]))
  for k in range(len(y_test))
]
c_error = [
  cer(WHITESPACE.join(y_test[k]), WHITESPACE.join(y_pred[k]))
  for k in range(len(y_test))
]

mean_w_error = mean(w_error)
mean_c_error = mean(c_error)
print(f'Size of window: {best_window_size}, WER: {mean_w_error}')
print(f'Size of window: {best_window_size}, CER: {mean_c_error}')

Size of window: 4, WER: 0.9640862449516295
Size of window: 4, CER: 0.9713895867744256


In [None]:
# Show every reference transcript next to the transcript predicted for it.
for idx, target in enumerate(y_test):
  print(f'Target: {target}')
  print(f'Predicted: {y_pred[idx]}')

Target: ['P', 'I', 'T', 'T', 'S', 'B', 'U', 'R', 'G', 'H']
Predicted: ['FIVE']
Target: ['GO']
Predicted: ['SIXTY']
Target: ['ONE', 'FIVE', 'TWO', 'ONE', 'THREE']
Predicted: ['J']
Target: ['FOUR', 'ONE', 'TWO', 'TWO', 'SIX', 'EIGHT', 'FOUR', 'ONE', 'FOUR', 'TWO']
Predicted: ['I']
Target: ['B', 'I', 'R', 'C', 'H', 'W', 'O', 'O', 'D']
Predicted: ['N', 'N']
Target: ['C', 'E', 'D', 'A', 'R', 'V', 'I', 'L', 'L', 'E']
Predicted: ['I', 'I', 'I']
Target: ['M', 'Y', 'E', 'R', 'S']
Predicted: ['I']
Target: ['P', 'I', 'T', 'T', 'S', 'B', 'U', 'R', 'G', 'H']
Predicted: ['N']
Target: ['ERASE', 'A', 'B', 'F', 'N', 'Q', 'FIFTY', 'SEVEN']
Predicted: ['N', 'M', 'SIXTY']
Target: ['TWELVE', 'THIRTY', 'THREE']
Predicted: ['SIXTY']
Target: ['W', 'O', 'O', 'D']
Predicted: ['J']
Target: ['ENTER', 'TWO', 'NINE', 'EIGHT', 'ONE']
Predicted: ['SIXTY', 'J']
Target: ['M', 'O', 'R', 'E', 'W', 'O', 'O', 'D']
Predicted: ['N', 'N', 'N', 'N']
Target: ['J', 'A', 'N', 'E', 'T']
Predicted: ['I']
Target: ['L', 'E', 'V', 'I'