In [16]:
import torch
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [17]:

from random import randint
import torch

# @staticmethod
def pad_trunc(aud, max_ms:int):

    sig, sr = aud

    if sig.dim() > 1: # if there are more than one channels

        num_channel, sig_len = sig.shape

        max_len = sr//1000*max_ms

        if (sig_len > max_len):
            sig = sig[:, :max_len].to(device)

        elif(sig_len < max_len):

            pad_begin_len = randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            pad_begin = torch.zeros((num_channel, pad_begin_len), device=device)
            pad_end = torch.zeros((num_channel, pad_end_len), device=device)

            sig = torch.cat((pad_begin, sig, pad_end), dim=1).to(device)



    elif sig.dim()==1:             # if there is only one channel

        sig_len = sig.shape[0]
        max_len = sr//1000*max_ms

        if (sig_len > max_len):
            sig = sig[:max_len].to(device)

        elif(sig_len < max_len):

            pad_begin_len = randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            pad_begin = torch.zeros(pad_begin_len, device=device)
            pad_end = torch.zeros(pad_end_len, device=device)

            sig = torch.cat((pad_begin, sig, pad_end), dim=0).to(device)

    
    return (sig, sr)



In [18]:
import random

# @staticmethod
def time_shift(aud, shift_limit):
    sig, sr = aud
    _, sig_len = sig.shape

    shift_amt = int(random.random() * shift_limit * sig_len)
    sig = sig.roll(shift_amt).to(device)
    return (sig, sr)

In [19]:
# @staticmethod
import torch
from random import randint


def rechannel(aud, n_new_channel):
    
    sig, sr = aud

    #if new channel count equals old channel count is'nt
    if(sig.dim() == 1 ):
        n_current_channel = 1


    #detect current channel count
    elif(sig.dim() > 1 ):
        n_current_channel = sig.shape[0]

    if (n_current_channel == n_new_channel):
        return aud


    #new channel count required is greater than current channel count
    elif(n_new_channel > n_current_channel):
        
        dif = n_new_channel - n_current_channel

        new_sig = sig[ randint(0, n_current_channel-1) ]
        new_sig = torch.unsqueeze(new_sig, dim=0)
        
        for i in range(dif-1):
            temp_sig = sig[ randint(0, n_current_channel-1) ]
            temp_sig = torch.unsqueeze(temp_sig, dim=0)
            
            new_sig = torch.cat((new_sig, temp_sig), dim=0)

        resig = torch.cat((sig, new_sig), dim= 0)
        
        

    elif n_new_channel < n_current_channel :

        n_mix_channel = n_current_channel - n_new_channel + 1

        mix_begin = randint(0, n_current_channel-n_mix_channel)
        mix_end = mix_begin + n_mix_channel

        mix_aud = torch.mean( sig[mix_begin:mix_end], dim=0 )
        mix_aud = torch.unsqueeze(mix_aud, dim=0)
        
        resig = torch.cat([sig[0:mix_begin,:], mix_aud, sig[mix_end:,:]]).to(device)     
        
    return ((resig, sr))

In [20]:
import torch
a = torch.tensor([1.,2,3,4]).to(device)
# b = torch.t_copy(a)
# b = a.t_copy()
b = torch.t_copy(a)
k = torch.cat((a,b))
b[0] =17
print(a.mean())
print(k)

tensor(2.5000, device='cuda:0')
tensor([1., 2., 3., 4., 1., 2., 3., 4.], device='cuda:0')


In [21]:
import librosa.display
import torchaudio.transforms as transforms


# @staticmethod
def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
    
    sig, sr = aud
    top_db = 80
    
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels).to(device)(sig)
    spec = transforms.AmplitudeToDB(top_db=top_db).to(device)(spec)
    
    return (spec)
    

In [22]:
# @staticmethod
def spectro_augment(spec, max_mask_pct=0.1, n_freq_mask=1, n_time_mask=1):
    _, n_mels, n_steps =spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_masks_param = max_mask_pct * n_mels
    
    for _ in range(n_freq_mask):
        aug_spec = transforms.FrequencyMasking(freq_masks_param)(aug_spec, mask_value)
        time_mask_param = max_mask_pct * n_steps

    for _ in range(n_time_mask):
        aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec

In [23]:
# CREATE DATA FRAME FOR DATASET

import glob
import os
import pandas as pd

def create_df():

    PATH = './datasets/data/'

    
    sound_paths=[]
    sound_classes = []

    for folderName in glob.glob(PATH+'*'):
    
        class_name = os.path.basename(folderName)

        for sound_path in glob.glob(PATH+"/"+class_name+'/*'):
            
            sound_paths.append(sound_path)
            sound_classes.append(class_name)
       

    return pd.DataFrame({"file_path":sound_paths, "class":sound_classes})




