# Finetuning Whisper for Swiss German Speech Recognition

## Load the Dataset

In [None]:
import huggingface_hub
import os
import wandb

# Hugging Face account and dataset repository that hold the raw dialect data.
huggingface_username = 'notebotIE'
competition = 'swiss_german_dialects_raw'
# Read from the environment so the token never appears in the notebook.
hf_token = os.getenv("HF_TOKEN")

# Weights & Biases project coordinates.
wandb_project = 'NoteBotFinetuning'
wandb_entity = 'bunduli'

print("Logging in to Hugging Face Hub and W&B...")
huggingface_hub.login(token=hf_token)
wandb.login(key=os.getenv('WANDB_API_TOKEN'))
# Report success only after both logins announced above have been attempted.
print("Login successful.")


In [None]:
from datasets import load_dataset, DatasetDict

# Download the raw dialect dataset from the user's Hub repository.
print("Loading the entire dataset from Hugging Face...")
dataset = load_dataset(f"{huggingface_username}/{competition}")
print("Dataset loaded successfully.")

# Print the dataset overview and the first training row as a sanity check.
print("Inspecting the dataset...")
print(dataset)
print(dataset['train'][0])

In [None]:
from collections import Counter

# Tally how many training rows carry each dialect label.
dialect_counts = Counter(dataset['train']['Dialect'])

# Print one "label: count" line per dialect.
for dialect, count in dialect_counts.items():
    print(f"{dialect}: {count}")


In [None]:
# Filter only for the 'zh' dialect.
# `input_columns` hands just the 'Dialect' value to the predicate, so the
# remaining columns (notably the audio) need not be materialised for each row
# merely to read its label.
dataset_zh = dataset.filter(
    lambda dialect: dialect == 'zh',
    input_columns=['Dialect'],
)

In [None]:
# Split the 'zh' DatasetDict by operating on the 'train' Dataset
# First hold out 20% of the rows, then halve that hold-out into eval and test,
# which yields an 80/10/10 split. Both calls use the same fixed seed (42).
train_test = dataset_zh['train'].train_test_split(test_size=0.2, seed=42)
eval_test = train_test['test'].train_test_split(test_size=0.5, seed=42)

# Combine back into a new DatasetDict
zh_split = DatasetDict({
    "train": train_test['train'],
    "eval": eval_test['train'],  # first half of the 20% hold-out
    "test": eval_test['test']    # second half of the 20% hold-out
})

print(zh_split)


In [None]:
def add_dialect_token(example):
    """Prefix the transcription with a ``<code>`` dialect marker.

    The marker code is looked up from ``example['Dialect']``; labels without
    a mapping fall back to ``<unknown>``. The row is modified in place and the
    same object is returned.
    """
    codes_by_dialect = {'zh': 'zc'}
    code = codes_by_dialect.get(example['Dialect'], 'unknown')
    marker = f"<{code}>"
    example['Transcription'] = f"{marker} {example['Transcription']}"
    return example

# Prepend the dialect marker to the transcription of every row in every split.
zh_split = zh_split.map(add_dialect_token)


In [None]:
# Display the splits after the dialect marker was prepended.
zh_split

In [None]:
def simplify_dataset(example):
    """Reduce a row to its audio plus the transcription, renamed to ``sentence``."""
    return dict(
        audio=example["audio"],
        sentence=example["Transcription"],
    )

# Apply the simplification to all splits. Every original column name is passed
# to `remove_columns`, leaving the "audio" and "sentence" keys that
# simplify_dataset returns.
zh_split = zh_split.map(simplify_dataset, remove_columns=zh_split["train"].column_names)

In [None]:
# Display the splits to check the columns after simplification.
zh_split

## Preprocessing

In [None]:
from transformers import WhisperProcessor

# Load the Whisper processor of the large-v2 checkpoint, with the language set
# to German and the task set to transcription.
processor = WhisperProcessor.from_pretrained(
    "openai/whisper-large-v2", language="german", task="transcribe"
)

# Display the loaded processor.
processor


In [None]:
# Show the training split's column types before the audio column is re-cast.
zh_split["train"].features

In [None]:
from datasets import Audio

# Re-declare the audio column with the sampling rate that the processor's
# feature extractor is configured for.
sampling_rate = processor.feature_extractor.sampling_rate
zh_split = zh_split.cast_column("audio", Audio(sampling_rate=sampling_rate))

In [None]:
# Show the training split's column types again after the cast.
zh_split["train"].features

