In [1]:
import os
import time
import json
import soundfile as sf
from IPython.display import Audio as Audio_rep, display
import logging, warnings
from transformers import logging as hf_logging
from huggingface_hub import login
import kagglehub
import pandas as pd
from datasets import Dataset, load_dataset, load_from_disk, Audio
from tqdm import tqdm
import torch
import torchaudio
import librosa

  from .autonotebook import tqdm as notebook_tqdm


---
## Setting up HuggingFace

In [2]:
########################################################
# Hugging face login
#######################################################
# Silence transformers/TRL logs early
hf_logging.set_verbosity_error()
logging.getLogger("trl").setLevel(logging.ERROR)

# Hide specific noisy warnings
warnings.filterwarnings(
    "ignore",
    message=r".*loss_type=None.*ForCausalLMLoss.*",
    category=UserWarning,
)
warnings.filterwarnings(
    "ignore",
    message=r".*cuDNN SDPA backward got grad_output\.strides\(\) != output\.strides\(\).*",
    category=UserWarning,
)
os.environ["TQDM_NOTEBOOK"] = "0"

#############################################
########## Google Colab #####################
# setting key in secrets google colab
# from google.colab import userdata
# hf_key = userdata.get('HUGGINGFACE_API_KEY')
#############################################
########## Locally with env file ############
# Load .env file (if present)
from dotenv import load_dotenv
load_dotenv()
hf_key = os.environ.get("HUGGINGFACE_API_KEY")
#############################################

if hf_key:
    login(hf_key)
else:
    raise EnvironmentError("HUGGINGFACE_API_KEY not found. Copy .env.template to .env and add your token. See Instruction.md")

---
## Load Dataset - LJ Speech Dataset

You can either download the dataset from Kaggle or Huggingface, 13100 short audio clips, with transcription, from Kaggle 
you get individual .wav files.

The size of the dataset is 3GB.

---
### Kaggle

In [3]:
path = kagglehub.dataset_download("mathurinache/the-lj-speech-dataset")

print("Path to dataset files:", path)

# Create in the root of the project a /datasets folder and move the donwloaded dataset there,
# otherwise set use the default path where it has been saved (usually .cache/kagglehub)
DATASET_PATH = 'mathurinache/the-lj-speech-dataset/versions/1/LJSpeech-1.1'
DATASET_NAME = 'LJSpeech1_1'

Downloading from https://www.kaggle.com/api/v1/datasets/download/mathurinache/the-lj-speech-dataset?dataset_version_number=1...


100%|██████████| 2.99G/2.99G [02:00<00:00, 26.6MB/s]

Extracting files...





Path to dataset files: /storage/homefs/kr23w045/.cache/kagglehub/datasets/mathurinache/the-lj-speech-dataset/versions/1


In [8]:
# Load metadata
df = pd.read_csv(
    f"/storage/homefs/kr23w045/.cache/kagglehub/datasets/mathurinache/the-lj-speech-dataset/versions/1/LJSpeech-1.1/metadata.csv",
    sep="|",
    names=["id", "text", "normalized"],
)

# Add full audio paths
df["audio"] = df["id"].apply(
    lambda x: f"/storage/homefs/kr23w045/.cache/kagglehub/datasets/mathurinache/the-lj-speech-dataset/versions/1/LJSpeech-1.1/wavs{x}.wav"
)

# Rename fields to match your benchmark code
df = df.rename(columns={"text": "spoken_text"})

# Create HuggingFace dataset
dataset = Dataset.from_pandas(df)

# Tell HF that "audio" contains audio files
dataset = dataset.cast_column("audio", 
    load_dataset("audiofolder", data_dir=f"/storage/homefs/kr23w045/.cache/kagglehub/datasets/mathurinache/the-lj-speech-dataset/versions/1/LJSpeech-1.1/wavs")["train"].features["audio"]
)

Downloading data: 100%|██████████| 13100/13100 [00:00<00:00, 87407.19files/s] 
Generating train split: 13100 examples [00:00, 28601.82 examples/s]


---
### HuggingFace

In [9]:
DATASET_NAME = "MikhailT/lj-speech"
# load dataset
import datasets

dataset_local = True
if dataset_local:
    dataset = load_from_disk(os.path.join('datasets', DATASET_NAME))
else:
    dataset = load_dataset(DATASET_NAME, split="full", streaming=False).select(range(NUM_SAMPLES))

dataset = dataset.cast_column("audio", datasets.features.Audio(decode=False))

# save dataset locally    
# dataset.save_to_disk(f'./datasets/{DATASET_NAME}')

FileNotFoundError: Directory datasets/MikhailT/lj-speech not found

---
## Models Building 

In [10]:
#######################################################
# CONFIGURATION
#######################################################
from pathlib import Path

# Set your models here
TTS_MODELS = [
    # "bark_small", # "suno/bark-small",
    "speecht5_tts", # "microsoft/speecht5_tts"                          
    "mms_tts"
]

models = {}

