<a href="https://colab.research.google.com/github/myazann/Voice-Activity-Detection/blob/main/VAD_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Connect to Google Drive and install required packages

In [1]:
# Mount Google Drive at /content/drive so later cells can read files stored there.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install torchaudio
!pip install pydub

from torch.utils import data
import torchaudio
import os
from pydub import AudioSegment
import numpy as np
import pydub
import torch
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torch.nn import *
from torch.optim import *
from tqdm import tqdm
import copy
from sklearn.metrics import classification_report
from torch import nn
import torch.nn.functional as F

## Get the data from Google Drive

In [None]:
os.chdir("/content/drive/My Drive")

!cp Audio_Pad.zip /content
!cp Eklenti.zip /content

os.chdir("/content")

!unzip Audio_Pad.zip
!unzip Eklenti.zip

!mv -v Eklenti/Train/Speech/* Audio_Pad/Train/Speech
!mv -v Eklenti/Train/Non_Speech/* Audio_Pad/Train/Non_Speech
!mv -v Eklenti/Val/Speech/* Audio_Pad/Val/Speech
!mv -v Eklenti/Val/Non_Speech/* Audio_Pad/Val/Non_Speech

!rm -rf Audio_Pad.zip
!rm -rf Eklenti.zip

## Create CustomDataset class

In [4]:
class CustomDataset(torch.utils.data.Dataset):
    """In-memory dataset pairing each sample with its integer class label.

    Args:
        data: Samples, indexed along the first dimension. May be a tensor or
            anything ``torch.tensor`` accepts; stored as ``torch.float``.
        label: One label per sample, same kinds of input as ``data``;
            stored as ``torch.long``.
    """

    def __init__(self, data, label):
        self.data = self._as_copy(data, torch.float)
        self.label = self._as_copy(label, torch.long)

    @staticmethod
    def _as_copy(values, dtype):
        """Return an independent copy of ``values`` as a tensor of ``dtype``."""
        if isinstance(values, torch.Tensor):
            # torch.tensor(existing_tensor) emits a copy-construct UserWarning;
            # detach().clone() is the recommended way to make the same copy.
            return values.detach().clone().to(dtype)
        return torch.tensor(values, dtype=dtype)

    def __len__(self):
        """Number of samples, taken from the label tensor."""
        return len(self.label)

    def __getitem__(self, index):
        """Return the ``(sample, label)`` pair stored at ``index``."""
        return self.data[index], self.label[index]


## Create the model

In [5]:
class BidirectionalGRU(nn.Module):
    """Recurrent block: LayerNorm -> GELU -> one-layer bidirectional GRU -> Dropout.

    Args:
        rnn_dim: Size of the last input dimension (normalised, then fed to the GRU).
        hidden_size: Hidden size per direction; the output's last dimension
            is ``2 * hidden_size`` because the GRU is bidirectional.
        dropout: Dropout probability applied to the GRU output.
        batch_first: Forwarded to ``nn.GRU``.
    """

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super().__init__()

        gru_options = dict(
            input_size=rnn_dim,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=batch_first,
            bidirectional=True,
        )
        self.BiGRU = nn.GRU(**gru_options)
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return the dropout-regularised GRU outputs for every step of ``x``."""
        activated = F.gelu(self.layer_norm(x))
        # The GRU's final hidden state is not used, only the per-step outputs.
        step_outputs, _last_hidden = self.BiGRU(activated)
        return self.dropout(step_outputs)


class SoundDetectorModel(Module):
    """Convolutional feature extractor followed by a BiGRU + MLP classifier.

    ``forward`` takes a 4-D batch ``(batch, 1, H, W)`` and returns two logits
    per sample. The first linear layer is hard-wired to 32768 inputs, which
    corresponds to 16 sequence steps of 2 * 1024 GRU features: with the five
    2x2 max-pools below, a 128 x 157 input is reduced to a 4 x 4 map.
    """

    def __init__(self):
        super(SoundDetectorModel, self).__init__()

        # Stack of 3x3 convolutions; blocks created with maxpool=True halve H and W.
        self.sound_detector_model = Sequential(
            self.ConvBlock(1, 32, 3, 1, 1),
            self.ConvBlock(32, 32, 3, 1, 1, True),
            self.ConvBlock(32, 64, 3, 1, 1),
            self.ConvBlock(64, 64, 3, 1, 1, True),
            self.ConvBlock(64, 128, 3, 1, 1, True),
            Dropout(0.5),
            self.ConvBlock(128, 256, 3, 1, 1),
            self.ConvBlock(256, 256, 3, 1, 1, True),
            Dropout(0.5),
            self.ConvBlock(256, 512, 3, 1, 1),
            self.ConvBlock(512, 1024, 3, 1, 1, True),
            Dropout(0.5)
        )

        self.classifier = nn.Sequential(
            BidirectionalGRU(1024, 1024, 0.25, True),
            Flatten(),
            self.LinearBlock(32768, 4096),
            self.LinearBlock(4096, 2048),
            self.LinearBlock(2048, 256),
            Linear(256, 2)
        )

    def ConvBlock(self, input_channels, output_channels, kernel_size=3, stride=1, padding=1, maxpool=False):
        """Build Conv2d -> ReLU, optionally followed by a 2x2 max-pool.

        Args:
            input_channels: Number of input feature maps.
            output_channels: Number of output feature maps.
            kernel_size, stride, padding: Forwarded to ``Conv2d``.
            maxpool: When true, append ``MaxPool2d(kernel_size=2, stride=2)``.

        Returns:
            A ``Sequential`` holding the layers in the order listed above.
        """
        layers = [
            Conv2d(input_channels, output_channels, kernel_size, stride, padding),
            ReLU(inplace=True),
        ]
        if maxpool:
            layers.append(MaxPool2d(kernel_size=2, stride=2))
        return Sequential(*layers)

    def LinearBlock(self, input_channels, output_channels):
        """Build Linear -> ReLU -> LayerNorm mapping ``input_channels`` to ``output_channels``."""
        return Sequential(
            Linear(input_channels, output_channels),
            ReLU(inplace=True),
            LayerNorm(output_channels)
        )

    def forward(self, x):
        """Return the class logits for a batch of inputs."""
        x = self.sound_detector_model(x)

        # Turn the (B, C, H, W) feature map into a (B, H*W, C) sequence in which
        # every step holds the C channel values of one spatial location.
        # A plain ``view(B, H*W, C)`` only reinterprets memory and would mix
        # channels and positions inside each step, so the channel axis is moved
        # explicitly. NOTE: weights trained with the old ``view`` layout were
        # fitted to a different feature ordering and should be retrained.
        x = x.flatten(2).transpose(1, 2).contiguous()

        return self.classifier(x)


## Load data and create data generators

Because the data is scarce, the test split is merged into the training set; the validation split is the only data held out from training.

In [None]:
# Every clip becomes a mel spectrogram with 128 mel bands and 157 frames.
N_MELS = 128
N_FRAMES = 157

# The transforms are built once and reused for every file.
mel_spectrogram = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=N_MELS, n_fft=2048)
frequency_masking = torchaudio.transforms.FrequencyMasking(freq_mask_param=15)
time_masking = torchaudio.transforms.TimeMasking(time_mask_param=35)


def load_class_spectrograms(folder, augment):
    """Load every audio file in ``folder`` as a mel spectrogram.

    Args:
        folder: Directory whose entries are all loaded with ``torchaudio.load``.
        augment: When true, apply frequency masking and then time masking
            to each spectrogram.

    Returns:
        Float tensor of shape ``(number_of_files, N_MELS, N_FRAMES)``.
    """
    file_names = os.listdir(folder)
    spectrograms = torch.empty((len(file_names), N_MELS, N_FRAMES))

    for idx, file_name in enumerate(file_names):
        waveform, _ = torchaudio.load(folder + "/" + file_name, channels_first=False)
        spectrogram = mel_spectrogram(waveform.flatten())
        if augment:
            spectrogram = frequency_masking(spectrogram)
            spectrogram = time_masking(spectrogram)
        spectrograms[idx] = spectrogram

    return spectrograms


def load_split(mode, augment):
    """Load one split ("Train", "Val" or "Test") from the Audio_Pad folder.

    Returns:
        ``(data, labels)`` where ``data`` has shape ``(N, 1, N_MELS, N_FRAMES)``
        with all speech clips first, and ``labels`` holds 1 for speech and
        0 for non-speech.
    """
    speech = load_class_spectrograms("Audio_Pad/" + mode + "/Speech", augment)
    non_speech = load_class_spectrograms("Audio_Pad/" + mode + "/Non_Speech", augment)

    data = torch.cat((speech, non_speech))[:, None, ...]  # add the channel dimension
    labels = torch.cat((torch.ones(len(speech)), torch.zeros(len(non_speech))))
    return data, labels


# Masking augmentation is applied to every split except validation.
train_data, train_labels = load_split("Train", augment=True)
val_data, val_labels = load_split("Val", augment=False)
test_data, test_labels = load_split("Test", augment=True)

# Add 50 all-zero spectrograms labelled 0 (the non-speech label) to the training
# data, then merge the test split into it.
zero_data = torch.zeros((50, 1, N_MELS, N_FRAMES))
zero_labels = torch.zeros((50))

train_data = torch.cat((train_data, zero_data, test_data))
train_labels = torch.cat((train_labels, zero_labels, test_labels))

training_set = CustomDataset(train_data, train_labels)
training_generator = DataLoader(training_set, batch_size = 128, shuffle = True)

# The validation loader must wrap val_set (it previously wrapped training_set,
# leaving val_set unused); shuffling is unnecessary for evaluation.
val_set = CustomDataset(val_data, val_labels)
val_generator = DataLoader(val_set, batch_size = 128, shuffle = False)

## Initialize model, loss, and optimizer

In [7]:
# Build the network and move it to the GPU when one is available.
model = SoundDetectorModel()
model = model.cuda() if torch.cuda.is_available() else model

# Two-class classification on raw logits.
criterion = CrossEntropyLoss()

optimizer = AdamW(model.parameters(), lr=1e-4)

# One-cycle learning-rate schedule sized for 50 epochs of training batches,
# peaking at 4e-4 and annealing linearly.
scheduler = lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=4e-4,
    epochs=50,
    steps_per_epoch=len(training_generator),
    anneal_strategy='linear',
)

## Training

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
n_epochs = 50

# Each phase reads from its own loader, so the 'val' phase no longer
# re-evaluates the training data.
phase_loaders = {'train': training_generator, 'val': val_generator}

best_acc = 0.0
best_model_wts = copy.deepcopy(model.state_dict())
val_acc_history = []


for epoch in tqdm(range(1, n_epochs+1)):

    for phase in ['train', 'val']:

        is_training = phase == 'train'
        model.train(is_training)  # train(False) is the same as eval()
        loader = phase_loaders[phase]

        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            if is_training:
                optimizer.zero_grad()

            # Gradients are only tracked while training.
            with torch.set_grad_enabled(is_training):

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                preds = outputs.argmax(dim=1)

                if is_training:
                    loss.backward()
                    optimizer.step()
                    # OneCycleLR is defined per batch, so it is advanced after
                    # every optimizer step; it was created for exactly
                    # 50 epochs * len(training_generator) steps.
                    scheduler.step()

            # loss is a batch mean; weight it by the batch size so the epoch
            # average stays correct when the last batch is smaller.
            running_loss += loss.item() * inputs.size(0)
            running_corrects += (preds == labels).sum().item()

        # Normalise by the size of the dataset this phase actually iterated.
        n_samples = len(loader.dataset)
        epoch_loss = running_loss / n_samples
        epoch_acc = running_corrects / n_samples

        print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

        if phase == 'val':
            val_acc_history.append(epoch_acc)
            # Keep a copy of the weights with the best validation accuracy so far.
            if epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

In [9]:
# Put the best validation weights back into the model before saving.
model.load_state_dict(best_model_wts)

# Save two artefacts: the bare state dict (.pth) and the whole pickled model (.pt).
for payload, target_path in ((best_model_wts, "SoundDetector.pth"), (model, "SoundDetector.pt")):
    torch.save(payload, target_path)