# Real-Time Multilingual ASR with Whisper

This notebook implements a production-ready, real-time speech recognition system using OpenAI's Whisper models.

**Features:**
- Data preparation with augmentation
- Model fine-tuning with multiple variants
- Comprehensive evaluation (WER, CER, latency)
- Real-time streaming inference
- Full MLOps best practices (versioning, logging, reproducibility)

**Author:** COMP3057 Project  
**Version:** 1.0.0

## 1. Setup & Installation

### ⚙️ Colab Configuration Guide

**Resource Constraints:** A100 GPU (~40GB), 220GB Disk

**Quick Start Options:**

| Profile | Model | Samples | Epochs | Time | Disk | Quality |
|---------|-------|---------|--------|------|------|---------|
| **Fast Demo** | tiny | 50/10 | 1 | ~5min | ~2GB | Basic |
| **Balanced** | base | 200/40 | 2 | ~20min | ~5GB | Good |
| **Best Quality** | small | 500/100 | 3 | ~60min | ~10GB | Better |

**Adjustable Parameters (in cells below):**
- `TRAIN_SAMPLES` / `VAL_SAMPLES` - Dataset size
- `MODEL_VARIANT` - Model quality (tiny/base/small)
- `TRAIN_EPOCHS` - Training duration

**Tips:**
- Start with **Fast Demo** to verify everything works
- Scale up gradually (more samples, a larger model, more epochs) as time allows
- Checkpoints auto-save to Google Drive

In [None]:
# Setup environment and clone project
import os
import sys

# Check if running in Colab
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("🔧 Running in Google Colab")
    
    # Mount Google Drive for saving checkpoints and logs
    from google.colab import drive
    if not os.path.exists('/content/drive'):
        print("📁 Mounting Google Drive...")
        drive.mount('/content/drive', force_remount=False)
        print("✓ Drive mounted at /content/drive")
    else:
        print("✓ Drive already mounted")
    
    # --- Project Setup and Path Configuration ---
    PROJECT_DIR = 'COMP3057_Project'
    
    # Clone repository if not exists
    if not os.path.exists(PROJECT_DIR):
        print("\n📦 Cloning repository from GitHub...")
        !git clone https://github.com/jimmy00415/COMP3057_Project.git
        print("✓ Repository cloned")
    else:
        print("✓ Repository already exists")
    
    # Change to project directory
    os.chdir(PROJECT_DIR)
    print(f"✓ Working directory: {os.getcwd()}")

    # Add project root to Python path
    # This is the crucial step to solve ModuleNotFoundError
    if os.getcwd() not in sys.path:
        sys.path.insert(0, os.getcwd())
        print(f"✓ Added '{os.getcwd()}' to Python path")
    
    # Install dependencies IMMEDIATELY after cloning
    print("\n📦 Installing dependencies...")
    !pip install -q -r requirements.txt
    !pip install -q sounddevice
    print("✓ Dependencies installed")

else:
    print("💻 Running locally")
    print(f"Working directory: {os.getcwd()}")

# Check GPU availability
import torch
gpu_available = torch.cuda.is_available()
if gpu_available:
    gpu_name = torch.cuda.get_device_name(0)
    gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"\n🎮 GPU: {gpu_name}")
    print(f"   Memory: {gpu_memory:.1f} GB")
    print(f"   CUDA Version: {torch.version.cuda}")
else:
    print("\n⚠️  WARNING: No GPU detected! Training will be very slow.")
    print("   Enable GPU: Runtime → Change runtime type → GPU (T4)")

In [None]:
# Colab Resource Optimization & Monitoring
import shutil
import sys

# Re-check if in Colab (in case this cell is run independently)
IN_COLAB = 'google.colab' in sys.modules

def check_disk_space():
    """Print total/used/free space of the root filesystem and return free GiB.

    Figures are floor-divided by 2**30, so they are whole gibibytes even
    though the printed unit label is "GB".
    """
    gib = 2**30
    usage = shutil.disk_usage("/")
    print("💾 Disk Space:")
    for label, amount in (("Total", usage.total), ("Used", usage.used), ("Free", usage.free)):
        print(f"   {label}: {amount // gib} GB")
    return usage.free // gib

