# YZV302E - DEEP LEARNING TERM PROJECT
## Many-to-one Voice Conversion
## Fall, 23-24, Istanbul Technical University


### Authors: Muhammet Serdar NAZLI, Ömer Faruk AYDIN

# THIS IS OUR FAILED APPROACH. YOU WILL NOT BE ABLE TO GET GOOD RESULTS, THE MODEL WILL NOT CONVERGE.

## Importing Necessary Libraries and device settings

In [1]:
import os
import sys
print(sys.version)
import glob

import numpy as np 

import torch
import torch.nn as nn
import torchaudio
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split
from torch.functional import F
import librosa
import librosa.display
import soundfile as sf

from IPython.display import Audio
import matplotlib.pyplot as plt

from pretrained.rmvpe.RMVPE.rmvpe import RMVPE



# Run on the GPU whenever PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

print("Using CPU/GPU: ", device)

3.8.18 (default, Sep 11 2023, 13:39:12) [MSC v.1916 64 bit (AMD64)]
Using CPU/GPU:  cuda


## HuBERT Base model from torchaudio pipelines.

For more details, see the HuBERT paper: https://arxiv.org/abs/2106.07447

For details on the torchaudio pipeline used to load the pretrained HuBERT, see: 'https://pytorch.org/audio/0.10.0/pipelines.html'

In [2]:
# HUBERT_BASE is one of the pretrained bundles exposed by torchaudio.pipelines
# (its definition lives in torchaudio's wav2vec2 pipeline implementation).
hubert_bundle = torchaudio.pipelines.HUBERT_BASE

# Instantiate the network with its pretrained weights, then move it to the active device.
# The checkpoint is around 300MB, so the first call may take a while on a slow connection;
# it is cached afterwards and will not be downloaded again.
hubert_model = hubert_bundle.get_model()
hubert_model = hubert_model.to(device)

## RMVPE (Robust Model for Vocal Pitch Estimation), loaded from local files.

File is around 180MB. 

For more details you can check the paper:
'https://arxiv.org/abs/2306.15412' <br>
implementation: 'https://github.com/Dream-High/RMVPE?tab=readme-ov-file' <br>
huggingface .pt file link: 'https://huggingface.co/lj1995/VoiceConversionWebUI/blob/main/rmvpe.pt'

In [None]:
# The RMVPE class is defined in pretrained/rmvpe/RMVPE/rmvpe.py and is loaded from local
# files: the pretrained weights sit next to it in rmvpe.pt, which is an OrderedDict.
rmpve_model = RMVPE(
    model_path='pretrained/rmvpe/RMVPE/rmvpe.pt',
    is_half=False,
    device=device,
)



In [None]:
# Echo the loaded RMVPE object so the notebook displays its repr.
rmpve_model

## HiFi-GAN through NVIDIA's Torch Hub

For more details you can check the paper: 'https://arxiv.org/abs/2010.05646' 

pretrained model details: 'https://huggingface.co/nvidia/tts_hifigan'

In [None]:
# Fetch the pretrained HiFi-GAN entry point from NVIDIA's Torch Hub repository. The call
# yields three objects: the vocoder, the settings mapping it was trained with, and a denoiser.
hifigan, vocoder_train_setup, denoiser = torch.hub.load(
    'NVIDIA/DeepLearningExamples:torchhub',
    'nvidia_hifigan',
)

# Keep both modules on the same device as the rest of the pipeline.
hifigan_model, denoiser_model = hifigan.to(device), denoiser.to(device)

print("\nHifiGAN Model:\n", hifigan_model)
print("\nDenoiser:\n", denoiser_model)
print(vocoder_train_setup)

## Dataset Class / Loaders

In [None]:
import soundfile as sf
import librosa
import torch
from torch.utils.data import Dataset

