<a href="https://colab.research.google.com/github/masa512/audio_ML/blob/main/basic_audio_nn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Audio Deep Learning Basics

reference : https://pytorch.org/tutorials/intermediate/speech_command_classification_with_torchaudio_tutorial.html

the architecture : 
https://arxiv.org/pdf/1610.00087.pdf

## Dataloader generation

In [None]:
import torchaudio 
import torch
import os
import matplotlib.pyplot as plt
import torch.nn as nn
import numpy as np
from tqdm import tqdm

In [None]:
# Select the compute device: the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")
print(device)

In [None]:
# Load the Speech Commands dataset rooted at the current directory (download=True)
SC_data = torchaudio.datasets.SPEECHCOMMANDS('.', download=True)

# Split sizes: 80% of the items for training, the remainder for testing
N_total = len(SC_data)
N_train = int(0.8 * N_total)
N_test = N_total - N_train

# Randomly assign the items to the two subsets
train_data, test_data = torch.utils.data.random_split(dataset=SC_data, lengths=[N_train, N_test])


In [None]:
# Inspect what a single dataset item looks like.
# The string below is only a note listing the fields of one item; it is a bare
# expression in the middle of the cell and has no effect at runtime.

'''
train_data

1. audio (X)
2. Sample rate
3. Classification (Y)
4. Speaker ID
5. Utterance number
'''

# Print the first training item (the full tuple)
print(train_data[0])
# Fetch the same item again as a bare expression (not displayed, since it is not the last line of the cell)
train_data[0]

# Number of items in the training split (shown as the cell output in a notebook)
train_data.__len__()


## Observe the Data a bit more visually

In [None]:
# Collect the label names: every sub-directory of the dataset folder whose name
# starts with a letter (entries starting with e.g. '_' or '.' are skipped).
# os.listdir returns entries in arbitrary order, so the names are sorted to keep
# the word <-> index mapping identical across runs and machines.
dataset_root = "./SpeechCommands/speech_commands_v0.02"
label_names = sorted(
    name
    for name in os.listdir(dataset_root)
    if name[0].isalpha() and os.path.isdir(os.path.join(dataset_root, name))
)

# The number of labels for this dataset is shown below
print(len(label_names))

# Number of directory entries found in each label's folder, aligned with label_names
label_cnt = [len(os.listdir(os.path.join(dataset_root, label))) for label in label_names]



In [None]:
# Draw the per-label counts as a pie chart on a 10x10 figure
plt.figure(figsize=(10, 10))
plt.gca().pie(label_cnt, labels=label_names)
plt.gca().set_title('Label Distribution')
plt.show()

## The Model : M5

The model is a simple 1D convolutional network (the M5 architecture from the paper linked above).


In [None]:
class conv_module(nn.Module):
  """
  Building block: 1D convolution -> batch normalisation -> ReLU.

  Args:
    in_channel  : number of channels of the input signal
    out_channel : number of channels produced by the convolution
    kernel_size : length of the convolution kernel
    stride      : stride of the convolution (default 1)
  """

  def __init__(self,in_channel,out_channel,kernel_size,stride = 1):
    super().__init__()

    # Pass the requested stride through to the convolution; M5_model relies on
    # stride=16 for its first layer, so it must not be fixed to 1 here.
    self.conv = nn.Conv1d(in_channel, out_channel, kernel_size, stride=stride)
    self.bn = nn.BatchNorm1d(out_channel)
    self.relu = nn.ReLU()

  def forward(self,x):
    """Apply conv -> bn -> relu to x and return the result."""
    x = self.conv(x)
    x = self.bn(x)
    return self.relu(x)

class M5_model(nn.Module):
  """
  Four conv_module stages, each followed by max-pooling, then a linear classifier.

  Args:
    in_channel   : number of channels of the input signal
    base_channel : channel width of the first two stages; the last two use twice that
    n_class      : number of output classes

  forward() takes a (B, in_channel, Nt) tensor and returns log-probabilities
  of shape (B, 1, n_class).
  """

  def __init__(self,in_channel,base_channel,n_class):
    super().__init__()
    wide_channel = 2 * base_channel

    self.conv1 = conv_module(in_channel, base_channel, kernel_size=80, stride=16)
    self.conv2 = conv_module(base_channel, base_channel, kernel_size=3)
    self.conv3 = conv_module(base_channel, wide_channel, kernel_size=3)
    self.conv4 = conv_module(wide_channel, wide_channel, kernel_size=3)

    self.pool = nn.MaxPool1d(4)
    self.fc1 = nn.Linear(wide_channel, n_class)

  def forward(self,x):
    # Every stage is a convolution block followed by the shared max-pool
    for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
      x = self.pool(stage(x))

    # Average over the whole remaining time axis -> (B, C, 1)
    x = nn.functional.avg_pool1d(x, kernel_size=x.size(-1))
    # Move the channel axis last -> (B, 1, C) so the linear layer acts on channels
    x = x.transpose(1, 2)
    # Map channels to class scores and take the log-softmax over the class axis
    return nn.functional.log_softmax(self.fc1(x), dim=-1)

