In [None]:
from pathlib import Path

# Locations used throughout the notebook
# --------------------------------------
credentials_path = Path('/content/ServiceAccountKey.json')
audio_store_dir = Path('./audio-data/downloaded')
normalized_store_dir = Path('./audio-data/normalized')

# Stop right away when the service-account key is missing
assert credentials_path.exists(), \
    "The %s can't be found. It must be store in the %s" % (
        credentials_path.name, credentials_path.parent)

# Firebase project constants
# --------------------------
DATAPOINT_NAME = 'idata-a3aae'
BUCKET_NAME = '%s.appspot.com' % DATAPOINT_NAME
DATABASE_URL = 'https://%s.firebaseio.com' % DATAPOINT_NAME

SERVICE_ACCOUNT_JSON = str(credentials_path)

# CSV file that maps every downloaded audio file to its text
# METADATA_CSV = str(Path('./metadata-40229.csv'))
METADATA_CSV = str(Path('./metadata.csv'))

# Create the raw and the normalized audio folders (parents included);
# folders that already exist are left as they are
for storage_dir in (audio_store_dir, normalized_store_dir):
    storage_dir.mkdir(parents=True, exist_ok=True)


## Downloading and Mapping the audio files with the Text

This section downloads the audio data and the text files


In [None]:
# Loading the Firestore component
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore

# Use a service account
cred = credentials.Certificate(str(credentials_path))

# `initialize_app` raises ValueError when the default app already exists, which
# made this cell fail whenever it was executed a second time in the same
# kernel. `get_app` raises ValueError only when no default app exists yet, so
# the app is created exactly once and reused afterwards.
try:
    firebase_admin.get_app()
except ValueError:
    firebase_admin.initialize_app(cred)

# Firestore client bound to the default app
db = firestore.client()

In [None]:
# checking the data
import re
import requests
from tqdm import tqdm

# Pattern used by `get_file_name` to pull the stored file name out of a
# Firebase Storage download URL: the capture group is the run of word
# characters, dots and dashes that follows 'speech%2F'.
# NOTE(review): the dots of the host name / BUCKET_NAME are not escaped and the
# pattern ends with '?.' (an optional group followed by any single character) -
# presumably meant to consume the '?' that starts the query string; confirm.
REGEX_URL = r"https://firebasestorage.googleapis.com/v0/b/{}/o/speech%2F([\w.-]+)?.".format(BUCKET_NAME)
# File extensions (without the dot, compared case-sensitively) that get downloaded
ALLOWED_AUDIO = ['aac', 'wav']

def is_allowed_audio(file_name: str):
    """Tells whether the text after the last '.' of `file_name` is listed in ALLOWED_AUDIO"""
    extension = file_name.rsplit(".", 1)[-1]
    return extension in ALLOWED_AUDIO

def get_file_name(url_audio: str):
    """Returns the first REGEX_URL capture found in `url_audio` (IndexError when nothing matches)"""
    captures = re.findall(REGEX_URL, url_audio)
    return captures[0]

def download_audio_file(url_audio: str, file_name: str):
    """Downloads the audio found at `url_audio` and stores it at `file_name`.

    Args:
        url_audio: URL to fetch (redirects are followed).
        file_name: Path of the local file that receives the response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
            Without this check the error page was written to disk as if it
            were audio data.
    """
    r = requests.get(url_audio, allow_redirects=True)
    # fail before touching the disk so that no bogus audio file is created
    r.raise_for_status()

    with open(file_name, 'wb') as af:
        af.write(r.content)


import csv

LIMIT = 1621  # maximum number of samples (Firestore documents) to process

# load them into ram first
samples = list(db.collection('speech_to_text').stream())
skipped = 0

# The csv module quotes a field whenever it contains a comma, a quote or a
# line break, so such a transcript no longer shifts the columns of its row
# (the rows used to be built with a plain ",".join).
with open(METADATA_CSV, mode='w', newline='', encoding='utf-8') as mdb:
    writer = csv.writer(mdb, lineterminator='\n')
    writer.writerow(['id', 'name', 'ext', 'text'])

    # Slicing up-front enforces LIMIT even when the LIMIT-th sample is skipped;
    # the former end-of-loop check was bypassed by the `continue` below.
    for sample in tqdm(samples[:LIMIT]):
        entry = sample.to_dict()
        url_audio = entry['sound']

        audio_file_name = get_file_name(url_audio)
        write_name = audio_file_name.split(".")[0]
        extension = audio_file_name.split(".")[-1]

        if not is_allowed_audio(audio_file_name):
            skipped += 1
            continue

        # downloads the audio
        download_audio_file(url_audio, str(audio_store_dir.joinpath(audio_file_name)))

        # adds a record of the audio file to the metadata csv
        writer.writerow([sample.id, write_name, extension, entry['text'].strip().lower()])

print('Skipped audio count:', skipped)

1620it [05:11,  5.61it/s]

Skipped audio count: 0


## Performing Inspection of the metadata

This section should also store the data.

In [None]:
import pandas as pd

# Load the metadata (the first column becomes the index) and preview the top rows
md_df = pd.read_csv(METADATA_CSV, sep=',', header=0, index_col=0)
md_df.head()

Unnamed: 0_level_0,name,ext,text
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
00AUezIZGu7x3C1SaIgJ,502bcb3c-d2f8-493f-929c-6928f8554d59,aac,hilo
00BDU6nJK1PNjAInoaon,8112c994-1810-4a04-ae78-62974bcee082,aac,kati
00E5cDFUpKTYFM4hAQzd,377d5c20-17ef-4534-8fa2-af9c9b18a630,wav,wali na jodari
00KEMdZxRUekU6U2B0xt,ab3236c8-eebe-4086-8964-06f38af38b3c,wav,saivi
00eGG97rFTfplqGzPAtL,eece8aea-0313-45b7-82e4-7184b4c39776,aac,kikao hicho