# Directory to store generated audio
OUTPUT_DIR = Path("tts_results")
OUTPUT_DIR.mkdir(exist_ok=True)


# Set the device
device = "cuda" if torch.cuda.is_available() else "cpu"

---
### Microsoft/speecht5_tts

In [11]:
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan

speecht5_tts_local_path = './models/speecht5_tts'
speecht5_tts_local = True

# Load models
try:
    processor = SpeechT5Processor.from_pretrained(speecht5_tts_local_path if speecht5_tts_local else "microsoft/speecht5_tts")
    model = SpeechT5ForTextToSpeech.from_pretrained(speecht5_tts_local_path if speecht5_tts_local else "microsoft/speecht5_tts")
    vocoder = SpeechT5HifiGan.from_pretrained(os.path.join(speecht5_tts_local_path, "vocoder") if speecht5_tts_local else "microsoft/speecht5_hifigan")
except OSError:
    raise EnvironmentError("Make sure that the local path to the model is correct.")
else:
    models["speecht5_tts"] = {"processor": processor, "model": model, "vocoder": vocoder}

OSError: Make sure that the local path to the model is correct.

In [None]:
# save the model locally
# save_path = "./models/speecht5_tts"
# models["speecht5_tts"]['model'].save_pretrained(save_path)
# models["speecht5_tts"]['processor'].save_pretrained(save_path)
# models["speecht5_tts"]['vocoder'].save_pretrained(os.path.join(save_path, 'vocoder'))

#### Testing

In [None]:
# Prepare input text
inputs = models['speecht5_tts']['processor'](text="Test one, two, three, I am talking!", return_tensors="pt")

# Use a random speaker embedding (512 dimensions)
speaker_embeddings = torch.randn(1, 512)

# Generate speech
with torch.no_grad():
    speech = models['speecht5_tts']['model'].generate_speech(
        inputs["input_ids"],
        speaker_embeddings,
        vocoder=models['speecht5_tts']['vocoder']
    )

# Save output
sf.write(f"{OUTPUT_DIR}/speecht5_tts/speech_test.wav", speech.numpy(), 16000)

In [None]:
display(Audio_rep(f"{OUTPUT_DIR}/speecht5_tts/speech_test.wav"))

---
### facebook/mms-tts-eng

In [None]:
from transformers import VitsModel, AutoTokenizer

mms_tts_local_path = './models/mms_tts'
mms_tts_local = False
# Load models
try:
    model = VitsModel.from_pretrained(mms_tts_local_path if mms_tts_local else "facebook/mms-tts-eng")
    tokenizer = AutoTokenizer.from_pretrained(mms_tts_local_path if mms_tts_local else "facebook/mms-tts-eng")
except OSError:
    raise EnvironmentError("Make sure that the local path to the model is correct.")
else:
    models["mms_tts"] = {"model": model, "tokenizer": tokenizer}

In [None]:
# # save the model locally
# save_path = "./models/mms_tts"
# models["mms_tts"]['model'].save_pretrained(save_path)
# models["mms_tts"]['tokenizer'].save_pretrained(save_path)

#### Testing

In [None]:
text = "Test one, two, three, I am talking!"
inputs = models['mms_tts']['tokenizer'](text, return_tensors="pt")

with torch.no_grad():
    output = models['mms_tts']['model'](**inputs).waveform

sf.write(f"{OUTPUT_DIR}/mms_tts/speech_test.wav", output.cpu().numpy().squeeze(), models['mms_tts']['model'].config.sampling_rate)

In [None]:
display(Audio_rep(f"{OUTPUT_DIR}/mms_tts/speech_test.wav"))

---
## Benchmarking

In [None]:
#######################################################
# METRICS — Small, simple (expand as needed)
#######################################################
import soundfile as sf

def audio_duration(path):
    """Returns duration in seconds."""
    y, sr = librosa.load(path, sr=None)
    return len(y) / sr

def infer_model(models, model_name, texts):
    if model_name == 'speecht5_tts':
        if len(texts) > 1:
            print("Speecht5_tts cannot be ran with batches, only the first text will be passed to the model")
            
        inputs = models[model_name]['processor'](text=texts[0], return_tensors="pt")
        # Use a random speaker embedding (512 dimensions)
        # speaker_embeddings = torch.randn(1, 512)
        # fix voice for the moment
        speaker_embeddings = torch.zeros(1, 512)

        # Generate speech
        with torch.no_grad():
            generated = models[model_name]['model'].generate_speech(
                inputs["input_ids"],
                speaker_embeddings,
                vocoder=models[model_name]['vocoder']
            )
        
        return [generated]
        
    elif model_name == 'mms_tts':
        inputs = models[model_name]['tokenizer'](text=texts, return_tensors="pt", padding=True)
    
        with torch.no_grad():
          generated = models[model_name]['model'](**inputs).waveform
        
        return generated

    else:
        Warning("Model not implemented yet!")
        return None

