In [None]:
!apt install ffmpeg

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 29 not upgraded.


In [19]:
!tar -xzf /content/drive/MyDrive/dataset/cv-corpus-20.0-delta-2024-12-06-en.tar.gz -C /content/

In [None]:
!pip install torchaudio librosa matplotlib
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import os
import librosa
import torchaudio
from IPython.display import Audio
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [55]:
import subprocess
from pathlib import Path

import pandas as pd

# Location of the extracted Common Voice release (fixed path, no globbing).
dataset_path = "/content/cv-corpus-20.0-delta-2024-12-06/"
clips_dir = os.path.join(dataset_path, "en/clips")
tsv_path = os.path.join(dataset_path, "en/validated.tsv")

# One absolute output directory used for creating AND writing, so the cell does
# not depend on the kernel's current working directory.
processed_dir = "/content/processed_dataset"

# Load metadata
df = pd.read_csv(tsv_path, sep='\t')
print(f"Total samples: {len(df)}")

# Create file mapping (path, text); clips listed in the TSV but absent on disk
# are reported and left out.
file_map = []
for clip_name, sentence in zip(df['path'], df['sentence']):
    mp3_path = os.path.join(clips_dir, clip_name)
    if os.path.exists(mp3_path):
        file_map.append((mp3_path, sentence))
    else:
        print(f"Missing file: {mp3_path}")

print(f"\nValid samples: {len(file_map)}")

# Take first 100 samples for quick testing (remove this for full dataset)
file_map = file_map[:100]