class VoiceConversionDataset(Dataset):
    """Dataset that turns each audio file into (HuBERT features, RMVPE pitch, target mel).

    All features are computed on the fly inside ``__getitem__``; nothing is cached.
    """

    # HuBERT and RMVPE are both fed audio resampled to this rate.
    FEATURE_SAMPLE_RATE = 16000

    def __init__(self, file_paths, hubert_model, rmvpe_model, device, n_mels=80,
                 mel_sample_rate=22050):
        """
        file_paths: List of paths to audio files.
        hubert_model: Pre-trained HuBERT model.
        rmvpe_model: Pre-trained RMVPE model.
        device: Torch device to run the models on.
        n_mels: Number of mel frequency bands (default 80).
        mel_sample_rate: Sample rate in Hz at which the target mel spectrogram is
            computed (default 22050, the rate the notebook plays vocoder output at).
        """
        self.file_paths = file_paths
        self.hubert_model = hubert_model
        self.rmvpe_model = rmvpe_model
        self.device = device
        self.n_mels = n_mels
        self.mel_sample_rate = mel_sample_rate

    def __len__(self):
        """Return the number of audio files in the dataset."""
        return len(self.file_paths)

    @staticmethod
    def _resample(audio, orig_sr, target_sr):
        """Return ``audio`` at ``target_sr``, skipping the resampler when the rate already matches."""
        if orig_sr == target_sr:
            return audio
        return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)

    def __getitem__(self, idx):
        """Load one file and compute its model inputs and training target.

        Returns a tuple ``(hubert_features, rmvpe_features, mel_spec)`` of CPU float
        tensors:
          * hubert_features: HuBERT output with the batch dimension removed
            (time is dim 0, as the collate function pads it).
          * rmvpe_features: output of ``rmvpe_model.infer_from_audio`` as a tensor
            (padded as a 1D sequence by the collate function).
          * mel_spec: mel spectrogram of shape (n_mels, frames).
        """
        # Load the file and fold multi-channel audio down to mono.
        audio, sampling_rate = sf.read(self.file_paths[idx])
        if audio.ndim > 1:
            # The loaded array is transposed so channels come first before mixing down.
            audio = librosa.to_mono(audio.transpose(1, 0))

        # Feature extractors operate on 16 kHz audio.
        audio_16k = self._resample(audio, sampling_rate, self.FEATURE_SAMPLE_RATE)

        # Convert to tensor and add batch dimension
        audio_tensor = torch.from_numpy(audio_16k).float().unsqueeze(0).to(self.device)

        # Extract features
        with torch.no_grad():
            self.hubert_model.eval()
            hubert_features, _ = self.hubert_model(audio_tensor)
            # Drop the batch dimension and hand the result back on the CPU so that all
            # three returned tensors live on the same device when they are collated.
            hubert_features = hubert_features.squeeze(0).cpu()
            rmvpe_features = self.rmvpe_model.infer_from_audio(audio_16k, thred=0.03)
            rmvpe_features = torch.tensor(rmvpe_features).float()  # Convert to tensor if not already

        # Target mel spectrogram. The previous code referenced the undefined names `sr`
        # and `mel_spectrogram` here and therefore raised NameError on every item. The
        # audio is now explicitly resampled to `mel_sample_rate` and that same rate is
        # passed to librosa, so the signal and the declared rate always agree.
        mel_audio = self._resample(audio, sampling_rate, self.mel_sample_rate)
        mel_spec = librosa.feature.melspectrogram(
            y=mel_audio,
            sr=self.mel_sample_rate,
            n_fft=1024,            # FFT window size
            hop_length=256,        # Window stride
            n_mels=self.n_mels,    # Number of Mel bands
            fmin=0,                # Minimum frequency
            fmax=8000,             # Maximum frequency
            window='hann'          # Window type
        )
        mel_spec = torch.from_numpy(mel_spec).float()

        return hubert_features, rmvpe_features, mel_spec

    
def pad_sequences_1d(sequences):
    """Stack 1D tensors into one zero-padded (batch, max_len) tensor.

    Each sequence is left-aligned in its row; positions past its end stay zero.
    """
    lengths = [seq.size(0) for seq in sequences]
    batch = torch.zeros((len(sequences), max(lengths)))
    for row, (seq, length) in enumerate(zip(sequences, lengths)):
        batch[row, :length] = seq
    return batch


