<a href="https://colab.research.google.com/github/j4ndrw/bcu-ai-hack-team-2/blob/master/djenerator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [13]:
!mv ./models ./drive/My\ Drive/Djenerator

In [1]:
import torch
from torch import nn, optim
from torch.autograd.variable import Variable
import torchvision

from tensorflow import summary
import tensorflow as tf

import numpy as np

import sys
import datetime
import librosa
import os
from IPython.core.debugger import set_trace

In [15]:
sample_rate = 44100
seconds = 30

placeholder_dataset = []

for wav_file in os.listdir("/content/drive/My Drive/Djenerator/data"):
  if wav_file.endswith(".wav"):
    y, sample_rate = librosa.load(path = f"/content/drive/My Drive/Djenerator/data/{wav_file}", sr = sample_rate)
    y = y[y != 0]
    duration = y.shape[0] // sample_rate
    for i in range(0, duration, seconds):
      placeholder_dataset.append(y[i * sample_rate : (i + seconds) * sample_rate])

num_subsamples = len(placeholder_dataset)
del placeholder_dataset

dataset = np.empty((num_subsamples, sample_rate * seconds), dtype = np.float32)

for wav_file in os.listdir("/content/drive/My Drive/Djenerator/data/"):
  if wav_file.endswith(".wav"):
    y, sample_rate = librosa.load(path = f"/content/drive/My Drive/Djenerator/data/{wav_file}", sr = sample_rate)
    y = y[y != 0]
    duration = y.shape[0] // sample_rate
    for i in range(0, duration, seconds):
      np.append(dataset, y[i * sample_rate : (i + seconds) * sample_rate])
      # dataset.append(y[i * sample_rate : (i + 1) * sample_rate])

In [18]:
dataset.shape, np.max(dataset[2]), np.min(dataset[2]), dataset

((50, 1323000),
 0.5631256,
 -0.5678406,
 array([[ 1.5258789e-05, -1.5258789e-05,  3.0517578e-05, ...,
         -2.2033691e-02, -2.1316528e-02, -4.6112061e-02],
        [-5.3390503e-02, -5.1086426e-02, -2.0339966e-02, ...,
          7.4005127e-03,  1.6098022e-02,  2.4765015e-02],
        [ 3.1936646e-02,  3.7017822e-02,  4.0191650e-02, ...,
         -4.8599243e-02, -3.6102295e-02, -3.3523560e-02],
        ...,
        [ 2.3559570e-01,  3.6682129e-02,  2.0269775e-01, ...,
          8.3251953e-02,  1.7260742e-01,  5.6945801e-02],
        [ 1.8127441e-01,  2.0202637e-02,  1.6107178e-01, ...,
          8.7982178e-02, -7.5408936e-02,  9.0087891e-02],
        [-8.8562012e-02,  8.0749512e-02, -7.7148438e-02, ...,
         -1.7547607e-01, -5.8441162e-02, -2.9122925e-01]], dtype=float32))

In [28]:
class Discriminator(nn.Module):
  def __init__(self, input_features, output_features):
    super(Discriminator, self).__init__()
    self.input_features = input_features
    self.output_features = output_features

    self.l_in = nn.Sequential(
        nn.Linear(
            in_features = self.input_features,
            out_features = 64
        ),
        nn.LeakyReLU(
            negative_slope = 0.2
        ),
        nn.Dropout(0.2)
    )

    self.batch_norm = nn.BatchNorm1d(64, eps = 1e-03, momentum = 0.5)


    self.h1 = nn.Sequential(
        nn.Linear(
            in_features = 64,
            out_features = 32
        ),
        nn.LeakyReLU(
            negative_slope = 0.2
        ),
        nn.Dropout(0.2)
    )

    self.l_out = nn.Sequential(
        nn.Linear(
            in_features = 32,
            out_features = output_features
        ),
        nn.Sigmoid()
    )
  
  def forward(self, x):
    x = self.l_in(x)
    x = self.batch_norm(x)
    x = self.h1(x)
    x = self.l_out(x)
    return x

