<a href="https://colab.research.google.com/github/uhalpern/ICS-435-Final-Project/blob/main/Whisper_Finetuning_Korean.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **ICS 435 Final - Fine-tuning Whisper From OpenAI**

Setup code for the model and datasets was adapted from this [blog post](https://huggingface.co/blog/fine-tune-whisper) on huggingface.

### **Preparing Environment**

In [None]:
# Optional: run this cell only if you would like to work from a Google Drive directory.
# Mounts Google Drive at /content/drive.

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Switch the working directory to the project folder under the mounted Drive path.
%cd /content/drive/MyDrive/Github/ICS-435-Final-Project

In [None]:
!pip install --upgrade datasets[audio] transformers accelerate evaluate jiwer tensorboard gradio evaluate
# !pip install -r requirements.txt

## **Load Dataset Through Hugging Face**

In [None]:
# Log in to the Hugging Face Hub from inside the notebook.
# NOTE(review): presumably needed for the authenticated dataset download and the
# later push_to_hub uploads — confirm.
from huggingface_hub import notebook_login
notebook_login()

In [None]:
from datasets import load_dataset, DatasetDict

# Load every official Korean split as one pool; it is re-split 60/25/15 below.
# `token=True` reuses the credentials stored by `notebook_login()` and replaces
# the deprecated `use_auth_token` argument.
common_voice = load_dataset(
    "mozilla-foundation/common_voice_17_0",
    "ko",
    split="train+validation+test",
    token=True,
)

# Remove unneeded metadata columns; only the audio and its sentence are used later.
common_voice = common_voice.remove_columns(
    ["accent", "age", "client_id", "down_votes", "gender", "locale", "path", "segment", "up_votes", "variant"]
)

# Randomly permute the dataset (fixed seed so the splits are reproducible).
common_voice = common_voice.shuffle(seed=42)

# NOTE(review): pooling and re-splitting the official splits may place clips
# from the same speaker in different splits (client_id is dropped above) —
# confirm this is acceptable for the reported test WER.

# Split sizes: 60% train, 25% validation, and the remainder for test.
# Because test_size is computed as the remainder, the three sizes always add
# up to total_size exactly and no further adjustment is needed.
total_size = len(common_voice)
train_size = int(0.6 * total_size)
val_size = int(0.25 * total_size)
test_size = total_size - train_size - val_size

# Create DatasetDict with contiguous, non-overlapping train/validation/test slices
# of the shuffled pool.
splits = {
    "train": common_voice.select(range(train_size)),
    "validation": common_voice.select(range(train_size, train_size + val_size)),
    "test": common_voice.select(range(train_size + val_size, total_size)),
}

common_voice = DatasetDict(splits)

Define feature extractor that pads or truncates audio inputs into 30 second segments. Then, the audio input is converted into log-Mel spectrogram form.

In [None]:
from transformers import WhisperFeatureExtractor

# Feature extractor loaded with the settings of the pretrained
# "openai/whisper-small" checkpoint; used by prepare_dataset below.
feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-small")

Load Whisper tokenizer that maps token ids to corresponding text string.

In [None]:
from transformers import WhisperTokenizer

# Tokenizer from the same checkpoint, configured for Korean transcription.
# It encodes the target sentences (prepare_dataset) and decodes ids (compute_metrics).
tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small", language="Korean", task="transcribe")

In [None]:
from transformers import WhisperProcessor

# Processor from the same checkpoint, configured for Korean transcription.
# The data collator and the trainers below read processor.feature_extractor
# and processor.tokenizer from it.
processor = WhisperProcessor.from_pretrained("openai/whisper-small", language="Korean", task="transcribe")

Downsample audio to 16kHz as expected by the Whisper model

In [None]:
from datasets import Audio

# Re-cast the "audio" column in every split with a 16,000 Hz sampling rate.
common_voice = common_voice.cast_column("audio", Audio(sampling_rate=16000))

Function to prepare the dataset


1.   Resample audio to 16 kHz (16,000 Hz)
2.   Compute log-Mel spectrogram on input features in order to separate individual frequencies
3. Assign labels to transcriptions

In [None]:
def prepare_dataset(batch):
    """Add model inputs and target ids to a single dataset example.

    Reads the example's "audio" entry (its "array" and "sampling_rate" fields)
    and its "sentence" text, then stores:
      * "input_features": the first element of the feature extractor output
        for the audio array
      * "labels": the token ids the tokenizer produces for the sentence

    The same (mutated) example is returned.
    """
    # reading the audio column yields the waveform and its sampling rate
    waveform = batch["audio"]

    # log-Mel input features computed from the raw audio array
    extracted = feature_extractor(waveform["array"], sampling_rate=waveform["sampling_rate"])
    batch["input_features"] = extracted.input_features[0]

    # target transcription encoded as label ids
    encoded_sentence = tokenizer(batch["sentence"])
    batch["labels"] = encoded_sentence.input_ids

    return batch

Map the pre-processing function to the entire dataset

In [None]:
# Apply prepare_dataset to every split with 2 worker processes. The columns
# that exist in the "train" split beforehand are removed, leaving only what
# prepare_dataset adds.
common_voice = common_voice.map(prepare_dataset, remove_columns=common_voice.column_names["train"], num_proc=2)

## **Training and Evaluation**

In [None]:
from transformers import WhisperForConditionalGeneration

# Start from the pretrained "openai/whisper-small" checkpoint.
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")

In [None]:
# Define language and task for model
model.generation_config.language = "korean"
model.generation_config.task = "transcribe"

model.generation_config.forced_decoder_ids = None

Define a data collator that converts the input_features to batched PyTorch tensors. The labels are also padded to the length of the longest label sequence in the batch. Padding tokens in the labels are replaced by -100 so that they do not affect the loss.

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate preprocessed examples into one padded batch.

    Each example must provide "input_features" and "labels". The audio
    features and the label ids are padded separately because they have
    different lengths and use different padding methods.

    Calling the collator returns whatever ``processor.feature_extractor.pad``
    produced, with an extra "labels" entry in which padded positions are -100.
    """

    # Object exposing `.feature_extractor.pad(...)` and `.tokenizer.pad(...)`.
    processor: Any
    # Token id that is stripped from the labels when it leads every row.
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Audio side: wrap each example's features and let the feature
        # extractor turn them into batched tensors.
        audio_inputs = [{"input_features": example["input_features"]} for example in features]
        batch = self.processor.feature_extractor.pad(audio_inputs, return_tensors="pt")

        # Label side: pad the tokenized transcriptions to a common length.
        token_inputs = [{"input_ids": example["labels"]} for example in features]
        padded_labels = self.processor.tokenizer.pad(token_inputs, return_tensors="pt")

        # Positions whose attention mask is not 1 are padding; mark them with
        # -100 so the loss ignores them.
        label_ids = padded_labels["input_ids"]
        is_padding = padded_labels.attention_mask.ne(1)
        labels = label_ids.masked_fill(is_padding, -100)

        # If every row already starts with the decoder start token, drop that
        # column here because the token is appended again later anyway.
        leading_tokens = labels[:, 0]
        every_row_starts_with_bos = (leading_tokens == self.decoder_start_token_id).all().cpu().item()
        if every_row_starts_with_bos:
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch

In [None]:
# Build the collator from the shared processor; the decoder start token id is
# read from the model config so the collator can strip it from the labels.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)

Set up evaluation metrics

In [None]:
import evaluate

# Load the "wer" (word error rate) metric; compute_metrics below calls metric.compute.
metric = evaluate.load("wer")

Define a function to return WER metric from model predictions to handle padded tokens. Decode the predicted and label ids to strings and compute the WER between predicted strings and labels.

In [None]:
def compute_metrics(pred):
    """Return the WER (as a percentage) for a batch of predictions.

    Args:
        pred: object with ``predictions`` (predicted token ids) and
            ``label_ids`` (reference token ids, with -100 at ignored positions).

    Returns:
        ``{"wer": value}`` where value is 100 times the metric's result.

    Note:
        ``pred.label_ids`` is modified in place: every -100 is overwritten
        with the tokenizer's pad token id before decoding.
    """
    predicted_ids = pred.predictions
    reference_ids = pred.label_ids

    # -100 is not a real token id, so swap it for the pad token before decoding
    ignored_positions = reference_ids == -100
    reference_ids[ignored_positions] = tokenizer.pad_token_id

    # decode both sides to text, dropping special tokens
    hypotheses = tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)
    references = tokenizer.batch_decode(reference_ids, skip_special_tokens=True)

    score = metric.compute(predictions=hypotheses, references=references)

    return {"wer": 100 * score}

In [None]:
# Create lists with three options for each hyperparameter
learning_rate_options = [1e-6, 1e-5, 1e-4]
warmup_steps_options = [10, 20, 30]
batch_gradient_tuples = [(32, 4), (64, 4), (128, 2)]
weight_decay_options = [0, 0.2, 0.3]
num_epochs_options = [5, 10, 15]

In [None]:
import random
from transformers import Seq2SeqTrainingArguments


def _plan_steps(num_examples, batch_size, gradient_accumulation_steps, num_epochs):
    """Return (total optimizer steps, checkpoint/eval interval) for one run.

    `max_steps` counts optimizer updates, and one update consumes
    batch_size * gradient_accumulation_steps examples, so an epoch is
    num_examples // (batch_size * gradient_accumulation_steps) steps.
    Both results are clamped to at least 1 so a tiny dataset cannot produce
    a zero step count or a zero eval/save interval.
    NOTE(review): assumes training on a single device — confirm.
    """
    examples_per_step = batch_size * gradient_accumulation_steps
    steps_per_epoch = max(1, num_examples // examples_per_step)
    total_steps = steps_per_epoch * num_epochs
    checkpoint_steps = max(1, total_steps // 2)
    return total_steps, checkpoint_steps


random.seed(42)
training_args_list = []

# The training set size does not change between passes.
total_examples = len(common_voice["train"])

# Perform random search with 5 passes
for i in range(5):
    # Randomly select hyperparameter values (draw order is kept fixed so the
    # seed reproduces the same configurations)
    learning_rate = random.choice(learning_rate_options)
    warmup_steps = random.choice(warmup_steps_options)
    batch_size, gradient_accumulation_steps = random.choice(batch_gradient_tuples)
    weight_decay = random.choice(weight_decay_options)
    num_epochs = random.choice(num_epochs_options)

    # Total optimizer steps for num_epochs epochs, with model and evaluation
    # checkpoints at half the total steps
    total_steps, eval_steps = _plan_steps(
        total_examples, batch_size, gradient_accumulation_steps, num_epochs
    )
    # Saving and evaluating on the same interval keeps load_best_model_at_end valid
    save_steps = eval_steps

    # NOTE(review): recent transformers releases rename `evaluation_strategy`
    # to `eval_strategy` — confirm against the installed version.
    training_args = Seq2SeqTrainingArguments(
        output_dir=f"./whisper-small-ko_pass_{i+1}",
        per_device_train_batch_size=batch_size,
        gradient_accumulation_steps=gradient_accumulation_steps,
        learning_rate=learning_rate,
        warmup_steps=warmup_steps,
        max_steps=total_steps,
        gradient_checkpointing=True,
        fp16=False,
        evaluation_strategy="steps",
        per_device_eval_batch_size=4,
        predict_with_generate=True,
        generation_max_length=225,
        save_steps=save_steps,
        eval_steps=eval_steps,
        logging_steps=25,
        report_to=["tensorboard"],
        load_best_model_at_end=True,
        metric_for_best_model="wer",
        greater_is_better=False,
        push_to_hub=True,
        weight_decay=weight_decay
    )

    training_args_list.append(training_args)

    print(f"Pass {i+1}:")
    print(f"Learning Rate: {learning_rate}")
    print(f"Warmup Steps: {warmup_steps}")
    print(f"Batch Size: {batch_size}")
    print(f"Gradient Accumulation Steps: {gradient_accumulation_steps}")
    print(f"Weight Decay: {weight_decay}")
    print(f"Num Epochs: {num_epochs}")
    print("\n")

In [None]:
# Log in to the Hugging Face Hub again before training.
# NOTE(review): presumably refreshes credentials for the push_to_hub uploads — confirm.
notebook_login()

In [None]:
from transformers import Seq2SeqTrainer, WhisperForConditionalGeneration

# Iterate over training arguments list and train one model per configuration
for i, training_args in enumerate(training_args_list):

    # Start every pass from the pretrained checkpoint. Reusing one model object
    # across passes would make pass N continue from pass N-1's fine-tuned
    # weights, so the random-search runs would not be comparable.
    model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
    model.generation_config.language = "korean"
    model.generation_config.task = "transcribe"
    model.generation_config.forced_decoder_ids = None

    trainer = Seq2SeqTrainer(
        args=training_args,
        model=model,
        train_dataset=common_voice["train"],
        eval_dataset=common_voice["validation"],
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        tokenizer=processor.feature_extractor,
    )

    # Train the model
    print(f"Training model {i+1}...")
    trainer.train()

## Evaluate The Base Model on The Held Out Test Set

In [None]:
from transformers import Seq2SeqTrainer, WhisperForConditionalGeneration

# Reload the untouched pretrained checkpoint. By this point `model` has been
# through the training loop above, so evaluating it would not measure the
# base model.
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
model.generation_config.language = "korean"
model.generation_config.task = "transcribe"
model.generation_config.forced_decoder_ids = None

# NOTE(review): `training_args` is the last configuration left over from the
# training loop (including push_to_hub=True); it is only used for evaluation
# here — confirm that is intended.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=common_voice["train"],
    eval_dataset=common_voice["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

In [None]:
# Run evaluation with the trainer built in the previous cell (eval_dataset is the test split).
trainer.evaluate()

## Import Best Finetuned Model and Evaluate on Held Out Test Set

In [None]:
from transformers import WhisperForConditionalGeneration

# Rebind `model` to the fine-tuned "uhalpern/whisper-small-ko_pass_2" checkpoint.
model = WhisperForConditionalGeneration.from_pretrained("uhalpern/whisper-small-ko_pass_2")

In [None]:
from transformers import Seq2SeqTrainer

# Gather the trainer configuration first: the currently loaded model is
# evaluated on the held-out test split with the shared collator and metric.
_finetuned_eval_kwargs = {
    "model": model,
    "args": training_args,
    "train_dataset": common_voice["train"],
    "eval_dataset": common_voice["test"],
    "data_collator": data_collator,
    "compute_metrics": compute_metrics,
    "tokenizer": processor.feature_extractor,
}

trainer = Seq2SeqTrainer(**_finetuned_eval_kwargs)

In [None]:
# Run evaluation with the trainer built in the previous cell (eval_dataset is the test split).
trainer.evaluate()