In [1]:
import numpy as np

In [2]:
import pandas as pd

In [3]:
import os

In [4]:
import torch
from torch import nn

In [5]:
from torch.nn.utils.rnn import pad_sequence

# Defining Dataseat

In [6]:
class MusicDataset(torch.utils.data.Dataset):   
    def __init__(self, np_file_paths, labels, seq_len=10000):
        self.seq_len = seq_len
        self.files = np_file_paths
        self.padder = torch.zeros(96, seq_len)
        self.labels = labels
#         self.labels = []
#         for i in range(len(self.files)):
#             label = np.random.randint(0, 10, size=15)
#             label[label > 8] = 0
#             label[label >= 1] = 1
#             self.labels.append(label)
#         for i in range(len(self.files)):
#             label = np.array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
#             self.labels.append(label)
    def __len__(self):
        return len(self.files)

#     def __getitem__(self, index):
#         return (self.X[index:index+self.seq_len], self.y[index+self.seq_len-1])
    def __getitem__(self, index):
        x = np.load(self.files[index])
        x = torch.from_numpy(x).float()
        x = x[:,:self.seq_len]
        x = pad_sequence([x.T, self.padder.T], padding_value=-90, batch_first=True)[0].T
#         input,label_ids,label
        item = {"input": x, "label_ids":[index], "labels": torch.tensor(self.labels[index])}
        return item
#         return x

In [7]:
from datasets import load_dataset

In [8]:
from datasets import disable_caching
disable_caching()

In [9]:
test_files = ['00/1164200.mp3',
 '00/12100.mp3',
 '00/1295900.mp3',
 '00/985000.mp3',
 '00/1398500.mp3',
 '00/1210700.mp3',
 '00/818600.mp3',
 '00/1339600.mp3',
 '00/506100.mp3',
 '00/390000.mp3',
 '00/16000.mp3',
 '00/1052800.mp3',
 '00/699100.mp3']

In [10]:
good_files = ['01/16101.mp3',
 '01/1052801.mp3',
 '01/12101.mp3',
 '01/1121101.mp3',
 '01/986601.mp3',
 '01/1125001.mp3',
 '01/1086601.mp3',
 '01/1219101.mp3',
 '01/759301.mp3',
 '01/1018801.mp3',
 '01/824301.mp3',
 '01/1167301.mp3',
 '01/1380601.mp3',
 '01/661601.mp3',
 '01/1398501.mp3',
 '01/390001.mp3',
 '01/80501.mp3',
 '01/1125401.mp3',
 '01/399201.mp3',
 '01/1210701.mp3',
 '01/554901.mp3',
 '01/292501.mp3',
 '01/842401.mp3',
 '01/1157701.mp3',
 '01/1245901.mp3',
 '01/1062501.mp3',
 '01/1189901.mp3',
 '01/1398801.mp3',
 '01/1357701.mp3',
 '01/1164201.mp3',
 '01/1396501.mp3',
 '01/1304001.mp3',
 '01/913701.mp3',
 '01/718301.mp3',
 '01/1381001.mp3',
 '01/1264201.mp3',
 '01/361701.mp3',
 '01/1420701.mp3',
 '01/1406401.mp3',
 '01/708401.mp3',
 '01/1009701.mp3',
 '01/846501.mp3',]

In [11]:
def files_to_labels(files):
    col_names = ['TRACK_ID',
     'ARTIST_ID',
     'ALBUM_ID',
     'PATH',
     'DURATION',
     'TAGS',
     'TAGS2',
     'TAGS3',
     'TAGS4',
     'TAGS5',
     'TAGS6',
     'TAGS7',
     'TAGS8',
     'TAGS9']
    MOODPATH = "/mnt/c/Users/aag12/Downloads/autotagging_moodtheme.tsv.txt"
    df = pd.read_csv(MOODPATH, sep='\t', names=col_names)
    df = df[df["PATH"].isin(files)]
    inds = {'fast': 0,
     'sexy': 1,
     'mellow': 2,
     'heavy': 3,
     'horror': 4,
     'travel': 5,
     'holiday': 6,
     'groovy': 7,
     'funny': 8,
     'retro': 9,
     'hopeful': 10,
     'powerful': 11,
     'cool': 12,
     'nature': 13,
     'game': 14}

    final_labels = []
    for i in range(len(df)):
        curr = np.zeros(len(inds))
        moods = list(df.iloc[i])[5:]
        for theme in moods:
            if type(theme) == str and "mood" in theme:
                check = theme.split("---")[-1]
                if check in inds:
                    curr[inds[check]] = 1
            else:
                pass

        final_labels.append(curr)
    return final_labels

