# Voice Cloning with XTTS-v2

This notebook demonstrates voice cloning and text-to-speech generation using the [Coqui XTTS-v2](https://huggingface.co/coqui/XTTS-v2) model.


In [None]:
# Standard library
import os
import random as _rng
import re
import subprocess
from collections import defaultdict
from pathlib import Path

# Third-party
import librosa
import numpy as np
import pandas as pd
import soundfile as sf
import torch
import torchaudio
from IPython.display import Audio, display, HTML
from TTS.api import TTS
from tqdm import tqdm

In [None]:
# ============================================================================
# Configuration
# ============================================================================

# Project settings
# Project folder name; the generation cell writes its audio to <PROJECT_DIR>/outputs.
PROJECT_DIR = "all_us_streets"  # Output directory for generated audio

# Random seed for reproducibility: passed to filter_and_shuffle_clips() for the
# clip shuffle and used to seed `random` before speakers are assigned to streets.
RANDOM_SEED = 153

# Language filter: "all" for all languages, or specific code like "es", "fr", "de"
FILTER_LANGUAGE = "all"

# Client ID length for filenames: the default number of leading client_id
# characters that get_short_client_id() keeps.
CLIENT_ID_SHORT_LENGTH = 8

# Translations of "Hello, my name is ..." for each language (via Google Translate)
# Keyed by language code. prepare_text_for_language() looks the code up with
# .get(language, ""), so a code that is not listed here simply gets no prefix.
# Every value ends with a space so the street name can be appended directly.
LANGUAGE_TO_PREFIX = {
    "hi": "नमस्ते, मेरा नाम है... ",      # Hindi
    "ko": "안녕하세요, 제 이름은... ",       # Korean
    "hu": "Helló, a nevem... ",           # Hungarian
    "cs": "Ahoj, jmenuji se... ",         # Czech
    "tr": "Merhaba, benim adım... ",      # Turkish
    "zh-cn": "你好，我的名字是... ",        # Chinese
    "nl": "Hallo, mijn naam is... ",      # Dutch
    "ja": "こんにちは、私の名前は... ",      # Japanese
    "pl": "Cześć, mam na imię... ",       # Polish
    "ar": "مرحباً، اسمي... ",             # Arabic
    "pt": "Olá, meu nome é... ",          # Portuguese
    "it": "Ciao, mi chiamo... ",          # Italian
    "ru": "Привет, меня зовут... ",       # Russian
    "de": "Hallo, mein Name ist... ",     # German
    "fr": "Bonjour, je m'appelle... ",    # French
    "es": "Hola, mi nombre es... ",       # Spanish
}

# Translations of "... finish." for each language (via Google Translate)
# Keyed by the same language codes as LANGUAGE_TO_PREFIX. prepare_text_for_language()
# appends the value right after the street name; a code that is not listed here
# gets no suffix. Every value starts with "... ".
LANGUAGE_TO_SUFFIX = {
    "hi": "... समाप्त।",      # Hindi
    "ko": "... 끝.",          # Korean
    "hu": "... vége.",        # Hungarian
    "cs": "... konec.",       # Czech
    "tr": "... bitiş.",       # Turkish
    "zh-cn": "... 结束。",     # Chinese
    "nl": "... einde.",       # Dutch
    "ja": "... 終わり。",      # Japanese
    "pl": "... koniec.",      # Polish
    "ar": "... انتهى.",       # Arabic
    "pt": "... fim.",         # Portuguese
    "it": "... fine.",        # Italian
    "ru": "... конец.",       # Russian
    "de": "... Ende.",        # German
    "fr": "... fin.",         # French
    "es": "... fin.",         # Spanish
}


In [None]:

# Load the street names that will be synthesized.
# PROJECT_DIR is intentionally NOT reassigned here: it is set once in the
# Configuration cell, and a second assignment in this cell would silently
# override whatever value was chosen there.
STREET_NAMES_TSV = "../street_names_dataset/street_names_100_per_city_random_state_589208.tsv"

streets_df = (
    # Read only the column we need, always as text
    pd.read_csv(STREET_NAMES_TSV, sep='\t', usecols=['street_name'], dtype={'street_name': str})
    .rename(columns={'street_name': 'name'})
    # Missing names would make the `~...str.contains()` mask below raise
    .dropna(subset=['name'])
)
# Keep only single-word street names (no spaces); the space is matched literally
streets_df = streets_df[~streets_df['name'].str.contains(' ', regex=False)].reset_index(drop=True)
print(f"{len(streets_df)} single-word street names to generate (1 file each, random speaker)")

In [None]:
# ============================================================================
# Compatibility Patches for Model Loading
# ============================================================================

def _patched_torchaudio_load(filepath, frame_offset=0, num_frames=-1, normalize=True,
                             channels_first=True, *args, **kwargs):
    """Load audio using soundfile instead of torchcodec (avoids FFmpeg 8 issue).

    Drop-in replacement for ``torchaudio.load``. Because the notebook installs it
    globally, the commonly used ``torchaudio.load`` options are honoured instead
    of being silently discarded.

    Args:
        filepath: Path (or file-like object) handed to ``soundfile.read``.
        frame_offset: Number of frames to skip before reading.
        num_frames: Maximum number of frames to read; -1 reads to the end.
        normalize: Accepted for signature compatibility only; samples are
            always returned as float32.
        channels_first: If True (default) the tensor is (channels, frames),
            otherwise (frames, channels).
        *args, **kwargs: Any further ``torchaudio.load`` options; ignored.

    Returns:
        Tuple of (float32 waveform tensor, sample rate).
    """
    # always_2d=True yields (frames, channels) even for mono files, so no
    # separate mono/stereo branch is needed.
    data, sr = sf.read(filepath, start=frame_offset, frames=num_frames, always_2d=True)
    samples = data.astype(np.float32)
    if channels_first:
        samples = samples.T
    # Make the (possibly transposed) array contiguous before wrapping it
    return torch.from_numpy(np.ascontiguousarray(samples)), sr


In [None]:
# ============================================================================
# Device Setup & Model Loading
# ============================================================================

# Check available device
if torch.cuda.is_available():
    device = "cuda"
    print(f"Using CUDA GPU: {torch.cuda.get_device_name(0)}")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    device = "mps"
    print("Using Apple Silicon GPU (MPS)")
else:
    device = "cpu"
    print("Using CPU (this will be slower)")

# Agree to Coqui license terms (non-commercial CPML)
os.environ["COQUI_TOS_AGREED"] = "1"

# Apply compatibility patches
_original_torch_load = torch.load
# Only remember the real torchaudio.load: when this cell is re-run the patch is
# already installed and must not be recorded as the "original".
if torchaudio.load is not _patched_torchaudio_load:
    _original_torchaudio_load = torchaudio.load
# SECURITY NOTE: weights_only=False lets torch.load unpickle arbitrary objects.
# It is enabled only while the XTTS checkpoint is being loaded.
torch.load = lambda *args, **kwargs: _original_torch_load(*args, **{**kwargs, 'weights_only': False})
torchaudio.load = _patched_torchaudio_load

try:
    # Load XTTS-v2 model
    print("\nLoading XTTS-v2 model...")
    tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2")
    tts.to(device)
finally:
    # Restore torch.load but keep torchaudio.load patched
    # (the patched version avoids FFmpeg/torchcodec errors during generation).
    # Restoring in `finally` matters: if loading fails and the patch stayed in
    # place, the next run would capture the lambda as the "original" and the
    # new lambda would then call itself recursively.
    torch.load = _original_torch_load
# NOTE: torchaudio.load is intentionally left patched so that
# tts_to_file() calls during generation don't hit the torchcodec error.

print("Model loaded successfully!")


In [None]:
# NOTE: FILTER_LANGUAGE and the other settings live in the Configuration cell; streets_df is loaded in the street-names cell that follows it.


In [None]:
# Read the table of XTTS-supported languages (columns: Language, Code)
df_languages = pd.read_csv('xtts_supported_languages.csv')

# Lookup tables in both directions: language name -> code and code -> language name
LANGUAGE_TO_CODE = {
    name: code
    for name, code in zip(df_languages['Language'], df_languages['Code'])
}
CODE_TO_LANGUAGE = {
    code: name
    for name, code in zip(df_languages['Language'], df_languages['Code'])
}

print(f"Loaded {len(df_languages)} XTTS supported languages")
print(df_languages)

# Directory that holds one sub-folder of sample clips per language
CV_BASE_PATH = Path("random_sample_clips")

def load_random_sample_clips(base_path: Path) -> pd.DataFrame:
    """Load audio clips and metadata from random_sample_clips directory.

    Expects each language folder to contain:
    - Audio files (.mp3, .wav, etc.)
    - validated.tsv file with metadata (from Common Voice)

    Language folders are visited in sorted order so the row order of the result
    does not depend on the filesystem's listing order. The seeded shuffle and
    speaker sampling done later are positional, so this is what makes them
    reproducible across machines.

    Args:
        base_path: Directory containing one sub-folder per language.

    Returns:
        All validated.tsv tables concatenated, with added columns ``language``
        (code from LANGUAGE_TO_CODE, else the first two letters of the folder
        name, lower-cased), ``language_name`` (folder name) and ``file_path``
        (folder joined with the ``path`` column). An empty DataFrame is
        returned when no folder provides a validated.tsv.
    """
    all_dfs = []

    # Iterate through language folders in a deterministic order
    for lang_folder in sorted(base_path.iterdir()):
        if not lang_folder.is_dir():
            continue

        language_name = lang_folder.name
        language_code = LANGUAGE_TO_CODE.get(language_name, language_name.lower()[:2])

        # Load metadata from validated.tsv (required)
        validated_tsv = lang_folder / "validated.tsv"

        if not validated_tsv.exists():
            print(f"Skipping {language_name}: no validated.tsv found")
            continue

        # Load clips with metadata
        df = pd.read_csv(validated_tsv, sep='\t')
        df['language'] = language_code
        df['language_name'] = language_name
        # Bind the folder as a default argument so the callable is self-contained
        df['file_path'] = df['path'].apply(lambda rel_path, folder=lang_folder: str(folder / rel_path))
        all_dfs.append(df)
        print(f"Loaded {len(df):,} clips from {language_name} ({language_code})")

    if not all_dfs:
        print("No clips loaded!")
        return pd.DataFrame()

    combined_df = pd.concat(all_dfs, ignore_index=True)
    print(f"\nTotal: {len(combined_df):,} clips across {len(all_dfs)} languages")

    return combined_df

# Load all clips: one combined metadata DataFrame for every language folder
# found under CV_BASE_PATH
cv_metadata_df = load_random_sample_clips(CV_BASE_PATH)

In [None]:
# Overall size of the loaded metadata table
print(f"\nDataFrame shape: {cv_metadata_df.shape}")

# Number of clips available for each language
print("\nAvailable Clips per language:")
print(cv_metadata_df.groupby(['language', 'language_name']).size().reset_index(name='count'))

# Preview the first rows, restricted to the mandatory columns plus whichever
# optional Common Voice columns are actually present
print("\nSample data:")
optional_cols = ['sentence', 'gender', 'age', 'duration_ms', 'client_id']
display_cols = ['path', 'language', 'language_name'] + [
    col for col in optional_cols if col in cv_metadata_df.columns
]
cv_metadata_df.head(10)[display_cols]

In [None]:
# Filter and shuffle clips for processing

def filter_and_shuffle_clips(df: pd.DataFrame, seed: int, 
                             filter_language: str = None) -> pd.DataFrame:
    """Filter clips by language and shuffle them randomly.

    This prepares all clips for processing by optionally filtering to a specific
    language and then shuffling the order for randomized processing.

    The language filter is matched case-insensitively and ignores surrounding
    whitespace, consistent with how the special value "all" is recognised
    (so "ES" selects the same clips as "es").

    Args:
        df: DataFrame with clips metadata (must have 'language' column)
        seed: Random seed for reproducibility
        filter_language: Optional language code to filter to (e.g., "de", "fr"). 
                        If None, empty or "all", uses all languages.

    Returns:
        A shuffled copy of the (optionally filtered) rows with a fresh
        0..n-1 index. The input DataFrame is not modified.
    """
    wanted = (filter_language or "").strip().lower()

    # Filter by language if specified
    if wanted and wanted != "all":
        original_len = len(df)
        matches = df['language'].astype(str).str.lower() == wanted
        df = df[matches].copy()
        print(f"Filtered to language '{filter_language}': {original_len} → {len(df)} clips")

    return df.sample(frac=1, random_state=seed).reset_index(drop=True)


# Filter and shuffle clips (uses FILTER_LANGUAGE from config cell), then keep
# only the first few shuffled clips of every language as reference speakers.
MAX_CLIPS_PER_LANGUAGE = 3
sampled_clips_df = (
    filter_and_shuffle_clips(cv_metadata_df, RANDOM_SEED, filter_language=FILTER_LANGUAGE)
    .groupby("language")
    .head(MAX_CLIPS_PER_LANGUAGE)
)

# Show stats
print("Clips per language:")
if 'client_id' in sampled_clips_df.columns:
    print(sampled_clips_df.groupby(['language', 'language_name']).agg(
        unique_clients=('client_id', 'nunique'),
        total_clips=('path', 'count')
    ).reset_index())
    print(f"\nTotal: {len(sampled_clips_df):,} clips available across {sampled_clips_df['client_id'].nunique():,} speakers")
else:
    print(sampled_clips_df.groupby(['language', 'language_name']).size().reset_index(name='count'))
    print(f"\nTotal: {len(sampled_clips_df):,} clips available")

# Show shuffled clips (only columns that exist)
print("\nShuffled clips preview:")
display_cols = ['path', 'language', 'language_name']
optional_cols = ['sentence', 'gender', 'age', 'duration_ms', 'client_id']
display_cols.extend([col for col in optional_cols if col in sampled_clips_df.columns])
# Last expression of the cell: this is what actually renders the preview that
# the heading above announces.
sampled_clips_df[display_cols].head(10)

In [None]:
# Listen to sampled clips
def play_sampled_clips(df: pd.DataFrame, max_per_language: int = 2):
    """Render an audio player for the first few clips of every language.

    Languages are visited in sorted order of their code. For each one a heading
    is displayed, followed by up to ``max_per_language`` clips with their file
    name, sentence (cut to 100 characters when longer), gender/age when a
    gender is present, and an inline audio widget.

    Args:
        df: Clip metadata with 'language', 'language_name', 'path' and
            'file_path' columns; 'sentence', 'gender' and 'age' are optional.
        max_per_language: Maximum number of clips shown per language.
    """
    language_codes = sorted(df['language'].unique())

    for code in language_codes:
        clips = df.loc[df['language'] == code].head(max_per_language)
        heading = clips['language_name'].iloc[0]

        display(HTML(f"<h3>{heading} ({code})</h3>"))

        for _, clip in clips.iterrows():
            print(f"File: {clip['path']}")

            has_sentence = 'sentence' in clip and pd.notna(clip.get('sentence'))
            if has_sentence:
                sentence = clip['sentence']
                # Long sentences are truncated and marked with an ellipsis
                if len(str(sentence)) > 100:
                    print(f"Sentence: {sentence[:100]}...")
                else:
                    print(f"Sentence: {sentence}")

            if pd.notna(clip.get('gender')):
                print(f"Gender: {clip['gender']}, Age: {clip.get('age', 'N/A')}")

            display(Audio(clip['file_path']))
            print("---")

In [None]:
# Extract the raw street name (what will be spoken)
def extract_raw_street_name(phrase):
    """Return the bare street name, e.g. "I'm on ALEMANY" -> 'ALEMANY'.

    Every occurrence of the lead-in "I'm on " is dropped and surrounding
    whitespace is trimmed; a phrase without the lead-in is only trimmed.
    """
    # Splitting on the lead-in and re-joining removes each occurrence of it
    pieces = phrase.split("I'm on ")
    return "".join(pieces).strip()

# Extract clean street name for filename
def extract_filename_street_name(phrase):
    """Turn a phrase into a street name that is safe to embed in a filename.

    Steps: drop the "I'm on " lead-in and trim; collapse every run of
    apostrophes and/or whitespace into a single underscore; finally delete any
    character that is not an ASCII letter, digit or underscore. Letter case is
    left untouched.
    """
    name = "".join(phrase.split("I'm on ")).strip()
    # Runs of apostrophes/whitespace become one underscore each
    collapsed = re.compile(r"['\s]+").sub("_", name)
    # Everything outside [a-zA-Z0-9_] is removed
    return re.compile(r"[^a-zA-Z0-9_]").sub("", collapsed)

# Derive both name variants from the source 'name' column:
#   street_name_raw - the text the TTS model will speak
#   street_name     - the sanitized form used in output filenames
streets_df['street_name_raw'] = streets_df['name'].map(extract_raw_street_name)
streets_df['street_name'] = streets_df['name'].map(extract_filename_street_name)

# Quick look at the first rows to check the derived columns
print(streets_df.head(10)[['name', 'street_name_raw', 'street_name']])


In [None]:
# =============================================================================
# Voice Clone Generation - Helper Functions
# =============================================================================
# CLIENT_ID_SHORT_LENGTH is defined in Configuration cell (cell 2)

def get_short_client_id(client_id: str, length: int = CLIENT_ID_SHORT_LENGTH) -> str:
    """Return the leading ``length`` characters of ``client_id`` for use in filenames.

    IDs shorter than ``length`` are returned unchanged.
    """
    leading = slice(None, length)
    return client_id[leading]


def prepare_text_for_language(street_name_raw: str, language: str) -> str:
    """Wrap a street name in the greeting and closing phrase of a language.

    The phrases come from LANGUAGE_TO_PREFIX and LANGUAGE_TO_SUFFIX; a language
    code missing from either table contributes an empty string for that part.

    Args:
        street_name_raw: Raw street name text
        language: Language code (e.g., 'es', 'fr', 'de')

    Returns:
        The prefix, the street name and the suffix concatenated in that order
    """
    parts = (
        LANGUAGE_TO_PREFIX.get(language, ""),
        street_name_raw,
        LANGUAGE_TO_SUFFIX.get(language, ""),
    )
    return "{}{}{}".format(*parts)


def create_output_filename(short_id: str, street_name: str, language: str) -> str:
    """Build the standard name of a generated audio file.

    Args:
        short_id: Shortened client/speaker ID
        street_name: Clean street name (safe for filenames)
        language: Language code

    Returns:
        The three parts joined by underscores with a ``.wav`` extension:
        {short_id}_{street_name}_{language}.wav
    """
    stem = "{}_{}_{}".format(short_id, street_name, language)
    return stem + ".wav"


def convert_mp3_to_wav(mp3_path: str, speaker_id: str, output_dir: str = "temp_wav") -> str:
    """Convert MP3 audio to WAV format for better XTTS compatibility.

    The converted file is cached as ``{speaker_id}_{clip name}.wav`` inside
    ``output_dir``. Including the clip name in the cache key matters: keyed on
    the speaker alone, a second clip of the same speaker (or another speaker
    sharing the same shortened ID) would silently reuse the first clip's WAV.

    Args:
        mp3_path: Path to input MP3 file
        speaker_id: Speaker identifier for output filename
        output_dir: Directory to save converted WAV files

    Returns:
        Path to converted WAV file

    Raises:
        FileNotFoundError: If mp3_path doesn't exist
        RuntimeError: If FFmpeg conversion fails or ffmpeg is not installed
    """
    if not os.path.exists(mp3_path):
        raise FileNotFoundError(f"Audio file not found: {mp3_path}")

    os.makedirs(output_dir, exist_ok=True)
    safe_speaker_id = re.sub(r'[^\w\-]', '_', speaker_id)
    clip_stem = os.path.splitext(os.path.basename(mp3_path))[0]
    safe_clip_stem = re.sub(r'[^\w\-]', '_', clip_stem)
    cache_stem = f"{safe_speaker_id}_{safe_clip_stem}"
    wav_path = os.path.join(output_dir, f"{cache_stem}.wav")

    # Skip if already converted
    if os.path.exists(wav_path):
        return wav_path

    # Convert into a scratch file first and rename it on success, so that an
    # interrupted or failed run never leaves a truncated file that the
    # "already converted" check above would accept next time. The scratch name
    # keeps the .wav extension because ffmpeg infers the format from it.
    partial_path = os.path.join(output_dir, f"{cache_stem}.partial.wav")
    try:
        # Convert using ffmpeg (22050 Hz, mono)
        try:
            subprocess.run([
                'ffmpeg', '-i', mp3_path,
                '-ar', '22050',
                '-ac', '1',
                '-y',
                partial_path
            ], check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"FFmpeg conversion failed: {e.stderr}") from e
        except FileNotFoundError as e:
            # Raised by subprocess when the ffmpeg executable itself is missing
            raise RuntimeError("FFmpeg conversion failed: ffmpeg executable not found on PATH") from e
        os.replace(partial_path, wav_path)
    finally:
        if os.path.exists(partial_path):
            os.remove(partial_path)

    return wav_path


def generate_single_clone(
    tts,
    text: str,
    output_path: str,
    ref_audio: str,
    language: str
) -> None:
    """Synthesize one utterance in a cloned voice and write it to disk.

    Thin wrapper that forwards its arguments to ``tts.tts_to_file``.

    Args:
        tts: TTS model instance
        text: Text to synthesize
        output_path: Where to save the output file
        ref_audio: Path to reference audio for voice cloning
        language: Language code for synthesis
    """
    synthesis_kwargs = dict(
        text=text,
        file_path=output_path,
        speaker_wav=ref_audio,
        language=language,
    )
    tts.tts_to_file(**synthesis_kwargs)


def process_speaker_phrases(
    tts,
    speaker_id: str,
    short_id: str,
    language: str,
    ref_audio: str,
    phrases_df: pd.DataFrame,
    output_dir: str,
    pbar
) -> tuple[list[dict], list[tuple]]:
    """Synthesize every phrase in one speaker's voice.

    Phrases whose output file already exists are skipped. A failure while
    synthesizing a phrase is recorded and processing moves on to the next
    phrase. The progress bar advances by one for every phrase, whether it was
    skipped, generated or failed.

    Args:
        tts: TTS model instance
        speaker_id: Full client ID
        short_id: Shortened client ID
        language: Language code
        ref_audio: Path to reference audio
        phrases_df: DataFrame with 'street_name_raw' and 'street_name' columns
        output_dir: Output directory for generated files
        pbar: tqdm progress bar

    Returns:
        Tuple of (generated_files, errors): a list of record dicts for the
        files created and a list of (speaker_id, text, error message) tuples.
    """
    successes = []
    failures = []

    for _, row in phrases_df.iterrows():
        spoken_name = row['street_name_raw']
        file_name_part = row['street_name']
        text = prepare_text_for_language(spoken_name, language)
        target = os.path.join(
            output_dir,
            create_output_filename(short_id, file_name_part, language),
        )

        # Only synthesize when the file is not there yet
        if not os.path.exists(target):
            # Show what is currently being generated
            pbar.set_postfix_str(f"{language}/{file_name_part[:15]}")

            try:
                generate_single_clone(tts, text, target, ref_audio, language)
            except Exception as exc:
                failures.append((speaker_id, text, str(exc)))
            else:
                successes.append({
                    'client_id': speaker_id,
                    'short_id': short_id,
                    'language': language,
                    'street_name': file_name_part,
                    'text_spoken': text,
                    'output_file': target
                })

        pbar.update(1)

    return successes, failures


def print_generation_summary(generated_count: int, errors: list) -> None:
    """Print how many files were generated and what went wrong.

    At most the first ten errors are listed; any further ones are only counted.

    Args:
        generated_count: Number of successfully generated files
        errors: List of error tuples
    """
    lines = [
        f"\nGeneration complete!",
        f"   Successfully generated: {generated_count:,} files",
        f"   Errors: {len(errors)}",
    ]

    if errors:
        lines.append("\nErrors encountered:")
        lines.extend(f"   {err}" for err in errors[:10])
        if len(errors) > 10:
            lines.append(f"   ... and {len(errors) - 10} more errors")

    for line in lines:
        print(line)


# =============================================================================
# Main Generation Function
# =============================================================================

def generate_voice_clones(
    speakers_df: pd.DataFrame,
    phrases_df: pd.DataFrame,
    output_dir: str = "outputs",
    convert_to_wav: bool = True
) -> tuple[pd.DataFrame, list]:
    """Generate voice clones for each speaker saying each phrase.

    Uses the module-level ``tts`` model. For every speaker the reference clip
    is optionally converted from MP3 to WAV, then all phrases are synthesized
    through process_speaker_phrases(). A speaker whose conversion fails is
    recorded as one "wav_conversion" error and skipped entirely.

    Args:
        speakers_df: DataFrame with columns: client_id, file_path, language
        phrases_df: DataFrame with columns: street_name_raw, street_name
        output_dir: Directory to save output files
        convert_to_wav: Whether to convert MP3 reference audio to WAV first

    Returns:
        Tuple of (generated_df, errors):
            - generated_df: DataFrame with info about generated files
            - errors: List of error tuples (client_id, text, error_msg)
    """
    os.makedirs(output_dir, exist_ok=True)

    speaker_count = len(speakers_df)
    phrase_count = len(phrases_df)
    total_generations = speaker_count * phrase_count

    print(f"Starting voice cloning generation")
    print(f"   Speakers: {speaker_count}")
    print(f"   Phrases: {phrase_count}")
    print(f"   Total generations: {total_generations:,}")
    print(f"   Output directory: {output_dir}/")
    print()

    records = []
    failures = []

    with tqdm(total=total_generations, desc="Generating audio") as pbar:
        for _, speaker in speakers_df.iterrows():
            client_id = speaker['client_id']
            short_id = get_short_client_id(client_id)
            language = speaker['language']
            ref_audio = speaker['file_path']

            # MP3 references are converted once per speaker when requested
            needs_conversion = convert_to_wav and ref_audio.endswith('.mp3')
            if needs_conversion:
                try:
                    ref_audio = convert_mp3_to_wav(ref_audio, short_id)
                except Exception as exc:
                    failures.append((client_id, "wav_conversion", str(exc)))
                    # Account for all phrases this speaker will not produce
                    pbar.update(phrase_count)
                    continue

            speaker_records, speaker_failures = process_speaker_phrases(
                tts=tts,
                speaker_id=client_id,
                short_id=short_id,
                language=language,
                ref_audio=ref_audio,
                phrases_df=phrases_df,
                output_dir=output_dir,
                pbar=pbar
            )
            records.extend(speaker_records)
            failures.extend(speaker_failures)

    print_generation_summary(len(records), failures)

    return pd.DataFrame(records), failures


# Status message confirming that the definitions in this cell were executed
print("Generation functions defined. Ready to run.")


In [None]:
# Run the voice cloning generation for all streets.
# Generate ONE file per street, with a randomly selected speaker.
# The cell is resumable: files that already exist are skipped, and the
# generation log on disk is extended rather than overwritten.

_rng.seed(RANDOM_SEED)

output_dir = Path(PROJECT_DIR) / "outputs"
output_dir.mkdir(exist_ok=True, parents=True)

existing_files = list(output_dir.glob("*.wav"))
print(f"Output directory: {output_dir}/")
print(f"   Existing files: {len(existing_files)}")
print(f"   Streets to generate: {len(streets_df)}")

# Fail early with a readable message instead of an opaque pandas sampling error
if len(streets_df) > 0 and sampled_clips_df.empty:
    raise ValueError(
        "No reference clips available in sampled_clips_df; "
        "check FILTER_LANGUAGE and the contents of the clips directory."
    )

# For each street, randomly assign one speaker from sampled_clips_df.
# The seeded draw is kept exactly as-is so that a re-run assigns the same
# speakers and therefore recognises the files it produced earlier.
speaker_assignments = [sampled_clips_df.sample(n=1, random_state=_rng.randint(0, 2**31)).iloc[0] 
                       for _ in range(len(streets_df))]

all_generated = []
all_errors = []
skipped_count = 0

with tqdm(total=len(streets_df), desc="Generating audio") as pbar:
    for i, (_, street_row) in enumerate(streets_df.iterrows()):
        speaker = speaker_assignments[i]
        client_id = speaker['client_id']
        short_id = get_short_client_id(client_id)
        language = speaker['language']
        ref_audio = speaker['file_path']

        street_name = street_row['street_name']
        street_name_raw = street_row['street_name_raw']
        text = prepare_text_for_language(street_name_raw, language)

        # Build output filename: {short_id}_{street_name}_{language}.wav
        out_filename = create_output_filename(short_id, street_name, language)
        out_path = os.path.join(str(output_dir), out_filename)

        # Skip if already exists
        if os.path.exists(out_path):
            skipped_count += 1
            pbar.update(1)
            continue

        # Convert MP3 to WAV if needed (extension check ignores case)
        if ref_audio.lower().endswith('.mp3'):
            try:
                ref_audio = convert_mp3_to_wav(ref_audio, short_id)
            except Exception as e:
                all_errors.append((client_id, text, str(e)))
                pbar.update(1)
                continue

        try:
            generate_single_clone(tts, text, out_path, ref_audio, language)
            all_generated.append({
                'client_id': client_id,
                'short_id': short_id,
                'language': language,
                'text': text,
                'street_name': street_name,
                'file_path': out_path
            })
        except Exception as e:
            all_errors.append((client_id, text, str(e)))

        pbar.update(1)

# Report counts and the first few error details
print_generation_summary(len(all_generated), all_errors)
print(f"   Skipped (already existed): {skipped_count:,} files")

# Save the generation log.
# generated_df holds only this run's files. The CSV on disk accumulates all
# runs, so a re-run that skips everything no longer wipes the earlier log.
generated_df = pd.DataFrame(all_generated)
log_path = output_dir / "generation_log.csv"

log_frames = []
if log_path.exists():
    try:
        previous_log_df = pd.read_csv(log_path, dtype=str)
    except pd.errors.EmptyDataError:
        # An empty file can be left behind by an earlier run that generated nothing
        previous_log_df = pd.DataFrame()
    if not previous_log_df.empty:
        log_frames.append(previous_log_df)
if not generated_df.empty:
    log_frames.append(generated_df)

if log_frames:
    full_log_df = pd.concat(log_frames, ignore_index=True)
    if 'file_path' in full_log_df.columns:
        # A regenerated file replaces its older log entry
        full_log_df = full_log_df.drop_duplicates(subset='file_path', keep='last')
    full_log_df.to_csv(log_path, index=False)
    print(f"Generation log saved to {log_path} ({len(full_log_df):,} entries)")
else:
    print("Nothing has been generated yet; no generation log written")
