In [31]:
# In your Jupyter notebook cell:
from huggingface_hub import notebook_login
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [1]:
# !pip install --upgrade pip
# !pip install --upgrade datasets[audio] transformers accelerate evaluate jiwer tensorboard gradio


In [2]:
from transformers import WhisperProcessor, WhisperForConditionalGeneration, TrainingArguments, Trainer
from datasets import Dataset, Audio
import torch
import json
import os

device = "cuda" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
print(f"Using device: {device}")


Using device: cpu


# Dataset creation

Using the guideline: https://huggingface.co/blog/fine-tune-whisper

In [3]:
audio_path = "../creolese-audio-dataset/finetune_eligible"
transcription_path = "../creolese-audio-dataset/finetune_eligible/transcripts.json"

# Load transcripts JSON
with open(transcription_path, 'r') as f:
    transcripts = json.load(f)

# Create a list of dicts pairing audio files and transcripts
data = []
for item in transcripts:
    audio_file = os.path.join(audio_path, item['audio'])
    if os.path.exists(audio_file):
        print(f"Found file: {audio_file}")
        data.append({'audio': audio_file, 'text': item['text']})
    else:
        print(f"Missing file: {audio_file}")

Found file: ../creolese-audio-dataset/finetune_eligible/1a.wav
Found file: ../creolese-audio-dataset/finetune_eligible/1b.wav
Found file: ../creolese-audio-dataset/finetune_eligible/1c.wav
Found file: ../creolese-audio-dataset/finetune_eligible/1d.wav
Found file: ../creolese-audio-dataset/finetune_eligible/1e.wav
Found file: ../creolese-audio-dataset/finetune_eligible/3a.wav
Found file: ../creolese-audio-dataset/finetune_eligible/3b.wav
Found file: ../creolese-audio-dataset/finetune_eligible/2.wav
Found file: ../creolese-audio-dataset/finetune_eligible/4a.wav
Found file: ../creolese-audio-dataset/finetune_eligible/4b.wav
Found file: ../creolese-audio-dataset/finetune_eligible/4c.wav
Found file: ../creolese-audio-dataset/finetune_eligible/4d.wav
Found file: ../creolese-audio-dataset/finetune_eligible/4e.wav
Found file: ../creolese-audio-dataset/finetune_eligible/5a.wav
Found file: ../creolese-audio-dataset/finetune_eligible/5b.wav
Found file: ../creolese-audio-dataset/finetune_eligible/

In [4]:
dataset = Dataset.from_list(data)

# Cast the audio column to automatically load audio
dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))
print(dataset)

split_dataset = dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = split_dataset["train"]
eval_dataset = split_dataset["test"]

print(train_dataset)
print(eval_dataset)

Dataset({
    features: ['audio', 'text'],
    num_rows: 265
})
Dataset({
    features: ['audio', 'text'],
    num_rows: 212
})
Dataset({
    features: ['audio', 'text'],
    num_rows: 53
})


In [5]:



def prepare_dataset(batch):
        # load and resample audio data from 48 to 16kHz
        audio = batch["audio"]

        # compute log-Mel input features from input audio array
        batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

        # encode target text to label ids
        batch["labels"] = tokenizer(batch["text"]).input_ids
        return batch

#

In [6]:
from transformers import WhisperFeatureExtractor, WhisperTokenizer, WhisperProcessor

feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-base")

tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-base", task="transcribe")

processor = WhisperProcessor.from_pretrained("openai/whisper-base", task="transcribe")

In [7]:
import librosa
import numpy as np
from datasets import Dataset

def augment_audio(audio_array, sr=16000):
    """Apply random augmentations to audio"""
    augmented_versions = []

    # Original
    augmented_versions.append(audio_array)

    # Speed change (0.9x to 1.1x speed)
    speed_factor = np.random.uniform(0.9, 1.1)
    speed_changed = librosa.effects.time_stretch(audio_array, rate=speed_factor)
    augmented_versions.append(speed_changed)

    # Pitch shift (-2 to +2 semitones)
    pitch_shift = np.random.randint(-2, 3)
    pitch_shifted = librosa.effects.pitch_shift(audio_array, sr=sr, n_steps=pitch_shift)
    augmented_versions.append(pitch_shifted)

    # Add slight noise
    noise_factor = 0.005
    noise = np.random.randn(len(audio_array)) * noise_factor
    noisy = audio_array + noise
    augmented_versions.append(noisy)

    return augmented_versions

