In [0]:
import torch
# Prefer the first CUDA GPU when one is visible; otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
if device.type == 'cuda':
    # Querying the GPU name raises on CPU-only runtimes, so only ask when CUDA is present.
    print(torch.cuda.get_device_name(0))
else:
    print('CUDA is not available; falling back to CPU.')
print(device)

In [0]:
!wget https://homes.cs.washington.edu/~thickstn/media/musicnet.npz
!wget https://homes.cs.washington.edu/~thickstn/media/musicnet_metadata.csv


In [0]:
!sudo apt-get install sox libsox-dev libsox-fmt-all
!pip install git+git://github.com/pytorch/audio

In [0]:
import numpy as np
import cv2
import matplotlib
import matplotlib.pyplot as plt
import os
import shutil
import pickle
from google.colab import drive, files
import pandas as pd
import torch
import librosa
from torch.utils.data import Dataset, DataLoader, random_split
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB
from torchvision.datasets import ImageFolder
import torchvision
import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn
from scipy.signal import spectrogram

In [0]:
# --- Experiment configuration -------------------------------------------------
fs = 44100                    # sample rate (Hz) used to cut segments and compute spectrograms
composers = ['Schubert', 'Beethoven', 'Brahms', 'Mozart', 'Bach']  # class index = list position
segment_duration = 20         # length of one audio segment, in seconds
n_samples = 20                # number of samples kept per composer
train_test_partition = 0.8    # fraction of the data assigned to the training split
metadata_path = 'musicnet_metadata.csv'
dataset_path = 'musicnet.npz'

# Spectrogram settings handed to the dataset class as its `transform` argument.
transform_params = dict(
    sample_rate=fs,
    n_fft=4096,
    win_length=None,
    hop_length=512,
    f_min=0.0,
    f_max=5000,
    pad=0,
    n_mels=128,
)


In [0]:
class MusicnetComposers(Dataset):
    """In-memory dataset of log-power spectrogram segments labelled by composer.

    For every name in ``composers`` the constructor walks that composer's rows
    of the metadata CSV, cuts each recording into consecutive
    ``segment_duration``-second segments, converts each segment with
    ``scipy.signal.spectrogram`` (default window settings) followed by a
    natural log, and keeps ``n_samples`` segments. The label of a segment is
    the position of its composer in ``composers``.

    Args:
        csv_path: Path to the metadata CSV; the ``id``, ``composer`` and
            ``seconds`` columns are read.
        dataset_path: Path to the ``.npz`` archive keyed by ``str(id)``; every
            entry must unpack into ``(sound, _)``.
        composers: Composer names to include; order defines the class index.
        segment_duration: Segment length in seconds.
        n_samples: Number of segments kept per composer.
        fs: Sample rate in Hz, used to cut segments and passed to
            ``spectrogram``.
        transform: Stored unchanged as ``self.transform``; it is not applied
            to the data.

    Attributes:
        dataset: Float tensor of shape
            ``(len(composers) * n_samples, 1, n_freq, n_time)``.
        labels: ``int64`` tensor with one class index per row of ``dataset``.

    Raises:
        ValueError: If a composer yields fewer than ``n_samples`` full-length
            segments.
    """

    def __init__(self, csv_path, dataset_path, composers, segment_duration, n_samples, fs, transform):
        musicnet_metadata = pd.read_csv(csv_path)
        segment_len = int(fs * segment_duration)  # samples per segment

        per_composer = []
        # The archive is read lazily, so it has to stay open while segments are
        # extracted; the context manager closes it afterwards.
        # NOTE: allow_pickle=True unpickles the stored objects -- only load an
        # archive that comes from a trusted source.
        with np.load(dataset_path, encoding='latin1', allow_pickle=True) as musicnet_dataset:
            for composer in composers:
                rows = musicnet_metadata.loc[musicnet_metadata.composer == composer]
                segments = self._collect_segments(rows, musicnet_dataset, segment_len,
                                                  segment_duration, n_samples, fs)
                if len(segments) < n_samples:
                    raise ValueError(
                        'Composer {} has only {} segments of {} s; {} requested.'.format(
                            composer, len(segments), segment_duration, n_samples))

                # Random order/selection of n_samples segments for this composer.
                chosen = np.random.choice(len(segments), size=n_samples, replace=False)
                per_composer.append(torch.stack([segments[i] for i in chosen]))
                print('Composer {} is done.'.format(composer))

        # Size the storage from the data itself instead of a fixed shape, so any
        # fs / segment_duration / n_samples / number of composers works.
        if per_composer:
            self.dataset = torch.cat(per_composer, dim=0)
        else:
            self.dataset = torch.zeros((0, 1, 0, 0))
        self.labels = torch.as_tensor(np.repeat(np.arange(len(composers)), n_samples),
                                      dtype=torch.int64)

        # Save hyper-parameters
        self.composers = composers
        self.segment_duration = segment_duration
        self.n_samples = n_samples
        self.fs = fs
        self.transform = transform

    @staticmethod
    def _collect_segments(rows, archive, segment_len, segment_duration, n_samples, fs):
        """Gather up to ``n_samples`` spectrogram tensors from the recordings in ``rows``."""
        segments = []
        for row in rows.itertuples():
            sound, _ = archive[str(row.id)]
            n_splits = int(np.floor(row.seconds / segment_duration))
            for i in range(n_splits):
                segment = sound[i * segment_len:(i + 1) * segment_len]
                if len(segment) < segment_len:
                    # The audio is shorter than the metadata duration implies;
                    # a truncated segment would not stack with the others.
                    break
                segments.append(MusicnetComposers._log_spectrogram(segment, fs))
                if len(segments) >= n_samples:
                    return segments
        return segments

    @staticmethod
    def _log_spectrogram(segment, fs):
        """Return the natural-log power spectrogram of ``segment`` as a ``(1, freq, time)`` tensor."""
        _, _, power = spectrogram(segment, fs)
        # Silent frames have exactly zero power; clamp to the smallest positive
        # normal float so the log stays finite instead of producing -inf.
        power = np.maximum(power, np.finfo(power.dtype).tiny)
        return torch.as_tensor(np.log(power), dtype=torch.float32).unsqueeze(0)

    def __getitem__(self, index):
        """Return the ``(spectrogram, label)`` pair stored at ``index``."""
        return self.dataset[index], self.labels[index]

    def __len__(self):
        """Return the number of labelled segments."""
        return len(self.labels)

