### mount google drive and unzip training data

In [0]:
from google.colab import drive

drive.mount('/content/drive')


!unzip -qq "/content/drive/My Drive/dataset/train.zip"

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


## process audio data

Load an audio file as a floating point time series using Librosa.

Audio will be automatically resampled to the given rate (default sr=22050)
if the sequence length is greater than 45391 thn we will avoid remaining part and if it is smaller we will pad generated array with zeros and reshaped to  [1,45921] from [4592,]  .
Mapped audio files to target classes and returned array( generated audio series ) with class category number

In [0]:

import torch
import pandas as pd
import numpy as np
from torch.utils.data import Dataset
from glob import glob
import os
import librosa
from torchvision import datasets
from torchvision import transforms
  
torch.set_default_tensor_type(torch.FloatTensor)

# Function to one hot encode
def one_hot_encode(inp, size):
    temp = [0] * size
    temp[inp] = 1
    return temp


def one_hot_encode_batch(inp, size):
    batch = list()
    for item in inp:
        temp = [0] * size
        temp[item] = 1
        batch.append(temp)
    return torch.Tensor(batch)

class AudioDataset(Dataset):
    def __init__(self, audio_dir, csv_dir, sequence_length, num_classes):
        self.audio_dir = audio_dir
        self.sequence_length = sequence_length
        self.num_classes = num_classes
        
        df = pd.read_csv(csv_dir)
        #print(df['Class'][4890])
        
        names = [str(item) + ".wav" for item in df['ID']]
        targets = list(df['Class'])
        
        class_labels = list(set(targets))
        print(class_labels)
        labels_encoded = {class_labels[i]: i for i in range(len(class_labels))}
        targets = [labels_encoded[item] for item in targets]
        

        self.pairs = list(zip(names, targets))
        
    def __len__(self):
        return len(self.pairs)
    
    def __getitem__(self, idx):
        filepath = os.path.join(self.audio_dir, self.pairs[idx][0])
        #print("x", self.pairs[0][1])
        target = (self.pairs[idx][1])
        #print(target)
        #print(filepath)

        #Loading wav files
        x, sr = librosa.core.load(filepath)
        #print(sr)
        #print(len(x))
        # Sample rate is 22050 Hz
        if len(x) > self.sequence_length:
            x = x[0:self.sequence_length]
            #print(x.shape)
        else:
            # Padding audio files
            if len(x) % 2 == 0:
                x = np.append(np.pad(x, ((self.sequence_length - len(x))//2), "constant", constant_values=0), 0)
            else:
                x = np.pad(x, (self.sequence_length - len(x))//2, "constant", constant_values=0)

        x = x.reshape(1, x.shape[0])
        target = one_hot_encode(int(self.pairs[idx][1]), self.num_classes)
        return x.astype(np.float32), np.array(target, dtype=np.float32)

      
dataset = AudioDataset('/content/Train','train.csv', 45921,10)
print(dataset[12][0])
print(dataset[12][1])
print(len(dataset))
#for x, c in dataset:
#  print(x.shape, c.shape)

['car_horn', 'engine_idling', 'air_conditioner', 'street_music', 'drilling', 'dog_bark', 'jackhammer', 'gun_shot', 'children_playing', 'siren']
[[0. 0. 0. ... 0. 0. 0.]]
[0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]
5435


### Define DataLoader

In [0]:
#dataset = datasets.ImageFolder(dataset, transform= transforms.ToTensor())
  
dataloader = torch.utils.data.DataLoader(dataset,
                                batch_size=16,
                                num_workers=2,
                                shuffle=True)




In [0]:
#helper function to comeup with parameters like stride kernel and padding

def calc_len(dilation, output_pad,l_in,  kernel, stride, padding):
    # for ConvTranspose1d
    #l_out = (l_in - 1) * stride - 2 * padding + dilation * (kernel - 1) + output_pad + 1
    # for Conv1d
    l_out = (l_in-1 + 2*padding - dilation*(kernel - 1))//stride +1
    return l_out

l = 45921
for i in range(0, 7):
    #l = calc_len(l, 3, 1, 1, 9, 0)
    l = calc_len(l, 3, 5, 1, 25, 0)
    print(l)
l = calc_len(l, 3, 5, 1, 25, 0)    
#l = calc_len(1,1,319,3,2,1)
print(l)
print(81920//512)

1
1
1
1
1
1
1
1
160


### Define Discriminator and Generator class

#### Discriminator Class
It will take input of depth 1 and by applying  convolution we will enhance its depth( by increasing no of out channels ).

after passing through all of the conv layers input will end up with depth 160.

to pass through linear layer we will need 160*512 in_features in FC1 layer, which will in term produce 512 out_features, which  would be used by hidden layers as in_featuers

Out_features by hidden layer will end up in a single value either 0 or 1 or for being fake and real .

#### Structure
Discriminator(

  (conv): ModuleList(
  
    (0): Conv1d(1, 16, kernel_size=(9,), stride=(3,), padding=(1,))
    (1): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01, inplace)
    (3): Conv1d(16, 16, kernel_size=(9,), stride=(3,), padding=(1,))
    (4): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): LeakyReLU(negative_slope=0.01, inplace)
     (6): Conv1d(16, 64, kernel_size=(6,), stride=(2,), padding=(1,))
    (7): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=0.01, inplace)
    (9): Conv1d(64, 64, kernel_size=(6,), stride=(2,), padding=(1,))
    (10): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): LeakyReLU(negative_slope=0.01, inplace)
    (12): Conv1d(64, 256, kernel_size=(3,), stride=(2,), padding=(1,))
    (13): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (14): LeakyReLU(negative_slope=0.01, inplace)
     (15): Conv1d(256, 512, kernel_size=(3,), stride=(2,), padding=(1,))
    (16): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (17): LeakyReLU(negative_slope=0.01, inplace)
    (18): Conv1d(512, 512, kernel_size=(3,), stride=(2,), padding=(1,))
    (19): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (20): LeakyReLU(negative_slope=0.01, inplace)
    
  )
   (lrelu): LeakyReLU(negative_slope=0.01, inplace)
  
  (sigmoid): Sigmoid()
  
  (fc1): Linear(in_features=81920, out_features=512, bias=True)
 
 (fc2): Linear(in_features=512, out_features=11, bias=True)
  
  (bn): BatchNorm1d(1, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)

)


### Generator Class

#### Structure

Generator(
  (conv): ModuleList(
  
    (0): ConvTranspose1d(512, 256, kernel_size=(25,), stride=(3,), padding=(5,))
    
    (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    
    (2): LeakyReLU(negative_slope=0.01, inplace)
    
    (3): ConvTranspose1d(256, 128, kernel_size=(25,), stride=(3,), padding=(5,))
    
    (4): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    
    (5): LeakyReLU(negative_slope=0.01, inplace)
    
    (6): ConvTranspose1d(128, 128, kernel_size=(25,), stride=(3,), padding=(5,))
    
    (7): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    
    (8): LeakyReLU(negative_slope=0.01, inplace)
    
    (9): ConvTranspose1d(128, 64, kernel_size=(25,), stride=(3,), padding=(5,))
    
    (10): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    
    (11): LeakyReLU(negative_slope=0.01, inplace)
    
    (12): ConvTranspose1d(64, 32, kernel_size=(25,), stride=(3,), padding=(5,))
    
    (13): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    
    (14): LeakyReLU(negative_slope=0.01, inplace)
    
    (15): ConvTranspose1d(32, 32, kernel_size=(25,), stride=(3,), padding=(5,))
    
    (16): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    
    (17): LeakyReLU(negative_slope=0.01, inplace)
    
    (18): ConvTranspose1d(32, 16, kernel_size=(25,), stride=(3,), padding=(5,))
    
    (19): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    
    (20): LeakyReLU(negative_slope=0.01, inplace)
    
    (21): ConvTranspose1d(16, 1, kernel_size=(25,), stride=(3,), padding=(5,))
    
  )
  
  (lrelu): LeakyReLU(negative_slope=0.01, inplace)
  
  (tanh): Tanh()
  
  (fc1): Linear(in_features=522, out_features=512, bias=True)
)

In [0]:
import torch
from torch import nn


class Discriminator(nn.Module):
    def __init__(self, num_classes=10):
        super(Discriminator, self).__init__()
        self.num_classes = num_classes

        def convblock(in_ch, out_ch, k, s, p):
            init = lambda x: nn.init.kaiming_normal_(x)
            block = nn.ModuleList([nn.Conv1d(in_ch, out_ch, k, s, p), 
                                nn.BatchNorm1d(out_ch), 
                                nn.LeakyReLU(inplace=True)])
            init(block[0].weight)
            return block
        
        #sequence length is 45921
        self.conv = convblock(1, 16, 9, 3, 1)
        self.conv += convblock(16, 16, 9, 3, 1)
        self.conv += convblock(16, 64, 6, 2, 1)
        self.conv += convblock(64, 64, 6, 2, 1)
        self.conv += convblock(64, 256, 3, 2, 1)
        self.conv += convblock(256, 512, 3, 2, 1)
        self.conv += convblock(512, 512, 3, 2, 1) # 160
        
        self.lrelu = nn.LeakyReLU(inplace=True)
        self.sigmoid = nn.Sigmoid()

        self.fc1 = nn.Linear(81920, 512) #160*512 , 512
        self.fc2 = nn.Linear(512, 1 + self.num_classes)
        self.bn = nn.BatchNorm1d(1)

    def forward(self, x):
        x = self.bn(x)
        for layer in self.conv:
            x = layer(x)
        # Flatten x
        x = x.view(x.shape[0], x.numel() // x.shape[0])
        x = self.lrelu(self.fc1(x))
        x = self.sigmoid(self.fc2(x))
        return x

class Generator(nn.Module):
    def __init__(self, latent_size=512, num_classes=10):
        super(Generator, self).__init__()
        
        def convblock(in_ch, out_ch, k, s, p, tanh=False, linear=False):
            if tanh:
                block = nn.ModuleList([nn.ConvTranspose1d(in_ch, out_ch, k, s, padding=p),
                                    nn.Tanh()])
            elif linear:
                block = nn.ModuleList([nn.ConvTranspose1d(in_ch, out_ch, k, s, padding=p)])

            else:
                block = nn.ModuleList([nn.ConvTranspose1d(in_ch, out_ch, k, s, padding=p),
                                    nn.BatchNorm1d(out_ch),
                                    nn.LeakyReLU(inplace=True)])                
            init = lambda x: nn.init.kaiming_normal_(x)
            init(block[0].weight)
            return block
        
        #sequence length is 45921
        self.conv = convblock(latent_size, latent_size // 2, 25, 3, 5) #512 x 256 ,15
        self.conv += convblock(latent_size // 2, latent_size // 4, 25, 3, 5) # 256 x 128 , 57
        self.conv += convblock(latent_size // 4, latent_size // 4, 25, 3, 5) #128 x 128 , 183
        self.conv += convblock(latent_size // 4, latent_size // 8, 25, 3, 5) #128 x 64 ,561
        self.conv += convblock(latent_size // 8, latent_size // 16, 25, 3, 5) #64 x 32 ,1695
        self.conv += convblock(latent_size // 16, latent_size // 16, 25, 3, 5) #32 x 32,5097
        self.conv += convblock(latent_size // 16, latent_size // 32, 25, 3, 5) #32 X 16,15303
        self.conv += convblock(latent_size // 32, 1, 25, 3, 5, linear=True) #16 x 1, 45921
        
        self.lrelu = nn.LeakyReLU(inplace=True)
        self.tanh = nn.Tanh()

        self.fc1 = nn.Linear(latent_size + num_classes, latent_size)
        
    def forward(self, x):
        #print(x.shape)
        x = self.tanh(self.fc1(x))
        x = x.unsqueeze(2)
        #print(x.shape)
        for layer in self.conv:
            x = layer(x)
            #print(x.shape)
        return x

### Define Generator and Discriminator Losses

In [0]:


class GeneratorLoss:
    def __init__(self, latent_size=512, num_classes=10):
        self.num_classes = num_classes
        self.latent_size = latent_size
        self.criterion = torch.nn.BCELoss()

    def __call__(self, D, G, c_bar):
        D.zero_grad()
        G.zero_grad()

        z = torch.cat([torch.randn((c_bar.shape[0], self.latent_size)),
                        c_bar], dim=1)
        c_bar = torch.cat([torch.zeros((c_bar.shape[0], 1)), c_bar], dim=1)
        
        fake = G(z)
        D_G = D(fake)
        return self.criterion(D_G, c_bar) * -1

class DiscriminatorLoss:
    def __init__(self, latent_size=512, num_classes=10):
        self.num_classes = num_classes
        self.latent_size = latent_size
        self.criterion = torch.nn.BCELoss()

    def __call__(self, D, G, x, c, c_bar):
        D.zero_grad()
        G.zero_grad()

        c = torch.cat([torch.ones((c.shape[0], 1)), c], dim=1)
        d_real = D(x)
        d_real_loss = self.criterion(d_real, c)
        
        #print(c_bar.shape)
        z = torch.randn((c_bar.shape[0], self.latent_size))
        #print(z.shape)
        z = torch.cat([z, c_bar], dim=1)
        #print(z.shape)
        fake = G(z)
        #print(fake.shape)
        c_bar = torch.cat([torch.zeros((c_bar.shape[0], 1)), c_bar], dim=1)
        d_fake = D(fake)
        d_fake_loss = self.criterion(d_fake, c_bar)

        D_cost = d_real_loss + d_fake_loss

        return D_cost, d_real_loss, d_fake_loss
      



#### code to check cuda availability

In [0]:
#Check for a GPU
train_on_gpu = torch.cuda.is_available()
if not train_on_gpu:
    print('No GPU found. Please use a GPU to train your neural network.')
else:
    print('Training on GPU!')

Training on GPU!


### Training loop



In [0]:
    import os

    import datetime  
    
 
    num_epochs = 100
    num_classes = 10
    batch_size = 16
    num_batches = len(dataset) // batch_size
    latent_size = 512
    model_dir = "/content/savemodel2/"
    checkpoint_frequency = 1
    log_frequency = 1
    
    # Create output directory
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)

    #Instantiate Discriminator and Generator models
    D = Discriminator(num_classes=num_classes)
    G = Generator(latent_size=latent_size, num_classes=num_classes)

    #Defined optimizer with lr and L2 regularization
    d_optim = torch.optim.Adam(D.parameters(), lr=0.0002,)
    g_optim = torch.optim.Adam(G.parameters(), lr=0.0002,)

    #Instantiate Discriminator and Generator losses
    g_cost = GeneratorLoss(latent_size=latent_size, num_classes=num_classes)
    d_cost = DiscriminatorLoss(latent_size=latent_size, num_classes=num_classes)
    
    if torch.cuda.is_available():
        D = D.cuda()
        G = G.cuda()
        torch.set_default_tensor_type(torch.cuda.FloatTensor)
        
    iteration_number = 0
    for ep in range(num_epochs):
        
        for x, c in dataloader:
            x = x.cuda()
            c = c.cuda()
            
            c_bar = torch.randint(low=0, high=num_classes, size=(batch_size, 1))
            c_bar = one_hot_encode_batch(c_bar, num_classes)
            #print(c_bar.size())

            d_losses = d_cost(D, G, x, c, c_bar)    
            d_losses = tuple(d_losses)
            D_loss = d_losses[0]
            D_loss.backward()

            d_optim.step()
            g_loss = g_cost(D, G, c_bar)
            g_loss.backward()

            print("epoch: [{}/{}], batch: [{}/{}], g_loss: {:4f}, D_loss: {:4f}".format(ep, num_epochs, iteration_number % num_batches, num_batches, g_loss.item(), D_loss.item()))
            if iteration_number % checkpoint_frequency == 0:
                torch.save(G.state_dict(), os.path.join(model_dir, "gen_{}.pth".format(iteration_number)))
                torch.save(D.state_dict(), os.path.join(model_dir, "dis_{}.pth".format(iteration_number)))
            
            iteration_number += 1

KeyboardInterrupt: ignored

In [0]:


def gen_sample(class_label, num_classes, gen_model_path, latent_size, output_path, sample_rate=22050):
  # Load model
  G = Generator(latent_size=latent_size, num_classes=num_classes)
  G.load_state_dict(torch.load(gen_model_path))
  
  
  
 
    #Prepare input tensor  
  c_bar = torch.Tensor(one_hot_encode(class_label, num_classes)).unsqueeze(0)
  z = torch.randn((1, latent_size))
  z_new = torch.cat([z, c_bar], dim=1)
  
  # Generate audio
  audio = G(z_new).squeeze(0).detach().cpu().numpy()
  librosa.output.write_wav(output_path, audio.T, sample_rate)
#g = Generator(512,10)
#g.load_state_dict(torch.load("/content/savemodel2/gen_310.pth"),strict=False)
gen_sample(0,10,"/content/savemodel2/gen_310.pth",512, "c.wav" )



In [85]:
from IPython.display import Audio
sound_file = '/content/c.wav'
Audio(sound_file)