def check_gpu_memory():
    """Print memory statistics for CUDA device 0, in units of 1e9 bytes.

    Reports allocated, reserved and total memory plus "free" computed as
    total minus reserved. Prints nothing when CUDA is unavailable.
    """
    import torch

    if not torch.cuda.is_available():
        return

    gb = 1e9
    in_use = torch.cuda.memory_allocated(0) / gb
    held = torch.cuda.memory_reserved(0) / gb
    capacity = torch.cuda.get_device_properties(0).total_memory / gb

    print("\n🎮 GPU Memory:")
    print(f"   Allocated: {in_use:.2f} GB")
    print(f"   Reserved: {held:.2f} GB")
    print(f"   Total: {capacity:.2f} GB")
    print(f"   Free: {capacity - held:.2f} GB")

def cleanup_cache():
    """Release unreachable Python objects and PyTorch's cached GPU memory.

    This frees host/GPU memory; it does not touch files on disk.
    """
    import gc
    import torch

    # Collect first: tensors kept alive only by reference cycles have to be
    # destroyed before their blocks become eligible for empty_cache().
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    print("✓ Cache cleared")

import os


def _link_to_drive(local_name, drive_dir):
    """Make ``local_name`` a symlink to ``drive_dir``.

    Returns True when a link was created, False when a working link was
    already in place (which is left untouched).

    - A dangling symlink is removed and recreated; ``os.path.exists`` reports
      False for it, so creating the link without removing it first would
      raise ``FileExistsError``.
    - An existing real directory has its entries moved into ``drive_dir``
      before it is removed, so earlier checkpoints/logs are not discarded.
      Entries whose name already exists on Drive are not overwritten; the
      local copy of those is dropped together with the directory.
    """
    if os.path.islink(local_name):
        if os.path.exists(local_name):
            return False
        os.unlink(local_name)
    elif os.path.isdir(local_name):
        for entry in os.listdir(local_name):
            destination = os.path.join(drive_dir, entry)
            if not os.path.lexists(destination):
                shutil.move(os.path.join(local_name, entry), destination)
        shutil.rmtree(local_name)
    os.symlink(drive_dir, local_name)
    return True


if IN_COLAB:
    # Setup Google Drive paths for checkpoints
    DRIVE_ROOT = '/content/drive/MyDrive/COMP3057_ASR'
    os.makedirs(DRIVE_ROOT, exist_ok=True)
    os.makedirs(f'{DRIVE_ROOT}/checkpoints', exist_ok=True)
    os.makedirs(f'{DRIVE_ROOT}/logs', exist_ok=True)
    print(f"\n📁 Google Drive storage: {DRIVE_ROOT}")

    # Create symlinks to save to Drive instead of local disk
    if _link_to_drive('checkpoints', f'{DRIVE_ROOT}/checkpoints'):
        print("✓ Checkpoints will be saved to Google Drive")

    if _link_to_drive('logs', f'{DRIVE_ROOT}/logs'):
        print("✓ Logs will be saved to Google Drive")

    # Check initial resources
    print("\n📊 Initial Resource Check:")
    free_disk = check_disk_space()
    check_gpu_memory()

    if free_disk < 50:
        print("\n⚠️  WARNING: Low disk space! Consider:")
        print("   1. Use smaller dataset (already optimized)")
        print("   2. Use 'tiny' or 'base' model variant")
        print("   3. Reduce save_steps to save fewer checkpoints")

# Outside Colab nothing needs linking. cleanup_cache() is already defined
# unconditionally earlier in this cell, so it is not redefined here.

In [None]:
# Import project modules
import torch
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Audio, display

# Import project modules
from src.utils import (
    load_config,
    set_seed,
    setup_logging,
    get_device,
    ExperimentLogger,
    DataVersionManager,
    ModelRegistry
)

from src.data import (
    AudioPreprocessor,
    VoiceActivityDetector,
    AudioAugmenter,
    WhisperDataset,
    prepare_datasets,
    create_dataloaders
)

from src.models import (
    WhisperModelManager,
    compare_models
)

from src.training import WhisperTrainer

