In [1]:
!pip install soundata jams mir_eval --no-dependencies

Collecting soundata
  Downloading soundata-0.1.2-py3-none-any.whl (5.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.9/5.9 MB[0m [31m12.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jams
  Downloading jams-0.3.4.tar.gz (51 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.3/51.3 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting mir_eval
  Downloading mir_eval-0.7.tar.gz (90 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.7/90.7 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: jams, mir_eval
  Building wheel for jams (setup.py) ... [?25l[?25hdone
  Created wheel for jams: filename=jams-0.3.4-py3-none-any.whl size=64900 sha256=8430b442d162de11a6c7c6b141724c0d4de8a6040821e0c551e99ad9b76ae170
  Stored in directory: /root/.cache/pip/wheels/28/9a/f7/fb386b6bc

In [2]:
import soundata

dataset = soundata.initialize('urbansound8k')
dataset.download()
dataset.validate()

example_clip = dataset.choice_clip()
print(example_clip)

5.61GB [04:38, 21.6MB/s]                            
100%|██████████| 1/1 [00:00<00:00, 212.73it/s]
100%|██████████| 8732/8732 [00:44<00:00, 195.80it/s]

Clip(
  audio_path="/root/sound_datasets/urbansound8k/audio/fold1/203356-3-0-2.wav",
  clip_id="203356-3-0-2",
  audio: The clip's audio
            * np.ndarray - audio signal
            * float - sample rate,
  class_id: The clip's class id.
            * int - integer representation of the class label (0-9). See Dataset Info in the documentation for mapping,
  class_label: The clip's class label.
            * str - string class name: air_conditioner, car_horn, children_playing, dog_bark, drilling, engine_idling, gun_shot, jackhammer, siren, street_music,
  fold: The clip's fold.
            * int - fold number (1-10) to which this clip is allocated. Use these folds for cross validation,
  freesound_end_time: The clip's end time in Freesound.
            * float - end time in seconds of the clip in the original freesound recording,
  freesound_id: The clip's Freesound ID.
            * str - ID of the freesound.org recording from which this clip was taken,
  freesound_start_time: T




In [51]:
import pandas as pd
from pathlib import Path

download_path = '/root/sound_datasets/urbansound8k'

# Read metadata file
metadata_file = download_path + '/metadata/UrbanSound8K.csv'
df = pd.read_csv(metadata_file)
df.head()

# Construct file path by concatenating fold and file name
df['relative_path'] = '/fold' + df['fold'].astype(str) + '/' + df['slice_file_name'].astype(str)

df = df[['relative_path', 'classID']]
df.head()


Unnamed: 0,relative_path,classID
0,/fold5/100032-3-0-0.wav,3
1,/fold5/100263-2-0-117.wav,2
2,/fold5/100263-2-0-121.wav,2
3,/fold5/100263-2-0-126.wav,2
4,/fold5/100263-2-0-137.wav,2


In [52]:
from re import A
from numpy.core.numerictypes import maximum_sctype
from numpy.core.overrides import ARRAY_FUNCTION_ENABLED
import math, random
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio


class AudioUtil():

  @staticmethod
  def open(audio_file):
    sig, sr = torchaudio.load(audio_file)
    return (sig, sr)


  @staticmethod
  def rechannel(aud, new_channel):
    sig, sr = aud

    if (sig.shape[0] == new_channel):
      # Nothing to do
      return aud

    if (new_channel == 1):
      # Convert from stereo to mono by selecting only the first channel
      resig = sig[:1, :]
    else:
      # Convert from mon to stereo by duplicating the first channel
      resig = torch.cat([sig, sig])

    return ((resig, sr))


  @staticmethod
  def resample(aud, newsr):
    sig, sr = aud

    if (sr == newsr):
      # Nothing to do
      return aud

    num_channels = sig.shape[0]
    # Resample first channel
    resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1, :])
    if (num_channels > 1):
      # Resample the second channel and merge both channels
      retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:, :])
      resig = torch.cat([resig, retwo])

    return ((resig, newsr))

  @staticmethod
  def pad_trunc(aud, max_ms):
    sig, sr = aud
    num_rows, sig_len = sig.shape
    max_len = sr//1000 * max_ms

    if (sig_len > max_len):
      # Truncate the signale to the given length
      sig = sig[:, :max_len]

    elif (sig_len < max_len):
      # Length of padding to add at the beginning and end of the signal
      pad_begin_len = random.randint(0, max_len - sig_len)
      pad_end_len = max_len - sig_len - pad_begin_len

      # Pad with 0s
      pad_begin = torch.zeros((num_rows, pad_begin_len))
      pad_end = torch.zeros((num_rows, pad_end_len))

      sig = torch.cat((pad_begin, sig, pad_end), 1)

    return (sig, sr)


  @staticmethod
  def time_shift(aud, shift_limit):
    sig, sr = aud
    _, sig_len = sig.shape
    shift_amt = int(random.random() * shift_limit * sig_len)
    return (sig.roll(shift_amt), sr)


  @staticmethod
  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
    sig, sr = aud
    top_db = 80

    # spec the sample [channel, n_mels, time], where channels is mono, stereio etc.
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)

    # Convert to decibels
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
    return (spec)

  @staticmethod
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec


    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)


    return aug_spec

In [53]:
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

class SoundDS(Dataset):
  def __init__(self, df, data_path):
    self.df = df
    self.data_path = str(data_path)
    self.duration = 4000
    self.sr = 44100
    self.channel = 2
    self.shift_pct = 0.4


  def __len__(self):
    return len(self.df)

  def __getitem__(self, idx):
    audio_file = self.data_path + self.df.loc[idx, 'relative_path']

    class_id = self.df.loc[idx, 'classID']

    aud = AudioUtil.open(audio_file)

    reaud = AudioUtil.resample(aud, self.sr)

    rechan = AudioUtil.rechannel(reaud, self.channel)

    dur_aud = AudioUtil.pad_trunc(rechan, self.duration)

    shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)

    sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)

    aug_sgram = AudioUtil.spectro_gram(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

    return aug_sgram, class_id

In [54]:
from torch.utils.data import random_split


data_path ="/root/sound_datasets/urbansound8k/audio"

myds = SoundDS(df, data_path)


# Random split of 80:20 between training and validation

num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train

train_ds, val_ds = random_split(myds, [num_train, num_val])

print(len(train_ds), len(val_ds))

train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, shuffle=True)


