In [None]:
!pip install numpy==1.24.4
!pip install nemo_toolkit --no-cache-dir
!pip install torch torchaudio
!pip install pyannote.metrics datasets webdataset tqdm braceexpand hydra-core omegaconf lightning lhotse jiwer pyannote.core
!pip install einops sentencepiece
!pip install editdistance

In [None]:
# Mount Google Drive (Colab only: uncomment this cell when running on Colab; keep it commented out when running locally)
# from google.colab import drive
# drive.mount('/content/drive')

# Set paths
# BASE_PATH = "/content/drive/MyDrive/creole_asr_project"

# Data paths
# AUDIO_DIR = f"{BASE_PATH}/data/audio"
# FINETUNE_DIR = f"{BASE_PATH}/data/finetune_eligible"
# TRANSCRIPTS_DIR = f"{BASE_PATH}/data/transcripts"
# MANIFESTS_DIR = f"{BASE_PATH}/data/manifests"

# Model paths
# PRETRAINED_MODEL_DIR = f"{BASE_PATH}/models/pretrained"
# CHECKPOINT_DIR = f"{BASE_PATH}/models/checkpoints"
# FINAL_MODEL_DIR = f"{BASE_PATH}/models/final"

# Create directories
# !mkdir -p "{AUDIO_DIR}" "{TRANSCRIPTS_DIR}" "{MANIFESTS_DIR}"
# !mkdir -p "{PRETRAINED_MODEL_DIR}" "{CHECKPOINT_DIR}" "{FINAL_MODEL_DIR}"

In [None]:
# Set paths (local version)
# Active configuration for real hardware or any cloud system other than Google Colab.
import os

BASE_PATH = "creolese-audio-dataset"  # Base folder containing the dataset. Change to whatever you want

# Data paths (local version)
AUDIO_DIR = f"{BASE_PATH}/Audio Files"
FINETUNE_DIR = f"{AUDIO_DIR}/finetune_eligible"
TRANSCRIPTS_DIR = BASE_PATH  # Transcripts are at base level
MANIFESTS_DIR = f"{BASE_PATH}/manifests"

# Model paths (local version)
PRETRAINED_MODEL_DIR = f"{BASE_PATH}/models/pretrained"
CHECKPOINT_DIR = f"{BASE_PATH}/models/checkpoints"
FINAL_MODEL_DIR = f"{BASE_PATH}/models/final"

# Create the output directories; the audio and fine-tune folders are not created here.
os.makedirs(MANIFESTS_DIR, exist_ok=True)
os.makedirs(PRETRAINED_MODEL_DIR, exist_ok=True)
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
os.makedirs(FINAL_MODEL_DIR, exist_ok=True)

In [None]:
import re
import json
import librosa
import os
import random
from pathlib import Path
from nemo.collections.asr.models import EncDecHybridRNNTCTCBPEModel
from omegaconf import OmegaConf, open_dict
from lightning.pytorch import Trainer
import lightning.pytorch as pl
from lightning.pytorch.callbacks import ModelCheckpoint, Callback
from lightning.pytorch.loggers import TensorBoardLogger
import torch

def normalize_audio_filename(filename):
    """Build a case- and punctuation-insensitive matching key for a ``.wav`` name.

    Args:
        filename: Audio file name, optionally prefixed with directories
            (``/`` or ``\\`` separated), e.g. ``"4a.wav"`` or ``"clips/4A.wav"``.

    Returns:
        ``(filename, key)`` where ``filename`` is the input unchanged and
        ``key`` is the file's base name (directories and extension removed)
        reduced to lowercase ASCII letters and digits. ``(None, None)`` is
        returned when the input is not a string or does not end in ``.wav``
        (case-insensitive). ``key`` can be an empty string when the base name
        contains no ASCII letters or digits.
    """
    if not isinstance(filename, str):
        return None, None

    # Only the final path component takes part in matching; otherwise a name
    # such as "clips/4a.wav" would yield the key "clips4a" and never match the
    # bare "4a.wav" found in a directory listing.
    leaf = re.split(r'[\\/]', filename)[-1]
    base, ext = os.path.splitext(leaf)
    if ext.lower() != '.wav':
        return None, None
    return filename, re.sub(r'[^a-zA-Z0-9]', '', base).lower()