In [None]:
import itertools as it

# Distinct values of the 'text' column, keeping only the proper strings
word_ls = [w for w in md_df.loc[:, 'text'].unique() if isinstance(w, str)]

In [None]:
from collections import Counter
from functools import reduce
import operator

# Count every character over all texts in a single pass. Unlike
# `reduce(operator.add, ...)` without an initial value, this also works when
# `word_ls` is empty (it yields an empty Counter) and it does not build one
# intermediate Counter per text.
character_list = Counter("".join(word_ls))

In [None]:
# the summary information
# -----------------------

# contains the character count
# convert ' ' into <SPACE>
import re
from collections import OrderedDict

charMapCounter = OrderedDict()
for k, v in character_list.items():
    # every whitespace character is reported under the single '<SPACE>' key
    key = '<SPACE>' if re.match(r'^\s+$', k) else k
    # accumulate instead of assigning: when more than one whitespace character
    # occurs (e.g. ' ' and a tab) a plain assignment kept only the last count
    charMapCounter[key] = charMapCounter.get(key, 0) + v


Saving the summary file

In [None]:
# One row per character: the character itself and how often it occurs
char_summary = pd.DataFrame({
    'char': [char for char in charMapCounter],
    'count': [charMapCounter[char] for char in charMapCounter],
})
char_summary.to_csv('char_summary.csv')

## Converting the audio data

Converts the audio data into appropriate formats


In [None]:
# Target format of the normalized audio; the values are handed to ffmpeg by
# the conversion loop that follows.
RATE = 16000  # value of ffmpeg's -ar option; also used as `sample_rate` of the mel spectrogram
BITRATE = '192k'  # value of ffmpeg's -ab option
AUDIO_CHANNELS = 1 # 1 is 'mono'

In [None]:
"""
Using ffmpeg, normalized all the audio files into a 
stable format for further processing
"""

import os
from tqdm import tqdm

import subprocess

# input files for which ffmpeg returned a non-zero exit status
failed_conversions = []

