In [None]:
# default_exp data

In [None]:
#all_slow

# Data 

> Functions to process your data

In [None]:
# Enable IPython's autoreload extension in mode 2 for this kernel.
%load_ext autoreload
%autoreload 2

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
import os
import re
import json
import numpy as np
from pathlib import Path
from functools import partial

In [None]:
#export
#all_slow
import librosa
import torchaudio
from datasets import load_dataset, load_metric, concatenate_datasets

## Clean up missing files

In [None]:
#export
def file_exists(e):
    '''Record under `e['file_exists']` whether `e['path']` is an existing regular file, then return `e`'''
    path_is_file = os.path.isfile(e['path'])
    e.update(file_exists=path_is_file)
    return e

In [None]:
#export
def ds_file_exists(ds):
    '''Map `file_exists` over `ds`, print how many paths were not found and return the mapped dataset'''
    n_rows = len(ds)
    ds = ds.map(file_exists)
    n_missing = n_rows - sum(ds['file_exists'])
    if n_missing != 0:
        # Percentage is only computed here, i.e. when at least one row is missing
        pct_missing = (n_missing/n_rows)*100
        print(f"{n_missing} ({pct_missing}%) files not found")
    else:
        print('All files found')
    return ds

In [None]:
#export
def filter_for_exists(ds, drop_exist_col=True):
    '''Keep only the rows of `ds` whose `file_exists` value is truthy.

    When `drop_exist_col` is set (the default) the `file_exists` column is
    removed from the filtered result before it is returned.
    '''
    def _was_found(example):
        return example['file_exists']

    kept = ds.filter(_was_found)
    if not drop_exist_col:
        return kept
    return kept.remove_columns('file_exists')

In [None]:
#export
def drop_missing_files(ds, drop_exist_col=True):
    '''Flag each row of `ds` with `ds_file_exists`, then keep only the flagged rows via `filter_for_exists`'''
    flagged = ds_file_exists(ds)
    return filter_for_exists(flagged, drop_exist_col)

In [None]:
# Load the "test" split of the Common Voice "ga-IE" configuration, using `data` as the cache directory
test_ds = load_dataset("common_voice", "ga-IE", split="test", cache_dir='data')
# Keep only the first ten rows for the examples in this notebook
test_ds = test_ds.select([0,1,2,3,4,5,6,7,8,9])
# Drop any row whose file is missing (prints a summary of what was found)
ds = drop_missing_files(test_ds)

Reusing dataset common_voice (data/common_voice/ga-IE/6.1.0/0041e06ab061b91d0a23234a2221e87970a19cf3a81b20901474cffffeb7869f)
Loading cached processed dataset at data/common_voice/ga-IE/6.1.0/0041e06ab061b91d0a23234a2221e87970a19cf3a81b20901474cffffeb7869f/cache-3443a4ebc22019f7.arrow
Loading cached processed dataset at data/common_voice/ga-IE/6.1.0/0041e06ab061b91d0a23234a2221e87970a19cf3a81b20901474cffffeb7869f/cache-f013f7ba1baffc62.arrow


All files found


### Merge datasets

In [None]:
# export
def add_ds(e, new_ds):
    '''Extend every column of batch `e` with the same-named column of `new_ds`; `e` is updated in place and returned'''
    for column, values in list(e.items()):
        e[column] = values + new_ds[column]
    return e

Merge two Datasets. Note that they need to have the same columns.

In [None]:
# export
def merge_ds(ds, new_ds, shuffle=True):
    '''Append the columns of `new_ds` to those of `ds` in a single in-memory batch; the result is shuffled unless `shuffle` is False'''
    merged = ds.map(partial(add_ds, new_ds=new_ds), batched=True, batch_size=-1, keep_in_memory=True)
    return merged.shuffle() if shuffle else merged

In [None]:
# Merging the subset with itself must double the number of rows
ds = merge_ds(test_ds, test_ds)
assert len(ds) == 2*len(test_ds)

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))




## Processing

Remove special characters. This can be dataset- and language-specific, so be careful about removing characters that may change the meaning of a word or sentence.