def create_augmented_dataset(original_dataset, num_augmentations=2):
    """Create augmented dataset"""
    augmented_data = []

    for item in original_dataset:
        # Original item
        augmented_data.append(item)

        # Create augmented versions
        audio_array = item["audio"]["array"]
        for i in range(num_augmentations):
            augmented_audio = augment_audio(audio_array)[1 + i]  # Skip original

            augmented_item = {
                "audio": {
                    "array": augmented_audio,
                    "sampling_rate": item["audio"]["sampling_rate"]
                },
                "text": item["text"]  # Same text
            }
            augmented_data.append(augmented_item)

    return augmented_data

# Create augmented dataset
print("Creating augmented dataset...")
augmented_train_data = create_augmented_dataset(train_dataset, num_augmentations=2)
augmented_train_dataset = Dataset.from_list(augmented_train_data)

# Apply preprocessing
augmented_train_dataset = augmented_train_dataset.map(
    prepare_dataset, 
    remove_columns=augmented_train_dataset.column_names
)

print(f"Original training size: {len(train_dataset)}")
print(f"Augmented training size: {len(augmented_train_dataset)}")

Creating augmented dataset...


Map:   0%|          | 0/636 [00:00<?, ? examples/s]

Original training size: 212
Augmented training size: 636


In [8]:
train_dataset = augmented_train_dataset
# Apply preprocessing to both splits
# train_dataset = train_dataset.map(prepare_dataset, remove_columns=dataset.column_names, num_proc=1)
eval_dataset = eval_dataset.map(prepare_dataset, remove_columns=dataset.column_names, num_proc=1)

Map:   0%|          | 0/53 [00:00<?, ? examples/s]

## Load the Model

In [22]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base")

In [23]:
model.generation_config.task = "transcribe"
model.generation_config.language = None 
model.generation_config.forced_decoder_ids = None

In [11]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if bos token is appended in previous tokenization step,
                # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.decoder_start_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [24]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)

In [25]:
# Freeze encoder layers (keep audio understanding, only train text generation)
def freeze_encoder(model):
    """Freeze the encoder to prevent overfitting"""
    for param in model.model.encoder.parameters():
        param.requires_grad = False

    print("Encoder frozen - only decoder will be trained")

    # Count trainable parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

    print(f"Total parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")
    print(f"Frozen parameters: {total_params - trainable_params:,}")

# Apply freezing
freeze_encoder(model)

Encoder frozen - only decoder will be trained
Total parameters: 72,593,920
Trainable parameters: 52,003,328
Frozen parameters: 20,590,592


In [26]:
import evaluate

metric = evaluate.load("wer")

def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer =  metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


In [27]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="../training_outputs/whisper-tiny-creolese-finetuned",
    # max_grad_norm=1.0,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=1,
    learning_rate=1e-4, 
    num_train_epochs=15,
    gradient_checkpointing=True,
    fp16=torch.cuda.is_available(),
    do_eval= True,
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=225,
    save_steps=50,
    logging_steps=50,
    report_to=None,  
    push_to_hub=False,
    eval_steps=50,          
    eval_strategy="steps",
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
)


In [28]:
from transformers import Seq2SeqTrainer

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,  # or add eval split if available
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    processing_class=processor.feature_extractor
)


In [29]:
# trainer.train(resume_from_checkpoint=True)
trainer.train()


Step,Training Loss,Validation Loss,Wer
50,2.8694,1.920511,0.968944
100,1.8704,1.690991,0.830678
150,1.6327,1.6184,0.763975
200,1.2496,1.619106,0.806643
250,0.9926,1.618094,0.719957
300,0.9033,1.571307,0.715636
350,0.6431,1.599145,0.665676
400,0.3972,1.659835,0.712935
450,0.3743,1.6971,0.747502
500,0.3712,1.708322,0.68512


There were missing keys in the checkpoint model loaded: ['proj_out.weight'].


TrainOutput(global_step=4770, training_loss=0.1443366120846657, metrics={'train_runtime': 23641.986, 'train_samples_per_second': 0.404, 'train_steps_per_second': 0.202, 'total_flos': 6.187643633664e+17, 'train_loss': 0.1443366120846657, 'epoch': 15.0})

In [87]:
import gradio as gr
from transformers import pipeline
pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    feature_extractor=feature_extractor,
    tokenizer=tokenizer
)

def transcribe(audio):
    audio_data = audio
    text = pipe(audio_data)["text"]  
    return text

input_audio = gr.Audio(
    sources=["upload"],
    waveform_options=gr.WaveformOptions(
        waveform_color="#01C6FF",
        waveform_progress_color="#0066B4",
        skip_length=2,
        show_controls=False,
    ),
)
demo = gr.Interface(
    fn=transcribe, 
    inputs=gr.Audio(sources="upload", type="filepath"), 
    outputs="text",
    title="Whisper Base Creolese",
    description="Demo for Creolese speech recognition using a fine-tuned Whisper base model.",
)

