In [None]:
# Mount Google Drive at /content/drive; the data paths used later in this
# notebook live under /content/drive/MyDrive. This import is Colab-specific.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install the third-party packages this notebook relies on.
# NOTE(review): no versions are pinned, so a later re-run may resolve to
# different releases than the ones shown in the captured output below.
# NOTE(review): `transformers` is installed here but is not imported in the
# code cell that follows — confirm it is still needed.
!pip install torchaudio
!pip install transformers
!pip install jiwer
!pip install audiomentations

Collecting jiwer
  Downloading jiwer-3.0.4-py3-none-any.whl.metadata (2.6 kB)
Collecting rapidfuzz<4,>=3 (from jiwer)
  Downloading rapidfuzz-3.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Downloading jiwer-3.0.4-py3-none-any.whl (21 kB)
Downloading rapidfuzz-3.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.4/3.4 MB[0m [31m39.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-3.0.4 rapidfuzz-3.9.7
Collecting audiomentations
  Downloading audiomentations-0.37.0-py3-none-any.whl.metadata (11 kB)
Collecting numpy-minmax<1,>=0.3.0 (from audiomentations)
  Downloading numpy_minmax-0.3.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.0 kB)
Collecting numpy-rms<1,>=0.4.2 (from audiomentations)
  Downloading numpy_rms-0.4.2-cp310-cp310-manylinux

In [None]:
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import os
from torch.nn import functional as F
import torch.nn as nn
import torch.optim as optim
import librosa
import numpy as np
from scipy.io.wavfile import write
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift
from jiwer import wer

# Step 1: Define Data Augmentation Pipeline
# Every transform is configured with p=0.5. The pipeline is called from
# augment_audio() with a NumPy array of samples and a sample rate.
augment = Compose([
    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=0.5),  # noise amplitude range 0.001-0.015
    TimeStretch(min_rate=0.8, max_rate=1.25, p=0.5),  # rate range 0.8-1.25
    PitchShift(min_semitones=-4, max_semitones=4, p=0.5),  # shift range -4..+4 semitones
    Shift(min_shift=-0.5, max_shift=0.5, p=0.5)  # NOTE(review): unit of the shift range depends on the library default — confirm
])

# Custom Dataset for Loading Audio Files with Augmentation
class SpeechDataset(Dataset):
    """Fixed-length mono waveforms read from the ``.wav`` files of one directory.

    Args:
        data_dir: Directory scanned (non-recursively) for ``.wav`` files.
        transform: Optional callable applied to the waveform as the last step
            (for example a normalization function).
        target_length: Number of samples each returned waveform is padded or
            truncated to.
        num_files: Maximum number of files to use; the first ``num_files``
            names in sorted order are kept.
        apply_augmentation: When True, the module-level ``augment_audio``
            function is applied to each waveform after padding/truncation.
        sample_rate: Rate in Hz that every file is resampled to. The default
            of 16000 matches the rate the augmentation and saving code in this
            notebook assume.

    Raises:
        ValueError: If ``data_dir`` contains no ``.wav`` files.
    """

    def __init__(self, data_dir, transform=None, target_length=80000, num_files=100,
                 apply_augmentation=False, sample_rate=16000):
        self.data_dir = data_dir
        self.transform = transform
        self.target_length = target_length
        self.apply_augmentation = apply_augmentation
        self.sample_rate = sample_rate

        # os.listdir() returns names in arbitrary order; sorting makes the
        # `[:num_files]` subset reproducible from run to run.
        wav_names = sorted(f for f in os.listdir(data_dir) if f.lower().endswith('.wav'))
        self.audio_files = wav_names[:num_files]

        if len(self.audio_files) == 0:
            raise ValueError(f"No audio files found in directory: {data_dir}")

    def __len__(self):
        """Return the number of audio files selected for this dataset."""
        return len(self.audio_files)

    def __getitem__(self, idx):
        """Load one file and return ``(waveform, file_name)``.

        The waveform has shape ``(1, target_length)`` before ``transform`` is
        applied.

        Raises:
            FileNotFoundError: If the file disappeared after the dataset was built.
            RuntimeError: Re-raised from ``torchaudio.load`` after logging the path.
        """
        file_name = self.audio_files[idx]
        wav_path = os.path.join(self.data_dir, file_name)
        if not os.path.exists(wav_path):
            raise FileNotFoundError(f"Audio file not found: {wav_path}")

        try:
            waveform, sr = torchaudio.load(wav_path)
        except RuntimeError as e:
            print(f"Error loading audio file: {wav_path}, Error: {e}")
            raise  # bare raise keeps the original traceback

        # Downmix multi-channel audio so the flattened sample count stays
        # equal to target_length, which the encoder's input layer relies on.
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Bring every file to the common rate before fixing the length, so
        # target_length always corresponds to the same duration.
        if sr != self.sample_rate:
            waveform = torchaudio.functional.resample(
                waveform, orig_freq=sr, new_freq=self.sample_rate
            )

        # Pad or truncate to the target length
        num_samples = waveform.shape[1]
        if num_samples < self.target_length:
            waveform = F.pad(waveform, (0, self.target_length - num_samples))
        else:
            waveform = waveform[:, :self.target_length]

        # Apply Augmentation
        if self.apply_augmentation:
            waveform = augment_audio(waveform)

        # Apply normalization if any
        if self.transform:
            waveform = self.transform(waveform)

        return waveform, file_name

