<a href="https://colab.research.google.com/github/jdasam/ant5015/blob/2024F/notebooks/5_auto_tagging.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Autotagging

In [None]:
from pathlib import Path

import IPython.display as ipd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torchaudio

- Download the Small MTAT dataset if you do not already have it locally.


In [None]:
!pip install --upgrade gdown
!gdown 15e9E3oZdudErkPKwb0rCAiZXkPxdZkV6
# !wget https://sogang365-my.sharepoint.com/:u:/g/personal/dasaem_jeong_o365_sogang_ac_kr/EdkHWV-qvxBEi-d0Ua73VG4BEp7EZO7HMvrXsWqeJvMJzg?e=GbYylV&download=1

!unzip -q mtat_8000.zip

Downloading...
From (original): https://drive.google.com/uc?id=15e9E3oZdudErkPKwb0rCAiZXkPxdZkV6
From (redirected): https://drive.google.com/uc?id=15e9E3oZdudErkPKwb0rCAiZXkPxdZkV6&confirm=t&uuid=d0296d66-96e8-4c01-9b05-5398300bb416
To: /content/mtat_8000.zip
100% 921M/921M [00:26<00:00, 34.2MB/s]


In [None]:
class MTATDataset:
  """Map-style dataset that loads audio clips and multi-hot tag labels from disk.

  `dir_path` must contain a `meta.csv` file. Its first column is used as the
  index and it must have an `mp3_path` column holding paths relative to
  `dir_path`. Of the remaining columns, every column except the first and the
  last is treated as one tag.

  The split is selected by the first character of `mp3_path`.

  Args:
    dir_path: directory that contains `meta.csv` and the audio files.
    split: one of 'train', 'valid' or 'test'.
    num_max_data: keep at most this many rows of the selected split.
    sr: sample rate every audio file is expected to have.

  Raises:
    NotImplementedError: if `split` is not one of the supported names.
  """
  def __init__(self, dir_path, split='train', num_max_data=6000, sr=16000):
    self.dir = Path(dir_path)
    self.labels = pd.read_csv(self.dir / "meta.csv", index_col=[0])
    self.sr = sr

    if split == "train":
      sub_dir_ids = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c']
    elif split == 'valid':
      sub_dir_ids = ['d']
    elif split == 'test':
      sub_dir_ids = ['e', 'f', 'g']
    else:
      raise NotImplementedError(
          f"Unknown split {split!r}; expected 'train', 'valid' or 'test'")

    # Vectorised split filter on the first character of each path.
    # An empty path yields NaN here and is simply excluded instead of raising.
    first_char = self.labels['mp3_path'].astype(str).str[0]
    is_in_set = first_char.isin(sub_dir_ids).to_numpy()
    self.labels = self.labels[is_in_set]
    self.labels = self.labels[:num_max_data]
    self.vocab = self.labels.columns.values[1:-1]
    self.label_tensor = self.convert_label_to_tensor()

  def convert_label_to_tensor(self):
    """Return the tag columns of `self.labels` as a float tensor of 0./1. values."""
    return torch.tensor(self.labels.values[:, 1:-1].astype('bool'), dtype=torch.float)

  def __len__(self):
    """Number of clips kept in this split."""
    return len(self.labels)

  def __getitem__(self, idx):
    """Load clip `idx` from disk and return `(audio averaged over dim 0, label vector)`."""
    info = self.labels.iloc[idx]
    mp3_path = self.dir / info['mp3_path']

    audio, sr = torchaudio.load(mp3_path)
    assert sr == self.sr, f"{mp3_path}: expected sample rate {self.sr}, got {sr}"
    label = self.label_tensor[idx]
    # mean(0) averages over the first dimension of the loaded waveform
    # (the channel dimension for torchaudio.load), giving a 1-D signal.
    return audio.mean(0), label

# Build the dataset with its default arguments (split='train') and look at the
# first item, which is an (audio, label) pair.
data_dir = Path('MTAT_SMALL')
dataset= MTATDataset(data_dir)
dataset[0]

