In [1]:
import numpy as np
import copy
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torchsummary import summary
import matplotlib.pyplot as plt
import librosa
import librosa.display
from tqdm import tqdm
import json
import os
import pickle
import sys
from pathlib import Path
from sklearn.model_selection import train_test_split
import soundfile as sf
from utils import GRU

## Creating the chunks

In [2]:
SR = 44100
HOP = 256
FRAMES = 6
BATCH_SIZE = 256

In [3]:
chord_detector = GRU()
chord_detector.load_state_dict(torch.load('./models/chord_detector.pth'))
chord_detector.eval()

GRU(
  (gru): GRU(12, 256, num_layers=2, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=512, out_features=12, bias=True)
)

In [4]:
class MelChordDataset(Dataset):
    def __init__(
            self, 
            data_location = "../../../../Music Technology/Datasets/musdb18hq/",
            out_location = "../../../../Music Technology/Datasets/musdb18hq/",
            frames_per_chord = 6,
            train = True,
            write_data = False
        ):
        super(MelChordDataset).__init__()
        if write_data:
            self._write_chords_and_audio(data_location, out_location, train)
        self.frames_per_chord = frames_per_chord
        vocals_y = []
        vocals_chroma = []
        chord_templates:dict = json.load(open('./chord_templates.json'))
        act_chord_data = []
        self.data_location= data_location+"chunks_vocal/"
        self.out_location= out_location+"chunks_chord/"
        if not train:
            self.data_location[:-1]+="_test/"
            self.out_location[:-1]+="_test/"

        for i in range(len(os.listdir(self.data_location))): # 100
            with open(self.out_location+"chord_"+str(i), "rb") as fp:
                chord_data = pickle.load(fp)
            act_chord_data.append(torch.Tensor(np.array([np.array(chord_templates[i]) for i in chord_data])))
            vocals_y.append(librosa.load(self.data_location + 'vocal_'+str(i)+'.wav', sr=SR)[0])
            vocals_chroma.append(torch.Tensor(librosa.feature.chroma_cens(y=vocals_y[-1], sr = SR, hop_length=HOP)).T)
        
        # act_chord_data[i]: Shape: (num_chords[i], 12)
        # vocals_chroma[i]: Shape: (num_frames[i], 12)
        # num_chords[i] = (num_frames[i] // frames_per_chord)

        self.data = []
        self._create_data(act_chord_data, vocals_chroma)
    
    def _create_data(self, chord_data, chroma_data):
        for (chroma, chords) in zip(chroma_data, chord_data):
            for i in range(0, chroma.shape[0]-self.frames_per_chord, self.frames_per_chord):
                block_chroma = chroma[i:i+self.frames_per_chord,:]
                block_chord = chords[i//self.frames_per_chord]
                if(block_chroma.any()):
                    self.data.append((block_chroma, block_chord))

    def _write_chords_and_audio(
            self, 
            data_location, 
            out_location, 
            train = True
        ):
        if train:
            data_location = data_location+"train/"
        else:
            data_location = data_location+"test/"
        folders = os.listdir(data_location)
        count = 0

        for folder in folders:
            if not os.path.isdir(data_location+folder):
                continue
            mixture_y, _ = librosa.load(data_location + '/' + folder + '/mixture.wav', sr=SR)
            vocals_y, _ = librosa.load(data_location + '/' + folder + '/vocals.wav', sr=SR)
            mixture_y = mixture_y/np.max(np.abs(mixture_y))
            vocals_y = vocals_y/np.max(np.abs(vocals_y))

            mixture_chroma = torch.Tensor(librosa.feature.chroma_cens(y=mixture_y, sr = SR, hop_length=HOP)).T
            chunk_length = FRAMES
            nchunks = mixture_chroma.shape[0] // chunk_length # no padding

            if train:
                if not os.path.isdir(out_location+'chunks_chord'):
                    os.mkdir(out_location+'chunks_chord')
                if not os.path.isdir(out_location+'chunks_vocal'):
                    os.mkdir(out_location+'chunks_vocal')
            else:
                if not os.path.isdir(out_location+'chunks_chord_test/'):
                    os.mkdir(out_location+'chunks_chord_test')
                if not os.path.isdir(out_location+'chunks_vocal_test/'):
                    os.mkdir(out_location+'chunks_vocal_test')

            # Get chords from mixture chroma
            chord_stack, time = MelChordDataset.prediction(chord_detector, mixture_chroma)
            frame_num = np.array([int(i/((HOP/SR)*6)) for i in time])
            chord_stack = np.array([frame_num, chord_stack]).T
            chords = []
            for prev, curr in zip(chord_stack[:-1], chord_stack[1:]):
                frame_diff = int(curr[0]) - int(prev[0])
                chords.extend([prev[1] for _ in range(frame_diff)])
            chords.extend([chord_stack[-1][1] for _ in range(nchunks - len(chords))])

            if train:
                with open(out_location+"chunks_chord/chord_"+str(count), "wb") as fp:
                    pickle.dump(chords, fp)
                sf.write(out_location + 'chunks_vocal/vocal_' + str(count)+'.wav', vocals_y, SR)
            else:
                with open(out_location+"chunks_chord_test/chord_"+str(count), "wb") as fp:
                    pickle.dump(chords, fp)
                sf.write(out_location + 'chunks_vocal_test/vocal_' + str(count)+'.wav', vocals_y, SR)
            count+=1

    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        return self.data[index]
    
    @staticmethod
    def predict(model, audio, chroma_req = True, chord_templates:dict = json.load(open('./chord_templates.json')), sr = SR, hop = HOP):
        if chroma_req:
            chroma = torch.Tensor(librosa.feature.chroma_cens(y=audio, sr = sr, hop_length=hop)).T.unsqueeze(0)
        else:
            chroma = audio
        with torch.no_grad():
            outputs = nn.functional.softmax(model(chroma), 1)[0]
        min_val = 120
        min_key = ''
        for key, val in chord_templates.items():
            out = torch.norm(torch.Tensor(val) - outputs)
            if min_val >= out:
                min_val = out
                min_key = key
        return min_key
    
    @staticmethod
    def prediction(model, chroma, frame = 6):
        stack = []
        time = []
        model.eval()
        pred = MelChordDataset.predict(model, chroma[:frame, :].unsqueeze(0), False)
        prev_pred = pred
        dur = 1
        main_sub = 0
        for i in tqdm(range(frame, chroma.shape[0]-frame+1, frame)):
            model.eval()
            pred = MelChordDataset.predict(model, chroma[i:i+frame, :].unsqueeze(0), False)
            if(pred != prev_pred):
                if(dur>10):
                    if(len(stack)==0):
                        stack.append(prev_pred)
                    elif(stack[-1]==prev_pred):
                        dur = 0
                        prev_pred = pred
                        continue
                    else:
                        stack.append(prev_pred)
                    if len(time)!=0:
                        time.append((i)*HOP/SR - main_sub)
                    else:
                        main_sub = (i)*HOP/SR
                        time.append(0.0)
                dur = 0
                prev_pred = pred
            dur+=1
        return stack, time

In [5]:
# train_data = MelChordDataset(train = True, write_data = False)
# test_data = MelChordDataset(train = False, write_data = False)
# torch.save(train_data, './data/final/train_data.pt')
# torch.save(test_data, './data/final/test_data.pt')

In [6]:
train_data = torch.load('./data/final/train_data.pt')
test_data = torch.load('./data/final/test_data.pt')

In [7]:
print(len(train_data))
print(len(test_data))

576514
297423


In [8]:
train_loader = DataLoader(
    train_data,
    BATCH_SIZE,
    shuffle=False
)
test_loader = DataLoader(
    test_data,
    BATCH_SIZE,
    shuffle=False
)

In [9]:
# class GRU(nn.Module):
#     def __init__(self, input_size = 12, hidden_size = 64, num_layers = 1, num_classes = 12, bidirectional = True) -> None:
#         super(GRU, self).__init__()
#         self.num_layers = num_layers
#         self.hidden_size = hidden_size
#         self.bidirectional = bidirectional

#         self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first = True, bidirectional=bidirectional)
#         if(bidirectional):
#             self.fc = nn.Linear(hidden_size*2, num_classes)
#         else:
#             self.fc = nn.Linear(hidden_size, num_classes)

#     def forward(self, x):
#         if(self.bidirectional):
#             h0 = torch.zeros(2*self.num_layers, x.size(0), self.hidden_size)
#         else:
#             h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size)
#         out, _ = self.gru(x, h0)
#         out = out[:,-1,:] # Since we only want the output of the last cell
#         out = self.fc(out)
#         return(out)


In [22]:
class Predictor(nn.Module):
    def __init__(self):
        super(Predictor, self).__init__() # Transpose as well
        self.conv1 = nn.Conv2d(1, 2, (1,3), 1, 1)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(2, 4, (1,3), 1, 1)
        self.relu2 = nn.ReLU()
        self.conv3 = nn.Conv2d(4, 8, (1,3))
        self.relu3 = nn.ReLU()
        self.conv4 = nn.Conv2d(8, 12, (1,3))
        self.relu4 = nn.ReLU()
        self.FC = nn.Linear(12*16*2, 12)
        
    def forward(self, x):
        x = self.relu1(self.conv1(x))
        x = self.relu2(self.conv2(x))
        x = self.relu3(self.conv3(x))
        x = self.relu4(self.conv4(x))
        flat = torch.flatten(x, 1)
        h4 = self.FC(flat)
        return h4

In [24]:
device = torch.device('cpu')

In [25]:
model = Predictor().to(device)

In [26]:
summary(copy.deepcopy(model).to('cpu'), (1, 12, 6))
model = model.to('mps')

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1             [-1, 2, 14, 6]               8
              ReLU-2             [-1, 2, 14, 6]               0
            Conv2d-3             [-1, 4, 16, 6]              28
              ReLU-4             [-1, 4, 16, 6]               0
            Conv2d-5             [-1, 8, 16, 4]             104
              ReLU-6             [-1, 8, 16, 4]               0
            Conv2d-7            [-1, 12, 16, 2]             300
              ReLU-8            [-1, 12, 16, 2]               0
            Linear-9                   [-1, 12]           4,620
Total params: 5,060
Trainable params: 5,060
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.02
Params size (MB): 0.02
Estimated Total Size (MB): 0.04
-----------------------------------------------

In [27]:
num_epochs = 200
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 5e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 10, 0.9)

