In [None]:
!pip install audiomentations

In [None]:
!pip install --upgrade pip
!pip install --upgrade datasets transformers accelerate evaluate jiwer tensorboard gradio

In [None]:
!pip list | grep numpy

Make sure numpy 1.26.4 is installed; otherwise the imports from audiomentations (used for data augmentation) fail with version-conflict errors. In Colab, after running the cells above, restart the session and then run the cells below.

In [None]:
import numpy
print(numpy.__version__)
print(numpy.__file__)

In [None]:
from huggingface_hub import notebook_login

notebook_login()

**Load data.**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from datasets import load_dataset, DatasetDict

my_dataset = DatasetDict()
my_dataset["train"] = load_dataset("/content/drive/My Drive/dataset_new", split='train')
print(my_dataset)

**Preprocessing and EDA.**

In [None]:
# Get column names to identify the second column
column_names = my_dataset["train"].column_names
second_column = column_names[1]
second_column

In [None]:
import re

# filter rows that have anything other than letters
def filter_dataset(dataset):
    """Select the rows whose transcription contains a non-letter character.

    A row is kept when its 'transcription' has at least one character that is
    not a Latin letter (A-Z, a-z), not in the range U+0621-U+064A, and not
    whitespace.

    Args:
        dataset: object exposing a 'transcription' column via ``dataset[...]``
            and a ``select(indices)`` method.

    Returns:
        Whatever ``dataset.select`` returns for the matching row indices.
    """
    non_letter = re.compile(r'[^A-Za-z\u0621-\u064A\s]')

    # Read the column a single time. Indexing dataset['transcription'] inside
    # the loop asks the dataset for the whole column on every iteration.
    transcriptions = dataset['transcription']
    filtered_rows = [i for i, text in enumerate(transcriptions) if non_letter.search(text)]

    return dataset.select(filtered_rows)

filtered_dataset2 = filter_dataset(my_dataset['train'])

print(filtered_dataset2)

In [None]:
# Print the audio path and transcription of every flagged row for manual inspection.
for i in range(len(filtered_dataset2)):
  # Look the row up once instead of once per printed field; each
  # filtered_dataset2[i] access builds the whole row again
  # (presumably including decoding the audio column).
  row = filtered_dataset2[i]
  print(row['audio']['path'])
  print(row['transcription'])
  print("\n")

In [None]:
import re

# filter rows that have both eng and arabic
def filter_dataset(dataset):
    """Select the rows whose transcription mixes Latin letters and Arabic script.

    A row is kept when its 'transcription' contains at least one character in
    A-Z/a-z AND at least one character in the range U+0600-U+06FF.

    Args:
        dataset: object exposing a 'transcription' column via ``dataset[...]``
            and a ``select(indices)`` method.

    Returns:
        Whatever ``dataset.select`` returns for the matching row indices.
    """
    latin = re.compile(r'[A-Za-z]')
    arabic = re.compile(r'[\u0600-\u06FF]')

    # Read the column a single time. Indexing dataset['transcription'] inside
    # the loop asks the dataset for the whole column twice per iteration.
    transcriptions = dataset['transcription']
    filtered_rows = [
        i for i, text in enumerate(transcriptions)
        if latin.search(text) and arabic.search(text)
    ]

    return dataset.select(filtered_rows)

filtered_dataset3 = filter_dataset(my_dataset['train'])

print(filtered_dataset3)

Checking whether the tokenizer can handle the English text

In [None]:
from transformers import WhisperTokenizer

tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small", language="Arabic", task="transcribe")

In [None]:
input_str = filtered_dataset3[2]["transcription"]
labels = tokenizer(input_str).input_ids # it returns a dict of input ids and attention mask so just get the input ids
print(labels)
print(tokenizer.tokenize(input_str))
decoded_with_special = tokenizer.decode(labels, skip_special_tokens=False)
decoded_str = tokenizer.decode(labels, skip_special_tokens=True)

