# Audio Playground

## Setup

In [1]:
# Ask the NVIDIA driver which GPUs are installed and how much memory/compute each one is using.
!nvidia-smi

Wed Jan 18 09:36:21 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.60.13    Driver Version: 525.60.13    CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla V100-SXM2...  Off  | 00000000:61:00.0 Off |                    0 |
| N/A   33C    P0    40W / 300W |      0MiB / 32768MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla V100-SXM2...  Off  | 00000000:62:00.0 Off |                    0 |
| N/A   33C    P0    42W / 300W |      0MiB / 32768MiB |      0%      Defaul

In [9]:
import IPython.display as ipd
from pydub import AudioSegment
from pydub.silence import split_on_silence
import numpy as np
import os, sys
import torch
import torchaudio
from torch.utils.data import DataLoader
import librosa
# --- Filesystem layout --------------------------------------------------------
PROJECT_ROOT = "/project/fdreyer/projects/vqvae-vc"
EMONYMOUS_DATA_DIR = "/data/share/emonymous"
VCTK_DATA_DIR = os.path.join(EMONYMOUS_DATA_DIR, "vctk")

# Put the project checkout on the import path so its packages can be imported below.
sys.path.append(PROJECT_ROOT)

# --- Device selection ---------------------------------------------------------
# Expose only GPU index 1 to CUDA; this has to happen before the first CUDA query.
os.environ.update({"CUDA_VISIBLE_DEVICES": "1"})
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Use {device} device")

Use cuda device


## Load Audio File with Librosa

In [7]:
# Candidate clip inside the VCTK data directory.
# NOTE(review): this assignment is overwritten by the next line, so it is not the file loaded here.
path_to_audio = os.path.join(VCTK_DATA_DIR, "wav48_silence_trimmed", "s5", "s5_301_mic2.flac")
# Clip that is actually loaded below.
path_to_audio = "/scratch/sghosh/datasets/vctkall-voxceleb-24k/p307/19.wav"
# Load the clip with librosa, requesting a sample rate of 16384 Hz; `audio` holds the samples, `sr` the rate.
audio, sr = librosa.load(path_to_audio, sr=16384)

In [12]:
# Encode the waveform with torchaudio's mu-law transform (constructed with its default settings).
mu_encoder = torchaudio.transforms.MuLawEncoding()
# librosa returns a NumPy array, so wrap it in a tensor before applying the transform.
encoded_audio = mu_encoder(torch.tensor(audio))
encoded_audio

tensor([122, 122, 124,  ..., 137, 136, 128])

In [13]:
# Invert the mu-law encoding (default settings again) to get back a floating-point waveform.
mu_decoder = torchaudio.transforms.MuLawDecoding()
decoded_audio = mu_decoder(encoded_audio)
decoded_audio

tensor([-1.0598e-03, -1.0598e-03, -6.4477e-04,  ...,  2.0063e-03,
         1.7540e-03,  8.6213e-05])

In [14]:
# Listen to the signal after the mu-law encode/decode round trip, played back at rate `sr`.
ipd.Audio(decoded_audio, rate=sr)

In [14]:
# 37813
# NOTE(review): the number above is presumably the sample count seen for a different clip or
# sample rate; confirm or remove it.
# Number of samples in the loaded clip.
audio.shape

(65185,)

## Correct Train/Test Split

In [17]:
# Directory inside the project checkout that holds the VCTK-20 split lists.
vctk20_split_dir = os.path.join(PROJECT_ROOT, "config", "data", "vctk20", "split")

# Read both lists inside `with` blocks so each file handle is closed as soon as it has been
# read; a bare `open(...).readlines()` leaves closing to the garbage collector.
# Entries keep their trailing newline, exactly as `readlines()` returns them.
with open(os.path.join(vctk20_split_dir, "train_20_list.txt")) as train_list_file:
    train_20_list = train_list_file.readlines()
with open(os.path.join(vctk20_split_dir, "val_20_list.txt")) as val_list_file:
    val_20_list = val_list_file.readlines()

In [25]:
# Inspect the raw entries; each one has the form "<absolute wav path>|<integer>\n"
# (the integer is presumably a speaker index; confirm against the split generator).
train_20_list

