Start by making sure you have the following packages in your environment:

In [None]:
# ! pip install huggingface_hub datasets evaluate jiwer transformers[torch] tensorboard librosa

In [2]:
MY_DATASET = "janaab/supreme-court-speech"

In [3]:
BASE_MODEL = "openai/whisper-small"
MY_MODEL = "janaab/whisper-small-sc"

In [4]:
from IPython.display import display, HTML

# Set a maximum height for all output areas
display(HTML('''<style>.output_area pre {max-height: 100px; overflow-y: scroll;}</style>'''))

# Data Preperation

### Login to HF hub

In [5]:
from huggingface_hub import interpreter_login

interpreter_login()


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To login, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .


Enter your token (input will not be visible):  ········
Add token as git credential? (Y/n)  n


Token is valid (permission: write).
Your token has been saved to /root/.cache/huggingface/token
Login successful


## Load Dataset

In [None]:
from datasets import load_dataset, DatasetDict

sc_speech = DatasetDict()

sc_speech["train"] = load_dataset(
    MY_DATASET, split="train"
)
sc_speech["test"] = load_dataset(
    MY_DATASET, split="test"
)

print(sc_speech)

In [7]:
sc_speech = sc_speech.select_columns(["audio", "transcript"])

## Feature processing for Whisper

In [8]:
from transformers.models.whisper.tokenization_whisper import TO_LANGUAGE_CODE

# TO_LANGUAGE_CODE

In [9]:
from transformers import WhisperProcessor

processor = WhisperProcessor.from_pretrained(
    BASE_MODEL, language="en", task="transcribe"
)

## Process data

In [10]:
sc_speech["train"].features

{'audio': Audio(sampling_rate=None, mono=True, decode=True, id=None),
 'transcript': Value(dtype='string', id=None)}

In [11]:
from datasets import Audio

sampling_rate = processor.feature_extractor.sampling_rate
sc_speech = sc_speech.cast_column("audio", Audio(sampling_rate=sampling_rate))

In [12]:
def prepare_dataset(example):
    audio = example["audio"]

    example = processor(
        audio=audio["array"],
        sampling_rate=audio["sampling_rate"],
        text=example["transcript"],
    )

    # compute input length of audio sample in seconds
    example["input_length"] = len(audio["array"]) / audio["sampling_rate"]

    return example

In [None]:
sc_speech = sc_speech.map(
    prepare_dataset, remove_columns=sc_speech.column_names["train"], num_proc=1
)

In [14]:
max_input_length = 30.0


def is_audio_in_length_range(length):
    return length < max_input_length

In [None]:
sc_speech["train"] = sc_speech["train"].filter(
    is_audio_in_length_range,
    input_columns=["input_length"],
)

In [None]:
sc_speech["train"]

# Training and Evaluation

## Data Collector

In [17]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [
            {"input_features": feature["input_features"][0]} for feature in features
        ]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(
            labels_batch.attention_mask.ne(1), -100
        )

        # if bos token is appended in previous tokenization step,
        # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [18]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

## Eval metrics

In [19]:
import evaluate

metric = evaluate.load("wer")

In [20]:
from transformers.models.whisper.english_normalizer import BasicTextNormalizer

normalizer = BasicTextNormalizer()


def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = processor.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = processor.batch_decode(label_ids, skip_special_tokens=True)

    # compute orthographic wer
    wer_ortho = 100 * metric.compute(predictions=pred_str, references=label_str)

    # compute normalised WER
    pred_str_norm = [normalizer(pred) for pred in pred_str]
    label_str_norm = [normalizer(label) for label in label_str]
    # filtering step to only evaluate the samples that correspond to non-zero references:
    pred_str_norm = [
        pred_str_norm[i] for i in range(len(pred_str_norm)) if len(label_str_norm[i]) > 0
    ]
    label_str_norm = [
        label_str_norm[i]
        for i in range(len(label_str_norm))
        if len(label_str_norm[i]) > 0
    ]

    wer = 100 * metric.compute(predictions=pred_str_norm, references=label_str_norm)

    return {"wer_ortho": wer_ortho, "wer": wer}

## Load model checkpoint

In [21]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained(BASE_MODEL)

In [22]:
# adjust dropout ratio
model.config.dropout = 0.1

In [23]:
from functools import partial

# disable cache during training since it's incompatible with gradient checkpointing
model.config.use_cache = False

# set language and task for generation and re-enable cache
model.generate = partial(
    model.generate, language="en", task="transcribe", use_cache=True
)

## Training config

In [24]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir=f'./models/{MY_MODEL}',
    push_to_hub=False,
    # hub_model_id=MY_MODEL,
    
    # training args
    per_device_train_batch_size=64,
    gradient_accumulation_steps=1,
    learning_rate=1e-5,
    lr_scheduler_type="linear",
    warmup_steps=150,
    max_steps=1500,
    gradient_checkpointing=True,
    fp16=True,
    # weight_decay=0.01,
    # optim="adamw_bnb_8bit",

    # eval args
    fp16_full_eval=True,
    eval_strategy="steps",
    per_device_eval_batch_size=32,
    predict_with_generate=True,
    generation_max_length=225,
    save_steps=250,
    eval_steps=250,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
)

In [None]:
from transformers import Seq2SeqTrainer, EarlyStoppingCallback

early_stopping_callback = EarlyStoppingCallback(
    early_stopping_patience=3,  # Number of evaluations to wait for improvement
    early_stopping_threshold=0.0  # Minimum change to qualify as an improvement
)

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=sc_speech["train"],
    eval_dataset=sc_speech["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor,
    callbacks=[early_stopping_callback]
)

## Training loop

In [None]:
# # Clearing cache helps with VRAM fragmentation

# import torch
# torch.cuda.empty_cache()

In [None]:
# # fix AccelerateState bug

# ! pip install accelerate
# from accelerate import Accelerator
# acc = Accelerator()

In [None]:
trainer.train()

  return fn(*args, **kwargs)
  with torch.enable_grad(), device_autocast_ctx, torch.cpu.amp.autocast(**ctx.cpu_autocast_kwargs):  # type: ignore[attr-defined]


Step,Training Loss,Validation Loss


In [None]:
# Push the trained model, config, and tokenizer to the Hub
# trainer.push_to_hub()