In [0]:
 #loading pytorch for our model and data preprocessing
# http://pytorch.org/
from os.path import exists
from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())
cuda_output = !ldconfig -p|grep cudart.so|sed -e 's/.*\.\([0-9]*\)\.\([0-9]*\)$/cu\1\2/'
accelerator = cuda_output[0] if exists('/dev/nvidia0') else 'cpu'

!pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.1-{platform}-linux_x86_64.whl torchvision
import torch
import torchvision
import torch.nn as nn
import os

# We'll need numpy for some mathematical operations
import numpy as np

# matplotlib for displaying the output
import matplotlib.pyplot as plt
import matplotlib.style as ms
ms.use('seaborn-muted')
%matplotlib inline

# and IPython.display for audio output
import IPython.display

# Librosa for audio (need ffmpeg for audioread)
!apt-get update
!apt-get install ffmpeg
!pip install librosa

import librosa
# And the display module for visualization
import librosa.display

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [83.2 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.161)] [1 InRelease 2,587 B/83.2                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1710/x86_64  InRelease
0% [Waiting for headers] [1 InRelease 60.5 kB/83.2 kB 73%] [Waiting for headers                                                                               0% [Waiting for headers] [Waiting for headers] [Waiting for headers]0% [1 InRelease gpgv 83.2 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:3 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
                                                                               0% [1 InRelease gpgv 83.2 kB] [Waiting for headers] [Waiting for headers]                       

In [0]:
#mount google disk to download dataset to colab server
from google.colab import drive
drive.mount('/gdrive')

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [0]:
#root folder on google drive
root = '/gdrive/My Drive/Project/'
#genres for classification
genres = ['classical', 'rock', 'rap_hip_hop', 'blues', 'new_age', 'techno', 'jazz', 'folk', 'country', 
         'edm', 'reggae', 'heavy_metal', 'pop', 'drum_and_bass', 'house']
#main data features: sample rate and number of mels
sr=11025
n_mels=256

In [0]:
#load dataset https://drive.google.com/drive/folders/1g_s87T8xsZNz2pHfOIsQsFShNy7nHjoX?usp=sharing
             #https://drive.google.com/open?id=13IWMY7n21fNfXzh76WAoagFWqkM3-EaA
%%time

music = {}
duration = 2100

for genre in genres:
    music[genre] = librosa.load(root +'music_dataset/' + genre + '.mp3', duration=duration, sr=sr, res_type='kaiser_fast')

In [0]:
#split dataset on samples
%%time
samples = {}
for genre in music:
    samples[genre] = np.split(music[genre][0], indices_or_sections=duration//5)



In [0]:
#check sample
IPython.display.Audio(data=samples['rock'][1], rate=sr)

In [0]:
#data preprocessing
%%time
data = []
for genre in samples:
    for sample in samples[genre]:
        harmonic, percussive = librosa.effects.hpss(sample)
        
        p_mel = librosa.feature.melspectrogram(percussive, sr=sr, n_mels=n_mels//2),
        p_mel=p_mel[0]
        
        mfcc = librosa.feature.mfcc(sample, n_mfcc=n_mels)
        mfcc_delta = librosa.feature.delta(mfcc)
        
        chromagram = librosa.feature.chroma_cqt(y=harmonic, n_chroma=n_mels//2, sr=sr)
        
        data.append([chromagram, mfcc_delta, p_mel])
        
labels = []
for i in range(len(genres)):
    labels += [[i]] * (duration//5)
labels = np.array(labels)



CPU times: user 1h 19s, sys: 30min 57s, total: 1h 31min 17s
Wall time: 57min 53s


In [0]:
#split on test and train data
from sklearn import model_selection
X_train, X_test, y_train, y_test = model_selection.train_test_split(data, labels, test_size=0.15)
np.array(X_train).shape

(6885, 3, 128, 108)

In [0]:
#main model for project
model = torchvision.models.resnet18()
model.fc = nn.Linear(in_features = 512 ,out_features = len(genres))
model.avgpool = nn.AvgPool2d(kernel_size=4, stride=1, padding=0)


In [0]:
tensor_x = torch.stack([torch.Tensor(i) for i in X_train]) # transform to torch tensors
tensor_y = torch.stack([torch.Tensor(i) for i in y_train]).long()

dataset = torch.utils.data.TensorDataset(tensor_x, tensor_y) # create your datset
dataloader = torch.utils.data.DataLoader(dataset, shuffle=True, batch_size=128) # create your dataloader

tensor_x_test = torch.stack([torch.Tensor(i) for i in X_test]) # transform to torch tensors
tensor_y_test = torch.stack([torch.Tensor(i) for i in y_test]).long()

dataset_test = torch.utils.data.TensorDataset(tensor_x_test, tensor_y_test) # create your datset
dataloader_test = torch.utils.data.DataLoader(dataset_test, batch_size=128) # create your dataloader

In [0]:
def accuracy(output,labels):
  predictions = torch.argmax(output,dim=1)
  correct = (predictions == labels).sum().cpu().numpy()
  return correct / len(labels)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)

print(device)

cuda:0


In [0]:
#learning parameters
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.RMSprop(model.parameters(), lr=1e-4)

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.94)

EPOCH_SIZE = 30

epoch_counter = 0

In [0]:
#training
model.train()
for epoch in range(3):
    epoch_counter+=1
    print(epoch_counter)
    lr_scheduler.step()
    for itr, data in enumerate(dataloader):
        samples = data[0].to(device)
        labels = data[1].squeeze(1).to(device)
        
        y_pred = model.forward(samples)
        
        optimizer.zero_grad()
        
        loss = loss_func(y_pred, labels)
        
        if itr%100 == 0:
          print('Iteration {}, train accuracy {:.2f}, loss {:.4f}'.format(itr+epoch*len(dataloader),accuracy(y_pred,labels),loss))

        loss.backward()
        
        optimizer.step()

25
Iteration 0, train accuracy 1.00, loss 0.0074
26
Iteration 54, train accuracy 1.00, loss 0.0015
27
Iteration 108, train accuracy 1.00, loss 0.0006


In [0]:
#test accuracy
model.eval()

with torch.no_grad():
  accuracy_list = []
  for itr,data in enumerate(dataloader_test):
    samples = data[0].to(device)
    labels = data[1].squeeze(1).to(device)
    
    y_pred = model.forward(samples)
    accuracy_list.append(accuracy(y_pred,labels))
acc = np.sum(accuracy_list)/len(accuracy_list)
print('Test accuracy - {:.2f}'.format(np.sum(accuracy_list)/len(accuracy_list)))

Test accuracy - 0.88


In [0]:
#saving model
torch.save(model, '/gdrive/My Drive/Project/ResNet18NDS.pth')

In [0]:
#loading model
model = torch.load('/gdrive/My Drive/Project/ResNet18withOtherPrep.pth')#AlexNet, ResNet18, IncV4, AlexNetwithOtherPrep, ResNet18withOtherPrep, IncResV2withOtherPrep
model.to(device)
model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace)
      (conv2): Co

In [0]:
#distribute samples with different distribution for many samples predict
def sample_distribution(data_len, sample_rate, sample_len=5, step=5, distribution='equal', m = -1, d = -1):
  sample_list = []
  if distribution == 'equal':
    sample_list = np.array(range(2, data_len//(sample_rate*sample_len), step))
  elif distribution == 'normal':
    if m==-1:
      m = data_len//(sample_rate*sample_len*2)
    if d==-1:
      d = data_len//(sample_rate*sample_len*2)-2
    sample_list = np.round(np.random.normal(m, d, data_len//(sample_rate*sample_len*step)))
    np.maximum(sample_list, 2, out=sample_list)
    np.minimum(sample_list, data_len//(sample_rate*sample_len)-2, out=sample_list)
  elif distribution == 'uniform':
    sample_list = np.random.randint(2, data_len//(sample_rate*sample_len)-2, data_len//(sample_rate*sample_len*step))
  elif distribution:
    raise(BaseException)
  return list(map(int,sample_list.tolist()))

In [0]:
#generator for many samples predict

def music_predict(path, step=3, normalized=False, distribution = 'equal'):
    files = os.listdir(path)
    for song in files:
        data, sample_rate = librosa.load(path + song)
        sample_rate = sr
        estimates = {genre: 0 for genre in genres}
        sample_list = sample_distribution(len(data), sample_rate, distribution = distribution)
        
        for i in sample_list:
            sample = data[(i-1)*sample_rate*5: i*sample_rate*5]
            harmonic, percussive = librosa.effects.hpss(sample)
            
            p_mel = librosa.feature.melspectrogram(percussive, sr=sr, n_mels=n_mels//2),
            p_mel=p_mel[0]
            
            mfcc = librosa.feature.mfcc(sample, n_mfcc=n_mels)
            mfcc_delta = librosa.feature.delta(mfcc)
           
            chromagram = librosa.feature.chroma_cqt(y=harmonic, n_chroma=n_mels//2, sr=sr)
            
            transformed_data = [[chromagram, mfcc_delta, p_mel]]
            
            predict = dict(zip(genres, model.forward(torch.cuda.FloatTensor(transformed_data))[0].detach().cpu().numpy()))
            for genre in predict:
              estimates[genre] += predict[genre]
        if normalized:
            max_estimate, min_estimate = max(estimates.values()), min(estimates.values())
            sum_est = 0
            for genre in estimates:
                estimates[genre] = int((estimates[genre]-min_estimate) / (max_estimate-min_estimate) * 100)
                sum_est+=estimates[genre]
            for genre in estimates:
                estimates[genre]/=sum_est
                
        yield {song: estimates}

In [0]:
#loading validation dataset

def validation_load(path):
  music={}
  for genre in genres:
      g_path = path+genre+'/'
      files = os.listdir(g_path)
      songs = {}
      for song in files:
        data, sample_rate = librosa.load(g_path + song)
        songs[song] = data
      music[genre]=songs
  return music

In [0]:
#genereate the report based on equal sample distribution prediction
def predict_report(path, top=1, step=3):
  model_name = model.__class__.__name__
  total_files = 0
  total_true_pred = 0
  genres_acc = {}
  predict_dict = {}
  for genre in genres:
      g_path = path+genre+'/'
      true_genre = genre
      files = os.listdir(g_path)
      genre_files = len(files)
      total_files+=genre_files
      true_genre_pred = 0
      for song in files:

            data, sample_rate = librosa.load(g_path + song)
            sample_rate = sr
            estimates = {est_genre: 0 for est_genre in genres}
            sample_list = np.array(range(2, len(data)//(sr*5), step))
            for i in sample_list:
                sample = data[(i-1)*sample_rate*5: i*sample_rate*5]
                harmonic, percussive = librosa.effects.hpss(sample)
                
                p_mel = librosa.feature.melspectrogram(percussive, sr=sr, n_mels=n_mels//2),
                p_mel=p_mel[0]
                
                mfcc = librosa.feature.mfcc(sample, n_mfcc=n_mels)
                mfcc_delta = librosa.feature.delta(mfcc)
                
                chromagram = librosa.feature.chroma_cqt(y=harmonic, n_chroma=n_mels//2, sr=sr)
                
                transformed_data = [[chromagram, mfcc_delta, p_mel]]
                
                predict = dict(zip(genres, model.forward(torch.cuda.FloatTensor(transformed_data))[0].detach().cpu().numpy()))
                for pr_genre in predict:
                    estimates[pr_genre] += predict[pr_genre]
            predict_dict[song] = estimates
            res_list = sorted(estimates.items(), key=lambda kv: kv[1], reverse=True)
            if true_genre in list(map(lambda kv: kv[0], res_list[:top])): 
                true_genre_pred+=1
      genres_acc[true_genre] = true_genre_pred/genre_files
      
      total_true_pred+=true_genre_pred
  total_acc = total_true_pred/total_files
  table={'model':model_name, 'total_acc': total_acc, 'genres_acc': genres_acc, 'predict_dict': predict_dict}
  return(table)

In [0]:
#genereate same report without loading validation dataset
def predict_report_without_loading(music, top=1, step=3):
  model_name = model.__class__.__name__
  total_files = 0
  total_true_pred = 0
  genres_acc = {}
  predict_dict = {}
  for genre in genres:
      true_genre = genre
      files = music[genre].keys()
      genre_files = len(files)
      total_files+=genre_files
      true_genre_pred = 0
      for song in files:
            
            data = music[genre][song]
            sample_rate = sr
            estimates = {est_genre: 0 for est_genre in genres}
            sample_list = np.array(range(2, len(data)//(sr*5), step))
            for i in sample_list:
                sample = data[(i-1)*sample_rate*5: i*sample_rate*5]
                harmonic, percussive = librosa.effects.hpss(sample)
                
                p_mel = librosa.feature.melspectrogram(percussive, sr=sr, n_mels=n_mels//2),
                p_mel=p_mel[0]
                
                mfcc = librosa.feature.mfcc(sample, n_mfcc=n_mels)
                mfcc_delta = librosa.feature.delta(mfcc)
                
                chromagram = librosa.feature.chroma_cqt(y=harmonic, n_chroma=n_mels//2, sr=sr)
                
                transformed_data = [[chromagram, mfcc_delta, p_mel]]
                
                predict = dict(zip(genres, model.forward(torch.cuda.FloatTensor(transformed_data))[0].detach().cpu().numpy()))
                for pr_genre in predict:
                    estimates[pr_genre] += predict[pr_genre]
            predict_dict[song] = estimates
            res_list = sorted(estimates.items(), key=lambda kv: kv[1], reverse=True)
            if true_genre in list(map(lambda kv: kv[0], res_list[:top])): 
                true_genre_pred+=1
      genres_acc[true_genre] = true_genre_pred/genre_files
      
      total_true_pred+=true_genre_pred
  total_acc = total_true_pred/total_files
  table={'model':model_name, 'total_acc': total_acc, 'genres_acc': genres_acc, 'predict_dict': predict_dict}
  return(table)

In [0]:
#report with loading dataset https://drive.google.com/open?id=12IGHqmslVtACKRaqUfjOxICNDzTnl6bk
%%time

report = predict_report('/gdrive/My Drive/Project/Arture_validation/', step=5, top=3)
print(report)

In [0]:
#loading validation https://drive.google.com/open?id=12IGHqmslVtACKRaqUfjOxICNDzTnl6bk
%%time
val_music = validation_load('/gdrive/My Drive/Project/Arture_validation/')

CPU times: user 12min 27s, sys: 43.2 s, total: 13min 10s
Wall time: 13min 51s


In [0]:
#report without loading
%%time

report = predict_report_without_loading(val_music, step=5, top=3)
print(report)

{'model': 'ResNet', 'total_acc': 0.48936170212765956, 'genres_acc': {'classical': 1.0, 'rock': 0.6666666666666666, 'rap_hip_hop': 1.0, 'blues': 0.5, 'new_age': 0.5, 'techno': 0.25, 'jazz': 0.0, 'folk': 0.5, 'country': 0.3333333333333333, 'edm': 0.0, 'reggae': 0.0, 'heavy_metal': 0.0, 'pop': 0.5, 'drum_and_bass': 0.875, 'house': 0.0}, 'predict_dict': {'chaykovskiy-petr-ilich_-_vals-cvetov.Classic.mp3': {'classical': 284.71605655550957, 'rock': -38.41689562052488, 'rap_hip_hop': -37.758540083188564, 'blues': -107.36754250526428, 'new_age': 59.46131930779666, 'techno': -47.255994156003, 'jazz': -85.37618781626225, 'folk': 10.264451827853918, 'country': 97.1334860920906, 'edm': -43.6650493144989, 'reggae': -60.85200160741806, 'heavy_metal': -64.26210714876652, 'pop': -13.51653665304184, 'drum_and_bass': 36.90575713664293, 'house': 12.285241901874542}, 'debussy_-_claire-de-lune.Classic.mp3': {'classical': 155.322301030159, 'rock': -26.620125092566013, 'rap_hip_hop': -11.805766120553017, 'bl

In [0]:
#saving the report
import json
with open(root+'ResNet18OPValReportTop3.json', 'w') as fp:
    json.dump(report, fp)

In [0]:
np.random.seed(42)
g = music_predict('/gdrive/My Drive/Project/Arture_validation/rock/', step=5, normalized=True)

In [0]:
#generator predictions
%%time

res = next(g)
print(list(res.keys())[0])
for item in sorted(list(res.values())[0].items(), key=lambda kv:list(kv)[1], reverse=True):
    print(item[0], item[1])