In [None]:
# #Installing Relevant Packages
# !pip install gdown 
# !pip install mirdata --user
# !pip install madmom --user

# !pip install torchmetrics --user
# !pip install spleeter --user

In [1]:
import os
import mirdata
import torch
import torch.nn as nn
# from matplotlib import pyplot as plt
import random
import numpy as np
import librosa
import torchaudio
import madmom
from tqdm import tqdm as tqdm
import torchmetrics
import warnings
from torch.nn import TransformerEncoderLayer as torchTransformerEncoderLayer
import pickle

In [2]:
# ignore certain warnings
warnings.filterwarnings('ignore')

# set default figure size
# plt.rcParams['figure.figsize'] = (20, 6)

In [3]:
#Ensure deterministic behaviour
torch.backends.cudnn.deterministic = True
random.seed(hash("Setting random hash") % 2**32 - 1)
torch.manual_seed(hash("By removing stochasticity") % 2**32 - 1)
torch.cuda.manual_seed_all(hash("So runs are repeatable") % 2**32 - 1)
np.random.seed(hash("Improves reducibility") % 2**32 - 1)

In [4]:
working_dir = '/raid/home/niranjan20090/DL/'
device = torch.device("cuda:7" if torch.cuda.is_available() else "cpu")

print('Device:',device)

Device: cuda:7


In [5]:
sample_rate = 16000

config = dict(
    sample_rate = sample_rate,
    FPS = 15.625,    
)

In [6]:
# !mkdir dataset
# !mkdir models

In [7]:
# # #Small Dataset
# gtzan = mirdata.initialize('gtzan_genre', version='mini',data_home=os.path.join(working_dir,'dataset'))
# gtzan.download()
# print(len(gtzan.track_ids))

# track_ids = gtzan.track_ids
# print(gtzan.track(track_ids[42]))

In [8]:
# # https://drive.google.com/file/d//view?usp=share_link
# import gdown
# idx = "1nP9azGmjUCcigCN-wipIDFZuohzkolXL"
# out = "dataset/gtzan_mini.tar.gz"
# gdown.download(id=idx,output=out,quiet=False)
# !tar -xf dataset/gtzan_mini.tar.gz
# !mv gtzan_mini dataset/

In [6]:
#Large dataset
# use the following line to initialise the dataset (i.e. the full version without 'mini')
gtzan = mirdata.initialize('gtzan_genre')
gtzan.download()
print(len(gtzan.track_ids))

track_ids = gtzan.track_ids
print(gtzan.track(track_ids[42]))

INFO: Downloading ['all', 'tempo_beat_annotations'] to /raid/home/niranjan20090/mir_datasets/gtzan_genre
INFO: [all] downloading genres.tar.gz
INFO: /raid/home/niranjan20090/mir_datasets/gtzan_genre/gtzan_genre/genres.tar.gz already exists and will not be downloaded. Rerun with force_overwrite=True to delete this file and force the download.
INFO: [tempo_beat_annotations] downloading annot.zip
INFO: /raid/home/niranjan20090/mir_datasets/gtzan_genre/annot.zip already exists and will not be downloaded. Rerun with force_overwrite=True to delete this file and force the download.


1000
Track(
  audio_path="/raid/home/niranjan20090/mir_datasets/gtzan_genre/gtzan_genre/genres/classical/classical.00042.wav",
  beats_path=".../home/niranjan20090/mir_datasets/gtzan_genre/gtzan_tempo_beat-main/beats/gtzan_classical_00042.beats",
  genre="classical",
  tempo_path="...id/home/niranjan20090/mir_datasets/gtzan_genre/gtzan_tempo_beat-main/tempo/gtzan_classical_00042.bpm",
  track_id="classical.00042",
  audio: The track's audio

        Returns,
  beats: ,
  tempo: ,
)


In [10]:
# # https://drive.google.com/file/d/18_RfquqT15dDRPoglV8ZhV5qIbAliOpG/view?usp=share_link
# import gdown
# idx = "18_RfquqT15dDRPoglV8ZhV5qIbAliOpG"
# out = "dataset/gtzan_full.zip"
# gdown.download(id=idx,output=out,quiet=False)

# !unzip dataset/gtzan_full.zip

In [7]:
from sklearn.model_selection import train_test_split

tracks = gtzan.load_tracks()
train_files, test_files = train_test_split(list(tracks.keys()), test_size=0.3, random_state=1234)
val_files, test_files = train_test_split(test_files, test_size=0.5, random_state=1234)
len(train_files), len(val_files), len(test_files)

(700, 150, 150)

In [8]:
from torch.utils.data import Dataset
class GtzanDataset(Dataset):
    def __init__(self,track_ids,device,sample_rate,fps,data_path):
        self.track_ids = []
        self.device = device
        self.sample_rate = sample_rate
        self.fps = fps
        self.duration = 7
        self.data_path = data_path
        for track_id in track_ids:
            track = gtzan.track(track_id)
            try:
                beats = track.beats.times
                downbeats = track.beats.positions.astype(int) == 1
            except AttributeError:
                continue
            self.track_ids.append(track_id)

                
    def __len__(self):
        return len(self.track_ids)
    
    def __getitem__(self,idx):
        track_id = self.track_ids[idx]
        track = gtzan.track(track_id)
        audios = np.load(os.path.join(data_path,f'{track_id}.npy'))
        
        n_frames = int(self.fps*self.duration)*4
        beats = track.beats.times
        downbeats = track.beats.positions.astype(int) == 1
        downbeats = track.beats.times[downbeats]
        
        beats = track.beats.times[track.beats.positions.astype(int) != 1] #CE
        
        beats = madmom.utils.quantize_events(beats,fps=self.fps,length=n_frames)
        downbeats = madmom.utils.quantize_events(downbeats,fps=self.fps,length=n_frames)
        nonbeats = (beats == 0)&(downbeats == 0)
                
        beats = beats.reshape(beats.shape[0],1)
        downbeats = downbeats.reshape(downbeats.shape[0],1)
        nonbeats = nonbeats.reshape(nonbeats.shape[0],1)
        
        truths = np.hstack([beats,downbeats,nonbeats])
        
#         feats = torch.from_numpy(feats).to(device).type(torch.FloatTensor)
        truths = torch.from_numpy(truths).to(device).type(torch.FloatTensor)       

        return {
            'audio': [audios[:,i*self.duration*self.sample_rate:(i+1)*self.duration*self.sample_rate] for i in range(4)],
            'targets': truths,
            'id': track_id,
        }

