# Structure in GAN
    A GAN trained on the SC09 dataset, which consists of raw audio recordings of the spoken digits 'zero', 'one', ..., 'nine'.

In [16]:
import wave, os, glob
from IPython.display import display, Audio
from scipy.io.wavfile import read
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

from utils import *
from models import *

# for auto-reloading external modules
# see http://stackoverflow.com/questions/1907993/autoreload-of-modules-in-ipython
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


First, load the dataset.
`test`, `train`, and `valid` are lists of dictionaries with the following fields:
<br> &emsp; filename: path to file
<br> &emsp; data: numpy array of the .wav file
<br> &emsp; label: label of the data, an integer between 0 and 9 (inclusive)

In [2]:
PATH_TEST = './sc09/test'
PATH_TRAIN = './sc09/train'
PATH_VALID = './sc09/valid'

# Spoken word at the start of each file name -> integer digit label.
to_num = {'Zero': 0, 'One': 1, 'Two': 2, 'Three': 3, 'Four': 4, 'Five': 5,
    'Six': 6, 'Seven': 7, 'Eight': 8, 'Nine': 9}


def label_from_path(path):
    """Return the digit label encoded in a clip's file name.

    The label word is the part of the *base name* that precedes the first
    underscore. Working on the base name (instead of slicing a fixed number
    of characters off the full path) keeps the parsing correct when the
    dataset directory is renamed, moved, or contains underscores itself.

    Raises:
        KeyError: if the word is not one of the keys of ``to_num``.
    """
    word = os.path.basename(path).split('_')[0]
    return to_num[word]


def load_split(directory):
    """Read every ``*.wav`` file in ``directory``.

    Returns a list with one dict per file holding:
        'filename': path to the file,
        'data':     sample array returned by ``scipy.io.wavfile.read``,
        'label':    integer label derived from the file name.
    A directory without matching files yields an empty list.
    """
    records = []
    for filename in glob.glob(os.path.join(directory, '*.wav')):
        _rate, samples = read(filename)
        records.append({'filename': filename,
                        'data': samples,
                        'label': label_from_path(filename)})
    return records


train = load_split(PATH_TRAIN)
test = load_split(PATH_TEST)
valid = load_split(PATH_VALID)

print(f'test contains: {len(test)} items')
print(f'train contains: {len(train)} items')
print(f'valid contains: {len(valid)} items')

test contains: 2552 items
train contains: 18620 items
valid contains: 2494 items


Quick demo of the dataset being used.

In [3]:
# Play the clip stored at the same position in each split and print its label.
idx = 2300
for split in (train, test, valid):
    clip = split[idx]
    play(clip['data'])
    print(clip['label'])

5


2


0


# Preprocess Training Data

In [4]:
SAMPLE_LEN = 16000  # number of samples every clip is padded to


def pad_clips(records, length=SAMPLE_LEN):
    """Zero-pad each record's 'data' on the right to ``length`` samples and stack.

    Args:
        records: list of dicts whose 'data' entry is a 1-D sample array.
        length:  target number of samples per clip.

    Returns:
        2-D array with one row per record and ``length`` columns.

    Raises:
        ValueError: if a clip is longer than ``length`` (it cannot be padded),
            or if ``records`` is empty (nothing to stack).
    """
    padded = []
    for rec in records:
        clip = rec['data']
        missing = length - clip.shape[0]
        if missing < 0:
            raise ValueError(
                f"clip {rec.get('filename', '<unknown>')} has {clip.shape[0]} "
                f"samples, more than the target length {length}")
        padded.append(np.concatenate([clip, np.zeros(missing)]))
    return np.stack(padded)


train_array = pad_clips(train)
test_array = pad_clips(test)
valid_array = pad_clips(valid)

print(f'train_array has shape {train_array.shape}')
print(f'test_array has shape {test_array.shape}')
print(f'valid_array has shape {valid_array.shape}')

train_array has shape (18620, 16000)
test_array has shape (2552, 16000)
valid_array has shape (2494, 16000)


Normalizing an audio array has no audible effect on playback:

In [6]:
# Listen to the first training clip before and after normalization.
first_clip = train_array[0]
play(first_clip)
# Normalize a copy so the stored clip stays untouched in case normalize works in place.
play(normalize(first_clip.copy()))

In [8]:
# Apply the same normalization to all three splits.
train_array, test_array, valid_array = [
    normalize(split_array)
    for split_array in (train_array, test_array, valid_array)
]