def pad_sequences_2d(sequences, pad_dim=1):
    """Stack 2D tensors into one zero-padded 3D batch.

    ``pad_dim`` names the axis whose length varies between sequences. The other axis
    is taken from the first sequence. With ``pad_dim == 0`` the result has shape
    (batch, max_len, other); otherwise it has shape (batch, other, max_len).
    """
    longest = max(seq.size(pad_dim) for seq in sequences)
    fixed = sequences[0].size(1 - pad_dim)
    pad_rows = pad_dim == 0

    if pad_rows:
        shape = (len(sequences), longest, fixed)
    else:
        shape = (len(sequences), fixed, longest)
    batch = torch.zeros(shape)

    for index, seq in enumerate(sequences):
        n = seq.size(pad_dim)
        if pad_rows:
            batch[index, :n, :] = seq[:n, :]
        else:
            batch[index, :, :n] = seq[:, :n]
    return batch




def collate_fn(batch):
    """Collate (hubert, rmvpe, mel) samples into three zero-padded batch tensors.

    HuBERT features are padded along dim 0 (time), RMVPE tracks as 1D sequences and
    mel spectrograms along dim 1 (time).
    """
    hubert_seqs, pitch_seqs, mel_seqs = zip(*batch)
    return (
        pad_sequences_2d(hubert_seqs, pad_dim=0),
        pad_sequences_1d(pitch_seqs),
        pad_sequences_2d(mel_seqs, pad_dim=1),
    )




In [None]:
# Folder that holds the training recordings.
directory_path = "data-/"

# Collect every .wav file in that folder. The pattern is built with os.path.join so a
# trailing slash on the folder does not produce a doubled separator, and the result is
# sorted because glob returns files in arbitrary order.
file_paths = sorted(glob.glob(os.path.join(directory_path, "*.wav")))

# Fail here with a clear message rather than later inside the DataLoader.
if not file_paths:
    raise FileNotFoundError(f"No .wav files found in {directory_path!r}")

voice_conversion_dataset = VoiceConversionDataset(file_paths, hubert_model, rmpve_model, device)

voice_loader = DataLoader(voice_conversion_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)

# Sanity check: print the padded shapes of the first three batches.
for i, batch in enumerate(voice_loader):
    padded_hubert, padded_rmvpe, padded_target_mels = batch
    print(f"(batch{i}) Padded HuBERT Features:", padded_hubert.shape)
    print(f"(batch{i}) Padded RMVPE Features:", padded_rmvpe.shape)
    print(f"(batch{i}) Padded Target Mel Spectrograms:", padded_target_mels.shape)
    if i == 2:
        break


## Custom Network

### Architecture

