# GANSynth - Síntese de timbres musicais

### Alunos: Gabriel, Gleyson, Patrick

O objetivo deste trabalho foi a reprodução do modelo GANSynth, cuja
arquitetura consiste em um gerador e um discriminador convolucionais e cujo
objetivo era a síntese musical.

---

## Dataset

O primeiro passo foi a definição da classe de dataset, que utiliza a base de
dados Nsynth, assim como os autores do paper original: https://magenta.tensorflow.org/datasets/nsynth

É necessário baixar os arquivos json/wav e descompactar para execução deste código.

A classe construída é definida abaixo e utiliza o framework PyTorch.

In [None]:
import json
import os

import pandas as pd
from scipy import signal
from scipy.io import wavfile
from torch.utils.data import Dataset


class NsynthDatasetFourier(Dataset):

    def __init__(self, path="nsynth-train/", noise_length=256, shuffle=True):
        super().__init__()
        self.noise_length = noise_length
        self.path = path

        instr_fmly_name = ["bass", "brass", "flute", "guitar", "keyboard",
                           "mallet", "organ", "reed", "string", "synth_lead",
                           "vocal"]
        instr_fmly_num = range(len(instr_fmly_name))  # 11 familias

        self.instr_fmly_dict = dict(zip(instr_fmly_name, instr_fmly_num))

        notas_nomes = ["do", "do_s", "re", "re_s", "mi", "fa", "fa_s", "sol",
                       "sol_s", "la", "la_s", "si"]
        self.notas_indices = range(len(notas_nomes))  # 12 seminotas

        self.notas_dict = dict(zip(notas_nomes, self.notas_indices))

        with open(os.path.join(path, 'examples.json'), 'r') as file:
            summary_dict = json.load(file)

        self.summary_df = pd.DataFrame(list(summary_dict.values()))
        # self.summary_df = pd.get_dummies(self.summary_df, columns=["instrument_family_str", ])

        self.shuffle_array = torch.arange(self.summary_df.shape[0])
        if shuffle is True: self.shuffle_array = torch.randperm(self.summary_df.shape[0])

    def __getitem__(self, index):
        idx = self.shuffle_array[index].item()

        sample_info = self.summary_df.loc[idx]

        sample_audio_array = wavfile.read(os.path.join(self.path, "audio", sample_info["note_str"]) + ".wav")[1] / 1.0
        # sample_audio_array = _scale_data(sample_audio_array)
        sample_audio_array = np.pad(sample_audio_array, [(700, 700), ], mode='constant')

        f, t, espectro = signal.stft(x=sample_audio_array, fs=2048, nperseg=2048, noverlap=3 * 2048 // 4, padded=False)

        # Jogamos fora a frequencia de Nyquist
        espectro = espectro[:-1]
        f = f[:-1]

        # # Visualizar o espectro gerado
        # plt.pcolormesh(t, f, np.abs(espectro), vmin=0, vmax=1e5, shading='gouraud')
        # plt.show()

        instr_fmly_one_hot = torch.zeros(((len(self.instr_fmly_dict.keys()),) + espectro.shape))
        notas_one_hot = torch.zeros(((len(self.notas_indices),) + espectro.shape))

        # Decompoe o espectro em magnitude e fase (angulo).
        amplitude_espectro = np.abs(espectro)
        amplitude_espectro[amplitude_espectro == 0] = 1e-6
        magnitude_espectro = np.log10(amplitude_espectro)
        phase_espectro = np.angle(espectro)

        # Setamos como 1 o elemento daquela familia (one hot)
        instr_fmly_one_hot[self.instr_fmly_dict[sample_info["instrument_family_str"]]] = 1
        # pitch dividido por 12 (qtde de notas), para transcrever em nota
        # dentro da "oitava" em que aquele pitch se encontra.
        notas_one_hot[self.notas_indices[sample_info["pitch"] % len(self.notas_indices)]] = 1

        return torch.normal(mean=0, std=1.0, size=(256, self.noise_length, self.noise_length)), \
               torch.cat((torch.tensor(magnitude_espectro[np.newaxis, ...]), torch.tensor(phase_espectro[np.newaxis, ...]),), dim=0)

    def __len__(self, ):
        return self.summary_df.shape[0]

## Funções de ativação

Utilizamos a LeakyReLU como função de ativação, e definimos ela em uma instância
sempre seguida de normalização, para maior comodidade. Camadas convolucionais
usam batch norm, enquanto que camadas densas usam layer norm, o que parece ser
uma tendência nos últimos tempos.

In [None]:
import numpy as np
from torch import nn


class BnActivation(nn.Module):
    def __init__(self, num_features=1):
        super().__init__()

        self.activation = nn.Sequential(
            nn.LeakyReLU(),
            nn.BatchNorm2d(num_features=num_features, momentum=0.99),
        )

    def forward(self, x):
        return self.activation(x)


class LnActivation(nn.Module):
    def __init__(self, normalized_shape=1):
        super().__init__()

        self.activation = nn.Sequential(
            nn.LeakyReLU(),
            nn.LayerNorm(normalized_shape=normalized_shape),
        )

    def forward(self, x):
        return self.activation(x)

## Modelos

Os modelos construídos são iguais aos do paper original, exceto por 1 única
intervenção, que foi a adição de skip connections nas camadas convolucionais no
estilo ResNet, para melhor propagação do gradiente.

In [None]:
import torch
from torch import nn
from torch.nn import Sequential, Linear, Sigmoid


class ResBlock(nn.Module):
    def __init__(self, n_input_channels=6, n_output_channels=7,
                 kernel_size=7, stride=1, padding=0, dilation=1,
                 groups=1, bias=True, padding_mode='zeros'):
        """
    ResNet-like block, receives as arguments the same that PyTorch's Conv1D
    module.
        """
        super(ResBlock, self).__init__()

        self.feature_extractor = \
            Sequential(
                nn.Conv2d(n_input_channels, n_output_channels, kernel_size,
                          stride, kernel_size // 2 * dilation, dilation,
                          groups, bias, padding_mode),
                BnActivation(n_output_channels),
                nn.Conv2d(n_output_channels, n_output_channels, kernel_size,
                          stride, kernel_size // 2 * dilation,
                          dilation, groups, bias, padding_mode),
            )

        self.skip_connection = \
            Sequential(
                nn.Conv2d(n_input_channels, n_output_channels, 1,
                          stride, padding, dilation, groups, bias, padding_mode)
            )

        self.activation = BnActivation(n_output_channels)

    def forward(self, input_seq):
        return self.activation(self.feature_extractor(input_seq) + self.skip_connection(input_seq))


class Generator2DUpsampled(nn.Module):
    def __init__(self, n_input_channels=24, bias=False):
        super().__init__()

        self.feature_generator = Sequential(
            nn.Conv2d(n_input_channels, 256, kernel_size=(2, 16), stride=(1, 1), dilation=(1, 1), padding=(1, 15), bias=bias), BnActivation(256),
            ResBlock(256, 256, kernel_size=3, stride=1, dilation=1, padding=0, bias=bias),
            nn.Upsample(scale_factor=2.0, mode='bilinear', align_corners=None),
            ResBlock(256, 256, kernel_size=3, stride=1, dilation=1, padding=0, bias=bias),
            nn.Upsample(scale_factor=2.0, mode='bilinear', align_corners=None),
            ResBlock(256, 256, kernel_size=3, stride=1, dilation=1, padding=0, bias=bias),
            nn.Upsample(scale_factor=2.0, mode='bilinear', align_corners=None),
            ResBlock(256, 256, kernel_size=3, stride=1, dilation=1, padding=0, bias=bias),
            nn.Upsample(scale_factor=2.0, mode='bilinear', align_corners=None),
            ResBlock(256, 128, kernel_size=3, stride=1, dilation=1, padding=0, bias=bias),
            nn.Upsample(scale_factor=2.0, mode='bilinear', align_corners=None),
            ResBlock(128, 64, kernel_size=3, stride=1, dilation=1, padding=0, bias=bias),
            nn.Upsample(scale_factor=2.0, mode='bilinear', align_corners=None),
            ResBlock(64, 32, kernel_size=3, stride=1, dilation=1, padding=0, bias=bias),
            nn.Conv2d(32, 2, kernel_size=(3, 3), stride=(1, 1), dilation=(1, 1), padding='same', bias=bias),
        )

        self.activation = nn.Tanh()

    def forward(self, x):
        som_2_canais = self.feature_generator(x).transpose(2, 3)
        som_2_canais[:, 1, :, :] = self.activation(som_2_canais[:, 1, :, :]) * 3.1415926535897
        som_2_canais[:, 0, :, :] = self.activation(som_2_canais[:, 0, :, :]) * 11.85 - 7.35
        return som_2_canais


class Discriminator2D(nn.Module):
    def __init__(self, seq_length=64000, n_input_channels=24,
                 kernel_size=7, stride=1, padding=0, dilation=1, bias=False):
        super().__init__()

        n_output_channels = 256

        self.feature_extractor = Sequential(
            nn.Conv2d(n_input_channels, 32, kernel_size=3, stride=3, dilation=2, bias=bias, ), BnActivation(32),
            ResBlock(32, 64, kernel_size=3, stride=1, dilation=1, bias=bias),
            nn.AvgPool2d(2, 2),
            ResBlock(64, 128, kernel_size=3, stride=1, dilation=1, bias=bias),
            nn.AvgPool2d(2, 2),
            ResBlock(128, n_output_channels, kernel_size=3, stride=1, dilation=1, bias=bias),
            nn.AvgPool2d(2, 2),
            ResBlock(n_output_channels, n_output_channels, kernel_size=3, stride=1, dilation=1, bias=bias),
            nn.AvgPool2d(2, 2),
            ResBlock(n_output_channels, n_output_channels, kernel_size=3, stride=1, dilation=1, bias=bias),
            nn.AvgPool2d(2, 2),
            ResBlock(n_output_channels, n_output_channels, kernel_size=3, stride=1, dilation=1, bias=bias),
        )

        self.mlp = nn.Sequential(
            Linear(2560, 1024, bias=bias),
            LnActivation(1024),
            Linear(1024, 1, bias=bias),
        )

        self.activation = Sigmoid()

    def forward(self, x):
        return self.activation(self.mlp(self.feature_extractor(x).flatten(start_dim=1)))

## Função de Loss

O paper original utiliza a BCE como Loss de treino. Porém, o PyTorch não permite
utilizá-la durante a aceleração de treino com precisão mista, então fizemos a
implementação manual da loss Hiperbólica, que possui características parecidas.

In [None]:
class HyperbolicLoss(nn.Module):
    def __init__(self, epsilon=1e-6, *args, **kwargs):
        super().__init__()
        self.epsilon = epsilon + 1

    def forward(self, y, y_hat):
        return (1 / (self.epsilon - ((y - y_hat) ** 2))).mean()

## Treinamento em GPU

Definimos abaixo o dispositivo de treinamento como GPU, quando disponível.

In [None]:
if torch.cuda.is_available():
    dev = "cuda:0"
    print("Usando GPU")
else:
    dev = "cpu"
    print("Usando CPU")
device = torch.device(dev)

## LR - get and set

Funçãoes utilitárias para definir e verificar o learning rate de um modelo.

In [None]:
def get_lr(optimizer):
    return optimizer.param_groups[0]['lr']


def set_lr(optimizer, new_lr=0.01):
    for param_group in optimizer.param_groups:
        param_group['lr'] = new_lr
        return

Tenho um pacote em python com algumas funções e classes de uso recorrente. Uma
delas é o DataManager, uma classe que converte o dispositivo e tipo dos tensores
em uma thread separada do código, deixando o script mais organizado e mais rápido.

In [None]:
# Install my utilities
!pip install ptk-patrickctrf

## Parâmetros de treino

Abaixo definimos alguns parâmetros relativos ao treino, como tamanho do mini-batch.

In [None]:
from ptk.utils import DataManager
from tqdm import tqdm
import csv
from torch.utils.data import DataLoader
from torch.cuda.amp import GradScaler, autocast

batch_size = 32
noise_length = 1
target_length = 128
use_amp = True
max_examples = 1_000_000

Instanciamos nossos modelos

In [None]:
# Models
generator = Generator2DUpsampled(n_input_channels=256)
discriminator = Discriminator2D(seq_length=target_length, n_input_channels=2, kernel_size=7, stride=1, padding=0, dilation=1, bias=True)

# Put in GPU (if available)
generator.to(device)
discriminator.to(device)

Selecionamos os tipos de gradiente que utilizaremos (Adam, o mais comum).

Definimos também um calendário de LR, de forma que o learning rate vá
diminuindo gradativamente ao longo do treino e permita uma otimização mais fina.

In [None]:
# Optimizers
generator_optimizer = torch.optim.Adam(generator.parameters(), lr=8e-4, )
discriminator_optimizer = torch.optim.Adam(discriminator.parameters(), lr=8e-4, )
generator_scaler = GradScaler()
discriminator_scaler = GradScaler()

# Variable LR
generator_scheduler = torch.optim.lr_scheduler.LambdaLR(generator_optimizer, lambda epoch: max(1 - epoch / 30000.0, 0.1))
discriminator_scheduler = torch.optim.lr_scheduler.LambdaLR(discriminator_optimizer, lambda epoch: max(1 - epoch / 30000.0, 0.1))

Instanciamos a função de Loss.

In [None]:
# loss
criterion = HyperbolicLoss()

Abrimos o dataset. LEMBRE-SE de mudar o caminho do dataset para a pasta do seu
computador onde você o extraiu.

In [None]:
# Train Data
train_dataset = NsynthDatasetFourier(path="<NSYNTH-PATH-HERE>/nsynth-train/", noise_length=noise_length)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=1)

