In [None]:
import torch
import evaluate # For metrics
from datasets import DatasetDict, Audio, Dataset
from transformers import (
    WhisperFeatureExtractor,
    WhisperTokenizer,
    WhisperProcessor,
    WhisperForConditionalGeneration,
    Seq2SeqTrainingArguments,
    Seq2SeqTrainer,
    DataCollatorForSeq2Seq
)
import os
from pathlib import Path
import pandas as pd

In [None]:
from pathlib import Path


In [None]:
DATA_DIR = Path("../data")
CSV_PATH = (DATA_DIR / "musiccaps-public.csv").resolve()
AUDIO_DIR = DATA_DIR / "audio" 
AUDIO_EXTENSION = ".wav"


In [None]:
print("Loading metadata using pandas...")
try:
    df = pd.read_csv(CSV_PATH)
    print(f"CSV loaded successfully with pandas. Shape: {df.shape}")

    df['audio_path'] = [str(AUDIO_DIR / f"audio_{index}{AUDIO_EXTENSION}") for index in df.index]
    
    # Convert the pandas DataFrame to a datasets.Dataset object
    full_dataset = Dataset.from_pandas(df)
    print("Converted pandas DataFrame to datasets.Dataset object:")

    print(full_dataset)

except FileNotFoundError:
    print(f"ERROR (pandas): File not found during read_csv at {CSV_PATH}")
    raise
except Exception as e:
    print(f"Failed during pandas load or Dataset conversion from {CSV_PATH}:")
    raise 

In [None]:
print("\nFiltering dataset for existing audio files...")

def check_audio_file_exists(example):
    from pathlib import Path
    return Path(example['audio_path']).exists()

initial_count = len(full_dataset)
dataset_with_audio = full_dataset.filter(check_audio_file_exists, num_proc=4)
final_count = len(dataset_with_audio)
print(f"full dataset size: {final_count}")

In [None]:
TARGET_SAMPLING_RATE = 16000
print(f"\nCasting audio column and resampling to {TARGET_SAMPLING_RATE}Hz...")
dataset_with_audio = dataset_with_audio.cast_column(
    "audio_path", # The column containing paths
    Audio(sampling_rate=TARGET_SAMPLING_RATE) # Target object type and sampling rate
)
dataset_with_audio = dataset_with_audio.rename_column("audio_path", "audio")
print(dataset_with_audio)

In [None]:
MODEL_NAME = "openai/whisper-small" # Or "openai/whisper-base", etc.
LANGUAGE = "English" 
TASK = "transcribe" 

try:
    # Load the feature extractor (processes audio input)
    # We load it now even though we skipped audio loading, as it's part of the processor bundle
    print("Loading feature extractor...")
    feature_extractor = WhisperFeatureExtractor.from_pretrained(MODEL_NAME)

    print("Loading tokenizer...")
    tokenizer = WhisperTokenizer.from_pretrained(MODEL_NAME, language=LANGUAGE, task=TASK)

    print("Loading processor...")
    processor = WhisperProcessor.from_pretrained(MODEL_NAME, language=LANGUAGE, task=TASK)

    print("\nProcessor components loaded successfully:")
    print(f"  Feature Extractor: {type(feature_extractor)}")
    print(f"  Tokenizer: {type(tokenizer)}")
    print(f"  Processor: {type(processor)}")

except Exception as e:
    print(f"Error: {e}")
    print("-------------------------------")

In [None]:
def prepare_dataset(batch):
    """
    Prepares a batch of data for the Whisper model.

    Input batch EXPECTS:
      - An 'audio' column: Containing dictionaries from datasets.Audio. Example:
          {'path': '...', 'array': numpy.ndarray, 'sampling_rate': 16000}
      - A 'caption' column: Containing the text transcriptions/captions.

    Output batch CONTAINS:
      - 'input_features': Processed audio data (log-Mel spectrogram) for the model.
      - 'labels': Tokenized text caption IDs for the model.
    """

    
    audio_data = batch["audio"]
    # Extract log-Mel spectrogram features from the raw audio array
    batch["input_features"] = feature_extractor(
        audio_data["array"], sampling_rate=audio_data["sampling_rate"]
    ).input_features[0]
    # Note: We take [0] because feature_extractor processes one sample at a time
        
    # --- 2. Process Text ---
    captions = batch["caption"]
    current_caption = captions 
    processed_caption = "" if None else str(current_caption)

    batch["labels"] = tokenizer(processed_caption).input_ids
    return batch