In [12]:
train_labels = files_to_labels(good_files)

In [13]:
test_labels = files_to_labels(test_files)

In [14]:
train_np_files = [("/mnt/c/Users/aag12/Documents/subset_moodtheme/" + g).replace(".mp3", ".npy") for g in good_files]
test_np_files = [("/mnt/c/Users/aag12/Documents/subset_moodtheme/" + g).replace(".mp3", ".npy") for g in test_files]

In [15]:
import time

In [16]:
# SEQ_LEN = 10000
SEQ_LEN = 1000

In [17]:
train_dataset = MusicDataset(train_np_files, train_labels, seq_len=SEQ_LEN)
test_dataset = MusicDataset(test_np_files, test_labels, seq_len=SEQ_LEN)

# Defining Model

In [21]:
import torch
import torchaudio
from torch import nn
import torch.nn.functional as F

In [165]:
class Res2DMaxPoolModule(nn.Module):
    def __init__(self, input_channels, output_channels, pooling=2):
        super(Res2DMaxPoolModule, self).__init__()
        self.conv_1 = nn.Conv2d(input_channels, output_channels, 3, padding=1)
        self.bn_1 = nn.BatchNorm2d(output_channels)
        self.conv_2 = nn.Conv2d(output_channels, output_channels, 3, padding=1)
        self.bn_2 = nn.BatchNorm2d(output_channels)
        self.relu = nn.ReLU()
        self.mp = nn.MaxPool2d(pooling)

        # residual
        self.diff = False
        if input_channels != output_channels:
            self.conv_3 = nn.Conv2d(input_channels, output_channels, 3, padding=1)
            self.bn_3 = nn.BatchNorm2d(output_channels)
            self.diff = True

    def forward(self, x):
        out = self.bn_2(self.conv_2(self.relu(self.bn_1(self.conv_1(x)))))
        if self.diff:
            x = self.bn_3(self.conv_3(x))
        out = x + out
        out = self.mp(self.relu(out))
        return out


class ResFrontEnd(nn.Module):
    """
    Evaluation of CNN based Music Tagging.
    Won et al., 2020
    
    Note that, different from the original work, we only stack 3 convolutional layers instead of 7.
    After the convolution layers, we flatten the time-frequency representation to be a vector.
    """

    def __init__(self, conv_ndim, attention_ndim, nfreq, nharmonics=1):
        super(ResFrontEnd, self).__init__()
        self.input_bn = nn.BatchNorm2d(nharmonics)
        self.layer1 = Res2DMaxPoolModule(nharmonics, conv_ndim, pooling=(2, 2))
        self.layer2 = Res2DMaxPoolModule(conv_ndim, conv_ndim, pooling=(2, 2))
        self.layer3 = Res2DMaxPoolModule(conv_ndim, conv_ndim, pooling=(2, 1))
#         fc_ndim = nfreq // 2 // 2 // 2 * conv_ndim
#         self.fc = nn.Linear(fc_ndim, attention_ndim)

    def forward(self, hcqt):
        # batch normalization
        out = self.input_bn(hcqt)

        # CNN
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)

        # permute and channel control
#         b, c, f, t = out.shape
#         out = out.permute(0, 3, 1, 2)  # batch, time, conv_ndim, freq
#         out = out.contiguous().view(b, t, -1)  # batch, time, fc_ndim
#         out = self.fc(out)  # batch, time, attention_ndim
        return out

In [217]:
class MusicTaggingTransformer(nn.Module):
    def __init__(
        self,
        conv_ndim=16,
        n_mels=128,
        sample_rate=22050,
        n_fft=1024,
        f_min=0,
        f_max=11025,
        attention_ndim=256,
        attention_nheads=8,
        attention_nlayers=4,
        attention_max_len=512,
        dropout=0.1,
        n_seq_cls=1,
        n_token_cls=1,
    ):
        super(MusicTaggingTransformer, self).__init__()
        # Input preprocessing
