In [1]:
import tensorflow as tf
from tensorflow.keras import layers
import os
import tqdm
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import librosa
import IPython.display as ipd
import glob
import random
import warnings
import re
from sklearn import preprocessing
import pickle

In [2]:
audio_dir = os.path.join(os.getcwd(), 'music-data')

In [3]:
songs = pd.read_csv(audio_dir + '/fma_metadata/tracks-edited.csv')
electronic_genres = [15, 26, 107, 181, 468]
songs.head()

Unnamed: 0,track_id,comments,date_created,date_released,engineer,favorites,id,information,listens,producer,...,information.1,interest,language_code,license,listens.1,lyricist,number,publisher,tags.2,title.1
0,2,0,2008-11-26 01:44:45,2009-01-05 00:00:00,,4,1,<p></p>,6073,,...,,4656,en,Attribution-NonCommercial-ShareAlike 3.0 Inter...,1293,,3,,[],Food
1,3,0,2008-11-26 01:44:45,2009-01-05 00:00:00,,4,1,<p></p>,6073,,...,,1470,en,Attribution-NonCommercial-ShareAlike 3.0 Inter...,514,,4,,[],Electric Ave
2,5,0,2008-11-26 01:44:45,2009-01-05 00:00:00,,4,1,<p></p>,6073,,...,,1933,en,Attribution-NonCommercial-ShareAlike 3.0 Inter...,1151,,6,,[],This World
3,10,0,2008-11-26 01:45:08,2008-02-06 00:00:00,,4,6,,47632,,...,,54881,en,Attribution-NonCommercial-NoDerivatives (aka M...,50135,,1,,[],Freeway
4,20,0,2008-11-26 01:45:05,2009-01-06 00:00:00,,2,4,"<p> ""spiritual songs"" from Nicky Cook</p>",2710,,...,,978,en,Attribution-NonCommercial-NoDerivatives (aka M...,361,,3,,[],Spiritual Level


In [4]:
song_genres = songs.genres.to_numpy()
r = re.compile(r"15|26|107|181|468")
vmatch = np.vectorize(lambda x: bool(r.findall(x)))
genre_match = vmatch(song_genres)
matched_songs = songs.iloc[np.where(genre_match == True)[0]]

song_ids = matched_songs.track_id.to_numpy()

In [5]:
filenames = glob.glob(audio_dir + '/fma_large/' + '/*[0-9]/*')

In [6]:
matched_fns = []
for i in tqdm.trange(len(filenames)):
    song_id = int(filenames[i][-10:-4])
    if song_id in song_ids:
        matched_fns.append(filenames[i])
len(matched_fns)

100%|██████████| 106574/106574 [00:01<00:00, 95193.06it/s]


31429

In [7]:
# with open('electronic-songs.p', 'wb') as f:
#     pickle.dump(matched_fns, f)

In [8]:
random.shuffle(matched_fns)

SR = 14400
INPUT_LENGTH = 10500
BATCH_SIZE = 32
SOUND_DIM = 8

In [10]:
x, _ = librosa.load(matched_fns[100], sr=SR, res_type='kaiser_fast')
print('Duration: {:.2f}s, {} samples'.format(x.shape[-1] / SR, x.shape))
SAMPLE_SIZE = x.shape[0]
ipd.Audio(data=x, rate=SR)



Duration: 29.98s, (431663,) samples


In [11]:
spec = librosa.feature.mfcc(y=x, sr=SR)
spec

array([[-247.74263   , -248.27437   , -238.43266   , ..., -164.25858   ,
        -154.55293   , -212.01587   ],
       [  63.409904  ,   63.296135  ,   80.54374   , ...,   64.3367    ,
          61.251442  ,   79.47264   ],
       [  54.941032  ,   55.330708  ,   33.98622   , ...,   24.137676  ,
          30.889923  ,   43.935837  ],
       ...,
       [ -17.741302  ,  -14.586364  ,   -8.863795  , ...,    0.87686574,
          -2.9603317 ,   -7.3332005 ],
       [  -5.2362165 ,   -6.1873035 ,   -2.8552406 , ...,   -4.8472204 ,
          -7.383808  ,   -2.5500135 ],
       [  21.427002  ,   20.794632  ,   23.740517  , ...,    4.1179433 ,
           5.14603   ,    6.244093  ]], dtype=float32)

In [12]:
normalizer = preprocessing.MaxAbsScaler()
f = normalizer.fit_transform(spec)

In [51]:
librosa.feature.inverse.mfcc_to_audio(spec)

array([-0.02321216, -0.03481547, -0.04106137, ..., -0.01067016,
       -0.00616931, -0.00600537], dtype=float32)

In [35]:
normalizer.inverse_transform(f)

array([[-78.80318  , -50.479237 , -37.088512 , ...,   6.723261 ,
         -1.5510135,  -0.6545768],
       [102.28461  ,  91.50378  ,  84.20572  , ...,  95.293    ,
         84.87894  ,  79.74289  ],
       [-28.15629  , -25.982609 , -21.49276  , ..., -30.35669  ,
        -35.344376 , -31.721764 ],
       ...,
       [ -3.4251401,  -2.0955262,   0.7203481, ...,  -3.5904326,
         -0.265198 ,  -1.4823204],
       [ -8.834181 , -12.996206 , -16.446259 , ..., -19.715181 ,
        -18.81232  , -14.429343 ],
       [ -1.5332875,   4.862012 ,   7.945154 , ...,  -5.447157 ,
         -8.853652 ,  -8.368054 ]], dtype=float32)

In [13]:
# https://www.kaggle.com/fizzbuzz/beginner-s-guide-to-audio-data
def load_mp3s():
    for i in range(len(filenames)):
        # ignore PySoundFile failure warning
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            try:
                x, _ = librosa.load(filenames[i], sr=SR, res_type='kaiser_fast')
            except:
                continue

        # random offset / padding
        if len(x) > INPUT_LENGTH:
            max_offset = len(x) - INPUT_LENGTH
            offset = np.random.randint(max_offset)
            x = x[offset:(INPUT_LENGTH+offset)]
        else:
            if INPUT_LENGTH > len(x):
                max_offset = INPUT_LENGTH - len(x)
                offset = np.random.randint(max_offset)
            else:
                offset = 0
            x = np.pad(x, (offset, INPUT_LENGTH - len(x) - offset), 'constant')
        
        # audio data is loaded in the range [-1, 1]
        yield x.reshape(-1, 1) # + pos_encoding ?

In [14]:
# https://www.kaggle.com/fizzbuzz/beginner-s-guide-to-audio-data
def load_spectrograms():
    max_dim = 844
    for i in range(len(filenames)):
        # ignore PySoundFile failure warning
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            try:
                x, _ = librosa.load(filenames[i], sr=SR, res_type='kaiser_fast')
            except:
                continue

        spec = librosa.feature.mfcc(y=x, sr=SR)
        spec = normalizer.fit_transform(spec)  # normalize in range [-1, 1]

        if spec.shape[-1] < max_dim:
            offset = max_dim - spec.shape[-1]
            spec = np.pad(spec, ((0, 0), (0, offset)), 'constant')
        elif spec.shape[-1] > max_dim:
            spec = spec[:, :844]
        yield spec

In [15]:
dataset = tf.data.Dataset.from_generator(load_mp3s, (tf.float32)).batch(BATCH_SIZE)
spec_dataset = tf.data.Dataset.from_generator(load_spectrograms, (tf.float32)).batch(BATCH_SIZE)

In [16]:
@tf.function
def scaled_dot_product_attention(q, k, v):
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    z = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(dk)
    attn_weights = tf.nn.softmax(z)
    output = tf.matmul(tf.nn.softmax(z, axis=-1), v)
    return output, attn_weights

In [17]:
class MultiHeadAttn(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttn, self).__init__()
        
        self.num_heads = num_heads
        self.d_model = d_model
        
        self.depth = d_model // num_heads
        
        self.wq = layers.Dense(d_model)
        self.wk = layers.Dense(d_model)
        self.wv = layers.Dense(d_model)
        
        self.linear = tf.keras.layers.Dense(d_model)
        
    def split_heads(self, x, batch_size):
        # split last dimension into (num_heads, depth)
        # transpose result to shape (batch_size, num_heads, seq_len, depth)
        
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])
    
    def call(self, q, k, v):
        batch_size = tf.shape(q)[0]
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)
        
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)
        
        scaled_attn, attn_weights = scaled_dot_product_attention(q, k, v)
        
        scaled_attn = tf.transpose(scaled_attn, perm=[0, 2, 1, 3]) # (batch_size, seq_len_q, num_heads, depth)
        concat_attn = tf.reshape(scaled_attn, (batch_size, -1, self.d_model)) # (batch_size, seq_len_q, d_model)
        
        output = self.linear(concat_attn) # (batch_size, seq_len_q, d_model)
        
        return output, attn_weights

