## Spoken GAN

### A Generative Adversarial Network (GAN) for Spoken Digits

In [None]:
# SOURCE SPOKEN DIGIT DATASET
# Clones the dataset repository into the current working directory; the
# next cell reads the WAV files from its `recordings` sub-directory.

!git clone https://github.com/Jakobovski/free-spoken-digit-dataset.git

In [1]:
# GET FILE NAMES OF SAMPLES

from os import listdir, getcwd
from os.path import isfile, join

# The recordings sit inside the cloned dataset repository.
data_dir = join(getcwd(), 'free-spoken-digit-dataset', 'recordings')

# Keep only the WAV files found directly in that directory.
file_names = [entry for entry in listdir(data_dir) if entry.endswith('.wav')]

In [2]:
# IMPORT DATA, PAD IT, ARRANGE INTO TENSOR

from scipy.io import wavfile
import numpy as np

# Read every recording; the sample rate returned by wavfile.read is dropped.
wavs = [wavfile.read(join(data_dir, name))[1] for name in file_names]

max_length = max(w.shape[0] for w in wavs)
# padded_length is max_length rounded up to the next multiple of 128
# (ceiling division expressed through floor division of the negated value).
padded_length = 128 * -(-max_length // 128)

# Scale samples by 1 / 2**15, convert to float32 and zero-pad each clip on
# both sides so it sits centred in a window of padded_length samples; when
# the padding is odd the extra sample goes on the right.
padded_wavs = []
for samples in wavs:
    missing = padded_length - samples.shape[0]
    pad_before = missing // 2
    pad_after = missing - pad_before
    scaled = samples.astype(np.float32) / (2 ** 15)
    padded_wavs.append(np.pad(scaled, (pad_before, pad_after), mode='constant'))

# X: one padded waveform per row. Y: one single-element row per file holding
# the integer parsed from the first character of the file name.
X = np.stack(padded_wavs, axis=0)
Y = np.array([[int(name[:1])] for name in file_names])

In [3]:
import tensorflow as tf
from tensorflow.keras import layers

# The shuffle buffer spans the whole data set (one slot per waveform in X);
# batches hold BATCH_SIZE waveforms.
BATCH_SIZE = 128
BUFFER_SIZE = X.shape[0]

train_dataset = (
    tf.data.Dataset.from_tensor_slices(X)
    .shuffle(buffer_size=BUFFER_SIZE)
    .batch(batch_size=BATCH_SIZE)
)

In [4]:
from IPython.display import Audio
from IPython import display 

def speak(sample, rate=8000, autoplay=True):
    """Wrap a waveform in an IPython audio player.

    Args:
        sample: Waveform data handed unchanged to `IPython.display.Audio`.
        rate: Sampling rate in Hz. Defaults to 8000, the rate this notebook
            uses everywhere it plays or writes audio.
        autoplay: Whether playback should start as soon as the player is
            rendered. Defaults to True.

    Returns:
        The `Audio` object. It is rendered when it is the last expression
        of a cell or when it is passed to a display call.
    """
    return Audio(sample, rate=rate, autoplay=autoplay)

In [5]:
class InverseSTFTLayer(tf.keras.layers.Layer):
    """Keras layer that converts an STFT back into a waveform.

    The layer has no trainable weights; it wraps `tf.signal.inverse_stft`
    using the inverse window that matches `frame_step`.

    Args:
        frame_length: Window length in samples.
        frame_step: Hop between consecutive frames in samples.
    """

    def __init__(self, frame_length, frame_step):
        super().__init__()
        self.frame_length = frame_length
        self.frame_step = frame_step

    def build(self, input_shape):
        """Record the frame count and the waveform length derived from it.

        `num_frames` is read from the second-to-last input axis. When that
        axis is not statically known (None), `num_outputs` is left as None
        instead of failing on arithmetic with None.
        """
        self.num_frames = input_shape[-2]
        if self.num_frames is None:
            self.num_outputs = None
        else:
            self.num_outputs = self.frame_length + (self.num_frames - 1) * self.frame_step

    def call(self, input):
        """Return the inverse STFT of `input`."""
        return tf.signal.inverse_stft(
            input, self.frame_length, self.frame_step,
            window_fn=tf.signal.inverse_stft_window_fn(self.frame_step)
        )

In [6]:
class STFTLayer(tf.keras.layers.Layer):
    """Keras layer computing the short-time Fourier transform of its input.

    Args:
        frame_length: Window length in samples.
        frame_step: Hop between consecutive frames in samples.
    """

    def __init__(self, frame_length, frame_step):
        super().__init__()
        self.frame_length = frame_length
        self.frame_step = frame_step

    def build(self, input_shape):
        """Nothing to build: the layer defines no weights."""

    def call(self, input):
        """Return the STFT of `input`, computed without end padding."""
        return tf.signal.stft(
            input,
            frame_length=self.frame_length,
            frame_step=self.frame_step,
            pad_end=False,
        )

In [7]:
class RealLayer(tf.keras.layers.Layer):
    """Keras layer that splits a tensor into its real and imaginary parts."""

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        """Nothing to build: the layer defines no weights."""

    def call(self, input):
        """Stack real and imaginary parts along a new trailing axis.

        Index 0 of the new axis holds the real part, index 1 the imaginary
        part.
        """
        components = [tf.math.real(input), tf.math.imag(input)]
        return tf.stack(components, axis=-1)

In [8]:
class ComplexLayer(tf.keras.layers.Layer):
    """Keras layer that merges two trailing channels into a complex tensor."""

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        """Nothing to build: the layer defines no weights."""

    def call(self, input):
        """Combine the last axis into complex values.

        Index 0 of the last axis is taken as the real part and index 1 as
        the imaginary part.
        """
        real_part, imag_part = input[..., 0], input[..., 1]
        return tf.complex(real_part, imag_part)

In [9]:
# Sanity check: take the STFT of a random waveform as long as the padded
# training clips and feed it through the inverse-STFT layer.
frame_length = 512
frame_step = 128

waveform = tf.random.normal(shape=[padded_length], dtype=tf.float32)
stft = tf.signal.stft(
    waveform, frame_length=frame_length, frame_step=frame_step, pad_end=False
)

l = InverseSTFTLayer(frame_length, frame_step)
output = l(stft)


In [10]:
# Inspect the shape of `stft` from the previous cell; the recorded output
# is (140, 257), which is the shape the generator's last transposed
# convolution is asserted to produce (plus a batch and a channel axis).
stft.shape

TensorShape([140, 257])

In [11]:
# Round-trip check on 4096 samples of noise: STFT, inverse STFT, then listen
# to the difference between the original and the reconstruction.
frame_length = 512
frame_step = 128
waveform = tf.random.normal(shape=[4096], dtype=tf.float32)
stft = tf.signal.stft(
    waveform, frame_length=frame_length, frame_step=frame_step, pad_end=False
)
inverse_stft = tf.signal.inverse_stft(
    stft,
    frame_length=frame_length,
    frame_step=frame_step,
    window_fn=tf.signal.inverse_stft_window_fn(frame_step),
)
# Any NaN sample in the reconstruction is replaced by zero.
output = tf.where(tf.math.is_nan(inverse_stft), tf.zeros_like(inverse_stft), inverse_stft)

speak(waveform - output)

In [12]:
# speak(tf.pad(inverse_stft, tf.constant([[0,1024 - inverse_stft.shape[0]]]))[1:] - waveform[1:])

In [13]:
def make_generator_model():
    """Build the generator network.

    A 100-dimensional input vector is projected to a 10x10x256 map,
    upsampled by three transposed convolutions to a 140x257 map with two
    channels, combined into complex values by `ComplexLayer`, and converted
    to a waveform by `InverseSTFTLayer(512, 128)`.

    Returns:
        The assembled `tf.keras.Sequential` model.
    """
    model = tf.keras.Sequential()

    # Projection of the input vector, reshaped into a small feature map.
    model.add(layers.Dense(10*10*256, use_bias=False, input_shape=(100,)))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Reshape((10, 10, 256)))
    assert model.output_shape == (None, 10, 10, 256)  # None is the batch size

    # Hidden upsampling stages: (filters, kernel, strides, expected shape).
    hidden_stages = (
        (128, (5, 5), (3, 4), (None, 32, 41, 128)),
        (64, (6, 5), (2, 2), (None, 68, 85, 64)),
    )
    for filters, kernel, strides, expected_shape in hidden_stages:
        model.add(layers.Conv2DTranspose(filters, kernel, strides=strides, padding='valid', use_bias=False))
        assert model.output_shape == expected_shape
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

    # Final stage: two tanh-bounded channels, no batch normalisation.
    model.add(layers.Conv2DTranspose(2, (6, 5), strides=(2, 3), padding='valid', use_bias=False, activation='tanh'))
    assert model.output_shape == (None, 140, 257, 2)

    # Two real channels -> complex spectrogram -> waveform.
    model.add(ComplexLayer())
    model.add(InverseSTFTLayer(512, 128))

    return model