(tensor([ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ..., -8.8974e-08,
         -5.8156e-08, -5.6856e-08]),
 tensor([0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.]))

In [None]:
# Preview what the waveform looks like after casting to float16, the dtype
# OnMemoryDataset uses to store audio; very small values lose precision.
dataset[0][0].to(torch.float16)

tensor([ 0.0000e+00,  0.0000e+00,  0.0000e+00,  ..., -5.9605e-08,
        -5.9605e-08, -5.9605e-08], dtype=torch.float16)

In [None]:
# Inspect one row of the metadata by hand: listen to the clip and list its tags.
csv_fn = 'MTAT_SMALL/meta.csv'
meta = pd.read_csv(csv_fn, index_col=[0])
vocab = meta.columns.values[1:-1]  # tag names: all columns except the first and last

idx = 10

mp3_path = meta.iloc[idx]['mp3_path']
label = meta.iloc[idx].values[1:-1]  # tag values of this row, aligned with `vocab`
label_ids = np.where(label)[0]       # positions of the tags that are set

# Two separate statements: display() and print() both return None, so joining
# them in a tuple expression would render a stray `(None, None)` as cell output.
ipd.display(ipd.Audio('MTAT_SMALL/'+mp3_path))
print(vocab[label_ids])

['classical' 'quiet' 'ambient' 'string' 'harp' 'slow']


(None, None)

In [None]:
# Check the audio channel layout: torchaudio.load returns (waveform, sample_rate)
# and the first dimension of the waveform is the number of channels.
y, sr = torchaudio.load('MTAT_SMALL/'+mp3_path)
y.shape

torch.Size([1, 465984])

In [None]:
import numpy as np
# Turn a label vector into tag names: np.where gives the positions of the
# non-zero entries, which then index into the tag vocabulary.
# NOTE(review): this uses whatever `label` currently holds in the kernel.
label_ids = np.where(label)[0]
print(label, label_ids)
vocab = meta.columns.values[1:-1]
vocab[label_ids]

array(['guitar', 'male', 'vocal'], dtype=object)

In [None]:
from tqdm.auto import tqdm

# Fetch the first 100 items from the disk-backed dataset; the progress bar
# shows how long decoding each file on every access takes.
for i in tqdm(range(100)):
  audio, label = dataset[i]


  0%|          | 0/100 [00:00<?, ?it/s]

## Make Dataset

In [None]:
class OnMemoryDataset(MTATDataset):
  """MTATDataset variant that decodes every clip once and keeps it in memory.

  Waveforms are stored as float16 and converted back to float32 on access.
  """
  def __init__(self, dir_path, split='train', num_max_data=6000, sr=16000):
    super().__init__(dir_path, split, num_max_data, sr)
    self.loaded_audio = self.load_audio()

  def load_audio(self):
    """Decode all clips of this split and return them as a list of float16 tensors."""
    cached = []
    for row in tqdm(range(len(self))):
      file_path = self.dir / self.labels.iloc[row]['mp3_path']
      waveform, file_sr = torchaudio.load(file_path)
      assert file_sr == self.sr
      cached.append(waveform.to(torch.float16))
    return cached

  def __getitem__(self, idx):
    """Return `(audio averaged over dim 0 as float32, label vector)` from memory."""
    waveform = self.loaded_audio[idx].to(torch.float32)
    return waveform.mean(0), self.label_tensor[idx]


# Decode the whole default (train) split into memory once, then show the
# label matrix (one row per clip, one column per tag).
memory_dataset = OnMemoryDataset('MTAT_SMALL/')
memory_dataset.label_tensor

  0%|          | 0/5000 [00:00<?, ?it/s]