def save_gen_audio(model_name, output_audio_path, audio, sample):
    if model_name == 'speecht5_tts':
        # output_audio_path = model_dir / f"sample_{b_start}_bs{batch_size}.wav"
        sf.write(output_audio_path, audio.numpy(), 16000)
        # Metrics
        duration = audio_duration(output_audio_path)
        # similarity = mel_spectrogram_similarity(reference_audio_path,output_audio_path)
    elif model_name == 'mms_tts':
        waveform = audio.cpu().numpy()
        sf.write(output_audio_path, waveform, models[model_name]['model'].config.sampling_rate)
        # Metrics
        duration = audio_duration(output_audio_path)
        # similarity = mel_spectrogram_similarity(reference_audio_path,output_audio_path)

    return {
        "text": sample["spoken_text"],
        "reference": sample["audio"]["path"],
        "generated": str(output_audio_path),
        "duration": duration
        # "mel_similarity": float(similarity)
    }

# def mel_spectrogram_similarity(ref_path, gen_path):
#     """
#     Simple similarity metric comparing mel spectrogram cosine similarity.
#     Not perfect, but useful for midterm presentation.
#     """
#     ref, sr_ref = librosa.load(ref_path, sr=22050)
#     gen, sr_gen = librosa.load(gen_path, sr=22050)

#     ref_mel = librosa.feature.melspectrogram(ref, sr=22050)
#     gen_mel = librosa.feature.melspectrogram(gen, sr=22050)

#     ref_vec = np.mean(ref_mel, axis=1)
#     gen_vec = np.mean(gen_mel, axis=1)

#     return 1 - cosine(ref_vec, gen_vec)
    

#######################################################
# Run the benchmark on a given dataset
#######################################################
def run_tts_benchmark(dataset, exp_folder):

    for model_name in TTS_MODELS:
        print(f"\n### Running inference for: {model_name}")
        model_dir = OUTPUT_DIR / model_name.replace("/", "_") / exp_folder
        model_dir.mkdir(exist_ok=True)
        
        # Warmup the model
        warmup_text = "Warm up the model."
        print("Running warm-up…")
        _ = infer_model(models, model_name, [warmup_text])
        print("Warm-up complete.\n")
        
        for batch_size in BATCH_SIZES:
          print(f"Batch size: {batch_size}")

          model_results = []
          samples = list(dataset)

          if (model_name == 'speecht5_tts' and batch_size != 1):
             print('speecht5_tts does not support batching, skip...')
             continue

          for b_start in tqdm(range(0, len(samples), batch_size)):
              batch = samples[b_start:b_start+batch_size]
              texts = [s["spoken_text"] for s in batch]

              # ----- Inference -----
              t0 = time.time()

              generated_batch = infer_model(models, model_name, texts)

              if generated_batch is None:
                  print("Something went wrong while generating the batch!")
                  return

              t1 = time.time()

              ## save generated audio and results
              for idx, (sample, audio) in enumerate(zip(batch, generated_batch)):
                output_audio_path = model_dir / f"sample_{b_start + idx}_bs{batch_size}.wav"
                results = save_gen_audio(model_name, output_audio_path, audio, sample)
                results['batch_size'] = batch_size
                results['inference_time'] = t1 - t0
                model_results.append(results)

          # Save model results
          with open(model_dir / f"results_bs{batch_size}.json", "w") as f:
              json.dump(model_results, f, indent=2)
              
    print("\nDone! Benchmarking done, save results in", OUTPUT_DIR, " and inside the model folder, under ", exp_folder)

In [None]:
BATCH_SIZES = [1, 5]
NUM_SAMPLES = 2   # subset for fast evaluation
exp_folder = 'test0'

In [None]:
dataset_sampled = dataset.select(range(NUM_SAMPLES))
run_tts_benchmark(dataset_sampled, exp_folder)
print(TTS_MODELS)

In [None]:
import pandas as pd
exp_to_import = 'test0'
df_mms_tts_b1 = pd.read_json(f'tts_results/mms_tts/{exp_to_import}/results_bs1.json')
df_mms_tts_b5 = pd.read_json(f'tts_results/mms_tts/{exp_to_import}/results_bs5.json')
df_speecht5_tts_b1 = pd.read_json(f'tts_results/speecht5_tts/{exp_to_import}/results_bs1.json')

In [None]:
df_mms_tts_b1.head(5)
df_mms_tts_b1['rtf'] = df_mms_tts_b1['duration']/df_mms_tts_b1['inference_time']
df_mms_tts_b1.describe()

In [None]:
df_mms_tts_b5.head(5)
df_mms_tts_b5['rtf'] = 5*df_mms_tts_b5['duration']/df_mms_tts_b5['inference_time']
df_mms_tts_b5.describe()

In [None]:
df_speecht5_tts_b1.head(5)
df_speecht5_tts_b1['rtf'] = df_speecht5_tts_b1['duration']/df_speecht5_tts_b1['inference_time']
df_speecht5_tts_b1.describe()