In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from torchaudio import transforms
from IPython.display import Audio
import random
from torchvision import models
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from sklearn.model_selection import train_test_split
from datetime import datetime

In [None]:
# Directory that holds the UrbanSound8K metadata CSV.
data_path = Path(".../urbanSound8k/metadata/")
meta_file = data_path.joinpath('UrbanSound8K.csv')

# Load the per-clip metadata and preview the first rows.
df_meta = pd.read_csv(meta_file)
df_meta.head()

 

In [None]:
# Path of each clip relative to the audio root: '/fold<N>/<slice_file_name>'.
df_meta['relative_path'] = '/fold' + df_meta['fold'].astype(str) + '/' + df_meta['slice_file_name'].astype(str)

# Keep only the two columns the dataset reads; the explicit copy makes `df`
# independent of `df_meta` so later edits to one never touch the other.
df = df_meta[['relative_path', 'classID']].copy()
df.head()

In [None]:
class AudioUtil():
    """Stateless helpers for loading, conforming and augmenting audio clips.

    Audio is passed around as a ``(signal, sample_rate)`` tuple where
    ``signal`` is a tensor indexed as ``[channel, sample]``.
    """

    @staticmethod
    def open(audio_file):
        """Load ``audio_file`` with torchaudio and return ``(signal, sample_rate)``."""
        sig, sr = torchaudio.load(audio_file)
        return sig, sr

    @staticmethod
    def rechannel(audio, new_channel):
        """Convert the clip to ``new_channel`` channels.

        Returns the input unchanged when it already has ``new_channel``
        channels. For ``new_channel == 1`` only the first channel is kept;
        otherwise the signal is stacked onto itself (mono becomes stereo).
        """
        sig, sr = audio
        if sig.shape[0] == new_channel:
            return audio
        if new_channel == 1:
            # Keep the first channel only (slice preserves the channel axis).
            resig = sig[:1, :]
        else:
            # Duplicate the existing channels along the channel axis.
            resig = torch.cat([sig, sig])
        return (resig, sr)

    @staticmethod
    def resample(aud, newsr):
        """Resample the clip to ``newsr`` Hz; a no-op when the rate already matches."""
        sig, sr = aud
        if sr == newsr:
            return aud
        # Resample operates on the last (time) axis, so all channels are
        # converted in a single call.
        resig = torchaudio.transforms.Resample(sr, newsr)(sig)
        return (resig, newsr)

    @staticmethod
    def pad_trunc(aud, max_ms):
        """Force the clip to exactly ``max_ms`` milliseconds.

        Longer clips are cut at the end; shorter clips are zero-padded with
        the padding split randomly between the start and the end.
        """
        sig, sr = aud
        num_rows, sig_len = sig.shape
        # Multiply before dividing: ``sr // 1000 * max_ms`` drops the
        # sub-kHz part of the sample rate (44100 Hz would be treated as 44000).
        max_len = int(sr * max_ms // 1000)

        if sig_len > max_len:
            sig = sig[:, :max_len]
        elif sig_len < max_len:
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))
            sig = torch.cat((pad_begin, sig, pad_end), 1)
        return (sig, sr)

    @staticmethod
    def time_shift(aud, shift_limit):
        """Circularly shift the clip in time by a random fraction of its length.

        The shift is drawn uniformly from ``[0, shift_limit)`` of the signal
        length and wraps around within each channel.
        """
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        # Roll along the time axis only; without ``dims`` torch rolls the
        # flattened tensor and samples spill from one channel into the next.
        return (sig.roll(shift_amt, dims=1), sr)

    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        """Return the mel spectrogram of the clip converted to decibels."""
        sig, sr = aud
        top_db = 80  # passed to AmplitudeToDB as its top_db argument
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return spec

    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        """Apply random frequency and time masks to a spectrogram.

        Each mask covers at most ``max_mask_pct`` of the corresponding axis
        and is filled with the spectrogram's mean value.
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec
        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)
        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)
        return aug_spec


In [None]:
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

class SoundDS(Dataset):
    """Dataset yielding ``(augmented mel spectrogram, class id)`` pairs.

    Parameters
    ----------
    df : DataFrame with ``relative_path`` and ``classID`` columns; rows are
        looked up with ``.loc[idx]``.
    data_path : stored as ``self.data_path`` (string); not used to locate
        audio files.
    audio_root : directory prefix prepended to each ``relative_path``.
        Defaults to the location the notebook originally hard-coded.
    """

    def __init__(self, df, data_path, audio_root=r".../urbanSound8k/audio"):
        self.df = df
        self.data_path = str(data_path)
        self.audio_root = str(audio_root)
        self.duration = 4000    # clip length in milliseconds (fed to pad_trunc)
        self.sr = 44100         # target sample rate in Hz
        self.channel = 2        # target channel count
        self.shift_pct = 0.4    # upper bound of the random time shift

    def __len__(self):
        """Number of rows in the backing frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load clip ``idx``, conform it, augment it and return it with its label."""
        # Read from the frame this dataset was built with, not from a
        # notebook-level global that may have been rebound since.
        audio_file = self.audio_root + self.df.loc[idx, "relative_path"]
        class_id = self.df.loc[idx, 'classID']

        # Conform every clip to the same rate, channel count and duration so
        # that items can be batched.
        aud = AudioUtil.open(audio_file)
        reaud = AudioUtil.resample(aud, self.sr)
        rechan = AudioUtil.rechannel(reaud, self.channel)
        dur_aud = AudioUtil.pad_trunc(rechan, self.duration)

        # Random augmentation on the waveform, then on the spectrogram.
        shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
        sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)
        aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)
        return aug_sgram, class_id

In [None]:
myds = SoundDS(df, data_path)

# 70 / 15 / 15 split. The last size is the remainder so the three sizes always
# sum to len(myds); rounding each share independently can be off by one, which
# makes random_split raise.
num_items = len(myds)
num_train = round(num_items * 0.7)
num_val = round(num_items * 0.15)
num_test = num_items - num_train - num_val

train_ds, test_ds, val_ds = random_split(myds, [num_train, num_test, num_val])

# Only the training loader shuffles; evaluation order is kept fixed.
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=64, shuffle=True)
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=64, shuffle=False)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=64, shuffle=False)



In [None]:
def specturume(y, sr ):
    """Plot the mel spectrogram (in dB) of waveform ``y`` and return it.

    Parameters
    ----------
    y : waveform passed straight to ``librosa.feature.melspectrogram``.
    sr : sample rate of ``y`` in Hz.

    Returns
    -------
    The mel spectrogram converted with ``librosa.power_to_db`` relative to
    its maximum.
    """
    # Imported here because no notebook cell shown imports librosa, and
    # matplotlib is only imported in a later cell.
    import librosa
    import librosa.display
    import matplotlib.pyplot as plt

    n_fft = 2048
    hop_length = 1024

    mel_spect = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length)
    mel_spect = librosa.power_to_db(mel_spect, ref=np.max)

    # Give specshow the same sr / hop_length used above; otherwise it falls
    # back to its own defaults and the time axis is mislabelled.
    librosa.display.specshow(mel_spect, sr=sr, hop_length=hop_length,
                             y_axis='mel', fmax=8000, x_axis='time')
    plt.title('Mel Spektrogramı')
    plt.colorbar(format='%+2.0f dB')
    return mel_spect

In [None]:
pip install matplotlib

In [None]:
import matplotlib.pyplot as plt
plt.style.use("dark_background")
def plot_spectrogram(batch_data, title=None, ylabel='freq_bin', aspect='auto', xmax=None):
    """Show the first channel of up to 16 spectrograms on a 4x4 grid.

    Parameters
    ----------
    batch_data : iterable of items; ``item[0]`` is drawn with ``imshow``.
        Items beyond the 16th are ignored.
    title : optional figure title.
    ylabel : y-axis label applied to every panel.
    aspect : forwarded to ``imshow``.
    xmax : when truthy, the x-axis of every panel is limited to ``(0, xmax)``.
    """
    fig, axs = plt.subplots(4, 4, figsize=(16, 10))
    if title:
        fig.suptitle(title)

    # axs.flat walks the grid row by row; zip stops after 16 panels so a
    # larger batch cannot index past the grid.
    for ax, data in zip(axs.flat, batch_data):
        spec = data[0]
        im = ax.imshow(spec, origin='lower', aspect=aspect)
        ax.set_ylabel(ylabel)
        if xmax:
            ax.set_xlim((0, xmax))
        # Every panel gets a colorbar, whether or not xmax was given.
        fig.colorbar(im, ax=ax)

    # Render once, after all panels have been drawn.
    plt.show(block=False)

In [None]:
# Number of distinct class ids present in the metadata frame.
NUM_CLASSES = df["classID"].nunique()

# First CUDA device when one is available, CPU otherwise.
DEVICE = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# NOTE(review): the three settings below are not read by any other cell shown
# in this notebook - confirm whether they are still needed.
model_config = dict(base_model_name="resnet50", pretrained=False)

melspectrogram_parameters = dict(n_mels=128, fmin=20, fmax=16000)

weights_path = None

In [None]:
import torch.nn.functional as F
from torch.nn import init


class AudioClassifier (nn.Module):
    """Six-stage CNN over multi-channel spectrograms.

    Every stage is Conv2d (stride 2) -> ReLU -> BatchNorm2d. The result is
    average-pooled to one value per channel and passed through four stacked
    linear layers that output one logit per class.

    Parameters
    ----------
    num_classes : number of output logits (default 10).
    in_channels : number of input channels (default 2).

    Notes
    -----
    Submodule names (``conv1`` ... ``bn6``, ``conv``, ``ap``, ``lin``) and
    their registration order are kept stable so ``state_dict`` keys do not
    change. The linear head has no activations between its layers; inserting
    any would renumber ``lin.*`` keys and break saved checkpoints.
    """

    # (out_channels, kernel_size, padding) for each stride-2 conv stage.
    _CONV_SPECS = (
        (8, 5, 2),
        (16, 3, 1),
        (32, 3, 1),
        (32, 3, 1),
        (64, 3, 1),
        (128, 3, 1),
    )

    def __init__(self, num_classes=10, in_channels=2):
        super().__init__()
        conv_layers = []

        prev_channels = in_channels
        for stage, (out_channels, kernel, pad) in enumerate(self._CONV_SPECS, start=1):
            conv = nn.Conv2d(prev_channels, out_channels, kernel_size=(kernel, kernel),
                             stride=(2, 2), padding=(pad, pad))
            relu = nn.ReLU()
            bn = nn.BatchNorm2d(out_channels)
            # Kaiming-initialise the conv that was just created. Building the
            # stages in a loop rules out initialising the wrong layer.
            init.kaiming_normal_(conv.weight, a=0.1)
            conv.bias.data.zero_()

            # Register under the historical attribute names, in the same order.
            setattr(self, f"conv{stage}", conv)
            setattr(self, f"relu{stage}", relu)
            setattr(self, f"bn{stage}", bn)
            conv_layers += [conv, relu, bn]
            prev_channels = out_channels

        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Sequential(
            nn.Linear(in_features=prev_channels, out_features=256),
            nn.Linear(in_features=256, out_features=512),
            nn.Linear(in_features=512, out_features=1024),
            nn.Linear(in_features=1024, out_features=num_classes))

        self.conv = nn.Sequential(*conv_layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for input ``x``."""
        x = self.conv(x)

        # Pool each feature map to a single value, then flatten to (batch, channels).
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        x = self.lin(x)
        return x

