# Whisper Fine-tuning


## Initial Setup

In [None]:
# Install ffmpeg through apt (quiet, non-interactive) after registering the jonathonf/ffmpeg-4 PPA.
# NOTE(review): presumably required for decoding the audio files later on; also verify that the
# PPA is still published before relying on it.
!add-apt-repository -y ppa:jonathonf/ffmpeg-4 -q
!apt update -q
!apt install -y ffmpeg -q

In [None]:
!pip install -q datasets>=2.6.1
!pip install -q git+https://github.com/huggingface/transformers
!pip install -q librosa
!pip install -q evaluate>=0.30
!pip install -q jiwer
!pip install -q gradio
!pip install -q bitsandbytes datasets accelerate loralib
!pip install -q git+https://github.com/huggingface/transformers.git@main git+https://github.com/huggingface/peft.git@main

Log in to the Hugging Face Hub:

In [None]:
from huggingface_hub import notebook_login

# Interactive login; the push_to_hub calls at the end of the notebook presumably rely on
# the credentials stored here -- confirm.
notebook_login()

In [None]:
# Select CUDA device index
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Base checkpoint, target language/task and training data
model_name_or_path = "openai/whisper-medium"
language, language_abbr = "Dutch", "nl"
task = "transcribe"
dataset_name = "mariatepei/synthetic_accented_Dutch"

# Hub destinations, built as "<org>/<name>": one repo for the adapter, one for the model
org = "mariatepei"
trained_adapter_name = "whisper-medium-synthetic"
trained_model_name = "whisper-medium-synthetic_model"

trained_adapter_repo = f"{org}/{trained_adapter_name}"
trained_model_repo = f"{org}/{trained_model_name}"

## Load Dataset

In [None]:
from datasets import load_dataset, DatasetDict

# Load the train and validation splits of the dataset selected in the configuration cell.
# Using `dataset_name` (instead of repeating the repo id) keeps this cell in sync with the config.
dataset = DatasetDict(
    {split: load_dataset(dataset_name, split=split) for split in ("train", "validation")}
)

print(dataset)

## Prepare Feature Extractor, Tokenizer and Data

In [None]:
from transformers import WhisperFeatureExtractor

# Feature extractor of the base checkpoint; prepare_dataset calls it to compute the model's input features.
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name_or_path)

In [None]:
from transformers import WhisperTokenizer

# Tokenizer of the base checkpoint, configured with the target language and task.
# It encodes the transcripts in prepare_dataset and decodes ids in compute_metrics.
tokenizer = WhisperTokenizer.from_pretrained(model_name_or_path, language=language, task=task)

In [None]:
from transformers import WhisperProcessor

# Processor of the base checkpoint (same language/task settings). The data collator reads its
# .feature_extractor and .tokenizer, and it is pushed to the Hub next to the merged model.
processor = WhisperProcessor.from_pretrained(model_name_or_path, language=language, task=task)

### Prepare Data

In [None]:
# Show the first training example before the audio column is cast.
print(dataset["train"][0])

In [None]:
from datasets import Audio

# Cast the "audio" column of every split to an Audio feature with a 16 kHz sampling rate.
# NOTE(review): presumably the audio is then resampled when an example is accessed -- confirm.
dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))

In [None]:
# Show the same example again, now that the audio column has been cast.
print(dataset["train"][0])

In [None]:
def prepare_dataset(batch):
    """Turn one raw example into model inputs and label ids.

    Reads ``batch["audio"]`` (its ``"array"`` and ``"sampling_rate"`` entries) and
    ``batch["text"]``; writes ``batch["input_features"]`` and ``batch["labels"]`` and
    returns the same mapping. Relies on the notebook-level ``feature_extractor`` and
    ``tokenizer``.
    """
    # NOTE(review): presumably the audio is decoded/resampled when this column is read -- confirm
    sample = batch["audio"]

    # features of the single clip: the extractor's output is indexed at position 0
    extracted = feature_extractor(sample["array"], sampling_rate=sample["sampling_rate"])
    batch["input_features"] = extracted.input_features[0]

    # token ids of the reference transcript
    encoded_text = tokenizer(batch["text"])
    batch["labels"] = encoded_text.input_ids

    return batch

Apply the data preparation function to every example of both splits using the dataset's `.map` method.

In [None]:
# Show the dataset structure before preprocessing.
print(dataset)

In [None]:
# Run prepare_dataset over every split in a single process. The columns listed for the train
# split are removed, so the fields written by prepare_dataset are what is left afterwards.
dataset = dataset.map(prepare_dataset, remove_columns=dataset.column_names["train"], num_proc=1)