['/project/sughosh/dataset/VCTK-24k/p225/22.wav|0\n',
 '/project/sughosh/dataset/VCTK-24k/p239/163.wav|7\n',
 '/project/sughosh/dataset/VCTK-24k/p227/144.wav|11\n',
 '/project/sughosh/dataset/VCTK-24k/p258/67.wav|16\n',
 '/project/sughosh/dataset/VCTK-24k/p259/74.wav|17\n',
 '/project/sughosh/dataset/VCTK-24k/p230/103.wav|3\n',
 '/project/sughosh/dataset/VCTK-24k/p225/7.wav|0\n',
 '/project/sughosh/dataset/VCTK-24k/p226/67.wav|10\n',
 '/project/sughosh/dataset/VCTK-24k/p228/27.wav|1\n',
 '/project/sughosh/dataset/VCTK-24k/p243/23.wav|13\n',
 '/project/sughosh/dataset/VCTK-24k/p228/3.wav|1\n',
 '/project/sughosh/dataset/VCTK-24k/p244/61.wav|9\n',
 '/project/sughosh/dataset/VCTK-24k/p230/178.wav|3\n',
 '/project/sughosh/dataset/VCTK-24k/p240/120.wav|8\n',
 '/project/sughosh/dataset/VCTK-24k/p228/154.wav|1\n',
 '/project/sughosh/dataset/VCTK-24k/p230/146.wav|3\n',
 '/project/sughosh/dataset/VCTK-24k/p240/149.wav|8\n',
 '/project/sughosh/dataset/VCTK-24k/p254/56.wav|14\n',
 '/project/sugho

In [27]:
# Reduce every entry "<...>/<parent dir>/<file>|<label>" to "<parent dir>/<file>":
# drop everything from the "|" onwards and keep only the last two path components.
relative_paths = [
    "/".join(entry.split("|")[0].split("/")[-2:])
    for entry in train_20_list
]

# NOTE(review): the variable says "val" but the file written is the *train* list; the name is
# kept unchanged in case other cells refer to it.
new_val_20_path = os.path.join(vctk20_split_dir, "train_20_list_n.txt")

# Write through a context manager so the content is flushed and the handle closed when the
# cell finishes; previously the file was never closed, so buffered data could be missing on
# disk. `write` is used because the payload is one already-joined string.
with open(new_val_20_path, "w") as file:
    file.write("\n".join(relative_paths))

## Chop audio by silence

In [5]:
def pydub_to_np(audio: "AudioSegment") -> "tuple[np.ndarray, int]":
    """
    Convert a pydub audio segment into a float32 NumPy array plus its sample rate.

    The flat sample sequence is reshaped to [n_frames, channels] and divided by
    2 ** (8 * sample_width - 1), the full-scale magnitude of a signed integer that is
    `sample_width` bytes wide, so signed integer samples end up in the range [-1.0, 1.0].

    Args:
        audio: segment providing `get_array_of_samples()`, `channels`, `sample_width`
            (bytes per sample) and `frame_rate`.

    Returns:
        Tuple (audio_np_array, sample_rate).
    """
    # Annotations are quoted so they are real type expressions (the former
    # `(np.ndarray, int)` was a tuple object, not a type) without being evaluated at
    # definition time.
    full_scale = 1 << (8 * audio.sample_width - 1)
    samples = np.array(audio.get_array_of_samples(), dtype=np.float32)
    return samples.reshape((-1, audio.channels)) / full_scale, audio.frame_rate

In [6]:
# Decode the clip referenced by `path_to_audio` with pydub.
# The container format is taken from the file extension instead of being hard-coded to
# "flac": `path_to_audio` is last assigned a ".wav" file in this notebook, so a fixed "flac"
# hint disagrees with the file being opened. A path without an extension yields None, which
# leaves format detection to pydub/ffmpeg.
sound = AudioSegment.from_file(
    path_to_audio,
    format=os.path.splitext(path_to_audio)[1].lstrip(".").lower() or None,
)

In [7]:
# Cut the clip into chunks at silent stretches. The silence threshold is set 16 dB below the
# clip's own loudness (`sound.dBFS`); per the pydub API, `min_silence_len` and `keep_silence`
# are given in milliseconds.
chunks = split_on_silence(sound, min_silence_len=100, silence_thresh=sound.dBFS-16, keep_silence=100)

In [8]:
# Convert every pydub chunk into an (array, sample_rate) tuple via pydub_to_np.
# NOTE(review): this rebinds `chunks` to a list of tuples, so the cell cannot be re-run
# without first re-running the split that produces the pydub chunks.
chunks = [pydub_to_np(chunk) for chunk in chunks]

In [9]:
# Render one audio player per chunk.
# The loop variables are deliberately not called `sr`: a top-level `for` loop leaks its
# variables into the notebook namespace, and reusing `sr` silently replaced the sample rate
# returned by `librosa.load`, which other cells use for playback.
for chunk_samples, chunk_rate in chunks:
    ipd.display(ipd.Audio(np.squeeze(chunk_samples), rate=chunk_rate))

## Check VCTKDataset and vc_pad_collate Functionality

In [29]:
# Locations of the audio files and the speaker metadata inside the VCTK data directory.
audio_directory = os.path.join(VCTK_DATA_DIR, "wav48_silence_trimmed")
speaker_info_path = os.path.join(VCTK_DATA_DIR, "speaker-info.txt")

# NOTE(review): VCTKDataset and vc_pad_collate are not imported in any cell visible here;
# presumably they come from the project package. Confirm and add the import.
vctk_dataset = VCTKDataset(speaker_info_path, audio_directory)

# Shuffled batches of 512 items, assembled by the project's collate function.
vctk_dataloader = DataLoader(
    dataset=vctk_dataset,
    batch_size=512,
    shuffle=True,
    collate_fn=vc_pad_collate,
)

In [23]:
# Pull a single batch from the loader; the collate function yields three items per batch.
padded_waveforms, speaker, len_waveforms = next(iter(vctk_dataloader))

In [24]:
# Shapes of the three batch components, shown side by side.
padded_waveforms.shape, speaker.shape, len_waveforms.shape

(torch.Size([64, 1, 419561]), torch.Size([64]), torch.Size([64]))

In [25]:
# Largest entry of `len_waveforms`, for comparison with the last dimension of the padded batch.
len_waveforms.max()

tensor(419561)

## Learned Upsampling and Downsampling Functionality

In [14]:
# Six-stage encoder and decoder. Each list argument has one entry per stage: 32, 4 and 2
# (presumably channels, kernel size and stride; confirm against the class definitions).
# NOTE(review): neither class is imported in any cell visible here.
encoder = DownsamplingSequenceEncoder(1, [32] * 6, [4] * 6, [2] * 6).to(device)
decoder = UpsamplingSequenceDecoder(32, [32] * 6, [4] * 6, [2] * 6).to(device)

In [15]:
# Run the padded waveform batch through the encoder on the selected device and inspect the result's shape.
encoded_inputs = encoder(padded_waveforms.to(device))
encoded_inputs.shape

torch.Size([8, 32, 3204])

In [16]:
# Feed the encoder output to the decoder and inspect the shape of what comes back.
decoded_inputs = decoder(encoded_inputs)
decoded_inputs.shape

torch.Size([8, 32, 205056])

In [17]:
# Size of the last dimension of the input batch, for comparison with the decoder output's last dimension.
padded_waveforms.shape[-1]

205031

## Conditional WaveNet Functionality

In [18]:
from src.models.base import BaseModule
import torch
from torch import nn


class WaveNet(BaseModule):
    """
    Stack of dilated, gated residual blocks whose skip outputs are summed and projected.

    The input is first passed through a causal convolution, then through
    `n_stacks * dilation_steps` gated residual blocks. Inside each stack the dilation of
    step `i` is `dilation_growth_factor ** i`. The skip outputs of all blocks are summed and
    passed through ReLU -> 1x1 conv -> ReLU -> 1x1 conv to produce the output.

    Args:
        in_channels: channels of the input sequences.
        residual_channels: channels carried along the residual path between blocks.
        dilation_channels: channels produced by the gated activation inside each block.
        skip_channels: channels of each block's skip output.
        out_channels: channels of the returned tensor.
        kernel_size: kernel size of all causal convolutions.
        n_stacks: number of times the dilation schedule is repeated.
        dilation_steps: number of blocks per stack.
        dilation_growth_factor: base of the exponential dilation schedule.
        use_local_conditioning: whether blocks add a projected per-time-step condition.
        in_channels_local_condition: channels of the local condition (needed when enabled).
        use_global_conditioning: whether blocks add a projected per-sequence condition.
        in_features_global_condition: features of the global condition (needed when enabled).
    """

    def __init__(self, in_channels: int, residual_channels: int, dilation_channels: int, skip_channels: int,
                 out_channels: int, kernel_size: int = 2, n_stacks: int = 2, dilation_steps: int = 9,
                 dilation_growth_factor: int = 2, use_local_conditioning: bool = False,
                 in_channels_local_condition: int = None, use_global_conditioning: bool = False,
                 in_features_global_condition: int = None):
        super(WaveNet, self).__init__()
        self.in_transform = CausalConv1D(in_channels, residual_channels, kernel_size, dilation=1)
        self.gated_activation_stacks = nn.ModuleList()
        for stack in range(n_stacks):
            for dilation_step in range(dilation_steps):
                dilation = dilation_growth_factor ** dilation_step
                # The very last block has no successor, so its residual branch is never
                # consumed and is switched off.
                use_residual = not ((stack == n_stacks - 1) and (dilation_step == dilation_steps - 1))
                self.gated_activation_stacks.append(
                    GatedActivationResidualBlock(residual_channels, dilation_channels, skip_channels,
                                                 kernel_size, dilation, use_residual,
                                                 use_local_conditioning, in_channels_local_condition,
                                                 use_global_conditioning, in_features_global_condition)
                )
        self.out_transform = nn.Sequential(
            nn.ReLU(),
            nn.Conv1d(skip_channels, skip_channels, 1),
            nn.ReLU(),
            nn.Conv1d(skip_channels, out_channels, 1)
        )

    def forward(self, sequences: torch.Tensor, local_conditions: torch.Tensor = None,
                global_conditions: torch.Tensor = None):
        """
        Compute the network output for a batch of sequences.

        Args:
            sequences: input to the first causal convolution, laid out as
                (batch, in_channels, time) as required by the 1-D convolutions.
            local_conditions: optional condition forwarded unchanged to every block.
            global_conditions: optional condition forwarded unchanged to every block.

        Returns:
            Tensor with `out_channels` channels produced by the output transform.
        """
        summed_skip_outs = None
        residual_out = self.in_transform(sequences)
        for gated_activation_stack in self.gated_activation_stacks:
            residual_out, skip_out = gated_activation_stack(residual_out, local_conditions,
                                                            global_conditions)
            # Identity check: comparing a tensor to None with `==` goes through tensor
            # comparison semantics and only works by accident.
            if summed_skip_outs is None:
                summed_skip_outs = skip_out
            else:
                # Out-of-place add, so the first block's skip output is not mutated.
                summed_skip_outs = summed_skip_outs + skip_out
        logits = self.out_transform(summed_skip_outs)
        return logits


class GatedActivationResidualBlock(nn.Module):
    """
    Gated activation unit followed by 1x1 projections for the residual and skip paths.

    The skip projection is always present. The residual projection, and the addition of the
    block input to it, exists only when `use_residual` is true; otherwise the residual
    output is None.
    """

    def __init__(self, in_channels: int, dilation_channels: int, skip_channels: int, kernel_size: int,
                 dilation: int = 1, use_residual: bool = True, use_local_conditioning: bool = False,
                 in_channels_local_condition: int = None, use_global_conditioning: bool = False,
                 in_features_global_condition: int = None):
        super().__init__()
        # Sub-modules are created in a fixed order: gated unit, residual projection, skip projection.
        self.gated_activation_unit = GatedActivationUnit(
            in_channels, dilation_channels, kernel_size, dilation,
            use_local_conditioning, in_channels_local_condition,
            use_global_conditioning, in_features_global_condition,
        )
        self.use_residual = use_residual
        if self.use_residual:
            # Maps the gated output back to the channel count of the block input.
            self.residual_1x1conv = nn.Conv1d(dilation_channels, in_channels, 1)
        self.skip_1x1conv = nn.Conv1d(dilation_channels, skip_channels, 1)

    def forward(self, sequences: torch.Tensor, local_conditions: torch.Tensor = None,
                global_conditions: torch.Tensor = None):
        """Return `(residual_out, skip_out)`; `residual_out` is None when the residual path is disabled."""
        activated = self.gated_activation_unit(sequences, local_conditions, global_conditions)
        if self.use_residual:
            residual_out = self.residual_1x1conv(activated) + sequences
        else:
            residual_out = None
        return residual_out, self.skip_1x1conv(activated)


class GatedActivationUnit(nn.Module):
    """
    Gated activation: `tanh(filter) * sigmoid(gate)`.

    Both branches start from their own causal convolution of the input. When local
    conditioning is enabled, a 1x1 convolution of the local condition is added to each
    branch; when global conditioning is enabled, a linear projection of the global condition
    is added, broadcast along the last axis.
    """

    def __init__(self, in_channels: int, out_channels: int, kernel_size: int, dilation: int = 1,
                 use_local_conditioning: bool = False, in_channels_local_condition: int = None,
                 use_global_conditioning: bool = False, in_features_global_condition: int = None):
        super().__init__()
        self.use_local_conditioning = use_local_conditioning
        self.use_global_conditioning = use_global_conditioning
        # Filter branch first, gate branch second, in every pair below.
        self.conv_sequence = CausalConv1D(in_channels, out_channels, kernel_size, dilation=dilation)
        self.conv_sequence_gating = CausalConv1D(in_channels, out_channels, kernel_size, dilation=dilation)
        if use_local_conditioning:
            self.conv_local_condition = nn.Conv1d(in_channels_local_condition, out_channels, 1)
            self.conv_local_condition_gating = nn.Conv1d(in_channels_local_condition, out_channels, 1)
        if use_global_conditioning:
            self.linear_global_condition = nn.Linear(in_features_global_condition, out_channels)
            self.linear_global_condition_gating = nn.Linear(in_features_global_condition, out_channels)

    def forward(self, sequences: torch.Tensor, local_conditions: torch.Tensor = None,
                global_conditions: torch.Tensor = None):
        """Return the element-wise product of the tanh branch and the sigmoid branch."""
        filter_out = self._compute_ungated_output(sequences, local_conditions, global_conditions)
        gate_out = self._compute_gate(sequences, local_conditions, global_conditions)
        return filter_out * gate_out

    @staticmethod
    def _add_conditions(pre_activation, local_proj, local_conditions, global_proj, global_conditions):
        """Add the enabled condition projections to `pre_activation` in place and return it."""
        if local_proj is not None:
            pre_activation += local_proj(local_conditions)
        if global_proj is not None:
            # One vector per sequence: add a trailing axis so it broadcasts over the last dimension.
            pre_activation += global_proj(global_conditions).unsqueeze(-1)
        return pre_activation

    def _compute_ungated_output(self, sequences: torch.Tensor, local_conditions: torch.Tensor = None,
                global_conditions: torch.Tensor = None):
        """Tanh branch: causal conv of the input plus the enabled conditions."""
        local_proj = self.conv_local_condition if self.use_local_conditioning else None
        global_proj = self.linear_global_condition if self.use_global_conditioning else None
        pre_activation = self._add_conditions(self.conv_sequence(sequences), local_proj, local_conditions,
                                              global_proj, global_conditions)
        return torch.tanh(pre_activation)

    def _compute_gate(self, sequences: torch.Tensor, local_conditions: torch.Tensor = None,
                global_conditions: torch.Tensor = None):
        """Sigmoid branch: same structure as the tanh branch, with its own weights."""
        local_proj = self.conv_local_condition_gating if self.use_local_conditioning else None
        global_proj = self.linear_global_condition_gating if self.use_global_conditioning else None
        pre_activation = self._add_conditions(self.conv_sequence_gating(sequences), local_proj,
                                              local_conditions, global_proj, global_conditions)
        return torch.sigmoid(pre_activation)


class CausalConv1D(nn.Module):
    """
    1-D convolution that is zero-padded on the left only.

    The padding equals the span of the dilated kernel minus one, so the output has the same
    length as the input and position `t` depends only on input positions `<= t`.
    """

    def __init__(self, in_channels: int, out_channels: int, kernel_size: int, dilation: int=1):
        super().__init__()
        # Number of input positions spanned by one application of the dilated kernel.
        kernel_span = dilation * (kernel_size - 1) + 1
        self.pad = nn.ConstantPad1d((kernel_span - 1, 0), 0.0)
        self.conv1d = nn.Conv1d(in_channels, out_channels, kernel_size, dilation=dilation)

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Left-pad `inputs` with zeros, then apply the dilated convolution."""
        return self.conv1d(self.pad(inputs))


In [57]:
# Move the waveform batch to the selected device (rebinding the same name).
padded_waveforms = padded_waveforms.to(device)

In [58]:
# Local condition: the decoder output cropped along the last axis to the waveform length, so
# it can be added element-wise to the convolved waveform features.
local_condition = decoded_inputs[:, :, :padded_waveforms.shape[-1]]
# Global condition: one random 64-feature vector per batch item. The batch size is read from
# the waveform batch instead of being hard-coded to 8, so the cell also works for other
# batch sizes.
global_condition = torch.rand(padded_waveforms.shape[0], 64)

In [59]:
# Small WaveNet with both conditioning paths enabled. The positional arguments are
# in_channels=1, residual_channels=2, dilation_channels=4, skip_channels=8, out_channels=16.
# NOTE(review): despite its name, `gated_activation_unit` holds a full WaveNet model.
gated_activation_unit = WaveNet(1, 2, 4, 8, 16, use_local_conditioning=True,
                                in_channels_local_condition=32,
                                use_global_conditioning=True,
                                in_features_global_condition=64).to(device)

In [60]:
# Forward pass with all three inputs moved to the model's device; only the output shape is displayed.
gated_activation_unit(padded_waveforms.to(device), local_condition.to(device), global_condition.to(device)).shape

torch.Size([8, 16, 226340])

In [45]:
# Input shape, for comparison with the model output shape.
padded_waveforms.shape

torch.Size([8, 1, 226340])