def create_manifests_from_finetune(
    audio_dir: str,
    finetune_dir: str,
    output_train_path: str,
    output_val_path: str,
    val_split: float = 0.2,
    min_duration: float = 1.0,
    max_duration: float = 40.0,
    sample_rate: int = 16000,
    default_language: str = "en"
) -> None:
    """Create train/validation JSON-lines manifests from matched audio/transcript pairs.

    Reads ``transcripts.json`` from ``finetune_dir``, pairs each entry with a
    ``.wav`` file in ``audio_dir`` via ``normalize_audio_filename``, keeps the
    pairs whose duration lies in ``[min_duration, max_duration]``, shuffles
    them with a fixed seed and writes one JSON object per line to the two
    output paths.

    Args:
        audio_dir: Directory that is listed for ``.wav`` files.
        finetune_dir: Directory containing ``transcripts.json``; each entry is
            read with ``.get('audio')``, ``.get('text')``, ``.get('language')``
            and ``.get('lang')``.
        output_train_path: Destination of the training manifest.
        output_val_path: Destination of the validation manifest.
        val_split: Fraction of the matched pairs written to the validation manifest.
        min_duration: Shortest accepted clip duration, as reported by librosa.
        max_duration: Longest accepted clip duration, as reported by librosa.
        sample_rate: Value copied into every manifest row (the audio itself is
            not resampled or checked here).
        default_language: Value used for ``language`` / ``lang`` when a
            transcript entry does not provide them.

    Returns:
        None. Problems (missing/invalid transcripts file, missing audio
        directory, no usable pairs) are reported with ``print`` and end the
        function early without writing manifests.

    Side effects:
        Seeds the global ``random`` module with 42 and creates the parent
        directories of the output paths when needed.
    """
    random.seed(42)

    # Load transcripts
    transcripts_path = Path(finetune_dir) / "transcripts.json"
    try:
        with open(transcripts_path, 'r', encoding='utf-8') as f:
            transcript_entries = json.load(f)
    except FileNotFoundError:
        print(f"Error: transcripts.json not found in {finetune_dir}")
        return
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {transcripts_path}")
        return

    # Map normalized key -> on-disk audio file name.
    # The listing is sorted because os.listdir order is arbitrary; without it
    # the seeded shuffle below would still produce a different split on
    # different machines.
    try:
        listing = sorted(os.listdir(audio_dir))
    except FileNotFoundError:
        print(f"Error: Audio directory not found at {audio_dir}.")
        return

    audio_files = {}
    for name in listing:
        if name.lower().endswith('.wav'):
            original, normalized = normalize_audio_filename(name)
            if normalized:
                audio_files[normalized] = original

    # Map normalized key -> transcript entry, filling in missing fields.
    transcript_map = {}
    for entry in transcript_entries:
        original_name = entry.get('audio', '')
        if original_name:
            _, normalized = normalize_audio_filename(original_name)
            if normalized:
                entry['language'] = entry.get('language', default_language)
                entry['lang'] = entry.get('lang', default_language)
                entry['text'] = entry.get('text', "")
                transcript_map[normalized] = entry

    # Match audio-transcript pairs
    matched_entries = []
    for norm_name, audio_file in audio_files.items():
        if norm_name not in transcript_map:
            continue
        entry = transcript_map[norm_name]
        audio_path = os.path.join(audio_dir, audio_file)

        if not os.path.exists(audio_path):
            print(f"Skipping {audio_file}: Audio file not found.")
            continue

        try:
            # `path=` is the current keyword; `filename=` is deprecated in librosa.
            duration = librosa.get_duration(path=audio_path)
            if min_duration <= duration <= max_duration:
                matched_entries.append({
                    'audio_filepath': audio_path,
                    'text': entry['text'],
                    'duration': duration,
                    'language': entry['language'],
                    'lang': entry['lang'],
                    'sample_rate': sample_rate
                })
            else:
                print(f"Skipping {audio_file}: Duration outside range")
        except Exception as e:
            # Best effort: an unreadable file is reported and skipped.
            print(f"Skipping {audio_file}: Error getting duration - {str(e)}")

    if not matched_entries:
        print("No valid audio-transcript pairs found.")
        return

    # Split and write manifests
    random.shuffle(matched_entries)
    split_idx = int(len(matched_entries) * (1 - val_split))

    def _write_manifest(entries, path):
        """Write ``entries`` to ``path`` as one JSON object per line."""
        parent = os.path.dirname(path)
        if parent:  # a bare file name has no directory to create
            os.makedirs(parent, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            for entry in entries:
                json.dump(entry, f)
                f.write('\n')

    _write_manifest(matched_entries[:split_idx], output_train_path)
    _write_manifest(matched_entries[split_idx:], output_val_path)

    print(f"Created manifests: {len(matched_entries[:split_idx])} training, {len(matched_entries[split_idx:])} validation samples")

In [None]:
class PatchedModel(EncDecHybridRNNTCTCBPEModel):
    """Hybrid RNNT/CTC model that records validation outputs and logs a non-negative ``val_wer``.

    The validation step delegates to the parent class, then (when a ``_wer``
    metric attribute exists) computes it, clamps it at zero and logs it as
    ``val_wer``. Step outputs are kept in ``validation_step_outputs`` until the
    end of the validation epoch.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.validation_step_outputs = []  # Store validation outputs here

    def validation_step(self, batch, batch_idx):
        """Run the parent validation step, attach a safe WER value and store the output.

        Returns:
            Whatever the parent ``validation_step`` returned; when that is a
            dict and the WER was processed, it gains a ``'val_wer'`` entry.
        """
        output = super().validation_step(batch, batch_idx)
        # The parent may return something other than a dict (e.g. None);
        # only a dict can carry the extra 'val_wer' entry.
        output_is_dict = isinstance(output, dict)

        # Calculate WER immediately and store safe value
        if hasattr(self, '_wer'):
            try:
                wer = self._wer.compute()
                if wer is not None:
                    # Ensure WER is never negative
                    safe_wer = max(float(wer), 0.0) if isinstance(wer, (int, float)) else wer.clamp(min=0)
                    if output_is_dict:
                        output['val_wer'] = safe_wer
                    self.log("val_wer", safe_wer, prog_bar=True, on_step=False, on_epoch=True)
            except Exception as e:
                print(f"Error computing WER: {str(e)}")
                # NOTE(review): 0.0 is a placeholder, not a measured WER; it is
                # stored on the output but not logged.
                if output_is_dict:
                    output['val_wer'] = torch.tensor(0.0)  # Default safe value

        # NOTE(review): the NeMo base class appears to append the step output to
        # this same list itself - confirm. Skip the append when the last stored
        # item already is this output so it is not recorded twice.
        stored = self.validation_step_outputs
        if not (stored and stored[-1] is output):
            stored.append(output)
        return output

    def on_validation_epoch_end(self):
        """Run the parent's epoch-end logic, then drop the stored step outputs.

        The parent hook is called first so it can still see the outputs
        collected during the epoch; the list is reset afterwards even if the
        parent raises.

        Returns:
            The parent hook's return value, or None when the parent has no
            such hook.
        """
        try:
            if hasattr(super(), 'on_validation_epoch_end'):
                return super().on_validation_epoch_end()
            return None
        finally:
            self.validation_step_outputs = []


def configure_and_train_model(
    sample_rate: int = 16000,
    min_duration: float = 1.0,
    max_duration: float = 40.0,
    val_split: float = 0.2,
    max_epochs: int = 10,
    batch_size: int = 4,
    learning_rate: float = 0.0001,
):
    """Build the manifests, fine-tune the pretrained ASR model and save it.

    Steps: create the output directories, write train/validation manifests
    from ``FINETUNE_DIR``, build a Lightning ``Trainer`` with a checkpoint
    callback monitoring ``val_wer``, load
    ``stt_multilingual_fastconformer_hybrid_large_pc`` as a ``PatchedModel``,
    override its data/optimizer config, train, and save the result under
    ``FINAL_MODEL_DIR``.

    Args:
        sample_rate: Sample rate written to the manifests and dataset configs.
        min_duration: Minimum clip duration for the manifests and training set.
        max_duration: Maximum clip duration for the manifests and training set.
        val_split: Fraction of matched clips placed in the validation manifest.
        max_epochs: Number of epochs passed to the trainer.
        batch_size: Batch size for both the training and validation loaders.
        learning_rate: Learning rate placed in the optimizer config.

    Returns:
        ``(trainer, model)`` after a successful run, so later cells can reuse
        them; ``None`` when the manifests are missing or empty.

    Raises:
        Exception: Any error raised during training or the final save is
            re-raised after an attempt to save ``partial_model.nemo``.
    """
    # Create directories if they don't exist
    os.makedirs(MANIFESTS_DIR, exist_ok=True)
    os.makedirs(CHECKPOINT_DIR, exist_ok=True)
    os.makedirs(FINAL_MODEL_DIR, exist_ok=True)

    # Create manifests
    train_manifest_path = f"{MANIFESTS_DIR}/train_manifest.json"
    val_manifest_path = f"{MANIFESTS_DIR}/val_manifest.json"

    create_manifests_from_finetune(
        audio_dir=FINETUNE_DIR,
        finetune_dir=FINETUNE_DIR,
        output_train_path=train_manifest_path,
        output_val_path=val_manifest_path,
        val_split=val_split,
        min_duration=min_duration,
        max_duration=max_duration,
        sample_rate=sample_rate
    )

    # Check manifests
    if not os.path.exists(train_manifest_path) or not os.path.exists(val_manifest_path):
        print("Manifest files were not created.")
        return None
    if os.path.getsize(train_manifest_path) == 0 or os.path.getsize(val_manifest_path) == 0:
        print("Manifest files are empty.")
        return None

    # Initialize trainer (single device; GPU when available)
    accelerator = 'gpu' if torch.cuda.is_available() else 'cpu'

    trainer = Trainer(
        accelerator=accelerator,
        devices=1,
        max_epochs=max_epochs,
        enable_checkpointing=True,
        logger=TensorBoardLogger(save_dir="logs", name="creole_finetune", log_model=False),
        callbacks=[
            ModelCheckpoint(
                dirpath=CHECKPOINT_DIR,
                save_top_k=1,
                monitor="val_wer",
                mode="min",
                filename='best_model-{epoch}-{val_wer:.2f}',
                save_last=True
            )
        ],
        enable_progress_bar=True,
        check_val_every_n_epoch=1,
        num_sanity_val_steps=2
    )

    # Load model with our patched version
    model = PatchedModel.from_pretrained(
        "stt_multilingual_fastconformer_hybrid_large_pc",
        trainer=trainer
    )

    # Configure model
    with open_dict(model.cfg):
        model.cfg.train_ds = {
            'manifest_filepath': train_manifest_path,
            'sample_rate': sample_rate,
            'batch_size': batch_size,
            'shuffle': True,
            'num_workers': 2,
            'pin_memory': True,
            'min_duration': min_duration,
            'max_duration': max_duration,
            'normalize_transcripts': True,
            'trim_silence': True
        }

        model.cfg.validation_ds = {
            'manifest_filepath': val_manifest_path,
            'sample_rate': sample_rate,
            'batch_size': batch_size,
            'shuffle': False,
            'num_workers': 2,
            'pin_memory': True,
            'normalize_transcripts': True,
            'trim_silence': True,
            'return_sample_id': False
        }

        # NOTE(review): this replaces the whole optimizer section of the
        # pretrained config (only lr and the scheduler remain) - confirm that
        # dropping the other optimizer settings is intended.
        model.cfg.optim = {
            'lr': learning_rate,
            'sched': {'name': 'CosineAnnealing', 'warmup_steps': 1000}
        }
        model.cfg.language = "en"

    # Setup training
    model.setup_training_data(train_data_config=model.cfg.train_ds)
    model.setup_validation_data(val_data_config=model.cfg.validation_ds)

    # Train with error handling
    try:
        trainer.fit(model)
        model.save_to(f"{FINAL_MODEL_DIR}/creole_english_finetuned.nemo")
        print("Training complete and model saved!")
    except Exception as e:
        print(f"Training failed: {e}")
        # Best effort: keep whatever weights exist, then surface the error.
        try:
            model.save_to(f"{FINAL_MODEL_DIR}/partial_model.nemo")
            print("Saved partial model")
        except Exception as save_error:
            print(f"Failed to save partial model: {save_error}")
        raise

    return trainer, model


# Keep the trainer and model at notebook scope so later cells can reuse them.
training_result = configure_and_train_model()
if training_result is not None:
    trainer, model = training_result

In [None]:
# Optional: Resume from best checkpoint
# Checkpoint files are listed with pathlib rather than `ls | grep`, so the cell
# does not depend on a shell and only matches names ending in ".ckpt".
# The first name in sorted order is used; the checkpoint callback names its
# files 'best_model-...'.
checkpoints = sorted(p.name for p in Path(CHECKPOINT_DIR).glob("*.ckpt"))
if checkpoints:
    best_ckpt = f"{CHECKPOINT_DIR}/{checkpoints[0]}"
    # `trainer` and `model` must exist at notebook scope; on a fresh kernel
    # they may not, so report that instead of failing with a NameError.
    if "trainer" in globals() and "model" in globals():
        trainer.fit(model, ckpt_path=best_ckpt)
    else:
        print(f"Cannot resume from {best_ckpt}: `trainer` and `model` are not defined at notebook scope.")