#         self.spec = self.spec = torchaudio.transforms.MelSpectrogram(sample_rate=sample_rate,
#                                                                      n_fft=n_fft,
#                                                                      f_min=f_min,
#                                                                      f_max=f_max,
#                                                                      n_mels=n_mels,
#                                                                      power=2)
#         self.amplitude_to_db = torchaudio.transforms.AmplitudeToDB()
        
        # Input embedding
        self.frontend = ResFrontEnd(conv_ndim, attention_ndim, n_mels)

        # Positional embedding
        self.pos_embedding = nn.Parameter(torch.randn(1, attention_max_len + 1, attention_ndim))
        self.cls_token = nn.Parameter(torch.randn(attention_ndim))

        # transformer
        self.transformer = nn.Transformer(
            attention_ndim,
            attention_nlayers,
            attention_nheads,
            attention_ndim // attention_nheads,
            attention_ndim * 4,
            dropout,
        )
        self.to_latent = nn.Identity()
        self.dropout = nn.Dropout(dropout)

        # projection for sequence classification
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(attention_ndim), nn.Linear(attention_ndim, n_seq_cls)
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """
        Args:
            x (torch.Tensor): (batch, time)
        Returns:
            x (torch.Tensor): (batch, n_seq_cls)
        """
        # Input preprocessing
#         x = self.spec(x)
#         x = self.amplitude_to_db(x)
        x = x.unsqueeze(0)

        # Input embedding
#         print(x.shape)
        x = self.frontend(x)

        # Positional embedding with a [CLS] token
#         cls_token = self.cls_token.repeat(x.shape[0], 1, 1)
        print(x.shape)
        print(self.pos_embedding.shape)
#         print(cls_token.shape)
#         x = torch.cat((cls_token, x), dim=1)
        x += self.pos_embedding[:, : x.size(1)]
        x = self.dropout(x)

        # transformer
        x = self.transformer(x)

        # projection for sequence classification
        x = self.to_latent(x[:, 0])
        x = self.mlp_head(x)
        x = self.sigmoid(x)
        return x

In [218]:
class Attention(nn.Module):
    def __init__(self, dim, heads=8, dim_head=64, dropout=0.0):
        super().__init__()
        inner_dim = dim_head * heads
        self.heads = heads
        self.scale = dim_head ** -0.5

        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias=False)
        self.to_out = nn.Sequential(nn.Linear(inner_dim, dim), nn.Dropout(dropout))

    def forward(self, x, mask=None):
        b, n, _, h = *x.shape, self.heads
        qkv = self.to_qkv(x).chunk(3, dim=-1)
        q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=h), qkv)

        dots = torch.einsum('bhid,bhjd->bhij', q, k) * self.scale
        mask_value = -torch.finfo(dots.dtype).max

        if mask is not None:
            mask = F.pad(mask.flatten(1), (1, 0), value=True)
            assert mask.shape[-1] == dots.shape[-1], 'mask has incorrect dimensions'
            mask = mask[:, None, :] * mask[:, :, None]
            dots.masked_fill_(~mask, mask_value)
            del mask

        attn = dots.softmax(dim=-1)

        out = torch.einsum('bhij,bhjd->bhid', attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        out = self.to_out(out)
        return out


class Transformer(nn.Module):
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout):
        super().__init__()
        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(
                nn.ModuleList(
                    [
                        Residual(
                            PreNorm(
                                dim, Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout)
                            )
                        ),
                        Residual(PreNorm(dim, FeedForward(dim, mlp_dim, dropout=dropout))),
                    ]
                )
            )

    def forward(self, x, mask=None):
        for attn, ff in self.layers:
            x = attn(x, mask=mask)
            x = ff(x)
        return x

In [219]:
# class BetterMusicTaggingTransformer(nn.Module):
#     def __init__(
#         self,
#         conv_ndim=16,
#         attention_ndim=256,
#         attention_nheads=8,
#         attention_nlayers=4,
#         attention_max_len=512,
#         dropout=0.1,
#         n_seq_cls=1,
#         n_token_cls=1,
#     ):
#         super(MusicTaggingTransformer, self).__init__()
#         # Input preprocessing
#         self.convdownsampler = nn.Conv2d(1, 1, kernel_size=5, stride=3,padding=2)