In [14]:
# Smoke test: build the generator and listen to what it produces for a
# single random input vector before any training has happened.
generator = make_generator_model()

# One sample of length 100, matching the generator's input_shape=(100,).
noise = tf.random.normal([1, 100])
generated_speech = generator(noise, training=False)

speak(generated_speech)

In [15]:
def make_discriminator_model():
    """Build the discriminator network.

    The input goes through `STFTLayer(512, 128)` and `RealLayer`, then two
    strided convolutions (each followed by LeakyReLU and 30% dropout), and
    finally a flatten plus a single-unit dense layer with no activation.

    Returns:
        The assembled `tf.keras.Sequential` model.
    """
    return tf.keras.Sequential([
        # Waveform -> STFT -> real/imaginary channels.
        STFTLayer(512, 128),
        RealLayer(),

        layers.Conv2D(64, (6, 5), strides=(2, 3), padding='same',
                      input_shape=[140, 257, 2]),
        layers.LeakyReLU(),
        layers.Dropout(0.3),

        layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'),
        layers.LeakyReLU(),
        layers.Dropout(0.3),

        # One unbounded score per input.
        layers.Flatten(),
        layers.Dense(1),
    ])

In [16]:
# Instantiate the discriminator that train_step updates and the checkpoint
# object tracks.
discriminator = make_discriminator_model()