6986 1746


In [55]:
import torch.nn.functional as F
from torch.nn import init
from torch import nn


class AudioClassifier(nn.Module):
  def __iniit__(self):
    super().__init__()
    conv_layers = []

    self.conv1 = nn.Conv2d(2, 8, kerner_size=(5,5), stride=(2,2), padding=(2,2))
    self.relu1 = nn.ReLU()
    self.bn1 = nn.BatchNorm2d(8)
    init.kaiming_normal_(self.conv1.weight, a=0.1)
    self.conv1.bais.data.zero_()
    conv_layers += [self.conv1, self.relu1, self.bn1]

     # Second Convolution Block
    self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    self.relu2 = nn.ReLU()
    self.bn2 = nn.BatchNorm2d(16)
    init.kaiming_normal_(self.conv2.weight, a=0.1)
    self.conv2.bias.data.zero_()
    conv_layers += [self.conv2, self.relu2, self.bn2]

    # Second Convolution Block
    self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    self.relu3 = nn.ReLU()
    self.bn3 = nn.BatchNorm2d(32)
    init.kaiming_normal_(self.conv3.weight, a=0.1)
    self.conv3.bias.data.zero_()
    conv_layers += [self.conv3, self.relu3, self.bn3]

    # Second Convolution Block
    self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    self.relu4 = nn.ReLU()
    self.bn4 = nn.BatchNorm2d(64)
    init.kaiming_normal_(self.conv4.weight, a=0.1)
    self.conv4.bias.data.zero_()
    conv_layers += [self.conv4, self.relu4, self.bn4]

    # Linear Classi
    self.ap = nn.AdaptiveAvgPool2d(output_size=1)
    self.lin = nn.Linear(in_features=64, out_features=10)

    # Wrap the Convolutional Blocks

    self.conv = nn.Sequential(*conv_layers)

  def forward(self, x):
    # Run the convolutional blocks
    x = self.conv(x)

    # Adaptive pool and flatten for input to linear layer
    x = self.ap(x)
    x = x.view(x.shape[0], -1)

    # Linear layer
    x = self.lin(x)

    # Final output
    return x

myModel = AudioClassifier()

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

myModel = myModel.to(device)

# Chck that if is on cuda
#next(myModel.parameters()).device



cuda:0


In [56]:
def training(model, train_dl, num_epochs):
  # Loss function, optimizer and schedular
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer,
                                                  max_lr=0.001,
                                                  steps_per_epoch=int(len(train_dl)),
                                                  epochs=num_epochs,
                                                  anneal_strategy='linear')

  for epoch in range(num_epochs):
    running_loss = 0.0
    correction_prediction = 0
    total_prediction = 0


    for i, data in enumerate(train_dl):
      inputs, labels = data[0].to(device), data[1].to(device)

      inputs_m, inputs_s = inputs.mean(), inputs.std()
      inputs = (inputs - inputs_m) / inputs_s

      optimizer.zero_grad()

      # forwared + backward + optimize

      outputs = model(inputs)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      scheduler.step()

      running_loss = loss.item()

      _, prediction = torch.max(outputs,1)
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]

      if i % 10 == 0:
        print(f'{epoch+1}, {i+1}, loss: {running_loss / 10}')

    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    print(f'epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

  print('Finished Training')

In [57]:
def inference(model, val_dl):
  correct_prediction = 0
  total_prediction = 0

  with torch.no_grad():
    for data in val_dl:

      inputs, labels = data[0].to(device), data[1].to(device)

      # Normalize the inputs
      inputs_m, inputs_s = inputs.mean(), inputs.std()
      inputs = (inputs - inputs_m) / inputs_s


      outputs = model(inputs)

      _, prediction = torch.max(outputs, 1)
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]


  acc = correct_prediction/total_prediction
  print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')

inference(myModel, val_dl)

TypeError: ignored