from src.evaluation import (
    ModelEvaluator,
    LatencyBenchmark,
    TrainingVisualizer,
    EvaluationVisualizer,
    generate_comparison_table
)

from src.inference import (
    StreamingASR,
    BatchInference
)

print("✓ All modules imported successfully")

## 2. Configuration & Reproducibility Setup

In [None]:
# Read the YAML configuration and keep handles on the sections used below
config = load_config('config.yaml')
_project_cfg = config['project']
_tracking_cfg = config['mlops']['experiment_tracking']
_versioning_cfg = config['mlops']['versioning']

# Seed the random number generators before anything else is constructed
set_seed(_project_cfg['seed'])

# Log at INFO level to logs/training.log
logger = setup_logging('INFO', 'logs/training.log')

# Resolve the compute device requested in the config
device = get_device(_project_cfg['device'])
logger.info(f"Using device: {device}")

# Experiment tracking; the backend name comes from the config
experiment_logger = ExperimentLogger(
    backend=_tracking_cfg['backend'],
    project_name=_tracking_cfg['project_name'],
    config=config,
)

# Dataset and model version bookkeeping
data_version_manager = DataVersionManager(_versioning_cfg['data_version_file'])
model_registry = ModelRegistry(_versioning_cfg['model_registry'])

print("✓ Configuration loaded")
print(f"  - Seed: {_project_cfg['seed']}")
print(f"  - Device: {device}")
print(f"  - Tracking: {_tracking_cfg['backend']}")

## 3. Data Preparation

In [None]:
# Build the audio preprocessing helpers from the `data` section of the config
_data_cfg = config['data']

audio_preprocessor = AudioPreprocessor(
    target_sr=_data_cfg['sampling_rate'],
    normalize=True,
)

vad = VoiceActivityDetector(threshold=_data_cfg['vad_threshold'])

# The augmenter is only constructed when augmentation is switched on;
# otherwise `augmenter` is None.
_aug_cfg = _data_cfg['augmentation']
if _aug_cfg['enabled']:
    augmenter = AudioAugmenter(
        speed_perturbation=_aug_cfg['speed_perturbation'],
        pitch_shift_semitones=_aug_cfg['pitch_shift_semitones'],
        background_noise_prob=_aug_cfg['background_noise_prob'],
    )
else:
    augmenter = None

print("✓ Preprocessing utilities initialized")

In [None]:
# Load datasets
# Using small subsets optimized for Colab's disk/memory constraints
# For production with more resources, increase the sample counts

# Aliased so it does not shadow IPython.display.Audio imported earlier.
from datasets import Audio as HFAudio
from datasets import load_dataset

# Configuration for Colab
TRAIN_SAMPLES = 50  # Small for quick demo, increase to 500-1000 for better results
VAL_SAMPLES = 10    # Small for quick demo, increase to 100-200 for better results


def _load_subset(path, name, split):
    """Load the first TRAIN_SAMPLES + VAL_SAMPLES rows of ``split``.

    The audio column is cast to the configured sampling rate so that every
    later cell reading ``['audio']['array']`` directly gets audio at that
    rate. MINDS-14 is distributed at 8 kHz, while later cells assume 16 kHz.
    """
    subset = load_dataset(path, name, split=f"{split}[:{TRAIN_SAMPLES + VAL_SAMPLES}]")
    return subset.cast_column("audio", HFAudio(sampling_rate=config['data']['sampling_rate']))


def _split_train_val(subset):
    """Return (train, val): the first TRAIN_SAMPLES rows and whatever follows.

    Bounds are clamped to the rows actually present, because a slice such as
    ``train[:600]`` silently yields fewer rows when the split is shorter.
    """
    cut = min(TRAIN_SAMPLES, len(subset))
    return subset.select(range(cut)), subset.select(range(cut, len(subset)))


print(f"Loading dataset with {TRAIN_SAMPLES} train + {VAL_SAMPLES} val samples...")
print("(Optimized for Colab - increase samples for production)")

try:
    # Try minds14 first - smaller and faster to download
    print("\nTrying minds14 dataset (lightweight, ~50MB)...")
    dataset = _load_subset("PolyAI/minds14", "en-US", "train")
    train_dataset, val_dataset = _split_train_val(dataset)

    dataset_name = "minds14_en"
    print(f"✓ minds14 loaded successfully")