# `sorted` materializes the listing, which lets tqdm display a total
for audio_file in tqdm(sorted(audio_store_dir.glob('*.*'))):
    input_path = str(audio_file)
    output_path = "%s.wav" % (str(audio_file.name).split(".")[0])
    output_path = str(normalized_store_dir.joinpath(output_path))

    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact; the command used to be assembled as one string
    # for `os.system` and its exit status was ignored.
    # Note: a missing ffmpeg executable now raises FileNotFoundError.
    completed = subprocess.run(
        ['ffmpeg', '-y',
         '-i', input_path,
         '-vn',
         '-ar', str(RATE),
         '-ab', str(BITRATE),
         '-ac', str(AUDIO_CHANNELS),
         '-f', 'wav', output_path],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    if completed.returncode != 0:
        failed_conversions.append(input_path)

# Report the failures instead of silently leaving files unconverted
if failed_conversions:
    print("ffmpeg failed for %d file(s):" % len(failed_conversions))
    for failed_path in failed_conversions:
        print("  ", failed_path)


0it [00:00, ?it/s][A
1it [00:01,  1.51s/it][A
3it [00:01,  1.08s/it][A
5it [00:01,  1.30it/s][A
7it [00:01,  1.79it/s][A
9it [00:01,  2.45it/s][A
11it [00:02,  3.28it/s][A
13it [00:02,  4.32it/s][A
15it [00:02,  5.54it/s][A
17it [00:02,  6.93it/s][A
19it [00:02,  8.40it/s][A
21it [00:02,  9.81it/s][A
23it [00:02, 11.18it/s][A
25it [00:02, 12.43it/s][A
27it [00:03, 13.55it/s][A
29it [00:03, 14.41it/s][A
31it [00:03, 15.03it/s][A
33it [00:03, 15.40it/s][A
35it [00:03, 15.80it/s][A
37it [00:03, 15.91it/s][A
39it [00:03, 15.97it/s][A
41it [00:03, 16.07it/s][A
43it [00:04, 16.21it/s][A
45it [00:04, 16.34it/s][A
47it [00:04, 16.22it/s][A
49it [00:04, 16.17it/s][A
51it [00:04, 16.40it/s][A
53it [00:04, 16.66it/s][A
55it [00:04, 16.47it/s][A
57it [00:04, 16.49it/s][A
59it [00:05, 16.60it/s][A
61it [00:05, 16.71it/s][A
63it [00:05, 16.58it/s][A
65it [00:05, 16.21it/s][A
67it [00:05, 16.29it/s][A
69it [00:05, 16.56it/s][A
71it [00:05, 16.65it/s][A
73it [00:0

## Sanity Checking

Checking if the audio data is properly mapped with the text

In [None]:
# Sanity checking the audio files
# ------------------------------------
"""
Findings; There are some funny audio with details
7 - (long timestamp, low voiced and quick phrase)
"""

import pandas as pd
import IPython.display as ipd

md_df = pd.read_csv(METADATA_CSV, index_col=0, header=0, sep=',')

AUDIO_INDEX = 28  # Points to the audio data to read

# Sort the listing: the order produced by `glob` is not guaranteed, so without
# it AUDIO_INDEX could select a different file on every run or machine.
audio_full_path = sorted(normalized_store_dir.glob('*.*'))[AUDIO_INDEX]
file_name_wo_ext = str(audio_full_path.name.split(".")[0])
print("Audio filename:", file_name_wo_ext)

# Look up the text recorded for this file and fail with an explicit message
# (rather than a bare IndexError) when the metadata has no row for it.
matching_texts = md_df.loc[md_df.name == file_name_wo_ext, 'text'].values
if len(matching_texts) == 0:
    raise RuntimeError("This audio file doesnt have a corresponding text file. %s" % (file_name_wo_ext))

spoken_text = str(matching_texts[0])
print("Text: '%s'\n" % spoken_text)

# last expression of the cell: renders the audio player
ipd.Audio(filename=str(audio_full_path))

Audio filename: 4ac94ee3-c552-43dd-8029-96804df66e96
Text: 'kujitokeza nyakati za jioni'



# Where it all begins

This section is where we set up the collected data for model training and evaluation


## Data preparation

In [None]:
!pip install torchaudio -f https://download.pytorch.org/whl/torch_stable.html

Looking in links: https://download.pytorch.org/whl/torch_stable.html
Collecting torchaudio
[?25l  Downloading https://files.pythonhosted.org/packages/e9/0a/40e53c686c2af65b2a4e818d11d9b76fa79178440caf99f3ceb2a32c3b04/torchaudio-0.5.1-cp36-cp36m-manylinux1_x86_64.whl (3.2MB)
[K     |████████████████████████████████| 3.2MB 2.8MB/s 
Installing collected packages: torchaudio
Successfully installed torchaudio-0.5.1


### Data preprocessing

Setting up Helper functions that map out characters


In [None]:
import re
from typing import Dict, List

class CharacterEncoder(object):
    """Maps the characters of a string to integer indices and back.

    Whitespace characters are represented by the ``<SPACE>`` token, so the
    vocabulary has to contain ``'<SPACE>'`` for texts with inner spaces to be
    encodable.

    Args:
        file_name (str): The path of the csv file that contains
            the characters to map. Not supported yet.
        data (`List[str]`): The list of characters that are
            to be used for mapping. The index of a character is its position
            among the distinct entries of this list (first occurrence wins).

    Raises:
        NotImplementedError: If only `file_name` is given.
        ValueError: If neither `data` nor `file_name` is given.
    """

    # to indicate space
    _SPACE_ = '<SPACE>'

    # compiled once instead of on every character of every text
    _WHITESPACE = re.compile(r'\s')

    def __init__(self, file_name: str=None, data: List[str]=None):
        # TODO: This currently assumes data only.
        #  add feature to support file_name
        if data is None:
            if file_name is not None:
                raise NotImplementedError(
                    "Loading the characters from a file is not supported yet; pass `data` instead")
            raise ValueError("`data` (the list of characters to map) is required")

        # Indices are handed out in first-seen order and repeated characters
        # are ignored, so they always cover 0..count-1 without gaps. With gaps
        # (which duplicates used to cause) BLANK_INDEX could coincide with the
        # index of a real character.
        self.char2ix = {}
        for char in data:
            if char not in self.char2ix:
                self.char2ix[char] = len(self.char2ix)
        self.ix2char = { v: k for k, v in self.char2ix.items() }

    def encode(self, text: str) -> List[int]:
        """
        Use the character map to convert text to an integer sequence.
        Leading and trailing whitespace is stripped first; every remaining
        whitespace character is encoded with the index of ``<SPACE>``.

        args: text - text to be encoded

        raises: TypeError - if `text` is not a string
                KeyError - if a character is not part of the character map
        """
        if not isinstance(text, str):
            # explicit check instead of the former bare `except` + `assert False`
            raise TypeError("Only strings can be encoded, got %r" % (text,))

        characters = [ self._SPACE_ if self._WHITESPACE.match(c) else c for c in text.strip() ]
        return [ self.char2ix[c] for c in characters ]

    def decode(self, indices: List[int]) -> str:
        """
        Use the character map to convert integer labels to a text sequence.
        It converts each integer into its corresponding char and joins the chars to form a string.
        The ``<SPACE>`` token is turned back into a single ' '.

        args: indices - integer values to be converted to text (chars)

        raises: KeyError - if an index is not part of the character map
        """
        characters = [ self.ix2char[ix] for ix in indices ]
        return "".join([c if c != self._SPACE_ else ' ' for c in characters ])

    @property
    def BLANK_INDEX(self):
        """Index reserved for the blank label: the first index after the last character index"""
        return self.count

    @property
    def count(self):
        """Returns the number of characters"""
        return len(self.char2ix)


Setting up the tools for preprocessing

### Setting up the data

Setting up the data for preprocessing, as well as for training, validation and testing



In [None]:
from pathlib import Path
from typing import List, Tuple
import torchaudio

class AudioDataBuilder(object):
    """Pairs the audio files of a directory with the texts of a metadata csv.

    Args:
        metadata_csv (str): Path of the csv file. It is read with its first
            column as index and has to provide the columns 'name' (file name
            without extension) and 'text'. Rows containing missing values are
            dropped.
    """

    def __init__(self, metadata_csv: str):
        self.md_df = pd.read_csv(metadata_csv, index_col=0, header=0, sep=',').dropna()

    def get_file_names(self, path_list: List[Path]):
        """
        Gets the file name (the part before the first '.') of every audio file path
        """
        return [apth.name.split(".")[0] for apth in path_list]

    def get_speech_text(self, audio_name: str) -> str:
        """
        Gets the text stored for the audio file called `audio_name`,
        or None when the metadata has no row for it
        """
        df = self.md_df
        output = df.loc[df['name'] == audio_name, 'text'].values.tolist()
        return output[0] if len(output) > 0 else None

    def _load_pairs(self, audio_paths: List[Path]) -> list:
        """Loads every audio file that has a non-empty text and returns
        `[audios, texts]` (two equally long tuples), or `[]` when nothing is left."""
        pairs = []
        for path, name in zip(audio_paths, self.get_file_names(audio_paths)):
            text = self.get_speech_text(name)
            if not text:
                # A file without text is skipped as a whole. Previously only
                # the text list was filtered, so after the first such file
                # every audio was zipped with the text of another file.
                continue
            pairs.append((torchaudio.load(str(path))[0], text))

        # this trick is equivalent to transposing
        return list(zip(*pairs))

    def load_data(self, dir: str, validation_split: float=0.0) -> Tuple[tuple, tuple]:
        """Loads the audio files found in `dir` together with their texts.

        Args:
            dir: Directory that is searched (non-recursively) with the
                pattern '*.*'.
            validation_split: Fraction of the files, in the range [0, 1),
                that goes to the validation set. 0.0 disables the split.

        Returns:
            Without a split: `[audios, texts]`. With a split: the tuple
            `(train, validation)` where each element has that same layout.
            `audios` holds the first item returned by `torchaudio.load` for
            each file. A part without any usable file is an empty list.

        Raises:
            AssertionError: If `validation_split` is outside [0, 1).
        """
        # sorted, so that the train/validation split is the same on every run
        # (the order produced by `glob` is not guaranteed)
        audio_file_paths = sorted(Path(dir).glob('*.*'))

        ix_count = len(audio_file_paths)
        print("Number of audio files in '%s': %d" % (dir, ix_count))
        # 0.0 has to be accepted: it is the default and means "no split"
        assert 0 <= validation_split < 1, "Value has to be in the range [0, 1)"

        if validation_split == 0.0:
            # if there is no validation split,
            #  only return the train output
            return self._load_pairs(audio_file_paths)

        # splitting into train and validation
        train_count = int(ix_count * (1 - validation_split))
        train_output = self._load_pairs(audio_file_paths[:train_count])
        val_output = self._load_pairs(audio_file_paths[train_count:])
        return train_output, val_output

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence

# FIXME: pad everything first
# TODO: might want to turn this into a generator
def preprocessor(input_data: List[torch.Tensor],
                 text_data: List[str],
                 transform: nn.Module,
                 text_encoder: "CharacterEncoder",
                 batch_size=0):
    """Turns raw audio tensors and texts into padded batches.

    Args:
        input_data: One audio tensor per sample.
        text_data: The text of every sample, in the same order as `input_data`.
        transform: Module applied to every audio tensor.
            NOTE(review): its output is assumed to be (1, n_feats, time) -
            the leading axis is squeezed away; confirm for non-mono audio.
        text_encoder: Object whose `encode(text)` returns a list of ints.
        batch_size: Requested batch size. Values <= 1 put all samples into a
            single batch. Otherwise `count // batch_size` (at least 1) batches
            are created and the samples are spread evenly over them, so a
            batch can hold more than `batch_size` samples.

    Returns:
        A list with one `(data, labels, input_lengths, label_lengths)` tuple
        per batch. `data` is the zero-padded transform output with shape
        (batch, 1, n_feats, max_time), `labels` the zero-padded encoded
        texts, `input_lengths` the unpadded time length of every sample
        halved (integer division) and `label_lengths` the unpadded label
        length of every sample. An empty input gives an empty list.

    Raises:
        ValueError: If `input_data` and `text_data` differ in length.
    """
    assert isinstance(batch_size, int), "Batch size has to be indicated by integer"

    count = len(input_data)
    if len(text_data) != count:
        raise ValueError("Got %d audio items but %d texts" % (count, len(text_data)))
    if count == 0:
        # nothing to pad or encode
        return []

    if batch_size <= 1:
        batches = 1
        print("No batches")
    else:
        # at least one batch: `count // batch_size` is 0 when there are fewer
        # samples than `batch_size`, which used to drop all the data
        batches = max(1, count // batch_size)
        print("# batches:", batches)

    # items should match the number of batches
    collection = []

    for batch in range(batches):
        # integer arithmetic keeps the slice borders exact
        frm = count * batch // batches
        to = count * (batch + 1) // batches

        inps = input_data[frm: to]
        texts = text_data[frm: to]

        # (1, n_feats, time) -> (time, n_feats), the layout pad_sequence needs
        data = [transform(inp).squeeze(0).transpose(0, 1) for inp in inps]
        encoded_texts = [torch.tensor(text_encoder.encode(text)) for text in texts]

        # information for different lengths in each item in batch
        # NOTE(review): the halving presumably mirrors the stride of 2 of the
        # first convolution of the model - confirm
        input_lengths = [ dt.shape[0] // 2 for dt in data ]
        label_lengths = [ len(label) for label in encoded_texts ]

        # pad the data to a common length:
        # (batch, time, n_feats) -> (batch, 1, time, n_feats) -> (batch, 1, n_feats, time)
        data = pad_sequence(data, batch_first=True).unsqueeze(1).transpose(2,3)
        labels = pad_sequence(encoded_texts, batch_first=True)

        # store them in batches
        collection.append((data, labels, input_lengths, label_lengths))

    assert len(collection) == batches, "The number of batches are not the same. %d !== %d" % (len(collection), batches)

    return collection

#### Performing the preprocessing


In [None]:
import pandas as pd

# get the sorted characters
# and sets up the character encoder
# Character vocabulary: the 'char' column of the summary file, in sorted order
charset = pd.read_csv('./char_summary.csv', index_col=0)['char'].values.tolist()
charset.sort()
text_encoder = CharacterEncoder(data=charset)

In [None]:
import torch.nn as nn
import torchaudio

# Setting up the audio transformers
# useful for preprocessing

# For training
# -------------
# mel spectrogram followed by frequency and time masking
train_audio_transforms = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(sample_rate=RATE, n_mels=128),
    torchaudio.transforms.FrequencyMasking(freq_mask_param=15),
    torchaudio.transforms.TimeMasking(time_mask_param=35)
)

# For validation
# --------------
# Same spectrogram as for training, without the masking. The settings are
# spelled out so that both transforms visibly agree instead of relying on the
# library defaults; the stray `val_audio_transforms = torchaudio` assignment,
# which was overwritten right away, is gone.
val_audio_transforms = torchaudio.transforms.MelSpectrogram(sample_rate=RATE, n_mels=128)


In [None]:
# Build the (audio, text) data from the locations configured in the first
# cell (METADATA_CSV, normalized_store_dir) instead of repeating the path
# literals here; 30% of the files are kept for validation.
abuilder = AudioDataBuilder(metadata_csv=METADATA_CSV)
train_data, val_data = abuilder.load_data(dir=str(normalized_store_dir),
                                          validation_split=0.3)

Number of audio files in './audio-data/normalized': 1615


In [None]:
# Batch size requested from `preprocessor` for the training data
BATCH_SIZE = 64

# pre processing for the training data
tinp, tlabel = train_data
# NOTE: the trailing comma after the closing parenthesis makes
# `train_batch_data` a 1-tuple that wraps the list returned by `preprocessor`;
# the training cell reads the list back with `train_batch_data[0]`.
train_batch_data = preprocessor(input_data=tinp,
                                text_data=tlabel,
                                transform=train_audio_transforms, 
                                text_encoder=text_encoder,
                                batch_size=BATCH_SIZE),

# preprocessing for validation data
vinp, vlabel = val_data
# No `batch_size` is passed, so the default (0) is used and the "No batches"
# branch of `preprocessor` runs. The trailing comma again wraps the result in
# a 1-tuple, which is why the training cell indexes `val_batch_data[0][0]`.
val_batch_data = preprocessor(input_data=vinp,
                                text_data=vlabel,
                                transform=val_audio_transforms, 
                                text_encoder=text_encoder),
          

# batches: 17
No batches


## Modelling

### Model Structuring

Components to structure the different parts of the model


In [None]:
class CNNLayerNorm(nn.Module):
    """LayerNorm applied over the feature axis of a (batch, channel, feature, time) tensor"""
    def __init__(self, n_feats):
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # nn.LayerNorm works on the last axis, so the feature axis is moved
        # there for the normalization and moved back afterwards
        time_major = x.permute(0, 1, 3, 2).contiguous()  # (batch, channel, time, feature)
        normalized = self.layer_norm(time_major)
        return normalized.permute(0, 1, 3, 2).contiguous()  # (batch, channel, feature, time)

class ResidualCNN(nn.Module):
    """
    Residual block built from two (layer norm -> GELU -> dropout -> conv) stages,
    inspired by https://arxiv.org/pdf/1603.05027.pdf with layer norm in place of batch norm.
    Used to extract features from the spectrograms; the block input is added to its output.
    """
    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super().__init__()

        half_kernel = kernel // 2
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=half_kernel)
        self.cnn2 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=half_kernel)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        # x: (batch, channel, feature, time)
        stages = (
            (self.layer_norm1, self.dropout1, self.cnn1),
            (self.layer_norm2, self.dropout2, self.cnn2),
        )
        out = x
        for norm, drop, conv in stages:
            out = conv(drop(F.gelu(norm(out))))

        # skip connection
        return out + x  # (batch, channel, feature, time)

class BidirectionalGRU(nn.Module):
    """
    Single bidirectional GRU layer preceded by layer norm + GELU and followed by dropout.
    Used by the encoder to learn the pattern in the features extracted using the ResCNN.
    """

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super().__init__()

        self.BiGRU = nn.GRU(input_size=rnn_dim, hidden_size=hidden_size, num_layers=1, batch_first=batch_first, bidirectional=True)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        activated = F.gelu(self.layer_norm(x))
        # only the per-step outputs are used; the final hidden state is dropped
        sequence_output, _final_hidden = self.BiGRU(activated)
        return self.dropout(sequence_output)

class SpeechRecognitionModel(nn.Module):
    '''
    Pipeline for the entire encoder architecture:
    conv -> residual CNN blocks -> linear -> bidirectional GRU stack -> classifier.

    Args:
        n_cnn_layers: Number of ResidualCNN blocks.
        n_rnn_layers: Number of BidirectionalGRU layers.
        rnn_dim: Hidden size of every GRU direction (and size of the linear
            layer in front of the GRU stack).
        n_class: Number of output classes per time step.
        n_feats: Number of features (spectrogram bins) of the input.
        stride: Stride of the first convolution.
        dropout: Dropout probability used throughout the model.
    '''

    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(SpeechRecognitionModel, self).__init__()
        # NOTE(review): halving matches the default stride of 2 of the first
        # convolution - other strides presumably need a different value here
        n_feats = n_feats//2
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting hierarchical features

        # n residual cnn layers with filter size of 32
        self.rescnn_layers = nn.Sequential(*[
                                            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
                                            for _ in range(n_cnn_layers)
        ])
        self.fully_connected = nn.Linear(n_feats*32, rnn_dim)
        # Every GRU layer gets batch_first=True: the tensors stay
        # (batch, time, feature) through the whole stack. With
        # `batch_first=i==0` the layers after the first one read the batch axis
        # as the time axis, i.e. they ran their recurrence across the samples
        # of a batch instead of across time.
        self.birnn_layers = nn.Sequential(*[
                                            BidirectionalGRU(rnn_dim=rnn_dim if i==0 else rnn_dim*2, hidden_size=rnn_dim, dropout=dropout, batch_first=True)
                                            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
                                        nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
                                        nn.GELU(),
                                        nn.Dropout(dropout),
                                        nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        """Maps spectrograms (batch, 1, feature, time) to per-step class scores (batch, time', n_class)."""
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        # merge the channel and feature axes
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)
        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        return x


### Model Training

Setting configuration to train on

In [None]:
import torch

# setting random seed
torch.manual_seed(7)  # fixed seed for torch's random number generator

# train on the first GPU when one is available, otherwise on the CPU
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0") if use_cuda else torch.device("cpu")

# Optimization settings
learning_rate = 1e-4
EPOCHS = 100

# number of characters known to the text encoder
CHARS_COUNT = text_encoder.count

# Keyword arguments of SpeechRecognitionModel
model_params = {
    'n_cnn_layers': 3,
    'n_rnn_layers': 5,
    'rnn_dim': 512,
    'n_class': CHARS_COUNT + 1,  # the plus 1 is for the blank index
    'n_feats': 128,
}

In [None]:
"""
Training the model on the appropriate configs (set above)
"""
import torch.optim as optim
import torch.nn as nn

# model, placed on the selected device
model = SpeechRecognitionModel(**model_params)
model = model.to(device)

optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

# CTC loss whose blank label is the text encoder's BLANK_INDEX
loss_fn = nn.CTCLoss(blank=text_encoder.BLANK_INDEX)
loss_fn = loss_fn.to(device)

# # Scheduler
# scheduler = optim.lr_scheduler.OneCycleLR(optimizer, 
#                                           max_lr=hparams['learning_rate'],
#                                           steps_per_epoch=int(len(train_loader)),
#                                           epochs=EPOCHS,
#                                         anneal_strategy='linear')

### Storing the model evaluation metrics and algorithms

Setting up code to store the evaluations / model


In [None]:
def avg_wer(wer_scores, combined_ref_len):
    """Returns the sum of `wer_scores` divided by `combined_ref_len`, as a float"""
    total_score = float(sum(wer_scores))
    return total_score / float(combined_ref_len)


def _levenshtein_distance(ref, hyp):
    """Levenshtein distance is a string metric for measuring the difference
    between two sequences. Informally, the levenshtein disctance is defined as
    the minimum number of single-character edits (substitutions, insertions or
    deletions) required to change one word into the other. We can naturally
    extend the edits to word level when calculate levenshtein disctance for
    two sentences.
    """
    m = len(ref)
    n = len(hyp)

    # special case
    if ref == hyp:
        return 0
    if m == 0:
        return n
    if n == 0:
        return m

    if m < n:
        ref, hyp = hyp, ref
        m, n = n, m

    # use O(min(m, n)) space
    distance = np.zeros((2, n + 1), dtype=np.int32)

    # initialize distance matrix
    for j in range(0,n + 1):
        distance[0][j] = j

    # calculate levenshtein distance
    for i in range(1, m + 1):
        prev_row_idx = (i - 1) % 2
        cur_row_idx = i % 2
        distance[cur_row_idx][0] = i
        for j in range(1, n + 1):
            if ref[i - 1] == hyp[j - 1]:
                distance[cur_row_idx][j] = distance[prev_row_idx][j - 1]
            else:
                s_num = distance[prev_row_idx][j - 1] + 1
                i_num = distance[cur_row_idx][j - 1] + 1
                d_num = distance[prev_row_idx][j] + 1
                distance[cur_row_idx][j] = min(s_num, i_num, d_num)

    return distance[m % 2][n]


def word_errors(reference, hypothesis, ignore_case=False, delimiter=' '):
    """Compute the levenshtein distance between reference sequence and
    hypothesis sequence in word-level.

    Empty items produced by the split (leading, trailing or repeated
    delimiters) are removed before the comparison, as `char_errors` already
    does for spaces. They used to be counted as words, so a hypothesis such as
    ' hello' was charged an extra error and an empty reference was reported
    with a length of 1.

    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param delimiter: Delimiter of input sentences.
    :type delimiter: char
    :return: Levenshtein distance and word number of reference sentence.
    :rtype: tuple
    """
    if ignore_case:
        reference = reference.lower()
        hypothesis = hypothesis.lower()

    ref_words = [word for word in reference.split(delimiter) if word]
    hyp_words = [word for word in hypothesis.split(delimiter) if word]

    edit_distance = _levenshtein_distance(ref_words, hyp_words)
    return float(edit_distance), len(ref_words)


def char_errors(reference, hypothesis, ignore_case=False, remove_space=False):
    """Compute the levenshtein distance between reference sequence and
    hypothesis sequence in char-level.

    Leading and trailing spaces are removed and runs of spaces are collapsed
    into one space (or removed entirely when `remove_space` is set) before the
    comparison.

    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param remove_space: Whether remove internal space characters
    :type remove_space: bool
    :return: Levenshtein distance and length of reference sentence.
    :rtype: tuple
    :raises TypeError: If reference or hypothesis is not a string.
    """
    # Explicit validation replaces the former bare `except:` + `assert False`,
    # which also swallowed unrelated errors such as KeyboardInterrupt.
    if not isinstance(reference, str) or not isinstance(hypothesis, str):
        raise TypeError("reference and hypothesis have to be strings, got %r and %r"
                        % (reference, hypothesis))

    if ignore_case:
        reference = reference.lower()
        hypothesis = hypothesis.lower()

    join_char = '' if remove_space else ' '

    # `filter(None, ...)` drops the empty items that the split produces for
    # leading, trailing and repeated spaces
    reference = join_char.join(filter(None, reference.split(' ')))
    hypothesis = join_char.join(filter(None, hypothesis.split(' ')))

    edit_distance = _levenshtein_distance(reference, hypothesis)
    return float(edit_distance), len(reference)


def wer(reference, hypothesis, ignore_case=False, delimiter=' '):
    """Calculate word error rate (WER), which compares the reference text and
    the hypothesis text at word level:
    .. math::
        WER = (Sw + Dw + Iw) / Nw
    where
    .. code-block:: text
        Sw is the number of words substituted,
        Dw is the number of words deleted,
        Iw is the number of words inserted,
        Nw is the number of words in the reference
    The word-level edit distance and the reference word count are obtained
    from `word_errors`.
    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param delimiter: Delimiter of input sentences.
    :type delimiter: char
    :return: Word error rate.
    :rtype: float
    :raises ValueError: If word number of reference is zero.
    """
    word_edits, reference_word_count = word_errors(
        reference, hypothesis, ignore_case, delimiter)

    if reference_word_count == 0:
        raise ValueError("Reference's word number should be greater than 0.")

    return float(word_edits) / reference_word_count


def cer(reference, hypothesis, ignore_case=False, remove_space=False):
    """Calculate character error rate (CER), which compares the reference text
    and the hypothesis text at character level:
    .. math::
        CER = (Sc + Dc + Ic) / Nc
    where
    .. code-block:: text
        Sc is the number of characters substituted,
        Dc is the number of characters deleted,
        Ic is the number of characters inserted
        Nc is the number of characters in the reference
    The character-level edit distance and the reference length are obtained
    from `char_errors`.
    :param reference: The reference sentence.
    :type reference: basestring
    :param hypothesis: The hypothesis sentence.
    :type hypothesis: basestring
    :param ignore_case: Whether case-sensitive or not.
    :type ignore_case: bool
    :param remove_space: Whether remove internal space characters
    :type remove_space: bool
    :return: Character error rate.
    :rtype: float
    :raises ValueError: If the reference length is zero.
    """
    char_edits, reference_length = char_errors(
        reference, hypothesis, ignore_case, remove_space)

    if reference_length == 0:
        raise ValueError("Length of reference should be greater than 0.")

    return float(char_edits) / reference_length

In [None]:
import torch

def GreedyDecoder(output,
                  labels,
                  label_lengths,
                  text_encoder: CharacterEncoder,
                  collapse_repeated=True):
    """Greedy decoding: picks the best class of every time step.

    Args:
        output: Scores; the argmax is taken over axis 2 and axis 0 is iterated
            as the batch.
        labels: Padded target indices, one row per batch item.
        label_lengths: Number of valid entries in every row of `labels`.
        text_encoder: Provides `BLANK_INDEX` and `decode`.
        collapse_repeated: When set, a non-blank class equal to the class of
            the preceding time step is dropped.

    Returns:
        `(decodes, targets)`: the decoded predictions and the decoded targets,
        one string per batch item.
    """
    blank_label = text_encoder.BLANK_INDEX
    best_classes = torch.argmax(output, dim=2)

    decodes, targets = [], []
    for row, step_classes in enumerate(best_classes):
        # target text: only the unpadded part of the label row
        target_indices = labels[row][:label_lengths[row]].tolist()
        targets.append(text_encoder.decode(target_indices))

        # predicted text: blanks removed, repeats optionally collapsed
        steps = step_classes.tolist()
        kept = [
            index
            for position, index in enumerate(steps)
            if index != blank_label
            and not (collapse_repeated and position != 0 and index == steps[position - 1])
        ]
        decodes.append(text_encoder.decode(kept))

    return decodes, targets

## Model Training and Evaluation

In [None]:
"""

FIXME: PLEASE
Using training and validation set for training and validation
of the models performance
"""
import numpy as np
import torch.nn.functional as F

# run the validation every `eval_period` epochs
# (and additionally on the first and on the last epoch)
eval_period = 2

# epochs with `epoch % (verbose_period - 1) == 0` get a progress line as well
verbose_period = 100

# FIXME: so that there is no need to 0 index the data
# train and validation batch data
#  NOTE: the validation data is not in batches (a single batch holds all of it)
# A dedicated name (`val_batch`) is used so that the `val_data` produced by
# `load_data` is not overwritten - re-running the preprocessing cell after this
# one used to fail because of that.
train_bdata, val_batch = train_batch_data[0], val_batch_data[0][0]


# for every epoch
for epoch in range(0, EPOCHS):
    should_eval = (epoch + 1) % eval_period == 0 or epoch + 1 == EPOCHS or epoch == 0
    should_log = should_eval or epoch % (verbose_period - 1) == 0

    if should_log:
        print('Epoch: [{:03d}/ {:03d}]: '.format(epoch+1, EPOCHS),  end="")

    # training mode: dropout is active (it is switched off for the validation
    # below and has to be switched on again for the next epoch)
    model.train()

    # for each batch
    for spectrograms, labels, input_lengths, label_lengths in train_bdata:
        spectrograms, labels = spectrograms.to(device), labels.to(device)

        output = model(spectrograms)
        output = F.log_softmax(output, dim=2)
        output = output.transpose(0, 1)  # the first two axes are swapped for the loss

        loss = loss_fn(output, labels, input_lengths, label_lengths)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # perform evaluation after every 'eval_period' epoch
    #  or last epoch
    if should_eval:
        # evaluation mode: without it dropout stays active and distorts the
        # validation metrics
        model.eval()

        spectrograms, labels, input_lengths, label_lengths = val_batch
        spectrograms, labels = spectrograms.to(device), labels.to(device)

        with torch.no_grad():
            output = model(spectrograms)
            output = F.log_softmax(output, dim=2)

            loss = loss_fn(output.transpose(0, 1), labels, input_lengths, label_lengths)
            decoded_preds, decoded_targets = GreedyDecoder(output,
                                                 labels,
                                                 label_lengths,
                                                 text_encoder=text_encoder)

        test_cer = [cer(target, pred) for target, pred in zip(decoded_targets, decoded_preds)]
        test_wer = [wer(target, pred) for target, pred in zip(decoded_targets, decoded_preds)]

        # `loss_fn` is created with the default reduction of nn.CTCLoss
        # ('mean'), which already averages over the batch - dividing by the
        # batch size once more under-reported the loss.
        val_loss = loss.item()
        # named mean_*: assigning to `avg_wer` replaced the helper function of
        # the same name with a number
        mean_cer = np.mean(test_cer)
        mean_wer = np.mean(test_wer)

        print('Avg Val Loss: {} | Avg CER: {} | Avg WER: {}'. \
              format(val_loss, mean_cer, mean_wer), end="")

    # finish the line started by the epoch header
    if should_log:
        print("")
         

Epoch: [001/ 100]: Avg Val Loss: 0.008858423134715286 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [002/ 100]: Avg Val Loss: 0.007117861324978858 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [004/ 100]: Avg Val Loss: 0.006887579947402796 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [006/ 100]: Avg Val Loss: 0.006868071408615899 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [008/ 100]: Avg Val Loss: 0.006860439556161153 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [010/ 100]: Avg Val Loss: 0.006856068876600757 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [012/ 100]: Avg Val Loss: 0.00684493104207147 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [014/ 100]: Avg Val Loss: 0.006840753555297852 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [016/ 100]: Avg Val Loss: 0.006841117819559943 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [018/ 100]: Avg Val Loss: 0.006836127743278582 | Avg CER: 1.0 | Avg WER: 0.9986254295532646
Epoch: [020

## Model Testing

In [None]:
import torch

def test_decoder(output,
                 text_encoder: CharacterEncoder,
                 collapse_repeated=True):
    """Greedy decoding of model scores without reference labels.

    Args:
        output: Scores; the argmax is taken over axis 2 and axis 0 is iterated
            as the batch.
        text_encoder: Provides `BLANK_INDEX` and `decode`.
        collapse_repeated: When set, a non-blank class equal to the class of
            the preceding time step is dropped.

    Returns:
        A 1-tuple `(decodes,)` whose only item is the list of decoded strings,
        one per batch item.
    """
    blank_label = text_encoder.BLANK_INDEX
    best_classes = torch.argmax(output, dim=2)

    decodes = []
    for step_classes in best_classes:
        steps = step_classes.tolist()
        # blanks removed, repeats optionally collapsed
        kept = [
            index
            for position, index in enumerate(steps)
            if index != blank_label
            and not (collapse_repeated and position != 0 and index == steps[position - 1])
        ]
        decodes.append(text_encoder.decode(kept))

    # the list is deliberately returned wrapped in a tuple (original contract)
    return (decodes,)

In [None]:
import torchaudio
import torch.nn.functional as F
from pathlib import Path
import IPython.display as ipd

# every normalized audio file, in sorted order
all_audio_files = sorted(Path('./audio-data/normalized/').glob('*.*'))

# ----------------------------
# First read about the different things that might cause the
# model to perform badly in terms of speech
#
# You can use these case audio files. If there are any,
# this could be our research
# ----------------------------------------------------
# Case #    |   index no.   |   description
#     1     |     535       | bad pronunciation [65229a0b-089c-4bab-a8f4-40c18894f3b6.wav]
#     2     |     785       | Account for words which may be difficult to pronounce and capture (sheikh)
#     3     |     600       | Perfect translation (AWESOME)
#     4     |     983       | Bad examples make the model understand utterances differently [f7539de3-5cae-49df-a04e-37e6bfc46947]

print("Number of files:", len(all_audio_files))

# the file under test
audio_path = all_audio_files[75]
name_of_file = audio_path.name.split(".")[0]
# name_of_file = '65229a0b-089c-4bab-a8f4-40c18894f3b6'

# text recorded for that file in the metadata
df = pd.read_csv('./metadata.csv', index_col=0)
results = df.loc[df.name == name_of_file, 'text'].values.tolist()

if not results:
    raise RuntimeError("This audio file doesnt have a corresponding text file. %s" % (name_of_file))

actual_text = results[0]

# mel spectrogram of the audio, with an extra axis inserted at position 1
audio_arr = torchaudio.load(str(audio_path))[0]
test_transform = torchaudio.transforms.MelSpectrogram()
spect = test_transform(audio_arr).to(device).unsqueeze(1)

print("Actual text: %s (File name: %s)" % (actual_text, name_of_file))

# last expression of the cell: renders the audio player
ipd.Audio(str(audio_path))

Number of files: 1615
Actual text: anapenda kwenda jumuia kila (File name: 191b4603-e98d-42a5-b769-c041db7eaf92)


In [None]:
# Inference only: evaluation mode switches the dropout layers off (they made
# the prediction vary from run to run) and `no_grad` skips the gradient
# bookkeeping that is not needed here.
model.eval()
with torch.no_grad():
    output = model(spect.to(device))
    output = F.log_softmax(output, dim=2)

# `test_decoder` returns a 1-tuple that holds the list of decoded strings
decoded_text = test_decoder(output, text_encoder)
print("Text:", decoded_text)
# blank_label = text_encoder.BLANK_INDEX

Text: (['anafpena ana jmiumena ksia'],)
