# Finetune wav2vec2 for Lingala
This notebook is adapted from Fine-tuning Wav2Vec2 for Turkish ASR to train Lingala ASR using our own dataset.

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Select the Runtime > "Change runtime type" menu to enable a GPU accelerator, ')
  print('and then re-execute this cell.')
else:
  print(gpu_info)

# installs and imports

In [None]:
import sys

In [None]:
!{sys.executable} -m pip install datasets==1.11.0
!{sys.executable} -m pip install transformers==4.9.1
!{sys.executable} -m pip install jiwer
!{sys.executable} -m pip install ipywidgets

In [None]:
import numpy as np
import zipfile
import csv

In [None]:
with open('/home/ubuntu/LingalaAudio/pyLingala-master/data/train.csv', newline='',encoding='UTF-8') as f:
      reader = csv.reader(f)
      data = list(reader)
      train_data = [data[i] for i in range(len(data)) if i!=0]

with open('/home/ubuntu/LingalaAudio/pyLingala-master/data/test.csv', newline='',encoding='UTF-8') as f:
      reader = csv.reader(f)
      data = list(reader)
      t_data = [data[i] for i in range(len(data)) if i!=0]

In [None]:
#Get valid indices
import random
random.seed(42) #this seed was used specifically to compare with Okwugbe model



v = 300 #200 samples for valid. Change as you want
test_list = [i for i in range(len(t_data))]
valid_indices = random.choices(test_list, k=v)


test_data = [t_data[i] for i in range(len(t_data)) if i not in valid_indices]
valid_data = [t_data[i] for i in range(len(t_data)) if i in valid_indices]

In [None]:
def create_json_file(d):
  utterance = d[2]
  wav_path =d[0]
  wav_path = wav_path.replace("/home/ubuntu/organised_recording","/home/ubuntu/LingalaAudio/pyLingala-master")
  return {
      "path": wav_path,
      "sentence": utterance
  }

train_json = [create_json_file(i) for i in train_data]
test_json = [create_json_file(i) for i in test_data]
valid_json = [create_json_file(i) for i in valid_data]

In [None]:
import os

In [None]:
#Make folder to store files


train_path = '/home/ubuntu/model_output/lingala_xlsr4/train'
test_path = '/home/ubuntu/model_output/lingala_xlsr4/test'
valid_path = '/home/ubuntu/model_output/lingala_xlsr4/valid'

if not os.path.isdir(train_path):
  print("Creating paths")
  os.makedirs(train_path)
  os.makedirs(test_path)
  os.makedirs(valid_path)

In [None]:
import json
#for train
for i, sample in enumerate(train_json):
  file_path = os.path.join(train_path,'train_lingala_{}.json'.format(i))
  with open(file_path, 'w') as outfile:
    json.dump(sample, outfile)

#for test
for i, sample in enumerate(test_json):
  file_path = os.path.join(test_path,'test_lingala_{}.json'.format(i))
  with open(file_path, 'w') as outfile:
    json.dump(sample, outfile)

#for valid
for i, sample in enumerate(valid_json):
  file_path = os.path.join(valid_path,'valid_lingala_{}.json'.format(i))
  with open(file_path, 'w') as outfile:
    json.dump(sample, outfile)

In [None]:
from ipywidgets import FloatProgress

#run the second time after the error
from datasets import load_dataset, load_metric

#for train
for root, dirs, files in os.walk(train_path):
  lingala_train = load_dataset("json", data_files=[os.path.join(root,i) for i in files],split="train")

#for test
for root, dirs, files in os.walk(test_path):
  lingala_test = load_dataset("json", data_files=[os.path.join(root,i) for i in files],split="train")

#for valid
for root, dirs, files in os.walk(valid_path):
  lingala_valid = load_dataset("json", data_files=[os.path.join(root,i) for i in files],split="train")

In [None]:
lingala_test

In [None]:
from datasets import ClassLabel
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    print(len(dataset))
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    picks = []
    for _ in range(num_examples):
        pick = random.randint(0, len(dataset)-1)
        while pick in picks:
            pick = random.randint(0, len(dataset)-1)
        picks.append(pick)
    
    df = pd.DataFrame(dataset[picks])
    display(HTML(df.to_html()))

In [None]:
show_random_elements(lingala_test, num_examples=20)

In [None]:
import torch
import torchvision
torch.cuda.get_device_name()

In [None]:
import re
chars_to_ignore_regex = '[\,\?\.\!\-\;\:\"\“\%\‘\”\�]'

def remove_special_characters(batch):
    batch["sentence"] = re.sub(chars_to_ignore_regex, '', batch["sentence"]).lower() + " "
    return batch

In [None]:
lingala_train = lingala_train.map(remove_special_characters)
lingala_test = lingala_test.map(remove_special_characters)
lingala_valid = lingala_valid.map(remove_special_characters)

In [None]:
show_random_elements(lingala_train.remove_columns(["path"]))

In [None]:
def extract_all_chars(batch):
  all_text = " ".join(batch["sentence"])
  vocab = list(set(all_text))
  return {"vocab": [vocab], "all_text": [all_text]}

