In [1]:
# Install the third-party packages used by the pipeline (intended for a fresh
# Colab runtime).
print("=" * 70)
print("INSTALLING DEPENDENCIES")
print("=" * 70)

# The requirement specifier must be quoted: unquoted, the shell treats
# `>=0.10.0` as an output redirection, so pip installs an unpinned librosa and
# its output is written to a stray file named "=0.10.0".
get_ipython().system('pip install -q "librosa>=0.10.0" soundfile scipy numpy')
get_ipython().system('pip install -q openai-whisper')
get_ipython().system('pip install -q transformers sentencepiece protobuf')
# Fall back to Bark only when the Coqui install itself fails. Piping pip
# through `grep -v` made `||` react to grep's exit status (i.e. whether any
# non-warning output line existed) instead of pip's.
get_ipython().system('pip install -q coqui-tts || pip install -q git+https://github.com/suno-ai/bark.git')
get_ipython().system('pip install -q matplotlib seaborn')

print("\n Dependencies installed!\n")

INSTALLING DEPENDENCIES
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m803.2/803.2 kB[0m [31m36.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for openai-whisper (pyproject.toml) ... [?25l[?25hdone
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.7/3.7 MB 5.4 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 85.3/85.3 kB 8.2 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 42.0/42.0 kB 3.0 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 101.6/101.6 kB 10.1 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 15.3/15.3 MB 3.0 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 18.1/18.1 MB 6.0 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 31.4/31.4 MB 3.2 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import librosa
import soundfile as sf
import torch
import os
from pathlib import Path
from typing import Dict, Tuple, Optional, List
import json
from datetime import datetime

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Audio, display, HTML, clear_output

# Plot defaults applied to every figure created later in the notebook.
sns.set_style("whitegrid")
plt.rcParams.update({'figure.figsize': (14, 8)})

# Environment banner: PyTorch version and, when present, the CUDA device.
print("=" * 70, "LIBRARIES LOADED", "=" * 70, sep="\n")
print(f"PyTorch: {torch.__version__}")
print(f"CUDA Available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
print("=" * 70 + "\n")

LIBRARIES LOADED
PyTorch: 2.8.0+cu126
CUDA Available: True
GPU: Tesla T4



In [3]:
# SETUP: Initialize Global Variables

# Global storage for pipeline data.
# Every stepN_* function reads its inputs from, and writes its results to, this
# single module-level dict, so the steps are meant to be run in order.
PIPELINE_DATA = {
    'input_audio': None,            # path of the input audio file (set by step 1)
    'detected_language': None,      # language code reported by Whisper (step 2)
    'target_language': None,        # language code chosen for dubbing (step 3)
    'transcribed_text': None,       # Whisper transcription of the input (step 2)
    'translated_text': None,        # translation of the transcription (step 4)
    'synthesized_audio': None,      # path of the TTS output file (step 5)
    'preserved_audio': None,        # path of the post-processed TTS output (step 7)
    'original_features': None,      # extract_detailed_features() dict for the input (step 6)
    'synthesized_features': None,   # same, for the synthesized audio (step 6)
    'preserved_features': None,     # same, for the preserved audio (step 7)
    'metrics': {},                  # filled by step 8; starts as {} (not None), so `is None` checks never fire
    'processing_times': {}          # stage name -> elapsed seconds, added by the individual steps
}

# Supported languages.
# Maps a language code to its display name and to the translation checkpoint
# that step 4 passes to MarianTokenizer/MarianMTModel.from_pretrained().
# 'model': None means "no translation model" (step 4 then copies the source text).
# All checkpoint ids listed here have the form opus-mt-en-<target>, i.e. they
# translate FROM English.
# NOTE(review): confirm that every checkpoint id exists on the Hugging Face Hub
# (e.g. the 'pt' and 'ko' entries) - a missing repo only surfaces when step 4
# tries to load it.
SUPPORTED_LANGUAGES = {
    'en': {'name': 'English', 'model': None},
    'es': {'name': 'Spanish', 'model': 'Helsinki-NLP/opus-mt-en-es'},
    'fr': {'name': 'French', 'model': 'Helsinki-NLP/opus-mt-en-fr'},
    'de': {'name': 'German', 'model': 'Helsinki-NLP/opus-mt-en-de'},
    'it': {'name': 'Italian', 'model': 'Helsinki-NLP/opus-mt-en-it'},
    'pt': {'name': 'Portuguese', 'model': 'Helsinki-NLP/opus-mt-en-pt'},
    'ru': {'name': 'Russian', 'model': 'Helsinki-NLP/opus-mt-en-ru'},
    'ja': {'name': 'Japanese', 'model': 'Helsinki-NLP/opus-mt-en-jap'},
    'zh': {'name': 'Chinese', 'model': 'Helsinki-NLP/opus-mt-en-zh'},
    'ar': {'name': 'Arabic', 'model': 'Helsinki-NLP/opus-mt-en-ar'},
    'hi': {'name': 'Hindi', 'model': 'Helsinki-NLP/opus-mt-en-hi'},
    'ko': {'name': 'Korean', 'model': 'Helsinki-NLP/opus-mt-en-ko'},
    'nl': {'name': 'Dutch', 'model': 'Helsinki-NLP/opus-mt-en-nl'},
}

print(" Setup complete! Ready to start pipeline.\n")

 Setup complete! Ready to start pipeline.



In [4]:
# STEP 1: FILE UPLOAD

def step1_upload_file():
    """Step 1: Upload (Colab) or locate (local run) the input audio file.

    In Google Colab a file picker is shown; otherwise the path is read from
    standard input. The file is decoded once to report its duration, sample
    rate and channel layout, its path is stored in
    ``PIPELINE_DATA['input_audio']`` and an inline audio player is displayed.

    Returns:
        The path of the audio file, or ``None`` when nothing was uploaded,
        the path is not a file, or the file cannot be decoded.
    """
    print("\n" + "=" * 70)
    print("STEP 1: FILE UPLOAD")
    print("=" * 70)

    # Only a failed import means "not running in Colab"; a bare ``except``
    # would also swallow KeyboardInterrupt and unrelated errors.
    try:
        from google.colab import files
        in_colab = True
    except ImportError:
        in_colab = False

    if in_colab:
        print("\n Please select your audio file...")
        print("   Supported formats: WAV, MP3, M4A, FLAC")
        print("   Recommended: Clear speech, 5-30 seconds\n")

        uploaded = files.upload()

        if not uploaded:
            print("No file uploaded!")
            return None

        # Only the first uploaded file is used.
        audio_file = next(iter(uploaded))
    else:
        # Tolerate stray whitespace or quotes around a pasted path.
        audio_file = input("Enter path to audio file: ").strip().strip("\"'")

    if not os.path.isfile(audio_file):
        print(f"File not found: {audio_file}")
        return None

    # Get file info
    file_size_mb = os.path.getsize(audio_file) / (1024 * 1024)

    # Load and analyze. mono=False keeps the channel axis; with librosa's
    # default downmix the array is always 1-D and the channel report below
    # could never say anything but "Mono".
    try:
        y, sr = librosa.load(audio_file, sr=None, mono=False)
    except Exception as exc:
        print(f"Could not decode audio file '{audio_file}': {exc}")
        return None

    num_samples = y.shape[-1]
    if num_samples == 0:
        print(f"Audio file contains no samples: {audio_file}")
        return None

    duration = num_samples / sr
    num_channels = 1 if y.ndim == 1 else y.shape[0]
    channel_label = {1: 'Mono', 2: 'Stereo'}.get(num_channels, f"{num_channels} channels")

    # Store in pipeline
    PIPELINE_DATA['input_audio'] = audio_file

    print(f"\n File uploaded successfully!")
    print(f"\n File Information:")
    print(f"   • Filename: {audio_file}")
    print(f"   • File size: {file_size_mb:.2f} MB")
    print(f"   • Duration: {duration:.2f} seconds")
    print(f"   • Sample rate: {sr} Hz")
    print(f"   • Channels: {channel_label}")

    # Play audio
    print(f"\n Listen to uploaded audio:")
    display(Audio(audio_file))

    print("\n" + "=" * 70)
    print(" STEP 1 COMPLETE")
    print("=" * 70)
    print("\n Next: Run step2_detect_language()")

    return audio_file

In [5]:
# STEP 2: LANGUAGE DETECTION

def step2_detect_language():
    """Step 2: Detect the spoken language and transcribe the input audio.

    Loads the Whisper "base" model (on CUDA when available), transcribes
    ``PIPELINE_DATA['input_audio']`` and stores the detected language code,
    the transcription and the transcription time in ``PIPELINE_DATA``.

    Returns:
        ``(detected_language, transcribed_text)`` on success, or ``None`` when
        no input file has been registered or no speech could be transcribed.
    """
    print("\n" + "=" * 70)
    print("STEP 2: LANGUAGE DETECTION & TRANSCRIPTION")
    print("=" * 70)

    if PIPELINE_DATA['input_audio'] is None:
        print("Error: No audio file uploaded!")
        print("Please run step1_upload_file() first")
        return None

    audio_file = PIPELINE_DATA['input_audio']

    # Load Whisper model
    print("\n Loading Whisper model...")
    import whisper
    import time

    start_time = time.time()
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = whisper.load_model("base", device=device)
    load_time = time.time() - start_time

    print(f" Whisper loaded in {load_time:.2f} seconds")

    # Transcribe
    print("\n Transcribing audio...")
    start_time = time.time()
    result = model.transcribe(audio_file, verbose=False)
    transcribe_time = time.time() - start_time

    detected_lang = result['language']
    # Whisper's text normally begins with a space; strip it so the stray
    # whitespace is not carried into translation, TTS and the quoted printout.
    transcribed_text = result['text'].strip()

    # An empty transcription would pass the ``is None`` guards of the later
    # steps and make them translate/synthesize nothing, so stop here instead.
    if not transcribed_text:
        print("Error: No speech could be transcribed from the audio!")
        print("Please run step1_upload_file() again with a recording that contains clear speech")
        return None

    # Store in pipeline
    PIPELINE_DATA['detected_language'] = detected_lang
    PIPELINE_DATA['transcribed_text'] = transcribed_text
    PIPELINE_DATA['processing_times']['transcription'] = transcribe_time

    print(f"Transcription complete in {transcribe_time:.2f} seconds")

    print(f"\n Results:")
    print(f"   • Detected Language: {detected_lang.upper()}")
    print(f"   • Language Name: {SUPPORTED_LANGUAGES.get(detected_lang, {}).get('name', 'Unknown')}")
    print(f"\n   • Transcribed Text:")
    print(f"     \"{transcribed_text}\"")

    print("\n" + "=" * 70)
    print(" STEP 2 COMPLETE")
    print("=" * 70)
    print("\n  Next: Run step3_select_target_language('es')")

    return detected_lang, transcribed_text

In [6]:
# STEP 3: TARGET LANGUAGE SELECTION

def step3_select_target_language(target_lang: str = None):
    """Step 3: Select the target language for dubbing.

    Prints the languages that have a translation model configured (the
    detected source language is marked with "⊗"), validates ``target_lang``
    against ``SUPPORTED_LANGUAGES`` and stores it in
    ``PIPELINE_DATA['target_language']``.

    Args:
        target_lang: Language code such as ``'es'``. Matching ignores case and
            surrounding whitespace, so ``'ES'`` and ``' es '`` are accepted.

    Returns:
        The normalized language code, or ``None`` when no transcription is
        available, no code was given, or the code is not supported.
    """
    print("\n" + "=" * 70)
    print("STEP 3: TARGET LANGUAGE SELECTION")
    print("=" * 70)

    if PIPELINE_DATA['transcribed_text'] is None:
        print("Error: No transcription available!")
        print("Please run step2_detect_language() first")
        return None

    detected_lang = PIPELINE_DATA['detected_language']

    # Show available languages
    print("\n Available Target Languages:")
    print("-" * 70)
    for code, info in SUPPORTED_LANGUAGES.items():
        if info['model']:
            status = "✓" if code != detected_lang else "⊗"
            print(f"   {status} {code:5s} → {info['name']}")
    print("-" * 70)

    if target_lang is None:
        print("\n Error: Please specify target language!")
        print("   Example: step3_select_target_language('es')")
        return None

    # The table keys are lower-case codes; normalize before the lookup so that
    # harmless variations in case/whitespace are not rejected.
    lang_code = str(target_lang).strip().lower()

    if lang_code not in SUPPORTED_LANGUAGES:
        print(f"\n Error: Language '{target_lang}' not supported!")
        return None

    # Store selection
    PIPELINE_DATA['target_language'] = lang_code

    print(f"\n Target Language Selected:")
    print(f"   • Code: {lang_code}")
    print(f"   • Name: {SUPPORTED_LANGUAGES[lang_code]['name']}")

    if lang_code == detected_lang:
        # step4_translate_text() skips translation in this situation.
        print("\n Note: target language equals the detected language; translation will be skipped.")

    print("\n" + "=" * 70)
    print(" STEP 3 COMPLETE")
    print("=" * 70)
    print("\n  Next: Run step4_translate_text()")

    return lang_code

In [7]:
# STEP 4: TEXT TRANSLATION

# NOTE: an earlier draft of step4_translate_text() used to sit here wrapped in
# a triple-quoted string. It was never executed (a bare string literal is an
# inert expression) and duplicated the live definition that follows, so it has
# been removed.

from transformers import MarianMTModel, MarianTokenizer
import torch
import time

def _split_into_sentences(text):
    """Split ``text`` after sentence-ending punctuation followed by whitespace."""
    import re
    pieces = re.split(r'(?<=[.!?。！？])\s+', text.strip())
    return [piece for piece in pieces if piece]


def step4_translate_text():
    """Step 4: Translate the transcription into the target language.

    Reads the transcription and language codes from ``PIPELINE_DATA``, loads
    the MarianMT checkpoint configured in ``SUPPORTED_LANGUAGES`` and stores
    the translation and the translation time back into ``PIPELINE_DATA``.

    Translation is skipped (the source text is reused) when the target equals
    the detected language or no checkpoint is configured for the target.

    Text that fits into the 512-token input limit is translated in one pass.
    Longer text is translated sentence by sentence so that nothing beyond the
    limit is silently dropped.

    Returns:
        The translated (or reused) text, or ``None`` when prerequisites are
        missing, the model cannot be loaded, or translation fails.
    """
    print("\n" + "=" * 70)
    print("STEP 4: TEXT TRANSLATION")
    print("=" * 70)

    if PIPELINE_DATA['target_language'] is None:
        print(" Error: No target language selected!")
        return None

    source_text = PIPELINE_DATA['transcribed_text']
    target_lang = PIPELINE_DATA['target_language']
    detected_lang = PIPELINE_DATA.get('detected_language', None)

    if not source_text:
        print(" Error: No transcription available!")
        print("Please run step2_detect_language() first")
        return None

    # If target equals detected (or model is None), skip translation
    model_name = SUPPORTED_LANGUAGES.get(target_lang, {}).get('model', None)
    if (detected_lang is not None and target_lang == detected_lang) or model_name is None:
        print(" Translation skipped because target language equals detected language or no model configured.")
        PIPELINE_DATA['translated_text'] = source_text
        PIPELINE_DATA['processing_times']['translation'] = 0.0
        print(f"   • Translated text set equal to source (length {len(source_text)} chars).")
        print("\n" + "=" * 70)
        print(" STEP 4 COMPLETE (skipped translation)")
        print("=" * 70)
        print("\n  Next: Run step5_synthesize_speech()")
        return source_text

    # Otherwise load model and translate
    print(f"\n Translation Setup:   Target: {target_lang} ({SUPPORTED_LANGUAGES[target_lang]['name']})")
    print(f"   Text length: {len(source_text)} characters")

    # Checkpoints named opus-mt-en-<target> translate from English; feeding
    # them another source language yields unreliable output, so say so.
    if detected_lang not in (None, 'en') and '-en-' in model_name:
        print(f" Warning: detected source language is '{detected_lang}', but {model_name} "
              f"translates from English. The translation may be unreliable.")

    print(f"\n Loading translation model {model_name} ...")

    start_time = time.time()
    try:
        tokenizer = MarianTokenizer.from_pretrained(model_name)
        model = MarianMTModel.from_pretrained(model_name)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model.to(device)
    except Exception as e:
        print(f" Error loading translation model: {e}")
        print("Possible causes: invalid model name, network issues, or private Hugging Face repo.")
        print("If model is private, run `huggingface-cli login` or provide an access token.")
        return None

    load_time = time.time() - start_time
    print(f" Model loaded in {load_time:.2f} seconds")

    # Translate
    print(f"\n Translating...")
    max_tokens = 512
    batch_size = 8
    start_time = time.time()
    try:
        # Keep the single-pass behaviour whenever the text fits; only fall
        # back to per-sentence translation when truncation would lose content.
        token_count = len(tokenizer(source_text)['input_ids'])
        if token_count <= max_tokens:
            segments = [source_text]
        else:
            segments = _split_into_sentences(source_text) or [source_text]
            print(f"   Text exceeds {max_tokens} tokens; translating {len(segments)} sentences separately.")

        translated_parts = []
        for start in range(0, len(segments), batch_size):
            batch = segments[start:start + batch_size]
            inputs = tokenizer(batch, return_tensors="pt", padding=True,
                               max_length=max_tokens, truncation=True)
            inputs = {k: v.to(device) for k, v in inputs.items()}
            with torch.no_grad():
                generated = model.generate(**inputs)
            translated_parts.extend(tokenizer.batch_decode(generated, skip_special_tokens=True))

        translated_text = " ".join(part for part in translated_parts if part)
    except Exception as e:
        print(f" Error during translation: {e}")
        return None

    translate_time = time.time() - start_time
    PIPELINE_DATA['translated_text'] = translated_text
    PIPELINE_DATA['processing_times']['translation'] = translate_time

    print(f" Translation complete in {translate_time:.2f} seconds")
    print(f"\n Translation Results:")
    print(f"\n   Original: \"{source_text}\"")
    print(f"\n   Translated: \"{translated_text}\"")
    print("\n" + "=" * 70)
    print(" STEP 4 COMPLETE")
    print("=" * 70)
    print("\n  Next: Run step5_synthesize_speech()")

    return translated_text

# The `def` above already binds step4_translate_text in this cell's global
# namespace, so re-assigning it through globals() was a no-op and is omitted.
print(" Patched step4_translate_text() with robust version.")

 Patched step4_translate_text() with robust version.


In [8]:
# STEP 5: TEXT-TO-SPEECH SYNTHESIS

def step5_synthesize_speech():
    """Step 5: Generate speech from the translated text.

    Tries Coqui XTTS v2 first, using the input recording as the speaker
    reference; if it cannot be imported or loaded, falls back to Bark. The
    result is written to ``synthesized_audio.wav``, registered in
    ``PIPELINE_DATA`` together with the synthesis time, and played inline.

    Returns:
        The output file path, or ``None`` when no translated text is
        available, no TTS engine can be loaded, or synthesis fails.
    """
    print("\n" + "=" * 70)
    print("STEP 5: TEXT-TO-SPEECH SYNTHESIS")
    print("=" * 70)

    if PIPELINE_DATA['translated_text'] is None:
        print("Error: No translated text available!")
        return None

    translated_text = PIPELINE_DATA['translated_text']
    target_lang = PIPELINE_DATA['target_language']
    reference_audio = PIPELINE_DATA['input_audio']

    # Only show an ellipsis when the preview is actually cut off.
    preview = translated_text[:50] + ("..." if len(translated_text) > 50 else "")

    print(f"\n TTS Setup:")
    print(f"   • Text: \"{preview}\"")
    print(f"   • Language: {target_lang}")

    # Load TTS model
    print(f"\n Loading TTS model...")
    import time

    start_time = time.time()

    tts = None
    try:
        from TTS.api import TTS
        device = "cuda" if torch.cuda.is_available() else "cpu"
        tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2", gpu=(device == "cuda"))
        tts_engine = "coqui"
        print(" Coqui XTTS loaded")
    except Exception as coqui_error:
        # Report why Coqui was rejected instead of silently discarding the
        # error (a bare ``except`` would also trap KeyboardInterrupt).
        print(f" Coqui XTTS unavailable ({coqui_error}); falling back to Bark...")
        try:
            from bark import preload_models
            preload_models()
        except Exception as bark_error:
            print(f" Error: Bark TTS could not be loaded either: {bark_error}")
            return None
        tts_engine = "bark"
        print(" Bark TTS loaded")

    load_time = time.time() - start_time
    print(f"   Load time: {load_time:.2f} seconds")

    # Synthesize
    print(f"\n Synthesizing speech...")
    output_file = "synthesized_audio.wav"

    start_time = time.time()

    try:
        if tts_engine == "coqui":
            # XTTS v2 names Chinese "zh-cn"; the pipeline's table uses "zh".
            xtts_lang = {'zh': 'zh-cn'}.get(target_lang, target_lang)
            tts.tts_to_file(
                text=translated_text,
                speaker_wav=reference_audio,
                language=xtts_lang,
                file_path=output_file
            )
        else:
            from bark import generate_audio, SAMPLE_RATE
            audio_array = generate_audio(translated_text)
            sf.write(output_file, audio_array, SAMPLE_RATE)
    except Exception as synth_error:
        print(f" Error during speech synthesis: {synth_error}")
        return None

    synthesis_time = time.time() - start_time

    # Store in pipeline
    PIPELINE_DATA['synthesized_audio'] = output_file
    PIPELINE_DATA['processing_times']['synthesis'] = synthesis_time

    print(f" Synthesis complete in {synthesis_time:.2f} seconds")

    # Play audio ("\ " in the original was an invalid escape; "\n" matches the
    # other steps' messages)
    print(f"\n Listen to synthesized audio:")
    display(Audio(output_file))

    print("\n" + "=" * 70)
    print(" STEP 5 COMPLETE")
    print("=" * 70)
    print("\n  Next: Run step6_acoustic_analysis()")

    return output_file

In [9]:
# HELPER FUNCTIONS : Extract Acoustic Features

def extract_detailed_features(audio_path: str, label: str = "") -> Dict:
    """Extract pitch, spectral, energy, temporal and MFCC features from a file.

    The audio is loaded with librosa at 22050 Hz. Per-frame tracks are
    returned as arrays, and their summary statistics as plain floats.

    Args:
        audio_path: Path of the audio file to analyse.
        label: Unused; kept so existing callers that pass a label keep working.

    Returns:
        Dict with the raw signal (``audio``, ``sample_rate``, ``duration``),
        the pitch track ``f0`` and its mean/std/min/max (NaN when no frame is
        voiced), spectral centroid/rolloff/bandwidth tracks and means, the RMS
        ``energy`` track with mean/std and ``dynamic_range``, ``zcr``,
        ``tempo`` and the 20-coefficient ``mfcc`` matrix with ``mfcc_mean``.

    Raises:
        ValueError: If the file decodes to zero samples.
    """
    y, sr = librosa.load(audio_path, sr=22050)
    if y.size == 0:
        raise ValueError(f"Audio file contains no samples: {audio_path}")

    # Pitch (pyin marks unvoiced frames as NaN)
    f0, _, _ = librosa.pyin(y, fmin=50, fmax=500, sr=sr)
    voiced_f0 = f0[~np.isnan(f0)]
    if voiced_f0.size:
        f0_mean = float(voiced_f0.mean())
        f0_std = float(voiced_f0.std())
        f0_min = float(voiced_f0.min())
        f0_max = float(voiced_f0.max())
    else:
        # No voiced frame: report NaN explicitly rather than relying on the
        # nan-aggregates' "all-NaN" warnings being suppressed.
        f0_mean = f0_std = f0_min = f0_max = float('nan')

    # Spectral
    spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
    spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]
    spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]

    # Energy
    rms = librosa.feature.rms(y=y)[0]

    # Temporal
    zcr = librosa.feature.zero_crossing_rate(y)[0]
    tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
    # beat_track may return the tempo as a scalar or as a one-element array
    # depending on the librosa version; float() on a non-0-d array is
    # deprecated in NumPy, so take the first element explicitly.
    tempo_bpm = float(np.asarray(tempo).reshape(-1)[0])

    # MFCC
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=20)

    return {
        'audio': y,
        'sample_rate': sr,
        'duration': len(y) / sr,
        'f0': f0,
        'f0_mean': f0_mean,
        'f0_std': f0_std,
        'f0_min': f0_min,
        'f0_max': f0_max,
        'spectral_centroid': spectral_centroid,
        'spectral_centroid_mean': float(np.mean(spectral_centroid)),
        'spectral_centroid_std': float(np.std(spectral_centroid)),
        'spectral_rolloff': spectral_rolloff,
        'spectral_rolloff_mean': float(np.mean(spectral_rolloff)),
        'spectral_bandwidth': spectral_bandwidth,
        'spectral_bandwidth_mean': float(np.mean(spectral_bandwidth)),
        'energy': rms,
        'energy_mean': float(np.mean(rms)),
        'energy_std': float(np.std(rms)),
        # Ratio of loudest to quietest frame; the epsilon avoids division by
        # zero on fully silent frames.
        'dynamic_range': float(np.max(rms) / (np.min(rms) + 1e-8)),
        'zcr': zcr,
        'zcr_mean': float(np.mean(zcr)),
        'tempo': tempo_bpm,
        'mfcc': mfcc,
        'mfcc_mean': mfcc.mean(axis=1)
    }

def _relative_match(reference: float, candidate: float) -> float:
    """Return how closely ``candidate`` matches ``reference`` as a 0-100 score.

    The score is ``100 * (1 - |reference - candidate| / reference)``, floored
    at 0. A zero reference cannot be used as a divisor; in that case the score
    is 100 for an exact match and 0 otherwise.
    """
    diff = abs(reference - candidate)
    if reference == 0:
        return 100.0 if diff == 0 else 0.0
    return max(0.0, 100 * (1 - diff / reference))


def compare_acoustic_features(features1: Dict, features2: Dict) -> Dict:
    """Compare acoustic features and calculate similarity scores.

    Args:
        features1: Reference feature dict; must provide ``f0_mean``,
            ``duration``, ``spectral_centroid_mean`` and ``energy_mean``.
        features2: Feature dict to compare against the reference (same keys).

    Returns:
        Dict mapping a display name to ``{'original', 'synthesized', 'match'}``
        where ``match`` is a 0-100 score relative to the reference value.
        The pitch entry is omitted when either mean F0 is NaN.
    """
    # (display name, feature key, skip the entry when a value is NaN)
    compared = [
        ('Pitch (F0) Mean', 'f0_mean', True),
        ('Duration', 'duration', False),
        ('Spectral Centroid', 'spectral_centroid_mean', False),
        ('Energy Level', 'energy_mean', False),
    ]

    comparison = {}
    for display_name, key, skip_if_nan in compared:
        reference, candidate = features1[key], features2[key]
        if skip_if_nan and (np.isnan(reference) or np.isnan(candidate)):
            continue
        comparison[display_name] = {
            'original': reference,
            'synthesized': candidate,
            'match': _relative_match(reference, candidate)
        }

    return comparison


In [10]:
# STEP 6: ACOUSTIC ANALYSIS

def step6_acoustic_analysis():
    """Step 6: Analyze and compare acoustic features.

    Extracts features from the original and the synthesized audio with
    ``extract_detailed_features``, stores both feature dicts in
    ``PIPELINE_DATA`` and prints a per-feature comparison table plus the mean
    of the match scores.

    Returns:
        The comparison dict from ``compare_acoustic_features``, or ``None``
        when no synthesized audio exists or feature extraction fails.
    """
    print("\n" + "=" * 70)
    print("STEP 6: ACOUSTIC ANALYSIS")
    print("=" * 70)

    if PIPELINE_DATA['synthesized_audio'] is None:
        print(" Error: No synthesized audio available!")
        return None

    original_audio = PIPELINE_DATA['input_audio']
    synthesized_audio = PIPELINE_DATA['synthesized_audio']

    print(f"\n Analyzing acoustic features...")

    try:
        print(f"\n Extracting features from original audio...")
        original_features = extract_detailed_features(original_audio, "Original")

        print(f" Extracting features from synthesized audio...")
        synthesized_features = extract_detailed_features(synthesized_audio, "Synthesized")
    except Exception as exc:
        # Report the failure in the same style as the other steps instead of
        # leaving a half-updated pipeline behind a raw traceback.
        print(f" Error extracting acoustic features: {exc}")
        return None

    # Store in pipeline
    PIPELINE_DATA['original_features'] = original_features
    PIPELINE_DATA['synthesized_features'] = synthesized_features

    # Compare
    comparison = compare_acoustic_features(original_features, synthesized_features)

    print(f"\n Feature Comparison:")
    print("-" * 70)
    print(f"{'Feature':<30} {'Original':>12} {'Synthesized':>12} {'Match':>8}")
    print("-" * 70)

    for feature, values in comparison.items():
        print(f"{feature:<30} {values['original']:>12.2f} {values['synthesized']:>12.2f} {values['match']:>7.1f}%")

    print("-" * 70)

    overall_score = np.mean([v['match'] for v in comparison.values()])
    print(f"\n Overall Acoustic Similarity: {overall_score:.1f}%")

    print("\n" + "=" * 70)
    print(" STEP 6 COMPLETE")
    print("=" * 70)
    # "\ " in the original was an invalid escape sequence; "\n" matches the
    # "Next:" lines of the other steps.
    print("\n  Next: Run step7_acoustic_preservation()")

    return comparison

In [11]:
# STEP 7: ACOUSTIC PRESERVATION


def step7_acoustic_preservation():
    """Step 7: Post-process the synthesized audio towards the original's acoustics.

    Applies, in order: a pitch shift towards the original mean F0 (limited to
    ±12 semitones), a time stretch towards the original duration (rate limited
    to 0.75-1.35), spectral shaping around the original spectral centroid,
    energy matching against the original mean frame RMS (gain limited to
    0.5-2.0) and finally peak limiting to 0.95.

    Energy matching runs after spectral shaping, and the last stage only
    attenuates when the peak exceeds 0.95. Both the spectral filter and an
    unconditional peak normalisation rescale the signal, which would undo the
    energy match.

    The result is written to ``preserved_audio.wav``; its path, features and
    processing time are stored in ``PIPELINE_DATA`` and the audio is played.

    Returns:
        The output file path, or ``None`` when step 6 has not been run.
    """
    print("\n" + "=" * 70)
    print("STEP 7: ACOUSTIC PRESERVATION")
    print("=" * 70)

    if PIPELINE_DATA['original_features'] is None:
        print(" Error: No acoustic analysis available!")
        return None

    synthesized_audio = PIPELINE_DATA['synthesized_audio']
    original_features = PIPELINE_DATA['original_features']

    print(f"\n Applying acoustic preservation...")

    import time
    start_time = time.time()

    # Load synthesized audio at the sample rate used for the reference features
    y, sr = librosa.load(synthesized_audio, sr=original_features['sample_rate'])

    print(f"\n   [1/5] Pitch adjustment...")
    # Pitch transfer
    f0_current, _, _ = librosa.pyin(y, fmin=50, fmax=500, sr=sr)
    voiced_f0 = f0_current[~np.isnan(f0_current)]
    f0_current_mean = float(voiced_f0.mean()) if voiced_f0.size else float('nan')
    f0_target_mean = original_features['f0_mean']

    # Both means must be finite and positive for the log ratio to be defined.
    if (not np.isnan(f0_current_mean) and not np.isnan(f0_target_mean)
            and f0_current_mean > 0 and f0_target_mean > 0):
        semitones = 12 * np.log2(f0_target_mean / f0_current_mean)
        semitones = float(np.clip(semitones, -12, 12))
        y = librosa.effects.pitch_shift(y, sr=sr, n_steps=semitones)
        print(f"       ✓ Pitch shifted by {semitones:.2f} semitones")
    else:
        print(f"       – Skipped (pitch could not be estimated)")

    print(f"\n   [2/5] Speaking rate adjustment...")
    # Time stretching (rate > 1 shortens the signal)
    target_duration = original_features['duration']
    current_duration = len(y) / sr
    if current_duration > 0 and target_duration > 0:
        stretch_rate = current_duration / target_duration
        stretch_rate = float(np.clip(stretch_rate, 0.75, 1.35))
        y = librosa.effects.time_stretch(y, rate=stretch_rate)
        print(f"       ✓ Time stretched by {stretch_rate:.2f}x")
    else:
        print(f"       – Skipped (duration unavailable)")

    print(f"\n   [3/5] Spectral shaping...")
    # Spectral shaping: Gaussian emphasis (sigma 2000 Hz) centred on the
    # original centroid, with a floor of 0.3 so no band is removed entirely.
    target_centroid = original_features['spectral_centroid_mean']
    current_centroid = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))

    if current_centroid > 0:
        n_samples = len(y)
        S = librosa.stft(y)
        freqs = librosa.fft_frequencies(sr=sr)
        filter_curve = np.exp(-(freqs - target_centroid)**2 / (2 * 2000**2))
        filter_curve = 0.3 + 0.7 * filter_curve
        S_filtered = S * filter_curve[:, np.newaxis]
        # length= keeps the signal exactly as long as before the round trip.
        y = librosa.istft(S_filtered, length=n_samples)
        print(f"       ✓ Spectral shaping applied")
    else:
        print(f"       – Skipped (signal has no spectral energy)")

    print(f"\n   [4/5] Energy normalization...")
    # Energy matching. 'energy_mean' is the mean of the frame-wise RMS, so the
    # current level is measured the same way.
    current_energy = float(np.mean(librosa.feature.rms(y=y)))
    target_energy = original_features['energy_mean']
    if current_energy > 0:
        energy_ratio = target_energy / current_energy
        energy_ratio = float(np.clip(energy_ratio, 0.5, 2.0))
        y = y * energy_ratio
        print(f"       ✓ Energy adjusted by {energy_ratio:.2f}x")
    else:
        print(f"       – Skipped (signal is silent)")

    print(f"\n   [5/5] Final normalization...")
    # Final normalization: only attenuate to prevent clipping on write, so the
    # energy match above is preserved whenever the peak already fits.
    max_val = np.max(np.abs(y))
    if max_val > 0.95:
        y = y / max_val * 0.95
        print(f"       ✓ Normalized to 95% peak")
    else:
        print(f"       ✓ Peak level {max_val:.2f} is within range; gain left unchanged")

    # Save
    output_file = "preserved_audio.wav"
    sf.write(output_file, y, sr)

    preservation_time = time.time() - start_time
    PIPELINE_DATA['preserved_audio'] = output_file
    PIPELINE_DATA['processing_times']['preservation'] = preservation_time

    print(f"\n Acoustic preservation complete in {preservation_time:.2f} seconds")

    # Analyze preserved audio
    print(f"\n Analyzing preserved audio...")
    preserved_features = extract_detailed_features(output_file, "Preserved")
    PIPELINE_DATA['preserved_features'] = preserved_features

    # Play audio
    print(f"\n Listen to preserved audio:")
    display(Audio(output_file))

    print("\n" + "=" * 70)
    print(" STEP 7 COMPLETE")
    print("=" * 70)
    print("\n  Next: Run step8_performance_metrics()")

    return output_file