In [None]:
print("\nApplying dataset preparation function...")
prepared_dataset = dataset_with_audio.map(
    prepare_dataset,
    remove_columns=dataset_with_audio.column_names, # Remove old columns
    num_proc=1
)
print("Dataset preparation complete.")
print(prepared_dataset)

In [None]:
# Split into training and evaluation sets (e.g., 90% train, 10% test)
print("\nSplitting dataset into train and test sets...")
dataset_splits = prepared_dataset.train_test_split(test_size=0.1, seed=456)
train_dataset = dataset_splits["train"]
eval_dataset = dataset_splits["test"]
print(f"Train dataset size: {len(train_dataset)}")
print(f"Eval dataset size: {len(eval_dataset)}")

In [None]:
# --- Step 7: Initialize Data Collator ---
from transformers import DataCollatorForSeq2Seq
import dataclasses
from typing import Any, Dict, List, Union

@dataclasses.dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """
    Custom data collator that correctly pads `input_features` and `labels`
    for speech-to-text models using the processor.
    """
    processor: Any # Should be WhisperProcessor or similar

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Split inputs and labels since they have to be of different lengths
        # and need different padding methods.
        # `features` is a list of dicts, e.g. [{'input_features': ..., 'labels': ...}, ...]
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        label_features = [{"input_ids": feature["labels"]} for feature in features] # Use 'input_ids' key for tokenizer padding

        # Pad the audio features (input_features)
        # The feature_extractor's `pad` method handles this correctly
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # Pad the text labels (labels)
        # The tokenizer's `pad` method handles this correctly
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # Replace tokenizer's pad_token_id with -100 in labels to ignore padding in loss calculation
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # If the model expects decoder_input_ids, create them by shifting labels
        # (Whisper models typically handle this internally when labels are provided)
        # batch["decoder_input_ids"] = shift_tokens_right(labels, self.processor.tokenizer.pad_token_id)
        # Note: Usually not needed explicitly for WhisperForConditionalGeneration if labels are passed

        # Add the padded labels to the batch dictionary
        batch["labels"] = labels

        return batch

print("\nInitializing Custom Data Collator...")
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)
print("Custom Data Collator initialized.")

In [None]:
# Define Evaluation Metrics ---
print("\nLoading ROUGE metric...")
metric = evaluate.load("rouge")

import numpy as np

def compute_metrics(eval_pred):
    """Computes ROUGE scores from model predictions."""
    predictions, labels = eval_pred
    # Decode predictions
    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    # Decode labels, replacing -100 padding token
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
    # Simple cleaning
    decoded_preds = [pred.strip() for pred in decoded_preds]
    decoded_labels = [label.strip() for label in decoded_labels]
    # Compute ROUGE
    result = metric.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)
    # Multiply scores by 100
    result = {key: value * 100 for key, value in result.items()}
    # Round results
    result = {k: round(v, 4) for k, v in result.items()}
    return result

print("Compute metrics function defined.")

In [None]:
# Load Pretrained Model

from transformers import WhisperForConditionalGeneration
import torch # Usually needed implicitly by transformers

print(f"\nLoading model '{MODEL_NAME}'...")
model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME)
model.config.use_cache = True # Enable cache for generation

# Move model to GPU if available
if torch.cuda.is_available():
    device = torch.device("cuda")
    model.to(device)
    print("Model moved to GPU.")
else:
    device = torch.device("cpu")
    print("No GPU detected. Model running on CPU.")

print(f"Model '{MODEL_NAME}' loaded successfully on {model.device}.")

In [None]:
from transformers import Seq2SeqTrainingArguments

# --- Step 10: Define Training Arguments ---
print("\nDefining Training Arguments...")
OUTPUT_DIR_MODEL = "./whisper-musiccaps-finetuned-local" # Model output directory

training_args = Seq2SeqTrainingArguments(
    output_dir=OUTPUT_DIR_MODEL,
    num_train_epochs=3,
    per_device_train_batch_size=8, # Lower if you encounter CUDA Out-of-Memory errors
    per_device_eval_batch_size=8,
    learning_rate=1e-5, # $[1 \times 10^{-5}]$
    weight_decay=0.01,
    warmup_steps=500,
    logging_dir=f"{OUTPUT_DIR_MODEL}/logs",
    logging_strategy="steps",
    logging_steps=25,
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    fp16=torch.cuda.is_available(), # Enable mixed precision only if GPU is available
    gradient_accumulation_steps=2, # Effective batch size = 8 * num_gpus * 2
    # gradient_checkpointing=True, # Uncomment if memory is very limited (slows training)
    predict_with_generate=True,
    generation_max_length=225,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss", # Or "eval_rougeL" etc. if using ROUGE
    greater_is_better=False, # False for loss, True for ROUGE
    remove_unused_columns=False, # Important: Keep as False after custom processing
    label_names=["labels"],
    report_to=["tensorboard"],
    disable_tqdm=False,
)
print("Training Arguments defined.")