demo.launch()

Device set to use cpu


* Running on local URL:  http://127.0.0.1:7862
* To create a public link, set `share=True` in `launch()`.






In [32]:
# trainer.push_to_hub("kchase9/whisper-creolese-tiny")

model.push_to_hub(
        "kchase9/whisper-tiny-creolese-finetuned",
        create_pr=False,
        private=False
)

model.safetensors:   0%|          | 0.00/290M [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/kchase9/whisper-tiny-creolese-finetuned/commit/3ab26b3aeff9db261927c669204e40b00fb2b39a', commit_message='Upload WhisperForConditionalGeneration', commit_description='', oid='3ab26b3aeff9db261927c669204e40b00fb2b39a', pr_url=None, repo_url=RepoUrl('https://huggingface.co/kchase9/whisper-tiny-creolese-finetuned', endpoint='https://huggingface.co', repo_type='model', repo_id='kchase9/whisper-tiny-creolese-finetuned'), pr_revision=None, pr_num=None)

In [71]:
# Post train, get sample dataset
audio_path = "../creolese-audio-dataset/finetune_eligible"
transcription_path = "../creolese-audio-dataset/finetune_eligible/transcripts.json"

# Load transcripts JSON
with open(transcription_path, 'r') as f:
    transcripts = json.load(f)

# Create a list of dicts pairing audio files and transcripts
data = []
for item in transcripts:
    audio_file = os.path.join(audio_path, item['audio'])
    if os.path.exists(audio_file):
        # print(f"Found file: {audio_file}")
        data.append({'audio': audio_file, 'text': item['text']})
    else:
        print(f"Missing file: {audio_file}")

dataset = Dataset.from_list(data)

# Cast the audio column to automatically load audio
dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))
split_dataset = dataset.train_test_split(test_size=0.2, seed=42)
final_eval_dataset = split_dataset["test"]


# Model Evaluation - Calculate WER, MER, and CER
import evaluate
import torch
from tqdm.auto import tqdm
import pandas as pd
import json

# Load evaluation metrics
wer_metric = evaluate.load("wer")
cer_metric = evaluate.load("cer")

def calculate_mer(predictions, references):
    """Calculate Match Error Rate (MER)"""
    total_matches = 0
    total_words = 0
    
    for pred, ref in zip(predictions, references):
        pred_words = pred.split()
        ref_words = ref.split()
        
        # Count exact matches
        matches = sum(1 for p, r in zip(pred_words, ref_words) if p == r)
        total_matches += matches
        total_words += max(len(pred_words), len(ref_words))
    
    mer = 1 - (total_matches / total_words) if total_words > 0 else 0
    return mer

def evaluate_model_fast(model, processor, eval_dataset, device="cpu"):
    """Evaluate the model using eval dataset directly"""
    model.eval()
    predictions = []
    references = []
    
    print(f"Evaluating on {len(eval_dataset)} samples...")
    pipe = pipeline(
        "automatic-speech-recognition",
        model=model,
        feature_extractor=processor.feature_extractor,
        tokenizer=processor.tokenizer,
        device=0 if device == "cuda" else -1
    )
    
    # Process eval dataset
    for i in range(len(eval_dataset)):
            
        try:
            # Get the audio data from eval_dataset
            sample = eval_dataset[i]

            audio_file_path = sample['audio']['path']
            reference = sample['text']

            # Use pipeline on the audio file
            result = pipe(audio_file_path, return_timestamps=False)
            hypothesis = result["text"]
            
            predictions.append(hypothesis)
            references.append(reference)
            
        except Exception as e:
            print(f"Error processing sample {i}: {e}")
            continue
    
    print(f"\nSuccessfully processed {len(predictions)} samples")
    print("Calculating metrics...")
    
    # Calculate metrics
    wer = wer_metric.compute(predictions=predictions, references=references)
    cer = cer_metric.compute(predictions=predictions, references=references)
    mer = calculate_mer(predictions, references)
    
    return {
        "wer": wer,
        "cer": cer, 
        "mer": mer,
        "predictions": predictions,
        "references": references
    }

# Run evaluation
print("Starting evaluation...")
results = evaluate_model_fast(model, processor, final_eval_dataset, device=device)

Dataset({
    features: ['audio', 'text'],
    num_rows: 53
})


Device set to use cpu


Starting evaluation...
Evaluating on 53 samples...





Successfully processed 53 samples
Calculating metrics...


In [89]:
print(results['wer'])
print(results['cer'])
print(results['mer'])

0.639751552795031
0.31517910366878016
0.8789697424356089