In [None]:
# Build the classifier, then move its parameters to the selected device.
myModel = AudioClassifier()
myModel = myModel.to(DEVICE)

In [None]:
def train(model, train_dl, optimizer=None, scheduler=None):
    """Run one pass over ``train_dl`` and update ``model`` in place.

    Parameters
    ----------
    model : network to train; batches are moved to the device of its
        parameters.
    train_dl : sized iterable of ``(inputs, labels)`` batches.
    optimizer : optional optimizer to reuse across calls. When omitted, a
        fresh Adam optimizer (lr=0.001) is created for this call.
    scheduler : optional LR scheduler, stepped once per batch. When both
        ``optimizer`` and ``scheduler`` are omitted, a OneCycleLR schedule
        sized for 10 epochs is created for this call.

    Returns
    -------
    ``(avg_loss, accuracy, model)`` - mean batch loss, fraction of correctly
    classified samples, and the same model object.

    Raises
    ------
    ValueError : if ``scheduler`` is given without the ``optimizer`` it drives.

    Notes
    -----
    With the defaults, every call starts a new optimizer and restarts the
    one-cycle schedule from its beginning. To keep optimizer state and let the
    schedule advance across epochs, build both once and pass them in.
    """
    criterion = nn.CrossEntropyLoss()

    if optimizer is None:
        if scheduler is not None:
            raise ValueError("scheduler was given without the optimizer it drives")
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                        steps_per_epoch=int(len(train_dl)),
                                                        epochs=10,
                                                        anneal_strategy='linear')

    # Follow the model's own device instead of relying on a notebook global.
    device = next(model.parameters()).device

    model.train()
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0

    for data in tqdm(train_dl):
        inputs, labels = data[0].to(device), data[1].to(device)

        # Standardise each batch with its own mean and standard deviation.
        inputs_m, inputs_s = inputs.mean(), inputs.std()
        inputs = (inputs - inputs_m) / inputs_s

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        if scheduler is not None:
            scheduler.step()

        running_loss += loss.item()

        # Predicted class = index of the largest logit.
        _, prediction = torch.max(outputs, 1)
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]

    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction / total_prediction
    return avg_loss, acc, model