In [None]:
# Initialize Trainer

from transformers import Seq2SeqTrainer

print("\nInitializing Trainer...")
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    # --- Pass the ACTUAL datasets ---
    train_dataset=train_dataset, # Use the prepared training data
    eval_dataset=eval_dataset,   # Use the prepared evaluation data
    compute_metrics=compute_metrics,
    tokenizer=processor.tokenizer # Pass tokenizer for proper saving
)
print("Trainer initialized successfully.")

In [None]:
# --- Step 12: Start Training ---
print("\n--- Starting Training ---")
try:
    train_result = trainer.train()
    print("Training finished.")

    trainer.save_model() # Saves the tokenizer too
    metrics = train_result.metrics
    trainer.log_metrics("train", metrics)
    trainer.save_metrics("train", metrics)
    trainer.save_state()
except Exception as e:
    print(f"An error occurred during training: {e}")
    import traceback
    traceback.print_exc()

# Test Run 

In [None]:
import torchaudio

In [None]:
!yt-dlp --extract-audio --postprocessor-args "-ss 00:05:00 -to 00:05:10" https://www.youtube.com/watch?v=_-kssA-FOzU


In [None]:
def load_audio(path, target_sr=16000):
    waveform, sr = torchaudio.load(path)  # shape: [channels, samples]

    # Convert to mono if stereo
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # Resample if needed
    if sr != target_sr:
        waveform = torchaudio.transforms.Resample(sr, target_sr)(waveform)

    return waveform.squeeze()  # shape: [num_samples]

In [None]:
from transformers import WhisperForConditionalGeneration, WhisperProcessor
import os
from pathlib import Path

In [None]:
from pathlib import Path
from transformers import WhisperForConditionalGeneration, WhisperProcessor

checkpoint_path = Path("checkpoint-351").resolve()  # absolute + OS-safe path

model = WhisperForConditionalGeneration.from_pretrained(checkpoint_path)
processor = WhisperProcessor.from_pretrained(checkpoint_path)


In [None]:
checkpoint_path

In [None]:
model = WhisperForConditionalGeneration.from_pretrained(
    "./checkpoint-351", local_files_only=True
)
processor = WhisperProcessor.from_pretrained(
    "./checkpoint-351", local_files_only=True
)


In [None]:
model.save_pretrained("C:/Users/erics/OneDrive/Desktop/transcriber_server/fine_tuned_model/")
processor.save_pretrained("C:/Users/erics/OneDrive/Desktop/transcriber_server/fine_tuned_model/")

In [None]:
from transformers import WhisperProcessor, WhisperForConditionalGeneration

# Load the processor from the original Whisper Small model
processor = WhisperProcessor.from_pretrained("openai/whisper-small")

# # Load your fine-tuned model from your checkpoint directory
# model = WhisperForConditionalGeneration.from_pretrained("checkpoint-351", local_files_only=True)


In [None]:
model = WhisperForConditionalGeneration.from_pretrained("/checkpoint-351")

In [None]:
from transformers import WhisperProcessor

# Load the original processor (with language and task settings if needed)
processor = WhisperProcessor.from_pretrained("openai/whisper-small", language="English", task="transcribe")

# Save the processor's configuration into your checkpoint folder.
# This will add files such as preprocessor_config.json to the folder.
processor.save_pretrained("./whisper-musiccaps-finetuned-local/checkpoint351")


In [None]:
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Now load the processor and model locally from your updated checkpoint folder.
processor = WhisperProcessor.from_pretrained("checkpoint351")
model = WhisperForConditionalGeneration.from_pretrained("checkpoint351").to(device)

# (Optional) Test by printing the types:
print("Processor loaded as:", type(processor))
print("Model loaded as:", type(model))


In [None]:
%pwd

In [2]:
from safetensors.torch import load_file

# Replace with your safetensors file path.
file_path = "C:/Users/erics/OneDrive/Desktop/several-ducks-datahacks25/music_analyzer/whisper-musiccaps-finetuned-local/model.safetensors"

# Load the file.
state_dict = load_file(file_path, device="cpu")

# Print out the keys and shapes.
for key, tensor in state_dict.items():
    print(f"{key}: {tensor.shape}, dtype: {tensor.dtype}")