In [24]:
# CREATE DATASET CLASS

from torch.utils.data import Dataset
import librosa
import torch

class SoundDS(Dataset):

    def __init__(   self,
                    df,
                    sr=22050,
                    duration=None,
                    n_channel=1,
                    shift_pct=0.4,
                    max_mask_pct=0.1,
                    n_freq_mask=1,
                    n_time_mask=1
                ):
        
        self.df = df
        self.sr = sr
        self.duration = duration
        self.n_channel = n_channel
        self.shift_pct = shift_pct
        self.max_mask_pct = max_mask_pct
        self.n_freq_mask = n_freq_mask
        self.n_time_mask = n_time_mask
        



        self.classes = set()
        self.classes.update(self.df['class'].to_list())
        self.classes = [ x for x in self.classes]
        self.classes.sort()
        print(self.classes)




    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):

        file_path = self.df.loc[index, "file_path"]
        class_name = self.df.loc[index, "class"]
        class_num = self.classes.index(class_name)


        aud, sr = librosa.load( file_path,
                                sr=self.sr,
                                mono=self.n_channel==1,
                                duration=self.duration)
                                
        aud = torch.from_numpy(aud).to(device)

        if aud.dim() == 1:
            aud = torch.unsqueeze(aud, dim=0).to(device)
        
        sig = (aud, sr)

        sig = pad_trunc(sig, self.duration)

        sig = time_shift(sig, self.shift_pct)

        sig = rechannel(sig, self.n_channel)

        sig = spectro_gram(sig, n_mels=64, n_fft=1024, hop_len=None)

        sig = spectro_augment(sig,
                              max_mask_pct=self.max_mask_pct,
                              n_freq_mask=self.n_freq_mask,
                              n_time_mask=self.n_time_mask)


        return sig.to(device), class_num

In [25]:
# p = iter(train_dl)
# k = p.next()
# print(model(k[0]).shape)

In [26]:
# CREATE A CONVOLUTIONAL MODEL CLASS

import torch.nn.functional as F
import torch.nn as nn
from torch.nn import init

class Classifier(nn.Module):

    def __init__(self, n_channel):
        super(Classifier, self).__init__()

        conv_layers = []

        self.relu = nn.ReLU()

        # First convolution block
        self.conv1 = nn.Conv2d(n_channel, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        # self.relu layer
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers.extend( [self.conv1, self.relu, self.bn1] )

        # Second convolution block
        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        # self.relu layer
        self.bn2 = nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers.extend( [self.conv2, self.relu, self.bn2] )


        # Third convolution block
        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        #self.relu layer
        self.bn3 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv3.weight, a= 0.1)
        self.conv3.bias.data.zero_()
        conv_layers.extend( [self.conv3, self.relu, self.bn3] )

        # Forth convolution block
        self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        #self.relu layer
        self.bn4 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv4.weight, a = 0.1)
        self.conv4.bias.data.zero_()
        conv_layers.extend( [self.conv4, self.relu, self.bn4] )

        # Last step for classification
        # Fully Connected Linear Layer

        # Downsizing with pooling
        self.avgPool = nn.AdaptiveAvgPool2d(output_size=1)
        
        #flattening for input to linear layer
        self.flatten = nn.Flatten(1, -1)

        self.linear_layer = nn.Linear(in_features=64, out_features=4)

        conv_layers.extend([self.avgPool, self.flatten, self.linear_layer])

        #The list of conv_layer is unpacked and sent
        self.conv = nn.Sequential(*conv_layers) # nn.Squential() gets *args or OrderedDict

    def forward(self, inputs):
        
        return self.conv(inputs)

        



In [27]:
# # x = torch.empty((16,2,64,215), dtype=torch.float32)
# # x.normal_()

# x = librosa.load('./datasets/data/301 - Crying baby/1-22694-A.ogg')
# x = torch.from_numpy(x[0])
# # x = torch.unsqueeze(x[0], dim=0)
# x.shape
# # y=model(x)

# # print(y.shape)


In [28]:
# PREPAIRING DATALOADER

from random import shuffle
from torch.utils.data import random_split
from torch.utils.data import DataLoader

dataframe = create_df()

audio_dataset = SoundDS(
                        df=dataframe,
                        sr=22050,
                        duration=5000,
                        n_channel=1,
                        shift_pct=0.4,
                        max_mask_pct=0.1,
                        n_freq_mask=1,
                        n_time_mask=1
                        )

num_item = len(audio_dataset)
num_train = round(num_item*0.8)
num_test = num_item - num_train

train_ds, test_ds = random_split(audio_dataset, [num_train, num_test] )

train_dl = DataLoader(train_ds,batch_size=8, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=10, shuffle=False)

