In [2]:
import os
import torch
import pandas as pd
import torchaudio
from datasets import Dataset, DatasetDict, load_dataset, Audio
from peft import get_peft_model, LoraConfig, TaskType 
from transformers import (
    WhisperProcessor,
    WhisperForConditionalGeneration,
    WhisperFeatureExtractor,
    WhisperTokenizer,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments
)
from sklearn.model_selection import train_test_split
from torchaudio.transforms import Resample
import numpy as np
from dataclasses import dataclass

In [3]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [4]:
# Load dataset from Hugging Face Hub
dataset = load_dataset("irasalsabila/javanese_asr_dataset_20k")

# Access train and test splits
train_dataset = dataset["train"]
test_dataset = dataset["test"]

In [5]:
audio_dir = "javanese_data"
model_id = "openai/whisper-tiny"
language = "javanese"
task = "transcribe"

In [6]:
# Load Whisper feature extractor, tokenizer, and processor
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_id)
# model = WhisperForConditionalGeneration.from_pretrained(model_id)
# model.to(device)
tokenizer = WhisperTokenizer.from_pretrained(model_id, language=language, task=task)
processor = WhisperProcessor.from_pretrained(model_id, language=language, task=task)

In [7]:
MAX_LENGTH = 30 * 16000  # 30 seconds * 16,000 samples/second

def load_audio(file_name):
    file_path = os.path.join(audio_dir, file_name + ".flac")

    try:
        speech, sr = torchaudio.load(file_path)

        # Resample if necessary
        if sr != 16000:
            resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=16000)
            speech = resampler(speech)

        speech = speech.squeeze(0)  # Remove extra dimensions

        # Ensure consistent length (pad or truncate)
        if speech.shape[0] > MAX_LENGTH:
            speech = speech[:MAX_LENGTH]  # Truncate
        else:
            pad = MAX_LENGTH - speech.shape[0]
            speech = torch.cat([speech, torch.zeros(pad)])  # Pad with zeros

        return speech

    except Exception as e:
        print(f"⚠️ Error loading {file_path}: {e}")
        return None 

In [8]:
def prepare_dataset(batch):
    speeches = []
    valid_labels = []

    for filename, label in zip(batch["filename"], batch["label"]):
        speech = load_audio(filename)  # Load audio
        if speech is not None:
            speeches.append(speech)
            valid_labels.append(label)

    if len(speeches) == 0:
        return {}  

    speeches = torch.stack(speeches).to(device)

    inputs = feature_extractor(speeches.cpu().numpy(), sampling_rate=16000, return_tensors="pt", padding=True)

    labels = tokenizer(valid_labels, return_tensors="pt", padding=True).input_ids
    labels = [l.tolist() for l in labels] 

    return {
        "input_features": inputs.input_features.squeeze(0).to(device),  
        "labels": labels  
    }

In [9]:
train_dataset = train_dataset.map(prepare_dataset, num_proc=1, batched=True, batch_size=16)
test_dataset = test_dataset.map(prepare_dataset, num_proc=1, batched=True, batch_size=16)

In [10]:
# from pprint import pprint
# pprint(dataset['train'][0])

In [11]:
train_dataset

Dataset({
    features: ['filename', 'userid', 'label', 'input_features', 'labels'],
    num_rows: 16000
})

In [12]:
# # Save the processed dataset
train_dataset.save_to_disk("processed_data/train_dataset")
test_dataset.save_to_disk("processed_data/test_dataset")

# # from datasets import load_from_disk
# # train_dataset = load_from_disk("processed_data/train_dataset")
# # test_dataset = load_from_disk("processed_data/test_dataset")

Saving the dataset (31/31 shards): 100%|██████████| 16000/16000 [04:52<00:00, 54.69 examples/s]
Saving the dataset (8/8 shards): 100%|██████████| 4000/4000 [01:11<00:00, 56.33 examples/s]


In [13]:
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Handle audio input padding
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # Handle text input padding
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # Replace padding tokens with -100 for loss masking
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # Remove BOS token if present
        if (labels[:, 0] == self.decoder_start_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [14]:
# Initialize data collator
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=tokenizer.bos_token_id
)

In [15]:
import evaluate
# Load WER evaluation metric
metric = evaluate.load("wer")