print(f"Input:                 {input_str}")
print(f"Decoded w/ special:    {decoded_with_special}")
print(f"Decoded w/out special: {decoded_str}")
print(f"Are equal:             {input_str == decoded_str}")

Clean text

In [None]:
!pip install PyArabic

In [None]:
import pyarabic.araby as araby

before_filter="هو أننا فقط أخذنا وقت طويل جداً بالتفكير بالماضي حيناً وبالمستقبل حيناً آخر في نفس الفترة الزمنية وراح الحاضر بدون ما نحس هذا بالضبط اللي خلينا نفقد التركيز ويسمح للقلق والتوتر بالسيطرة على عقولنا وأفكارنا وهنا تجي فائدة اللي يقظه الذهنية اللي تساعدنا على تجنب هذا التأثير السلبي"
after_filter = araby.strip_diacritics(before_filter)
print(before_filter)
print(after_filter)
print(before_filter==after_filter)

In [None]:
import re
import pyarabic.araby as araby

# remove punctuation etc
def remove_punctuation(text):
    """Return `text` with '.', '?', '-', the Arabic comma and the Arabic question mark removed."""
    punctuation = re.compile(r'[.?،\-؟]')
    return punctuation.sub('', text)

def clean_text(example):
    """Strip punctuation from one example's transcription.

    The example is updated in place (its 'transcription' entry is overwritten)
    and the same object is returned, as expected by `Dataset.map`.
    """
    stripped = remove_punctuation(example['transcription'])
    example['transcription'] = stripped
    return example

my_dataset['train'] = my_dataset['train'].map(clean_text)

In [None]:
# remove diacritic
def remove_diacritic(example):
    """Run one example's transcription through `araby.strip_diacritics`.

    The example is updated in place (its 'transcription' entry is overwritten)
    and the same object is returned.
    """
    without_marks = araby.strip_diacritics(example['transcription'])
    example['transcription'] = without_marks
    return example

my_dataset['train'] = my_dataset['train'].map(remove_diacritic)
print(my_dataset)

In [None]:
# check if removed
import re

# filter rows that have anything other than letters
def filter_dataset(dataset):
    """Select the rows whose transcription still contains a non-letter character.

    Used after cleaning to check what is left: a row is kept when its
    'transcription' has at least one character that is not a Latin letter
    (A-Z, a-z), not in the range U+0621-U+064A, and not whitespace.

    Args:
        dataset: object exposing a 'transcription' column via ``dataset[...]``
            and a ``select(indices)`` method.

    Returns:
        Whatever ``dataset.select`` returns for the matching row indices.
    """
    non_letter = re.compile(r'[^A-Za-z\u0621-\u064A\s]')

    # Read the column a single time. Indexing dataset['transcription'] inside
    # the loop asks the dataset for the whole column on every iteration.
    transcriptions = dataset['transcription']
    filtered_rows = [i for i, text in enumerate(transcriptions) if non_letter.search(text)]

    return dataset.select(filtered_rows)

filtered_dataset2 = filter_dataset(my_dataset['train'])

print(filtered_dataset2)

# Print the audio path and transcription of every row that still has non-letter characters.
for i in range(len(filtered_dataset2)):
  # Look the row up once instead of once per printed field; each
  # filtered_dataset2[i] access builds the whole row again
  # (presumably including decoding the audio column).
  row = filtered_dataset2[i]
  print(row['audio']['path'])
  print(row['transcription'])
  print("\n")

Listen to some audio samples

In [None]:
import random

rand_int = random.randint(0, len(my_dataset['train'])-1)
print(my_dataset['train'][rand_int])

In [None]:
import IPython.display as ipd

print(my_dataset['train'][rand_int]["transcription"])
ipd.Audio(data=my_dataset['train'][rand_int]["audio"]["array"], autoplay=True, rate=my_dataset['train'][rand_int]["audio"]["sampling_rate"])

Split dataset

In [None]:
# split the dataset for testing n validation

my_dataset = my_dataset["train"].train_test_split(test_size=0.3, seed=42)
my_dataset