In [18]:
class Generator(tf.keras.Model):
    def __init__(self, input_length, d_model):
        super(Generator, self).__init__()
        
        self.seq_len = input_length
        self.d_model = d_model

        self.leaky_relu = layers.LeakyReLU()
        self.layer_norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm3 = layers.LayerNormalization(epsilon=1e-6)
        
        self.dense1 = layers.Dense(self.seq_len * d_model, input_shape=(8,))
        self.mha1 = MultiHeadAttn(d_model, 4)
        self.mha2 = MultiHeadAttn(d_model, 4)

        self.linear = layers.Dense(self.seq_len)
        self.fake_output = layers.Dense(d_model, activation='tanh')
        
    def call(self, x):
        x = self.dense1(x)
        x = self.layer_norm1(x)
        x = self.leaky_relu(x)
        
        x = tf.reshape(x, [-1, self.seq_len, self.d_model])

        # self attention blocks
        x, _ = self.mha1(x, x, x)
        x = self.layer_norm2(x)
        x = self.leaky_relu(x)
        
        x, _ = self.mha2(x, x, x)
        x = self.layer_norm3(x)
        x = self.leaky_relu(x)
        
        x = self.linear(x)
        return self.fake_output(x)

In [19]:
class Discriminator(tf.keras.Model):
    def __init__(self, input_length, d_model, rate=0.1):
        super(Discriminator, self).__init__()
        
        self.seq_len = input_length
        self.d_model = d_model
        
        self.leaky_relu = layers.LeakyReLU()
        self.flatten = layers.Flatten()
        
        self.mha1 = MultiHeadAttn(d_model, 4)
        self.mha2 = MultiHeadAttn(d_model, 4)

        self.linear = layers.Dense(d_model)
        self.out = layers.Dense(1)

        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.dropout3 = layers.Dropout(rate)
        
    def call(self, x, training=True):
        attn1, _ = self.mha1(x, x, x)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1)
        out1 = self.leaky_relu(out1)

        attn2, _ = self.mha2(out1, out1, out1)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2)
        out2 = self.leaky_relu(out2)

        out3 = self.linear(out2)
        out3 = self.dropout3(out3, training=training)
        out3 = self.layernorm3(out3)
        out3 = self.leaky_relu(out3)

        prediction = self.flatten(out3)
        return self.out(prediction)

