Acoustic Model. Please read the following definitions and
proceed to additional instructions at the end of the file.

You will need to install these packages: g2p-en, torch, torchaudio

In [56]:
from g2p_en import G2p

import torch
import torch.nn as nn
import torchaudio

from praatio import textgrid
from praatio.data_classes.interval_tier import Interval
from tqdm import tqdm

In [57]:
def make_frames(wav):
    return torchaudio.compliance.kaldi.mfcc(wav)

In [58]:
class LibriSpeech(torch.utils.data.Dataset):
    def __init__(self, url='dev-clean'):
        super().__init__()
        self.librispeech = torchaudio.datasets.LIBRISPEECH('.', url=url, download=True)

    def __len__(self):
        return len(self.librispeech)

    def __getitem__(self, index):
        wav, sr, text, speaker_id, chapter_id, utterance_id = self.librispeech[index]
        return make_frames(wav), text, (speaker_id, chapter_id, utterance_id) # changed return, if we have those id returning why not use them?

In [59]:
class Encoder(nn.Module):
    def __init__(self, input_dim=13, subsample_dim=128, hidden_dim=1024):
        super().__init__()
        self.subsample = nn.Conv1d(input_dim, subsample_dim, 5, stride=4, padding=3)
        self.lstm = nn.LSTM(subsample_dim, hidden_dim, batch_first=True, num_layers=3, dropout=0.2)

    def subsampled_lengths(self, input_lengths):
        # https://github.com/vdumoulin/conv_arithmetic
        p, k, s = self.subsample.padding[0], self.subsample.kernel_size[0], self.subsample.stride[0]
        o = input_lengths + 2 * p - k
        o = torch.floor(o / s + 1)
        return o.int()

    def forward(self, inputs):
        x = inputs
        x = self.subsample(x.mT).mT
        x = x.relu()
        x, _ = self.lstm(x)
        return x.relu()

In [60]:
class Vocabulary:
    def __init__(self):
        self.g2p = G2p()

        # http://www.speech.cs.cmu.edu/cgi-bin/cmudict
        self.rdictionary = ["ε", # CTC blank
                            " ",
                            "AA0", "AA1", "AE0", "AE1", "AH0", "AH1", "AO0", "AO1", "AW0", "AW1", "AY0", "AY1",
                            "B", "CH", "D", "DH",
                            "EH0", "EH1", "ER0", "ER1", "EY0", "EY1",
                            "F", "G", "HH",
                            "IH0", "IH1", "IY0", "IY1",
                            "JH", "K", "L", "M", "N", "NG",
                            "OW0", "OW1", "OY0", "OY1",
                            "P", "R", "S", "SH", "T", "TH",
                            "UH0", "UH1", "UW0", "UW1",
                            "V", "W", "Y", "Z", "ZH"]

        self.dictionary = {c: i for i, c in enumerate(self.rdictionary)}

    def __len__(self):
        return len(self.rdictionary)

    def encode(self, text):
        labels = [c.replace('2', '0') for c in self.g2p(text) if c != "'"]
        targets = torch.LongTensor([self.dictionary[phoneme] for phoneme in labels])
        return targets
    
    def to_phonems(self, tokens): # 
        return [self.rdictionary[token] for token in tokens]

In [61]:
class Recognizer(nn.Module):
    def __init__(self, feat_dim=1024, vocab_size=55+1):
        super().__init__()
        self.classifier = nn.Linear(feat_dim, vocab_size)

    def forward(self, features):
        features = self.classifier(features)
        return features.log_softmax(dim=-1)

In [62]:
vocab = Vocabulary()
encoder = Encoder()
recognizer = Recognizer()

In [63]:
ckpt = torch.load('lstm_p3_360+500.pt', map_location='cpu')
encoder.load_state_dict(ckpt['encoder'])
recognizer.load_state_dict(ckpt['recognizer'])

<All keys matched successfully>

In [64]:
audio_frames, text, IDTuple = LibriSpeech()[0]
phonemes = vocab.encode(text)

features = encoder(audio_frames)
outputs = recognizer.forward(features) # (T, 55+1)

#
# Your task is to decode a sequence of vocabulary entries from a sequence of distributions
# over vocabulary entries (including blank ε that means "no output").
#
# outputs have dimension (T, V) where V is vocab_size+1 and T is time.
# outputs[:,0] is the log probability of a blank emission at every time step.
#
# Because of the subsampling done by Conv1d the time dimension in the outputs is 4 times smaller
# than in the inputs.
#

# Decoding

In [65]:
print(f'text: {text}')
print(f'true phonemes: {"".join(vocab.to_phonems(phonemes))}')
print(f'pred phonemes (with deleted silence): {"".join([phonem for phonem in vocab.to_phonems(torch.argmax(outputs, dim=1)) if phonem != "ε"])}')

text: MISTER QUILTER IS THE APOSTLE OF THE MIDDLE CLASSES AND WE ARE GLAD TO WELCOME HIS GOSPEL
true phonemes: MIH1STER0 KWIH1LTER0 IH1Z DHAH0 AH0PAA1SAH0L AH1V DHAH0 MIH1DAH0L KLAE1SAH0Z AH0ND WIY1 AA1R GLAE1D TUW1 WEH1LKAH0M HHIH1Z GAA1SPAH0L
pred phonemes (with deleted silence): MIH1STER0  KRIH1LTTER0  IH1Z DHAH0  AH0PPAA1SAH0AH0LL  AH1V DHAH0 MIH1DAH0LL KLLAE1SAH0Z   AH0ND WIH1RR  GLLAE1D TWWWEH1LKAH0M   HHIH1Z   GAA1SSPPAH0L


In [69]:
for audio_frames, text, IDTuple in tqdm(LibriSpeech()):
    features = encoder(audio_frames)
    outputs = recognizer.forward(features)

    tg = textgrid.Textgrid(minTimestamp=0, maxTimestamp=audio_frames.size()[0]/100) # length from 0 to length of audio_file in seconds

    phones_tier = textgrid.IntervalTier('phonemes', [], 0, audio_frames.size()[0]/100)

    intervals = []
    decoded_output_tokens = vocab.to_phonems(torch.argmax(outputs, dim=1))
    prev_token, prev_start = None, 0

    for i, token in enumerate(decoded_output_tokens):
        if prev_token != token and prev_token:
            intervals.append(Interval(prev_start, i/25 - 0.22, prev_token)) # since output frame is 25 frames per second we divide i by 25
            prev_token = token
            prev_start = i/25 - 0.22
        elif not prev_token:
            prev_token = token
    if prev_token:
        intervals.append(Interval(prev_start, tg.maxTimestamp, prev_token))
        
    try:
        new_phonemes_tier = phones_tier.new(entries=intervals)
        tg.addTier(new_phonemes_tier)
    except:
        print(f'problem with intervals: {intervals[-1]}, id: {IDTuple[0]}-{IDTuple[1]}-{IDTuple[2]}')
        intervals = intervals[:-1]
        new_phonemes_tier = phones_tier.new(entries=intervals)
        tg.addTier(new_phonemes_tier)
    finally:
        tg.save(f'textgrids/{IDTuple[0]}-{IDTuple[1]}-{IDTuple[2]}.TextGrid',
                includeBlankSpaces=True,
                format='long_textgrid',
                reportingMode='error')

100%|██████████| 2703/2703 [40:59<00:00,  1.10it/s]  