Abaixo temos o loop de treinamento, com precisão mista e alguns logs em tela e
em arquivo para acompanhar o progresso de treinamento.

In [None]:
# Log data
total_generator_loss = best_loss = 9999.0
f = open("loss_log.csv", "w")
w = csv.writer(f)
w.writerow(["epoch", "training_loss"])
tqdm_bar_epoch = tqdm(range(max_examples))
tqdm_bar_epoch.set_description("epoch: 0. ")
last_checkpoint = 0

n_examples = 0
while n_examples < max_examples:
    generator.train()
    discriminator.train()

    # Facilita e acelera a transferência de dispositivos (Cpu/GPU)
    train_datamanager = DataManager(train_dataloader, device=device, buffer_size=3)
    for x_train, y_train in train_datamanager:
        # Comodidade para dizer que as saidas sao verdadeiras ou falsas
        true_labels = torch.ones((x_train.shape[0], 1), device=device)
        fake_labels = torch.zeros((x_train.shape[0], 1), device=device)

        # zero the gradients on each iteration
        generator_optimizer.zero_grad()
        discriminator_optimizer.zero_grad()
        with autocast(enabled=use_amp):
            generated_data = generator(x_train)

            # Train the generator
            # We invert the labels here and don't train the discriminator because we want the generator
            # to make things the discriminator classifies as true.
            generator_discriminator_out = discriminator(generated_data)
            generator_loss = criterion(generator_discriminator_out, true_labels)

        generator_scaler.scale(generator_loss).backward()
        generator_scaler.step(generator_optimizer)
        generator_scaler.update()

        # Train the discriminator on the true/generated data
        discriminator_optimizer.zero_grad()
        generator_optimizer.zero_grad()
        with autocast(enabled=use_amp):
            true_discriminator_out = discriminator(y_train)
            true_discriminator_loss = criterion(true_discriminator_out, true_labels)

            # add .detach() here think about this
            generator_discriminator_out = discriminator(generated_data.detach())
            generator_discriminator_loss = criterion(generator_discriminator_out, fake_labels)

            discriminator_loss = (true_discriminator_loss + generator_discriminator_loss) / 2

        discriminator_scaler.scale(discriminator_loss).backward()
        discriminator_scaler.step(discriminator_optimizer)
        discriminator_scaler.update()

        # LR scheduler update
        discriminator_scheduler.step()
        generator_scheduler.step()

        tqdm_bar_epoch.set_description(
            f'current_generator_loss: {total_generator_loss:5.5f}' +
            f'. disc_fake_err: {generator_discriminator_out.detach().mean():5.5f}' +
            f'. disc_real_acc: {true_discriminator_out.detach().mean():5.5f}' +
            f'. gen_lr: {get_lr(generator_optimizer):1.6f}' +
            f'. disc_lr: {get_lr(discriminator_optimizer):1.6f}'
        )
        tqdm_bar_epoch.update(x_train.shape[0])

        n_examples += x_train.shape[0]

        w.writerow([n_examples, total_generator_loss])
        f.flush()

        total_generator_loss = 0.9 * total_generator_loss + 0.1 * generator_loss.detach().item()

        # Checkpoint to best models found.
        if n_examples > last_checkpoint + 100 * batch_size and (best_loss > total_generator_loss or total_generator_loss < 2.0):
            # Update the new best loss.
            best_loss = total_generator_loss
            last_checkpoint = n_examples
            generator.eval()
            torch.save(generator, "checkpoints/best_generator.pth")
            torch.save(generator.state_dict(), "checkpoints/best_generator_state_dict.pth")
            discriminator.eval()
            torch.save(discriminator, "checkpoints/best_discriminator.pth")
            torch.save(discriminator.state_dict(), "checkpoints/best_discriminator_state_dict.pth")
            print("\ncheckpoint!\n")
            generator.train()
            discriminator.train()

        # training is over
        if n_examples > max_examples:
            break

    # Save everything after each epoch
    generator.eval()
    torch.save(generator, "checkpoints/generator.pth")
    torch.save(generator.state_dict(), "checkpoints/generator_state_dict.pth")
    discriminator.eval()
    torch.save(discriminator, "checkpoints/discriminator.pth")
    torch.save(discriminator.state_dict(), "checkpoints/discriminator_state_dict.pth")
    generator.train()
    discriminator.train()
f.close()

## Geração de timbres

Após treinado, pode usar o código abaixo para geração sonora.

In [None]:
import numpy as np
import torch
import sounddevice as sd
from scipy import signal


def polar_to_rect(amplitude, angle):
    return amplitude * (np.cos(angle) + 1j * np.sin(angle))


generator = torch.load("checkpoints/generator.pth", map_location=torch.device("cpu")).train()

ruido_e_classes = torch.normal(mean=0, std=1.0, size=(256, 1)).view(1, 256, 1, 1)

fourier_sintetizado = generator(ruido_e_classes)[0].detach().numpy()

espectro = polar_to_rect(10 ** fourier_sintetizado[0], fourier_sintetizado[1])

t, x = signal.istft(Zxx=espectro, fs=2046, nperseg=2046, noverlap=3 * 2046 // 4, )

x = x.astype(np.int16)

sd.play(x, samplerate=16000, blocking=True)