In [None]:
!add-apt-repository -y ppa:jonathonf/ffmpeg-4
!apt update
!apt install -y ffmpeg

In [None]:
!pip install datasets>=2.6.1
!pip install librosa
!pip install evaluate>=0.30
!pip install jiwer
!pip install gradio
!pip install -q bitsandbytes datasets accelerate
!pip install -q git+https://github.com/huggingface/transformers.git@main git+https://github.com/huggingface/peft.git@main

In [None]:

from huggingface_hub import notebook_login
notebook_login()

In [None]:
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "0"
model_name_or_path = "clu-ling/whisper-large-v2-japanese-5k-steps"
language = "Japanese"
language_abbr = "default"
task = "transcribe"
split = "train"
dataset_name = "joujiboi/japanese-anime-speech"

In [None]:
from datasets import interleave_datasets, load_dataset

def load_streaming_dataset(dataset_name, dataset_config_name, split, **kwargs):
    if "+" in split:
        dataset_splits = [load_dataset(dataset_name, dataset_config_name, split=split_name, trust_remote_code=True, streaming=True, **kwargs) for split_name in split.split("+")]
        interleaved_dataset = interleave_datasets(dataset_splits)
        return interleaved_dataset
    else:
        dataset = load_dataset(dataset_name, dataset_config_name, split=split, trust_remote_code=True, streaming=True, **kwargs)
        return dataset

In [None]:
from datasets import load_dataset, interleave_datasets, concatenate_datasets, Audio, Dataset, IterableDatasetDict

raw_datasets = IterableDatasetDict()
raw_datasets["train"] = load_streaming_dataset(dataset_name, language_abbr, split) 

print(raw_datasets)

In [None]:
from transformers import WhisperFeatureExtractor
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name_or_path, language=language, task=task)

from transformers import WhisperTokenizer
tokenizer = WhisperTokenizer.from_pretrained(model_name_or_path, language=language, task=task)

from transformers import WhisperProcessor
processor = WhisperProcessor.from_pretrained(model_name_or_path, language=language, task=task)

In [None]:
raw_datasets["train"].features

In [None]:
from datasets import Audio
raw_datasets = raw_datasets.cast_column("audio", Audio(sampling_rate=16000))

In [None]:
from transformers.models.whisper.english_normalizer import BasicTextNormalizer
do_lower_case = False
do_remove_punctuation = False
normalizer = BasicTextNormalizer()

In [None]:
def prepare_dataset(batch):
    audio = batch["audio"]
    batch["input_features"] = processor.feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]
    batch["input_length"] = len(audio["array"]) / audio["sampling_rate"]
    transcription = batch["sentence"]
    if do_lower_case:
        transcription = transcription.lower()
    if do_remove_punctuation:
        transcription = normalizer(transcription).strip()

    batch["labels"] = processor.tokenizer(transcription).input_ids
    return batch

In [None]:
vectorized_datasets = raw_datasets.map(prepare_dataset, remove_columns=list(next(iter(raw_datasets.values())).features)).with_format("torch")

In [None]:
vectorized_datasets["train"] = vectorized_datasets["train"].shuffle(
    buffer_size=500,
    seed=0,
)

In [None]:
max_input_length = 30.0
def is_audio_in_length_range(length):
    return length < max_input_length

In [None]:
vectorized_datasets["train"] = vectorized_datasets["train"].filter(
    is_audio_in_length_range,
    input_columns=["input_length"],
)

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        label_features = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch

In [None]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [None]:

import evaluate
metric = evaluate.load("wer")

In [None]:
def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids
    label_ids[label_ids == -100] = tokenizer.pad_token_id
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)
    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [None]:

from transformers import WhisperForConditionalGeneration
model = WhisperForConditionalGeneration.from_pretrained(model_name_or_path, load_in_8bit=True, device_map="auto")

In [None]:
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

In [None]:
from peft import prepare_model_for_int8_training
model = prepare_model_for_int8_training(model)

In [None]:
from peft import LoraConfig, PeftModel, LoraModel, LoraConfig, get_peft_model
config = LoraConfig(r=32, lora_alpha=64, target_modules=["q_proj", "v_proj"], lora_dropout=0.05, bias="none")
model = get_peft_model(model, config)
model.print_trainable_parameters()


In [None]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="whisper-large-v2-anime-stream",  
    push_to_hub=True,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=8,  
    learning_rate=1e-5,
    num_train_epochs=5,
    evaluation_strategy="epoch",
    fp16=True,
    per_device_eval_batch_size=8,
    generation_max_length=128,
    logging_steps=15,
    max_steps=5000,
    eval_steps=500,
    save_steps=500,
    remove_unused_columns=False, 
    label_names=["labels"], 
    optim="adafactor",
    report_to=["tensorboard"],
    predict_with_generate=True,
    gradient_checkpointing=False,
    warmup_steps=100,
)

In [None]:
from transformers import Seq2SeqTrainer, TrainerCallback, TrainingArguments, TrainerState, TrainerControl
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR

class SavePeftModelCallback(TrainerCallback):
    def on_save(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        **kwargs,
    ):
        checkpoint_folder = os.path.join(args.output_dir, f"{PREFIX_CHECKPOINT_DIR}-{state.global_step}")
        peft_model_path = os.path.join(checkpoint_folder, "adapter_model")
        kwargs["model"].save_pretrained(peft_model_path)
        pytorch_model_path = os.path.join(checkpoint_folder, "pytorch_model.bin")
        if os.path.exists(pytorch_model_path):
            os.remove(pytorch_model_path)
        return control

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=vectorized_datasets["train"],
    # eval_dataset=vectorized_datasets["test"],
    data_collator=data_collator,
    # compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
    callbacks=[SavePeftModelCallback],
)
model.config.use_cache = False 