In [9]:
# data_path = '/raid/home/niranjan20090/DL/dataset/gtzan_mini'
data_path = '/raid/home/niranjan20090/DL/dataset/gtzan_full'
train_dataset = GtzanDataset(track_ids=train_files,device=device,
                             sample_rate=config['sample_rate'],fps=config['FPS'],data_path=data_path)
val_dataset = GtzanDataset(track_ids=val_files,device=device,
                           sample_rate=config['sample_rate'],fps=config['FPS'],data_path=data_path)
test_dataset = GtzanDataset(track_ids=test_files,device=device,
                            sample_rate=config['sample_rate'],fps=config['FPS'],data_path=data_path)

In [10]:
from torch.utils.data import DataLoader
config['batch_size'] = 2

train_loader = DataLoader(dataset=train_dataset,batch_size=config['batch_size'],shuffle=True,drop_last=True)
val_loader = DataLoader(dataset=val_dataset,batch_size=config['batch_size'],shuffle=False,drop_last=True)
test_loader = DataLoader(dataset=test_dataset,batch_size=config['batch_size'],shuffle=False,drop_last=True)

In [11]:
#Copied from https://github.com/MWM-io/SpecTNT-pytorch/blob/master/harmonicstft.py
def hz_to_midi(hz):
    return 12 * (torch.log2(hz) - np.log2(440.0)) + 69


def midi_to_hz(midi):
    return 440.0 * (2.0 ** ((midi - 69.0)/12.0))


def note_to_midi(note):
    return librosa.core.note_to_midi(note)


def hz_to_note(hz):
    return librosa.core.hz_to_note(hz)


def initialize_filterbank(sample_rate, n_harmonic, semitone_scale):
    # MIDI
    # lowest note
    low_midi = note_to_midi('C1')
    # highest note
    high_note = hz_to_note(sample_rate / (2 * n_harmonic))
    high_midi = note_to_midi(high_note)
    # number of scales
    level = (high_midi - low_midi) * semitone_scale
    midi = np.linspace(low_midi, high_midi, level + 1)
    hz = midi_to_hz(midi[:-1])
    # stack harmonics
    harmonic_hz = []
    for i in range(n_harmonic):
        harmonic_hz = np.concatenate((harmonic_hz, hz * (i+1)))
    return harmonic_hz, level