In [None]:
my_dataset_test = my_dataset['test'].train_test_split(test_size=0.5, seed=42)
print(my_dataset_test)

In [None]:
my_dataset['validation'] = my_dataset_test['train']
my_dataset['test'] = my_dataset_test['test']
print(my_dataset)

In [None]:
# Tokenize the sentences and calculate their lengths to find max length
tokenized_lengths = [len(tokenizer.encode(sentence)) for sentence in my_dataset['validation']['transcription']]

print("Tokenized Lengths of Sentences:", tokenized_lengths)

import numpy as np
print("Mean Length:", np.mean(tokenized_lengths))
print("Max Length:", np.max(tokenized_lengths))

# x = np.where(np.array(tokenized_lengths) > 250)
# print(len(x[0]))

import matplotlib.pyplot as plt
plt.hist(tokenized_lengths)
plt.show()

In [None]:
# saving the audios in val n test set for checking

# test_set = []
# for i in range(len(my_dataset['validation'])):
#   x = {}
#   x['file'] = my_dataset['validation'][i]['audio']['path']
#   x['transcription'] = my_dataset['validation'][i]['transcription']
#   test_set.append(x)

In [None]:
# print(test_set)

# with open('val.txt', 'w') as f:
#   for item in test_set:
#     f.write(f"{item['file']}\n")

Preprocessing

In [None]:
# input to whisper should be log-mel, this is done automatically by the whisper feature extractor
# it also performs padding and truncation

from transformers import WhisperFeatureExtractor

feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-small")

In [None]:
# load the Whisper tokenizer, which maps the token indices predicted by the model back to text

from transformers import WhisperTokenizer

tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small", language="Arabic", task="transcribe")

In [None]:
input_str = my_dataset["train"][0]["transcription"]
labels = tokenizer(input_str).input_ids # it returns a dict of input ids and attention mask so just get the input ids
decoded_with_special = tokenizer.decode(labels, skip_special_tokens=False)
decoded_str = tokenizer.decode(labels, skip_special_tokens=True)

print(f"Input:                 {input_str}")
print(f"Decoded w/ special:    {decoded_with_special}")
print(f"Decoded w/out special: {decoded_str}")
print(f"Are equal:             {input_str == decoded_str}")

In [None]:
# can combine the tokenizer and feature extractor into one object

from transformers import WhisperProcessor

processor = WhisperProcessor.from_pretrained("openai/whisper-small", language="Arabic", task="transcribe")

In [None]:
print(my_dataset["train"][0])

In [None]:
# need to sample the audio to match whisper's sampling rate, this does it on the fly when audio is loaded

from datasets import Audio

my_dataset = my_dataset.cast_column("audio", Audio(sampling_rate=16000))

In [None]:
print(my_dataset["train"][0])

Augmentation

In [None]:
from audiomentations import Compose, AddGaussianNoise, PitchShift, Gain

# Chain of random waveform augmentations; each transform carries its own probability p.
augment_waveform = Compose([
    AddGaussianNoise(min_amplitude=0.005, max_amplitude=0.015, p=0.2), # add Gaussian noise with 20% probability
    PitchShift(min_semitones=-4, max_semitones=4, p=0.2), # shift pitch by -4 to +4 semitones with 20% probability
    Gain(min_gain_db=-6, max_gain_db=6, p=0.1), # change volume level by -6 to +6 dB with 10% probability
    ])

def augment_dataset(batch):
    """Run one example's waveform through the `augment_waveform` pipeline.

    Args:
        batch: a single example whose "audio" entry holds the waveform under
            "array" and its rate under "sampling_rate".

    Returns:
        The same example with batch["audio"]["array"] replaced by the
        augmented waveform.
    """
    audio = batch["audio"]
    # Pass the example's own sampling rate rather than a hard-coded 16000, so
    # the augmentation stays correct if the dataset is cast to another rate.
    augmented_audio = augment_waveform(samples=audio["array"], sample_rate=audio["sampling_rate"])
    batch["audio"]["array"] = augmented_audio
    return batch