#         # Positional embedding
#         self.pos_embedding = nn.Parameter(torch.randn(1, attention_max_len + 1, attention_ndim))
#         self.cls_token = nn.Parameter(torch.randn(attention_ndim))

#         # transformer
#         self.transformer = Transformer(
#             attention_ndim,
#             attention_nlayers,
#             attention_nheads,
#             attention_ndim // attention_nheads,
#             attention_ndim * 4,
#             dropout,
#         )
# #         attention_ndim=256
# #         attention_nheads=8
# #         attention_nlayers=4
# #         attention_max_len=512
# #         dropout=0.1
# #         trans = nn.Transformer(
# #                     d_model=attention_ndim,
# #                     nhead=attention_nheads,
# #                     num_encoder_layers=attention_nlayers,
# #         #             attention_ndim // attention_nheads,
# #         #             attention_ndim * 4,
# #         #             dropout,
# #                 )
#         self.to_latent = nn.Identity()
#         self.dropout = nn.Dropout(dropout)

#         # projection for sequence classification
#         self.mlp_head = nn.Sequential(
#             nn.LayerNorm(attention_ndim), nn.Linear(attention_ndim, n_seq_cls)
#         )
#         self.sigmoid = nn.Sigmoid()

#     def forward(self, x):
#         """
#         Args:
#             x (torch.Tensor): (batch, time)
#         Returns:
#             x (torch.Tensor): (batch, n_seq_cls)
#         """
#         # Input preprocessing
#         x = self.spec(x)
#         x = self.amplitude_to_db(x)
#         x = x.unsqueeze(1)

#         # Input embedding
#         x = self.frontend(x)

#         # Positional embedding with a [CLS] token
#         cls_token = self.cls_token.repeat(x.shape[0], 1, 1)
#         x = torch.cat((cls_token, x), dim=1)
#         x += self.pos_embedding[:, : x.size(1)]
#         x = self.dropout(x)

#         # transformer
#         x = self.transformer(x)

#         # projection for sequence classification
#         x = self.to_latent(x[:, 0])
#         x = self.mlp_head(x)
#         x = self.sigmoid(x)
#         return x

In [220]:
m = MusicTaggingTransformer()

In [221]:
bo

tensor([[[-21.8261, -33.5147, -30.9810,  ..., -84.2810, -84.0638, -83.7539],
         [-24.3652, -16.3998, -22.2266,  ..., -22.0788, -26.8455, -26.1931],
         [ -1.6416,   9.6201,   2.6366,  ..., -13.9094, -10.7504,  -8.3305],
         ...,
         [ -9.7697, -12.6982, -13.0742,  ..., -15.4942, -17.7372, -15.5838],
         [-17.2892, -17.1960, -12.8223,  ..., -26.6157, -27.6857, -26.9247],
         [-14.8324, -15.5061, -14.3807,  ..., -26.3400, -28.4942, -28.8055]]],
       grad_fn=<UnsafeViewBackward0>)

In [222]:
bo.shape, train_dataset[0]['input'].unsqueeze(0).shape

(torch.Size([1, 40, 256]), torch.Size([1, 96, 1000]))

In [223]:
m(train_dataset[0]['input'].unsqueeze(0))

torch.Size([1, 16, 12, 250])
torch.Size([1, 513, 256])


RuntimeError: The size of tensor a (250) must match the size of tensor b (256) at non-singleton dimension 3

In [53]:
attention_ndim=256
attention_nheads=8
attention_nlayers=4
attention_max_len=512
dropout=0.1

In [61]:
trans = nn.Transformer(
            d_model=attention_ndim,
            nhead=attention_nheads,
            num_encoder_layers=attention_nlayers,
#             attention_ndim // attention_nheads,
#             attention_ndim * 4,
#             dropout,
        )

In [90]:
conv = nn.Conv2d(1, 1, kernel_size=5, stride=3,padding=2)

In [91]:
train_dataset[0]