['301 - Crying baby', '901 - Silence', '902 - Noise', '903 - Baby laugh']


In [29]:
torch.cuda.is_available()

True

In [30]:
#CREATE A FUNCTIN FOR MODEL TRAINING

from sched import scheduler
import torch
import os
from torchvision.transforms import Normalize

from torch.utils.tensorboard import SummaryWriter

from ignite.handlers import create_lr_scheduler_with_warmup


 #create folde for saving checkpoints
if not os.path.exists('./check_points_for_detection'):
    os.mkdir('./check_points_for_detection')



def train_model(model, train_dl, num_epoch, lr):

    writer = SummaryWriter('./runs/Cry_Detection/writer')
    writer2 = SummaryWriter('./runs/Cry_Detection/writer2')

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    
    model.train()
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # torch_lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
    #                                                     mode='min',
    #                                                     factor=0.75,
    #                                                     patience=50)

    torch_lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer,
                                                    gamma=0.98)

    # scheduler = create_lr_scheduler_with_warmup(torch_lr_scheduler,
    #                                         warmup_start_value=0,
    #                                         warmup_duration=3,
    #                                         warmup_end_value=0.01)

    num_batches = len(train_dl)

    # scheduler(None)

    count = 0


    for epoch in range(num_epoch):
        
        running_loss       = 0.0
        correct_prediction = 0
        total_prediction   = 0

        last_4_loss = 0.0
        last_4_correct_pred = 0
        last_4_pred = 0

        torch.set_grad_enabled(False)

        for i, data in enumerate(train_dl):

            torch.set_grad_enabled(False)

            inputs:torch.Tensor = data[0].to(device)
            labels:torch.Tensor = data[1].to(device)

            mean:torch.Tensor = inputs.mean()
            std:torch.Tensor  = inputs.std()

            inputs = Normalize(mean, std)(inputs)  # OR inputs = (inputs - mean) / std

            optimizer.zero_grad()

            torch.set_grad_enabled(True)

            outputs = model(inputs)
            loss:torch.Tensor = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.mean().item()
            _, predictions = torch.max(outputs, 1)
            acc = (predictions==labels).sum().item()/predictions.shape[0]
            correct_prediction += (predictions==labels).sum().item()
            total_prediction += predictions.shape[0]

            last_4_loss += loss.item()
            last_4_correct_pred += (predictions==labels).sum().item()
            last_4_pred += predictions.shape[0] 

            count += 1
            if count%4 == 0 or count == 1:

                torch_lr_scheduler.step()
                
                writer.add_scalar('Training loss - Average of Last 4 Steps', last_4_loss/4, (epoch*len(train_dl)+i+1))
                
                writer.add_scalar('Accuracy - Average of Last 4 steps',
                                   last_4_correct_pred/last_4_pred,
                                   (epoch*len(train_dl)+i+1))
                
                writer.add_scalars('Loss and Accuracy - Average of Last 4 Steps',
                                  {'Accuracy': last_4_correct_pred/last_4_pred,
                                   'Loss': last_4_loss/4}, (epoch*len(train_dl)+i+1))

                last_4_loss = 0
                last_4_correct_pred= 0
                last_4_pred = 0
                
            writer.add_scalar("Learning Rate", torch_lr_scheduler.get_last_lr()[0], (epoch*len(train_dl)+i+1))
            writer.add_scalar("Steps - Epochs", len(train_dl)*epoch + i + 1 , epoch)

        avg_loss = running_loss / num_batches
        avg_acc = correct_prediction / total_prediction

        
        writer2.add_scalar('Training loss - Average of Last Epoch', avg_loss, (epoch+1))
        writer2.add_scalar('Accuracy - Average Last epoch', avg_acc, (epoch+1))
        writer2.add_scalars('Comparison - Loss and Accuracy', {'Accuracy': avg_acc, 'Loss': avg_loss}, (epoch+1))
        
        print(f'Epoch: {epoch+1}, Loss: {avg_loss:.2f}, Accuracy: {avg_acc:.2f}')

        path = "./check_points_for_detection/checkpoint_"+str(epoch+1)+".pth"

        checkPoint = {"model_state": model.state_dict(),
                      "optim_state": optimizer.state_dict,
                      "scheduler"  : torch_lr_scheduler.state_dict()}

        torch.save(checkPoint, path)

    
        
    writer.close()

In [31]:
# CREATE A MODEL AND TRAIN IT

import torch.utils.tensorboard

n_channel = audio_dataset.n_channel

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model         = Classifier(n_channel).to(device)
num_epoch     = 12
learning_rate = 0.01

# Train model
train_model(model, train_dl, num_epoch, learning_rate)