In [None]:
class AutoEncoder(nn.Module):
    """Recurrent encoder/decoder mapping HuBERT + RMVPE features to a mel spectrogram.

    Two bidirectional LSTMs encode the concatenated inputs, a linear layer halves the
    width, and two unidirectional LSTMs plus a linear projection decode to ``num_mels``
    channels. Every LSTM is followed by batch normalisation over the feature axis.
    """

    def __init__(self, hubert_feature_size=1024, rmvpe_feature_size=1, num_mels=80, hidden_size=256):
        super().__init__()
        bidirectional_width = hidden_size * 2  # forward and backward states concatenated

        # Encoder: two bidirectional LSTMs, each followed by batch norm.
        self.encoder_lstm1 = nn.LSTM(
            input_size=hubert_feature_size + rmvpe_feature_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
            bidirectional=True,
        )
        self.encoder_batchnorm1 = nn.BatchNorm1d(bidirectional_width)
        self.encoder_lstm2 = nn.LSTM(
            input_size=bidirectional_width,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
            bidirectional=True,
        )
        self.encoder_batchnorm2 = nn.BatchNorm1d(bidirectional_width)

        # Projection applied to the encoded features before decoding.
        self.encoded_linear = nn.Linear(bidirectional_width, hidden_size)

        # Decoder: two unidirectional LSTMs, each followed by batch norm, then a mel projection.
        self.decoder_lstm1 = nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.decoder_batchnorm1 = nn.BatchNorm1d(hidden_size)
        self.decoder_lstm2 = nn.LSTM(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.decoder_batchnorm2 = nn.BatchNorm1d(hidden_size)
        self.decoder_linear = nn.Linear(hidden_size, num_mels)

    @staticmethod
    def _batchnorm_over_time(norm, sequence):
        """Apply a BatchNorm1d to a (batch, time, features) tensor and return the same layout."""
        # BatchNorm1d normalises dim 1, so features are swapped into that position and back.
        channels_first = sequence.contiguous().transpose(1, 2)
        return norm(channels_first).transpose(1, 2)

    def forward(self, hubert_features, rmvpe_features, target_time_steps):
        """Predict a mel spectrogram of shape (batch, num_mels, target_time_steps).

        hubert_features: (batch, time, hubert_feature_size) tensor.
        rmvpe_features: (batch, pitch_time) tensor; resampled to the HuBERT time axis.
        target_time_steps: length of the output time axis.
        """
        _, time_steps, _ = hubert_features.size()

        # Bring the pitch track onto the HuBERT frame grid, then make it a trailing feature.
        pitch = F.interpolate(rmvpe_features.unsqueeze(1), size=time_steps, mode='linear', align_corners=False)
        pitch = pitch.squeeze(1).unsqueeze(-1)
        hidden = torch.cat((hubert_features, pitch), dim=-1)

        # Encoder
        hidden, _ = self.encoder_lstm1(hidden)
        hidden = self._batchnorm_over_time(self.encoder_batchnorm1, hidden)
        hidden, _ = self.encoder_lstm2(hidden)
        hidden = self._batchnorm_over_time(self.encoder_batchnorm2, hidden)

        hidden = self.encoded_linear(hidden)

        # Decoder
        hidden, _ = self.decoder_lstm1(hidden)
        hidden = self._batchnorm_over_time(self.decoder_batchnorm1, hidden)
        hidden, _ = self.decoder_lstm2(hidden)
        hidden = self._batchnorm_over_time(self.decoder_batchnorm2, hidden)
        mel_frames = self.decoder_linear(hidden)

        # Put mel bands ahead of time, then stretch the time axis to the target length.
        mel_frames = mel_frames.transpose(1, 2)
        return F.interpolate(mel_frames, size=target_time_steps, mode='linear', align_corners=False)


### Train/Eval Loops/Functions

In [None]:
def train(model, data_loader, optimizer, criterion, device, num_epochs=10):
    """Train ``model`` on ``data_loader`` and return the mean loss of every epoch.

    model: Network called as ``model(hubert_features, rmvpe_features, target_time_steps)``.
    data_loader: Iterable of ``(hubert_features, rmvpe_features, target_mels)`` batches.
    optimizer: Optimizer already bound to the model's parameters.
    criterion: Loss called as ``criterion(outputs, target_mels)``.
    device: Device the model and every batch are moved to.
    num_epochs: Number of passes over ``data_loader``.

    Returns a list with one float per epoch: the average batch loss of that epoch
    (NaN for an epoch in which the loader produced no batches).
    """
    model.to(device)
    # Put the model into training mode explicitly. Without this, calling train() after
    # the model was switched to eval mode would leave it in eval mode during training.
    model.train()

    epoch_losses = []
    for epoch in range(num_epochs):
        running_loss = 0.0   # accumulated over the current window of 10 batches
        epoch_loss = 0.0     # accumulated over the whole epoch
        num_batches = 0

        for i, (hubert_features, rmvpe_features, target_mels) in enumerate(data_loader):
            hubert_features = hubert_features.to(device)
            rmvpe_features = rmvpe_features.to(device)
            target_mels = target_mels.to(device)

            # The target's time axis (dim 2) fixes the length of the predicted spectrogram.
            target_time_steps = target_mels.size(2)

            optimizer.zero_grad()
            outputs = model(hubert_features, rmvpe_features, target_time_steps)
            loss = criterion(outputs, target_mels)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            running_loss += batch_loss
            epoch_loss += batch_loss
            num_batches += 1

            # Progress report every 10 batches.
            if i % 10 == 9:
                print(f"[{epoch + 1}, {i + 1}] loss: {running_loss / 10:.3f}")
                running_loss = 0.0

        # Per-epoch summary, so short epochs (fewer than 10 batches) still report a loss.
        epoch_mean = epoch_loss / num_batches if num_batches else float('nan')
        epoch_losses.append(epoch_mean)
        print(f"[{epoch + 1}] mean loss: {epoch_mean:.3f}")

    print('Finished Training')
    return epoch_losses

In [None]:
def evaluate(model, dataloader, criterion, device):
    """Return the average batch loss of ``model`` over ``dataloader`` without updating it.

    model: Network called as ``model(hubert_features, rmvpe_features, target_time_steps)``.
    dataloader: Iterable of ``(hubert_features, rmvpe_features, target_mels)`` batches,
        the same layout the training loop consumes.
    criterion: Loss called as ``criterion(outputs, target_mels)``.
    device: Device every batch is moved to.

    Raises ValueError if ``dataloader`` yields no batches.
    """
    model.eval()  # Set the model to evaluation mode
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():  # No need to track gradients
        # Batches are 3-tuples and the model takes three arguments; the previous
        # two-value unpacking and one-argument call could not run on this data.
        for hubert_features, rmvpe_features, target_mels in dataloader:
            hubert_features = hubert_features.to(device)
            rmvpe_features = rmvpe_features.to(device)
            target_mels = target_mels.to(device)

            # Predict a spectrogram as long as the target (time is dim 2).
            outputs = model(hubert_features, rmvpe_features, target_mels.size(2))
            total_loss += criterion(outputs, target_mels).item()
            num_batches += 1

    if num_batches == 0:
        raise ValueError("evaluate() received a dataloader that produced no batches")
    return total_loss / num_batches


### Training

In [None]:
# Initialize and train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Size the encoder input from what the loaded HuBERT model actually emits instead of
# relying on AutoEncoder's default of 1024: the notebook loads HUBERT_BASE, whose feature
# width differs from that default, and a mismatch makes the first LSTM reject its input.
# One second of silence at 16 kHz is enough to read the feature width.
with torch.no_grad():
    hubert_model.eval()
    probe_features, _ = hubert_model(torch.zeros(1, 16000, device=device))
hubert_feature_size = probe_features.size(-1)

autoencoder = AutoEncoder(hubert_feature_size=hubert_feature_size)
optimizer = torch.optim.Adam(autoencoder.parameters(), lr=0.001)
criterion = nn.MSELoss()
loss = train(autoencoder, voice_loader, optimizer, criterion, device, num_epochs=5)

In [None]:
# Push a single batch through the trained network; the loop exits after its first pass.
for item in voice_loader:
    autoencoder.eval()
    with torch.no_grad():
        # Move the three batch tensors onto the compute device.
        hubert_features, rmvpe_features, target_mels = (tensor.to(device) for tensor in item)

        # The prediction is generated with the same number of frames as the target (dim 2).
        target_time_steps = target_mels.size(2)
        outputs = autoencoder(hubert_features, rmvpe_features, target_time_steps)
        break

In [None]:
# Keep the prediction for the first item of the batch as a NumPy array for the vocoder step.
first_prediction = outputs[0].cpu()
mel_spectrogram_db = first_prediction.numpy()

# Cell output: shape of the padded target batch, for comparison.
target_mels.shape

In [None]:
# Rescale the predicted mel-spectrogram linearly so its minimum maps to -10 and its
# maximum to 0 (the formula below yields the range [-10, 0]).
# To vocode the ground-truth target instead, start from target_mels[0].cpu().numpy().
min_level = np.min(mel_spectrogram_db)
max_level = np.max(mel_spectrogram_db)
# A constant spectrogram has zero range; divide by 1 instead so the result is a flat -10
# rather than NaN from 0/0.
level_range = (max_level - min_level) or 1.0
mel_spectrogram_normalized = (mel_spectrogram_db - min_level) / level_range * 10 - 10


# Ensure the mel-spectrogram is in the correct format for HiFi-GAN
mel_spectrogram_torch = torch.tensor(mel_spectrogram_normalized).unsqueeze(0).to(torch.float32).to(device)  # Add batch dimension and convert to float32


# Generate audio from mel-spectrogram
with torch.no_grad():
    audio_output = hifigan_model(mel_spectrogram_torch).float()
    audio_output = denoiser_model(audio_output.squeeze(1), 0.01)
    audio_output = audio_output.squeeze(1) * vocoder_train_setup['max_wav_value']

print(mel_spectrogram_torch.shape)

# Convert to numpy and peak-normalize to the int16 range. The scale factor is the int16
# full-scale value (32767); the previous factor of 22050 was the sample rate, not an
# amplitude. Silent output is left at zero instead of being divided by a zero peak.
audio_numpy = audio_output.cpu().numpy()
peak = np.max(np.abs(audio_numpy))
if peak > 0:
    audio_numpy = audio_numpy / peak
audio_numpy = np.int16(audio_numpy * 32767)
Audio(audio_numpy, rate=22050)