In [None]:
from datetime import datetime
loss_fn = torch.nn.CrossEntropyLoss()

# One TensorBoard run directory and checkpoint prefix per notebook run.
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp))
epoch_number = 0

EPOCHS = 10
model = myModel
best_vloss = 1_000_000.

for epoch in range(EPOCHS):
    print('EPOCH {}:'.format(epoch_number + 1))

    model.train()
    avg_loss, acc_train, model = train(myModel, train_dl)

    # Validation: switch BatchNorm to its running statistics and start the
    # counters from zero so each epoch reports its own accuracy.
    model.eval()
    running_vloss = 0.0
    vcorrect_prediction = 0
    vtotal_prediction = 0
    with torch.no_grad():
        for vdata in val_dl:
            vinputs, vlabels = vdata[0].to(DEVICE), vdata[1].to(DEVICE)

            # Standardise exactly as the training step does, so the model
            # sees validation inputs on the same scale.
            vinputs_m, vinputs_s = vinputs.mean(), vinputs.std()
            vinputs = (vinputs - vinputs_m) / vinputs_s

            voutputs = model(vinputs)
            vloss = loss_fn(voutputs, vlabels)
            # .item() keeps the running sum a plain float.
            running_vloss += vloss.item()

            _, prediction = torch.max(voutputs, 1)
            vcorrect_prediction += (prediction == vlabels).sum().item()
            vtotal_prediction += prediction.shape[0]

    acc_val = vcorrect_prediction / vtotal_prediction
    avg_vloss = running_vloss / len(val_dl)
    print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

    writer.add_scalars('Training vs. Validation Loss',
                    { 'Training Loss' : avg_loss, 'Validation Loss' : avg_vloss },
                    epoch_number + 1)
    writer.flush()
    print(f"Validation accuricy :{acc_val}")
    print(f"Training accuricy :{acc_train}\n")

    # Checkpoint whenever the validation loss improves.
    if avg_vloss < best_vloss:
        best_vloss = avg_vloss
        model_path = 'model_{}_{}'.format(timestamp, epoch_number)
        torch.save(model.state_dict(), model_path)

    epoch_number += 1