In [None]:
# Display the preprocessed training split.
dataset["train"]

In [None]:
# Display the preprocessed validation split.
dataset["validation"]

In [None]:
# Show the dataset structure after preprocessing.
print(dataset)

## Training and Evaluation

### Define a Data Collator

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate preprocessed examples into one padded batch of tensors.

    Each feature must provide ``"input_features"`` and ``"labels"``. Inputs and labels are
    padded separately (by ``processor.feature_extractor.pad`` and ``processor.tokenizer.pad``),
    padded label positions are replaced with -100, and a leading start token shared by every
    label sequence is removed.

    Attributes are documented inline below. Calling the collator returns the padded input
    batch with an added ``"labels"`` entry.
    """

    # Object exposing `.feature_extractor.pad(...)` and `.tokenizer.pad(...)`.
    processor: Any
    # Token id to strip from the front of the labels. ``None`` (default) keeps the historic
    # behaviour of comparing against ``processor.tokenizer.bos_token_id``.
    # NOTE(review): for Whisper the tokenizer's bos id presumably differs from the model's
    # decoder start token; if so, pass ``model.config.decoder_start_token_id`` here -- confirm.
    decoder_start_token_id: Union[int, None] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences and pad them to the longest one in the batch
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if every sequence already starts with the start token, cut it here
        # NOTE(review): presumably the model prepends it again when building decoder inputs -- confirm
        start_token_id = self.decoder_start_token_id
        if start_token_id is None:
            start_token_id = self.processor.tokenizer.bos_token_id
        # the width check avoids indexing column 0 of an empty label matrix
        if labels.size(1) > 0 and (labels[:, 0] == start_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

Initialise the data collator:

In [None]:
# Collator instance built around the processor; it is handed to the trainer as `data_collator`.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

### Evaluation Metrics

In [None]:
import evaluate

# Load the "wer" metric; compute_metrics calls metric.compute on the decoded predictions and references.
metric = evaluate.load("wer")

Define a function that takes the model
predictions and returns the WER metric.

In [None]:
def compute_metrics(pred):
    """Compute the word error rate, as a percentage, for one evaluation pass.

    ``pred`` must expose ``predictions`` and ``label_ids``. Relies on the notebook-level
    ``tokenizer`` and ``metric``. Returns ``{"wer": <value>}``.
    """
    predicted_ids, reference_ids = pred.predictions, pred.label_ids

    # swap the -100 loss-masking value back to the pad token id (modifies label_ids in place)
    ignored = reference_ids == -100
    reference_ids[ignored] = tokenizer.pad_token_id

    # decode both sides to text, leaving special tokens out
    decoded_predictions = tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)
    decoded_references = tokenizer.batch_decode(reference_ids, skip_special_tokens=True)

    score = metric.compute(predictions=decoded_predictions, references=decoded_references)
    return {"wer": 100 * score}

### Load a Pre-Trained Checkpoint

In [None]:
from transformers import WhisperForConditionalGeneration

# Load the base checkpoint named by model_name_or_path; it is wrapped with LoRA adapters further down.
model = WhisperForConditionalGeneration.from_pretrained(model_name_or_path)

# Optional sanity check, left disabled:
#model.hf_device_map # this should be {" ": 0}

Override generation arguments - the decoder prompt tokens for the selected language and task are forced as decoder outputs (see [`forced_decoder_ids`](https://huggingface.co/docs/transformers/main_classes/text_generation#transformers.generation_utils.GenerationMixin.generate.forced_decoder_ids)), and no tokens are suppressed during generation (see [`suppress_tokens`](https://huggingface.co/docs/transformers/main_classes/text_generation#transformers.generation_utils.GenerationMixin.generate.suppress_tokens)):

In [None]:
# Force the decoder prompt ids that the processor returns for the configured language and task.
model.config.forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)
# Empty list: no token ids are registered for suppression.
model.config.suppress_tokens = []

In [None]:
# Display the forced decoder ids currently stored on the model config.
model.config.forced_decoder_ids

### Apply LoRA

Wrap the model in a `PeftModel` that uses low-rank adapters (LoRA), via the `get_peft_model` utility function from `peft`.

In [None]:
from peft import LoraConfig, LoraModel, PeftModel, get_peft_model

# LoRA settings: rank-32 adapters with alpha 64 and 5% dropout on the "q_proj" and "v_proj"
# target modules; bias handling is set to "none".
config = LoraConfig(
    r=32,
    lora_alpha=64,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.05,
    bias="none",
)

# Rebind `model` to the adapter-wrapped model and report its trainable parameters.
model = get_peft_model(model, config)
model.print_trainable_parameters()

### Define the Training Configuration

Define all the parameters related to training.

In [None]:
from transformers import Seq2SeqTrainingArguments

# All training hyperparameters in one place. Checkpoints are written below `trained_model_name`.
training_args = Seq2SeqTrainingArguments(
    output_dir=trained_model_name,  # local directory that receives the checkpoints
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=5e-6,
    warmup_steps=200,
    max_steps=1500,  # step-based schedule (used instead of num_train_epochs)
    # NOTE(review): newer transformers releases presumably rename this to `eval_strategy` -- confirm
    # against the installed version.
    evaluation_strategy="steps",
    fp16=True,
    per_device_eval_batch_size=8,
    generation_max_length=128,
    logging_steps=25,
    # evaluate and save on the same 50-step cadence so the best checkpoint can be reloaded at the end
    save_steps=50,
    eval_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,  # lower WER is better
    remove_unused_columns=False,  # required as the PeftModel forward doesn't have the signature of the wrapped model's forward
    label_names=["labels"],  # same reason as above
    predict_with_generate=True,
    # Derived from the output directory instead of repeating its name as a literal.
    # NOTE: per the Transformers documentation, Trainer does not read this field on its own; it
    # only takes effect when the path is passed to trainer.train(resume_from_checkpoint=...).
    resume_from_checkpoint=f"{trained_model_name}/checkpoint-1000",
)

In [None]:
# Seq2SeqTrainer was used below without being imported, which raises NameError on a fresh kernel.
from transformers import EarlyStoppingCallback, Seq2SeqTrainer, Trainer, TrainingArguments

# Wire together the LoRA-wrapped model, the preprocessed splits, the padding collator and the
# WER metric.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset["train"],
    eval_dataset=dataset["validation"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # NOTE(review): the feature extractor (not the text tokenizer) is passed here, presumably so
    # that it is saved alongside the checkpoints -- confirm.
    tokenizer=processor.feature_extractor,
)
#model.config.use_cache = False  # silence the warnings. Please re-enable for inference!

In [None]:
import os

# The checkpoint path configured on the training arguments is not picked up by Trainer on its
# own (see the Transformers docs); resuming only happens when it is handed to train().
# Fall back to a fresh run when that directory does not exist, e.g. on a first run.
resume_dir = training_args.resume_from_checkpoint
if not (resume_dir and os.path.isdir(resume_dir)):
    resume_dir = None

trainer.train(resume_from_checkpoint=resume_dir)

### Push adapters to hub and merge adapters onto the model

In [None]:
# Checkpoint directory (inside the training output directory) whose adapter is loaded below.
adapter_to_push = f"{trained_model_name}/checkpoint-1000"

In [None]:
# Print the path as a quick visual check.
print(adapter_to_push)

In [None]:
from peft import PeftModel
# Reload a fresh copy of the base checkpoint (8-bit loading disabled, automatic device map).
# Note that this rebinds `model`, replacing the reference to the model used for training.
model = WhisperForConditionalGeneration.from_pretrained(model_name_or_path, load_in_8bit=False, device_map="auto")

# Attach the adapter stored in the selected checkpoint directory to the base model.
model = PeftModel.from_pretrained(
    model,
    adapter_to_push,
)

In [None]:
# Log in again before pushing.
# NOTE(review): possibly redundant with the login at the top of the notebook -- confirm.
notebook_login()

In [None]:
# Push adapters to hub
# `model` is the PeftModel at this point, so presumably only the adapter weights are uploaded -- confirm.
model.push_to_hub(trained_adapter_repo)
print(f"Model pushed to {trained_adapter_repo}")

In [None]:
# Merge the adapter into the base model and rebind `model` to the result.
model = model.merge_and_unload()

In [None]:
# Print the merged model's structure.
print(model)

## Push Model and Processor to Hub

In [None]:
# Upload the merged model (with safe serialization enabled) and the processor to the same repo.
model.push_to_hub(trained_model_repo, safe_serialization=True)
processor.push_to_hub(trained_model_repo)

In [None]:
# Run this cell only if you face utf locale errors
# https://stackoverflow.com/questions/56081324/why-are-google-colab-shell-commands-not-working
import locale


def getpreferredencoding(do_setlocale=True):
    """Always report "UTF-8"; the ``do_setlocale`` argument is accepted but ignored."""
    return "UTF-8"


# Monkey-patch the locale module so every later lookup gets the fixed answer above.
locale.getpreferredencoding = getpreferredencoding