# Function to Apply Augmentation to the Audio
def augment_audio(audio, sample_rate=16000):
    """Run the module-level ``augment`` pipeline on a waveform tensor.

    Args:
        audio: Waveform tensor. It is detached, moved to the CPU and cast to
            float32 before being handed to the pipeline as a NumPy array.
        sample_rate: Sample rate in Hz passed to the pipeline. Defaults to
            16000, the value that was previously hard-coded here.

    Returns:
        A new ``torch.Tensor`` built from the augmented samples.
    """
    # detach()/cpu() make the NumPy conversion safe for tensors that track
    # gradients or live on an accelerator.
    samples = audio.detach().cpu().numpy().astype(np.float32, copy=False)
    augmented_samples = augment(samples=samples, sample_rate=sample_rate)
    return torch.tensor(augmented_samples)

# Define Normalization Transform
def normalize_waveform(waveform, eps=1e-8):
    """Standardize a waveform to zero mean and unit standard deviation.

    Args:
        waveform: Tensor of audio samples; mean and std are taken over all
            elements.
        eps: Lower bound for the divisor. A constant signal (for example a
            silent or fully zero-padded clip) has zero standard deviation and
            would otherwise produce NaN values.

    Returns:
        Tensor with the same shape as ``waveform``. Signals whose standard
        deviation is at least ``eps`` get exactly ``(x - mean) / std``;
        constant signals come back as all zeros.
    """
    # A single-element tensor has an undefined (NaN) std; treat it like zero.
    std = torch.nan_to_num(waveform.std(), nan=0.0).clamp_min(eps)
    return (waveform - waveform.mean()) / std

# Directories for Data
data_dir_A = "/content/drive/MyDrive/data/extracted_files-3/en/North_American_English_W/"

# Target length for all audio files
# (80000 samples, i.e. 5 seconds at the 16000 Hz rate used elsewhere in this notebook)
target_length = 80000

# Initialize Dataset with at most 100 files (num_files=100) and apply augmentation
dataset_A = SpeechDataset(data_dir_A, transform=normalize_waveform, target_length=target_length, num_files=100, apply_augmentation=True)

# Initialize DataLoader (one waveform per batch, reshuffled every epoch)
dataloader_A = DataLoader(dataset_A, batch_size=1, shuffle=True)