writer.close()

In [None]:
import torchmetrics
from torchmetrics import F1Score
from torchmetrics import ConfusionMatrix

def inference (model, val_dl):
    """Evaluate ``model`` on ``val_dl`` and print accuracy, F1 and confusion matrix.

    All metrics are computed once over the whole loader, not per batch. The
    model is put in eval mode for the pass and its previous train/eval mode
    is restored afterwards.

    Parameters
    ----------
    model : classifier returning one row of class scores per sample.
    val_dl : iterable of ``(inputs, labels)`` batches.
    """
    correct_prediction = 0
    total_prediction = 0
    all_outputs = []
    all_labels = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for data in tqdm(val_dl):
                inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE)

                # Same per-batch standardisation as in training.
                inputs_m, inputs_s = inputs.mean(), inputs.std()
                inputs = (inputs - inputs_m) / inputs_s

                outputs = model(inputs)
                _, prediction = torch.max(outputs, 1)
                correct_prediction += (prediction == labels).sum().item()
                total_prediction += prediction.shape[0]

                # Collect on CPU so the metrics below never mix devices.
                all_outputs.append(outputs.cpu())
                all_labels.append(labels.cpu())
    finally:
        model.train(was_training)

    acc = correct_prediction/total_prediction

    scores = torch.cat(all_outputs)
    targets = torch.cat(all_labels)
    f1_value = F1Score(num_classes=10)(scores, targets)
    conf_matrix = ConfusionMatrix(num_classes=10)(scores, targets)

    print(f'Accuracy: {acc}, Total items: {total_prediction} \nF1 Score : {f1_value}')
    print(f"Confusition Matrix : {conf_matrix}")

inference(myModel, test_dl)