In [None]:
# Take a copy of the first training row and display it.
sample = zh_split["train"][0].copy()
sample

In [None]:
from IPython.display import Audio as IPyAudio


# Render an inline player for the sample's waveform at its own sampling rate.
IPyAudio(sample["audio"]["array"], rate=sample["audio"]["sampling_rate"])

In [None]:
def prepare_dataset(example):
    """Convert one raw row into model-ready features.

    The global ``processor`` receives the waveform, its sampling rate and the
    sentence text (audio becomes log-mel spectrogram features, text becomes
    label ids), and its output is returned as the new row. An extra
    ``input_length`` entry holds the clip duration in seconds, which is useful
    for statistics and filtering.
    """
    clip = example["audio"]
    waveform, rate = clip["array"], clip["sampling_rate"]

    features = processor(
        audio=waveform,
        sampling_rate=rate,
        text=example["sentence"],
    )

    # Duration in seconds = number of samples / samples per second.
    features["input_length"] = len(waveform) / rate
    return features

In [None]:
# Run prepare_dataset over every split, one row at a time in a single process.
# The raw columns of the train split are dropped so that only the values
# returned by prepare_dataset remain.
zh_split_preprocessed = zh_split.map(
    prepare_dataset,
    batched=False,
    remove_columns=zh_split.column_names["train"],
    num_proc=1
)

In [None]:
def is_audio_in_length_range(length, max_input_length=30.0):
    """Return True when ``length`` is strictly below ``max_input_length``."""
    return max_input_length > length

# Keep only training clips shorter than 30 seconds. The filtered result is
# assigned back over the "train" split; the "eval" and "test" splits are not
# filtered here.
zh_split_preprocessed["train"] = zh_split_preprocessed["train"].filter(
    is_audio_in_length_range,
    input_columns=["input_length"],
    fn_kwargs = {"max_input_length":30.},
    num_proc=1
)

In [None]:
# Display the preprocessed splits after the length filter on "train".
zh_split_preprocessed

In [None]:
# Upload the preprocessed splits to a private dataset repository under the
# user's account, authenticating with the Hugging Face token.
dataset_name = "zh_split_preprocessed"
zh_split_preprocessed.push_to_hub(
    f"{huggingface_username}/{dataset_name}",
    private=True,
    token=hf_token
)

## Training and Evaluation

In [None]:
from datasets import load_dataset