In [12]:
# STEP 8: PERFORMANCE METRICS

def _percent_error(value, reference):
    """Return |value - reference| / reference in percent, or NaN if undefined.

    The result is NaN when the reference is zero or either input is NaN, so a
    degenerate reference (e.g. a tempo of 0) cannot raise ZeroDivisionError.
    """
    if reference == 0 or np.isnan(reference) or np.isnan(value):
        return float('nan')
    return abs(value - reference) / reference * 100


def _overall_similarity(errors):
    """Return 100 minus the mean of the finite errors, or NaN if none is finite."""
    finite = [e for e in errors if np.isfinite(e)]
    if not finite:
        return float('nan')
    return 100 - float(np.mean(finite))


def step8_performance_metrics():
    """Step 8: Calculate comprehensive performance metrics.

    Compares the synthesized and the preserved audio against the original in
    four groups (pitch, temporal, spectral, energy) using relative errors in
    percent, derives an overall similarity score for each, stores everything
    in ``PIPELINE_DATA['metrics']`` and prints a report.

    Errors that cannot be computed (reference of 0, or no voiced frames) are
    NaN and are left out of the overall scores. ``pitch_preservation`` stays
    an empty dict when the original audio has no pitch estimate.

    Returns:
        The metrics dict, or ``None`` when step 7 has not been run.
    """
    print("\n" + "=" * 70)
    print("STEP 8: PERFORMANCE METRICS & COMPARISON")
    print("=" * 70)

    if PIPELINE_DATA['preserved_features'] is None:
        print(" Error: No preserved audio available!")
        return None

    orig = PIPELINE_DATA['original_features']
    synth = PIPELINE_DATA['synthesized_features']
    pres = PIPELINE_DATA['preserved_features']

    print("\n Calculating performance metrics...")

    # Calculate detailed metrics
    metrics = {
        'pitch_preservation': {},
        'temporal_preservation': {},
        'spectral_preservation': {},
        'energy_preservation': {},
        'overall_scores': {}
    }

    # Pitch Metrics
    print("\n[1/4] Pitch Preservation Metrics...")
    # Computed unconditionally (NaN when undefined) because the overall scores
    # below need these names even when the original has no voiced frames.
    synth_pitch_error = _percent_error(synth['f0_mean'], orig['f0_mean'])
    pres_pitch_error = _percent_error(pres['f0_mean'], orig['f0_mean'])

    if not np.isnan(orig['f0_mean']):
        metrics['pitch_preservation'] = {
            'original_f0': orig['f0_mean'],
            'synthesized_f0': synth['f0_mean'],
            'preserved_f0': pres['f0_mean'],
            'synth_error_%': synth_pitch_error,
            'preserved_error_%': pres_pitch_error,
            'improvement_%': synth_pitch_error - pres_pitch_error,
            'synth_similarity_%': max(0, 100 - synth_pitch_error),
            'preserved_similarity_%': max(0, 100 - pres_pitch_error)
        }

    # Temporal Metrics
    print("[2/4] Temporal Preservation Metrics...")
    synth_duration_error = _percent_error(synth['duration'], orig['duration'])
    pres_duration_error = _percent_error(pres['duration'], orig['duration'])

    synth_tempo_error = _percent_error(synth['tempo'], orig['tempo'])
    pres_tempo_error = _percent_error(pres['tempo'], orig['tempo'])

    metrics['temporal_preservation'] = {
        'original_duration': orig['duration'],
        'synthesized_duration': synth['duration'],
        'preserved_duration': pres['duration'],
        'synth_duration_error_%': synth_duration_error,
        'preserved_duration_error_%': pres_duration_error,
        'duration_improvement_%': synth_duration_error - pres_duration_error,
        'original_tempo': orig['tempo'],
        'synthesized_tempo': synth['tempo'],
        'preserved_tempo': pres['tempo'],
        'synth_tempo_error_%': synth_tempo_error,
        'preserved_tempo_error_%': pres_tempo_error,
        'tempo_improvement_%': synth_tempo_error - pres_tempo_error
    }

    # Spectral Metrics
    print("[3/4] Spectral Preservation Metrics...")
    synth_centroid_error = _percent_error(synth['spectral_centroid_mean'], orig['spectral_centroid_mean'])
    pres_centroid_error = _percent_error(pres['spectral_centroid_mean'], orig['spectral_centroid_mean'])

    synth_rolloff_error = _percent_error(synth['spectral_rolloff_mean'], orig['spectral_rolloff_mean'])
    pres_rolloff_error = _percent_error(pres['spectral_rolloff_mean'], orig['spectral_rolloff_mean'])

    metrics['spectral_preservation'] = {
        'original_centroid': orig['spectral_centroid_mean'],
        'synthesized_centroid': synth['spectral_centroid_mean'],
        'preserved_centroid': pres['spectral_centroid_mean'],
        'synth_centroid_error_%': synth_centroid_error,
        'preserved_centroid_error_%': pres_centroid_error,
        'centroid_improvement_%': synth_centroid_error - pres_centroid_error,
        'original_rolloff': orig['spectral_rolloff_mean'],
        'synthesized_rolloff': synth['spectral_rolloff_mean'],
        'preserved_rolloff': pres['spectral_rolloff_mean'],
        'synth_rolloff_error_%': synth_rolloff_error,
        'preserved_rolloff_error_%': pres_rolloff_error,
        'rolloff_improvement_%': synth_rolloff_error - pres_rolloff_error
    }

    # Energy Metrics
    print("[4/4] Energy Preservation Metrics...")
    synth_energy_error = _percent_error(synth['energy_mean'], orig['energy_mean'])
    pres_energy_error = _percent_error(pres['energy_mean'], orig['energy_mean'])

    synth_dr_error = _percent_error(synth['dynamic_range'], orig['dynamic_range'])
    pres_dr_error = _percent_error(pres['dynamic_range'], orig['dynamic_range'])

    metrics['energy_preservation'] = {
        'original_energy': orig['energy_mean'],
        'synthesized_energy': synth['energy_mean'],
        'preserved_energy': pres['energy_mean'],
        'synth_energy_error_%': synth_energy_error,
        'preserved_energy_error_%': pres_energy_error,
        'energy_improvement_%': synth_energy_error - pres_energy_error,
        'original_dynamic_range': orig['dynamic_range'],
        'synthesized_dynamic_range': synth['dynamic_range'],
        'preserved_dynamic_range': pres['dynamic_range'],
        'synth_dr_error_%': synth_dr_error,
        'preserved_dr_error_%': pres_dr_error,
        'dr_improvement_%': synth_dr_error - pres_dr_error
    }

    # Overall Scores: mean over the pitch, duration, centroid and energy
    # errors that could actually be computed.
    synth_overall = _overall_similarity([
        synth_pitch_error, synth_duration_error, synth_centroid_error, synth_energy_error
    ])
    pres_overall = _overall_similarity([
        pres_pitch_error, pres_duration_error, pres_centroid_error, pres_energy_error
    ])

    metrics['overall_scores'] = {
        'synthesized_similarity_%': 0.0 if np.isnan(synth_overall) else max(0, synth_overall),
        'preserved_similarity_%': 0.0 if np.isnan(pres_overall) else max(0, pres_overall),
        'overall_improvement_%': pres_overall - synth_overall
    }

    PIPELINE_DATA['metrics'] = metrics

    # Display Results
    pitch = metrics['pitch_preservation']
    temporal = metrics['temporal_preservation']
    spectral = metrics['spectral_preservation']
    energy = metrics['energy_preservation']
    overall = metrics['overall_scores']

    print("\n" + "="*70)
    print("COMPREHENSIVE PERFORMANCE METRICS")
    print("="*70)

    print("\n 1. PITCH PRESERVATION")
    print("-"*70)
    if pitch:
        print(f"Original F0:           {pitch['original_f0']:.2f} Hz")
        print(f"Synthesized F0:        {pitch['synthesized_f0']:.2f} Hz (Error: {pitch['synth_error_%']:.1f}%)")
        print(f"Preserved F0:          {pitch['preserved_f0']:.2f} Hz (Error: {pitch['preserved_error_%']:.1f}%)")
        print(f"Improvement:           {pitch['improvement_%']:.1f}%")
    else:
        print("Pitch could not be estimated for the original audio; pitch metrics skipped.")

    print("\n 2. TEMPORAL PRESERVATION")
    print("-"*70)
    print(f"Original Duration:     {temporal['original_duration']:.2f} sec")
    print(f"Synthesized Duration:  {temporal['synthesized_duration']:.2f} sec (Error: {temporal['synth_duration_error_%']:.1f}%)")
    print(f"Preserved Duration:    {temporal['preserved_duration']:.2f} sec (Error: {temporal['preserved_duration_error_%']:.1f}%)")
    print(f"Duration Improvement:  {temporal['duration_improvement_%']:.1f}%")

    print("\n 3. SPECTRAL PRESERVATION (Timbre)")
    print("-"*70)
    print(f"Original Centroid:     {spectral['original_centroid']:.0f} Hz")
    print(f"Synthesized Centroid:  {spectral['synthesized_centroid']:.0f} Hz (Error: {spectral['synth_centroid_error_%']:.1f}%)")
    print(f"Preserved Centroid:    {spectral['preserved_centroid']:.0f} Hz (Error: {spectral['preserved_centroid_error_%']:.1f}%)")
    print(f"Centroid Improvement:  {spectral['centroid_improvement_%']:.1f}%")

    print("\n 4. ENERGY PRESERVATION")
    print("-"*70)
    print(f"Original Energy:       {energy['original_energy']:.4f}")
    print(f"Synthesized Energy:    {energy['synthesized_energy']:.4f} (Error: {energy['synth_energy_error_%']:.1f}%)")
    print(f"Preserved Energy:      {energy['preserved_energy']:.4f} (Error: {energy['preserved_energy_error_%']:.1f}%)")
    print(f"Energy Improvement:    {energy['energy_improvement_%']:.1f}%")

    print("\n" + "="*70)
    print(" OVERALL ACOUSTIC PRESERVATION SCORES")
    print("="*70)
    print(f"Synthesized Audio:     {overall['synthesized_similarity_%']:.1f}%")
    print(f"Preserved Audio:       {overall['preserved_similarity_%']:.1f}%")
    print(f"Overall Improvement:   {overall['overall_improvement_%']:.1f}%")
    print("="*70)

    # Quality Assessment
    pres_score = overall['preserved_similarity_%']
    if pres_score >= 85:
        quality = " EXCELLENT - Professional quality acoustic preservation"
    elif pres_score >= 75:
        quality = "✓ GOOD - High quality acoustic preservation"
    elif pres_score >= 65:
        quality = " MODERATE - Acceptable acoustic preservation"
    else:
        quality = " LOW - Consider adjusting preservation parameters"

    print(f"\nQuality Assessment: {quality}\n")

    print("\n" + "=" * 70)
    print(" STEP 8 COMPLETE")
    print("=" * 70)
    print("\n  Next: Run step9_visualization()")

    return metrics

