# HW4 (c): Text-to-Speech with Language Models (40 points)

## Introduction

In this assignment, you will implement and train a Language Model (LM) for Text-to-Speech (TTS) synthesis, following the CosyVoice2 architecture. The assignment focuses on the core component of modern TTS systems: autoregressive language modeling for speech token generation.

### Learning Objectives

* Understand modern TTS architecture and how LLM-based TTS systems work
* Implement autoregressive generation with a transformer that generates speech tokens from text
* Handle multi-modal sequences by working with text and speech tokens in a unified framework
* Train a language model from scratch on real speech data
* Implement zero-shot voice cloning with in-context learning
* Evaluate TTS quality using Word Error Rate (WER) with automatic speech recognition

### System Architecture

```
Text → [Text Tokenizer] → Text Tokens → [Your LM] → Speech Tokens → [Flow Model] → Mel → [Vocoder] → Audio
```

You will implement and train the Language Model component that converts text tokens to speech tokens. All other components (tokenizers, flow model, vocoder) are provided as pre-trained models.

### Assignment Package

Download [**hw4_util.zip**](https://drive.google.com/file/d/1q5OdFHgXBtdK5MHJCWmWeyBHmz4Y-QVy/view?usp=sharing) and upload it to your Google Drive.

### Dataset

The assignment uses the LibriTTS dataset (pre-tokenized for efficiency):
* 354,780 training samples
* 9,957 test samples
* Multi-speaker data for diverse voice generation

### Resources

- [CosyVoice2 Paper](https://arxiv.org/abs/2412.10117)
- [LibriTTS Dataset](https://www.openslr.org/60/)
- [Transformer Architecture](https://arxiv.org/abs/1706.03762)

### Submission Requirements

You will submit **only 2 files** to Gradescope:

1. **`submission_[YOUR_ID].txt`** - Auto-generated WER evaluation results from Part 8
2. **`hw4-c.pdf`** - PDF export of this notebook showing all your code and outputs

**Important Notes:**
* The submission file is automatically generated when you run Part 8's evaluation
* Do NOT modify the submission file format - it must match the exact format for autograding
* Your WER score will be calculated on Gradescope using your transcriptions
* Ensure all code cells have been executed and outputs are visible in the PDF
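
For orientation, WER is the word-level edit distance between the ASR transcription and the reference text, divided by the number of reference words. The sketch below is a minimal pure-Python version. The function name `word_error_rate` and the lowercase/whitespace normalization are assumptions for illustration; the Gradescope autograder may normalize text differently.

```python
def word_error_rate(reference: str, hypothesis: str) -> float:
    """Word-level Levenshtein distance divided by the reference length."""
    ref = reference.lower().split()
    hyp = hypothesis.lower().split()
    if not ref:
        raise ValueError("reference must contain at least one word")

    # prev[j] = edit distance between the reference prefix seen so far and hyp[:j]
    prev = list(range(len(hyp) + 1))
    for i, r in enumerate(ref, start=1):
        curr = [i] + [0] * len(hyp)
        for j, h in enumerate(hyp, start=1):
            cost = 0 if r == h else 1
            curr[j] = min(prev[j] + 1,         # deletion
                          curr[j - 1] + 1,     # insertion
                          prev[j - 1] + cost)  # substitution or match
        prev = curr
    return prev[-1] / len(ref)


# One substituted word out of four reference words
print(word_error_rate("he defended raglan castle", "he defended ragland castle"))  # 0.25
```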


## Part 0: Environment Setup

The cell below will automatically detect your environment (Google Colab or Local Machine) and set it up accordingly.


In [8]:
!pip install -r hw4_util/requirements.txt



In [10]:
!mv /content/hw4_util/* /content/

In [5]:
# from google.colab import drive
# drive.mount('/content/drive')

# !unzip /content/drive/MyDrive/cs283/hw4_util.zip -d /content
# !mv /content/hw4_util/* /content/
# !pip install -r requirements.txt

# import importlib
# hw4_util = importlib.machinery.SourceFileLoader("hw4_util", "./hw4_util.py").load_module()

# Environment Verification
from hw4_util import check_environment

check_environment()

Detected: Google Colab

Checking core dependencies...
  [OK] PyTorch 2.8.0+cu126
  [OK] GPU: Tesla T4
  [OK] Transformers 4.57.1
  [OK] Gradio
  [OK] CosyVoice/ found

Environment Check: READY

You can proceed with the assignment!


True

## Part 1: Setup and Configuration

In this part, you will set up the environment and define the configuration for the assignment.


In [6]:
# Essential imports
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import unpad_sequence, pad_sequence
import numpy as np
import math
from tqdm.notebook import tqdm
from typing import Tuple
from IPython.display import Audio, display
import torchaudio

# Set environment variables
os.environ['TOKENIZERS_PARALLELISM'] = 'false'

# Check GPU availability
assert torch.cuda.is_available(), "GPU is required for this assignment!"
device = torch.device('cuda')
print(f"Using GPU: {torch.cuda.get_device_name(0)}")
print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

Using GPU: Tesla T4
Memory: 15.8 GB


In [7]:
# Configuration
class Config:
    """Essential configuration for HW4 - Paths and Fixed Tokens Only"""

    # Paths (Fixed - Do Not Change)
    DATA_CACHE_DIR = './libritts_token_cache'
    TRAIN_CACHE = 'libritts_train.pt'
    TEST_CACHE = 'libritts_test.pt'
    PRETRAINED_DIR = './pretrained_models'
    RESULTS_DIR = './results'

    # Special tokens (Fixed - Matching CosyVoice2)
    IGNORE_ID = -1        # Padding/ignore token for loss
    SOS_EOS_ID = 0        # Start/End of sequence token
    TASK_ID = 1           # Task separator token

    # Device configuration
    DEVICE = 'cuda'

# Create necessary directories
import os
os.makedirs(Config.RESULTS_DIR, exist_ok=True)
os.makedirs(Config.PRETRAINED_DIR, exist_ok=True)

print(f"Configuration loaded")
print(f"Data directory: {Config.DATA_CACHE_DIR}")
print(f"Results directory: {Config.RESULTS_DIR}")


Configuration loaded
Data directory: ./libritts_token_cache
Results directory: ./results


## Part 2: Load Pretrained Components

In this section, you will load the pre-trained [CosyVoice2](https://arxiv.org/abs/2412.10117) components that remain frozen during training:

* **Text Tokenizer**: Qwen2 BPE tokenizer for converting text to tokens
* **Speech Tokenizer**: Supervised semantic tokenizer with finite scalar quantization (FSQ) for converting audio to discrete speech tokens
* **Flow Matching Model**: For converting speech tokens to mel-spectrograms (inference only)
* **Vocoder**: HiFi-GAN for converting mel-spectrograms to audio waveforms (inference only)


In [8]:
# Import the pretrained model utilities
# Note: This module is provided and should NOT be modified
from hw4_util import (
    download_pretrained_models,
    load_text_tokenizer,
    load_speech_tokenizer,
    load_flow_model,
    load_vocoder
)

# Download pretrained models if needed
print("Downloading pretrained CosyVoice2 models...")
model_dir = download_pretrained_models(Config.PRETRAINED_DIR)
print(f"Models ready at: {model_dir}")

# Load tokenizers (needed for training)
print("\nLoading tokenizers for training...")
text_tokenizer = load_text_tokenizer(model_dir)
speech_tokenizer = load_speech_tokenizer(model_dir, device)

print(f"Text tokenizer loaded")
print(f"Vocab size: {text_tokenizer.vocab_size}")
print(f"Speech tokenizer loaded")
print(f"Vocab size: {speech_tokenizer.vocab_size}")

# Note: Flow model and vocoder will be loaded later for inference only


Downloading pretrained CosyVoice2 models...
Downloading CosyVoice2-0.5B...
Downloading Model from https://www.modelscope.cn to directory: ./pretrained_models/iic/CosyVoice2-0.5B


2025-11-17 15:23:11,240 - modelscope - INFO - Got 19 files, start to download ...



2025-11-17 15:27:05,837 - modelscope - INFO - Download model 'iic/CosyVoice2-0.5B' successfully.
2025-11-17 15:27:05,838 - modelscope - INFO - Creating symbolic link [./pretrained_models/iic/CosyVoice2-0.5B].


Downloaded to: ./pretrained_models/iic/CosyVoice2-0___5B
Models ready at: ./pretrained_models/iic/CosyVoice2-0___5B

Loading tokenizers for training...
Text tokenizer loaded (Qwen2 BPE)
  Vocab size: 151643
Text tokenizer loaded
Vocab size: 151643
Speech tokenizer loaded
Vocab size: 6561


## Part 3: Dataset and Data Loading

In this part, you will implement a custom dataset class for the LibriTTS data. The dataset uses pre-tokenized speech data for efficiency.

### Requirements:
* Implement text tokenization (on-the-fly)
* Load pre-computed speech tokens from cache
* Handle multi-speaker information
* Implement proper sequence padding and batching

**TODO:** Complete the `__getitem__` method in the CosyVoiceDataset class below.

**TODO:** Implement the `cosyvoice_collate_fn` function for batching.
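
As a small illustration of the padding step, `torch.nn.utils.rnn.pad_sequence` right-pads variable-length tensors to the longest one in the batch. The toy token values below are made up. The true lengths are kept in a separate tensor so the model can later tell real tokens from padding.

```python
import torch
from torch.nn.utils.rnn import pad_sequence

# Three toy "speech token" sequences of different lengths (values are made up)
seqs = [torch.tensor([5, 6, 7, 8]), torch.tensor([9, 10]), torch.tensor([11, 12, 13])]

# Sort longest-first, as the collate function is asked to do
seqs.sort(key=len, reverse=True)

# Record true lengths before padding hides them
lengths = torch.tensor([len(s) for s in seqs], dtype=torch.long)
padded = pad_sequence(seqs, batch_first=True, padding_value=0)

print(padded.shape)  # torch.Size([3, 4])
print(lengths)       # tensor([4, 3, 2])
```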


In [9]:
class CosyVoiceDataset(Dataset):
    """
    Multi-speaker TTS Dataset for CosyVoice2 using pre-tokenized LibriTTS data

    This dataset uses pre-computed speech tokens from cache, avoiding
    on-the-fly audio processing for faster training.
    """
    def __init__(self, samples_list, text_tokenizer, speaker_to_idx_dict,
                 max_text_len=200, max_speech_len=500):
        """
        Args:
            samples_list: List of pre-processed samples from .pt file
            text_tokenizer: Pretrained text tokenizer
            speaker_to_idx_dict: Speaker ID to index mapping from metadata
            max_text_len: Maximum text token length
            max_speech_len: Maximum speech token length
        """
        self.samples = samples_list
        self.text_tokenizer = text_tokenizer
        self.speaker_to_idx = speaker_to_idx_dict
        self.max_text_len = max_text_len
        self.max_speech_len = max_speech_len
        self.num_speakers = len(speaker_to_idx_dict)

        print(f"  Dataset initialized with {len(self.samples)} samples")
        print(f"  Number of speakers: {self.num_speakers}")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """
        Get a single sample

        Returns:
            dict with keys: 'utt', 'text', 'text_token', 'speech_token',
                           'speaker_idx', 'speaker_id'
        """
        # TODO: Implement the __getitem__ method
        #
        # Tasks:
        # 1. Get sample from self.samples[idx]
        sample = self.samples[idx]
        # 2. Extract: text, speaker_id, speech_tokens (pre-computed), utt_id
        text = sample.get("text", None)
        speaker_id = sample.get("speaker_id", None)
        speech_tokens = sample.get("speech_tokens", None)
        utt_id = sample.get("utt")
        # 3. Return None if text or speech_tokens are invalid/empty
        if text is None or len(text.strip()) == 0:
            print('text is empty')
            return None
        if speech_tokens is None or len(speech_tokens) == 0:
            print('speech tokens are empty')
            return None

        # Reject samples that contain negative (invalid) token ids
        if min(speech_tokens) < 0:
            return None

        # if speaker_id not in self.speaker_to_idx:
        #   return None

        # 4. Get speaker index from self.speaker_to_idx dictionary
        # speaker_idx = self.speaker_to_idx[speaker_id]
        speaker_idx = self.speaker_to_idx.get(speaker_id, 0)

        # 5. Tokenize text and truncate to max_text_len
        text_token = self.text_tokenizer.encode(text)
        if len(text_token) > self.max_text_len:
          text_token = text_token[:self.max_text_len]
        text_token = torch.tensor(text_token, dtype=torch.long)

        # 6. Convert speech_tokens to tensor and truncate to max_speech_len
        speech_token = torch.tensor(speech_tokens, dtype=torch.long)
        if len(speech_token) > self.max_speech_len:
          speech_token = speech_token[:self.max_speech_len]
        # 7. Return dict with required keys
        return {
          "utt": utt_id,
          "text": text,
          "text_token": text_token,
          "speech_token": speech_token,
          "speaker_idx": speaker_idx,
          "speaker_id": speaker_id
        }


        # pass  # TODO: Replace with your implementation


In [10]:
def cosyvoice_collate_fn(batch):
    """
    Collate function for batching variable-length sequences.
    Adapted from CosyVoice's collate strategy.

    Key features:
    - Filters invalid samples (None)
    - Sorts by speech length (descending) to minimize padding
    - Pads sequences efficiently
    """
    # TODO: Implement the collate function
    #
    # Tasks:
    # 1. Filter out None samples
    batch = [b for b in batch if b is not None]
    if len(batch) == 0:
      return None
    # 2. Sort samples by speech_token length (descending)
    batch.sort(key=lambda x: len(x["speech_token"]), reverse=True)

    # 3. Extract and reorder: utts, text, speaker_indices
    utts = [b["utt"] for b in batch]
    text = [b["text"] for b in batch]
    speaker_indices = torch.tensor([b["speaker_idx"] for b in batch], dtype=torch.long)

    # 4. Pad text_tokens and speech_tokens to same length within batch
    text_tokens = [b["text_token"] for b in batch]
    speech_tokens = [b["speech_token"] for b in batch]

    text_lengths = torch.tensor([len(t) for t in text_tokens], dtype=torch.long)
    speech_lengths = torch.tensor([len(s) for s in speech_tokens], dtype=torch.long)

    # Pad with 0 (a valid embedding index); real lengths are tracked in the length tensors
    padded_text_tokens = nn.utils.rnn.pad_sequence(text_tokens, batch_first=True, padding_value=0)
    padded_speech_tokens = nn.utils.rnn.pad_sequence(speech_tokens, batch_first=True, padding_value=0)

    # 5. Create length tensors for actual sequence lengths
    #
    # Use padding_value=0 for token sequences
    # Return dict with keys: utts, text, text_tokens, text_lengths,
    #                        speech_tokens, speech_lengths, speaker_indices


    return {
      "utts": utts,
      "text": text,
      "text_tokens": padded_text_tokens,
      "text_lengths": text_lengths,
      "speech_tokens": padded_speech_tokens,
      "speech_lengths": speech_lengths,
      "speaker_indices": speaker_indices
    }

    # pass  # TODO: Replace with your implementation

In [11]:
# Load pre-tokenized LibriTTS data
print("Loading LibriTTS dataset from cache...")
train_cache = torch.load(f'{Config.DATA_CACHE_DIR}/{Config.TRAIN_CACHE}')
test_cache = torch.load(f'{Config.DATA_CACHE_DIR}/{Config.TEST_CACHE}')

train_data = train_cache['samples']
val_data = test_cache['samples']
metadata = train_cache['metadata']
speaker_to_idx = metadata['speaker_to_idx']
speech_vocab_size = metadata['speech_vocab_size']

print(f"Dataset loaded:")
print(f"Train samples: {len(train_data)}")
print(f"Test samples: {len(val_data)}")
print(f"Speech vocab size: {speech_vocab_size}")
print(f"Number of speakers: {metadata['num_speakers']}")

# Create datasets
train_dataset = CosyVoiceDataset(train_data, text_tokenizer, speaker_to_idx)
val_dataset = CosyVoiceDataset(val_data, text_tokenizer, speaker_to_idx)

# Create dataloaders
# You may change the batch size, num_workers, etc. for faster training
# But it depends on your GPU memory
train_loader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=4,
    collate_fn=cosyvoice_collate_fn,
    pin_memory=True,
    prefetch_factor=2,
    persistent_workers=False
)

val_loader = DataLoader(
    val_dataset,
    batch_size=8,
    shuffle=False,
    num_workers=4,
    collate_fn=cosyvoice_collate_fn,
    pin_memory=True,
    prefetch_factor=2,
    persistent_workers=False
)

print(f"DataLoaders created:")
print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")


Loading LibriTTS dataset from cache...
Dataset loaded:
Train samples: 354780
Test samples: 9957
Speech vocab size: 6561
Number of speakers: 247
  Dataset initialized with 354780 samples
  Number of speakers: 247
  Dataset initialized with 9957 samples
  Number of speakers: 247
DataLoaders created:
Train batches: 44348
Val batches: 1245


## Part 4: Model Architecture

In this part, you will implement the **TextToSpeechLM** model - the core component of the TTS system.

### Model Requirements:
1. Takes text tokens as input
2. Uses transformer layers to process the sequence
3. Generates speech tokens autoregressively
4. Follows the CosyVoice2 sequence format: `[SOS, text_tokens, TASK_ID, speech_tokens]`
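
To make the sequence format concrete, the sketch below lines up each input position with the token it should predict. It assumes, as in the notes for this model, that EOS uses the extra index `speech_vocab_size` and that positions without a loss are marked with `Config.IGNORE_ID` (-1). The helper name `build_input_and_target` is hypothetical and works on plain Python lists for readability.

```python
IGNORE_ID = -1              # matches Config.IGNORE_ID
SPEECH_VOCAB_SIZE = 6561    # value reported by the dataset metadata
EOS_ID = SPEECH_VOCAB_SIZE  # EOS takes the extra index after the real speech tokens


def build_input_and_target(text_tokens, speech_tokens):
    """Return symbolic input labels and per-position targets for one sample."""
    inputs = (["SOS"] + [f"t{t}" for t in text_tokens]
              + ["TASK"] + [f"s{s}" for s in speech_tokens])
    # SOS and text positions carry no loss; TASK predicts the first speech token,
    # each speech token predicts the next one, and the last one predicts EOS.
    targets = [IGNORE_ID] * (1 + len(text_tokens)) + list(speech_tokens) + [EOS_ID]
    assert len(inputs) == len(targets)
    return inputs, targets


inputs, targets = build_input_and_target(text_tokens=[40, 41], speech_tokens=[7, 8, 9])
for inp, tgt in zip(inputs, targets):
    print(f"{inp:>5} -> {tgt}")
```

Reading the printed pairs top to bottom shows the one-step shift: the model never sees the token it is asked to predict at the same position.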

**TODO:** Complete the model architecture with proper embeddings and transformer layers.

**TODO:** Implement the forward pass with attention masking and loss computation.
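
The two masks involved can be sketched as follows. With boolean masks in `nn.TransformerEncoder`, `True` means "not allowed": the causal mask (passed as `mask`) blocks attention to future positions, and the padding mask (passed as `src_key_padding_mask`) blocks attention to padded tokens. The helper name `make_masks` is hypothetical.

```python
import torch


def make_masks(lengths: torch.Tensor, max_len: int):
    """Return (causal_mask [T, T], padding_mask [B, T]); True means 'blocked'."""
    # Upper triangle above the diagonal: position i may not look at j > i
    causal_mask = torch.triu(torch.ones(max_len, max_len, dtype=torch.bool), diagonal=1)
    # Positions at or beyond each sequence's true length are padding
    padding_mask = torch.arange(max_len).unsqueeze(0) >= lengths.unsqueeze(1)
    return causal_mask, padding_mask


causal, padding = make_masks(torch.tensor([4, 2]), max_len=4)
print(causal.int())
print(padding.int())
```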

**TODO:** Implement the generate method for autoregressive inference.
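
One sampling step with temperature and top-k can be sketched in isolation. This is a minimal standalone version, not the required implementation; the function name `sample_top_k` is hypothetical.

```python
import torch
import torch.nn.functional as F


def sample_top_k(logits: torch.Tensor, temperature: float = 1.0, top_k: int = 50) -> torch.Tensor:
    """Sample one token id per row from the top-k entries of `logits` [B, V]."""
    logits = logits / temperature                      # <1 sharpens, >1 flattens
    k = min(top_k, logits.size(-1))                    # guard against k > vocab size
    topk_vals, topk_idx = torch.topk(logits, k, dim=-1)
    probs = F.softmax(topk_vals, dim=-1)               # renormalize over the k candidates
    choice = torch.multinomial(probs, num_samples=1)   # index into the top-k list
    return topk_idx.gather(-1, choice)                 # map back to vocabulary ids


torch.manual_seed(0)
logits = torch.tensor([[0.1, 3.0, 2.5, -1.0, 0.0]])
token = sample_top_k(logits, temperature=0.8, top_k=2)
print(token)  # the sampled id is always 1 or 2, the two largest logits
```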


In [13]:
class TextToSpeechLM(nn.Module):
    """Student implementation of text to speech token generation model

    Architecture:
    - Text tokens → Text embeddings → Transformer → Speech tokens
    - Special tokens: SOS_EOS (id=0), TASK_ID (id=1)
    - Sequence format: [SOS, text_tokens, TASK_ID, speech_tokens]
    """

    def __init__(self,
                 text_vocab_size: int,
                 speech_vocab_size: int,
                 d_model: int = 768,
                 n_heads: int = 12,
                 n_layers: int = 12,
                 max_seq_len: int = 2048):
        super().__init__()

        self.d_model = d_model
        self.speech_vocab_size = speech_vocab_size

        # Special token IDs
        self.sos_eos_id = Config.SOS_EOS_ID
        self.task_id = Config.TASK_ID

        # TODO: Define the model architecture
        #
        # Components needed:
        # 1. Text embedding layer (vocab_size → d_model)
        # 2. Speech embedding layer (vocab_size+1 → d_model, +1 for EOS)
        # 3. Special token embeddings (2 tokens)
        # 4. Positional encoding (learnable parameters)
        # 5. Transformer encoder stack
        # 6. Output projection (d_model → speech_vocab_size+1)

        # pass  # TODO: Replace with your implementation

        # 1. Embeddings
        self.text_emb = nn.Embedding(text_vocab_size, d_model)
        self.speech_emb = nn.Embedding(speech_vocab_size + 1, d_model)  # +1 for EOS
        self.special_emb = nn.Embedding(2, d_model)  # SOS_EOS + TASK_ID

        # 2. Positional encoding (learnable)
        self.pos_emb = nn.Embedding(max_seq_len, d_model)

        # 3. Transformer
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)

        # 4. Output projection
        self.proj = nn.Linear(d_model, speech_vocab_size + 1)

        # Loss function (provided)
        self.criterion = nn.CrossEntropyLoss(ignore_index=Config.IGNORE_ID)

    def prepare_sequence(self, text_tokens, text_lengths, speech_tokens=None, speech_lengths=None):
        """Prepare input sequence in CosyVoice2 format

        Args:
            text_tokens: Text token ids [B, T_text]
            text_lengths: Actual lengths of text [B]
            speech_tokens: Speech token ids [B, T_speech] (training only)
            speech_lengths: Actual lengths of speech [B] (training only)

        Returns:
            lm_input: Model input embeddings
            lm_target: Target token ids for loss computation
            padding_mask: Boolean mask for padded positions
        """
        # TODO: Implement sequence preparation
        #
        # Tasks:
        # 1. Get embeddings for text, speech (if training), and special tokens
        # 2. Build input sequence: [SOS, text_emb, TASK, speech_emb]
        # 3. Build target sequence for teacher forcing
        # 4. Handle variable lengths using unpad/pad operations
        # 5. Create padding mask (True where padded)
        #
        # Note: Target should be shifted for next-token prediction
        # Note: Use Config.IGNORE_ID for positions to ignore in loss

        # pass  # TODO: Replace with your implementation
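        B, T_text = text_tokens.shape
        device = text_tokens.device

        # Special tokens
        sos = torch.full((B, 1), self.sos_eos_id, dtype=torch.long, device=device)
        task = torch.full((B, 1), self.task_id, dtype=torch.long, device=device)
        sos_emb, task_emb = self.special_emb(sos), self.special_emb(task)

        # Embed text tokens and mark padded text positions (True = padding)
        text_emb = self.text_emb(text_tokens)
        text_pad = torch.arange(T_text, device=device).unsqueeze(0) >= text_lengths.to(device).unsqueeze(1)
        keep = torch.zeros((B, 1), dtype=torch.bool, device=device)  # SOS and TASK are never padding

        # If speech_tokens provided (training)
        if speech_tokens is not None:
            T_speech = speech_tokens.shape[1]
            speech_lengths = speech_lengths.to(device)
            speech_emb = self.speech_emb(speech_tokens)
            lm_input_emb = torch.cat([sos_emb, text_emb, task_emb, speech_emb], dim=1)

            # Targets for the last T_speech + 1 positions (TASK and every speech token):
            # TASK predicts the first speech token, each speech token predicts the next one,
            # and the last real token predicts EOS. Padded positions are ignored in the loss.
            speech_pad = torch.arange(T_speech, device=device).unsqueeze(0) >= speech_lengths.unsqueeze(1)
            ignore = torch.full((B, 1), Config.IGNORE_ID, dtype=torch.long, device=device)
            lm_target = torch.cat([speech_tokens.masked_fill(speech_pad, Config.IGNORE_ID), ignore], dim=1)
            lm_target[torch.arange(B, device=device), speech_lengths] = self.speech_vocab_size  # EOS

            # Padding mask: True at padded text and padded speech positions
            padding_mask = torch.cat([keep, text_pad, keep, speech_pad], dim=1)
            return lm_input_emb, lm_target, padding_mask

        else:
            # Inference: only SOS + text + TASK
            lm_input_emb = torch.cat([sos_emb, text_emb, task_emb], dim=1)
            padding_mask = torch.cat([keep, text_pad, keep], dim=1)
            return lm_input_emb, None, padding_mask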

    def forward(self, text_tokens, text_lengths, speech_tokens, speech_lengths):
        """Forward pass for training

        Args:
            text_tokens: [B, T_text] padded text tokens
            text_lengths: [B] actual lengths
            speech_tokens: [B, T_speech] padded speech tokens
            speech_lengths: [B] actual lengths
        """
        # TODO: Implement forward pass
        #
        # Steps:
        # 1. Prepare sequences using prepare_sequence
        # 2. Add positional encoding to embeddings
        # 3. Create causal mask for autoregressive modeling
        # 4. Pass through transformer with both masks
        # 5. Project to output vocabulary
        # 6. Compute loss using targets from prepare_sequence
        # 7. Compute accuracy: (correct predictions) / (non-ignored positions)
        #
        # Return: loss (scalar), accuracy (scalar)

        # pass  # TODO: Replace with your implementation
        lm_input_emb, lm_target, padding_mask = self.prepare_sequence(text_tokens, text_lengths, speech_tokens, speech_lengths)

        # Positional encoding
        B, T, _ = lm_input_emb.shape
        pos_ids = torch.arange(T, device=text_tokens.device).unsqueeze(0).expand(B, T)
        lm_input_emb = lm_input_emb + self.pos_emb(pos_ids)

        # Causal mask for autoregressive generation
        causal_mask = torch.triu(torch.ones(T, T, device=text_tokens.device), diagonal=1).bool()

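        # Transformer: causal mask plus key padding mask so padded tokens are never attended to
        out = self.transformer(lm_input_emb, mask=causal_mask, src_key_padding_mask=padding_mask)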

        # Project to vocab
        logits = self.proj(out)

        # Compute loss
        if lm_target is not None:
            logits_flat = logits[:, -lm_target.shape[1]:, :].reshape(-1, self.speech_vocab_size + 1)
            target_flat = lm_target.reshape(-1)
            loss = self.criterion(logits_flat, target_flat)

            # Accuracy
            preds = logits_flat.argmax(dim=-1)
            mask = target_flat != Config.IGNORE_ID
            acc = (preds[mask] == target_flat[mask]).float().mean()
            return loss, acc
        else:
            return None, None

    @torch.no_grad()
    def generate(self, text_tokens, max_length=500, temperature=1.0, top_k=50,
                 prompt_speech_token=None, prompt_text_tokens=None, min_length=None):
        """Generate speech tokens autoregressively

        Args:
            text_tokens: Input text [1, T]
            max_length: Maximum generation length
            temperature: Sampling temperature
            top_k: Top-k sampling
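            prompt_speech_token: Optional voice prompt speech tokens [1, T_prompt_speech]
            prompt_text_tokens: Optional transcript tokens of the voice prompt [1, T_prompt_text]
            min_length: Minimum number of tokens to generate before EOS is allowed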
        """
        # TODO: Implement autoregressive generation
        #
        # Steps:
        # 1. Build initial sequence with special tokens
        # 2. Add voice prompt if provided (for voice cloning)
        # 3. Generation loop:
        #    - Add positional encoding
        #    - Create causal mask
        #    - Forward through transformer
        #    - Get logits for last position
        #    - Apply temperature and top-k sampling
        #    - Stop at EOS token (speech_vocab_size)
        # 4. Return generated token ids
        #
        # Note: Prevent EOS before min_length tokens

        # pass  # TODO: Replace with your implementation
        device = text_tokens.device
        B = text_tokens.shape[0]
        eos_id = self.speech_vocab_size

        # Voice prompt (optional): the prompt transcript is prepended to the target text
        if prompt_text_tokens is not None:
            text_tokens = torch.cat([prompt_text_tokens.to(device), text_tokens], dim=1)

        # Build initial sequence: SOS + text + TASK
        sos = torch.full((B, 1), self.sos_eos_id, dtype=torch.long, device=device)
        task = torch.full((B, 1), self.task_id, dtype=torch.long, device=device)
        text_emb = self.text_emb(text_tokens)
        lm_input_emb = torch.cat([self.special_emb(sos), text_emb, self.special_emb(task)], dim=1)

        # Voice prompt (optional): prompt speech tokens follow TASK so generation continues in that voice
        if prompt_speech_token is not None:
            lm_input_emb = torch.cat([lm_input_emb, self.speech_emb(prompt_speech_token.to(device))], dim=1)

        generated = []

        for step in range(max_length):
            T = lm_input_emb.shape[1]
            pos_ids = torch.arange(T, device=device).unsqueeze(0).expand(B, T)
            inp = lm_input_emb + self.pos_emb(pos_ids)

            causal_mask = torch.triu(torch.ones(T, T, device=device), diagonal=1).bool()
            out = self.transformer(inp, mask=causal_mask)
            logits = self.proj(out[:, -1, :]) / temperature

            # Prevent EOS before min_length tokens have been generated
            if min_length is not None and step < min_length:
                logits[:, eos_id] = float('-inf')

            if top_k is not None:
                k = min(top_k, logits.size(-1))
                topk_vals, topk_idx = torch.topk(logits, k)
                probs = torch.zeros_like(logits)
                probs.scatter_(1, topk_idx, F.softmax(topk_vals, dim=-1))
            else:
                probs = F.softmax(logits, dim=-1)

            next_token = torch.multinomial(probs, 1)

            # Stop at EOS without returning it: EOS is not a real speech token
            if (next_token == eos_id).all():
                break

            # Keep the token and append its embedding for the next step
            generated.append(next_token)
            lm_input_emb = torch.cat([lm_input_emb, self.speech_emb(next_token)], dim=1)

        if not generated:
            return torch.empty((B, 0), dtype=torch.long, device=device)
        return torch.cat(generated, dim=1)


In [14]:
# Initialize model
print("Initializing TextToSpeechLM...")

model = TextToSpeechLM(
    text_vocab_size=text_tokenizer.vocab_size,
    speech_vocab_size=speech_vocab_size
).to(device)

num_params = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {num_params:,} ({num_params/1e6:.1f}M)")
print(f"Trainable: {trainable:,} ({trainable/1e6:.1f}M)")


Initializing TextToSpeechLM...
Total parameters: 194,289,826 (194.3M)
Trainable: 194,289,826 (194.3M)


## Part 5: Sanity Check - Verify Data Pipeline

Before training, verify that your data processing pipeline works correctly by generating audio from ground-truth speech tokens. This section helps you debug any issues before starting the training process.


In [47]:
# print(len(val_dataset.samples))
# print(val_dataset.samples[0]['speech_tokens'])

# sample_idx = 0
# sample = val_dataset[sample_idx]

# print(sample)
# print(val_dataset.samples[1]['utt'])

# for s in val_dataset.samples[:5]:
#     print(s['utt'], s['text'], len(s.get('speech_token', [])))

In [15]:
# Audio generation helper (imported from hw4_util)
from hw4_util import generate_audio_from_tokens

# Run sanity check - generate audio from ground-truth tokens
print("Running sanity check...")

# Load flow model and vocoder for sanity check
print("Loading flow model and vocoder for sanity check...")
flow_model = load_flow_model(model_dir, device)
vocoder = load_vocoder(model_dir, device)

# Get a sample from the dataset
sample_idx = 0
sample = val_dataset[sample_idx]

if sample is not None:
    print(f"\n Sample text: '{sample['text']}'")
    print(f"  Text tokens: {sample['text_token'].shape}")
    print(f"  Speech tokens: {sample['speech_token'].shape}")

    # Generate audio from ground-truth speech tokens
    audio, mel = generate_audio_from_tokens(
        sample['speech_token'],
        flow_model,
        vocoder,
        device
    )

    sample_rate = 24000
    duration = audio.shape[0] / sample_rate

    print(f"\n Generated audio from ground-truth tokens:")
    print(f"  Mel shape: {mel.shape}")
    print(f"  Audio shape: {audio.shape}")
    print(f"  Duration: {duration:.2f}s @ {sample_rate}Hz")

    # Save for listening
    output_path = f'{Config.RESULTS_DIR}/sanity_check.wav'
    torchaudio.save(output_path, audio.cpu().unsqueeze(0), sample_rate)
    print(f"  Saved to: {output_path}")

    # Display audio in notebook
    display(Audio(audio.cpu().numpy(), rate=sample_rate))

    print("\n Sanity check passed! Data pipeline is working correctly.")
else:
    print("Sample is None - check dataset configuration")

# Clean up to save memory
del flow_model
del vocoder
torch.cuda.empty_cache()


Running sanity check...
Loading flow model and vocoder for sanity check...




  - Checkpoint: flow.pt
  - Device: cuda
  - Checkpoint: hift.pt
  - Device: cuda

 Sample text: 'He defended Raglan Castle to extremity; and opened not its gates till the middle of August.'
  Text tokens: torch.Size([20])
  Speech tokens: torch.Size([176])

 Generated audio from ground-truth tokens:
  Mel shape: torch.Size([1, 80, 352])
  Audio shape: torch.Size([168960])
  Duration: 7.04s @ 24000Hz
  Saved to: ./results/sanity_check.wav





 Sanity check passed! Data pipeline is working correctly.
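
The shapes printed above are mutually consistent: 176 speech tokens become 352 mel frames and 168,960 audio samples, which is 7.04 s at 24 kHz (about 25 speech tokens per second). The ratios in the sketch below are inferred from this single output rather than taken from the model documentation, so treat them as assumptions. They are still handy for a quick plausibility check on generated token counts.

```python
SAMPLE_RATE = 24_000          # Hz, as used in the sanity-check cell
MEL_FRAMES_PER_TOKEN = 2      # inferred: 352 mel frames / 176 tokens
SAMPLES_PER_MEL_FRAME = 480   # inferred: 168960 samples / 352 mel frames


def expected_duration_seconds(num_speech_tokens: int) -> float:
    """Estimate audio duration from a speech-token count using the inferred ratios."""
    num_samples = num_speech_tokens * MEL_FRAMES_PER_TOKEN * SAMPLES_PER_MEL_FRAME
    return num_samples / SAMPLE_RATE


print(expected_duration_seconds(176))  # 7.04
```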


## Part 6: Training Loop

In this section, you will train the TextToSpeechLM model.

### Compute Requirements and Training Time

**Hardware Requirements:**
- **GPU Required**: This assignment requires a GPU with at least 16GB VRAM (e.g., NVIDIA T4, V100, A100, or similar)
- Training will not work on CPU-only machines due to memory and speed constraints

**Expected Training Time:**
- With default batch size (4), training typically takes **8-12 hours** on a T4 GPU for a full training run (3 epochs)
- Training time depends on:
  - GPU model
  - Batch size (larger batches = faster training but more memory)
  - Number of epochs
  - Data loading efficiency (num_workers)
- **Note**: If you have better GPUs like V100 or A100 with more VRAM, feel free to increase the batch size for faster training

**Checkpointing:**
- **It is strongly recommended to implement checkpointing** to save your model periodically during training
- This allows you to:
  - Resume training if interrupted (e.g., Colab disconnects, GPU timeout)
  - Save the best model based on validation loss
  - Avoid losing progress if training crashes
- Use the `save_checkpoint` helper function from `hw4_util.py` to save checkpoints after each epoch (or every N epochs)
- The helper function saves model state, optimizer state, scheduler state, epoch number, and losses for full resumability
- Consider saving checkpoints to Google Drive if using Colab to persist across sessions
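
The general save/resume pattern can be sketched with plain `torch.save` and `torch.load`. This is not the `save_checkpoint` helper from `hw4_util.py`, whose signature is not shown here; `save_ckpt` and `load_ckpt` are hypothetical names, and a tiny `nn.Linear` stands in for the real model.

```python
import os
import tempfile

import torch
import torch.nn as nn


def save_ckpt(path, model, optimizer, epoch, val_loss):
    """Hypothetical helper: store everything needed to resume training."""
    torch.save({
        "model": model.state_dict(),
        "optimizer": optimizer.state_dict(),
        "epoch": epoch,
        "val_loss": val_loss,
    }, path)


def load_ckpt(path, model, optimizer):
    """Hypothetical helper: restore state in place, return (next_epoch, val_loss)."""
    ckpt = torch.load(path, map_location="cpu")
    model.load_state_dict(ckpt["model"])
    optimizer.load_state_dict(ckpt["optimizer"])
    return ckpt["epoch"] + 1, ckpt["val_loss"]


# Toy usage with a tiny stand-in model
model = nn.Linear(4, 2)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
path = os.path.join(tempfile.mkdtemp(), "ckpt_epoch0.pt")
save_ckpt(path, model, optimizer, epoch=0, val_loss=3.21)
start_epoch, best_val = load_ckpt(path, model, optimizer)
print(start_epoch, best_val)  # 1 3.21
```

On Colab, pointing `path` at a mounted Google Drive folder keeps the file across sessions.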

### Training Components:
1. Setting up the optimizer and learning rate scheduler
2. Training for multiple epochs
3. Validating after each epoch
4. Saving the best model

**TODO:** Complete the training loop with proper loss computation and backpropagation.

**TODO:** Implement validation loop with metric tracking.

**TODO:** Add checkpointing to save the best model.


In [16]:
from tqdm import tqdm, trange
from torch.nn.utils import clip_grad_norm_

In [17]:
def train_epoch(model, dataloader, optimizer, scheduler, device):
    """Train for one epoch"""
    # TODO: Implement training loop
    #
    # For each batch:
    # 1. Move data to device (GPU)
    # 2. Forward pass to get loss and accuracy
    # 3. Backward pass (zero_grad → backward → clip_grad → step)
    # 4. Update learning rate scheduler
    # 5. Track metrics (accumulate loss and accuracy)
    # 6. Update progress bar with current metrics
    #
    # You may use tqdm for progress bar and skip None batches
    # Return: average_loss, average_accuracy

    # pass  # TODO: Replace with your implementation
    model.train()
    total_loss = 0.0
    total_acc = 0.0
    num_batches = 0

    for batch in tqdm(dataloader, desc="Training"):
        # Skip batches that the data pipeline returned as None
        if batch is None:
            continue

        # Move inputs to device
        text_tokens = batch['text_tokens'].to(device)
        text_lengths = batch['text_lengths'].to(device)
        speech_tokens = batch['speech_tokens'].to(device)
        speech_lengths = batch['speech_lengths'].to(device)
        speaker_indices = batch.get('speaker_indices', None)
        if speaker_indices is not None:
            speaker_indices = speaker_indices.to(device)

        # Forward pass
        optimizer.zero_grad()
        loss, acc = model(text_tokens, text_lengths, speech_tokens, speech_lengths)

        # Backward pass with gradient clipping
        loss.backward()
        clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        # The scheduler is stepped once per batch, not once per epoch
        if scheduler is not None:
            scheduler.step()

        total_loss += loss.item()
        total_acc += acc.item()
        num_batches += 1

    # max(..., 1) avoids division by zero if every batch was skipped
    avg_loss = total_loss / max(num_batches, 1)
    avg_acc = total_acc / max(num_batches, 1)
    return avg_loss, avg_acc


def validate(model, dataloader, device):
    """Validate the model"""
    # TODO: Implement validation loop
    #
    # Similar to training but:
    # - Use model.eval() and torch.no_grad()
    # - No gradient computation or weight updates
    # - Only track loss and accuracy
    #
    # Return: average_loss, average_accuracy

    # pass  # TODO: Replace with your implementation
    model.eval()
    total_loss = 0.0
    total_acc = 0.0
    num_batches = 0

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Validation"):
            # Skip batches that the data pipeline returned as None
            if batch is None:
                continue

            # Move inputs to device
            text_tokens = batch['text_tokens'].to(device)
            text_lengths = batch['text_lengths'].to(device)
            speech_tokens = batch['speech_tokens'].to(device)
            speech_lengths = batch['speech_lengths'].to(device)
            speaker_indices = batch.get('speaker_indices', None)
            if speaker_indices is not None:
                speaker_indices = speaker_indices.to(device)

            # Forward pass only; no gradients or weight updates
            loss, acc = model(text_tokens, text_lengths, speech_tokens, speech_lengths)

            total_loss += loss.item()
            total_acc += acc.item()
            num_batches += 1

    # max(..., 1) avoids division by zero if every batch was skipped
    avg_loss = total_loss / max(num_batches, 1)
    avg_acc = total_acc / max(num_batches, 1)
    return avg_loss, avg_acc

print("Training functions defined")


Training functions defined


In [26]:
from hw4_util import get_warmup_cosine_scheduler, save_checkpoint

# TODO: Setup training configuration
#
# Tasks:
# 1. Create AdamW optimizer (lr around 2e-4, weight_decay around 0.01)
# 2. Calculate warmup steps (e.g., 10% of total)
# 3. Create scheduler using get_warmup_cosine_scheduler
#
# The scheduler warms up learning rate then decays with cosine

# AdamW applies decoupled weight decay, as the task description asks for
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-4, weight_decay=0.01)

num_epochs = 3  # change as needed
train_steps_per_epoch = len(train_loader)
total_steps = num_epochs * train_steps_per_epoch
warmup_steps = int(0.1 * total_steps)  # 10% warmup

scheduler = get_warmup_cosine_scheduler(
    optimizer,
    warmup_steps=warmup_steps,
    total_steps=total_steps,
)

# pass  # TODO: Replace with your implementation

# Training info
steps_per_epoch = len(train_loader)
print(f"Steps per epoch: {steps_per_epoch}")

# Training loop
best_val_loss = float('inf')
train_losses = []
val_losses = []

print("\n" + "=" * 60)
print("Starting training...")
print("=" * 60 + "\n")

# Train for a few epochs (increase for better results)
# num_epochs = 3

for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")
    print("-" * 60)

    # Train
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, scheduler, device)
    train_losses.append(train_loss)

    # Validate
    val_loss, val_acc = validate(model, val_loader, device)
    val_losses.append(val_loss)

    # Get current learning rate
    current_lr = optimizer.param_groups[0]['lr']
    print(f"Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, LR: {current_lr:.2e}")

    # TODO: Save checkpoint when validation improves
    #
    # If val_loss < best_val_loss:
    # - Update best_val_loss
    # - Save checkpoint using save_checkpoint function
    # - Print confirmation message

    # pass  # TODO: Replace with checkpoint logic
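    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Save under Config.RESULTS_DIR so the Part 7 loading cell finds the file
        best_ckpt_path = os.path.join(Config.RESULTS_DIR, "part_c_best_model.pt")
        torch.save({'state_dict': model.state_dict()}, best_ckpt_path)
        print(f"New best val loss: {val_loss:.4f}. Saved model to {best_ckpt_path}")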

print("\nTraining completed!")


Steps per epoch: 44348

Starting training...


Epoch 1/3
------------------------------------------------------------


Training:   0%|          | 0/44348 [00:00<?, ?it/s]

padded_text_tokens len = 8      (printed 9 times)
padded_speech_tokens len = 8    (printed 9 times)
[prepare_sequence] text_tokens min=0, max=91747, vocab_size=151643
[prepare_sequence] speech_tokens min=-1, max=6545, speech_vocab_size=6561


Training:   0%|          | 0/44348 [00:01<?, ?it/s]


AcceleratorError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
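
The log above reports `speech_tokens min=-1` against `speech_vocab_size=6561`. A negative or too-large index reaching an embedding lookup is a common cause of `device-side assert triggered`, so one plausible explanation is that a padding value of -1 is being embedded without first being masked or replaced. This is an inference from the log, not a confirmed diagnosis. The sketch below is a hypothetical helper for checking index ranges in plain Python, where the offending values are easier to see than in a CUDA assert.

```python
from typing import Iterable, List, Tuple

def find_out_of_range(tokens: Iterable[int], vocab_size: int) -> List[Tuple[int, int]]:
    """Return (position, value) pairs for token IDs outside [0, vocab_size)."""
    return [(i, t) for i, t in enumerate(tokens) if not 0 <= t < vocab_size]

# Toy example: -1 stands in for a padding value, 6561 is one past the last valid ID.
speech_vocab_size = 6561
example = [12, 6545, -1, -1, 6561]
bad = find_out_of_range(example, speech_vocab_size)
print(bad)  # [(2, -1), (3, -1), (4, 6561)]
```

For a tensor, the same check could be run as `find_out_of_range(speech_tokens.flatten().tolist(), speech_vocab_size)` before the forward pass. Note also that once a device-side assert fires, the CUDA context generally stays unusable until the runtime is restarted, which would be consistent with later cells failing with the same error.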


## Part 7: Inference and Evaluation

In this section, you will test your trained model by synthesizing speech.

### Tasks:
1. Load the best checkpoint
2. Generate speech from text
3. Support voice cloning with prompt audio

**TODO:** Load your trained checkpoint.

**TODO:** Generate audio samples and evaluate quality.


In [27]:
# torch.cuda.empty_cache()
import gc
gc.collect()

45854

In [31]:
# Inference utilities
from hw4_util import load_trained_model, synthesize

# Load pretrained components for inference
print("Loading pretrained components...")
flow_model = load_flow_model(model_dir, device)
vocoder = load_vocoder(model_dir, device)

# TODO: Load your trained model
#
# Check if checkpoint exists at Config.RESULTS_DIR/best.pt
# If yes: load using load_trained_model in hw4_util.py (needs path, vocab sizes, LM, device)
# If no: use current model state

# trained_model = model  # TODO: Replace with checkpoint loading

ckpt_path = os.path.join(Config.RESULTS_DIR, "part_c_best_model.pt")
text_vocab_size = text_tokenizer.vocab_size

assert os.path.exists(ckpt_path), f"Checkpoint not found at {ckpt_path}"

print(f"Checkpoint found at {ckpt_path}, loading...")
# trained_model = load_trained_model(ckpt_path, text_vocab_size, speech_vocab_size, TextToSpeechLM, device='cuda')
trained_model = TextToSpeechLM(
    text_vocab_size=text_vocab_size,
    speech_vocab_size=speech_vocab_size
).to(device)

# Load checkpoint
checkpoint = torch.load(ckpt_path, map_location=device)
trained_model.load_state_dict(checkpoint['state_dict'])
trained_model.eval()
print("loaded model")

# print(f"Loaded model from checkpoint (epoch {checkpoint['epoch']})")
# print(f"  Train loss: {checkpoint['train_loss']:.4f}")
# print(f"  Val loss: {checkpoint['val_loss']:.4f}")


Loading pretrained components...




AcceleratorError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [28]:
# Test texts
test_texts = [
    "Hello, this is a test of the trained model.",
    "Text to speech synthesis with language models.",
    "CosyVoice two is a powerful speech synthesis system."
]

print("\nTesting TTS synthesis...")

# TODO: Generate audio for test texts
#
# For each text:
# - Use synthesize function to generate audio
# - Save to Config.RESULTS_DIR/synthesized_{i}.wav
# - Display audio using IPython.display.Audio
#
# synthesize returns (audio_tensor, sample_rate)

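for idx, text in enumerate(test_texts):
    output_path = f"{Config.RESULTS_DIR}/synthesized_{idx}.wav"
    # Use the checkpoint-loaded model; keyword arguments mirror the Part 8 call
    audio, sample_rate = synthesize(
        text=text,
        model=trained_model,
        text_tokenizer=text_tokenizer,
        speech_tokenizer=speech_tokenizer,
        flow_model=flow_model,
        vocoder=vocoder,
        device=device,
        output_path=output_path,
    )
    display(Audio(audio.cpu().numpy(), rate=sample_rate))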

# pass  # TODO: Replace with synthesis code

print("\nInference completed!")


Testing TTS synthesis...

Synthesizing: 'Hello, this is a test of the trained model.'


AcceleratorError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


## Part 8: Voice Cloning and Final Evaluation

This is the final evaluation section where you will record your voice directly in the notebook and use it for voice cloning. You will generate 200 utterances with your cloned voice and evaluate the quality using Whisper ASR.

**TODO:** Record your voice prompt.

**TODO:** Run the voice cloning evaluation.

**TODO:** Submit results to Gradescope (the submission file is generated by the evaluation run).


### Step 1: Record Your Voice

In [30]:
from hw4_util import record_or_load_voice

# Record or load existing voice
prompt_audio, prompt_text, recorded_file_path = record_or_load_voice(
    prompt_text="The quick brown fox jumps over the lazy dog.",
    results_dir=Config.RESULTS_DIR
)



EXISTING RECORDING FOUND
Loading from: ./results/voice_prompt_16k.wav
Duration: 3.84s
Sample rate: 16000Hz

Playback of existing recording:





To record a new voice:
   1. Delete the file: ./results/voice_prompt_16k.wav
   2. Re-run this cell
   Or call: record_or_load_voice(..., force_new=True)


### Step 2: Test Voice Cloning with One Utterance

In [32]:
print("=" * 60)
print("TESTING VOICE CLONING")
print("=" * 60)

# Test with a single utterance first
# You may play around with the test text to see how the model performs
test_text = "This is my cloned voice speaking."
print(f"\n Test text: '{test_text}'")
print("Generating with your voice...")

# Generate with voice cloning
audio_test, sr = synthesize(
    text=test_text,
    model=trained_model,
    text_tokenizer=text_tokenizer,
    speech_tokenizer=speech_tokenizer,
    flow_model=flow_model,
    vocoder=vocoder,
    device=device,
    prompt_audio=prompt_audio,
    prompt_text=prompt_text,
    output_path=f'{Config.RESULTS_DIR}/voice_clone_test.wav'
)

print("\n Generated audio with your cloned voice:")
display(Audio(audio_test.cpu().numpy(), rate=sr))

print("\n Voice cloning test completed!")
print("If this sounds like your voice, proceed to the next cell.")
print("If not, try recording again with clearer pronunciation.")


TESTING VOICE CLONING

 Test text: 'This is my cloned voice speaking.'
Generating with your voice...


NameError: name 'trained_model' is not defined

### Step 3: Large-Scale Evaluation with ASR

**Automated evaluation system:**
- All students evaluate the same 200 texts
- Text order shuffled by Student ID (prevents cheating)
- Submit to Gradescope for WER scoring

**Process:**
1. Enter your Student ID → unique shuffle
2. Generate 200 utterances (~10-15 min)
3. Transcribe with Whisper (~5-10 min)
4. Submit `submission_[ID].txt` to Gradescope
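
WER is the word-level edit distance between the reference text and the ASR transcript, divided by the number of reference words. The text normalization applied by the Gradescope scorer is not described here, so the sketch below assumes simple lowercasing and whitespace tokenization. `word_error_rate` is a hypothetical helper for local sanity checks, not the official scorer.

```python
def word_error_rate(reference: str, hypothesis: str) -> float:
    """Word-level Levenshtein distance divided by the reference length."""
    ref = reference.lower().split()
    hyp = hypothesis.lower().split()
    if not ref:
        raise ValueError("reference must contain at least one word")
    # prev[j] is the edit distance between the ref prefix seen so far and hyp[:j].
    prev = list(range(len(hyp) + 1))
    for i, r in enumerate(ref, start=1):
        curr = [i]
        for j, h in enumerate(hyp, start=1):
            cost = 0 if r == h else 1
            curr.append(min(prev[j] + 1,          # deletion
                            curr[j - 1] + 1,      # insertion
                            prev[j - 1] + cost))  # substitution or match
        prev = curr
    return prev[-1] / len(ref)

print(word_error_rate("the quick brown fox", "the quick brown fox"))  # 0.0
print(word_error_rate("the quick brown fox", "the quack brown"))      # 0.5
```

Because insertions count as errors, WER can exceed 100% when the transcript is much longer than the reference.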


In [None]:
import torch

student_id = 3034016860
# Load pre-generated fixed test set
# (All students use the same 200 texts)
fixed_test_path = f'{Config.DATA_CACHE_DIR}/fixed_test_set.pt'
test_data = torch.load(fixed_test_path)

FIXED_TEST_TEXTS = test_data['texts']
FIXED_TEST_INDICES = test_data['indices']

print(f"Loaded {len(FIXED_TEST_TEXTS)} test texts from {fixed_test_path}")
print(f"Seed: {test_data['seed']}")
print(f"Distribution: {test_data['n_short']} short + {test_data['n_medium']} medium + {test_data['n_long']} long")


### Step 4: Voice Cloning Evaluation (200 Utterances)

**Three-step process:**

1. **Load models:** Text tokenizer, speech tokenizer, flow model, vocoder
2. **Enter Student ID:** Generates unique text shuffling seed
3. **Run evaluation:**
   - Generate 200 utterances with your voice (~10-15 min)
   - Transcribe with Whisper ASR (~5-10 min)
   - Create Gradescope submission file

**Note:** WER score visible only on Gradescope after submission.
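
How `run_voice_cloning_evaluation` derives its seed from the Student ID is internal to `hw4_util.py`. As an illustration of the general idea only, the sketch below hashes an ID string into a seed and uses it for a reproducible shuffle. `shuffle_for_student` is a hypothetical name and the hashing scheme is an assumption.

```python
import hashlib
import random
from typing import List, Sequence

def shuffle_for_student(items: Sequence[str], student_id: str) -> List[str]:
    """Return a copy of items in an order determined only by student_id."""
    digest = hashlib.sha256(student_id.encode("utf-8")).hexdigest()
    rng = random.Random(int(digest, 16))  # local RNG, global state untouched
    shuffled = list(items)
    rng.shuffle(shuffled)
    return shuffled

texts = [f"utterance {i}" for i in range(200)]
order_a = shuffle_for_student(texts, "student-a")
order_b = shuffle_for_student(texts, "student-b")
print(order_a == shuffle_for_student(texts, "student-a"))  # True
print(sorted(order_a) == sorted(order_b))                  # True
```

Every student still synthesizes the same set of texts; only the order differs, so a submission file copied from someone else would not line up with the expected order.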


In [None]:
# Voice Cloning Evaluation
# Implementation details are in hw4_util.py

from hw4_util import load_pretrained_models_for_inference, run_voice_cloning_evaluation

# Step 1: Load pretrained models
print("Step 1: Loading pretrained models...\n")
models = load_pretrained_models_for_inference(
    pretrained_dir=Config.PRETRAINED_DIR,
    device=device
)

# Extract models
text_tokenizer = models['text_tokenizer']
speech_tokenizer = models['speech_tokenizer']
flow_model = models['flow_model']
vocoder = models['vocoder']

# Step 2: Get student ID
print("\nStep 2: Student identification\n")
STUDENT_ID = input("Enter your Student ID: ").strip()

print(f" Student ID: {STUDENT_ID}")

# Step 3: Run evaluation
print("\nStep 3: Running voice cloning evaluation...\n")
submission_path = run_voice_cloning_evaluation(
    student_id=STUDENT_ID,
    trained_model=trained_model,
    text_tokenizer=text_tokenizer,
    speech_tokenizer=speech_tokenizer,
    flow_model=flow_model,
    vocoder=vocoder,
    prompt_audio=prompt_audio,
    prompt_text=prompt_text,
    fixed_test_set_path=f'{Config.DATA_CACHE_DIR}/fixed_test_set.pt',
    results_dir=Config.RESULTS_DIR,
    pretrained_dir=Config.PRETRAINED_DIR,
    device=device
)

print(f"\n Done! Upload {submission_path} to Gradescope")


## Part 9: Final Submission Checklist

Before submitting to Gradescope, ensure you have completed the following:

### Required Files:

1. **`submission_[YOUR_ID].txt`** - Auto-generated evaluation file from Part 8
   - Generated automatically when you complete Part 8
   - Contains your Student ID, seed, and 200 transcriptions
   - Do NOT modify this file

2. **`hw4-c.pdf`** - PDF export of this notebook

### Submission Instructions:
1. Complete Part 8 to generate `submission_[YOUR_ID].txt`
2. Export this notebook to PDF
3. Upload both files to Gradescope

### Notes:
* Your WER score will be calculated automatically upon submission to Gradescope
* Points are awarded based on WER on the test set:
   - < 70% : 15 points
   - < 50% : 20 points
   - < 45% : 25 points
   - < 40% : 30 points
   - < 35% : 35 points
   - < 30% : 40 points (Full points)
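
The thresholds above can be read as a lookup from WER to points. The sketch below encodes them with strict `<` comparisons as listed. Returning 0 points for a WER of 70% or higher is an assumption, since the list does not state that case. `points_for_wer` is a hypothetical helper.

```python
# (upper bound on WER in percent, points), ordered from strictest to loosest.
WER_THRESHOLDS = [(30, 40), (35, 35), (40, 30), (45, 25), (50, 20), (70, 15)]

def points_for_wer(wer_percent: float) -> int:
    """Map a WER percentage to points using the first threshold it beats."""
    for upper_bound, points in WER_THRESHOLDS:
        if wer_percent < upper_bound:
            return points
    return 0  # assumption: no points at or above 70% WER

for wer in (28.0, 30.0, 47.5, 72.0):
    print(f"WER {wer:.1f}% -> {points_for_wer(wer)} points")
```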