In [None]:
#export 
# Character class of the punctuation/symbols stripped from transcripts.
# Written as a raw string: the backslashes are regex escapes, and spelling them in a
# normal string literal triggers "invalid escape sequence" warnings on newer Pythons.
# The resulting string value is unchanged.
chars_to_ignore_regex = r'[\,\?\.\!\-\;\:"\“\%\‘\”\�\(\)\-\*]'

def remove_special_characters(batch, evaluate=False):
    '''Normalise the transcript stored in `batch["sentence"]`.

    Characters matched by `chars_to_ignore_regex` are removed and the text is
    lower-cased; then `’` becomes `'`, `–` and `—` become `-`, and `&` becomes
    ` and `.

    batch:    mapping with a string under the "sentence" key; updated in place.
    evaluate: when False (default) a single trailing space is appended to the
              sentence; when True the sentence is left without it.

    Returns the same `batch` object.
    '''
    sentence = re.sub(chars_to_ignore_regex, '', batch["sentence"]).lower()
    if not evaluate: sentence += " "

    # Plain one-to-one substitutions, so `str.replace` is enough (no regex needed)
    for old, new in (('’', "'"), ('–', '-'), ('—', '-'), ('&', ' and ')):
        sentence = sentence.replace(old, new)

    batch["sentence"] = sentence
    return batch

In [None]:
# Scratch setup for the cell below; these imports repeat ones already made earlier in the notebook.
from datasets import load_metric
# Load the "wer" metric
wer = load_metric("wer")

import torchaudio
# Resampler with fixed rates: 48_000 in, 16_000 out
resampler = torchaudio.transforms.Resample(48_000, 16_000)

import re
def speech_file_to_array_fn(batch):
    '''Clean the transcript of `batch` and store its audio, resampled to 16 kHz, under `batch["speech"]`.

    The transcript is normalised with `remove_special_characters(..., evaluate=True)`
    (no trailing space). The audio at `batch["path"]` is loaded with torchaudio,
    resampled, squeezed and converted to a numpy array.

    Returns the updated `batch`.
    '''
    # Reuse the exported helper instead of repeating its substitutions here
    batch = remove_special_characters(batch, evaluate=True)

    speech_array, sampling_rate = torchaudio.load(batch["path"])
    # The module-level `resampler` is fixed at 48 kHz input; for a file with any other
    # rate build a matching resampler so the audio is not resampled with the wrong ratio.
    if sampling_rate == 48_000: to_16k = resampler
    else: to_16k = torchaudio.transforms.Resample(sampling_rate, 16_000)
    batch["speech"] = to_16k(speech_array).squeeze().numpy()
    return batch

# Apply the transcript cleaning + audio loading to every row of the test subset
test_ds = test_ds.map(speech_file_to_array_fn)

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))




In [None]:
# Normalise every transcript with the default (evaluate=False) settings
ds = ds.map(remove_special_characters)

HBox(children=(FloatProgress(value=0.0, max=20.0), HTML(value='')))




## Vocab
Build a vocabulary from the entire set of letters in the dataset

In [None]:
# export
def extract_all_chars(batch):
    '''Join all sentences of the batch with spaces and collect the distinct characters used'''
    joined = " ".join(batch["sentence"])
    distinct_chars = [*set(joined)]
    return dict(vocab=[distinct_chars], all_text=[joined])

In [None]:
#export
def get_char_vocab(train_ds, test_ds=None):
    '''Build a `{character: index}` vocabulary from the sentences of `train_ds` and, if given, `test_ds`.

    Characters are sorted before being numbered. Iteration order of a set of
    strings changes between interpreter runs, so numbering the raw set would
    give different indices for the same data each time.

    train_ds: dataset with a "sentence" column.
    test_ds:  optional second dataset whose characters are added to the vocabulary.

    Returns a dict mapping each distinct character to a unique integer index.
    '''
    def _distinct_chars(ds):
        # batch_size=-1 is used so the whole dataset goes through `extract_all_chars` at once;
        # row 0 of "vocab" then holds the distinct characters
        extracted = ds.map(extract_all_chars, batched=True, batch_size=-1,
                           keep_in_memory=True, remove_columns=ds.column_names)
        return set(extracted["vocab"][0])

    chars = _distinct_chars(train_ds)
    if test_ds is not None: chars |= _distinct_chars(test_ds)

    return {char: idx for idx, char in enumerate(sorted(chars))}