except Exception as e:
    # Deliberately broad: any download/parsing/decoding failure should fall
    # through to the second dataset; the cause is printed, not hidden.
    print(f"minds14 failed: {e}")
    print("\nTrying LibriSpeech (larger, ~300MB)...")

    # Fallback to LibriSpeech. The "clean" configuration names its test
    # split "test"; "test.clean" only exists in the "all" configuration.
    dataset = _load_subset("openslr/librispeech_asr", "clean", "test")
    train_dataset, val_dataset = _split_train_val(dataset)

    dataset_name = "librispeech_clean"
    print(f"✓ LibriSpeech loaded")

if len(train_dataset) < TRAIN_SAMPLES or len(val_dataset) < VAL_SAMPLES:
    print(f"\n⚠️  WARNING: dataset has only {len(dataset)} rows; "
          f"requested {TRAIN_SAMPLES} train + {VAL_SAMPLES} val")

print(f"\n✓ Dataset loaded: {dataset_name}")
print(f"  - Training samples: {len(train_dataset)}")
print(f"  - Validation samples: {len(val_dataset)}")

# Inspect dataset structure
print(f"\n📊 Dataset structure:")
print(f"  - Columns: {train_dataset.column_names}")
if len(train_dataset) > 0:
    print(f"  - Sample keys: {list(train_dataset[0].keys())}")
    # Show audio info
    audio_info = train_dataset[0]['audio']
    if isinstance(audio_info, dict):
        print(f"  - Audio sampling rate: {audio_info.get('sampling_rate', 'N/A')} Hz")

# Log dataset version
data_version_manager.log_dataset_version(
    dataset_name=dataset_name,
    version=f"colab_demo_{TRAIN_SAMPLES}train_{VAL_SAMPLES}val",
    metadata={'train': len(train_dataset), 'val': len(val_dataset)}
)

# Check disk space after loading
if IN_COLAB:
    print("\n📊 Disk space after dataset load:")
    check_disk_space()

## 4. Model Initialization

In [None]:
# Model manager built from the loaded configuration
model_manager = WhisperModelManager(config)

# Variant sizing notes for an A100 (40GB) Colab runtime:
#   tiny   -  39M params, ~500MB, fastest (RECOMMENDED for demo)
#   base   -  74M params, ~1GB,   good balance
#   small  - 244M params, ~2GB,   better accuracy
#   medium - 769M params, ~6GB,   best accuracy (may be slow)
MODEL_VARIANT = 'tiny'  # Change to 'base' or 'small' if you have time/resources

print(f"🤖 Loading Whisper model: {MODEL_VARIANT}")
print("   (Optimized for Colab - use tiny/base for best experience)")

# initialize_model returns the model together with its processor
model, processor = model_manager.initialize_model(
    device=str(device),
    variant=MODEL_VARIANT,
)

# Report parameter counts as provided by the manager
model_info = model_manager.get_model_info()
_total_params = model_info['total_parameters']
_trainable_params = model_info['trainable_parameters']
print(f"\n✓ Model initialized: {MODEL_VARIANT}")
print(f"  - Total parameters: {_total_params:,}")
print(f"  - Trainable parameters: {_trainable_params:,}")

# In Colab, show GPU memory once the model is loaded
if IN_COLAB:
    check_gpu_memory()

In [None]:
# Print one aligned row per Whisper variant described by compare_models()
variants_info = compare_models(config)

print("\nWhisper Model Variants Comparison:")
print("-" * 60)
for variant, info in variants_info.items():
    cells = [
        f"{variant:10s}",
        f"Params: {info['params']:8s}",
        f"Speed: {info['speed']:10s}",
        f"Accuracy: {info['accuracy']}",
    ]
    print(" | ".join(cells))

## 5. Prepare Data Loaders

In [None]:
# Wrap the Hugging Face splits in WhisperDataset and build the data loaders.
from src.data import DataCollatorWithPadding
from torch.utils.data import DataLoader