In [None]:
# Smoke test: push one random single-channel clip of 13000 samples through the model

# Build the model directly on the selected device: train_model/test_model move
# every batch to `device`, so the weights have to live there as well.
model = M5_model(1,32,35).to(device)
x = torch.randn(1, 1, 13000, device=device)
y = model(x)
print(y.shape)

print(torch.argmax(y))

## Helper functions for converting between label words and label indices



In [None]:
def word2idx(word_list,word):
  """
  Return the position of the first entry of word_list that equals word.

  Raises IndexError when word does not occur in word_list.
  """
  hits = np.flatnonzero(np.asarray(word_list) == word)
  return hits[0]


def idx2word(word_list,idx):
  """Return the entry of word_list stored at position idx."""
  return word_list[idx]

In [None]:
# Check word2idx: print the index of a known label

word = 'backward'
print(word2idx(word_list=label_names, word=word))

# Check idx2word: print the label stored at index 31
print(idx2word(word_list=label_names, idx=31))

## Data transformation function

1. Padding to match batch-wise length (Similar to NLP)
2. Define collating fn (batch-wise processing function)
3. Define the dataloader class using the collating fn


In [None]:
# First the padding function

def pad_common(batch):

  """
  Zero-pad a list of signals to a common length and stack them.

  Input
  batch : List of Torch tensors with (Nc,Nt); Nt may differ between entries

  Returns
  batch : Torch tensor with (Lb, Nc, Nt), Nt being the longest length in the list
  """

  # pad_sequence pads along the first axis, so put time first: (Nt, Nc)
  time_major = [signal.T for signal in batch]

  # Stack into (B, Nt, Nc), filling the shorter signals with zeros
  padded = nn.utils.rnn.pad_sequence(time_major, batch_first=True, padding_value=0.0)

  # Back to channel-first layout: (B, Nc, Nt)
  return padded.transpose(1, 2)


# Sanity check for pad_common: three single-channel signals of different
# lengths should come back as one tensor padded to the longest length (150)

a, b, c = (torch.randn(1, n_samples) for n_samples in (150, 149, 120))

batch = [a, b, c]

print(pad_common(batch).size())

In [None]:
# Then the collating function for batch generator

def collate_fn(batch):
  """
  Collate function for the dataloaders: turns a list of dataset items into
  one padded input batch and the matching target batch

  input :
  batch : list of 5-element dataset items; only the waveform (1st entry)
          and the label word (3rd entry) are used


  returns :
  X : waveforms padded to a common length by pad_common and stacked
  Y : label indices (word2idx on label_names) as a tensor of shape (B,1)
  """

  # Keep only the waveform and label of every item
  pairs = [(x, label) for x, _, label, _, _ in batch]

  # Pad sequence using helper function
  X = pad_common([x for x, _ in pairs])

  # Label words -> indices, then add a trailing axis to get (B,1)
  targets = [word2idx(label_names, label) for _, label in pairs]
  Y = torch.tensor(targets).unsqueeze(1)

  return X,Y


# Try collate_fn on the first five training items

batch = [train_data[item_idx] for item_idx in range(5)]
X, Y = collate_fn(batch)

print(X.shape, Y, sep='\n')

In [None]:
# Dataloaders for both splits; each batch is assembled by collate_fn

# Training batches are drawn in shuffled order
train_loader = torch.utils.data.DataLoader(
    train_data,
    batch_size=256,
    shuffle=True,
    collate_fn=collate_fn,
)

# Test batches keep the dataset order and the final partial batch is kept
test_loader = torch.utils.data.DataLoader(
    test_data,
    batch_size=256,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn,
)

## The optimizer and criterion function

1. We will use ADAM optimizer with the parameters given on the pytorch tutorial to relieve the stress