In [13]:
# STEP 9: VISUALIZATION & COMPARISON PLOTS

def step9_visualization():
    """Step 9: Generate comprehensive comparison visualizations.

    Reads PIPELINE_DATA['metrics'], PIPELINE_DATA['processing_times'] and the
    'original_features', 'synthesized_features' and 'preserved_features'
    dicts, then renders four figures. Each figure is saved as a PNG in the
    current working directory and shown inline:

      * acoustic_comparison.png - waveforms, spectrograms, pitch contours,
        spectral centroid / rolloff / energy overlays and a score bar chart
      * radar_comparison.png    - six-axis radar chart of the same scores
      * processing_time.png     - horizontal bar chart of per-stage timings
      * summary_table.png       - table of headline values per audio version

    Returns:
        None. Prints an error and returns early when the metrics or any of
        the three feature dicts have not been computed yet.
    """
    print("\n" + "=" * 70)
    print("STEP 9: VISUALIZATION & COMPARISON PLOTS")
    print("=" * 70)

    if PIPELINE_DATA['metrics'] is None:
        print(" Error: No metrics available!")
        return None

    orig = PIPELINE_DATA['original_features']
    synth = PIPELINE_DATA['synthesized_features']
    pres = PIPELINE_DATA['preserved_features']

    # Every plot below indexes these dicts, so fail with a readable message
    # instead of a TypeError on the first subscript of None.
    if orig is None or synth is None or pres is None:
        print(" Error: Acoustic features missing! Run the analysis and preservation steps first.")
        return None

    metrics = PIPELINE_DATA['metrics']

    # (features, display label, line colour) for the three audio versions.
    sources = [
        (orig, 'Original', 'blue'),
        (synth, 'Synthesized', 'orange'),
        (pres, 'Preserved', 'green'),
    ]

    def _time_axis(feat, key):
        # Spread the samples/frames of feat[key] evenly over the clip duration.
        return np.linspace(0, feat['duration'], len(feat[key]))

    def _overlay(ax, key, title, ylabel):
        # Draw one frame-level feature for all three versions on a shared axis.
        for feat, label, color in sources:
            ax.plot(_time_axis(feat, key), feat[key], label=label,
                    color=color, alpha=0.7, linewidth=2)
        ax.set_title(title, fontsize=12, fontweight='bold')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel(ylabel)
        ax.legend()
        ax.grid(True, alpha=0.3)

    def _scores(prefix, include_range_and_tempo=False):
        # Similarity-style scores (100 - error %) for the 'synth' or
        # 'preserved' version; the radar chart adds dynamic range and tempo.
        values = [
            metrics['pitch_preservation'][f'{prefix}_similarity_%'],
            100 - metrics['temporal_preservation'][f'{prefix}_duration_error_%'],
            100 - metrics['spectral_preservation'][f'{prefix}_centroid_error_%'],
            100 - metrics['energy_preservation'][f'{prefix}_energy_error_%'],
        ]
        if include_range_and_tempo:
            values += [
                100 - metrics['energy_preservation'][f'{prefix}_dr_error_%'],
                100 - metrics['temporal_preservation'][f'{prefix}_tempo_error_%'],
            ]
        return values

    def _signed(value):
        # Explicit sign so a regression shows as "-3.0%" rather than "+-3.0%".
        return f"{value:+.1f}%"

    print("\n Generating comparison visualizations...\n")

    # Create comprehensive figure
    fig = plt.figure(figsize=(20, 16))
    gs = fig.add_gridspec(5, 3, hspace=0.3, wspace=0.3)

    # ===== ROW 1: WAVEFORMS =====
    print("[1/9] Waveform comparison...")
    for col, (feat, label, color) in enumerate(sources):
        ax = fig.add_subplot(gs[0, col])
        ax.plot(_time_axis(feat, 'audio'), feat['audio'], color=color, alpha=0.7, linewidth=0.5)
        ax.set_title(f'{label} Waveform', fontsize=12, fontweight='bold')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Amplitude')
        ax.grid(True, alpha=0.3)

    # ===== ROW 2: SPECTROGRAMS =====
    print("[2/9] Spectrogram comparison...")
    for col, (feat, label, _color) in enumerate(sources):
        ax = fig.add_subplot(gs[1, col])
        spec_db = librosa.amplitude_to_db(np.abs(librosa.stft(feat['audio'])), ref=np.max)
        # NOTE(review): specshow is called without sr, so the axes use librosa's
        # default sample rate - confirm the audio was loaded at that rate.
        img = librosa.display.specshow(spec_db, y_axis='hz', x_axis='time', ax=ax, cmap='viridis')
        ax.set_title(f'{label} Spectrogram', fontsize=12, fontweight='bold')
        fig.colorbar(img, ax=ax, format='%+2.0f dB')

    # ===== ROW 3: PITCH CONTOURS =====
    print("[3/9] Pitch contour comparison...")
    for col, (feat, label, color) in enumerate(sources):
        ax = fig.add_subplot(gs[2, col])
        ax.plot(_time_axis(feat, 'f0'), feat['f0'], color=color, alpha=0.8, linewidth=2)
        ax.axhline(feat['f0_mean'], color='red', linestyle='--', label=f"Mean: {feat['f0_mean']:.1f} Hz")
        ax.set_title(f'{label} Pitch (F0)', fontsize=12, fontweight='bold')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Frequency (Hz)')
        ax.set_ylim([50, 500])
        ax.legend()
        ax.grid(True, alpha=0.3)

    # ===== ROW 4: SPECTRAL FEATURES =====
    print("[4/9] Spectral features comparison...")
    _overlay(fig.add_subplot(gs[3, 0]), 'spectral_centroid',
             'Spectral Centroid (Timbre Brightness)', 'Frequency (Hz)')
    _overlay(fig.add_subplot(gs[3, 1]), 'spectral_rolloff',
             'Spectral Rolloff', 'Frequency (Hz)')
    _overlay(fig.add_subplot(gs[3, 2]), 'energy',
             'Energy Envelope', 'RMS Energy')

    # ===== ROW 5: METRICS COMPARISON =====
    print("[5/9] Metrics bar charts...")
    ax13 = fig.add_subplot(gs[4, :])

    categories = ['Pitch\nPreservation', 'Duration\nMatching', 'Spectral\nSimilarity', 'Energy\nMatching']
    synth_scores = _scores('synth')
    pres_scores = _scores('preserved')

    x = np.arange(len(categories))
    width = 0.35

    bars1 = ax13.bar(x - width/2, synth_scores, width, label='Synthesized', color='orange', alpha=0.8)
    bars2 = ax13.bar(x + width/2, pres_scores, width, label='Preserved', color='green', alpha=0.8)

    ax13.set_ylabel('Similarity Score (%)', fontsize=12, fontweight='bold')
    ax13.set_title('Acoustic Preservation Performance Comparison', fontsize=14, fontweight='bold')
    ax13.set_xticks(x)
    ax13.set_xticklabels(categories)
    ax13.legend(fontsize=11)
    ax13.grid(True, alpha=0.3, axis='y')
    ax13.set_ylim([0, 105])

    # Add value labels on bars
    for bars in [bars1, bars2]:
        for bar in bars:
            height = bar.get_height()
            ax13.text(bar.get_x() + bar.get_width()/2., height,
                     f'{height:.1f}%', ha='center', va='bottom', fontsize=9)

    # Overall title
    fig.suptitle('Comprehensive Acoustic Analysis: Original vs Synthesized vs Preserved',
                 fontsize=16, fontweight='bold', y=0.995)

    plt.tight_layout()

    print("[6/9] Saving visualization...")
    plt.savefig('acoustic_comparison.png', dpi=300, bbox_inches='tight')
    print("       ✓ Saved as 'acoustic_comparison.png'")

    plt.show()

    # ===== ADDITIONAL PLOT: Overall Similarity Radar Chart =====
    print("\n[7/9] Generating radar chart...")

    fig2, ax = plt.subplots(figsize=(10, 10), subplot_kw=dict(projection='polar'))

    categories_radar = ['Pitch\nF0', 'Duration', 'Spectral\nCentroid', 'Energy\nLevel',
                        'Dynamic\nRange', 'Tempo']

    synth_radar = _scores('synth', include_range_and_tempo=True)
    pres_radar = _scores('preserved', include_range_and_tempo=True)

    # Repeat the first point at the end so each polygon closes.
    angles = np.linspace(0, 2 * np.pi, len(categories_radar), endpoint=False).tolist()
    synth_radar += synth_radar[:1]
    pres_radar += pres_radar[:1]
    angles += angles[:1]

    ax.plot(angles, synth_radar, 'o-', linewidth=2, label='Synthesized', color='orange')
    ax.fill(angles, synth_radar, alpha=0.25, color='orange')

    ax.plot(angles, pres_radar, 'o-', linewidth=2, label='Preserved', color='green')
    ax.fill(angles, pres_radar, alpha=0.25, color='green')

    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(categories_radar, size=11)
    ax.set_ylim(0, 100)
    ax.set_yticks([20, 40, 60, 80, 100])
    ax.set_yticklabels(['20%', '40%', '60%', '80%', '100%'])
    ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1), fontsize=12)
    ax.set_title('Acoustic Feature Preservation Radar Chart', size=14, fontweight='bold', pad=20)
    ax.grid(True)

    plt.tight_layout()
    plt.savefig('radar_comparison.png', dpi=300, bbox_inches='tight')
    print("       ✓ Saved as 'radar_comparison.png'")
    plt.show()

    # ===== ADDITIONAL PLOT: Processing Time Analysis =====
    print("\n[8/9] Generating processing time chart...")

    times = PIPELINE_DATA['processing_times']

    fig3, ax = plt.subplots(figsize=(10, 6))

    stages = list(times.keys())
    durations = list(times.values())
    colors_stages = ['#3498db', '#e74c3c', '#2ecc71', '#f39c12']

    bars = ax.barh(stages, durations, color=colors_stages, alpha=0.8)

    # Add value labels
    for i, (bar, duration) in enumerate(zip(bars, durations)):
        ax.text(duration + 0.1, i, f'{duration:.2f}s', va='center', fontweight='bold')

    ax.set_xlabel('Time (seconds)', fontsize=12, fontweight='bold')
    ax.set_title('Processing Time Breakdown', fontsize=14, fontweight='bold')
    ax.grid(True, alpha=0.3, axis='x')

    total_time = sum(durations)
    ax.text(0.95, 0.95, f'Total: {total_time:.2f}s', transform=ax.transAxes,
            fontsize=12, fontweight='bold', ha='right', va='top',
            bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

    plt.tight_layout()
    plt.savefig('processing_time.png', dpi=300, bbox_inches='tight')
    print("       ✓ Saved as 'processing_time.png'")
    plt.show()

    # ===== SUMMARY TABLE =====
    print("\n[9/9] Generating summary table...")

    fig4, ax = plt.subplots(figsize=(12, 8))
    ax.axis('tight')
    ax.axis('off')

    header = ['Metric', 'Original', 'Synthesized', 'Preserved', 'Improvement']
    rows = [
        ['Pitch (F0) Hz', f"{orig['f0_mean']:.1f}",
         f"{synth['f0_mean']:.1f}", f"{pres['f0_mean']:.1f}",
         _signed(metrics['pitch_preservation']['improvement_%'])],
        ['Duration (sec)', f"{orig['duration']:.2f}",
         f"{synth['duration']:.2f}", f"{pres['duration']:.2f}",
         _signed(metrics['temporal_preservation']['duration_improvement_%'])],
        ['Spectral Centroid (Hz)', f"{orig['spectral_centroid_mean']:.0f}",
         f"{synth['spectral_centroid_mean']:.0f}", f"{pres['spectral_centroid_mean']:.0f}",
         _signed(metrics['spectral_preservation']['centroid_improvement_%'])],
        ['Energy Level', f"{orig['energy_mean']:.4f}",
         f"{synth['energy_mean']:.4f}", f"{pres['energy_mean']:.4f}",
         _signed(metrics['energy_preservation']['energy_improvement_%'])],
        ['Dynamic Range', f"{orig['dynamic_range']:.2f}",
         f"{synth['dynamic_range']:.2f}", f"{pres['dynamic_range']:.2f}",
         _signed(metrics['energy_preservation']['dr_improvement_%'])],
        ['Tempo (BPM)', f"{orig['tempo']:.1f}",
         f"{synth['tempo']:.1f}", f"{pres['tempo']:.1f}",
         _signed(metrics['temporal_preservation']['tempo_improvement_%'])],
        ['Overall Similarity', '100%',
         f"{metrics['overall_scores']['synthesized_similarity_%']:.1f}%",
         f"{metrics['overall_scores']['preserved_similarity_%']:.1f}%",
         _signed(metrics['overall_scores']['overall_improvement_%'])],
    ]

    # The table's own cell borders replace the text separator rows; the header
    # row and the final "Overall Similarity" row are highlighted instead.
    table = ax.table(cellText=rows, colLabels=header, cellLoc='center', loc='center')
    table.auto_set_font_size(False)
    table.set_fontsize(11)
    table.scale(1, 2)

    cells = table.get_celld()
    overall_row = len(rows)  # row 0 holds the column labels, data rows start at 1
    for col in range(len(header)):
        cells[(0, col)].set_facecolor('#34495e')
        cells[(0, col)].set_text_props(color='white', fontweight='bold')
        cells[(overall_row, col)].set_facecolor('#ecf0f1')
        cells[(overall_row, col)].set_text_props(fontweight='bold')

    ax.set_title('Acoustic Preservation Summary', fontsize=14, fontweight='bold', pad=20)

    plt.tight_layout()
    plt.savefig('summary_table.png', dpi=300, bbox_inches='tight')
    print("       ✓ Saved as 'summary_table.png'")
    plt.show()

    print("\n" + "=" * 70)
    print(" STEP 9 COMPLETE")
    print("=" * 70)

    return None

In [14]:
#  PIPELINE HELPER + RUNNER CELL

import os
from IPython.display import display, Audio, clear_output
# Try to import Colab's file helpers; IN_COLAB records whether that import
# succeeded and gates every later use of `colab_files`.
try:
    from google.colab import files as colab_files
    IN_COLAB = True
except Exception:
    # Any failure here is treated as "not in Colab". `colab_files` is left
    # undefined in that case, so callers must check IN_COLAB before using it.
    IN_COLAB = False

def upload_file_interactive():
    """Ask the user for an audio file and return its path.

    Outside Colab the path is read from stdin; inside Colab the file-upload
    widget is opened and the name of the first uploaded file is used.

    Returns:
        The chosen path/name as a string, or None when nothing was entered
        or uploaded.
    """
    if not IN_COLAB:
        typed_path = input("Enter path to audio file: ").strip()
        return typed_path or None

    print(" Colab upload: choose your audio file (wav/mp3/m4a/flac).")
    uploaded = colab_files.upload()
    if not uploaded:
        print(" No file uploaded.")
        return None

    # Only the first uploaded entry is used; any further uploads are ignored.
    first_name = next(iter(uploaded.keys()))
    print(f" Uploaded: {first_name}")
    return first_name

def prepare_pipeline_with_file(path):
    """Validate the input path and reset PIPELINE_DATA for a fresh run.

    Args:
        path: Path to the input audio file. Falsy values (None, "") are
            rejected.

    Returns:
        True when `path` is an existing regular file and the pipeline state
        was reset; False otherwise (a message explaining why is printed and
        PIPELINE_DATA is left untouched).
    """
    if not path:
        print(" No path provided.")
        return False
    if not os.path.exists(path):
        print(f" File not found: {path}")
        return False
    # A directory also "exists"; reject it here rather than letting a later
    # step fail when it tries to read the path as audio.
    if not os.path.isfile(path):
        print(f" Path is not a file: {path}")
        return False

    # store
    PIPELINE_DATA['input_audio'] = path
    # reset downstream fields that already exist so stale results from a
    # previous run cannot leak into this one
    for key in ('detected_language', 'target_language', 'transcribed_text', 'translated_text',
                'synthesized_audio', 'preserved_audio', 'original_features', 'synthesized_features',
                'preserved_features', 'metrics'):
        if key in PIPELINE_DATA:
            PIPELINE_DATA[key] = None
    # timings are always reset to an empty dict, whether or not the key existed
    PIPELINE_DATA['processing_times'] = {}
    print(f" Pipeline prepared with file: {path}")
    return True

def download_if_colab(path):
    """Start a browser download of `path` when running in Colab.

    Does nothing outside Colab or when `path` does not exist. A failing
    download is reported on stdout instead of being raised.
    """
    if not IN_COLAB or not os.path.exists(path):
        return
    try:
        colab_files.download(path)
    except Exception as e:
        print(f" Could not auto-download ({e}). File is at: {path}")
    else:
        print(f" Downloaded: {path}")

def run_full_pipeline(target_lang: str = 'es', force_tts: str = None, preserve_acoustics: bool = True):
    """
    Run steps 1->9 in order.
    If PIPELINE_DATA['input_audio'] is None it will prompt upload (Colab) or path (local).
    Arguments:
      - target_lang: language code from SUPPORTED_LANGUAGES (default 'es');
        None falls back to 'es'
      - force_tts: "coqui" or "bark" to force engine choice (optional); any
        other non-empty value falls back to Bark with a warning
      - preserve_acoustics: whether to apply preservation (step7)
    Returns:
      None. Each step's failure is reported on stdout and aborts the run,
      except step 9 (visualization), whose failure is reported but does not
      prevent the downloads and the final summary.
    """
    clear_output()
    print("="*70)
    print(" RUNNING FULL VOICE DUBBING PIPELINE")
    print("="*70)
    # Validate the target language up front so a typo is reported before the
    # upload prompt and the transcription step run.
    if target_lang is None:
        print("No target language provided; defaulting to 'es' (Spanish).")
        target_lang = 'es'
    if target_lang not in SUPPORTED_LANGUAGES:
        print(f" Target language '{target_lang}' not supported. Aborting.")
        return
    # Step 1: ensure input file present
    if not PIPELINE_DATA.get('input_audio'):
        print("No input file found in PIPELINE_DATA. Prompting upload...")
        path = upload_file_interactive()
        ok = prepare_pipeline_with_file(path)
        if not ok:
            print("Aborting pipeline run.")
            return
    else:
        print(f"Input file already set: {PIPELINE_DATA['input_audio']}")
    # Step 2: detect language / transcribe
    try:
        # The values are not used here; unpacking also turns a non-tuple
        # result from step 2 into an error that aborts the run.
        det, text = step2_detect_language()
    except Exception as e:
        print(f" Error in step2_detect_language(): {e}")
        return
    # Step 3: select target language (already validated above)
    step3_select_target_language(target_lang)
    # Step 4: translation
    try:
        step4_translate_text()
    except Exception as e:
        print(f" Error in step4_translate_text(): {e}")
        return
    # Step 5: synthesis (allow forcing TTS)
    if force_tts:
        print(f" Forcing TTS engine to: {force_tts}")
        engine = force_tts.lower()
        if engine not in ('coqui', 'bark'):
            # Previously any unknown name silently selected Bark; keep that
            # fallback but tell the user it happened.
            print(f" Unknown TTS engine '{force_tts}'; falling back to Bark.")
            engine = 'bark'
        # step5 picks its engine internally, so the synthesis is re-done here
        # to control the engine explicitly.
        translated_text = PIPELINE_DATA['translated_text']
        target_lang_local = PIPELINE_DATA['target_language']
        reference_audio = PIPELINE_DATA['input_audio']
        print("\n Loading user-selected TTS engine and synthesizing...")
        try:
            if engine == 'coqui':
                # Force Coqui
                from TTS.api import TTS
                device = "cuda" if torch.cuda.is_available() else "cpu"
                tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2", gpu=(device=="cuda"))
                tts.tts_to_file(text=translated_text, speaker_wav=reference_audio, language=target_lang_local, file_path="synthesized_audio.wav")
            else:
                # bark
                from bark import generate_audio, SAMPLE_RATE
                audio_array = generate_audio(translated_text)
                sf.write("synthesized_audio.wav", audio_array, SAMPLE_RATE)
            PIPELINE_DATA['synthesized_audio'] = "synthesized_audio.wav"
            print(" Forced TTS synthesis complete.")
            display(Audio(PIPELINE_DATA['synthesized_audio']))
        except Exception as e:
            print(f" Forced TTS synthesis failed: {e}")
            return
    else:
        # Call the defined step5
        try:
            step5_synthesize_speech()
        except Exception as e:
            print(f" Error in step5_synthesize_speech(): {e}")
            return
    # Step 6: Acoustic analysis (original vs synthesized)
    try:
        step6_acoustic_analysis()
    except Exception as e:
        print(f" Error in step6_acoustic_analysis(): {e}")
        return
    # Step 7: Acoustic preservation (optional)
    if preserve_acoustics:
        try:
            step7_acoustic_preservation()
        except Exception as e:
            print(f" Error in step7_acoustic_preservation(): {e}")
            print("You can try running with preserve_acoustics=False to isolate TTS issues.")
            return
    else:
        print(" Skipping acoustic preservation (preserve_acoustics=False).")
        PIPELINE_DATA['preserved_audio'] = PIPELINE_DATA.get('synthesized_audio')
        # The "preserved" audio is the synthesized audio in this mode, so its
        # features are mirrored as well. step9_visualization() indexes
        # PIPELINE_DATA['preserved_features'] directly and would fail on the
        # None left behind by prepare_pipeline_with_file().
        # NOTE(review): presumably step6 has filled 'synthesized_features' by
        # now - confirm against step6_acoustic_analysis().
        PIPELINE_DATA['preserved_features'] = PIPELINE_DATA.get('synthesized_features')
    # Step 8: metrics
    try:
        step8_performance_metrics()
    except Exception as e:
        print(f" Error in step8_performance_metrics(): {e}")
        return
    # Step 9: visualization (best effort: a plotting failure must not hide
    # the audio results, so there is deliberately no return here)
    try:
        step9_visualization()
    except Exception as e:
        print(f" Error in step9_visualization(): {e}")
    # Offer downloads if in Colab
    if IN_COLAB:
        print("\n Preparing downloads (if files exist)...")
        for f in ['synthesized_audio.wav', 'preserved_audio.wav', 'acoustic_comparison.png', 'radar_comparison.png', 'processing_time.png']:
            if os.path.exists(f):
                try:
                    print(f" - {f}")
                    colab_files.download(f)
                except Exception:
                    # Download is a convenience; the file stays on disk.
                    print(f"   saved: {f}")
    print("\n FULL PIPELINE RUN COMPLETE")
    print("   • Synthesized audio:", PIPELINE_DATA.get('synthesized_audio'))
    print("   • Preserved audio:  ", PIPELINE_DATA.get('preserved_audio'))
    print("   • Metrics available: PIPELINE_DATA['metrics']")
    print("="*70)

# convenience wrapper for interactive upload + run
def upload_and_run(target_lang='es', force_tts=None, preserve_acoustics=True):
    """Prompt for an audio file, register it, then run the whole pipeline.

    The three arguments are forwarded unchanged to run_full_pipeline().
    Returns None; stops early when the upload is cancelled or the file
    fails validation.
    """
    chosen_path = upload_file_interactive()
    if chosen_path:
        if prepare_pipeline_with_file(chosen_path):
            run_full_pipeline(
                target_lang=target_lang,
                force_tts=force_tts,
                preserve_acoustics=preserve_acoustics,
            )
        return
    print("Upload failed or canceled.")

# Small interactive menu to help users run parts
def quick_menu():
    """Print a numbered menu, read one choice from stdin and run it.

    Choices 1-5 start a pipeline run with preset options; anything else
    (including 6) just leaves the menu.
    """
    menu_lines = (
        "\nPIPELINE QUICK MENU",
        "1. Upload file now and run full pipeline (default Spanish)",
        "2. Upload and run full pipeline (choose language)",
        "3. Run full pipeline using already uploaded file (default Spanish)",
        "4. Run full pipeline forcing Bark TTS (use if Coqui poor)",
        "5. Run full pipeline with preservation OFF (fast)",
        "6. Exit",
    )
    for menu_line in menu_lines:
        print(menu_line)
    choice = input("Choose an option [1-6]: ").strip()

    def _ask_language_then_run():
        # Option 2: the language prompt comes before the upload prompt.
        lang = input("Enter target language code (e.g. 'es','fr','de','hi'): ").strip()
        upload_and_run(lang)

    # Nothing runs until the selected entry is called below.
    actions = {
        '1': lambda: upload_and_run('es'),
        '2': _ask_language_then_run,
        '3': lambda: run_full_pipeline('es'),
        '4': lambda: upload_and_run('es', force_tts='bark'),
        '5': lambda: upload_and_run('es', preserve_acoustics=False),
    }
    selected = actions.get(choice)
    if selected is None:
        print("Exiting menu.")
    else:
        selected()

# Usage hints shown once when this helper cell is executed.
print("\n".join([
    "Helper runner loaded. Examples:",
    " - upload_and_run(target_lang='es')           # interactive upload then run",
    " - run_full_pipeline(target_lang='fr')         # run using PIPELINE_DATA['input_audio']",
    " - quick_menu()                                # interactive menu",
    " - upload_and_run(force_tts='bark')            # force Bark if Coqui is poor",
    "\nTip: If you changed the TTS or preservation methods, restart runtime and run cells 1-7 first to rebind functions/classes.",
]))

Helper runner loaded. Examples:
 - upload_and_run(target_lang='es')           # interactive upload then run
 - run_full_pipeline(target_lang='fr')         # run using PIPELINE_DATA['input_audio']
 - quick_menu()                                # interactive menu
 - upload_and_run(force_tts='bark')            # force Bark if Coqui is poor

Tip: If you changed the TTS or preservation methods, restart runtime and run cells 1-7 first to rebind functions/classes.


In [15]:
# Override the 'model' entry of the 'en' language config with the
# Helsinki-NLP opus-mt-hi-en checkpoint name.
# NOTE(review): this mutates SUPPORTED_LANGUAGES in place, so every later run
# in this kernel sees the override. Presumably 'model' is the translation
# model used when the target language is 'en' - confirm against
# step4_translate_text().
SUPPORTED_LANGUAGES['en']['model'] = 'Helsinki-NLP/opus-mt-hi-en'

In [1]:
# Upload interactively and run everything with target language code 'hi'.
# NOTE(review): this comment previously said the default target was English,
# which does not match target_lang='hi' - confirm which one is intended.
# Requires the helper cell that defines upload_and_run() to have been run in
# the current kernel session; on a fresh kernel this raises NameError.
upload_and_run(target_lang='hi')

NameError: name 'upload_and_run' is not defined