# The audio column is called "audio"; the transcript column name differs
# between datasets, so take the first known name that is present.
audio_column = "audio"
_text_candidates = ("text", "sentence", "transcription", "transcript")
text_column = next(
    (name for name in _text_candidates if name in train_dataset.column_names),
    None,
)

if text_column is None:
    raise ValueError(f"Could not find text column. Available columns: {train_dataset.column_names}")

print(f"Using columns: audio='{audio_column}', text='{text_column}'")

# Settings shared by the training and validation wrappers
_wrapper_kwargs = {
    'audio_column': audio_column,
    'text_column': text_column,
    'max_audio_length_sec': config['data']['audio_max_length_sec'],
}

# Augmentation is applied to the training split only
train_dataset_wrapper = WhisperDataset(
    train_dataset, processor, augmenter=augmenter, **_wrapper_kwargs
)
val_dataset_wrapper = WhisperDataset(
    val_dataset, processor, augmenter=None, **_wrapper_kwargs
)

collator = DataCollatorWithPadding(processor)

# Settings shared by both loaders; only the training loader shuffles
_loader_kwargs = {
    'batch_size': config['training']['batch_size'],
    'collate_fn': collator,
    'num_workers': 0,
}

train_loader = DataLoader(train_dataset_wrapper, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset_wrapper, shuffle=False, **_loader_kwargs)

print("\n✓ Data loaders created")
print(f"  - Training batches: {len(train_loader)}")
print(f"  - Validation batches: {len(val_loader)}")

## 6. Model Fine-Tuning

In [None]:
# prepare_for_training() returns the model object used from here on
model = model_manager.prepare_for_training()

# Gather everything the trainer needs, then construct it
_trainer_kwargs = dict(
    model=model,
    processor=processor,
    train_loader=train_loader,
    val_loader=val_loader,
    config=config,
    device=str(device),
    experiment_logger=experiment_logger,
)
trainer = WhisperTrainer(**_trainer_kwargs)

print("✓ Trainer initialized")

In [None]:
# Run fine-tuning and report resources before and after (Colab only).

TRAIN_EPOCHS = 1  # Set to 1 for quick demo, 2-3 for Colab session, 5+ for production

_train_cfg = config['training']
_rule = "=" * 80

print(f"🚀 Starting training for {TRAIN_EPOCHS} epoch(s)...")
print(f"   Batch size: {_train_cfg['batch_size']}")
print(f"   Gradient accumulation: {_train_cfg['gradient_accumulation_steps']}")
print(f"   Effective batch size: {_train_cfg['batch_size'] * _train_cfg['gradient_accumulation_steps']}")
print(f"   Mixed precision (FP16): {_train_cfg['fp16']}")


def _report_resources(heading):
    """Print ``heading`` followed by the disk and GPU memory summaries."""
    print(heading)
    check_disk_space()
    check_gpu_memory()


if IN_COLAB:
    _report_resources("\n📊 Pre-training resources:")

print("\n" + _rule)

# trainer.train() returns the best validation loss
best_val_loss = trainer.train(num_epochs=TRAIN_EPOCHS)

print(_rule)
print("\n✓ Training completed!")
print(f"  - Best validation loss: {best_val_loss:.4f}")

# Free cached memory, then show what is left
if IN_COLAB:
    print("\n🧹 Cleaning up...")
    cleanup_cache()
    _report_resources("\n📊 Post-training resources:")

In [None]:
# Register the fine-tuned model together with the data it was trained on.
from src.utils import get_git_revision

# Record the dataset that was actually loaded (minds14 or the LibriSpeech
# fallback) and the subset sizes, mirroring the version string logged by the
# data version manager. The previous hard-coded Common Voice label did not
# correspond to any dataset this notebook loads.
_dataset_version = f"{dataset_name}_colab_demo_{TRAIN_SAMPLES}train_{VAL_SAMPLES}val"

model_id = model_registry.register_model(
    model_id=f"{MODEL_VARIANT}_finetuned_{TRAIN_EPOCHS}ep",
    model_path="checkpoints/best_model_hf",
    metrics={'val_loss': best_val_loss},
    config=config,
    git_revision=get_git_revision(),
    dataset_version=_dataset_version
)