model.decoder.embed_positions.weight: torch.Size([448, 768]), dtype: torch.float32
model.decoder.embed_tokens.weight: torch.Size([51865, 768]), dtype: torch.float32
model.decoder.layer_norm.bias: torch.Size([768]), dtype: torch.float32
model.decoder.layer_norm.weight: torch.Size([768]), dtype: torch.float32
model.decoder.layers.0.encoder_attn.k_proj.weight: torch.Size([768, 768]), dtype: torch.float32
model.decoder.layers.0.encoder_attn.out_proj.bias: torch.Size([768]), dtype: torch.float32
model.decoder.layers.0.encoder_attn.out_proj.weight: torch.Size([768, 768]), dtype: torch.float32
model.decoder.layers.0.encoder_attn.q_proj.bias: torch.Size([768]), dtype: torch.float32
model.decoder.layers.0.encoder_attn.q_proj.weight: torch.Size([768, 768]), dtype: torch.float32
model.decoder.layers.0.encoder_attn.v_proj.bias: torch.Size([768]), dtype: torch.float32
model.decoder.layers.0.encoder_attn.v_proj.weight: torch.Size([768, 768]), dtype: torch.float32
model.decoder.layers.0.encoder_attn_

In [3]:
def remap_key(old_key: str) -> str:
    # Remove the "model." prefix if present.
    if old_key.startswith("model."):
        old_key = old_key[len("model."):]
    
    # Rename "embed_positions" to "positional_embedding"
    old_key = old_key.replace("embed_positions", "positional_embedding")
    
    # Rename "embed_tokens" to "token_embedding"
    old_key = old_key.replace("embed_tokens", "token_embedding")
    
    # Optionally, if there are differences in normalization naming,
    # replace "layer_norm" with "ln"
    old_key = old_key.replace("layer_norm", "ln")
    
    return old_key

# Example: print a few remapped keys.
checkpoint_keys = [
    "model.decoder.embed_positions.weight",
    "model.decoder.embed_tokens.weight",
    "model.decoder.layer_norm.bias",
    "model.decoder.layer_norm.weight"
]

for key in checkpoint_keys:
    print(f"{key} -> {remap_key(key)}")


model.decoder.embed_positions.weight -> decoder.positional_embedding.weight
model.decoder.embed_tokens.weight -> decoder.token_embedding.weight
model.decoder.layer_norm.bias -> decoder.ln.bias
model.decoder.layer_norm.weight -> decoder.ln.weight


In [4]:
import whisper
from safetensors.torch import load_file as load_safetensors

# Load base model
model = whisper.load_model("small")
base_keys = set(model.state_dict().keys())

# Load your checkpoint state dict (from safetensors)
checkpoint_path = "C:/Users/erics/OneDrive/Desktop/several-ducks-datahacks25/music_analyzer/whisper-musiccaps-finetuned-local/model.safetensors"
ckpt_state = load_safetensors(checkpoint_path, device="cpu")
mapped_ckpt = {remap_key(k): v for k, v in ckpt_state.items()}

mapped_keys = set(mapped_ckpt.keys())

missing = base_keys - mapped_keys
unexpected = mapped_keys - base_keys

print("Missing keys after mapping:", missing)
print("Unexpected keys after mapping:", unexpected)