In [16]:
# Function to compute Word Error Rate (WER)
def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids if hasattr(pred, "label_ids") else None

    # Replace -100 with the pad token ID
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # Decode predictions and labels
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    # Compute WER score
    wer = 100 * metric.compute(predictions=pred_str, references=label_str)
    return {"wer": wer}

In [17]:
from transformers import WhisperForConditionalGeneration, BitsAndBytesConfig

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,  
    bnb_4bit_compute_dtype=torch.bfloat16  
)

model = WhisperForConditionalGeneration.from_pretrained(model_id, \
    quantization_config=quantization_config, device_map="auto")

In [18]:
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

In [19]:
from peft import prepare_model_for_kbit_training

model = prepare_model_for_kbit_training(model)

In [20]:
from peft import LoraConfig, PeftModel, LoraModel, LoraConfig, get_peft_model

config = LoraConfig(r=2, lora_alpha=4, target_modules=["q_proj", "v_proj"], lora_dropout=0.05, bias="none")

model = get_peft_model(model, config)
model.print_trainable_parameters()

trainable params: 36,864 || all params: 37,797,504 || trainable%: 0.0975


In [21]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="temp",  # change to a repo name of your choice
    per_device_train_batch_size=4,
    gradient_accumulation_steps=2,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-3,
    warmup_steps=50,
    num_train_epochs=3,
    eval_strategy="epoch",
    fp16=True,
    per_device_eval_batch_size=4,
    generation_max_length=128,
    logging_steps=250,
    eval_steps=2000,
    remove_unused_columns=False,  # required as the PeftModel forward doesn't have the signature of the wrapped model's forward
    label_names=["labels"],  # same reason as above
)

In [22]:
from transformers import Seq2SeqTrainer, TrainerCallback, TrainingArguments, TrainerState, TrainerControl
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR


class SavePeftModelCallback(TrainerCallback):
    def on_save(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        **kwargs,
    ):
        checkpoint_folder = os.path.join(args.output_dir, f"{PREFIX_CHECKPOINT_DIR}-{state.global_step}")

        peft_model_path = os.path.join(checkpoint_folder, "adapter_model")
        kwargs["model"].save_pretrained(peft_model_path)

        pytorch_model_path = os.path.join(checkpoint_folder, "pytorch_model.bin")
        if os.path.exists(pytorch_model_path):
            os.remove(pytorch_model_path)
        return control


trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    data_collator=data_collator,
    # compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
    callbacks=[SavePeftModelCallback],
)
model.config.use_cache = False 

  trainer = Seq2SeqTrainer(


In [23]:
trainer.train()

  return fn(*args, **kwargs)


Epoch,Training Loss,Validation Loss
1,1.0544,1.061775
2,1.0274,1.009856
3,0.9917,0.992899


  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)
  return fn(*args, **kwargs)


TrainOutput(global_step=6000, training_loss=1.0966273956298829, metrics={'train_runtime': 4654.1162, 'train_samples_per_second': 10.313, 'train_steps_per_second': 1.289, 'total_flos': 1.18425452544e+18, 'train_loss': 1.0966273956298829, 'epoch': 3.0})

In [27]:
model = get_peft_model(model, config)
print(model)  # ✅ Check if model is a PEFT model

print("Model ID:", model_id)  # ✅ Check model_id value