print(f"✓ Model registered: {model_id}")

## 7. Model Evaluation

In [None]:
# Score the model on the validation loader and show a few transcripts
evaluator = ModelEvaluator(model=model, processor=processor, device=str(device))

print("Evaluating model...\n")
results = evaluator.evaluate_with_samples(val_loader, num_samples=5)

_metrics = results['metrics']
print("\n✓ Evaluation Results:")
print(f"  - WER: {_metrics['wer']:.3f}")
print(f"  - CER: {_metrics['cer']:.3f}")

# First three reference/prediction pairs, numbered from 1
print("\n📝 Sample Predictions:")
print("-" * 80)
_shown = results['samples'][:3]
for _number, _pair in enumerate(_shown, start=1):
    print(f"\nSample {_number}:")
    print(f"  Reference:  {_pair['reference']}")
    print(f"  Prediction: {_pair['prediction']}")

In [None]:
# Benchmark inference latency on up to 10 validation clips.
print("Benchmarking inference latency...\n")

latency_bench = LatencyBenchmark(
    model=model,
    processor=processor,
    device=str(device)
)

# Never index past the end of the validation split (VAL_SAMPLES may be < 10).
_num_clips = min(10, len(val_dataset))
if _num_clips == 0:
    raise ValueError("Validation set is empty; cannot benchmark latency")

_clips = [val_dataset[i]['audio'] for i in range(_num_clips)]
test_audios = [torch.tensor(clip['array']) for clip in _clips]

# Report the rate the clips really have instead of assuming 16 kHz;
# MINDS-14, for instance, is distributed at 8 kHz unless it was resampled.
_clip_sr = _clips[0]['sampling_rate']

latency_results = latency_bench.benchmark_batch(test_audios, sr=_clip_sr)

print(f"✓ Latency Benchmark Results:")
print(f"  - Mean latency: {latency_results['mean_latency']:.3f}s")
print(f"  - Std latency: {latency_results['std_latency']:.3f}s")
print(f"  - Mean RTF: {latency_results['mean_rtf']:.3f}x")
if latency_results['mean_rtf'] < 1.0:
    print(f"\n  RTF < 1.0 = Real-time capable ✓")
else:
    print("  RTF >= 1.0 = Not real-time")

## 8. Real-Time Streaming Inference

In [None]:
# Build the streaming recogniser from the `inference.streaming` settings
_stream_cfg = config['inference']['streaming']

streaming_asr = StreamingASR(
    model=model,
    processor=processor,
    vad=vad,
    # the config's buffer size is passed as the chunk length
    chunk_length_sec=_stream_cfg['buffer_size_sec'],
    overlap_sec=_stream_cfg['overlap_sec'],
    device=str(device),
)

print("✓ Streaming ASR initialized")

In [None]:
# Test streaming on an audio file written from the first validation sample.
import os
import tempfile

import torchaudio

# Get a test audio clip
test_sample = val_dataset[0]
test_audio = test_sample['audio']['array']
test_sr = test_sample['audio']['sampling_rate']  # write the clip's real rate
test_text = test_sample[text_column]  # Use the detected text column

# Reserve a temporary file name and close the handle before writing, so the
# encoder does not write to a path that is still open here.
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
    tmp_path = tmp.name

transcriptions = []


def callback(text):
    """Print each streamed chunk and collect it in ``transcriptions``."""
    print(f"[CHUNK] {text}")
    transcriptions.append(text)


try:
    # float32 is a sample dtype torchaudio.save accepts regardless of the
    # dtype the dataset decoder produced.
    _waveform = torch.as_tensor(test_audio, dtype=torch.float32).unsqueeze(0)
    torchaudio.save(tmp_path, _waveform, test_sr)

    print(f"Test audio saved to: {tmp_path}")
    print(f"Reference text: {test_text}\n")

    # Stream from file
    print("Streaming transcription:")
    print("-" * 80)

    streaming_asr.reset()
    streaming_asr.stream_from_file(tmp_path, chunk_duration_sec=0.5, callback=callback)

    # Get full transcription
    full_transcription = streaming_asr.get_full_transcription(merge=True)

    print("\n" + "-" * 80)
    print(f"\n📝 Final Transcription: {full_transcription}")
    print(f"📖 Reference:          {test_text}")