Missing keys after mapping: {'encoder.blocks.2.attn.value.bias', 'decoder.positional_embedding', 'decoder.blocks.6.attn.out.bias', 'encoder.blocks.7.attn_ln.bias', 'encoder.blocks.7.attn.value.bias', 'decoder.blocks.7.cross_attn_ln.bias', 'encoder.blocks.6.mlp.2.bias', 'decoder.blocks.7.attn_ln.bias', 'encoder.blocks.8.mlp.0.weight', 'decoder.blocks.10.cross_attn_ln.weight', 'decoder.blocks.5.attn.query.weight', 'encoder.blocks.0.mlp.0.bias', 'decoder.blocks.11.cross_attn.key.weight', 'encoder.blocks.3.attn_ln.bias', 'decoder.blocks.9.cross_attn.value.weight', 'encoder.blocks.10.attn.query.weight', 'decoder.blocks.0.cross_attn.value.weight', 'encoder.blocks.10.attn.value.bias', 'encoder.blocks.4.attn.value.weight', 'encoder.blocks.9.attn_ln.bias', 'encoder.blocks.10.attn.value.weight', 'encoder.blocks.3.attn.query.bias', 'decoder.blocks.7.cross_attn.query.weight', 'decoder.blocks.8.cross_attn.query.bias', 'decoder.blocks.1.cross_attn.query.weight', 'encoder.blocks.11.attn.out.weight', 

In [7]:
from transformers import WhisperForConditionalGeneration
from safetensors.torch import load_file as load_safetensors
import torch

# Step 1: Load the base model from the original source
base_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")

# Step 2: Load your fine-tuned checkpoint (safetensors)
ft_state = load_safetensors("C:/Users/erics/OneDrive/Desktop/several-ducks-datahacks25/music_analyzer/whisper-musiccaps-finetuned-local/model.safetensors", device="cpu")

# Optional: Inspect the keys of both state dicts if needed
base_keys = set(base_model.state_dict().keys())
ft_keys = set(ft_state.keys())
print("Missing in fine-tuned state:", base_keys - ft_keys)
print("Extra in fine-tuned state:", ft_keys - base_keys)

# Step 3: Merge fine-tuned weights into the base model's state dict
# This assumes that the keys now align; if they don’t perfectly, you may need a remapping function.
base_state = base_model.state_dict()
base_state.update(ft_state)
base_model.load_state_dict(base_state)

# Now you can use `base_model` for inference or further fine-tuning.


Missing in fine-tuned state: {'proj_out.weight'}
Extra in fine-tuned state: set()


<All keys matched successfully>

In [8]:
from transformers import WhisperForConditionalGeneration
from safetensors.torch import load_file as load_safetensors

# Load the base model from openai/whisper-small.
base_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
base_state = base_model.state_dict()

# Load your fine-tuned weights.
ft_state = load_safetensors("C:/Users/erics/OneDrive/Desktop/several-ducks-datahacks25/music_analyzer/whisper-musiccaps-finetuned-local/model.safetensors", device="cpu")

# If 'proj_out.weight' is missing, add it from the base state.
if 'proj_out.weight' not in ft_state:
    ft_state['proj_out.weight'] = base_state['proj_out.weight']

# Merge the states and load them into the model.
base_state.update(ft_state)
base_model.load_state_dict(base_state)

base_model.eval()


WhisperForConditionalGeneration(
  (model): WhisperModel(
    (encoder): WhisperEncoder(
      (conv1): Conv1d(80, 768, kernel_size=(3,), stride=(1,), padding=(1,))
      (conv2): Conv1d(768, 768, kernel_size=(3,), stride=(2,), padding=(1,))
      (embed_positions): Embedding(1500, 768)
      (layers): ModuleList(
        (0-11): 12 x WhisperEncoderLayer(
          (self_attn): WhisperSdpaAttention(
            (k_proj): Linear(in_features=768, out_features=768, bias=False)
            (v_proj): Linear(in_features=768, out_features=768, bias=True)
            (q_proj): Linear(in_features=768, out_features=768, bias=True)
            (out_proj): Linear(in_features=768, out_features=768, bias=True)
          )
          (self_attn_layer_norm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (activation_fn): GELUActivation()
          (fc1): Linear(in_features=768, out_features=3072, bias=True)
          (fc2): Linear(in_features=3072, out_features=768, bias=True)
        

In [10]:
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import librosa
import torch

# Assume base_model is your fine-tuned model loaded earlier and is in eval mode.
# For consistency, load the processor from the same source.
processor = WhisperProcessor.from_pretrained("openai/whisper-small")

# Load an audio file (ensure it's sampled at 16kHz for Whisper)
audio_file = "C:/Users/erics/OneDrive/Desktop/several-ducks-datahacks25/music_analyzer/wav_folder/audio_1682.wav"
audio, sr = librosa.load(audio_file, sr=16000)

# Use the processor's feature extractor to convert the audio waveform into input features.
# The processor expects the audio to be a NumPy array and will return a tensor.
inputs = processor.feature_extractor(audio, sampling_rate=sr, return_tensors="pt")
input_features = inputs.input_features  # shape: (batch_size, feature_dim, time)

# If your model is on a GPU, make sure to move the inputs to the same device:
input_features = input_features.to(base_model.device)

# Generate model outputs. The generate() method will use the model's beam search or other decoding strategy.
predicted_ids = base_model.generate(input_features)

# Decode the predicted token IDs into text using the processor's tokenizer.
transcription = processor.tokenizer.batch_decode(predicted_ids, skip_special_tokens=True)[0]

print("Transcription:", transcription)


Due to a bug fix in https://github.com/huggingface/transformers/pull/28687 transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English.This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


Transcription: A male vocalist sings this melodious song. The tempo is slow with a lot of reverb and a lot of reverb. The song is melodic and emotional. The audio quality is poor.