# Define AAE Components: Encoder, Decoder, Discriminator, Transformer-based Encoder
class TransformerEncoder(nn.Module):
    """Encode a flattened waveform into a latent vector with a Transformer stack.

    The waveform is projected to ``d_model`` features, passed through the
    Transformer encoder as a sequence of length one, pooled over that sequence
    axis and projected to the latent size.

    Args:
        input_dim: Number of values per sample after flattening.
        d_model: Feature width used inside the Transformer.
        nhead: Number of attention heads.
        num_layers: Number of stacked encoder layers.
        latent_dim: Size of the returned latent vector (default 64, the size
            the decoder and discriminator in this notebook expect).
    """

    def __init__(self, input_dim=80000, d_model=512, nhead=8, num_layers=6, latent_dim=64):
        super(TransformerEncoder, self).__init__()
        self.fc_in = nn.Linear(input_dim, d_model)
        # batch_first=True is required: forward() feeds (batch, seq=1, d_model).
        # With the default (sequence-first) layout the batch axis would be
        # treated as the sequence, so samples in one batch would attend to
        # each other and the pooling below would average the wrong axis.
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, batch_first=True),
            num_layers=num_layers,
        )
        self.fc_out = nn.Linear(d_model, latent_dim)

    def forward(self, x):
        """Map ``x`` of shape ``(batch, ...)`` to latents of shape ``(batch, latent_dim)``."""
        x = x.reshape(x.size(0), -1)  # Flatten everything after the batch axis
        x = self.fc_in(x)  # Linear transformation to match d_model
        x = x.unsqueeze(1)  # (batch, 1, d_model): sequence of length one
        x = self.transformer_encoder(x)
        x = x.mean(dim=1)  # Global average pooling over the sequence length
        return self.fc_out(x)

class Decoder(nn.Module):
    """Two-layer MLP that maps a latent vector back to a waveform.

    Args:
        latent_dim: Size of the incoming latent vector (default 64).
        hidden_dim: Width of the hidden layer (default 128).
        output_dim: Number of output samples per waveform (default 80000).

    The defaults reproduce the previously hard-coded layer sizes, and the
    layers stay under ``self.main`` so existing state-dict keys are unchanged.
    """

    def __init__(self, latent_dim=64, hidden_dim=128, output_dim=80000):
        super(Decoder, self).__init__()
        self.main = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        """Return reconstructions shaped ``(batch, 1, samples)``."""
        x = self.main(x)
        # Insert a singleton channel axis so the output lines up with
        # (batch, 1, samples) waveforms.
        x = x.view(x.size(0), 1, -1)
        return x

class Discriminator(nn.Module):
    """Small MLP that scores a 64-dimensional latent vector with a value in (0, 1)."""

    def __init__(self):
        super().__init__()
        # 64 -> 32 -> 1, with a sigmoid so the output can be used with BCELoss.
        layers = [
            nn.Linear(64, 32),
            nn.LeakyReLU(0.2),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        ]
        self.main = nn.Sequential(*layers)

    def forward(self, x):
        """Return one score per row of ``x``, shape ``(batch, 1)``."""
        return self.main(x)

# Initialize Models
encoder = TransformerEncoder()
decoder = Decoder()
discriminator = Discriminator()

# Initialize Loss Functions and Optimizers
criterion_reconstruction = nn.MSELoss()  # reconstruction error between decoder output and input waveform
criterion_adversarial = nn.BCELoss()  # the discriminator ends in a Sigmoid, so it outputs probabilities
# Encoder and decoder share one optimizer; the discriminator is updated by its own.
optimizer_enc_dec = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=0.0002)
optimizer_disc = optim.Adam(discriminator.parameters(), lr=0.0002)

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move all three networks to the selected device
encoder.to(device)
decoder.to(device)
discriminator.to(device)