#  Sanity check for D and G architectures

In [10]:
# Build both networks in double precision and push a few random latent
# vectors through G, then feed G's output back into D.
G = Generator().double()
D = Discriminator().double()

n_samples = 4
z = from_numpy(np.random.uniform(-1, 1, size=(n_samples, 64)))
G_z = to_numpy(G(z))
for generated_clip in G_z:
    display(Audio(generated_clip, rate = 16000))

# One discriminator score per generated clip.
D_scores = D(from_numpy(G_z))
print(to_numpy(D_scores))

[[0.43610784]
 [0.49248111]
 [0.56525138]
 [0.55424417]]


# Test Accuracy of a Randomly Initialized D Should Be About 0.5

In [11]:
def test_discriminator(D, fake_data, real_data):
    assert(len(fake_data) == len(real_data))
    # Assuming that fake_label = 0, real_labael = 1
    fake_pred = D(fake_data) < 0.5
    real_pred = D(real_data) > 0.5
    accuracy = torch.sum(fake_pred) + torch.sum(real_pred)
    return (accuracy / (2*len(fake_data))).item()

In [14]:
# Untrained discriminator evaluated on noise vs. real test clips.
D = Discriminator().float()
test_batch_size = len(test_array)

# "Fake" batch: normalized uniform noise, one row per test clip.
noise_clips = normalize(np.random.uniform(-1, 1, size=(test_batch_size, 16000)))
fake_data = from_numpy(noise_clips).float()

# "Real" batch: a row-shuffled copy of the test clips (the original array is left as is).
shuffled_clips = test_array[:test_batch_size].copy()
np.random.shuffle(shuffled_clips)
real_data = from_numpy(shuffled_clips).float()

untrained_accuracy = test_discriminator(D, fake_data, real_data)
print(untrained_accuracy)

0.5597570538520813


# Test that Discriminator can Learn to Distinguish Real from Noise

In [None]:
batch_size = 100
shuffle = True
lr = 0.001
num_epochs = 2
# NOTE(review): Adam's `weight_decay` is an L2-penalty coefficient, not a
# multiplicative decay factor; 0.99 is a very strong penalty -- confirm intended.
weight_decay = 0.99 # Discriminator weights decay
gamma = 0.95 # LR decay factor per epoch

D = Discriminator().float()
optimizer = optim.Adam(D.parameters(), lr=lr, weight_decay=weight_decay)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=gamma)
real_label = 1.
fake_label = 0.
loss = nn.BCELoss()

dataloader = DataLoader(train_array, batch_size, shuffle=shuffle)
loss_log = []
accuracy_log = []

# Fixed, row-shuffled copy of the test clips used for every accuracy check.
test_batch_size = len(test_array)
real_test_data = test_array[:test_batch_size].copy()
np.random.shuffle(real_test_data)
real_test_data = from_numpy(real_test_data).float()