# Write sample_<idx>.wav (16 kHz) and sample_<idx>.txt for every clip.
os.makedirs(processed_dir, exist_ok=True)
for idx, (mp3_path, text) in enumerate(file_map):
    wav_path = os.path.join(processed_dir, f"sample_{idx}.wav")

    # Argument list instead of a shell string: file names are never interpreted
    # by a shell, and the exit status is available for checking.
    result = subprocess.run(
        ["ffmpeg", "-y", "-i", mp3_path, "-ar", "16000", wav_path],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    if result.returncode != 0:
        # Do not leave a truncated WAV (or a label without audio) behind.
        print(f"ffmpeg failed for {mp3_path} (exit code {result.returncode}); skipping")
        if os.path.exists(wav_path):
            os.remove(wav_path)
        continue

    # Create text file
    with open(os.path.join(processed_dir, f"sample_{idx}.txt"), "w") as f:
        f.write(text)

print("\nCreated processed dataset with WAV files and text labels")

Total samples: 250

Valid samples: 250

Created processed dataset with WAV files and text labels


In [56]:
class CharTokenizer:
    """Character-level tokenizer over a fixed, lowercase alphabet.

    Ids are the positions of characters in ``self.chars``, so id 0 is ``'a'``;
    there is no dedicated padding or unknown id.
    """

    def __init__(self):
        # Alphabet: letters, digits, space and a few punctuation marks.
        self.chars = "abcdefghijklmnopqrstuvwxyz0123456789 .,!?-'"
        self.char_to_idx = {c: i for i, c in enumerate(self.chars)}
        self.idx_to_char = {i: c for i, c in enumerate(self.chars)}
        self.vocab_size = len(self.chars)

    def encode(self, text):
        """Return a 1-D ``torch.long`` tensor of character ids for ``text``.

        The text is lowercased first. Characters outside the alphabet are
        dropped; mapping them to id 0 would silently turn them into ``'a'``.
        Empty (or fully unencodable) text yields an empty long tensor.
        """
        text = text.lower()
        ids = [self.char_to_idx[c] for c in text if c in self.char_to_idx]
        # Explicit dtype: torch.tensor([]) would otherwise default to float32.
        return torch.tensor(ids, dtype=torch.long)

    def decode(self, indices):
        """Convert an iterable of ids (tensor elements or plain ints) back to text.

        Ids outside the vocabulary are skipped.
        """
        return ''.join(self.idx_to_char.get(int(i), '') for i in indices)

tokenizer = CharTokenizer()
print(f"Vocabulary size: {tokenizer.vocab_size}")

Vocabulary size: 43


In [57]:
class TTSDataset(torch.utils.data.Dataset):
    """(token ids, mel features) pairs read from ``<name>.wav`` / ``<name>.txt`` files.

    The mel features are peak-normalised, log-compressed and standardised per
    utterance, which is the representation the vocoder generator is trained on.
    ``hop_length`` defaults to 256 so one mel frame spans the 256 samples the
    generator produces per input frame.

    Args:
        data_dir: Directory holding the ``.wav`` files and same-named ``.txt`` files.
        tokenizer: Object with an ``encode(text)`` method returning a 1-D tensor.
        sample_rate: Rate the audio is brought to before feature extraction.
        n_mels: Number of mel bands.
        hop_length: Hop between successive mel frames, in samples.
    """

    def __init__(self, data_dir, tokenizer, sample_rate=16000, n_mels=80, hop_length=256):
        self.data_dir = data_dir
        # Sorted so that index -> file is stable across runs and machines.
        self.files = sorted(f for f in os.listdir(data_dir) if f.endswith('.wav'))
        self.tokenizer = tokenizer
        self.sample_rate = sample_rate
        # Built once; the original rebuilt the transform for every item.
        self.mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate, n_mels=n_mels, hop_length=hop_length)

    def __len__(self):
        return len(self.files)

    @staticmethod
    def _log_normalize(mel):
        """Log-compress a power mel spectrogram and standardise it to zero mean / unit std."""
        log_mel = torch.log(torch.clamp(mel, min=1e-5))
        return (log_mel - log_mel.mean()) / (log_mel.std() + 1e-8)

    def __getitem__(self, idx):
        """Return ``(tokens, mel)`` where ``mel`` has shape ``[n_mels, frames]``."""
        wav_name = self.files[idx]
        wav_path = os.path.join(self.data_dir, wav_name)
        # splitext only touches the extension, unlike str.replace on the whole name.
        txt_path = os.path.join(self.data_dir, os.path.splitext(wav_name)[0] + '.txt')

        # Load audio
        waveform, sr = torchaudio.load(wav_path)
        waveform = waveform.mean(dim=0)  # Convert to mono
        if sr != self.sample_rate:
            # The mel transform assumes self.sample_rate; honour the file's real rate.
            waveform = torchaudio.functional.resample(waveform, sr, self.sample_rate)

        # Peak-normalise to [-1, 1], then extract standardised log-mel features
        waveform = waveform / (torch.max(torch.abs(waveform)) + 1e-8)
        mel_specgram = self._log_normalize(self.mel_transform(waveform))

        # Load and tokenize text
        with open(txt_path) as f:
            text = f.read().strip().lower()
        tokens = self.tokenizer.encode(text)

        return tokens, mel_specgram

def collate_fn(batch):
    """Zero-pad a list of ``(tokens, mel)`` pairs into two batch tensors.

    Returns:
        ``(padded_texts, padded_mels)``: a long tensor ``[B, max_text_len]`` and a
        float tensor ``[B, n_mels, max_mel_len]``. Positions past an item's own
        length are zero.
    """
    token_seqs, mel_specs = zip(*batch)
    batch_size = len(token_seqs)

    # Target sizes: longest text, longest mel, band count taken from the first item
    longest_text = max(len(seq) for seq in token_seqs)
    longest_mel = max(spec.shape[1] for spec in mel_specs)
    band_count = mel_specs[0].shape[0]

    # Allocate zero-filled buffers and copy every item into its leading slice
    padded_texts = torch.zeros(batch_size, longest_text, dtype=torch.long)
    padded_mels = torch.zeros(batch_size, band_count, longest_mel)
    for row, (seq, spec) in enumerate(zip(token_seqs, mel_specs)):
        padded_texts[row, :len(seq)] = seq
        padded_mels[row, :, :spec.shape[1]] = spec

    return padded_texts, padded_mels