my_dataset['train'] = my_dataset['train'].map(augment_dataset, num_proc=4)

Prepare data for training

In [None]:
def prepare_dataset(batch):
    """Turn one raw example into model inputs.

    Adds two entries to the example and returns it:
      - "input_features": first item of the feature extractor's output for the
        example's audio array (the extractor returns a batch)
      - "labels": token ids of the example's transcription
    """
    decoded_audio = batch["audio"]

    # log-Mel features for the waveform; take element 0 because the extractor returns a batch
    extracted = feature_extractor(
        decoded_audio["array"],
        sampling_rate=decoded_audio["sampling_rate"],
    )
    batch["input_features"] = extracted.input_features[0]

    # target text -> label ids
    encoded_text = tokenizer(batch["transcription"])
    batch["labels"] = encoded_text.input_ids
    return batch

In [None]:
my_dataset = my_dataset.map(prepare_dataset, remove_columns=my_dataset.column_names["train"], num_proc=4) # use num_proc=4 to make it process faster, if gives error remove it


**Training**

In [None]:
# load the model

from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")

In [None]:
# tokens in transcript can't be more than 448

# Use the decoder's architectural limit (max_target_positions) rather than
# config.max_length: max_length is a generic generation setting and is not
# guaranteed to reflect the number of positions the Whisper decoder supports.
max_label_length = model.config.max_target_positions
def is_labels_in_length_range(labels):
    """Return True when a tokenized transcript is shorter than `max_label_length`.

    Args:
        labels: sequence of label token ids for one example.
    """
    return len(labels) < max_label_length

my_dataset = my_dataset.filter(is_labels_in_length_range, num_proc=4, input_columns=["labels"])
print(my_dataset)

In [None]:
model.generation_config.language = "Arabic"
model.generation_config.task = "transcribe"

model.generation_config.forced_decoder_ids = None # don't use the legacy method instead use the config above

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass  # generates __init__ from the annotated fields below
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate examples into a padded batch of audio features and labels.

    Audio features and label ids are padded separately, each with its own
    component of the processor (feature extractor vs. tokenizer).
    """

    # object exposing `.feature_extractor.pad(...)` and `.tokenizer.pad(...)`
    processor: Any
    # token id that is dropped from the front of the labels when every row starts with it
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Build one batch from a list of examples with "input_features" and "labels"."""
        audio_items = []
        text_items = []
        for example in features:
            audio_items.append({"input_features": example["input_features"]})
            text_items.append({"input_ids": example["labels"]})

        # audio side: let the feature extractor pad and convert to torch tensors
        batch = self.processor.feature_extractor.pad(audio_items, return_tensors="pt")

        # text side: let the tokenizer pad the label sequences
        padded_text = self.processor.tokenizer.pad(text_items, return_tensors="pt")

        # positions where the attention mask is not 1 are padding; set them to
        # -100 so the loss function ignores them
        is_padding = padded_text.attention_mask.ne(1)
        labels = padded_text["input_ids"].masked_fill(is_padding, -100)

        # if every row begins with the decoder start token (added during the
        # earlier tokenization step), drop it here since it is added again later
        all_start_with_bos = (labels[:, 0] == self.decoder_start_token_id).all().cpu().item()
        if all_start_with_bos:
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch


In [None]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)

In [None]:
import evaluate

metric = evaluate.load("wer")
metric2 = evaluate.load("cer")