In [0]:
# Load the metadata table and open the audio archive used by the cells below.
musicnet_metadata = pd.read_csv(metadata_path)
# np.load opens the archive from the path itself and reads entries lazily, so no
# separate file handle is needed.
# NOTE: allow_pickle=True unpickles the stored objects -- only load an archive
# that comes from a trusted source.
musicnet_dataset = np.load(dataset_path, encoding='latin1', allow_pickle=True)

In [0]:
# Build a list-based dataset with one log-power spectrogram per whole recording
# (no segmentation): `dataset` holds the tensors, `labels` the composer indices.
dataset = []
labels = []

for composer_id, composer in enumerate(composers):
    # Metadata rows that belong to this composer.
    composer_rows = musicnet_metadata.loc[musicnet_metadata.composer == composer]
    composer_data = []

    for row in composer_rows.itertuples():
        sound, _ = musicnet_dataset[str(row.id)]
        _, _, power = spectrogram(sound, fs)
        # Silent frames have exactly zero power; clamp to the smallest positive
        # normal float so the log stays finite instead of producing -inf.
        power = np.maximum(power, np.finfo(power.dtype).tiny)
        composer_data.append(torch.Tensor(np.log(power)).unsqueeze(0))
        if len(composer_data) >= n_samples:
            break

    if len(composer_data) < n_samples:
        raise ValueError('Composer {} has only {} recordings; {} requested.'.format(
            composer, len(composer_data), n_samples))

    # Random order/selection of n_samples recordings for this composer.
    chosen = np.random.choice(len(composer_data), size=n_samples, replace=False)
    dataset.extend(composer_data[i] for i in chosen)
    labels.extend([composer_id] * n_samples)

    print('Composer {} is done.'.format(composer))

In [0]:
# Build the segment-level dataset from the configured paths and composers.
# NOTE(review): n_samples is passed as the literal 24 here, which differs from
# the configured n_samples = 20 -- confirm which value is intended.
composerDataset = MusicnetComposers(
    csv_path=metadata_path,
    dataset_path=dataset_path,
    composers=composers,
    segment_duration=segment_duration,
    n_samples=24,
    fs=fs,
    transform=transform_params,
)


In [0]:
# Display the composer-index list filled in by the per-recording loop.
labels

In [0]:
# Size the split from the dataset itself: random_split requires the lengths to
# sum to len(composerDataset), whatever n_samples the dataset was built with.
total_len = len(composerDataset)
train_len = int(train_test_partition * total_len)
test_len = total_len - train_len
train_set, test_set = random_split(composerDataset, [train_len, test_len])
train_loader = torch.utils.data.DataLoader(train_set, batch_size = 8, shuffle = True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size = 1, shuffle = True)

# Loader over the full, unsplit dataset.
dataset_loader = torch.utils.data.DataLoader(composerDataset, batch_size = 8, shuffle = True)

In [0]:
# ResNet-34 trained from scratch, adapted to single-channel spectrogram input
# and to one output per composer.
num_classes = len(composers)
model = torchvision.models.resnet34(pretrained=False, progress=True)
model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
model.fc = nn.Linear(model.fc.in_features, num_classes)

# The training code scores the output with F.nll_loss, which expects
# log-probabilities; LogSoftmax provides them (plain Softmax probabilities make
# that loss meaningless). The arg-max class is the same either way.
model = nn.Sequential(
    model,
    nn.LogSoftmax(dim=1)
)

model.train()
model.to(device)