{'input': tensor([[-75.1938, -62.6215, -69.7594,  ..., -11.2562,  -1.3265,   7.3618],
         [-81.3514, -68.9420, -75.0238,  ...,  -1.7271,  -1.1692,  11.6628],
         [-85.0144, -77.8768, -85.4316,  ...,   8.5874,  -4.7166,  10.7350],
         ...,
         [-90.0000, -88.7078, -86.6313,  ..., -30.7656, -32.4680, -26.8488],
         [-90.0000, -90.0000, -90.0000,  ..., -38.2618, -40.8892, -37.0017],
         [-90.0000, -90.0000, -90.0000,  ..., -54.0475, -58.1458, -55.8609]]),
 'label_ids': [0],
 'labels': tensor([0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
        dtype=torch.float64)}

In [98]:
from torchvision.transforms import Compose, Resize, ToTensor

In [104]:
16 * 20

320

In [105]:
transform = Compose([Resize((32, 320)), ToTensor()])

In [106]:
train_dataset[0]['input'].unsqueeze(0).shape

torch.Size([1, 96, 1000])

In [107]:
o = conv(train_dataset[0]['input'].unsqueeze(0))
o.shape

torch.Size([1, 32, 334])

In [108]:
from einops import rearrange

In [110]:
o.shape

torch.Size([1, 32, 334])

In [112]:
o[:,:,:320]

tensor([[[-21.8261, -33.5147, -30.9810,  ...,  -3.1035,  -3.5209,  -0.4347],
         [-44.1574, -72.5931, -73.2856,  ...,   2.4495,   1.3631,   3.0209],
         [-49.0809, -78.1875, -81.8709,  ...,   7.3148,   5.7103,   9.7803],
         ...,
         [-49.0031, -84.2731, -83.1274,  ..., -16.0451, -17.5599, -22.2684],
         [-48.6362, -82.5790, -79.1320,  ..., -21.1351, -22.0427, -24.7242],
         [-49.4997, -83.3667, -82.4678,  ..., -26.3400, -28.4942, -28.8055]]],
       grad_fn=<SliceBackward0>)

In [96]:
patch_size = 16

In [115]:
# rearrange(o[:,:,:320].unsqueeze(0), 'b c (h s1) (w s2) -> b (h w) (s1 s2 c)', s1=patch_size, s2=patch_size)
rearrange(o[:,:,:320], 'b (h s1) (w s2) -> b (h w) (s1 s2)', s1=patch_size, s2=patch_size).shape

torch.Size([1, 40, 256])

In [70]:
24 * 250

6000

In [57]:
train_dataset[0]['labels']

tensor([0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       dtype=torch.float64)

In [60]:
train_dataset[0]['labels'].unsqueeze(0).shape

torch.Size([1, 15])

In [125]:
bo = rearrange(o[:,:,:320], 'b (h s1) (w s2) -> b (h w) (s1 s2)', s1=patch_size, s2=patch_size)
bl = train_dataset[0]['labels'].unsqueeze(0).unsqueeze(0)

In [126]:
bo.shape, bl.shape

(torch.Size([1, 40, 256]), torch.Size([1, 1, 15]))

# DUMB TESTING

In [129]:
from vit_pytorch import ViT

In [137]:
v = ViT(
    image_size = 256,
    patch_size = 32,
    num_classes = 1000,
    dim = 1024,
    depth = 6,
    heads = 16,
    mlp_dim = 2048,
    dropout = 0.1,
    emb_dropout = 0.1
)

img = torch.randn(256, 256)
img = torch.tensor([[img, img, img]])

preds = v(img) # (1, 1000)

ValueError: only one element tensors can be converted to Python scalars

In [149]:
img = torch.stack([img, img, img], dim=0).unsqueeze(0)

preds = v(img) # (1, 1000)

In [151]:
preds.shape

torch.Size([1, 1000])

In [152]:
bo.shape, bl.shape

(torch.Size([1, 40, 256]), torch.Size([1, 1, 15]))

In [128]:
trans(bo, bl)

RuntimeError: the batch number of src and tgt must be equal

In [None]:
self.conv_1 = nn.Conv2d(input_channels, output_channels, 3, padding=1)
self.bn_1 = nn.BatchNorm2d(output_channels)

In [153]:
from transformers import BertTokenizer, BertForSequenceClassification

In [162]:
from transformers import AutoModelForSequenceClassification

model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", num_labels=15)

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertForSequenceClassification: ['cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at b

In [163]:
model

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(28996, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0): BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, element