finally:
    # Remove the temporary file even when saving or streaming fails
    os.unlink(tmp_path)

In [None]:
# Live microphone streaming (requires microphone access)
# Uncomment to use:

# print("Starting live microphone transcription...")
# print("Speak into your microphone. Press Ctrl+C to stop.\n")

# streaming_asr.reset()

# def mic_callback(text):
#     print(f"🎤 {text}")

# streaming_asr.stream_from_microphone(
#     duration_sec=30,  # Record for 30 seconds
#     callback=mic_callback
# )

# full_transcription = streaming_asr.get_full_transcription(merge=True)
# print(f"\n📝 Full Transcription: {full_transcription}")

## 9. Batch Inference

In [None]:
# Batch inference over up to five validation clips written to temporary files.
import os
import tempfile

import torchaudio

batch_inference = BatchInference(
    model=model,
    processor=processor,
    device=str(device),
    batch_size=config['inference']['batch_size']
)

# Do not index past the end of a small validation split
_num_files = min(5, len(val_dataset))

test_files = []
try:
    for i in range(_num_files):
        _clip = val_dataset[i]['audio']
        # float32 is a sample dtype torchaudio.save accepts regardless of
        # the dtype the dataset decoder produced.
        audio = torch.as_tensor(_clip['array'], dtype=torch.float32)

        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
            _wav_path = tmp.name
        # Register the path before writing so cleanup also covers a failed save
        test_files.append(_wav_path)
        # Write each clip with its own sampling rate rather than assuming 16 kHz
        torchaudio.save(_wav_path, audio.unsqueeze(0), _clip['sampling_rate'])

    # Batch transcribe
    print(f"Batch transcribing {_num_files} files...\n")
    batch_transcriptions = batch_inference.transcribe_batch(test_files)

    # Display results
    for i, transcription in enumerate(batch_transcriptions):
        reference = val_dataset[i][text_column]  # Use the detected text column
        print(f"File {i+1}:")
        print(f"  Prediction: {transcription}")
        print(f"  Reference:  {reference}")
        print()
finally:
    # Remove the temporary files whether or not transcription succeeded
    for f in test_files:
        os.unlink(f)

print("✓ Batch inference completed")

## 10. Visualization & Analysis

In [None]:
# Visualize evaluation results
from IPython.display import Image

eval_viz = EvaluationVisualizer(save_dir='plots')

# Illustrative (NOT measured) rows for variants that were not trained in
# this session. Only tiny and small have a complete set of example figures.
_example_rows = {
    'tiny': {
        'params': '39M',
        'wer_clean': 0.15,
        'wer_accented': 0.20,
        'latency': 0.1,
        'rtf': 0.1
    },
    'small': {
        'params': '244M',
        'wer_clean': 0.08,
        'wer_accented': 0.12,
        'latency': 0.5,
        'rtf': 0.5
    }
}
_param_counts = {'tiny': '39M', 'base': '74M', 'small': '244M', 'medium': '769M'}
# No accented-speech evaluation is run in this notebook, so the measured row
# keeps an illustrative accented WER, falling back to the measured WER for
# variants without one.
_accented_placeholder = {'tiny': 0.20, 'base': 0.14, 'small': 0.12}

# Measured numbers belong to the variant that was actually fine-tuned;
# previously they were always filed under 'whisper-base', mislabelling the
# results whenever MODEL_VARIANT was anything else (the default is 'tiny').
_measured_row = {
    'params': _param_counts.get(MODEL_VARIANT, 'n/a'),
    'wer_clean': results['metrics']['wer'],
    'wer_accented': _accented_placeholder.get(MODEL_VARIANT, results['metrics']['wer']),
    'latency': latency_results['mean_latency'],
    'rtf': latency_results['mean_rtf']
}

# Keep size order; the measured row replaces the example row of the same
# variant. A repeated key (MODEL_VARIANT already listed) simply re-assigns it.
comparison_data = {}
for _variant in ('tiny', 'base', 'small', MODEL_VARIANT):
    if _variant == MODEL_VARIANT:
        comparison_data[f'whisper-{_variant}'] = _measured_row
    elif _variant in _example_rows:
        comparison_data[f'whisper-{_variant}'] = _example_rows[_variant]