In [None]:
vocab_train = lingala_train.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=lingala_train.column_names)
vocab_test = lingala_test.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=lingala_test.column_names)
vocab_valid = lingala_valid.map(extract_all_chars, batched=True, batch_size=-1, keep_in_memory=True, remove_columns=lingala_valid.column_names)

In [None]:
vocab_list = list(set(vocab_train["vocab"][0]) | set(vocab_test["vocab"][0]) | set(vocab_valid["vocab"][0]))

In [None]:
vocab_dict = {v: k for k, v in enumerate(vocab_list)}
vocab_dict

In [None]:
vocab_dict["|"] = vocab_dict[" "]
del vocab_dict[" "]

In [None]:
vocab_dict["[UNK]"] = len(vocab_dict)
vocab_dict["[PAD]"] = len(vocab_dict)
len(vocab_dict)

In [None]:
import json
with open('/home/ubuntu/model_output/lingala_xlsr4/vocab.json', 'w') as vocab_file:
    json.dump(vocab_dict, vocab_file)

In [None]:
from transformers import Wav2Vec2CTCTokenizer

tokenizer = Wav2Vec2CTCTokenizer("/home/ubuntu/model_output/lingala_xlsr4/vocab.json", unk_token="[UNK]", pad_token="[PAD]", word_delimiter_token="|")

In [None]:
from transformers import Wav2Vec2FeatureExtractor

feature_extractor = Wav2Vec2FeatureExtractor(feature_size=1, sampling_rate=16000, padding_value=0.0, do_normalize=True, return_attention_mask=True)

In [None]:
from transformers import Wav2Vec2Processor

processor = Wav2Vec2Processor(feature_extractor=feature_extractor, tokenizer=tokenizer)

In [None]:
#If you are running this Colab for the first time and have not saved the processor, uncomment code below
processor.save_pretrained("/home/ubuntu/model_output/lingala_xlsr4/wav2vec2-large-xlsr-lingala")

#To load trained processor
model_dir='/home/ubuntu/model_output/lingala_xlsr4/wav2vec2-large-xlsr-lingala'
processor = Wav2Vec2Processor.from_pretrained(model_dir)

In [None]:
lingala_train[197]

In [None]:
import torchaudio

def speech_file_to_array_fn(batch):
    speech_array, sampling_rate = torchaudio.load(batch["path"])
    batch["speech"] = speech_array[0].numpy()
    batch["sampling_rate"] = sampling_rate
    batch["target_text"] = batch["sentence"]
    return batch

In [None]:
lingala_train = lingala_train.map(speech_file_to_array_fn, remove_columns=lingala_train.column_names)
lingala_test = lingala_test.map(speech_file_to_array_fn, remove_columns=lingala_test.column_names)
lingala_valid = lingala_valid.map(speech_file_to_array_fn, remove_columns=lingala_valid.column_names)

In [None]:
lingala_train

In [None]:
import IPython.display as ipd
import numpy as np
import random

rand_int = random.randint(0, len(lingala_train)-1)

ipd.Audio(data=np.asarray(lingala_train[rand_int]["speech"]), autoplay=True, rate=16000)

In [None]:
#rand_int = random.randint(0, len(fon_train)-1)

print("Target text:", lingala_train[rand_int]["target_text"])
print("Input array shape:", np.asarray(lingala_train[rand_int]["speech"]).shape)
print("Sampling rate:", lingala_train[rand_int]["sampling_rate"])

In [None]:
def prepare_dataset(batch):
    # check that all files have the correct sampling rate
    assert (
        len(set(batch["sampling_rate"])) == 1
    ), f"Make sure all inputs have the same sampling rate of {processor.feature_extractor.sampling_rate}."

    batch["input_values"] = processor(batch["speech"], sampling_rate=batch["sampling_rate"][0]).input_values
    
    with processor.as_target_processor():
        batch["labels"] = processor(batch["target_text"]).input_ids
    return batch

In [None]:
lingala_train = lingala_train.map(prepare_dataset, remove_columns=lingala_train.column_names, batch_size=8, num_proc=4, batched=True)
lingala_test = lingala_test.map(prepare_dataset, remove_columns=lingala_test.column_names, batch_size=8, num_proc=4, batched=True)
lingala_valid = lingala_valid.map(prepare_dataset, remove_columns=lingala_valid.column_names, batch_size=8, num_proc=4, batched=True)

In [None]:
import torch

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lenghts and need
        # different padding methods
        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features]

        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        with self.processor.as_target_processor():
            labels_batch = self.processor.pad(
                label_features,
                padding=self.padding,
                max_length=self.max_length_labels,
                pad_to_multiple_of=self.pad_to_multiple_of_labels,
                return_tensors="pt",
            )

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels

        return batch

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

## Metric
We use word error rate with space as word boundary. We also use character error rate without word boundaries

In [12]:
from datasets import (
    load_dataset, 
    load_from_disk,
    load_metric,)

In [13]:
wer_metric = load_metric("wer")

In [14]:
wer_metric.compute(predictions=['สวัสดี ค่า ทุก โคน'],references=['สวัสดี ค่ะ ทุก คน'])

0.5

In [22]:
cer_metric = load_metric('cer/cer.py')

In [24]:
cer_metric.compute(predictions=['aab'],references=['aaac'])

ValueError: number of ground truth inputs (4) and hypothesis inputs (3) must match.