tensor([[0., 0., 0.,  ..., 0., 1., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 1., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

In [None]:
# Same 100-item access loop as for the disk-backed dataset, now served from
# memory; the cell displays the last (audio, label) pair that was fetched.
for i in tqdm(range(100)):
  audio, label = memory_dataset[i]
audio, label

  0%|          | 0/100 [00:00<?, ?it/s]

(tensor([0.0000e+00, 0.0000e+00, 0.0000e+00,  ..., 0.0000e+00, 1.1921e-07,
         2.3842e-07]),
 tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.]))

In [None]:
# Wrap the in-memory dataset in a DataLoader that yields shuffled batches of 32.
train_loader = torch.utils.data.DataLoader(memory_dataset, batch_size=32, shuffle=True)

# Equivalent way to peek at a single batch with an explicit loop:
# for batch in train_loader:
#   print(batch)
#   break
batch = next(iter(train_loader))
audios, labels = batch  # batched audio and batched label vectors
print(audios.shape, labels.shape)

torch.Size([32, 465984]) torch.Size([32, 50])


## Make Model

In [None]:
import torch.nn as nn

# Toy input to show how Conv2d changes tensor shapes.
dummy = torch.arange(32).reshape(1, 1, 4, 8).to(torch.float32) # N x C x H x W
dummy
# RNN: Time x N x C
# Transformer: N x Time x C

# A 3x3 kernel with stride 1 and no padding shrinks H and W by 2 each
# (4 -> 2, 8 -> 6); out_channels sets the new channel count.
cnn_layer = nn.Conv2d(in_channels=1, out_channels=3, kernel_size=3, stride=1, padding=0, dilation=1)
out = cnn_layer(dummy)
print(out.shape)

torch.Size([1, 3, 2, 6])


In [None]:
import torch.nn as nn

class MelDbConverter(torch.nn.Module):
  """Convert a waveform into a mel spectrogram in decibels, divided by 100."""
  def __init__(self):
    super().__init__()
    mel_kwargs = dict(
        sample_rate=16000,
        n_fft=1024,
        hop_length=512,
        f_min=20,
        f_max=4000,
        n_mels=80,
    )
    self.mel_conv = torchaudio.transforms.MelSpectrogram(**mel_kwargs)
    self.db_conv = torchaudio.transforms.AmplitudeToDB()

  def forward(self, x):
    """Apply the mel transform, then the dB conversion, then divide by 100."""
    mel = self.mel_conv(x)
    mel_db = self.db_conv(mel)
    # Fixed division by 100 -- presumably to keep the network input in a
    # small numeric range.
    return mel_db / 100



class CNNModel(nn.Module):
  """2-D CNN tagger: waveform -> mel spectrogram -> conv stack -> one logit per tag.

  Args:
    dim: accepted but not used anywhere in this class.
    num_out: number of output logits.
  """
  def __init__(self, dim=8, num_out=10):
    super().__init__() # nn.Module init
    self.mel_conv = MelDbConverter()

    layers = [
        # block 1
        nn.Conv2d(1, 16, 3),
        nn.ReLU(),
        nn.MaxPool2d(3),
        # block 2
        nn.Conv2d(16, 32, 3),
        nn.MaxPool2d(3),
        nn.ReLU(),
        # block 3: pool along the last (time) axis only
        nn.Conv2d(32, 64, 3),
        nn.MaxPool2d((1,3)),
        nn.ReLU(),
        # block 4: pool along the last (time) axis only
        nn.Conv2d(64, 64, 3),
        nn.MaxPool2d((1,3)),
        nn.ReLU(),
    ]
    self.conv = nn.Sequential(*layers)
    # 256 = 64 channels x 4 frequency bins, which is what the stack above
    # leaves of the 80 mel bins produced by MelDbConverter.
    self.proj = nn.Linear(256, num_out)

  def forward(self, x):
    """Map a batch of waveforms (N x Num_Sample) to logits (N x num_out)."""
    # N x MelBin x MelTimeFrame -> N x 1(Channel) x MelBin x MelTimeFrame
    spec = self.mel_conv(x).unsqueeze(1)
    # N x C x Freq x Time
    feat = self.conv(spec)
    # Global max pooling over the time axis, then merge C and Freq.
    pooled = feat.max(dim=-1).values
    flat = pooled.flatten(1)
    return self.proj(flat)

# Smoke test: one output per tag in the vocabulary, run on the batch drawn earlier.
model = CNNModel(num_out=len(dataset.vocab))

# The freshly built model lives on the CPU, so make sure the batch does too.
out = model(audios.cpu())
# The model returns logits; sigmoid turns them into independent per-tag probabilities.
prob = out.sigmoid()
prob

tensor([[0.4939, 0.4958, 0.4881,  ..., 0.4965, 0.4971, 0.4964],
        [0.4926, 0.4959, 0.4872,  ..., 0.4968, 0.4969, 0.4963],
        [0.4944, 0.4959, 0.4876,  ..., 0.4959, 0.4975, 0.4965],
        ...,
        [0.4931, 0.4959, 0.4869,  ..., 0.4969, 0.4971, 0.4966],
        [0.4924, 0.4963, 0.4869,  ..., 0.4968, 0.4973, 0.4960],
        [0.4930, 0.4960, 0.4875,  ..., 0.4963, 0.4970, 0.4967]],
       grad_fn=<SigmoidBackward0>)

In [None]:
# Draw another shuffled batch. Note that `audios` and `labels` are not
# re-unpacked here and still refer to the previously drawn batch.
batch = next(iter(train_loader))

## Train the model

In [None]:
def get_bce_loss(pred, target):
  """Mean binary cross-entropy between probabilities `pred` and 0/1 `target`.

  A small constant is added inside each logarithm so that probabilities of
  exactly 0 or 1 do not produce log(0).
  """
  eps = 1e-8
  pos_term = target * torch.log(pred + eps)
  neg_term = (1 - target) * torch.log(1 - pred + eps)
  return -(pos_term + neg_term).mean()
# Sanity check on the untrained model: with every probability close to 0.5
# the loss should be close to ln(2) ~= 0.693.
get_bce_loss(prob, labels)

tensor(0.6910, grad_fn=<NegBackward0>)

In [None]:
train_loader = torch.utils.data.DataLoader(memory_dataset, batch_size=32, shuffle=True)
# Size the output layer from the dataset that is actually used for training.
model = CNNModel(num_out=len(memory_dataset.vocab))
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

num_epoch = 5
# Fall back to the CPU when no GPU is available instead of crashing in .to().
dev = 'cuda' if torch.cuda.is_available() else 'cpu'
loss_record = []  # loss of every training step, in order
model = model.to(dev)
for epoch in tqdm(range(num_epoch)):
  for batch in train_loader:
    audios, labels = batch
    audios = audios.to(dev)
    labels = labels.to(dev)
    logit = model(audios)
    prob = logit.sigmoid()
    loss = get_bce_loss(prob, labels)
    loss.backward()
    optimizer.step()
    # Clear the gradients so they do not accumulate into the next step.
    optimizer.zero_grad()
    loss_record.append(loss.item())
  # Loss of the last batch of this epoch (not the epoch average).
  print(loss.item())


  0%|          | 0/5 [00:00<?, ?it/s]

0.24557463824748993
0.2205829918384552


KeyboardInterrupt: 

In [None]:
# Predicted per-tag probabilities for the first clip of the last batch that
# went through the training loop.
prob[0]

tensor([0.0757, 0.0451, 0.0418, 0.0190, 0.0128, 0.0116, 0.2372, 0.0798, 0.0161,
        0.2150, 0.0411, 0.0281, 0.0080, 0.0760, 0.0232, 0.0634, 0.1056, 0.0115,
        0.0280, 0.1184, 0.0582, 0.0784, 0.0207, 0.0973, 0.0902, 0.0044, 0.0233,
        0.0275, 0.0119, 0.0057, 0.0256, 0.0315, 0.0255, 0.0821, 0.0090, 0.0186,
        0.1782, 0.0654, 0.1078, 0.1058, 0.0731, 0.0669, 0.0229, 0.0360, 0.1110,
        0.0458, 0.0737, 0.0435, 0.0520, 0.0151], device='cuda:0',
       grad_fn=<SelectBackward0>)