# Plot comparison
comparison_path = eval_viz.plot_model_comparison(comparison_data)
print(f"✓ Model comparison plot saved: {comparison_path}")

# Display
display(Image(comparison_path))

In [None]:
# Render the comparison data as a table and print it under a heading
table = generate_comparison_table(comparison_data)
print("\nModel Comparison Table:", table, sep="\n")

## 11. Export & Deployment

In [None]:
# Write the model and its processor into one directory for later reloading
FINAL_MODEL_PATH = 'final_model'

for _artifact in (model, processor):
    _artifact.save_pretrained(FINAL_MODEL_PATH)

print(f"✓ Final model saved to: {FINAL_MODEL_PATH}")

# Show the snippet needed to reload both pieces
_reload_hint = (
    "\nTo load model later:",
    "  from transformers import WhisperForConditionalGeneration, WhisperProcessor",
    f"  model = WhisperForConditionalGeneration.from_pretrained('{FINAL_MODEL_PATH}')",
    f"  processor = WhisperProcessor.from_pretrained('{FINAL_MODEL_PATH}')",
)
for _line in _reload_hint:
    print(_line)

In [None]:
# Optional: Upload to HuggingFace Hub
# Requires HuggingFace token and authentication

# from huggingface_hub import notebook_login
# notebook_login()

# HF_MODEL_NAME = "your-username/whisper-base-finetuned-en"
# model.push_to_hub(HF_MODEL_NAME)
# processor.push_to_hub(HF_MODEL_NAME)
# print(f"✓ Model uploaded to HuggingFace: {HF_MODEL_NAME}")

## 12. Cleanup & Summary

In [None]:
# Close the experiment tracker, then print a recap of the whole run
experiment_logger.finish()

_banner = "=" * 80
_rtf = latency_results['mean_rtf']

print("\n" + _banner)
print("📊 PROJECT SUMMARY")
print(_banner)

print("\n🤖 Model Configuration:")
print(f"  - Variant: {MODEL_VARIANT}")
print(f"  - Training epochs: {TRAIN_EPOCHS}")
print(f"  - Dataset: {dataset_name}")
print(f"  - Training samples: {len(train_dataset)}")
print(f"  - Validation samples: {len(val_dataset)}")

print("\n📈 Training Results:")
print(f"  - Best validation loss: {best_val_loss:.4f}")
print(f"  - WER: {results['metrics']['wer']:.3f}")
print(f"  - CER: {results['metrics']['cer']:.3f}")

print("\n⚡ Performance Metrics:")
print(f"  - Mean latency: {latency_results['mean_latency']:.3f}s")
print(f"  - RTF: {_rtf:.3f}x")
if _rtf < 1.0:
    print("  - Real-time capable: ✓ Yes")
else:
    print("  - Real-time capable: ✗ No")

# In Colab, checkpoints and logs are reported under the Drive folder;
# locally they are the plain relative directories.
_storage_prefix = f"{DRIVE_ROOT}/" if IN_COLAB else ""
print("\n💾 Saved Artifacts:")
print(f"  - Model: {FINAL_MODEL_PATH}")
print(f"  - Checkpoints: {_storage_prefix}checkpoints/")
print(f"  - Logs: {_storage_prefix}logs/")
print("  - Plots: plots/")

# Final resource check and usage tips (Colab only)
if IN_COLAB:
    print("\n📊 Final Resource Usage:")
    check_disk_space()
    check_gpu_memory()

    print("\n💡 Tips for Colab:")
    for _tip in (
        "Checkpoints saved to Google Drive persist across sessions",
        "Increase TRAIN_SAMPLES/VAL_SAMPLES for better accuracy",
        "Try 'base' or 'small' model for better quality",
        "Use Runtime → Factory reset runtime to free all resources",
    ):
        print(f"  - {_tip}")

print("\n" + _banner)
print("🎉 Real-Time ASR System Ready for Deployment!")
print(_banner)