### This code comes from Sanchit Gandhi blog Fine-Tune Whisper for Multilingual ASR with Transformers on Hugging Face - 3 November 2022
### Here you just need to add your own audio files and corresponding transcriptions 
####    train.tsv and test.tsv files expect two tab-separated columns: 
####    path (audio/myfile1.mp3) and script (verbatim transcript of audio)

### Load data into train and test dataset dictionary

In [None]:
from datasets import Dataset, load_dataset, DatasetDict
import pandas as pd

data_train = pd.read_csv('train.tsv', delimiter='\t')
data_test = pd.read_csv('test.tsv', delimiter='\t')

cv = DatasetDict()
cv['train'] = Dataset.from_pandas(data_train)
cv['test'] = Dataset.from_pandas(data_test)

# sanity check
print(cv)

### Load WhisperFeatureExtractor

In [None]:
from transformers import WhisperFeatureExtractor

feature_extractor = WhisperFeatureExtractor.from_pretrained('openai/whisper-small')


### Load WhisperTokenizer

In [None]:
from transformers import WhisperTokenizer

tokenizer = WhisperTokenizer.from_pretrained('openai/whisper-small', language='Russian', task='transcribe')


In [None]:
# sanity check
# verify that the tokenizer is working

input_str = cv['train'][0]['script']
labels = tokenizer(input_str).input_ids
decoded_with_special = tokenizer.decode(labels, skip_special_tokens=False)
decoded_str = tokenizer.decode(labels, skip_special_tokens=True)

print(f"Input:                 {input_str}")
print(f"Decoded w/ special:    {decoded_with_special}")
print(f"Decoded w/out special: {decoded_str}")
print(f"Are equal:             {input_str == decoded_str}")

### Combine into a WhisperProcessor

In [None]:
from transformers import WhisperProcessor

processor = WhisperProcessor.from_pretrained('openai/whisper-small', language='Russian', task='transcribe')

### Prepare the data

In [None]:
# sanity check
# see what form the data are in 
print(cv['train'][0])

## note that the example has additional fields, e.g. array of floats

In [None]:
from datasets import Audio

# set the sampling rate to 16kHz
cv = cv.cast_column("path", Audio(sampling_rate=16000))

In [None]:
import os

print(cv['train'][0])

In [None]:
def prep_dataset(batch):
    # load and resample audio data to 16kHz
    audio = batch['path']

    # compute log-mel spectrograms
    batch['input_features'] = feature_extractor(audio['array'],sampling_rate=16000).input_features[0]
   
    # encode target text to label ids
    batch['labels'] = tokenizer(batch['script']).input_ids

    return batch

In [None]:
cv = cv.map(prep_dataset, remove_columns=cv.column_names['train'], num_proc=4)

### Training and Evaluation

#### Define and initialize a data collator

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if bos token is appended in previous tokenization step,
        # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [None]:
# initialize data collator
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

#### Evaluation metrics

In [None]:
import evaluate

metric = evaluate.load("wer")

In [None]:
def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

#### Load a pretrained checkpoint

In [None]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")

In [None]:
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

#### Define the training configuration

In [None]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="./wspr-smll-ru",  # repo name of your choice
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,  # increase with decrease in batch size
    learning_rate=1e-5,
    warmup_steps=500,
    max_steps=2500,
    gradient_checkpointing=True,
    fp16=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=225,
    save_steps=1000,
    eval_steps=1000,
    logging_steps=25,
    report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
    push_to_hub=False,
)

In [None]:
from transformers import Seq2SeqTrainer

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=cv["train"],
    eval_dataset=cv["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

In [None]:
processor.save_pretrained(training_args.output_dir)

#### Launch training
##### The training process will likely take hours depending on your compute resources

In [None]:
trainer.train()

### Processing Pipeline for Tuned Model
#### Launch user interface

In [None]:
from transformers import pipeline
from tokenizers import Tokenizer
import gradio as gr
from transformers import WhisperTokenizer

tokenizer = WhisperTokenizer.from_pretrained("./wspr-smll-ru3/", language='Russian', task='transcribe')

pipe = pipeline(task="automatic-speech-recognition", model="./wspr-smll-ru3/checkpoint-2000/", tokenizer=tokenizer)

def transcribe(audio):
    text = pipe(audio)["text"]
    return text

iface = gr.Interface(
    fn=transcribe, 
    inputs=gr.Audio(type="filepath"), 
    outputs="text",
    title="Whisper small model - air traffic control",
    description="Demo of Whisper small model fine-tuned with ATC comms.",
)

iface.launch()