In [20]:
generator = Generator(20, 844)
discriminator = Discriminator(20, 844)

In [21]:
gen_opt = tf.keras.optimizers.Adam(1e-4)
dis_opt = tf.keras.optimizers.Adam(1e-6)

checkpoint_dir = "./mgan-spec"
checkpoint_prefix = os.path.join(checkpoint_dir, "spec_ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=gen_opt,
                                 discriminator_optimizer=dis_opt,
                                 generator=generator,
                                 discriminator=discriminator)

checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7efb567c4e90>

In [91]:
seed = tf.random.normal([1, 6])
s = generator(seed, training=False)

In [92]:
inverted = normalizer.inverse_transform(s[0])

In [95]:
sample = librosa.feature.inverse.mel_to_audio(inverted, sr=SR, dtype=np.float64)

In [100]:
ipd.Audio(data=sample, rate=SR)

In [105]:
os.path.join(".", "woow.wav")

'./woow.wav'

In [102]:
librosa.output.write_wav('sample.wav', sample, SR, norm=True)

In [24]:
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def d_loss(real_output, fake_output):
    
    def random_noise(output):
        return 0.05 * tf.random.uniform(output.shape)
    
    scale = 1 / BATCH_SIZE
    real_loss = cross_entropy(tf.ones_like(real_output) + random_noise(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output) + random_noise(fake_output), fake_output)
    
    # (1/M) * (cross_entropy loss)
    total_loss = (1 / BATCH_SIZE) * (real_loss + fake_loss)
    return total_loss

def g_loss(fake_output):
    total_loss = (1 / BATCH_SIZE) * cross_entropy(tf.ones_like(fake_output), fake_output)
    return total_loss

In [25]:
EPOCHS = 50
noise_dim = 6
num_samples = 5

d_metric_loss = tf.keras.metrics.BinaryCrossentropy(name='dis_loss')
g_metric_loss = tf.keras.metrics.BinaryCrossentropy(name='gen_loss')

random.seed(8)

@tf.function
def train_step(audio, sgd_rate=32):
    noise = tf.random.normal([audio.shape[0], noise_dim])
    
    with tf.GradientTape() as gen_tape, tf.GradientTape() as dis_tape:
        generated_audio = generator(noise, training=True)
        
        real_output = discriminator(audio, training=True)
        fake_output = discriminator(generated_audio, training=True)
        
        dis_loss = d_loss(real_output, fake_output)
        gen_loss = g_loss(fake_output) 
    
    dis_grads = dis_tape.gradient(dis_loss, discriminator.trainable_variables)
    gen_grads = gen_tape.gradient(gen_loss, generator.trainable_variables)

    dis_opt.apply_gradients(zip(dis_grads, discriminator.trainable_variables))
    gen_opt.apply_gradients(zip(gen_grads, generator.trainable_variables))
    
    d_metric_loss(real_output, fake_output)
    g_metric_loss(tf.ones_like(real_output), fake_output)

In [None]:
for e in range(EPOCHS):
    pbar = tqdm.tqdm(total=len(filenames) // BATCH_SIZE)
    for i, batch in enumerate(spec_dataset):
        train_step(batch)
        description = "Epoch: {} | gen loss: {:.4f} | dis loss: {:.4f}".format(e+1, g_metric_loss.result(), d_metric_loss.result())
        pbar.set_description(description)

        # checkpoint every 2000 batches
        if i % 2000 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

        pbar.update(1)

    pbar.close()

    g_metric_loss.reset_states()
    d_metric_loss.reset_states()