2. We will use step-based learning rate decay (constant reduction)

3. We will use the negative log-likelihood loss (`nn.NLLLoss`)
  a) The model's log-probabilities, a vector of size (B,C), are compared against the targets, which hold one integer class index per sample (stored as (B,1), not one-hot vectors)

In [None]:
# Adam optimizer with weight regularisation (weight_decay, i.e. lambda = 0.0001)
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-2, weight_decay=1e-4)

# StepLR decay: after every 20 calls to lr_scheduler.step() the learning rate
# becomes new_lr = old_lr * gamma
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=20, gamma=0.1)

# Criterion: negative log-likelihood loss
criterion = nn.NLLLoss()

## Finally the Training sequence

1. For conciseness, define a single run (single epoch) -> Better for debug

### Training function

In [None]:
def train_model(model, cur_epoch):

  """
  Run one training epoch over train_loader and print the mean batch loss.

  Uses the module-level train_loader, optimizer, criterion, transform and device.

  Input:
  model : the network to train; its parameters are updated in place
  cur_epoch : the current epoch number, only used in the printed message

  Return:
  None
  """

  # now model to train mode
  model.train()

  losses = [] # per-batch losses, averaged for the report below
  for X,Y in train_loader:

    optimizer.zero_grad() # clear gradients accumulated by the previous batch

    # Send both X and Y to the compute device
    X = X.to(device)
    Y = Y.to(device)

    # Resample exactly as test_model does, so training and evaluation
    # see inputs at the same sample rate
    X = transform(X)

    # Forward pass through model
    Yhat = model(X)

    # nn.NLLLoss takes (input, target) in that order: log-probabilities of
    # shape (B, C) and class indices of shape (B,). The model emits (B, 1, C)
    # and collate_fn emits (B, 1), so both are flattened accordingly.
    loss = criterion(Yhat.reshape(-1, Yhat.shape[-1]), Y.reshape(-1))
    losses.append(loss.item())

    # Backward pass and parameter update
    loss.backward()
    optimizer.step()

  print(f'\nThe training loss for current epoch {cur_epoch} : {sum(losses)/len(losses)}')


### Testing function

The accuracy criterion is the percentage of correct predictions over the test dataset; the helper functions that compute it are defined here as well.

In [None]:
def number_of_correct(pred, target):
    """Return how many predictions equal their target, as a Python int.

    pred and target must hold the same number of elements (one per sample).
    Both are flattened before the comparison, so shapes such as (B,), (B, 1)
    or (B, 1, 1) can be mixed freely; comparing a (B,) tensor with a (B, 1)
    tensor directly would broadcast to (B, B) and overcount.
    """
    return pred.reshape(-1).eq(target.reshape(-1)).sum().item()


def get_likely_index(tensor):
    """Return, for every entry, the index of the largest value along the last axis."""
    return torch.argmax(tensor, dim=-1)

# Resampler from 16000 Hz down to 8000 Hz, placed on the compute device
sample_rate = 16000
new_sample_rate = 8000
transform = torchaudio.transforms.Resample(
    orig_freq=sample_rate,
    new_freq=new_sample_rate,
).to(device)


In [None]:
def test_model(model, cur_epoch):
    """Evaluate model on test_loader and print its accuracy.

    Uses the module-level test_loader, transform and device.

    Args:
        model: the network to evaluate; it is switched to eval mode.
        cur_epoch: epoch number, only used in the printed message.
    """
    model.eval()
    correct = 0

    # Evaluation needs no gradients; skipping autograd bookkeeping saves
    # memory and time.
    with torch.no_grad():
        for data, target in test_loader:

            data = data.to(device)
            target = target.to(device)

            # apply transform and model on whole batch directly on device
            data = transform(data)
            output = model(data)

            pred = get_likely_index(output)
            # collate_fn yields targets shaped (B, 1); flatten them so they are
            # compared element-wise with the B predictions instead of being
            # broadcast against them.
            correct += number_of_correct(pred, target.reshape(-1))

    print(f"\nTest Epoch: {cur_epoch}\tAccuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n")

In [None]:
# Train and evaluate for n_epoch epochs, stepping the learning-rate schedule
# once per epoch
n_epoch = 2
with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        train_model(model, epoch)
        test_model(model, epoch)
        lr_scheduler.step()
        # Advance the bar once per finished epoch (it was created with
        # total=n_epoch, so it would otherwise stay at 0)
        pbar.update(1)