# Evaluate about ten times per epoch; never let the interval drop to 0
# (that would make the modulo below raise for loaders with < 10 batches).
log_every = max(1, len(dataloader) // 10)
os.makedirs('./model_checkpoints', exist_ok=True)

for epoch in range(num_epochs):
    for i, data in enumerate(dataloader, 0):
        b_size = len(data)
        real_data = data.float()
        fake_data = from_numpy(normalize(np.random.uniform(-1, 1, size=(b_size, 16000)))).float()
        real_y = torch.full(size=(b_size, 1), fill_value=real_label)
        fake_y = torch.full(size=(b_size, 1), fill_value=fake_label)

        # Clear gradients left over from the previous batch; without this
        # every step would use the sum of all gradients seen so far.
        optimizer.zero_grad()
        real_pred = D(real_data)
        fake_pred = D(fake_data)
        D_loss = loss(real_pred, real_y) + loss(fake_pred, fake_y)
        D_loss.backward()
        optimizer.step()

        loss_log.append(D_loss.item())
        if i % log_every == 0:
            fake_test_data = from_numpy(normalize(np.random.uniform(-1, 1, size=(test_batch_size, 16000)))).float()
            # test_discriminator already returns a Python float.
            accuracy = test_discriminator(D, fake_test_data, real_test_data)
            accuracy_log.append(accuracy)

            print(f'batch: {i}/{len(dataloader)} \t D_error = {D_loss.item()} \t accuracy = {accuracy}')

    torch.save(D.state_dict(), './model_checkpoints/D_iter'+str(epoch)+'_test.pt')
    scheduler.step()

In [None]:
# Discriminator loss, one point per training batch.
x_loss = np.arange(len(loss_log))
plt.plot(x_loss, loss_log)
plt.title('Training loss vs batches')
plt.show()

In [None]:
# accuracy_log gets one entry per periodic evaluation inside the training
# loop (roughly ten per epoch), so the x axis counts evaluations, not epochs.
x_accuracy = np.arange(len(accuracy_log))
plt.plot(x_accuracy, accuracy_log)
plt.title('Test Accuracy vs Evaluation Step')
plt.xlabel('Evaluation step (about 10 per epoch)')
plt.ylabel('Accuracy')
plt.show()

# Define Training Parameters


In [None]:
# Fresh single-precision generator / discriminator pair.
G = Generator().float()
D = Discriminator().float()
# To resume from saved weights instead, load checkpoints here:
# G.load_state_dict(torch.load('./model_checkpoints/G_iter5.pt'))
# D.load_state_dict(torch.load('./model_checkpoints/D_iter5.pt'))

# Hyperparameters.
batch_size, shuffle = 100, True
lr, num_epochs = 0.001, 5

# One Adam optimizer and one exponential LR schedule per network.
# NOTE(review): weight_decay=0.99 is a very large L2 coefficient for Adam -- confirm intended.
optimizerD = optim.Adam(D.parameters(), lr=lr, weight_decay=0.99)
optimizerG = optim.Adam(G.parameters(), lr=lr, weight_decay=0.99)
schedulerD = optim.lr_scheduler.ExponentialLR(optimizerD, gamma=0.95)
schedulerG = optim.lr_scheduler.ExponentialLR(optimizerG, gamma=0.95)

# Targets for the binary cross-entropy loss.
real_label, fake_label = 1., 0.
loss = nn.BCELoss()

dataloader = DataLoader(train_array, batch_size, shuffle=shuffle)

# Per-batch logs filled by the training loop.
G_losses, D_losses, D_accuracy = [], [], []

# Two latent vectors kept fixed so the generator's output can be compared over time.
fixed_noise = from_numpy(np.random.uniform(-1, 1, size=(2, 64))).float()
G_z = to_numpy(G(fixed_noise))
for untrained_clip in G_z:
    display(Audio(untrained_clip, rate = 16000))

# Using Minimax Loss for GAN

In [None]:
# Print progress about ten times per epoch; keep the interval >= 1 so the
# modulo below cannot divide by zero for loaders with fewer than 10 batches.
log_every = max(1, len(dataloader) // 10)
os.makedirs('./model_checkpoints', exist_ok=True)

for epoch in range(num_epochs):
    print(f'########## Epoch {epoch} ##########')
    for i, data in enumerate(dataloader, 0):
        b_size = len(data)
        data = data.float()

        # Update Discriminator network with batch of real data
        D.zero_grad()
        label = torch.full((b_size, 1), real_label, dtype=torch.float)
        output = D(data) # Forward pass through D
        D_error_real = loss(output, label)
        D_error_real.backward() # Get gradients for backward pass

        # Update Discriminator network with batch of fake data
        z = from_numpy(np.random.uniform(-1, 1, size=(b_size, 64))).float()
        fake_data = G(z)
        label.fill_(fake_label)
        # detach(): the discriminator step must not backpropagate into G.
        output = D(fake_data.detach())
        D_error_fake = loss(output, label)
        D_error_fake.backward()
        D_error = D_error_fake + D_error_real # Total discriminator loss (gradients were accumulated above)

        optimizerD.step()

        # Update Generator network
        G.zero_grad()
        label.fill_(real_label) # the fake labels are the real labels for generator
        output = D(fake_data)
        G_error = loss(output, label)
        G_error.backward() # Compute backward pass gradients
        optimizerG.step()

        # Log Data
        # Store detached tensors: entries stay tensors (a later plotting cell
        # passes them to to_numpy) but no longer keep each batch's autograd
        # graph alive.
        G_losses.append(G_error.detach())
        D_losses.append(D_error.detach())

        # Draw b_size random test clips instead of copying and shuffling the
        # whole test set on every batch.
        test_idx = np.random.permutation(len(test_array))[:b_size]
        real_test_data = from_numpy(test_array[test_idx]).float()
        with torch.no_grad():  # evaluation only
            # test_discriminator already returns a Python float.
            accuracy = test_discriminator(D, fake_data.detach(), real_test_data)
        # Recorded in D_accuracy only; accuracy_log belongs to the earlier
        # discriminator-vs-noise experiment.
        D_accuracy.append(accuracy)

        if i % log_every == 0:
            print(f'batch: {i}/{len(dataloader)} \t D_error = {D_error.item()} \t D_acc = {accuracy} \t G_error = {G_error.item()}')

    torch.save(G.state_dict(), './model_checkpoints/G_iter'+str(epoch)+'_lr01_schedular95.pt')
    torch.save(D.state_dict(), './model_checkpoints/D_iter'+str(epoch)+'_lr01_schedular95.pt')

    schedulerG.step()
    schedulerD.step()

    # Listen to the generator's output for the fixed latent vectors.
    G_z = to_numpy(G(fixed_noise))
    for k in range(2):
        display(Audio(G_z[k], rate = 16000))
    print()

In [None]:
# The logged losses may be 0-d tensors that still track gradients, which
# matplotlib cannot convert on its own; float() handles tensors and plain
# numbers alike.
x = np.arange(len(G_losses))
plt.plot(x, [float(v) for v in G_losses], label = 'G_losses')
plt.plot(x, [float(v) for v in D_losses], label = 'D_losses')
plt.plot(x, [float(v) for v in D_accuracy], label = 'D_accuracy')
plt.title('GAN training: losses and discriminator accuracy')
plt.xlabel('Training batch')
plt.legend()
plt.show()

In [None]:
# Accuracy of the trained D on (a) normalized uniform noise vs. real clips
# and (b) generator output vs. real clips.
real_data = from_numpy(test_array).float()
random_fake_data = from_numpy(normalize(np.random.uniform(-1, 1, size = (len(test_array), 16000)))).float()
# test_discriminator returns a Python float, so no .item() is needed.
print(test_discriminator(D, random_fake_data, real_data))

noise = from_numpy(np.random.uniform(-1, 1, size = (len(test_array), 64))).float()
with torch.no_grad():  # inference only: do not build a graph for the whole test set
    g_fake_data = G(noise)
print(test_discriminator(D, g_fake_data, real_data))

# Using Wasserstein Metric for GAN

In [18]:
# Generator and critic for the Wasserstein variant.
wG = Generator().float()
C = Critic().float()

# Hyperparameters.
batch_size, shuffle = 100, True
lr, num_epochs = 0.001, 3
critic_iterations = 5  # critic updates per generator update

# NOTE(review): weight_decay=0.99 is a very large L2 coefficient for Adam -- confirm intended.
optimizer_C = optim.Adam(C.parameters(), lr=lr, weight_decay=0.99)
optimizer_wG = optim.Adam(wG.parameters(), lr=lr, weight_decay=0.99)
scheduler_C = optim.lr_scheduler.ExponentialLR(optimizer_C, gamma=0.95)
scheduler_wG = optim.lr_scheduler.ExponentialLR(optimizer_wG, gamma=0.95)

# Fake label now changes to -1 for ease of computation in WGAN
real_label, fake_label = 1., -1.

dataloader = DataLoader(train_array, batch_size, shuffle=shuffle)

# Per-batch logs filled by the training loop.
wG_losses, C_losses = [], []

# Two fixed latent vectors for listening to the generator as it trains.
fixed_noise = from_numpy(np.random.uniform(-1, 1, size=(2, 64))).float()
wG_z = to_numpy(wG(fixed_noise))
for untrained_clip in wG_z:
    display(Audio(untrained_clip, rate = 16000))

# Critics the generator is trained against; starts with the live critic itself.
buffer_size = 5
buffer = [C]

In [None]:
# Log / snapshot about ten times per epoch; keep the interval >= 1 so the
# modulo below cannot divide by zero for loaders with fewer than 10 batches.
log_every = max(1, len(dataloader) // 10)
os.makedirs('./model_checkpoints', exist_ok=True)

# NOTE(review): no weight clipping or gradient penalty is applied to the
# critic in this loop; unless Critic constrains itself internally, the
# Wasserstein objective below is unbounded -- confirm.
for epoch in range(num_epochs):
    print(f'########## Epoch {epoch} ##########')
    for i, data in enumerate(dataloader, 0):
        b_size = len(data)
        data = data.float()

        # Training critic
        for _ in range(critic_iterations):
            noise = from_numpy(np.random.uniform(-1, 1, size=(b_size, 64))).float()
            fake_data = wG(noise)
            c_pred_real = C(data)
            # detach(): the critic update does not need gradients w.r.t. the
            # generator. This also leaves wG's graph for `fake_data` intact,
            # so the generator step below can reuse it without retain_graph.
            c_pred_fake = C(fake_data.detach())
            loss_critic = -(torch.mean(c_pred_real - c_pred_fake))
            C.zero_grad()
            loss_critic.backward()
            optimizer_C.step()

        # Training Generator (on the fake batch from the last critic iteration)
        loss_gen = 0
        for critic in buffer:
            pred_fake = critic(fake_data)
            loss_gen += -torch.mean(pred_fake)
        wG.zero_grad()
        loss_gen.backward()
        optimizer_wG.step()

        # Log Data
        wG_losses.append(loss_gen.item())
        C_losses.append(loss_critic.item())

        if i % log_every == 0:
            print(f'batch: {i}/{len(dataloader)} \t C_error = {loss_critic.item()} \t G_error = {loss_gen.item()}')
            wG_z = to_numpy(wG(fixed_noise))
            # Use a separate index here: reusing `i` would overwrite the batch
            # index and tag every checkpoint below as "batch1".
            for k in range(2):
                display(Audio(wG_z[k], rate = 16000))

            # Update buffer with a frozen copy of the current critic,
            # dropping the oldest entry once the buffer is full.
            # NOTE(review): if the buffer was seeded with the live critic C,
            # the first eviction removes it and the generator then only sees
            # frozen snapshots -- confirm this is intended.
            temp_C = Critic().float()
            temp_C.load_state_dict(C.state_dict())
            if len(buffer) >= buffer_size:
                buffer.pop(0)
            buffer.append(temp_C)

            # Save model (file name carries the real batch index)
            torch.save(wG.state_dict(), './model_checkpoints/G_iter'+str(epoch)+'_batch'+str(i)+'_wasser.pt')
            torch.save(C.state_dict(), './model_checkpoints/C_iter'+str(epoch)+'_batch'+str(i)+'_wasser.pt')

    scheduler_wG.step()
    scheduler_C.step()

    print()

# Testing the Discriminator on Test Data and Generated Data

In [23]:
G = Generator().float()
# WGAN generator checkpoints are named G_iter<epoch>_batch<tag>_wasser.pt.
# Load the most recently written epoch-1 checkpoint rather than hard-coding
# the batch tag.
ckpt_candidates = sorted(
    glob.glob('./model_checkpoints/G_iter1_batch*_wasser.pt'),
    key=os.path.getmtime)
if not ckpt_candidates:
    raise FileNotFoundError(
        "no checkpoint matching './model_checkpoints/G_iter1_batch*_wasser.pt'")
G.load_state_dict(torch.load(ckpt_candidates[-1]))

# One latent vector per latent dimension: row k is zero everywhere except
# for the value 50 at position k. Cast to float32 to match the
# single-precision generator.
noise = from_numpy(50 * np.eye(64)).float()

fake = to_numpy(G(noise))
for sample in fake:
    play(sample)

In [None]:
# Convert every logged loss with to_numpy, then plot generator and
# discriminator curves against the batch index.
G_losses_np = [to_numpy(g_loss) for g_loss in G_losses]
D_losses_np = [to_numpy(d_loss) for d_loss in D_losses]
x = list(range(len(G_losses)))
plt.plot(x, G_losses_np, label = 'G')
plt.plot(x, D_losses_np, label = 'D')
plt.legend()
plt.show()

In [None]:
# Random latent vector with one coordinate pushed to an extreme value.
# NumPy produces float64 arrays; cast to float32 to match the
# single-precision networks used in the cells above.
# NOTE(review): inputs here are 1-D (no batch axis) -- assumes G and D accept
# unbatched input; confirm against the model definitions.
z = from_numpy(np.random.uniform(-1, 1, size = 64)).float()
z[51] = 1000000
G_z = to_numpy(G(z))
display(Audio(G_z, rate = 16000))

# Discriminator score for a real test clip vs. the generated clip.
print(to_numpy(D(from_numpy(test_array[0]).float())))
print(to_numpy(D(from_numpy(G_z).float())))