# Reload the preprocessed splits from the same Hub repository they were pushed
# to, then display them.
zh_split_preprocessed = load_dataset(f"{huggingface_username}/{dataset_name}")
zh_split_preprocessed

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Batch preprocessed rows into padded tensors for seq2seq speech training.

    Audio features and label ids are handled separately because they have
    different lengths and need different padding methods.
    """

    # Object exposing the ``feature_extractor`` and ``tokenizer`` used below.
    processor: Any

    def __call__(
        self, features: List[Dict[str, Union[List[int], torch.Tensor]]]
    ) -> Dict[str, torch.Tensor]:
        """Collate ``features`` into one batch with a ``labels`` entry attached."""
        # Audio side: take the first element of each row's "input_features"
        # and have the feature extractor return them as torch tensors.
        audio_rows = []
        for row in features:
            audio_rows.append({"input_features": row["input_features"][0]})
        batch = self.processor.feature_extractor.pad(audio_rows, return_tensors="pt")

        # Text side: gather the tokenized label sequences and pad them.
        tokenizer = self.processor.tokenizer
        label_rows = [{"input_ids": row["labels"]} for row in features]
        padded = tokenizer.pad(label_rows, return_tensors="pt")

        # Positions whose attention mask is not 1 are padding; set them to
        # -100 so the loss ignores them.
        is_padding = padded.attention_mask.ne(1)
        labels = padded["input_ids"].masked_fill(is_padding, -100)

        # If every row starts with the BOS token from the earlier
        # tokenization step, drop it here since it is appended later anyway.
        starts_with_bos = (labels[:, 0] == tokenizer.bos_token_id).all().cpu().item()
        if starts_with_bos:
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch

In [None]:
# Build the collator around the processor loaded earlier in the notebook.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [None]:
import evaluate

# Word Error Rate (WER) metric.
metric = evaluate.load("wer")
# we want to also show the Character Error Rate (CER)
cer_metric = evaluate.load("cer")

In [None]:
from transformers.models.whisper.english_normalizer import BasicTextNormalizer

# Text normalizer applied to predictions and references before the normalised
# WER and CER are computed.
normalizer = BasicTextNormalizer()


def compute_metrics(pred):
    """Score generated transcripts against the reference transcripts.

    Returns a dict with:
    * ``wer_ortho``: WER on the decoded text as-is,
    * ``wer`` and ``cer``: WER and CER after normalising both sides and
      dropping every pair whose normalised reference is empty.

    Note that ``pred.label_ids`` is modified in place: each -100 entry is
    overwritten with the tokenizer's pad token id.
    """
    predicted_ids = pred.predictions
    reference_ids = pred.label_ids

    # -100 was used for padded label positions; put the pad token id back so
    # that decoding can drop it together with the other special tokens.
    reference_ids[reference_ids == -100] = processor.tokenizer.pad_token_id

    hypotheses = processor.batch_decode(predicted_ids, skip_special_tokens=True)
    references = processor.batch_decode(reference_ids, skip_special_tokens=True)

    # Orthographic WER: computed on the decoded strings without normalisation.
    scores = {
        "wer_ortho": metric.compute(predictions=hypotheses, references=references)
    }

    # Normalise both sides, then keep only the pairs whose reference is still
    # non-empty after normalisation.
    normalised_pairs = [
        (normalizer(hyp), normalizer(ref))
        for hyp, ref in zip(hypotheses, references)
    ]
    kept_pairs = [(hyp, ref) for hyp, ref in normalised_pairs if len(ref) > 0]
    kept_hypotheses = [hyp for hyp, _ in kept_pairs]
    kept_references = [ref for _, ref in kept_pairs]

    scores["wer"] = metric.compute(
        predictions=kept_hypotheses, references=kept_references
    )
    scores["cer"] = cer_metric.compute(
        predictions=kept_hypotheses, references=kept_references
    )
    return scores

In [None]:
from transformers import WhisperForConditionalGeneration

# Load the pretrained Whisper large-v2 checkpoint that the trainer will fine-tune.
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-large-v2")

In [None]:
from functools import partial

# disable cache during training since it's incompatible with gradient checkpointing
model.config.use_cache = False

# set language and task for generation and re-enable cache
# `model.generate` is rebound to a partial, so later generate calls receive
# these keyword arguments unless the caller overrides them.
model.generate = partial(
    model.generate, language="german", task="transcribe", use_cache=True
)

In [None]:
from transformers import Seq2SeqTrainingArguments

# Hyperparameters and bookkeeping options for the fine-tuning run.
# NOTE(review): the notebook logs in to W&B at the top, but `report_to` below
# lists only TensorBoard — confirm which tracker is intended.
training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-large-v2",  # name on the HF Hub
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-5,
    lr_scheduler_type="constant_with_warmup",
    warmup_steps=50,
    max_steps=500,  # increase to 4000 if you have your own GPU or a Colab paid plan
    gradient_checkpointing=True, # save memory in GPU at the expense of more time
    fp16=True,
    fp16_full_eval=True,
    # NOTE(review): the name of this argument may depend on the installed
    # transformers version — confirm against the version in use.
    evaluation_strategy="steps",
    per_device_eval_batch_size=16,
    predict_with_generate=True,
    generation_max_length=225,
    # Checkpoints and evaluations share the same 250-step cadence.
    save_steps=250,
    eval_steps=250,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",  # the "wer" key returned by compute_metrics
    greater_is_better=False, # wer should be lower to be an improvement
    push_to_hub=True,
)

In [None]:
from transformers import Seq2SeqTrainer

# Wire the model, data splits, collator and metric function into the trainer.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=zh_split_preprocessed["train"],
    eval_dataset=zh_split_preprocessed["eval"],  # the "test" split is not used here
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor,  # the full processor is passed in the tokenizer slot
)

In [None]:
# Start the fine-tuning run configured by training_args.
trainer.train()

In [None]:
# Metadata passed along with the final push of the fine-tuned model.
kwargs = {
    # Built from the same variables used to load the training data, so the
    # tag cannot drift from the dataset repository that was actually used.
    "dataset_tags": f"{huggingface_username}/{dataset_name}",
    "dataset": "SwissDialDataset_ETH",  # a 'pretty' name for the training dataset
    # Hub model-card metadata expects an ISO 639 language code here rather
    # than a full language name such as "german".
    "language": "de",
    "model_name": "Whisper Large V2 - Swiss German",  # a 'pretty' name for your model
    "finetuned_from": "openai/whisper-large-v2",
    "tasks": "automatic-speech-recognition",
}

# Upload the trained model together with the metadata above.
trainer.push_to_hub(**kwargs)