In [None]:
# The cleaned sample rows are expected to contain 24 distinct characters
vocab = get_char_vocab(ds)
assert len(vocab.keys()) == 24

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))




In [None]:
# export
def process_vocab(vocab_dict):
    '''Re-key the space entry as `|` and append `[UNK]` and `[PAD]` entries; `vocab_dict` is modified in place and returned'''
    vocab_dict["|"] = vocab_dict.pop(" ")
    # Each special token takes the next free index
    for special_token in ("[UNK]", "[PAD]"):
        vocab_dict[special_token] = len(vocab_dict)
    return vocab_dict

In [None]:
# Replace the space key with "|" and add the "[UNK]"/"[PAD]" entries
vocab = process_vocab(vocab)

Extract a processed, character-level vocab from a Dataset and optionally save it as a JSON file

In [None]:
# export
def extract_vocab(train_ds, test_ds=None, save=True, save_dir='data', fn='vocab.json'):
    '''Build the processed character vocab for the given dataset(s).

    When `save` is set, `save_dir` is created if needed and the vocab is
    written as JSON to `save_dir/fn`. The vocab dict is returned either way.
    '''
    vocab = process_vocab(get_char_vocab(train_ds, test_ds))
    if not save:
        return vocab

    Path(f"{save_dir}").mkdir(parents=True, exist_ok=True)
    with open(f'{save_dir}/{fn}', 'w') as out_file:
        json.dump(vocab, out_file)
    return vocab

In [None]:
# Build the vocab in one call without writing it to disk
vocab = extract_vocab(ds, save=False)

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))




## Extract Audio Data

Extract the audio array and sampling rate from the file and resample

In [None]:
# export
def speech_file_to_array(batch, resample=True, new_sr=16_000, evaluate=False):
    '''Load the audio file at `batch["path"]` into `batch["speech"]` and record its rate in `batch["sampling_rate"]`.

    batch:    mapping with a "path" entry; updated in place.
    resample: when True the audio is converted to `new_sr`, otherwise the first
              channel is stored at the rate reported by `torchaudio.load`.
    new_sr:   target sampling rate used when `resample` is True.
    evaluate: selects the resampling backend. True uses
              `torchaudio.transforms.Resample` on the loaded tensor (then squeezed),
              False uses `librosa.resample` on the first channel.

    If loading or resampling raises, the example is kept with a placeholder
    `np.array([0])` and a `sampling_rate` of 0, so failures can be detected
    from those values.

    Returns the same `batch` object.
    '''
    try:
        speech_array, sampling_rate = torchaudio.load(batch["path"])

        if resample:
            if evaluate:
                resampler = torchaudio.transforms.Resample(sampling_rate, new_sr)
                batch["speech"] = resampler(speech_array).squeeze().numpy()
            else:
                # Pass the rates by keyword: newer librosa releases reject positional
                # sample rates, and the resulting TypeError would be silently turned
                # into the placeholder below for every single example.
                batch["speech"] = librosa.resample(np.asarray(speech_array[0].numpy()),
                                                   orig_sr=sampling_rate, target_sr=new_sr)
            batch["sampling_rate"] = new_sr
        else:
            batch["speech"] = speech_array[0].numpy()
            batch["sampling_rate"] = sampling_rate
    except Exception:
        # Best-effort fallback for unreadable files. `Exception` (not a bare except)
        # so KeyboardInterrupt/SystemExit still stop a long-running map.
        batch["speech"] = np.array([0])
        batch["sampling_rate"] = 0
    return batch

In [None]:
# Example: load the audio for every row, resampling to 8_000 instead of the 16_000 default
sp2a = partial(speech_file_to_array, new_sr=8_000)
ds =  ds.map(sp2a)

HBox(children=(FloatProgress(value=0.0, max=20.0), HTML(value='')))




In [None]:
## hide
# Run nbdev's notebook-to-script conversion (the output lists the converted notebooks)
from nbdev.export import notebook2script; notebook2script()

Converted 00_core.ipynb.
Converted 01_data.ipynb.
Converted 02_aug.ipynb.
Converted 03_training.ipynb.
Converted 04_evaluation.ipynb.
Converted index.ipynb.