class Generator(nn.Module):
  def __init__(self, input_features, output_features):
    super(Generator, self).__init__()
    self.input_features = input_features
    self.output_features = output_features

    self.l_in = nn.Sequential(
        nn.Linear(
            in_features = self.input_features,
            out_features = 32
        ),
        nn.ReLU()
    )

    self.batch_norm1 = nn.BatchNorm1d(32, eps = 1e-03, momentum = 0.2)

    self.h1 = nn.Sequential(
        nn.Linear(
            in_features = 32,
            out_features = 64
        ),
        nn.Dropout(0.5),
        nn.ReLU()
    )

    self.batch_norm2 = nn.BatchNorm1d(64, eps = 1e-04, momentum = 0.2)

    self.h2 = nn.Sequential(
        nn.Linear(
            in_features = 64,
            out_features = 128
        ),
        nn.Dropout(0.5),
        nn.ReLU()
    )

    self.l_out = nn.Sequential(
        nn.Linear(
            in_features = 128,
            out_features = output_features
        )
    )

  def nn_sin(self, x):
    return x * torch.sin(x)
  
  def forward(self, x):
    x = self.l_in(x)
    x = self.batch_norm1(x)
    x = self.h1(x)
    x = self.batch_norm2(x)
    x = self.h2(x)
    x = self.nn_sin(self.l_out(x))
    return x