In [17]:
# Binary cross-entropy loss object shared by discriminator_loss and
# generator_loss. `from_logits=True` because the discriminator ends in a
# Dense(1) layer without an activation.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [18]:
def discriminator_loss(real_output, fake_output):
    """Return the discriminator loss.

    Args:
        real_output: Discriminator outputs for real samples (target: ones).
        fake_output: Discriminator outputs for generated samples
            (target: zeros).

    Returns:
        The sum of the cross-entropy on the real and on the fake outputs.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake

In [19]:
def generator_loss(fake_output):
    """Return the generator loss.

    The generator is scored against a target of all ones, i.e. on how
    strongly the discriminator rates its samples as real.

    Args:
        fake_output: Discriminator outputs for generated samples.
    """
    wanted_labels = tf.ones_like(fake_output)
    return cross_entropy(wanted_labels, fake_output)

In [20]:
# One Adam optimizer per network (both constructed with 1e-4); train_step
# applies generator and discriminator gradients separately.
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

In [21]:
# Checkpoints go to ./training_checkpoints (relative to the working
# directory) with the file prefix "spoken_gan_ckpt".
checkpoint_dir = join(getcwd(), 'training_checkpoints')
checkpoint_prefix = join(checkpoint_dir, "spoken_gan_ckpt")
# The checkpoint tracks both optimizers and both models.
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

In [22]:
EPOCHS = 50  # NOTE(review): the training call passes 1000 directly and does not read this constant
noise_dim = 100  # length of each input vector; matches the generator's input_shape=(100,)
num_examples_to_generate = 16  # number of clips generated and saved after every epoch

# This seed is reused after every epoch, so the saved clips for different
# epochs come from the same inputs and can be compared directly.
seed = tf.random.normal([num_examples_to_generate, noise_dim])


In [23]:
# `tf.function` compiles the training step.
@tf.function
def train_step(speeches):
    """Run one update of the generator and the discriminator.

    Args:
        speeches: A batch of real waveforms.

    A batch of BATCH_SIZE random input vectors is drawn, both networks are
    evaluated in training mode under their own gradient tape, and each
    optimizer then applies the gradients of its network's loss.
    """
    latent = tf.random.normal([BATCH_SIZE, noise_dim])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        fake_speeches = generator(latent, training=True)

        logits_real = discriminator(speeches, training=True)
        logits_fake = discriminator(fake_speeches, training=True)

        gen_loss = generator_loss(logits_fake)
        disc_loss = discriminator_loss(logits_real, logits_fake)

    # Both gradient sets are computed before either optimizer is applied.
    gen_vars = generator.trainable_variables
    gen_grads = gen_tape.gradient(gen_loss, gen_vars)
    disc_vars = discriminator.trainable_variables
    disc_grads = disc_tape.gradient(disc_loss, disc_vars)

    generator_optimizer.apply_gradients(zip(gen_grads, gen_vars))
    discriminator_optimizer.apply_gradients(zip(disc_grads, disc_vars))


In [27]:
from time import sleep
from scipy.io.wavfile import write

def generate_and_save_speeches(model, epoch, test_input):
    """Generate one clip per row of `test_input`, save it and play it.

    Args:
        model: Generator called as `model(test_input, training=False)`.
        epoch: Epoch number embedded in the output file names.
        test_input: Batch of generator inputs.

    Each clip is written to `example_wavs/epoch_{epoch}_speech_{i}.wav`
    under the current working directory at 8000 Hz, then displayed as an
    autoplaying audio player, followed by a 3 second pause before the next
    clip.
    """
    from os import makedirs  # local import: only needed for the output directory

    # `training=False` runs every layer in inference mode (batchnorm).
    predictions = model(test_input, training=False)
    # Replace NaN samples with zeros once for the whole batch.
    cleaned = tf.where(tf.math.is_nan(predictions), tf.zeros_like(predictions), predictions).numpy()

    # Writing fails if the target directory is missing, so create it first.
    out_dir = join(getcwd(), 'example_wavs')
    makedirs(out_dir, exist_ok=True)

    for i, audio in enumerate(cleaned):
        write(join(out_dir, f'epoch_{epoch}_speech_{i}.wav'), 8000, audio)
        # An Audio object built inside a function is not rendered on its
        # own; it has to be passed to display explicitly to be played.
        display.display(Audio(audio, rate=8000, autoplay=True))
        sleep(3)


In [28]:
from time import time

def train(dataset, epochs):
    """Train the GAN for `epochs` passes over `dataset`.

    After every epoch the cell output is cleared, sample clips are generated
    from the fixed `seed` and saved, and the epoch's wall-clock time is
    printed. A checkpoint is saved every 15th epoch. Once all epochs are
    done, the output is cleared and the clips are generated one more time.

    Args:
        dataset: Iterable of waveform batches passed to `train_step`.
        epochs: Number of epochs to run.
    """
    for epoch_number in range(1, epochs + 1):
        started_at = time()

        for speech_batch in dataset:
            train_step(speech_batch)

        # Generate and save sample clips for this epoch.
        display.clear_output(wait=True)
        generate_and_save_speeches(generator, epoch_number, seed)

        # Save the model every 15 epochs.
        if epoch_number % 15 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

        print('Time for epoch {} is {} sec'.format(epoch_number, time() - started_at))

    # Generate once more after the final epoch.
    display.clear_output(wait=True)
    generate_and_save_speeches(generator, epochs, seed)


In [None]:
# Start training. The epoch count is the literal 1000 passed here, not the
# EPOCHS constant defined in the configuration cell.
train(train_dataset, 1000)

Time for epoch 29 is 384.2598841190338 sec