In [None]:
def compute_metrics(pred):
    """Compute WER and CER (as percentages) for one evaluation pass.

    Also writes every reference/prediction pair to 'refs_and_preds.txt',
    overwriting the file on each call.

    Args:
        pred: object with `.predictions` (predicted token ids) and
            `.label_ids` (reference token ids, with -100 marking ignored
            positions).

    Returns:
        dict with keys "wer" and "cer".
    """
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id so the labels can be decoded
    # (this modifies pred.label_ids in place)
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # decode ids back to text, dropping special tokens, for the error metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    # The transcripts contain Arabic text, so fix the encoding explicitly
    # instead of relying on the platform default, which may not be able to
    # represent it.
    with open('refs_and_preds.txt', 'w', encoding='utf-8') as f:
        # `hyp`, not `pred`: avoid shadowing the function argument
        for ref, hyp in zip(label_str, pred_str):
            f.write(f"Ref: {ref}\n")
            f.write(f"Pred: {hyp}\n\n")

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)
    cer = 100 * metric2.compute(predictions=pred_str, references=label_str)

    return {"wer": wer, "cer":cer}

In [None]:
# model.config.dropout = 0.05

In [None]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-small-informal-arabic-aug2",  # change "small" if using a different checkpoint
    per_device_train_batch_size=16, # this can be reduced if out of memory
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size; accumulates gradients before each weight update to save memory
    learning_rate=1e-5,
    warmup_steps=100, # learning-rate warmup
    max_steps=1000, # train for this many steps
    gradient_checkpointing=True, # keep a subset of activations in the forward pass and recompute the rest in the backward pass to save memory
    fp16=True, # mixed-precision training with 16 bits instead of 32 for faster training and lower memory use
    # `eval_strategy` is the current name of this argument; the notebook
    # upgrades transformers to the latest release, where the older
    # `evaluation_strategy` spelling is deprecated.
    eval_strategy="steps", # evaluate every eval_steps steps rather than every epoch
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=270, # tokens
    save_steps=200,
    eval_steps=200,
    logging_steps=200,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False, # because lower wer is better
    push_to_hub=True,
)

In [None]:
from transformers import Seq2SeqTrainer

# Wire the model, the train/validation splits, the padding collator and the
# WER/CER metric function into a single trainer.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=my_dataset["train"],
    eval_dataset=my_dataset["validation"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # The feature extractor (not the text tokenizer) is what gets passed here.
    # NOTE(review): newer transformers releases appear to rename this argument
    # to `processing_class` - confirm against the installed version.
    tokenizer=processor.feature_extractor,
)

In [None]:
trainer.train()

In [None]:
# write meta data
kwargs = {
    "dataset": "Informal Arabic",
    "language": ["ar"],
    "model_name": "Whisper Small Informal Arabic",
    "finetuned_from": "openai/whisper-small",
    "tags": ["automatic-speech-recognition", "arabic"],
    "tasks": "automatic-speech-recognition",
}
trainer.push_to_hub(**kwargs)

In [None]:
repo_name = "whisper-small-informal-arabic-aug2" # should be same as what was defined when saving model above
tokenizer.push_to_hub(repo_name)

References:

- https://huggingface.co/blog/fine-tune-whisper


**Inference**

In [None]:
# adding tokenizer as it wasn't added before
# from transformers import WhisperTokenizer

# tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-large", language="Arabic", task="transcribe")
# repo_name = "itskavya/whisper-large-informal-arabic-new"
# tokenizer.push_to_hub(repo_name)

In [None]:
from transformers import pipeline

pipe = pipeline("automatic-speech-recognition", model="itskavya/whisper-large-informal-arabic-new")

In [None]:
audio_file = "/content/drive/My Drive/dataset_new/train/audio/11_chunk_6.wav"
transcript = pipe(audio_file)['text']
print(transcript)

In [None]:
!pip install jiwer

In [None]:
from jiwer import wer

reference = "شوفي المجال هذا دخلو فيه ناس كثير للأسف الي فاهم والي مو فاهم يبين انا لما اجي اقعد مع ال wedding planner لما يجي يقولك وبعملك وبسويلك وبعملك وبسوي ابد تبغين زي كذا ابدا واسويلك واعملك اذا قال لك ماعندي عقود هذا انحاشي منه معليش ليش ماعندك عقود لازم يكون عقود والعقود فيه بنود وتقرين عقدك زين مازي"
print("Reference:\n", reference)
print("Predicted:\n", transcript)
error = wer(reference, transcript)
print("WER: ", error*100)