# Function to Compute MCD
def compute_mcd(orig_audio, conv_audio, sample_rate=16000, n_fft=1024, hop_length=256,
                n_mels=40, n_mcep=13):
    """Compute a deterministic mel-cepstral distortion (in dB) between two signals.

    This replaces the former random placeholder. Cepstra are MFCC-style
    (Hann-windowed power spectrum -> triangular mel filterbank -> log ->
    orthonormal DCT-II), so values are comparable across runs of this notebook
    but not necessarily with toolkits that use a different cepstral analysis.

    Args:
        orig_audio: Reference audio as a tensor or array. Leading axes
            (channels/batch) are averaged into a single mono signal.
        conv_audio: Audio to compare against the reference, same conventions.
        sample_rate: Sample rate in Hz used to place the mel filters.
        n_fft: Frame length in samples.
        hop_length: Hop between successive frames in samples.
        n_mels: Number of mel filters.
        n_mcep: Number of cepstral coefficients compared (c1..c_n_mcep).

    Returns:
        Mean per-frame distortion as a Python float; 0.0 for identical inputs.

    Notes:
        Frames are compared position by position (no time alignment); the
        longer signal is truncated to the frame count of the shorter one.
        Coefficient c0 is dropped so overall gain has little influence.
    """
    from scipy.fft import dct  # local import keeps this block self-contained

    def _as_mono(audio):
        # Accept torch tensors without importing torch here.
        if hasattr(audio, "detach"):
            audio = audio.detach().cpu().numpy()
        samples = np.asarray(audio, dtype=np.float64)
        if samples.ndim > 1:
            samples = samples.reshape(-1, samples.shape[-1]).mean(axis=0)
        return samples

    def _hz_to_mel(freq):
        return 2595.0 * np.log10(1.0 + freq / 700.0)

    def _mel_to_hz(mel):
        return 700.0 * (10.0 ** (mel / 2595.0) - 1.0)

    # Triangular mel filterbank, shape (n_mels, n_fft // 2 + 1).
    nyquist = sample_rate / 2.0
    edges_hz = _mel_to_hz(np.linspace(_hz_to_mel(0.0), _hz_to_mel(nyquist), n_mels + 2))
    bin_hz = np.linspace(0.0, nyquist, n_fft // 2 + 1)
    lower, centre, upper = edges_hz[:-2, None], edges_hz[1:-1, None], edges_hz[2:, None]
    rising = (bin_hz - lower) / (centre - lower)
    falling = (upper - bin_hz) / (upper - centre)
    filterbank = np.maximum(0.0, np.minimum(rising, falling))
    window = np.hanning(n_fft)

    def _mel_cepstra(samples):
        # Guarantee at least one full frame, even for very short or empty input.
        if samples.size < n_fft:
            samples = np.pad(samples, (0, n_fft - samples.size))
        n_frames = 1 + (samples.size - n_fft) // hop_length
        frame_idx = np.arange(n_fft)[None, :] + hop_length * np.arange(n_frames)[:, None]
        power = np.abs(np.fft.rfft(samples[frame_idx] * window, axis=1)) ** 2
        log_mel = np.log(power @ filterbank.T + 1e-10)  # floor avoids log(0)
        cepstra = dct(log_mel, type=2, axis=1, norm="ortho")
        return cepstra[:, 1:n_mcep + 1]  # drop c0 (overall energy)

    cep_orig = _mel_cepstra(_as_mono(orig_audio))
    cep_conv = _mel_cepstra(_as_mono(conv_audio))

    n_common = min(len(cep_orig), len(cep_conv))
    diff = cep_orig[:n_common] - cep_conv[:n_common]
    frame_distortion = np.sqrt(2.0 * np.sum(diff ** 2, axis=1))
    return float((10.0 / np.log(10.0)) * frame_distortion.mean())

# Evaluation function for Mean Mel-Cepstral Distortion (MCD)
def evaluate_metrics(original_dir, converted_dir, mapping):
    """Print the mean MCD over every original/converted file pair in ``mapping``.

    Args:
        original_dir: Directory holding the original audio files.
        converted_dir: Directory holding the converted audio files.
        mapping: Dict from original file name to converted file name.

    Pairs with a missing file, or whose audio cannot be loaded
    (``RuntimeError``), are reported and skipped. Nothing is returned; the
    result is only printed.
    """
    scores = []

    for source_name, target_name in mapping.items():
        source_path = os.path.join(original_dir, source_name)
        target_path = os.path.join(converted_dir, target_name)

        # Report the first missing side of the pair and move on.
        problem = None
        if not os.path.exists(source_path):
            problem = f"Original file not found: {source_path}"
        elif not os.path.exists(target_path):
            problem = f"Converted file not found: {target_path}"
        if problem is not None:
            print(problem)
            continue

        try:
            source_audio, _ = torchaudio.load(source_path)
            target_audio, _ = torchaudio.load(target_path)
        except RuntimeError as e:
            print(f"Error loading audio files. Original: {source_path}, Converted: {target_path}, Error: {e}")
            continue

        # Compute MCD
        scores.append(compute_mcd(source_audio, target_audio))

    if not scores:
        print("No valid scores computed due to missing or corrupt files.")
        return

    mean_mcd = np.mean(scores)
    print(f"Mean MCD: {mean_mcd:.4f}")

# Function to save audio data correctly
def save_audio(file_path, audio_tensor, sample_rate=16000):
    """Write one mono waveform to ``file_path`` as a 16-bit PCM WAV file.

    Args:
        file_path: Destination path of the WAV file.
        audio_tensor: Tensor of samples with any number of leading axes
            (for example ``(samples,)``, ``(1, samples)`` or
            ``(batch, 1, samples)``). Only the first item along every leading
            axis is written.
        sample_rate: Sample rate in Hz stored in the file header.

    The signal is peak-normalized before conversion to int16, so the absolute
    level of the input is not preserved.
    """
    audio_np = audio_tensor.detach().cpu().numpy()

    # Collapse to 1-D explicitly. A 2-D array passed to scipy's writer is
    # interpreted as (frames, channels), so a leftover (1, samples) array
    # would be stored as a single frame with `samples` channels.
    while audio_np.ndim > 1:
        audio_np = audio_np[0]
    audio_np = np.atleast_1d(audio_np)

    # Peak-normalize; the small constant avoids dividing by zero on silence,
    # and the size check avoids calling max() on an empty array.
    peak = np.max(np.abs(audio_np)) if audio_np.size else 0.0
    audio_np = audio_np / (peak + 1e-6)
    audio_np = np.clip(audio_np, -1, 1)
    audio_np = (audio_np * 32767).astype(np.int16)

    write(file_path, sample_rate, audio_np)

# Training Loop for Transformer-based AAE
def train_AAE(dataloader_A, num_epochs=5):
    """Train the adversarial autoencoder and save reconstructions every epoch.

    Uses the module-level ``encoder``, ``decoder``, ``discriminator``, their
    optimizers, the two loss criteria and ``device``.

    Args:
        dataloader_A: Iterable yielding ``(waveform_batch, file_names)`` pairs.
        num_epochs: Number of passes over ``dataloader_A``.

    Side effects:
        Writes every reconstructed waveform of every batch to
        ``.../aae_converted_epoch_{epoch}/<file name>``, prints the losses per
        batch, and calls ``evaluate_aae`` at the end of each epoch.
    """
    filename_mapping_A = {}  # Mapping for evaluation

    for epoch in range(num_epochs):
        # The output directory only depends on the epoch, so create it once
        # per epoch instead of once per batch.
        converted_dir = f"/content/drive/MyDrive/data/extracted_files-3/en/aae_converted_epoch_{epoch}"
        os.makedirs(converted_dir, exist_ok=True)

        for batch_count, (real_A, file_A) in enumerate(dataloader_A, start=1):
            real_A = real_A.to(device)

            # Forward pass through Encoder and Decoder
            latent = encoder(real_A)
            reconstructed_A = decoder(latent)

            # Compute Reconstruction Loss
            loss_reconstruction = criterion_reconstruction(reconstructed_A, real_A)

            true_labels = torch.ones(latent.size(0), 1, device=device)
            fake_labels = torch.zeros(latent.size(0), 1, device=device)

            # Train Discriminator.
            # Samples from the Gaussian prior are the "real" class and encoder
            # outputs are the "fake" class. The encoder update below then asks
            # the discriminator to call its latents "real", which makes the
            # two objectives oppose each other. If encoder latents were
            # labelled "real" here, both updates would push in the same
            # direction and nothing would pull the latents toward the prior.
            optimizer_disc.zero_grad()
            prior_latent = torch.randn_like(latent)
            loss_disc_real = criterion_adversarial(discriminator(prior_latent), true_labels)
            loss_disc_fake = criterion_adversarial(discriminator(latent.detach()), fake_labels)
            loss_disc = (loss_disc_real + loss_disc_fake) / 2
            loss_disc.backward()
            optimizer_disc.step()

            # Train Encoder and Decoder with Adversarial Loss
            optimizer_enc_dec.zero_grad()
            loss_adv = criterion_adversarial(discriminator(latent), true_labels)
            loss_enc_dec = loss_reconstruction + loss_adv
            loss_enc_dec.backward()
            optimizer_enc_dec.step()

            # Save Converted Audio for Evaluation. Every item of the batch is
            # written, so batch sizes larger than one are handled as well.
            for sample_idx, file_name in enumerate(file_A):
                save_audio(
                    os.path.join(converted_dir, file_name),
                    reconstructed_A[sample_idx],
                    sample_rate=16000,
                )
                # Update Mappings for Evaluation
                filename_mapping_A[file_name] = file_name  # Map original to new

            print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{batch_count}], Loss D: {loss_disc.item():.4f}, Loss Enc-Dec: {loss_enc_dec.item():.4f}")

        print(f"Epoch [{epoch+1}/{num_epochs}] completed.")
        evaluate_aae(epoch, filename_mapping_A)

    print("Training completed successfully!")