In [28]:
best_weights = copy.deepcopy(model.state_dict())
max = 0
val_acc = 0
train_acc = 0
for epoch in range(num_epochs):
    device = torch.device('cpu')
    model = model.to(device)
    model.train()
    for i, (chromas,chords) in tqdm(enumerate(train_loader)):
        chromas = torch.transpose(chromas,1,2).unsqueeze(1).to(device)
        chords = chords.to(device)

        preds = model(chromas)
        loss = criterion(preds, chords)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    scheduler.step() # Decaying learning rate per 25 epochs by 0.2 times
    print(f'Epoch {epoch+1}/{num_epochs}; Loss = {loss.item():.6f}; LR = {scheduler.get_last_lr()}')
    with torch.no_grad():
        n_samples = 0
        n_correct = 0
        model.eval()
        device = torch.device('cpu')
        model = model.to(device)
        for chromas, chords in tqdm(test_loader):
            chromas = torch.transpose(chromas,1,2).unsqueeze(1).to(device)
            chords = chords.to(device)
            pred_outputs1 = model(chromas)
            prediction = MelChordDataset.predict(model, chromas[0][0], chroma_req=False)
            print(prediction)
            sys.exit()
            # _, actual_preds1 = torch.max(pred_outputs1, 1)
            print(actual_preds1.shape)
            n_samples += chords.shape[0]
            n_correct += (actual_preds1 == chords).sum().item()
        val_acc = n_correct/n_samples * 100

        if (max <= (n_correct/n_samples * 100)):
            print('SAVED MODEL WEIGHTS')
            max = val_acc
            best_weights = copy.deepcopy(model.state_dict())

        n_samples = 0
        n_correct = 0
        
        for chromas, chords in train_loader:
            chromas = torch.transpose(chromas,1,2).unsqueeze(1).to(device) # From 128, 1, 28, 28 ---> 128, 784
            chords = chords.to(device)
            pred_outputs1 = model(chromas)
            _, actual_preds1 = torch.max(pred_outputs1, 1) # Returns value, index
            n_samples += chords.shape[0]
            n_correct += (actual_preds1 == chords).sum().item()
        train_acc = n_correct/n_samples * 100
    
    print(f'Train Accuracy: {train_acc:.2f}%')
    print(f'Dev Accuracy: {val_acc:.2f}%')
    print("-"*20)

0it [00:00, ?it/s]


RuntimeError: NNPACK SpatialConvolution_updateOutput failed