PeftModel(
  (base_model): LoraModel(
    (model): PeftModel(
      (base_model): LoraModel(
        (model): PeftModel(
          (base_model): LoraModel(
            (model): WhisperForConditionalGeneration(
              (model): WhisperModel(
                (encoder): WhisperEncoder(
                  (conv1): Conv1d(80, 384, kernel_size=(3,), stride=(1,), padding=(1,))
                  (conv2): Conv1d(384, 384, kernel_size=(3,), stride=(2,), padding=(1,))
                  (embed_positions): Embedding(1500, 384)
                  (layers): ModuleList(
                    (0-3): 4 x WhisperEncoderLayer(
                      (self_attn): WhisperSdpaAttention(
                        (k_proj): Linear4bit(in_features=384, out_features=384, bias=False)
                        (v_proj): lora.Linear4bit(
                          (base_layer): Linear4bit(in_features=384, out_features=384, bias=True)
                          (lora_dropout): ModuleDict(
                            (def

In [28]:
print(model.peft_config)

{'default': LoraConfig(peft_type=<PeftType.LORA: 'LORA'>, auto_mapping=None, base_model_name_or_path=None, revision=None, task_type=None, inference_mode=False, r=2, target_modules={'q_proj', 'v_proj'}, lora_alpha=4, lora_dropout=0.05, fan_in_fan_out=False, bias='none', use_rslora=False, modules_to_save=None, init_lora_weights=True, layers_to_transform=None, layers_pattern=None, rank_pattern={}, alpha_pattern={}, megatron_config=None, megatron_core='megatron.core', loftq_config={}, use_dora=False, layer_replication=None, runtime_config=LoraRuntimeConfig(ephemeral_gpu_offload=False))}


In [30]:
print(model.peft_config["default"].peft_type.value)  # Should print "LORA"

LORA


In [34]:
if isinstance(model.peft_config, dict) and "default" in model.peft_config:
    peft_type = model.peft_config["default"].peft_type.value 
else:
    peft_type = "lora" 

peft_model_id = f"test/{model_id}-{peft_type}".replace("/", "-")
print(peft_model_id) 

test-openai-whisper-tiny-LORA


In [35]:
model.save_pretrained(peft_model_id)
print(f"LoRA model saved to: {peft_model_id}")

LoRA model saved to: test-openai-whisper-tiny-LORA


In [40]:
from peft import PeftModel, PeftConfig
from transformers import WhisperForConditionalGeneration, Seq2SeqTrainer

peft_model_id = "test-openai-whisper-tiny-LORA"
peft_config = PeftConfig.from_pretrained(peft_model_id)

In [46]:
peft_config

LoraConfig(peft_type=<PeftType.LORA: 'LORA'>, auto_mapping={'base_model_class': 'PeftModel', 'parent_library': 'peft.peft_model'}, base_model_name_or_path=None, revision=None, task_type=None, inference_mode=True, r=2, target_modules={'q_proj', 'v_proj'}, lora_alpha=4, lora_dropout=0.05, fan_in_fan_out=False, bias='none', use_rslora=False, modules_to_save=None, init_lora_weights=True, layers_to_transform=None, layers_pattern=None, rank_pattern={}, alpha_pattern={}, megatron_config=None, megatron_core='megatron.core', loftq_config={}, use_dora=False, layer_replication=None, runtime_config=LoraRuntimeConfig(ephemeral_gpu_offload=False))

In [42]:
# model = WhisperForConditionalGeneration.from_pretrained(
#     peft_config.base_model_name_or_path, quantization_config=quantization_config, 
#     device_map="auto"
# )
base_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny")  

# model = PeftModel.from_pretrained(model, peft_model_id)
model = PeftModel.from_pretrained(base_model, peft_model_id)


In [49]:
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
import gc

# Ensure the model is on GPU
model = model.to("cuda")  # ✅ Move entire model to CUDA
model_dtype = next(model.parameters()).dtype  # Get model dtype

# Ensure LoRA layers are on GPU (sometimes required)
if hasattr(model, "base_model"):
    model.base_model.to("cuda")

eval_dataloader = DataLoader(test_dataset, batch_size=8, collate_fn=data_collator)

model.eval()
for step, batch in enumerate(tqdm(eval_dataloader)):
    with torch.no_grad():  # No need for autocast since Whisper already handles mixed precision
        # Ensure correct dtype for input_features
        input_features = batch["input_features"].to("cuda").to(model_dtype)  # Convert to model dtype
        decoder_input_ids = batch["labels"][:, :4].to("cuda")

        generated_tokens = (
            model.generate(
                input_features=input_features,
                decoder_input_ids=decoder_input_ids,
                max_new_tokens=255,
            )
            .cpu()
            .numpy()
        )

        labels = batch["labels"].cpu().numpy()
        labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
        decoded_preds = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
        decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

        metric.add_batch(
            predictions=decoded_preds,
            references=decoded_labels,
        )

    del generated_tokens, labels, batch
    gc.collect()

wer = 100 * metric.compute()
print(f"{wer=}")

  0%|          | 0/500 [00:00<?, ?it/s]Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.43.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
100%|██████████| 500/500 [10:40<00:00,  1.28s/it]


wer=128.9467055309655