class HarmonicSTFT(nn.Module):
    """
    Trainable harmonic filters as implemented by Minz Won.
    
    Paper: https://ccrma.stanford.edu/~urinieto/MARL/publications/ICASSP2020_Won.pdf
    Code: https://github.com/minzwon/data-driven-harmonic-filters
    Pretrained: https://github.com/minzwon/sota-music-tagging-models/tree/master/training
    """

    def __init__(self,
                 sample_rate=16000,
                 n_fft=512,
                 win_length=None,
                 hop_length=None,
                 pad=0,
                 power=2,
                 normalized=False,
                 n_harmonic=6,
                 semitone_scale=2,
                 bw_Q=1.0,
                 learn_bw='only_Q',
                 checkpoint=None):
        super(HarmonicSTFT, self).__init__()

        # Parameters
        self.sample_rate = sample_rate
        self.n_harmonic = n_harmonic
        self.bw_alpha = 0.1079
        self.bw_beta = 24.7

        # Spectrogram
        self.spec = torchaudio.transforms.Spectrogram(n_fft=n_fft, win_length=win_length,
                                                      hop_length=hop_length, pad=pad,
                                                      window_fn=torch.hann_window,
                                                      power=power, normalized=normalized, wkwargs=None)
        self.amplitude_to_db = torchaudio.transforms.AmplitudeToDB()

        # Initialize the filterbank. Equally spaced in MIDI scale.
        harmonic_hz, self.level = initialize_filterbank(
            sample_rate, n_harmonic, semitone_scale)

        # Center frequncies to tensor
        self.f0 = torch.tensor(harmonic_hz.astype('float32'))

        # Bandwidth parameters
        if learn_bw == 'only_Q':
            self.bw_Q = nn.Parameter(torch.tensor(
                np.array([bw_Q]).astype('float32')))
        elif learn_bw == 'fix':
            self.bw_Q = torch.tensor(np.array([bw_Q]).astype('float32'))

        if checkpoint is not None:
            state_dict = torch.load(checkpoint)
            hstft_state_dict = {k.replace('hstft.', ''): v for k,
                                v in state_dict.items() if 'hstft.' in k}
            self.load_state_dict(hstft_state_dict)

    def get_harmonic_fb(self):
        # bandwidth
        bw = (self.bw_alpha * self.f0 + self.bw_beta) / self.bw_Q
        bw = bw.unsqueeze(0)  # (1, n_band)
        f0 = self.f0.unsqueeze(0)  # (1, n_band)
        fft_bins = self.fft_bins.unsqueeze(1)  # (n_bins, 1)

        up_slope = torch.matmul(fft_bins, (2/bw)) + 1 - (2 * f0 / bw)
        down_slope = torch.matmul(fft_bins, (-2/bw)) + 1 + (2 * f0 / bw)
        fb = torch.max(self.zero, torch.min(down_slope, up_slope))
        return fb

    def to_device(self, device, n_bins):
        self.f0 = self.f0.to(device)
        self.bw_Q = self.bw_Q.to(device)
        # fft bins
        self.fft_bins = torch.linspace(0, self.sample_rate//2, n_bins)
        self.fft_bins = self.fft_bins.to(device)
        self.zero = torch.zeros(1)
        self.zero = self.zero.to(device)

    def forward(self, waveform):
        # stft
        spectrogram = self.spec(waveform)
        # to device
        self.to_device(waveform.device, spectrogram.size(1))
        # triangle filter
        harmonic_fb = self.get_harmonic_fb()
        harmonic_spec = torch.matmul(
            spectrogram.transpose(1, 2), harmonic_fb).transpose(1, 2)
        # (batch, channel, length) -> (batch, harmonic, f0, length)
        b, c, l = harmonic_spec.size()
        harmonic_spec = harmonic_spec.view(b, self.n_harmonic, self.level, l)
        # amplitude to db
        harmonic_spec = self.amplitude_to_db(harmonic_spec)
        return harmonic_spec

In [12]:
#Resnet Front-End
#Taken from https://github.com/MWM-io/SpecTNT-pytorch/blob/master/networks.py

class Res2DMaxPoolModule(nn.Module):
    def __init__(self, in_channels, out_channels, pooling=2):
        super(Res2DMaxPoolModule, self).__init__()
        self.conv_1 = nn.Conv2d(in_channels, out_channels, 3, padding=1)
        self.bn_1 = nn.BatchNorm2d(out_channels)
        self.conv_2 = nn.Conv2d(out_channels, out_channels, 3, padding=1)
        self.bn_2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.mp = nn.MaxPool2d(tuple(pooling))

        # residual
        self.diff = False
        if in_channels != out_channels:
            self.conv_3 = nn.Conv2d(
                in_channels, out_channels, 3, padding=1)
            self.bn_3 = nn.BatchNorm2d(out_channels)
            self.diff = True

    def forward(self, x):
        out = self.bn_2(self.conv_2(self.relu(self.bn_1(self.conv_1(x)))))
        if self.diff:
            x = self.bn_3(self.conv_3(x))
        out = x + out
        out = self.mp(self.relu(out))
        return out


class ResFrontEnd(nn.Module):
    """
    Adapted from Minz Won ResNet implementation.
    
    Original code: https://github.com/minzwon/semi-supervised-music-tagging-transformer/blob/master/src/modules.py
    """
    def __init__(self, in_channels, out_channels, freq_pooling, time_pooling):
        super(ResFrontEnd, self).__init__()
        self.input_bn = nn.BatchNorm2d(in_channels)
        self.layer1 = Res2DMaxPoolModule(
            in_channels, out_channels, pooling=(freq_pooling[0], time_pooling[0]))
        self.layer2 = Res2DMaxPoolModule(
            out_channels, out_channels, pooling=(freq_pooling[1], time_pooling[1]))
        self.layer3 = Res2DMaxPoolModule(
            out_channels, out_channels, pooling=(freq_pooling[2], time_pooling[2]))

    def forward(self, hcqt):
        """
        Inputs:
            hcqt: [B, F, K, T]
        Outputs:
            out: [B, ^F, ^K, ^T]
        """
        # batch normalization
        out = self.input_bn(hcqt)
        # CNN
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        return out
    

class SpecTNTBlock(nn.Module):
    def __init__(
        self, n_channels=128, n_frequencies=16, n_times=109,
        spectral_dmodel=64, spectral_nheads=4, spectral_dimff=64,
        temporal_dmodel=256, temporal_nheads=8, temporal_dimff=256,
        embed_dim=128, dropout=0.15, use_tct=False
    ):
        super().__init__()

        self.D = embed_dim
        self.F = n_frequencies
        self.K = n_channels
        self.T = n_times

        # TCT: Temporal Class Token
        if use_tct:
            self.T += 1

        # Shared frequency-time linear layers
        self.D_to_K = nn.Linear(self.D, self.K)
        self.K_to_D = nn.Linear(self.K, self.D)

        # Spectral Transformer Encoder
        self.spectral_linear_in = nn.Linear(self.F+1, spectral_dmodel)
        self.spectral_encoder_layer = nn.TransformerEncoderLayer(
            d_model=spectral_dmodel, nhead=spectral_nheads, dim_feedforward=spectral_dimff, dropout=dropout, batch_first=True, activation="gelu", norm_first=True)
        self.spectral_linear_out = nn.Linear(spectral_dmodel, self.F+1)

        # Temporal Transformer Encoder
        self.temporal_linear_in = nn.Linear(self.T, temporal_dmodel)
        self.temporal_encoder_layer = nn.TransformerEncoderLayer(
            d_model=temporal_dmodel, nhead=temporal_nheads, dim_feedforward=temporal_dimff, dropout=dropout, batch_first=True, activation="gelu", norm_first=True)
        self.temporal_linear_out = nn.Linear(temporal_dmodel, self.T)

    def forward(self, spec_in, temp_in):
        """
        Inputs:
            spec_in: spectral embedding input [B, T, F+1, K]
            temp_in: temporal embedding input [B, T, 1, D]
        Outputs:
            spec_out: spectral embedding output [B, T, F+1, K]
            temp_out: temporal embedding output [B, T, 1, D]
        """
        # Element-wise addition between TE and FCT
        spec_in = spec_in + \
            nn.functional.pad(self.D_to_K(temp_in), (0, 0, 0, self.F))

        # Spectral Transformer
        spec_in = spec_in.flatten(0, 1).transpose(1, 2)  # [B*T, K, F+1]
        emb = self.spectral_linear_in(spec_in)  # [B*T, K, spectral_dmodel]
        spec_enc_out = self.spectral_encoder_layer(
            emb)  # [B*T, K, spectral_dmodel]
        spec_out = self.spectral_linear_out(spec_enc_out)  # [B*T, K, F+1]
        spec_out = spec_out.view(-1, self.T, self.K,
                                 self.F+1).transpose(2, 3)  # [B, T, F+1, K]

        # FCT slicing (first raw) + back to D
        temp_in = temp_in + self.K_to_D(spec_out[:, :, :1, :])  # [B, T, 1, D]

        # Temporal Transformer
        temp_in = temp_in.permute(0, 2, 3, 1).flatten(0, 1)  # [B, D, T]
        emb = self.temporal_linear_in(temp_in)  # [B, D, temporal_dmodel]
        temp_enc_out = self.temporal_encoder_layer(
            emb)  # [B, D, temporal_dmodel]
        temp_out = self.temporal_linear_out(temp_enc_out)  # [B, D, T]
        temp_out = temp_out.unsqueeze(1).permute(0, 3, 1, 2)  # [B, T, 1, D]

        return spec_out, temp_out

In [13]:
#InSpecT module inspired from SpecTNT Module from https://github.com/MWM-io/SpecTNT-pytorch/blob/master/networks.py

class SpecTNTModule(nn.Module):
    def __init__(
        self, n_channels=128, n_frequencies=16, n_times=109,
        spectral_dmodel=64, spectral_nheads=4, spectral_dimff=64,
        temporal_dmodel=256, temporal_nheads=8, temporal_dimff=256,
        embed_dim=128, dropout=0.15, use_tct=False, n_block=1
    ):
        super().__init__()

        D = embed_dim
        F = n_frequencies
        K = n_channels
        T = n_times

        # Frequency Class Token
        self.fct = nn.Parameter(torch.zeros(1, T, 1, K))

        # Frequency Positional Encoding
        self.fpe = nn.Parameter(torch.zeros(1, 1, F+1, K))

        # TCT: Temporal Class Token
        if use_tct:
            self.tct = nn.Parameter(torch.zeros(1, 1, 1, D))
        else:
            self.tct = None

        # Temporal Embedding
        self.te = nn.Parameter(torch.rand(1, T, 1, D))

        # SpecTNT blocks
        self.spectnt_blocks = nn.ModuleList([
            SpecTNTBlock(
                n_channels,
                n_frequencies,
                n_times,
                spectral_dmodel,
                spectral_nheads,
                spectral_dimff,
                temporal_dmodel,
                temporal_nheads,
                temporal_dimff,
                embed_dim,
                dropout,
                use_tct
            )
            for _ in range(n_block)
        ])

    def forward(self, x):
        """
        Input:
            x: [B, T, F, K]
        Output:
            spec_emb: [B, T, F+1, K]
            temp_emb: [B, T, 1, D]
        """
        batch_size = len(x)

        # Initialize spectral embedding - concat FCT (first raw) + add FPE
        fct = torch.repeat_interleave(self.fct, batch_size, 0)  # [B, T, 1, K]
        spec_emb = torch.cat([fct, x], dim=2)  # [B, T, F+1, K]
        spec_emb = spec_emb + self.fpe
        if self.tct is not None:
            spec_emb = nn.functional.pad(
                spec_emb, (0, 0, 0, 0, 1, 0))  # [B, T+1, F+1, K]

        # Initialize temporal embedding
        temp_emb = torch.repeat_interleave(self.te, batch_size, 0)  # [B, T, 1, D]
        if self.tct is not None:
            tct = torch.repeat_interleave(self.tct, batch_size, 0)  # [B, 1, 1, D]
            temp_emb = torch.cat([tct, temp_emb], dim=1)  # [B, T+1, 1, D]

        # SpecTNT blocks inference
        for block in self.spectnt_blocks:
            spec_emb, temp_emb = block(spec_emb, temp_emb)
            
        return spec_emb, temp_emb

In [14]:
class InSpecTBlock(nn.Module):
    def __init__(self,config):
        super().__init__()
        self.spectnt_block = SpecTNTBlock()
        self.inst_attn = torchTransformerEncoderLayer(d_model=128, nhead=8, \
            dim_feedforward=1024, dropout=0.15, batch_first=True, activation="gelu", norm_first=True)
        self.n_inst = 3
    
    def forward(self,spec_embs, temp_embs):
        '''
        Input
            spec_embs = [B T F+1 ES I]
            temp_embs = [B T 1 ET I]
        Output
            spec_embs = [B T F+1 ES I]
            temp_embs = [B T 1 ET I]
        '''
        B,T,F,ES,ET = spec_embs.shape[0], spec_embs.shape[1], \
            spec_embs.shape[2]-1, spec_embs.shape[3], temp_embs.shape[3]
        t_spec_embs = []
        t_temp_embs = []
        for i in range(self.n_inst):
            spec_emb, temp_emb = self.spectnt_block(spec_embs[...,i],temp_embs[...,i])
            t_spec_embs.append(spec_emb)
            t_temp_embs.append(temp_emb)
        
        spec_embs = torch.stack(t_spec_embs,dim=4)
        temp_embs = torch.stack(t_temp_embs,dim=4)
        
        spec_embs = spec_embs.permute(0,1,2,4,3).reshape(-1,self.n_inst,ES) # [B*T*(F+1) I E]
        spec_embs = self.inst_attn(spec_embs)
        spec_embs = spec_embs.reshape(B,T,F+1,self.n_inst,ES).permute(0,1,2,4,3)
        
        temp_embs = temp_embs.permute(0,1,2,4,3).reshape(-1,self.n_inst,ET) # [B*T*1 I E]
        temp_embs = self.inst_attn(temp_embs)
        temp_embs = temp_embs.reshape(B,T,1,self.n_inst,ET).permute(0,1,2,4,3)
        return spec_embs, temp_embs

In [15]:
class InSpecT(nn.Module):
    def __init__(self, config, num_inspect_blocks=3):
        super().__init__()
        self.feature_extractor = HarmonicSTFT(config['sample_rate'])
        self.fe_model = ResFrontEnd(6,128,[2,2,2],[2,2,1])
        self.spectnt1 = SpecTNTModule(n_block=2)
        self.inspect_module = nn.ModuleList([InSpecTBlock(config) for i in range(num_inspect_blocks)])
        self.spectnt2 = SpecTNTModule(n_block=2)
        self.fc = nn.Linear(128,3)
        self.n_inst = 3
    
    def forward(self,audio):
        spec_embs = []
        temp_embs = []
        for i in range(self.n_inst):
            features = self.feature_extractor(audio[:,:,i])
            if len(features.size()) == 3:
                features = features.unsqueeze(1)
            fe_out = self.fe_model(features)
            fe_out = fe_out.permute(0,3,2,1)
            spec_emb, temp_emb = self.spectnt1(fe_out)
            spec_embs.append(spec_emb)
            temp_embs.append(temp_emb)
            
        spec_embs = torch.stack(spec_embs,dim=4)
        temp_embs = torch.stack(temp_embs,dim=4)
        for inspect_block in self.inspect_module:
            spec_embs, temp_embs = inspect_block(spec_embs,temp_embs)
        
        t_spec_embs = []
        t_temp_embs = []
        for i in range(self.n_inst):
            for block in self.spectnt2.spectnt_blocks:
                spec_emb, temp_emb = block(spec_embs[...,i],temp_embs[...,i])
            t_spec_embs.append(spec_emb)
            t_temp_embs.append(temp_emb)
        
        spec_embs = torch.stack(t_spec_embs,dim=4)
        temp_embs = torch.stack(t_temp_embs,dim=4)
        
        spec_emb = None
        temp_emb = None
        
        temp_embs = torch.relu(temp_embs)

        temp_embs = torch.mean(temp_embs,dim=4).squeeze()

        acts = self.fc(temp_embs)
        return acts


In [16]:
class ResBlock(nn.Module):
    def __init__(self,i,num_filters,kernel_size,padding,dropout_rate=0):
        super().__init__()
        self.i = i
        self.num_filters = num_filters
        self.kernel_size = kernel_size
        self.padding = padding
        self.dropout_rate = dropout_rate
        self.res_x = nn.Conv1d(in_channels=num_filters, out_channels=num_filters, kernel_size=1, padding='same', bias=False)
        self.conv_3 = nn.Conv1d(in_channels=num_filters, out_channels=num_filters, kernel_size=1, padding='same', bias=False)
        self.conv_1 = nn.Conv1d(in_channels=num_filters, out_channels=num_filters, kernel_size=kernel_size,dilation=i,padding='same')
        self.conv_2 = nn.Conv1d(in_channels=num_filters, out_channels=num_filters, kernel_size=kernel_size,dilation=2*i,padding='same')
        self.dp = nn.Dropout(p = dropout_rate)

    def forward(self,x):
        res_x = self.res_x(x)
        x_1 = self.conv_1(x)
        x_2 = self.conv_2(x)
        x_12 = torch.cat([x_1,x_2])
        x = torch.nn.ELU(x_12)
        x = self.dp(x)
        x = self.conv_3(x)
        res_x = x + res_x
        return res_x, x

class TCN:
    def __init__(
        self,
        num_filters=20,
        kernel_size=5,
        dilations=[1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024],
        activation='elu',
        padding='same',
        dropout_rate=0.15
    ):
        super().__init__()
        self.dropout_rate = dropout_rate
        self.activation = activation
        self.dilations = dilations
        self.kernel_size = kernel_size
        self.num_filters = num_filters
        self.padding = padding
        self.dropout = nn.Dropout(p=dropout_rate)
        self.fc = nn.Linear(20,2)

        if padding != 'causal' and padding != 'same':
            raise ValueError("Only 'causal' or 'same' padding are compatible for this layer.")
        
        self.res_blocks = []
        for i in range(11):
            self.res_blocks.append(ResBlock(2**i, num_filters, self.kernel_size, self.padding, self.dropout_rate))
        


    def forward(self, inputs):
        x = inputs
        # gather skip connections, each having a different context
        # build the TCN models
        for i in range(11):
            # feed the output of the previous layer into the next layer
            # increase dilation rate for each consecutive layer
            x,_ = self.res_blocks[i](x)
            # collect skip connection
            skip_connections.append(skip_out)
        # activate the output of the TCN stack
        x = torch.nn.ELU(x)
        x = self.dropout(x)
        x = self.fc(x)
        # merge the skip connections by simply adding them
        return x

In [17]:
class Hung(nn.Module):
    def __init__(self, config, num_inspect_blocks=3):
        super().__init__()
        self.feature_extractor = HarmonicSTFT(config['sample_rate'])
        self.fe_model = ResFrontEnd(6,256,[2,2,2],[2,2,1])
        self.spectnt1 = SpecTNTModule(
        n_channels=256, n_frequencies=16, n_times=109,
        spectral_dmodel=64, spectral_nheads=4, spectral_dimff=64,
        temporal_dmodel=256, temporal_nheads=8, temporal_dimff=256,
        embed_dim=128, dropout=0.15, use_tct=False, n_block=2)


        # self.inspect_module = nn.ModuleList([InSpecTBlock(config) for i in range(num_inspect_blocks)])
        self.spectnt2 = SpecTNTModule(
        n_channels=256, n_frequencies=16, n_times=109,
        spectral_dmodel=64, spectral_nheads=4, spectral_dimff=64,
        temporal_dmodel=256, temporal_nheads=8, temporal_dimff=256,
        embed_dim=128, dropout=0.15, use_tct=False, n_block=3)
        self.fc = nn.Linear(128,3)
        self.n_inst = 1
    
    def forward(self,audio):
        spec_embs = []
        temp_embs = []
        for i in range(self.n_inst):
            features = self.feature_extractor(audio[:,:,i])
            if len(features.size()) == 3:
                features = features.unsqueeze(1)
            fe_out = self.fe_model(features)
            fe_out = fe_out.permute(0,3,2,1)
            spec_emb, temp_emb = self.spectnt1(fe_out)
            spec_embs.append(spec_emb)
            temp_embs.append(temp_emb)
            
        spec_embs = torch.stack(spec_embs,dim=4)
        temp_embs = torch.stack(temp_embs,dim=4)
        # for inspect_block in self.inspect_module:
            # spec_embs, temp_embs = inspect_block(spec_embs,temp_embs)
        
        t_spec_embs = []
        t_temp_embs = []
        for i in range(self.n_inst):
            for block in self.spectnt2.spectnt_blocks:
                spec_emb, temp_emb = block(spec_embs[...,i],temp_embs[...,i])
            t_spec_embs.append(spec_emb)
            t_temp_embs.append(temp_emb)
        
        spec_embs = torch.stack(t_spec_embs,dim=4)
        temp_embs = torch.stack(t_temp_embs,dim=4)
        
        # spec_emb = None
        # temp_emb = None
        
        temp_embs = torch.mean(temp_embs,dim=4).squeeze()

        acts = self.fc(temp_embs)
        return acts


In [18]:
model = InSpecT(config).to(device)
# model = Hung(config).to(device)
# tcn_model = TCN()

In [19]:
class WeightedCELoss(torch.nn.Module):
    def __init__(self):
        super(WeightedCELoss, self).__init__()

    def forward(self, input, target):
        '''
            input : B x 3 x F
            target : B x 3 x F
        '''
        
        #Computing Class Frequency
        class_freqs = torch.sum(target, dim=(0, 2)) / (input.shape[0] * input.shape[2])
        weights = 1.0 / class_freqs
        #Normalizing Weights
        weights /= torch.sum(weights)
#         loss = nn.CrossEntropyLoss()(input.permute(0,2,1), target.permute(0,2,1))
        loss = nn.CrossEntropyLoss(weight=weights)(input, target)
        return loss

In [24]:
config['learning_rate'] = 1e-3
config['loss'] = nn.CrossEntropyLoss()
# config['loss'] = WeightedCELoss()
config['metrics'] = {
#     'MulticlassAccuracy':torchmetrics.classification.MulticlassAccuracy(3,average='none'),
    'MultilabelF1Score':torchmetrics.classification.MultilabelF1Score(3,average='none')
}
# config['optimizer'] = torch.optim.SGD(model.parameters(),lr=config['learning_rate'],momentum=0.9)
config['optimizer'] = torch.optim.Adam(model.parameters(),lr=config['learning_rate'])
config['epochs'] = 10
config['patience'] = 5
config['PATH'] = '/raid/home/niranjan20090/DL/models/'
config['model_name'] = 'celoss_full_numinst1'

In [20]:
def test(model,test_loader,config,tcn_model= None):
    metric_funs = config['metrics']
    sftmax = nn.Softmax(dim=1)
    
    loss_fun = config['loss']
    for metric in metric_funs:
        metric_funs[metric].reset()
    
    model.eval()
    running_loss = 0
    for metric in metric_funs:
        metric_funs[metric].reset()

    with torch.no_grad():
        for i, batch in enumerate(test_loader):
            track_splits = batch['audio']
            truths = batch['targets'].to(device).permute(0,2,1)
            output = []
            for i in range(len(track_splits)):
                track_splits[i] = track_splits[i].to(device).permute(0,2,1)
                output.append(model(track_splits[i]).permute(0,2,1))
            output = torch.cat(output,dim=2)
                
            # tracks = batch['audio'].to(device).permute(0,2,1)

            # output = model(tracks).permute(0,2,1)

            running_loss += loss_fun(output,truths).detach().cpu().item()

            op = output.detach().cpu()
            tr = truths.detach().cpu()
            
            for metric in metric_funs:
                metric_funs[metric].update(op,tr)
                
    
    loss = running_loss/len(val_loader)
    metrics = {}
    for metric in metric_funs:
        metrics[metric] = metric_funs[metric].compute()
        
    return loss,metrics

In [26]:
def train(model,train_loader,val_loader,config):
    running_loss = 0
    epochs = config['epochs']
    loss_fun = config['loss']
    optimizer = config['optimizer']
    batch_size = config['batch_size']
    metric_funs = config['metrics']
    PATH = config['PATH']
    
    sftmax = nn.Softmax(dim=1)
    
    best_val_loss = None
    best_model = None
    best_val_loss_epoch = -1
    
    for e in range(epochs):
        model.train()
        running_loss = 0        
        
        for metric in metric_funs:
            metric_funs[metric].reset()
        
        for batch in tqdm(train_loader,desc=f'Epoch: {e+1}/{epochs}'):
            track_splits = batch['audio']
            truths = batch['targets'].to(device).permute(0,2,1)
            output = []
            for i in range(len(track_splits)):
                track_splits[i] = track_splits[i].to(device).permute(0,2,1)
                output.append(model(track_splits[i]).permute(0,2,1))
            output = torch.cat(output,dim=2)

            loss = loss_fun(output,truths)
            op = output.detach().cpu()
            tr = truths.detach().cpu()
            
            for metric in metric_funs:
                metric_funs[metric].update(op,tr)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.detach().cpu().item()
        
        training_loss = running_loss/len(train_loader)
        training_metrics = {}
        for metric in metric_funs:
            training_metrics[metric] = metric_funs[metric].compute()
        
        #Validation
        validation_loss, validation_metrics = test(model,val_loader,config)
        
        print(f'Training Loss: {training_loss :.5f} | Validation Loss: {validation_loss :.5f}')
        print('Training Metrics: ',end='')
        for training_metric in training_metrics:
            print(f'{training_metric} : {training_metrics[training_metric]}',end=' ')
        print('\nValidation Metrics: ',end='')
        for validation_metric in validation_metrics:
            print(f'{validation_metric} : {validation_metrics[validation_metric]}',end=' ')
        print()
        
        if best_val_loss is None or best_val_loss > validation_loss:
            print('Model Updated')
            best_val_loss = validation_loss
            best_val_loss_epoch = e
            best_model = model
            
        print('-'*100)
            
        if abs(e-best_val_loss_epoch) >= config['patience']:
            print('Early Stopping...')
            model = best_model
            break
    
    torch.save(best_model.state_dict(), os.path.join(PATH,f'{config["model_name"]}_{best_val_loss :.3f}.pth'))
    return best_model

In [27]:
model = train(model,train_loader,val_loader,config)

Epoch: 1/10: 100%|██████████| 347/347 [06:24<00:00,  1.11s/it]


Training Loss: 0.47776 | Validation Loss: 0.45830
Training Metrics: MultilabelF1Score : tensor([0.1183, 0.0120, 0.9300]) 
Validation Metrics: MultilabelF1Score : tensor([0.1577, 0.0000, 0.9328]) 
Model Updated
----------------------------------------------------------------------------------------------------


Epoch: 2/10: 100%|██████████| 347/347 [06:29<00:00,  1.12s/it]


Training Loss: 0.46546 | Validation Loss: 0.47096
Training Metrics: MultilabelF1Score : tensor([0.1570, 0.0045, 0.9318]) 
Validation Metrics: MultilabelF1Score : tensor([0.1649, 0.0036, 0.9328]) 
----------------------------------------------------------------------------------------------------


Epoch: 3/10: 100%|██████████| 347/347 [06:38<00:00,  1.15s/it]


Training Loss: 0.46481 | Validation Loss: 0.46350
Training Metrics: MultilabelF1Score : tensor([0.1653, 0.0288, 0.9318]) 
Validation Metrics: MultilabelF1Score : tensor([0.1707, 0.0249, 0.9328]) 
----------------------------------------------------------------------------------------------------


Epoch: 4/10: 100%|██████████| 347/347 [06:43<00:00,  1.16s/it]


Training Loss: 0.46245 | Validation Loss: 0.45497
Training Metrics: MultilabelF1Score : tensor([0.1714, 0.0142, 0.9318]) 
Validation Metrics: MultilabelF1Score : tensor([0.1686, 0.0000, 0.9328]) 
Model Updated
----------------------------------------------------------------------------------------------------


Epoch: 5/10: 100%|██████████| 347/347 [06:47<00:00,  1.17s/it]


Training Loss: 0.45923 | Validation Loss: 0.45248
Training Metrics: MultilabelF1Score : tensor([0.1729, 0.0157, 0.9318]) 
Validation Metrics: MultilabelF1Score : tensor([0.1661, 0.0000, 0.9328]) 
Model Updated
----------------------------------------------------------------------------------------------------


Epoch: 6/10: 100%|██████████| 347/347 [06:53<00:00,  1.19s/it]


Training Loss: 0.45807 | Validation Loss: 0.45577
Training Metrics: MultilabelF1Score : tensor([0.1731, 0.0064, 0.9318]) 
Validation Metrics: MultilabelF1Score : tensor([0.1709, 0.0000, 0.9328]) 
----------------------------------------------------------------------------------------------------


Epoch: 7/10: 100%|██████████| 347/347 [07:00<00:00,  1.21s/it]


Training Loss: 0.45829 | Validation Loss: 0.45289
Training Metrics: MultilabelF1Score : tensor([0.1731, 0.0159, 0.9318]) 
Validation Metrics: MultilabelF1Score : tensor([0.1712, 0.0000, 0.9328]) 
----------------------------------------------------------------------------------------------------


Epoch: 8/10: 100%|██████████| 347/347 [07:10<00:00,  1.24s/it]


Training Loss: 0.45767 | Validation Loss: 0.45267
Training Metrics: MultilabelF1Score : tensor([1.7315e-01, 2.0233e-04, 9.3182e-01]) 
Validation Metrics: MultilabelF1Score : tensor([0.1573, 0.0000, 0.9328]) 
----------------------------------------------------------------------------------------------------


Epoch: 9/10: 100%|██████████| 347/347 [06:46<00:00,  1.17s/it]


Training Loss: 0.45772 | Validation Loss: 0.45162
Training Metrics: MultilabelF1Score : tensor([0.1707, 0.0010, 0.9318]) 
Validation Metrics: MultilabelF1Score : tensor([0.1481, 0.0000, 0.9328]) 
Model Updated
----------------------------------------------------------------------------------------------------


Epoch: 10/10: 100%|██████████| 347/347 [06:39<00:00,  1.15s/it]


Training Loss: 2.07592 | Validation Loss: 0.52421
Training Metrics: MultilabelF1Score : tensor([0.1006, 0.0340, 0.9278]) 
Validation Metrics: MultilabelF1Score : tensor([0.0000, 0.0000, 0.9328]) 
----------------------------------------------------------------------------------------------------


In [22]:
model = InSpecT(config).to(device)
model.load_state_dict(torch.load("/raid/home/niranjan20090/DL/models/celoss_full_numinst1_0.452_final.pth"), strict=False)

<All keys matched successfully>

In [25]:

config['metrics'] = {
#     'MulticlassAccuracy':torchmetrics.classification.MulticlassAccuracy(3,average='none'),
    'MultilabelF1Score':torchmetrics.classification.MultilabelF1Score(3,average='none'),
    'Precision':torchmetrics.classification.MultilabelPrecision(3,average='none'),
    'Recall':torchmetrics.classification.MultilabelRecall(3,average='none'),
}
loss, metrics = test(model,test_loader,config)

print(f'Test Loss: {loss: .5f}')
print('Test Metrics: ',end='')
for metric in metrics:{
    print(f'{metric} : {metrics[metric]}',end=' ')}

Test Loss:  0.51726
Test Metrics: MultilabelF1Score : tensor([0.0000, 0.0000, 0.9328]) Precision : tensor([0.0000, 0.0000, 0.8741]) Recall : tensor([0., 0., 1.]) 

In [26]:
#Generating annotations from truths
beat_annotations = {k : v.beats.times for k,v in tracks.items() if v.beats is not None}
downbeat_annotations = {k : v.beats.times[v.beats.positions == 1] for k,v in tracks.items() if v.beats is not None}

In [27]:
#Getting detections over the test dataset.
def get_detections(model,loader,beat_tracker,downbeat_tracker):
    model.eval()
    sftmax = nn.Softmax(dim=1)
    detections = {}
    
    with torch.no_grad():
         for batch in loader:
            track_splits = batch['audio']
            truths = batch['targets'].to(device).permute(0,2,1)
            output = []
            for i in range(len(track_splits)):
                track_splits[i] = track_splits[i].to(device).permute(0,2,1)
                output.append(model(track_splits[i]).permute(0,2,1))
            output = torch.cat(output,dim=2)
            ids = batch['id']

            preds = sftmax(output).cpu().detach().numpy()
            for i in range(len(ids)):
                beats_act = preds[i][0]
                beats = beat_tracker(beats_act)
                downbeats = downbeat_tracker(preds[i][:2].T)
                detections[ids[i]] = {'beats': beats, 'downbeats': downbeats}
    return detections

In [28]:
from madmom.features.beats import DBNBeatTrackingProcessor
from madmom.features.downbeats import DBNDownBeatTrackingProcessor

#DBN Beat Tracking Processor from [Bock et. al ISMIR 2016]
beat_tracker = DBNBeatTrackingProcessor(
    min_bpm=55.0,max_bpm=215.0,fps=config['FPS'],transition_lambda=100,threshold = 0.05
)

#DBN DownBeat Tracking Processor from [Bock et. al ISMIR 2016]
downbeat_tracker = DBNDownBeatTrackingProcessor(
    beats_per_bar = [3,4], min_bpm=55.0,max_bpm=215.0,fps=config['FPS'],transition_lambda=100
)

detections = get_detections(model,test_loader,beat_tracker,downbeat_tracker)

In [29]:
def evaluate_beats(detections, annotations):
    evals = []
    for key, det in detections.items():
        ann = annotations[key]
        e = madmom.evaluation.beats.BeatEvaluation(det, ann)
        evals.append(e)
    return madmom.evaluation.beats.BeatMeanEvaluation(evals)


def evaluate_downbeats(detections, annotations):
    evals = []
    for key, det in detections.items():
        ann = annotations[key]
        e = madmom.evaluation.beats.BeatEvaluation(det, ann, downbeats=True)
        evals.append(e)
    return madmom.evaluation.beats.BeatMeanEvaluation(evals)

In [30]:
beat_detections = {k: v['beats'] for k, v in detections.items()}
downbeat_detections = {k: v['downbeats'] for k, v in detections.items()}
# evaluate beats
print('Beat evaluation\n---------------')
print(' Beat tracker:', evaluate_beats(beat_detections, beat_annotations))
print(' Downbeat tracker:', evaluate_beats(downbeat_detections, beat_annotations))

# evaluate downbeats
print('\nDownbeat evaluation\n-------------------')
print(' Downbeat tracker:', evaluate_downbeats(downbeat_detections, downbeat_annotations))

Beat evaluation
---------------
 Beat tracker: mean for 148 files
  F-measure: 0.349 P-score: 0.397 Cemgil: 0.250 Goto: 0.000 CMLc: 0.003 CMLt: 0.014 AMLc: 0.055 AMLt: 0.229 D: 0.229 Dg: 0.001
 Downbeat tracker: mean for 148 files
  F-measure: 0.349 P-score: 0.397 Cemgil: 0.250 Goto: 0.000 CMLc: 0.003 CMLt: 0.014 AMLc: 0.055 AMLt: 0.229 D: 0.229 Dg: 0.001

Downbeat evaluation
-------------------
 Downbeat tracker: mean for 148 files
  F-measure: 0.088 P-score: 0.395 Cemgil: 0.063 Goto: 0.000 CMLc: 0.009 CMLt: 0.020 AMLc: 0.134 AMLt: 0.239 D: 1.062 Dg: 0.005


In [34]:
def predict_beat_downbeat(model,audios,postprocessor):
    model.eval()
    sftmax = nn.Softmax(dim=1)
    
    with torch.no_grad():
        preds = []
        for i in range(len(audios)):
            wav = torch.Tensor(audios[i]).unsqueeze(0).to(device).permute(0,2,1)
            pred = sftmax(model(wav)).cpu().detach().numpy()[:,:2]
            preds.append(pred)
        preds = np.concatenate(preds)
        output = postprocessor(preds)
        
    return output.squeeze()

In [53]:
import IPython
idx = 30
data = test_dataset[idx]
track = gtzan.track(data['id'])
audio = np.concatenate(data['audio'],axis=1)
num_samples = len(audio[0])
beats_per_bar = int(np.amax(track.beats.positions))
postprocessor = DBNDownBeatTrackingProcessor(beats_per_bar=beats_per_bar, fps=config['FPS'])

output = predict_beat_downbeat(model,data['audio'],postprocessor)
bss = np.hstack([track.beats.times.reshape(-1,1),track.beats.positions.reshape(-1,1)])
print('Predictions\tAnnotations')
print(np.hstack([output[:20],bss[:20]]).round(2))
# output = bss

Predictions	Annotations
[[ 0.19  2.    0.04  4.  ]
 [ 0.45  3.    0.69  1.  ]
 [ 0.7   4.    1.33  2.  ]
 [ 0.96  1.    1.97  3.  ]
 [ 1.22  2.    2.63  4.  ]
 [ 1.47  3.    3.28  1.  ]
 [ 1.73  4.    3.93  2.  ]
 [ 1.98  1.    4.57  3.  ]
 [ 2.24  2.    5.22  4.  ]
 [ 2.5   3.    5.87  1.  ]
 [ 2.75  4.    6.52  2.  ]
 [ 3.01  1.    7.17  3.  ]
 [ 3.26  2.    7.82  4.  ]
 [ 3.52  3.    8.47  1.  ]
 [ 3.78  4.    9.12  2.  ]
 [ 4.03  1.    9.77  3.  ]
 [ 4.29  2.   10.41  4.  ]
 [ 4.54  3.   11.07  1.  ]
 [ 4.8   4.   11.71  2.  ]
 [ 5.06  1.   12.36  3.  ]]


In [61]:
# from scipy.io import wavfile
# from scipy.signal import resample
# sample_rate = config['sample_rate']

# beat_sr, beat = wavfile.read('/raid/home/niranjan20090/DL/dataset/beat.wav')
# downbeat_sr, downbeat = wavfile.read('/raid/home/niranjan20090/DL/dataset/downbeat.wav')

# #Downsample to sample_rate
# beat = resample(beat,int(len(beat) * float(sample_rate)/beat_sr))
# beat = beat.astype(np.float32)/32768.0
# downbeat = resample(downbeat,int(len(downbeat) * float(sample_rate)/downbeat_sr))
# downbeat = downbeat.astype(np.float32)/32768.0

# beat_length_sample = beat.shape[0]
# downbeat_length_sample = downbeat.shape[0]

# audio_with_beats = audio[0]
# audio_with_beats_an = audio[0]
# beat_sequence = np.zeros(num_samples)

# for i in range(len(output)):
#     beat_start_sample = int(output[i][0]*sample_rate)
#     beat_position = output[i][1]
#     if beat_position == 1:
#         beat_end_sample = beat_start_sample + downbeat_length_sample
#         beat_sequence[beat_start_sample:beat_end_sample] = downbeat[:beat_sequence[beat_start_sample:beat_end_sample].shape[0]]
#     else:
#         beat_end_sample = beat_start_sample + beat_length_sample
#         beat_sequence[beat_start_sample:beat_end_sample] = beat[:beat_sequence[beat_start_sample:beat_end_sample].shape[0]]

# audio_scale_factor = (np.amax(beat_sequence) - np.amin(beat_sequence))/(np.amax(audio_with_beats)-np.amin(audio_with_beats))        
# audio_with_beats = audio_scale_factor*0.4*audio_with_beats + 0.5*beat_sequence
# beat_sequence = np.zeros(num_samples)

# for i in range(len(bss)):
#     beat_start_sample = int(bss[i][0]*sample_rate)
#     beat_position = bss[i][1]
#     if beat_position == 1:
#         beat_end_sample = beat_start_sample + downbeat_length_sample
#         beat_sequence[beat_start_sample:beat_end_sample] = downbeat[:beat_sequence[beat_start_sample:beat_end_sample].shape[0]]
#     else:
#         beat_end_sample = beat_start_sample + beat_length_sample
#         beat_sequence[beat_start_sample:beat_end_sample] = beat[:beat_sequence[beat_start_sample:beat_end_sample].shape[0]]

# audio_scale_factor = (np.amax(beat_sequence) - np.amin(beat_sequence))/(np.amax(audio_with_beats_an)-np.amin(audio_with_beats_an))        
# audio_with_beats_an = audio_scale_factor*0.4*audio_with_beats_an + 0.5*beat_sequence

# print('Audio')
# IPython.display.display(IPython.display.Audio(audio,rate=sample_rate))
# print('Audio with beats and downbeats (annotated)')
# IPython.display.display(IPython.display.Audio(audio_with_beats_an,rate=sample_rate))
# print('Audio with beats and downbeats (predicted)')
# IPython.display.display(IPython.display.Audio(audio_with_beats,rate=sample_rate))

In [62]:
# # song = test_dataset.track_ids[idx]
# import matplotlib.pyplot as plt
# det = detections[data['id']]
# # print(det)

# # track = tracks[song]
# # audio
# hop_length = 512

# spec = librosa.amplitude_to_db(np.abs(librosa.stft(audio[0], hop_length=hop_length)), ref=np.max)
# # librosa.display.specshow(spec, y_axis='log', sr=sr, hop_length=hop_length, x_axis='time')
# plt.plot(spec)
# plt.title(f'Log-frequency power spectrogram of track: "{data["id"]}"')
# # plt.colorbar(format="%+2.f dB")
# # plot annotations in the upper part
# plt.vlines(track.beats.times, hop_length * 2, sample_rate / 2, linestyles='dotted', color='w')
# plt.vlines(track.beats.times[track.beats.positions == 1], hop_length * 2, sr / 2, color='w')
# plt.text(7, hop_length * 1.65, 'Annotations (above)', color='w', fontsize=12)
# # plot detections in the lower part
# plt.vlines(det['downbeats'][:, 0], 0, hop_length, linestyles='dotted', color='w')
# plt.vlines(det['downbeats'][det['downbeats'][:, 1] == 1][:, 0], 0, hop_length, color='w')
# plt.text(7, hop_length * 1.1, 'Detections (below)', color='w', fontsize=12)
# plt.show()

: 