# Text-to-mel training data: batches of two utterances, reshuffled every epoch
# and padded to a common length by collate_fn.
dataset = TTSDataset('processed_dataset', tokenizer)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

In [58]:
class ImprovedTTS(nn.Module):
    """Text-to-mel model: embedding -> BiLSTM encoder -> linear upsampling -> LSTM decoder.

    Args:
        vocab_size: Number of token ids the embedding accepts.
        hidden_size: Embedding / encoder output width; must be even because the
            bidirectional encoder uses ``hidden_size // 2`` units per direction.
        n_mels: Number of mel bands predicted per frame.
        default_target_len: Number of frames generated when ``forward`` receives
            neither ``target_mels`` nor ``target_len``.

    Raises:
        ValueError: If ``hidden_size`` is odd.
    """

    def __init__(self, vocab_size, hidden_size=256, n_mels=80, default_target_len=500):
        super().__init__()
        if hidden_size % 2 != 0:
            # 2 * (hidden_size // 2) would not equal the decoder's input width.
            raise ValueError(f"hidden_size must be even, got {hidden_size}")
        self.default_target_len = default_target_len

        # Text embedding layer
        self.embedding = nn.Embedding(vocab_size, hidden_size)

        # Encoder LSTM (two directions of hidden_size // 2 -> hidden_size features)
        self.encoder = nn.LSTM(hidden_size, hidden_size // 2,
                               bidirectional=True, batch_first=True)

        # Decoder LSTM
        self.decoder = nn.LSTM(hidden_size, hidden_size // 2, batch_first=True)

        # Final projection layer
        self.mel_linear = nn.Linear(hidden_size // 2, n_mels)

    def forward(self, text, target_mels=None, target_len=None):
        """Predict a mel spectrogram of shape ``[B, n_mels, T_mel]``.

        Args:
            text: Long tensor of token ids, ``[B, T_text]``.
            target_mels: Optional ``[B, n_mels, T_mel]`` tensor; only its frame
                count is read, to set the output length (training).
            target_len: Optional explicit output length, used when
                ``target_mels`` is not given.
        """
        embedded = self.embedding(text)       # [B, T_text, hidden]
        enc_out, _ = self.encoder(embedded)   # [B, T_text, hidden]

        # Output length: ground-truth length > explicit request > configured default
        if target_mels is not None:
            target_len = target_mels.size(2)
        elif target_len is None:
            target_len = self.default_target_len

        # Stretch the encoder sequence to the mel length by linear interpolation;
        # interpolate works on the last axis, hence the transposes.
        upsampled = nn.functional.interpolate(
            enc_out.transpose(1, 2),          # [B, hidden, T_text]
            size=target_len,
            mode='linear',
            align_corners=False
        ).transpose(1, 2)                     # [B, T_mel, hidden]

        dec_out, _ = self.decoder(upsampled)  # [B, T_mel, hidden/2]
        mel_out = self.mel_linear(dec_out)    # [B, T_mel, n_mels]
        return mel_out.transpose(1, 2)        # [B, n_mels, T_mel]

# Text-to-mel training setup: the network, an Adam optimizer over its
# parameters, and a mean-squared-error loss on the predicted mel frames.
model = ImprovedTTS(vocab_size=tokenizer.vocab_size)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()

In [63]:
# Cell 6 (Updated): Improved GAN Generator Architecture
class Generator(nn.Module):
    """Convolutional mel-to-waveform generator.

    A 7-wide input convolution is followed by four conv + 4x ``nn.Upsample``
    stages (4**4 = 256x along the last axis; channel widths 256, 128, 64, 32),
    two ``ResidualBlock`` layers and a 7-wide output convolution with ``tanh``.

    NOTE(review): ``ResidualBlock`` is not defined in the cells visible here; it
    must already exist in the kernel and presumably keeps the sequence length.
    """

    def __init__(self, input_channels=80, output_channels=1):
        super().__init__()

        # Input stage: reflection padding of 3 keeps the length under a 7-wide kernel
        layers = [
            nn.ReflectionPad1d(3),
            nn.Conv1d(input_channels, 256, kernel_size=7),
            nn.LeakyReLU(0.2, inplace=True),
        ]

        # Four upsampling stages; the first keeps 256 channels, later ones halve them
        channels = 256
        for stage in range(4):
            next_channels = channels if stage == 0 else channels // 2
            layers.extend([
                nn.Conv1d(channels, next_channels, kernel_size=3, stride=1, padding=1),
                nn.LeakyReLU(0.2, inplace=True),
                nn.Upsample(scale_factor=4),
            ])
            channels = next_channels

        # Refinement at the final channel width
        layers.extend(ResidualBlock(channels) for _ in range(2))

        # Output stage: project to the waveform channels, bounded to [-1, 1] by tanh
        layers.extend([
            nn.ReflectionPad1d(3),
            nn.Conv1d(channels, output_channels, kernel_size=7),
            nn.Tanh(),
        ])

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run the sequential stack on ``x`` (channels on dim 1, time on the last dim)."""
        return self.model(x)

# Build the vocoder networks.
# NOTE(review): Discriminator is not defined in the cells visible here; it must
# already exist in the kernel.
generator = Generator(input_channels=80, output_channels=1)
discriminator = Discriminator(input_channels=1)

# One Adam optimizer per network, identical hyper-parameters
optimizer_G = optim.Adam(generator.parameters(), lr=2e-4, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=2e-4, betas=(0.5, 0.999))

# Smoke test: push 100 random mel frames through the generator and report
# how much longer the output is than the input.
test_mel = torch.randn(1, 80, 100)  # 100 mel frames
with torch.no_grad():
    test_output = generator(test_mel)
print(f"Test Generator - Input shape: {test_mel.shape}")
print(f"Test Generator - Output shape: {test_output.shape}")
print(f"Upsampling ratio: {test_output.shape[2] / test_mel.shape[2]}")

Test Generator - Input shape: torch.Size([1, 80, 100])
Test Generator - Output shape: torch.Size([1, 1, 25600])
Upsampling ratio: 256.0


In [64]:
# Cell 7 (Updated): Improved GAN Dataset for Vocoder
class VocoderDataset(Dataset):
    """(mel, waveform) training pairs for the vocoder, read from ``.wav`` files.

    Each item is a standardised log-mel spectrogram ``[n_mels, frames]`` and the
    peak-normalised waveform ``[1, frames * hop_length]`` it was computed from.

    Args:
        data_dir: Directory containing the ``.wav`` files.
        sample_rate: Rate the audio is brought to before feature extraction.
        n_mels: Number of mel bands.
        hop_length: Samples between successive mel frames. It is passed to the
            mel transform explicitly: torchaudio's default hop is
            ``win_length // 2`` (200 for the default 400-sample window), which
            would not match the 256 samples per frame assumed for the waveform.
    """

    def __init__(self, data_dir, sample_rate=16000, n_mels=80, hop_length=256):
        self.data_dir = data_dir
        # Sorted so that index -> file is stable across runs and machines.
        self.files = sorted(f for f in os.listdir(data_dir) if f.endswith('.wav'))
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        # Built once; the original rebuilt the transform for every item.
        self.mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate, n_mels=n_mels, hop_length=hop_length)

    def __len__(self):
        return len(self.files)

    @staticmethod
    def _fit_length(waveform, length):
        """Zero-pad at the end, or trim, a 1-D waveform to exactly ``length`` samples."""
        if waveform.shape[0] < length:
            padded = torch.zeros(length)
            padded[:waveform.shape[0]] = waveform
            return padded
        return waveform[:length]

    def __getitem__(self, idx):
        """Return ``(mel_specgram, waveform)`` for file ``idx``."""
        wav_path = os.path.join(self.data_dir, self.files[idx])

        # Load audio
        waveform, sr = torchaudio.load(wav_path)
        waveform = waveform.mean(dim=0)  # Convert to mono
        if sr != self.sample_rate:
            # The mel transform assumes self.sample_rate; honour the file's real rate.
            waveform = torchaudio.functional.resample(waveform, sr, self.sample_rate)

        # Normalize waveform to [-1, 1]
        waveform = waveform / (torch.max(torch.abs(waveform)) + 1e-8)

        # Log-mel spectrogram, standardised to zero mean / unit std per utterance
        mel_specgram = self.mel_transform(waveform)
        mel_specgram = torch.log(torch.clamp(mel_specgram, min=1e-5))
        mel_specgram = (mel_specgram - mel_specgram.mean()) / (mel_specgram.std() + 1e-8)

        # Make the waveform exactly hop_length samples per mel frame
        expected_audio_length = mel_specgram.shape[1] * self.hop_length
        waveform = self._fit_length(waveform, expected_audio_length)

        return mel_specgram, waveform.unsqueeze(0)  # Add channel dimension to waveform

def vocoder_collate_fn(batch):
    """Trim every ``(mel, waveform)`` pair to a shared length and stack them.

    The shared mel length is the shortest one in the batch, rounded down to a
    multiple of 8; waveforms are cut to 256 samples per kept mel frame.

    Returns:
        ``(mels, waves)`` with shapes ``[B, n_mels, T]`` and ``[B, 1, T * 256]``.
    """
    hop_length = 256
    mel_list, wave_list = zip(*batch)

    # Shortest item decides the batch length; drop the remainder modulo 8
    shortest = min(mel.shape[1] for mel in mel_list)
    n_frames = shortest - shortest % 8
    n_samples = n_frames * hop_length

    # Cut to the shared lengths and stack along a new batch axis
    mel_batch = torch.stack([mel[:, :n_frames] for mel in mel_list])
    wave_batch = torch.stack([wave[:, :n_samples] for wave in wave_list])

    return mel_batch, wave_batch

# Vocoder training data: batches of four clips, reshuffled every epoch and
# trimmed to a common length by vocoder_collate_fn.
vocoder_dataset = VocoderDataset('processed_dataset')
vocoder_dataloader = DataLoader(
    vocoder_dataset, batch_size=4, shuffle=True, collate_fn=vocoder_collate_fn
)

print(f"Vocoder dataset created with {len(vocoder_dataset)} samples")

# Sanity check: pull one batch and confirm the samples-per-frame ratio
print(f"Checking first batch dimensions:")
test_mels, test_waves = next(iter(vocoder_dataloader))
print(f"Mel batch shape: {test_mels.shape}")
print(f"Wave batch shape: {test_waves.shape}")
print(f"Ratio: {test_waves.shape[2] / test_mels.shape[2]}")

Vocoder dataset created with 100 samples
Checking first batch dimensions:
Mel batch shape: torch.Size([4, 80, 336])
Wave batch shape: torch.Size([4, 1, 86016])
Ratio: 256.0


In [None]:
# Training loop for Text-to-Mel model
num_epochs = 1000  # Adjust as needed

# Train on the GPU when one is available, as the GAN vocoder loop does.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# The inference cell switches the model to eval mode; make sure a re-run of this
# cell trains in training mode again.
model.train()

for epoch in range(num_epochs):
    total_loss = 0
    num_batches = 0

    for batch_idx, (text_tensor, padded_mels) in enumerate(dataloader):
        text_tensor = text_tensor.to(device)
        padded_mels = padded_mels.to(device)

        optimizer.zero_grad()

        # Forward pass; the target is passed only so the model knows how many
        # frames to produce
        outputs = model(text_tensor, padded_mels)

        # Calculate loss (note: zero-padded frames are included in the MSE)
        loss = criterion(outputs, padded_mels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

        if batch_idx % 5 == 0:
            print(f'Text-to-Mel - Epoch: {epoch+1}, Batch: {batch_idx+1}, Loss: {loss.item():.4f}')

    # Guard against an empty dataloader instead of dividing by zero
    if num_batches > 0:
        avg_loss = total_loss / num_batches
        print(f'Text-to-Mel - Epoch: {epoch+1}, Average Loss: {avg_loss:.4f}')

# Move the model back to the CPU: later cells feed it CPU tensors, and the saved
# checkpoint then loads on machines without a GPU.
model.cpu()

# Save the text-to-mel model
torch.save(model.state_dict(), 'tts_model.pth')
print("Text-to-Mel model saved to tts_model.pth")

In [None]:
# Cell 9 (Updated): Improved GAN Training Loop
# Training loop for GAN vocoder
#
# NOTE(review): criterion_GAN and criterion_Identity are not defined in the cells
# visible here; they must already exist in the kernel. criterion_Identity is
# presumably an L1 loss, going by the comment at its call site.
num_epochs_gan = 20  # Adjust as needed
lambda_identity = 10  # Weight for identity loss

# Pick the device once and move BOTH networks to it. The batches are moved to
# this device below, so leaving the networks on the CPU fails on a GPU runtime.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
generator.to(device)
discriminator.to(device)
# The inference cell switches the generator to eval mode; make sure a re-run of
# this cell trains in training mode again.
generator.train()
discriminator.train()

for epoch in range(num_epochs_gan):
    total_g_loss = 0
    total_d_loss = 0
    num_batches = 0

    for batch_idx, (mels, real_waves) in enumerate(vocoder_dataloader):
        mels, real_waves = mels.to(device), real_waves.to(device)

        # Check shapes
        expected_wave_len = mels.shape[2] * 256  # hop_length
        if real_waves.shape[2] != expected_wave_len:
            print(f"Warning: Expected wave length {expected_wave_len}, got {real_waves.shape[2]}")
            continue

        # ----------------------
        #  Train Discriminator
        # ----------------------
        optimizer_D.zero_grad()

        # Generate audio from mel spectrograms
        fake_waves = generator(mels)

        # Ensure the fake waves match the real ones in size
        if fake_waves.shape[2] != real_waves.shape[2]:
            # If different size, interpolate to match
            fake_waves = F.interpolate(
                fake_waves,
                size=real_waves.shape[2],
                mode='linear',
                align_corners=False
            )

        # Real audio
        pred_real = discriminator(real_waves)
        target_real = torch.ones_like(pred_real)
        loss_D_real = criterion_GAN(pred_real, target_real)

        # Fake audio (detached so this step does not backpropagate into the generator)
        pred_fake = discriminator(fake_waves.detach())
        target_fake = torch.zeros_like(pred_fake)
        loss_D_fake = criterion_GAN(pred_fake, target_fake)

        # Total discriminator loss
        loss_D = (loss_D_real + loss_D_fake) * 0.5
        loss_D.backward()
        optimizer_D.step()

        # ------------------
        #  Train Generator
        # ------------------
        optimizer_G.zero_grad()

        # GAN loss (try to fool discriminator)
        pred_fake = discriminator(fake_waves)
        target_real = torch.ones_like(pred_fake)
        loss_G_GAN = criterion_GAN(pred_fake, target_real)

        # Identity loss (L1 between generated and real waveforms)
        loss_identity = criterion_Identity(fake_waves, real_waves) * lambda_identity

        # Total generator loss
        loss_G = loss_G_GAN + loss_identity
        loss_G.backward()
        optimizer_G.step()

        # Print statistics
        total_g_loss += loss_G.item()
        total_d_loss += loss_D.item()
        num_batches += 1

        if batch_idx % 5 == 0:
            print(f'GAN Vocoder - Epoch: {epoch+1}, Batch: {batch_idx+1}, '
                  f'G Loss: {loss_G.item():.4f}, D Loss: {loss_D.item():.4f}')
            print(f'Shapes - Mel: {mels.shape}, Real: {real_waves.shape}, Fake: {fake_waves.shape}')

    # Print epoch statistics
    if num_batches > 0:
        avg_g_loss = total_g_loss / num_batches
        avg_d_loss = total_d_loss / num_batches
        print(f'GAN Vocoder - Epoch: {epoch+1}, Avg G Loss: {avg_g_loss:.4f}, '
              f'Avg D Loss: {avg_d_loss:.4f}')

    # Save models periodically (tensors are on `device` at this point; pass
    # map_location to torch.load when restoring on a different device)
    if (epoch + 1) % 5 == 0:
        torch.save(generator.state_dict(), f'generator_epoch_{epoch+1}.pth')
        torch.save(discriminator.state_dict(), f'discriminator_epoch_{epoch+1}.pth')

# Move the networks back to the CPU: later cells feed the generator CPU tensors,
# and the final checkpoints then load on machines without a GPU.
generator.cpu()
discriminator.cpu()

# Save final models
torch.save(generator.state_dict(), 'generator_final.pth')
torch.save(discriminator.state_dict(), 'discriminator_final.pth')
print("GAN Vocoder models saved")

In [67]:
def text_to_speech_gan(text_model, text, tokenizer, gen_model, sample_rate=16000):
    """Synthesize audio for ``text``: text -> mel (``text_model``) -> waveform (``gen_model``).

    Both models are switched to eval mode and stay in eval mode afterwards.

    Args:
        text_model: Module mapping a ``[1, T]`` token tensor to a mel tensor.
        text: Input string.
        tokenizer: Object whose ``encode(text)`` returns a 1-D tensor of token ids.
        gen_model: Module mapping the standardised mel tensor to a waveform tensor.
        sample_rate: Only used to report the duration in seconds.

    Returns:
        1-D NumPy array, peak-normalised and scaled to a maximum magnitude of 0.8.

    Raises:
        ValueError: If ``text`` encodes to zero tokens.
    """
    text_model.eval()  # Set text-to-mel model to evaluation mode
    gen_model.eval()  # Set generator to evaluation mode

    def _device_of(module):
        # Device of the module's first parameter; CPU for parameter-free modules.
        first_param = next(module.parameters(), None)
        return first_param.device if first_param is not None else torch.device('cpu')

    # Tokenize the text
    tokens = tokenizer.encode(text).unsqueeze(0)  # Add batch dimension
    if tokens.numel() == 0:
        # Fail here with a clear message rather than deep inside the network.
        raise ValueError("text contains no characters the tokenizer can encode")
    # Inputs follow the models, so the function works whether they sit on CPU or GPU.
    tokens = tokens.long().to(_device_of(text_model))

    with torch.no_grad():
        # Step 1: Generate mel spectrogram from text (no target needed for inference)
        mel = text_model(tokens)

        # Step 2: Standardise the mel and convert it to audio with the generator
        mel_norm = (mel - mel.mean()) / (mel.std() + 1e-8)
        waveform = gen_model(mel_norm.to(_device_of(gen_model)))

    # Get audio as numpy array
    audio = waveform.squeeze().cpu().numpy()

    # Normalize audio to [-1, 1]
    audio = audio / (np.max(np.abs(audio)) + 1e-8)

    print(f"Input text: '{text}'")
    print(f"Generated mel spectrogram shape: {mel.shape}")
    print(f"Generated audio length: {len(audio)} samples ({len(audio)/sample_rate:.2f} seconds)")

    return audio * 0.8  # Scale down slightly to avoid clipping

# Demo: synthesize one sentence with the text-to-mel model and the GAN generator,
# then render an audio player as the cell's output.
text_input = "Hello world, this is a test of text to speech using a GAN vocoder."
audio_gan = text_to_speech_gan(
    text_model=model,
    text=text_input,
    tokenizer=tokenizer,
    gen_model=generator,
)
Audio(audio_gan, rate=16000)

Input text: 'Hello world, this is a test of text to speech using a GAN vocoder.'
Generated mel spectrogram shape: torch.Size([1, 80, 500])
Generated audio length: 128000 samples (8.00 seconds)