Epoch: 1, Loss: 0.65, Accuracy: 0.76
Epoch: 2, Loss: 0.53, Accuracy: 0.81
Epoch: 3, Loss: 0.44, Accuracy: 0.86
Epoch: 4, Loss: 0.29, Accuracy: 0.90
Epoch: 5, Loss: 0.30, Accuracy: 0.90
Epoch: 6, Loss: 0.27, Accuracy: 0.90
Epoch: 7, Loss: 0.22, Accuracy: 0.92
Epoch: 8, Loss: 0.23, Accuracy: 0.93
Epoch: 9, Loss: 0.21, Accuracy: 0.93
Epoch: 10, Loss: 0.15, Accuracy: 0.96
Epoch: 11, Loss: 0.15, Accuracy: 0.95
Epoch: 12, Loss: 0.19, Accuracy: 0.95


In [32]:
#CREATE MODEL TEST FUNCTION

import torch
import os
from torchvision.transforms import Normalize

from torch.utils.tensorboard import SummaryWriter


def test_model(model:nn.Module, test_dl:DataLoader):

    torch.set_grad_enabled(False)

    writer = SummaryWriter('./runs/Cry_DetectionTest')
   

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    model.eval()    
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()

    num_batches = len(test_dl)
        
    total_loss       = 0.0
    total_correct_prediction = 0
    total_prediction   = 0

    for i, data in enumerate(test_dl):

        batch_size = len(data[0])

        inputs:torch.Tensor = data[0].to(device)
        labels:torch.Tensor = data[1].to(device)

        mean:torch.Tensor = inputs.mean()
        std:torch.Tensor  = inputs.std()

        inputs = Normalize(mean, std)(inputs)  # OR inputs = (inputs - mean) / std

        outputs = model(inputs)
        loss:torch.Tensor = criterion(outputs, labels)

        total_loss += loss.mean().item()
        _, predictions = torch.max(outputs, 1)
        acc = (predictions==labels).sum().item()/predictions.shape[0]
        total_correct_prediction += (predictions==labels).sum().item()
        total_prediction += predictions.shape[0]

            
        writer.add_scalar('Test loss - Average of Every Batch', loss.item()/batch_size, i+1)
        writer.add_scalar('Test Accuracy - Average of Every Batch', acc, i+1)
        
        print(f'Bacth: {i+1}, Loss: {loss:.2f}, Accuracy: {acc:.2f}')

    avg_loss = total_loss / num_batches
    avg_acc = total_correct_prediction / total_prediction
    

       
    print(f'Test Finised:\nLoss: {avg_loss:.2f}, Accuracy: {avg_acc:.2f}')
        
    writer.close()


In [34]:
# TEST MODEL

import torch

modelForTest = Classifier(1)
path = './check_points_for_detection/checkpoint_10.pth'
device = torch.device("cuda:0")
modelForTest.load_state_dict( torch.load(path, map_location=device)['model_state'] )

test_model(modelForTest, test_dl)






Bacth: 1, Loss: 0.02, Accuracy: 1.00
Bacth: 2, Loss: 0.02, Accuracy: 1.00
Bacth: 3, Loss: 0.05, Accuracy: 1.00
Bacth: 4, Loss: 0.26, Accuracy: 0.90
Bacth: 5, Loss: 0.04, Accuracy: 1.00
Bacth: 6, Loss: 0.03, Accuracy: 1.00
Bacth: 7, Loss: 0.25, Accuracy: 0.90
Bacth: 8, Loss: 0.37, Accuracy: 0.90
Bacth: 9, Loss: 0.00, Accuracy: 1.00
Test Finised:
Loss: 0.11, Accuracy: 0.97


In [None]:
import torch
import torch.nn as nn
w = torch.empty(3, 5)
print(w)
nn.init.kaiming_normal_(w, mode='fan_in', nonlinearity='relu')
print(w.data.zero_())
print(w)

In [None]:
num_train

In [None]:
for k, i in enumerate( train_dl):
   print(k, i[0].shape)

In [None]:
import torchvision
tnsr = torch.tensor(  [ [
                        [1.,2,3,4],
                        [6,7,8,9],
                        [11,12,13,14],
                        ],
                        [
                        [1.,2,3,4],
                        [6,7,8,9],
                        [11,12,13,14],
                        ]
                        
                        ])


m = tnsr.mean()
s = tnsr.std()
print(m, s)

torchvision.transforms.Normalize(s, m)(tnsr)

print(tnsr)

In [None]:
a= [1,3,4]
b= [1,2,3]
a.extend([4,4,4,4,4,44,4])
print(a)

In [None]:
a = [1,2,3,4]
b = {"a":1, "b":2, "c":3}



def fnc(**arg):
    for i in arg.items():
        print(i)

fnc(b)