In [0]:
# Plain SGD over every model parameter, with weight decay.
optimizer = optim.SGD(
    model.parameters(),
    lr=1e-3,
    weight_decay=1e-4,
)
#scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 20, gamma = 0.1)

In [0]:
def train(model, epoch, dataset_loader):
    """Run one training epoch over ``dataset_loader``.

    Relies on the notebook-level names ``optimizer``, ``device`` and
    ``log_interval``, which must be defined before the call.

    Args:
        model: Network to train; its output is scored with ``F.nll_loss``,
            which expects log-probabilities.
        epoch: Epoch number, used only in the progress line.
        dataset_loader: Iterable of ``(data, target)`` batches.

    Returns:
        None.
    """
    model.train()
    n_batches = len(dataset_loader)
    n_examples = len(dataset_loader.dataset)
    for batch_idx, (data, target) in enumerate(dataset_loader):
        data = data.to(device)
        target = target.to(device)
        optimizer.zero_grad()
        # Inputs do not need gradients; only the model parameters are updated.
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0: #print training stats
            # Progress is reported against the loader actually being iterated.
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), n_examples,
                100. * batch_idx / n_batches, loss.item()))

In [0]:
  # Scratch training pass over the list-based `dataset`, one recording at a
  # time (batch size 1), using the matching entry of `labels` as the target.
  model.train()
  scratch_epoch = 1        # label shown in the progress line only
  scratch_log_every = 20   # print training stats every this many recordings
  n_recordings = len(dataset)
  for batch_idx, data in enumerate(dataset):
      # nll_loss needs integer (long) class indices of shape (batch,).
      target = torch.tensor([labels[batch_idx]], dtype=torch.long)
      data = data.unsqueeze(0).float()
      optimizer.zero_grad()
      data = data.to(device)
      target = target.to(device)
      output = model(data)
      loss = F.nll_loss(output, target)
      loss.backward()
      optimizer.step()
      if batch_idx % scratch_log_every == 0: #print training stats
          print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
              scratch_epoch, batch_idx * len(data), n_recordings,
              100. * batch_idx / n_recordings, loss.item()))

In [0]:
# Display the last `data` tensor left behind by the scratch training loop above.
data

In [0]:
# Show the GPU status report from the NVIDIA command-line tool.
!nvidia-smi



In [0]:
def test(model, epoch):
    """Evaluate ``model`` on the notebook-level ``test_loader`` and print the accuracy.

    Relies on the notebook-level names ``test_loader`` and ``device``.

    Args:
        model: Network to evaluate; the predicted class is the arg-max over
            dimension 1 of its output.
        epoch: Unused; kept so existing calls keep working.

    Returns:
        Tuple ``(correct, total)``: number of correct predictions and number
        of evaluated samples.
    """
    model.eval()
    correct = 0
    total = len(test_loader.dataset)
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)
            # Score every sample of the batch, not only the first one, so the
            # count is right for any batch size.
            pred = model(data).argmax(dim=1)
            correct += pred.eq(target).sum().item()
    accuracy = 100. * correct / total if total else 0.0
    print('\nTest set: Accuracy: {}/{} ({:.0f}%)\n'.format(
        correct, total, accuracy))
    return correct, total

In [0]:
# train() reads the notebook-level `log_interval`, so it has to exist before
# the first call.
log_interval = 20
# train() has no return value, so there is nothing to capture or inspect here.
train(model, 1, train_loader)

In [0]:
log_interval = 20  # train() prints its stats every `log_interval` batches
for epoch in range(1, 2):
    #scheduler.step()
    # NOTE(review): dataset_loader iterates the full dataset, which includes the
    # samples held out in test_set -- confirm this is intended.
    train(model, epoch, dataset_loader)

In [0]:
# Classify one randomly chosen test sample and compare with its true label.
idx = np.random.randint(0, len(test_set))

model.eval()
# Avoid the name `id`: rebinding it at notebook scope would hide the builtin.
sample, true_label = test_set[idx]
sample = sample.to(device)
with torch.no_grad():  # inference only, no gradients needed
    score, predicted = model(sample.unsqueeze(0))[0].max(0)
print(true_label.item())
print(score)
print(predicted.item())

In [0]:
# Evaluate on the test split; test() prints the accuracy itself, so the result
# is not indexed.
test(model, 1)

In [0]:
import librosa, librosa.display

# Plot one randomly chosen spectrogram from the composer dataset.
idx = np.random.randint(0, len(composerDataset))
a = composerDataset[idx][0].squeeze()
print(a.shape)

plt.figure(figsize=(10, 4))
# The stored features come from scipy.signal.spectrogram with its default
# settings (256-sample windows overlapping by 32, i.e. a hop of 224 samples)
# followed by a natural log: a linear frequency axis and log-power values,
# not mel bins or decibels.
librosa.display.specshow(a.numpy(), x_axis='time', y_axis='linear', sr=fs, hop_length=224)
plt.colorbar(format='%+2.0f', label='log power')
plt.title('Log-power spectrogram')
plt.tight_layout()
plt.show()