# Evaluation function for AAE
def evaluate_aae(epoch, mapping_A):
    """Announce and run the metric evaluation for one epoch's saved audio.

    Args:
        epoch: Zero-based epoch index; it selects the converted-audio
            directory and is printed one-based.
        mapping_A: Dict from original file name to converted file name,
            forwarded to ``evaluate_metrics``.
    """
    print(f"Evaluating AAE performance after Epoch {epoch+1}")
    evaluate_metrics(
        original_dir="/content/drive/MyDrive/data/extracted_files-3/en/North_American_English_W/",
        converted_dir=f"/content/drive/MyDrive/data/extracted_files-3/en/aae_converted_epoch_{epoch}",
        mapping=mapping_A,
    )

# Start Training for AAE
# num_epochs is left at train_AAE's default; per-batch losses are printed below.
train_AAE(dataloader_A)


Epoch [1/5], Batch [1], Loss D: 0.6609, Loss Enc-Dec: 1.6031
Epoch [1/5], Batch [2], Loss D: 0.6608, Loss Enc-Dec: 1.6353
Epoch [1/5], Batch [3], Loss D: 0.6218, Loss Enc-Dec: 1.3325
Epoch [1/5], Batch [4], Loss D: 0.4881, Loss Enc-Dec: 1.2348
Epoch [1/5], Batch [5], Loss D: 0.5682, Loss Enc-Dec: 1.4199
Epoch [1/5], Batch [6], Loss D: 0.3281, Loss Enc-Dec: 1.2011
Epoch [1/5], Batch [7], Loss D: 0.4399, Loss Enc-Dec: 1.1734
Epoch [1/5], Batch [8], Loss D: 0.4733, Loss Enc-Dec: 1.1534
Epoch [1/5], Batch [9], Loss D: 0.4886, Loss Enc-Dec: 1.2047
Epoch [1/5], Batch [10], Loss D: 0.2951, Loss Enc-Dec: 1.1354
Epoch [1/5], Batch [11], Loss D: 0.4920, Loss Enc-Dec: 1.1293
Epoch [1/5], Batch [12], Loss D: 0.3840, Loss Enc-Dec: 1.1543
Epoch [1/5], Batch [13], Loss D: 0.3874, Loss Enc-Dec: 1.1026
Epoch [1/5], Batch [14], Loss D: 0.4539, Loss Enc-Dec: 1.1080
Epoch [1/5], Batch [15], Loss D: 0.3736, Loss Enc-Dec: 1.0999
Epoch [1/5], Batch [16], Loss D: 0.3378, Loss Enc-Dec: 1.0933
Epoch [1/5], Batc