class GAN():
  def __init__(self, dataset, batch_size, shuffle, song_features, noise_vector_latent_dim, num_output_samples):
    
    self.dataset = dataset
    self.batch_size = batch_size
    self.shuffle = shuffle

    self.song_features = song_features

    self.noise_vector_latent_dim = noise_vector_latent_dim
    self.num_output_samples = num_output_samples

    self.data_loader = torch.utils.data.DataLoader(self.dataset, batch_size = self.batch_size, shuffle = self.shuffle)
    self.num_batches = len(self.data_loader)

    self.discriminator = Discriminator(input_features = song_features, output_features = 1)
    self.generator = Generator(input_features = noise_vector_latent_dim, output_features = song_features)

    self.d_opt = optim.Adam(self.discriminator.parameters(), lr = 0.0002, betas=(0.5, 0.999))
    self.g_opt = optim.Adam(self.generator.parameters(), lr = 0.0002, betas=(0.5, 0.999))

    self.samples = []

    self.BCELoss = nn.BCELoss()

  def train_disc(self, opt, real, fake, step):
    opt.zero_grad()

    self.pred_real = self.discriminator(real)

    smoothed_labels = np.zeros((real.size(0), 1), dtype = np.float32)
    for i in range(len(smoothed_labels)):
      smoothed_labels[i] = 0.9
    self.error_real = self.BCELoss(self.pred_real, torch.from_numpy(smoothed_labels))
    self.error_real.backward()

    self.pred_fake = self.discriminator(fake)
    self.error_fake = self.BCELoss(self.pred_fake, torch.zeros(real.size(0), 1))
    self.error_fake.backward()

    opt.step()

    return self.error_real, self.error_fake, self.error_real + self.error_fake

  def train_gen(self, opt, fake, step):
    opt.zero_grad()
    
    self.pred_fake = self.discriminator(fake)
    self.error_fake = self.BCELoss(self.pred_fake, torch.ones(fake.size(0), 1))
    self.error_fake.backward()

    opt.step()

    return self.error_fake

  def noise(self,  N):
    return torch.randn((N, self.noise_vector_latent_dim))

  def challenge_discriminator(self, real: torch.Tensor, noise_size: int, rate: float):
    chance = np.random.randint(0, 100)
    if chance <= int(rate * 100):
      return real * torch.randn(noise_size)
    else:
      return real

  def vec2wave(self, vec, size):
    return vec.view(vec.size(0), size)

  def train(self, epochs, start_epoch, eval_every):
    step = 0

    test_noise = self.noise(self.num_output_samples)

    for epoch in range(start_epoch, epochs):
      for n_batch, real in enumerate(self.data_loader):
        N = real.size(0)
        step += 1

        real = real.view(N, self.song_features)

        noisify_real_rate = 0.01
        if step % 50 == 0:
          noisify_real_rate = 0.1
        if step % 100 == 0:
          noisify_real_rate = 0.2
        if step % 1000 == 0:
          noisify_real_rate = 0.3

        real = self.challenge_discriminator(real = real, noise_size = self.song_features, rate = noisify_real_rate)

        fake = self.generator(self.noise(N)).detach()

        d_error_real, d_error_fake, d_error_total = self.train_disc(self.d_opt, real, fake, step)

        fake = self.generator(self.noise(N))
        g_error = self.train_gen(self.g_opt, fake, step)
        
        sys.stdout.write("\r" + f"d_error_real = {d_error_real:.2f} -> d_error_fake = {d_error_fake:.2f} -> d_error_total = {d_error_total:.2f} -> g_error = {g_error:.2f} -> epoch = {epoch + 1} -> batch = {n_batch + 1} / {self.num_batches}")

        if (epoch + 1) % eval_every == 0 and n_batch == 0:
          sys.stdout.write("\r" + "Updating list of samples | Saving Discriminator model | Saving Generator model")

          torch.save(
          {
              "epoch" : epoch,
              "model_state_dict" : self.discriminator.state_dict(),
              "optimizer_state_dict" : self.d_opt.state_dict(),
              "losses" : [d_error_real, d_error_fake, d_error_total]
          }, 
          "/content/drive/My Drive/Djenerator/models/discriminator.pth")

          torch.save(
          {
              "epoch" : epoch,
              "model_state_dict" : self.generator.state_dict(),
              "optimizer_state_dict" : self.g_opt.state_dict(),
              "losses" : [g_error]
          }, 
          "/content/drive/My Drive/Djenerator/models/generator.pth")    

          self.samples.append(self.vec2wave(self.generator(test_noise), self.song_features).data)
          np.save(f"./content/drive/My Drive/Djenerator/djenerated_samples_raw/{self.num_output_samples}_samples_at_epoch_{epoch + 1}.npy", self.samples[-1].numpy())  
        
  def resume_gan_training(self, epochs, eval_every):
    sys.stdout.write("\r" + "Loading discriminator and generator models")
    discriminator_checkpoint = torch.load("/content/drive/My Drive/Djenerator/models/discriminator.pth")
    generator_checkpoint = torch.load("/content/drive/My Drive/Djenerator/models/generator.pth")

    sys.stdout.write("\r" + "Getting most recent epoch")
    start_epoch = discriminator_checkpoint['epoch']
    
    sys.stdout.write("\r" + "Loading discriminator optimizers")
    self.d_opt = discriminator_checkpoint['optimizer_state_dict']
    self.discriminator.load_state_dict(discriminator_checkpoint['model_state_dict'])

    sys.stdout.write("\r" + "Loading discriminator losses")
    d_error_real, d_error_fake, d_error_total = discriminator_checkpoint['losses'][0], discriminator_checkpoint['losses'][1], discriminator_checkpoint['losses'][2]

    sys.stdout.write("\r" + "Loading generator optimizers")
    self.g_opt = generator_checkpoint['optimizer_state_dict']
    self.generator.load_state_dict(generator_checkpoint['model_state_dict'])

    sys.stdout.write("\r" + "Loading generator loss")
    g_error = generator_checkpoint['losses'][0]

    sys.stdout.write("\r" + "Fetching batch norm gradients")
    self.discriminator.eval()
    self.generator.eval()

    sys.stdout.write("\r" + "Setting models in train mode")
    self.discriminator.train()
    self.generator.train()

    self.train_gan(epochs = epochs, start_epoch = epoch, eval_every = eval_every)
  
  def get_all_generated_samples(self):
    return self.samples

In [29]:
gan = GAN(
  dataset = dataset,
  batch_size = 10,
  shuffle = True,
  song_features = sample_rate * seconds, 
  noise_vector_latent_dim = 100,
  num_output_samples = 10
)

In [None]:
gan.train(start_epoch = 0, epochs = 100000, eval_every = 10)

d_error_real = 0.70 -> d_error_fake = 0.29 -> d_error_total = 0.99 -> g_error = 2.01 -> epoch = 244 -> batch = 3 / 5

In [30]:
gan.resume_gan_training(epochs = 100000, eval_every = 300)

Setting models in train mode

AttributeError: ignored

In [None]:
samples = gan.get_all_generated_samples()

In [None]:
np.save(f"/content/drive/My Drive/Djenerator/djenerated_samples_raw/10_samples_at_epoch_10.npy", samples[-1].numpy())