In [None]:
from transformers import Seq2SeqTrainer, TrainerCallback, TrainingArguments, TrainerState, TrainerControl
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR

class SavePeftModelCallback(TrainerCallback):
    def on_save(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        **kwargs,
    ):
        checkpoint_folder = os.path.join(args.output_dir, f"{PREFIX_CHECKPOINT_DIR}-{state.global_step}")
        peft_model_path = os.path.join(checkpoint_folder, "adapter_model")
        kwargs["model"].save_pretrained(peft_model_path)
        pytorch_model_path = os.path.join(checkpoint_folder, "pytorch_model.bin")
        if os.path.exists(pytorch_model_path):
            os.remove(pytorch_model_path)
        return control

In [None]:
model.save_pretrained("./pretrained/whisper-large-v2-anime-stream/")
#tokenizer.save_pretrained('./pretrained/')
processor.save_pretrained("./pretrained/whisper-large-v2-anime-stream/")

In [None]:
trainer.train()

In [None]:
trainer.push_to_hub()
trainer.save_model("./models/")

In [None]:
peft_model_id = "whisper-med-JP"
model.push_to_hub(peft_model_id)
print(peft_model_id)

In [None]:
from peft import PeftModel, PeftConfig
from transformers import WhisperForConditionalGeneration, Seq2SeqTrainer

peft_model_id = "sin2piusc/whisper-med-JP"
peft_config = PeftConfig.from_pretrained(peft_model_id)
model = WhisperForConditionalGeneration.from_pretrained(peft_config.base_model_name_or_path, load_in_8bit=True, device_map="auto")
model = PeftModel.from_pretrained(model, peft_model_id)

In [None]:
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
import gc

eval_dataloader = DataLoader(vectorized_datasets["test"], batch_size=8, collate_fn=data_collator)
forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)

model.eval()
for step, batch in enumerate(tqdm(eval_dataloader)):
    with torch.cuda.amp.autocast():
        with torch.no_grad():
            generated_tokens = (
                model.generate(
                    input_features=batch["input_features"].to("cuda"),
                    forced_decoder_ids=forced_decoder_ids,
                    max_new_tokens=255,
                )
                .cpu()
                .numpy()
            )
            labels = batch["labels"].cpu().numpy()
            labels = np.where(labels != -100, labels, processor.tokenizer.pad_token_id)
            decoded_preds = processor.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
            decoded_labels = processor.tokenizer.batch_decode(labels, skip_special_tokens=True)
            metric.add_batch(
                predictions=decoded_preds,
                references=decoded_labels,
            )
    del generated_tokens, labels, batch
    gc.collect()
wer = 100 * metric.compute()
print(f"{wer=}")

In [None]:
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
import gc
from transformers.models.whisper.english_normalizer import BasicTextNormalizer
normalizer = BasicTextNormalizer()
forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)

model.eval()
for step, batch in enumerate(tqdm(eval_dataloader)):
    with torch.cuda.amp.autocast():
        with torch.no_grad():
            generated_tokens= model.generate(input_features=batch["input_features"].to("cuda"),
                                             forced_decoder_ids=forced_decoder_ids,
                                             max_new_tokens=255).cpu().numpy()
            labels = batch["labels"].cpu().numpy()
            labels = np.where(labels != -100, labels, processor.tokenizer.pad_token_id)
            decoded_preds = processor.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
            decoded_labels = processor.tokenizer.batch_decode(labels, skip_special_tokens=True)
            metric.add_batch(
                predictions=[normalizer(pred).strip() for pred in decoded_preds],
                references=[normalizer(label).strip() for label in decoded_labels],
            )
    del generated_tokens, labels, batch
    gc.collect()
normalized_wer = 100 * metric.compute()
print(f"{normalized_wer=}")

In [None]:
for pred,label in zip(decoded_preds,decoded_labels):
  print(f"{pred=}, {label=}")

In [None]:
import torch
import gradio as gr
from transformers import (
    AutomaticSpeechRecognitionPipeline,
    WhisperForConditionalGeneration,
    WhisperTokenizer,
    WhisperProcessor,
)
from peft import PeftModel, PeftConfig

peft_model_id = "sin2piusc/whisper-med-JP"
language = "Japanese"
task = "transcribe"
peft_config = PeftConfig.from_pretrained(peft_model_id)
model = WhisperForConditionalGeneration.from_pretrained(
    peft_config.base_model_name_or_path, load_in_8bit=True, device_map="auto"
)

model = PeftModel.from_pretrained(model, peft_model_id)
tokenizer = WhisperTokenizer.from_pretrained(peft_config.base_model_name_or_path, language=language, task=task)
processor = WhisperProcessor.from_pretrained(peft_config.base_model_name_or_path, language=language, task=task)
feature_extractor = processor.feature_extractor
forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)
pipe = AutomaticSpeechRecognitionPipeline(model=model, tokenizer=tokenizer, feature_extractor=feature_extractor)

def transcribe(audio):
    with torch.cuda.amp.autocast():
        text = pipe(audio, generate_kwargs={"forced_decoder_ids": forced_decoder_ids}, max_new_tokens=255)["text"]
    return text

iface = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.inputs.Audio(source="microphone", type="filepath", optional=True),
        gr.inputs.Audio(source="upload", type="filepath", optional=True),
    ],
    outputs="text",
    title="PEFT LoRA + INT8 Whisper medium Japanese",
    description="Whisper medium PEFT-LoRA+INT8 fine-tuned with Common Voice 16 + Google Fleurs + Anime Speech - Japanese).",
)

iface.launch(share=True)