In [None]:
from pathlib import Path
from dotenv import load_dotenv
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import pandas as pd
from datasets import Dataset
import torchaudio
import os
import json

# Run load_dotenv() first, then read the two team settings from the process
# environment; each one is None when its variable is not set.
load_dotenv()
TEAM_NAME = os.environ.get("TEAM_NAME")
TEAM_TRACK = os.environ.get("TEAM_TRACK")

In [None]:
# Single checkpoint identifier used for both the processor and the model.
MODEL_NAME = "openai/whisper-small"

processor = WhisperProcessor.from_pretrained(MODEL_NAME)
model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME)

# Generation settings: English, transcription task, forced decoder ids cleared.
generation_config = model.generation_config
generation_config.forced_decoder_ids = None
generation_config.task = "transcribe"
generation_config.language = "en"

In [None]:
# The GCP layout applies only when both team variables are set.
on_gcp = TEAM_NAME is not None and TEAM_TRACK is not None

# GCP: directories under /home/jupyter named after the track (input) and the
# team (results). Locally: relative "advanced" and "results" directories.
input_dir, results_dir = (
    (Path(f"/home/jupyter/{TEAM_TRACK}"), Path(f"/home/jupyter/{TEAM_NAME}"))
    if on_gcp
    else (Path("advanced"), Path("results"))
)

# Prepare training data: one (id, audio path, transcript) row per non-blank
# line of the JSON-lines manifest.
data = []
# Decode explicitly as UTF-8 so transcripts do not depend on the platform's
# default text encoding.
with open(input_dir / "asr.jsonl", "r", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if not line:
            # Skip blank lines (e.g. a trailing newline at the end of the file).
            continue
        instance = json.loads(line)
        # Audio file names in the manifest are relative to <input_dir>/audio.
        audio_path = input_dir / "audio" / instance["audio"]
        data.append((instance["key"], audio_path.as_posix(), instance["transcript"]))

df = pd.DataFrame(data, columns=["id", "audio_path", "transcript"])
dataset = Dataset.from_pandas(df)
print(dataset)

def prepare_data(data):
    """Turn one dataset row into model inputs and tokenized labels.

    Args:
        data: mapping with an "audio_path" entry (path of the audio file to
            load) and a "transcript" entry (reference text).

    Returns:
        dict with "input_features" (output of the processor for the audio)
        and "labels" (token ids of the transcript), each with the leading
        batch axis removed.
    """
    target_sr = processor.feature_extractor.sampling_rate
    wave, sr = torchaudio.load(data["audio_path"])

    # Assumes torchaudio.load returns (channels, frames) — its default layout.
    # Average the channels so the processor always receives a single 1-D signal.
    if wave.dim() > 1 and wave.size(0) > 1:
        wave = wave.mean(dim=0, keepdim=True)

    # The processor is told the audio is at `target_sr`, so make that true
    # instead of silently mislabeling audio recorded at another rate.
    if sr != target_sr:
        wave = torchaudio.functional.resample(wave, sr, target_sr)

    # Extract audio features
    input_features = processor(
        wave.squeeze(0).numpy(), sampling_rate=target_sr, return_tensors="pt"
    ).input_features

    # Process the transcript
    labels = processor.tokenizer(data["transcript"], return_tensors="pt").input_ids

    # Drop only the leading batch axis; a bare squeeze() would also collapse
    # any other axis that happens to have length 1.
    return {"input_features": input_features.squeeze(0), "labels": labels.squeeze(0)}

# Replace the raw columns with model-ready features and labels.
dataset = dataset.map(prepare_data, remove_columns=["id", "audio_path", "transcript"])
print(dataset)

# Split into training and testing. The seed is fixed so the same 10% of rows is
# held out on every run, which keeps WER numbers comparable between runs.
dataset_dict = dataset.train_test_split(test_size=0.1, seed=42)
print(dataset_dict)

In [None]:
import torch
from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Batch speech examples, padding audio features and labels separately.

    Attributes:
        processor: object exposing ``feature_extractor.pad`` and
            ``tokenizer.pad``; both are called with ``return_tensors="pt"``.
        decoder_start_token_id: token id that, when it leads every label
            sequence in the batch, is stripped from the labels.
    """

    processor: Any
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Collate ``features`` into one padded batch that carries a "labels" entry."""
        # Audio inputs and label ids have different lengths and use different
        # padding routines, so they are gathered into two separate lists.
        audio_items = []
        token_items = []
        for example in features:
            audio_items.append({"input_features": example["input_features"]})
            token_items.append({"input_ids": example["labels"]})

        batch = self.processor.feature_extractor.pad(audio_items, return_tensors="pt")
        padded_tokens = self.processor.tokenizer.pad(token_items, return_tensors="pt")

        # Padding positions (attention mask != 1) become -100 so they are
        # ignored when the loss is computed.
        is_padding = padded_tokens.attention_mask.ne(1)
        labels = padded_tokens["input_ids"].masked_fill(is_padding, -100)

        # When every sequence begins with the decoder start token, remove it
        # here because it is appended again later.
        starts_with_bos = (labels[:, 0] == self.decoder_start_token_id).all().cpu().item()
        if starts_with_bos:
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch
    
# Collator instance; the start token id is read from the model's config.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor, decoder_start_token_id=model.config.decoder_start_token_id
)

In [None]:
import evaluate

# Metric object loaded by the name "wer"; compute_metrics calls
# metric.compute(predictions=..., references=...) on decoded strings.
metric = evaluate.load("wer")

def compute_metrics(pred):
    """Compute the word error rate, as a percentage, for one evaluation pass.

    Args:
        pred: object exposing ``predictions`` (predicted token ids) and
            ``label_ids`` (reference token ids, where -100 marks positions
            to ignore).

    Returns:
        dict with a single key "wer" holding 100 times the metric value.
    """
    pred_ids = pred.predictions

    # Work on a copy: writing the pad id straight into pred.label_ids would
    # mutate the caller's array and erase its -100 markers.
    label_ids = pred.label_ids.copy()

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = processor.tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = processor.tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [None]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="./asr/models/whisper-small.en-til-24-100",
    # Optimisation: 2 samples per device x 8 accumulation steps per update.
    # Double gradient_accumulation_steps whenever the batch size is halved.
    learning_rate=1e-5,
    max_steps=100,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=8,
    gradient_checkpointing=True,
    fp16=True,
    # Evaluation: run every 100 steps, scoring generated transcripts.
    evaluation_strategy="steps",
    eval_steps=100,
    per_device_eval_batch_size=2,
    predict_with_generate=True,
    generation_max_length=225,
    # Logging and checkpointing: the best model is the one with the lowest "wer".
    logging_steps=50,
    save_steps=100,
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
)

from transformers import Seq2SeqTrainer

# Wire the model, the 90/10 train/test splits, the padding collator and the WER
# callback into one trainer. The feature extractor is passed in the `tokenizer` slot.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    train_dataset=dataset_dict["train"],
    eval_dataset=dataset_dict["test"],
    tokenizer=processor.feature_extractor,
)

In [None]:
# Run the fine-tuning loop configured by training_args (max_steps=100).
trainer.train()
# Persist the model held by the trainer.
# NOTE(review): no path is passed — presumably this writes to training_args.output_dir; confirm.
trainer.save_model()