<a href="https://colab.research.google.com/github/lucyxinyu/APS360_Artificial_Intelligence/blob/master/APS360_Group_Project_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#APS360 Final Project - Team 12

##Spectrogram Test
The following code plots a spectrogram of a song you want from YouTube.

####YouTube download code

In [None]:
#Install the youtube-dl library (imported as youtube_dl in the next cell) to download audio from YouTube
!pip install youtube-dl #only need to run this once

In [None]:
from __future__ import unicode_literals
from IPython.display import Audio
import youtube_dl

def download_music_youtube(link, codec='mp3'):
    """
    Downloads the audio track of a YouTube video and converts it to <codec> with the FFmpegExtractAudio postprocessor
    INPUTS:
        ::string:: link     #URL of the video; must contain "https://www.youtube.com/"
        ::string:: codec    #audio codec requested from the postprocessor
    OUTPUTS:
        ::string:: filename #name of the converted audio file; "" if link is empty or is not a YouTube link
    """
    import os #local import: the cell that defines this function does not import os

    #reject empty links and links that do not point to YouTube
    if not link or "https://www.youtube.com/" not in link:
        return ""

    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': codec, #honour the codec argument instead of always converting to mp3
            'preferredquality': '192',
        }],
    }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        #a single call downloads the audio and returns the metadata (no second request needed)
        info_dict = ydl.extract_info(link, download=True)
        #ask youtube-dl for the name it wrote, so characters it sanitizes in the title are handled
        downloaded_name = ydl.prepare_filename(info_dict)

    #the postprocessor replaces the container extension with the audio one
    #NOTE(review): assumes the output extension equals the codec name (true for 'mp3') -- confirm for other codecs
    return os.path.splitext(downloaded_name)[0] + "." + codec

In [None]:
link = "https://www.youtube.com/watch?v=9E6b3swbnWg" # <<------- ENTER THE LINK OF THE YOUTUBE VIDEO YOU WANT TO DOWNLOAD
#download the audio; music_file is "" when the link is rejected by download_music_youtube
music_file = download_music_youtube(link)
#build an IPython audio widget for the downloaded file
Audio(music_file)

####Spectrogram code

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
import librosa
import librosa.display

def get_spec_log(filename, offset=0, duration=None, sr=None, n_fft=2048, hop_length=512):
    """
    Loads (part of) an audio file and computes its magnitude spectrogram in decibels
    INPUTS:
        ::string:: filename     #name of the file
        ::float::  offset       #starting time of the song in seconds
        ::float::  duration     #how much of the song you want to load in seconds
        ::int::    sr           #sampling rate; select None for original sampling rate of song
        ::int::    n_fft        #size of the FFT, which is also used as the window length
        ::int::    hop_length   #step between windows; windows overlap when the step is smaller than the window length
    OUTPUTS:
        ::np.ndarray:: y        #data which represents the song
        ::int::        sr       #sampling rate
        ::np.ndarray:: D_log    #use this to plot spectrogram
        An empty list is returned instead when filename is not an existing file
    """
    if not os.path.isfile(filename):
        return []
    samples, rate = librosa.load(filename, offset=offset, duration=duration, sr=sr)
    transform = librosa.stft(samples, n_fft=n_fft, hop_length=hop_length, win_length=n_fft)
    magnitude = np.abs(transform)
    #decibels are measured relative to the largest magnitude in the excerpt
    decibels = librosa.amplitude_to_db(magnitude, ref=np.max)
    return samples, rate, decibels

def plot_spec(D_log, sr, title="Power spectrogram"):
    """
    Shows the spectrogram D_log (sampling rate sr) on a logarithmic frequency axis with a dB colorbar
    """
    librosa.display.specshow(D_log, sr=sr, x_axis='time', y_axis='log')
    plt.title(title)
    plt.colorbar(format='%+2.0f dB')
    plt.tight_layout()
    plt.show()

In [None]:
#spectrogram of the first 10 seconds of the downloaded song, using a 10000-sample FFT window
y, sr, D_log = get_spec_log(music_file, offset=0, duration=10, n_fft=10000)
plot_spec(D_log, sr, title=music_file)
#waveform of the same excerpt
#NOTE(review): waveplot appears to have been replaced by waveshow in newer librosa releases -- confirm against the installed version
librosa.display.waveplot(y, sr)

##Data Splitting and Visualization

The following code imports the MusicNet data, processes the labels into interval trees, and produces visualizations of the data and labels, through a spectrogram and a transcription plot respectively. For this to work, the dataset must be uploaded to the current working directory with the right structure (with the subfolders train_data, train_labels, etc.).<br>

Note that the working directory must be structured like this:<br>
```
<current working directory>
├── musicnet
|   ├── test_data
|   ├── test_images
|   ├── test_labels
|   ├── train_data
|   ├── train_images
|   └── train_labels
└── <this notebook>
```

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
import csv
import json
import pandas as pd
from intervaltree import IntervalTree
import librosa
import librosa.display
import cv2 as cv
import torch
from torch.utils.data import Dataset, DataLoader

In [None]:
##################################################################
#GLOBAL VARIABLES

root = os.getcwd() #working directory

#location of the data files
data_set_dir = os.path.join(root, "musicnet")
train_data_dir = os.path.join(data_set_dir, "train_data")
train_labels_dir = os.path.join(data_set_dir, "train_labels")
test_data_dir = os.path.join(data_set_dir, "test_data")
test_labels_dir = os.path.join(data_set_dir, "test_labels")

#location of the spectrogram images
train_images_dir = os.path.join(data_set_dir, "train_images")
test_images_dir = os.path.join(data_set_dir, "test_images")
#create the image folders if needed; makedirs also creates a missing parent folder
#and, with exist_ok=True, does nothing when the folder is already there
os.makedirs(train_images_dir, exist_ok=True)
os.makedirs(test_images_dir, exist_ok=True)

filenums = []

fs = 44100      # samples/second

##################################################################

In [None]:
#FUNCTIONS TO SET UP THE DATASET

def getfilenums(path, ret_var_type="int"):
    """
    Collects the base names (extension stripped) of the .csv files located directly inside path
    Entries are int for ret_var_type="int" and str for ret_var_type="str"
    An empty list is returned for any other ret_var_type, or when path is not a directory
    """
    if not os.path.isdir(path):
        return []

    #pick the conversion applied to every base name
    if ret_var_type == "int":
        convert = int
    elif ret_var_type == "str":
        convert = str
    else:
        return []

    return [
        convert(os.path.basename(item)[0:-4])
        for item in os.scandir(path)
        if item.path.endswith(".csv") and item.is_file()
    ]

###################################################################################################################
def process_labels(path):
    """
    Reads every csv label file in the directory, path (joined onto data_set_dir), into an intervaltree.intervaltree.IntervalTree
    Returns a dict that maps the integer base name of each csv file to its tree; every interval spans
    start_time to end_time and carries the tuple (instrument, note, start_beat, end_beat, note_value)
    """
    #This function was taken and modified from https://github.com/jthickstun/pytorch_musicnet/blob/master/musicnet.py
    labels_dir = os.path.join(data_set_dir, path)
    forest = {}
    for csv_name in os.listdir(labels_dir):
        if not csv_name.endswith('.csv'):
            continue
        song_id = int(csv_name[:-4])
        notes = IntervalTree()
        with open(os.path.join(labels_dir, csv_name), 'r') as csv_file:
            for row in csv.DictReader(csv_file, delimiter=','):
                begin = int(row['start_time'])
                end = int(row['end_time'])
                payload = (
                    int(row['instrument']),
                    int(row['note']),
                    float(row['start_beat']),
                    float(row['end_beat']),
                    row['note_value'],
                )
                notes[begin:end] = payload
        forest[song_id] = notes
    return forest

###################################################################################################################

def get_music_filenames(path):
    """
    Returns a dict that maps the integer base name of every .wav file directly inside path to the path of that file
    """
    return {
        int(os.path.basename(wav)[0:-4]): wav.path
        for wav in os.scandir(path)
        if wav.path.endswith(".wav") and wav.is_file()
    }

def get_dataset(dataset_type='train'):
    """
    Pairs every song of the 'train' or 'test' split with its labels
    Returns a dict that maps the song number to the tuple (path of the wav file, IntervalTree of its labels),
    or None when dataset_type is neither 'train' nor 'test'
    """
    if dataset_type == 'train':
        labels_dir, audio_dir = train_labels_dir, train_data_dir
    elif dataset_type == 'test':
        labels_dir, audio_dir = test_labels_dir, test_data_dir
    else:
        return None

    label_data = process_labels(labels_dir)
    music_data = get_music_filenames(audio_dir)

    #merge the two dictionaries; the songs found in the data directory decide which keys exist
    return {song_id: (music_data[song_id], label_data[song_id]) for song_id in music_data}

In [None]:
#FUNCTION FOR FURTHER DEALING WITH THE LABELS (TRANSCRIPTION)

def plot_MIDI(intervaltree_obj, start_time=0, end_time=30):
    """
    Draws the transcription stored in an intervaltree.intervaltree.IntervalTree object between
    start_time and end_time (in seconds), with one column every 512 samples and one colour per instrument
    """
    hop = 512                                     # 512 samples between windows
    windows_per_second = fs/float(hop)            # ~86 windows/second; fs is global variable defined previously
    n_windows = int(windows_per_second*(end_time-start_time))

    roll = np.zeros((n_windows,128))              # 128 distinct note labels

    #See http://www.ccarh.org/courses/253/handout/gminstruments/ for MIDI codes
    #Note that there are only 11 instruments in the dataset
    instruments = [
        (1, 'Piano'),
        (41, 'Violin'),
        (42, 'Viola'),
        (43, 'Cello'),
        (72, 'Clarinet'),
        (71, 'Bassoon'),
        (61, 'French Horn'),
        (69, 'Oboe'),
        (74, 'Flute'),
        (7, 'Harpsichord'),
        (44, 'Contrabass'),
    ]
    #colour codes run from 1 to 11 in the order listed above
    colour_of = {code: position for position, (code, _) in enumerate(instruments, start=1)}
    #To replace the ticks on the colorbar
    tick_names = [label for _, label in instruments]

    for w in range(n_windows):
        #every interval that contains the sample at the start of this window
        for interval in intervaltree_obj[w*hop + start_time*fs]:
            roll[w,interval.data[1]] = colour_of[interval.data[0]]

    figure = plt.figure(figsize=(20,5))
    image = plt.imshow(roll.T,aspect='auto',cmap='nipy_spectral')
    plt.gca().invert_yaxis()
    figure.axes[0].set_xlabel('window')
    figure.axes[0].set_ylabel('note (MIDI code)')

    bar = plt.colorbar(image, orientation='vertical')
    bar.set_ticks(range(1,12,1))
    bar.set_ticklabels(tick_names)
    plt.show()

############################################################################################################################
    
def get_label_3D_array(intervaltree_obj, width, start_time=0, end_time=30, plot=True):
    """
    Returns an array with shape 128 x width x 11 corresponding to the transcription of some song,
    between some start_time and end_time, represented by an intervaltree_obj
    INPUTS:
        ::intervaltree.intervaltree.IntervalTree:: intervaltree_obj #labels of the song; each interval carries (instrument, note, ...)
        ::int:: width                                               #number of columns of the returned array
        ::float:: start_time                                        #start of the excerpt in seconds
        ::float:: end_time                                          #end of the excerpt in seconds
        ::boolean:: plot                                            #if True, the array is also shown with plot_label_3D_array
    OUTPUTS:
        ::np.ndarray:: Yvec   #Yvec[note, column, instrument] is 1 when that instrument plays that note at that column, else 0
    """
    Yvec = np.zeros((128,width,11))   # 128 distinct note labels, 11 instruments

    #See http://www.ccarh.org/courses/253/handout/gminstruments/ for MIDI codes
    #Note that there are only 11 instruments in the dataset; each one gets its own slice of the last axis
    instrument_index = {
        1: 0,    #Piano
        41: 1,   #Violin
        42: 2,   #Viola
        43: 3,   #Cello
        72: 4,   #Clarinet
        71: 5,   #Bassoon
        61: 6,   #French Horn
        69: 7,   #Oboe
        74: 8,   #Flute
        7: 9,    #Harpsichord
        44: 10   #Contrabass
    }

    first_sample = start_time*fs                  #fs is global variable defined previously
    samples_in_excerpt = (end_time-start_time)*fs

    for window in range(width): #iterate over the columns of Yvec
        #Spread the columns evenly over the excerpt. Computing the position from the exact ratio
        #(instead of a stride that was rounded down twice) keeps the last column inside
        #[start_time, end_time) and also works when width is smaller than the duration in seconds.
        sample = int(first_sample + window*samples_in_excerpt/float(width))
        for obj in intervaltree_obj[sample]: #iterate over Interval objects that contain this sample
            Yvec[obj.data[1],window,instrument_index[obj.data[0]]] = 1

    if plot == True:
        plot_label_3D_array(Yvec)
    return Yvec

def plot_label_3D_array(Yvec):
    """
    Call this function to visualize the transcription given some Yvec
    INPUTS:
        ::np.ndarray:: Yvec   #array indexed as [note, window, instrument], as produced by get_label_3D_array
    Every (note, window) cell is coloured by the first instrument (lowest index on the last axis)
    that is non-zero there; cells without any instrument are left at 0
    """
    #To replace the ticks on the colorbar
    instrum_ticks = ['Piano','Violin','Viola','Cello','Clarinet','Bassoon','French Horn','Oboe','Flute','Harpsichord','Contrabass']

    #convert the "one-hot encoding" array Yvec to regular array
    #argmax on a boolean array gives the index of the first True, i.e. the first active instrument;
    #this replaces a triple Python loop over every cell with a single numpy pass
    active = np.asarray(Yvec) != 0
    Y_reg = np.where(active.any(axis=2), active.argmax(axis=2) + 1, 0).astype(float)

    fig = plt.figure(figsize=(20,5))
    colormap = plt.imshow(Y_reg,aspect='auto',cmap='nipy_spectral')
    plt.gca().invert_yaxis()
    fig.axes[0].set_xlabel('window')
    fig.axes[0].set_ylabel('note (MIDI code)')

    cbar = plt.colorbar(colormap, orientation='vertical')
    cbar.set_ticks(range(1,12,1))
    cbar.set_ticklabels(instrum_ticks)
    plt.show()
    
############################################################################################################################
# Functions to save the labels of each song as a json file

class NumpyArrayEncoder(json.JSONEncoder):
    """
    JSON encoder that writes numpy arrays as (nested) lists
    """
    #see https://pynative.com/python-serialize-numpy-ndarray-into-json/
    def default(self, obj):
        if not isinstance(obj, np.ndarray):
            #everything else is left to the base class, which raises TypeError for unsupported objects
            return super().default(obj)
        return obj.tolist()
    
def DictWithNumpyToJson(d, save_directory, base_filename):
    """
    Writes the dict d (whose values may be numpy arrays) to <save_directory>/<base_filename>.json,
    replacing a file of that name if one exists
    Returns True after writing; prints an error and returns False when the arguments are not
    strings or save_directory is not an existing directory
    """
    if not (type(save_directory) == str and type(base_filename) == str):
        print("ERROR: Invalid save_directory or filename")
        return False
    if not os.path.isdir(save_directory):
        print("ERROR: Invalid Directory")
        return False

    target = os.path.join(save_directory, base_filename) + ".json"
    #drop the old file first so the new one starts from scratch
    if os.path.exists(target):
        os.remove(target)

    with open(target, "w") as out_file:
        json.dump(d, out_file, cls=NumpyArrayEncoder)
    return True
    
def JsonToDictWithNumpy(filename):
    """
    Loads a json file and returns a dict with the same keys whose values are converted to numpy arrays
    Prints an error and returns None when filename does not exist or does not end in .json
    """
    if not os.path.isfile(filename):
        print("ERROR: File could not be found")
        return None
    if not filename.endswith(".json"):
        print("ERROR: File not json")
        return None

    with open(filename, "r") as in_file:
        loaded = json.load(in_file)

    #json stores the arrays as nested lists; turn them back into numpy arrays
    return {name: np.asarray(loaded[name]) for name in loaded}

def save_json_label_for_song(song_filename, save_directory, intervaltree_obj, width, interval, sr=None):
    """
    Saves the label of one song, which is a json file consisting of numpy arrays produced by the get_label_3D_array function
    The file is written to <save_directory>/<base name of the song>.json
    INPUTS:
        ::string:: song_filename                                    #full path of the song
        ::string:: save_directory                                   #directory to save the json file
        ::intervaltree.intervaltree.IntervalTree:: intervaltree_obj #interval tree object
        ::int:: width                                               #the width in pixels of the desired transcription label (corresponds to width of spectrogram)
        ::float:: interval                                          #the duration of each desired transcription label (corresponds to that of the spectrogram)
        ::int:: sr                                                  #sampling rate of the song
    OUTPUTS:
        ::boolean::   #True if the json file was written, False if DictWithNumpyToJson rejected the save
    """
    #the labels are built by get_label_dict_for_song, so saved and in-memory labels cannot diverge
    store = get_label_dict_for_song(song_filename, intervaltree_obj, width, interval, sr=sr)
    bn = os.path.basename(song_filename)[0:-4]
    #report whether the save succeeded instead of always claiming success
    return DictWithNumpyToJson(store, save_directory, bn)

############################################################################################################################

def get_label_dict_for_song(song_filename, intervaltree_obj, width, interval, sr=None):
    """
    Builds the labels of one song: a dictionary of numpy arrays produced by the get_label_3D_array function,
    one per consecutive excerpt of <interval> seconds. Keys have the form <base name>_<start>_<end>,
    with start and end zero-padded to 4 characters; a final excerpt shorter than interval is dropped
    INPUTS:
        ::string:: song_filename                                    #full path of the song
        ::intervaltree.intervaltree.IntervalTree:: intervaltree_obj #interval tree object
        ::int:: width                                               #the width in pixels of the desired transcription label (corresponds to width of spectrogram)
        ::float:: interval                                          #the duration of each desired transcription label (corresponds to that of the spectrogram)
        ::int:: sr                                                  #sampling rate of the song
    """
    samples, rate = librosa.load(song_filename, sr=sr)
    seconds = librosa.get_duration(y=samples, sr=rate) #length of the song in seconds
    n_excerpts = int(np.floor(seconds / interval))     #number of whole excerpts that fit in the song

    song_id = os.path.basename(song_filename)[0:-4]
    labels = {} #this is the dictionary that will be returned

    for index in range(n_excerpts):
        excerpt_start = index*interval
        excerpt_end = (index+1)*interval
        key = song_id + '_' + str(excerpt_start).zfill(4) + '_' + str(excerpt_end).zfill(4)
        labels[key] = get_label_3D_array(intervaltree_obj, width, start_time=excerpt_start, end_time=excerpt_end, plot=False)

    return labels


In [None]:
#FUNCTIONS FOR OBTAINING THE SPECTROGRAM

def get_spec_log(filename, offset=0, duration=None, sr=None, n_fft=2048, hop_length=512):
    """
    Computes the dB-scaled magnitude spectrogram of (part of) an audio file
    INPUTS:
        ::string:: filename     #name of the file
        ::float::  offset       #starting time of the song in seconds
        ::float::  duration     #how much of the song you want to load in seconds
        ::int::    sr           #sampling rate; select None for original sampling rate of song
        ::int::    n_fft        #size of the FFT, which is also used as the window length
        ::int::    hop_length   #step between windows; windows overlap when the step is smaller than the window length
    OUTPUTS:
        ::np.ndarray:: y        #data which represents the song
        ::int::        sr       #sampling rate
        ::np.ndarray:: D_log    #use this to plot spectrogram
        An empty list is returned instead when filename is not an existing file
    """
    if not os.path.isfile(filename):
        return []
    audio, sample_rate = librosa.load(filename, offset=offset, duration=duration, sr=sr)
    spectrum = np.abs(
        librosa.stft(audio, n_fft=n_fft, hop_length=hop_length, win_length=n_fft)
    )
    #reference level is the largest magnitude, so the loudest bin sits at 0 dB
    spectrum_db = librosa.amplitude_to_db(spectrum, ref=np.max)
    return audio, sample_rate, spectrum_db

def plot_spec(D_log, sr, title="Power spectrogram"):
    """
    Shows the spectrogram D_log (sampling rate sr) in a new 18x5 figure, with a logarithmic
    frequency axis and a dB colorbar
    """
    #the figure must be created BEFORE specshow is called! https://gist.github.com/mailletf/3484932dd29d62b36092
    plt.figure(figsize=(18,5))
    librosa.display.specshow(D_log, sr=sr, x_axis='time', y_axis='log')
    plt.title(title)
    plt.colorbar(format='%+2.0f dB')
    plt.show()

def show_spectrogram(filename, offset=0, duration=None, sr=None, n_fft=2048, hop_length=512, title="Power spectrogram"):
    """
    Call this function if you only want to see the spectrogram for some filename
    offset, duration, sr, n_fft, hop_length are as defined in the get_spec_log function
    title is the title shown above the plot
    """
    #forward every spectrogram setting; sr and hop_length used to be accepted but silently dropped
    y, loaded_sr, D_log = get_spec_log(filename, offset=offset, duration=duration, sr=sr, n_fft=n_fft, hop_length=hop_length)
    plot_spec(D_log, loaded_sr, title=title)

############################################################################################################################
    
def save_spec_raw(D_log, sr, filename='temp.jpg'):
    """
    Renders D_log as a grayscale spectrogram without axes, titles, or labels, and saves it to filename
    """
    plt.figure(figsize=(18,6), dpi=100)
    librosa.display.specshow(D_log, sr=sr, x_axis='time', y_axis='log', cmap='gray') #grayscale spectrogram
    plt.axis('off') #do not show axis labels
    plt.savefig(filename, bbox_inches='tight', pad_inches=0) #remember to remove white space
    plt.close() #close the plot, save memory
    
def get_merged_spec(filename, save_directory, offset=0, duration=None, sr=None, n_fft=2048, hop_length=512, length=20):
    """
    Get the merged spectrogram of a song with filename <filename>, consisting of individual spectrograms with length <length>
    Final spectrogram is automatically saved in <save_directory> as <base name>_<offset>_<end>.jpg
    offset, duration, sr, n_fft, hop_length are as defined in the get_spec_log function
    The individual spectrograms are saved as <base name>_temp_<start>_<end>.jpg and are not deleted here
    Returns True on success. Returns False when duration is None, filename is not an existing .wav file,
    save_directory is not a directory, or duration is shorter than one spectrogram of <length> seconds
    """
    if duration is None:
        return False
    #check if filename exists and is a wav file
    if not (os.path.exists(filename) and filename.endswith('.wav')):
        return False
    #check if the save directory exists
    if not os.path.isdir(save_directory):
        return False
    bn = os.path.basename(filename)[0:-4]

    num_plots = int(np.floor(duration / length)) #number of spectrograms to make
    if num_plots < 1:
        #nothing to merge; without this guard the merge below fails on an empty list
        return False

    img_filenames = [] #filenames of the individual spectrogram images (each covering <length> seconds)
    for i in range(num_plots):
        chunk_start = offset + length*i
        chunk_end = offset + length*(i+1)
        current_filename = os.path.join(save_directory, bn + '_temp_' + str(chunk_start).zfill(4) + "_" + str(chunk_end).zfill(4) + '.jpg')
        img_filenames.append(current_filename)
        #honour the caller's sr, n_fft and hop_length (n_fft used to be fixed at 5000 here);
        #the loaded rate gets its own name so the sr argument is not overwritten between chunks
        y, chunk_sr, D_log = get_spec_log(filename, offset=chunk_start, duration=length, sr=sr, n_fft=n_fft, hop_length=hop_length)
        save_spec_raw(D_log, chunk_sr, filename=current_filename) #save the spectrogram

    #load images
    img_array = []
    for name in img_filenames:
        img = cv.imread(name, cv.IMREAD_GRAYSCALE)
        if img is None:
            #imread signals failure by returning None instead of raising
            raise IOError("ERROR: Could not read spectrogram image " + name)
        img_array.append(img)

    #merge images side by side, in time order
    height, chunk_width = img_array[0].shape[0], img_array[0].shape[1]
    final_img = np.zeros((height, chunk_width * len(img_array)), np.uint8)
    for i, img in enumerate(img_array):
        final_img[0:img.shape[0], (i*img.shape[1]):((i+1)*img.shape[1])] = img

    cv.imwrite(os.path.join(save_directory, bn + "_" + str(offset).zfill(4) + "_" + str(offset + length*num_plots).zfill(4) + ".jpg"),final_img)
    return True

def get_merged_specs_of_song(filename, save_directory, sr=None, n_fft=2048, hop_length=512, length=20, interval=60):
    """
    Creates the merged spectrograms of an entire song, one per consecutive excerpt of <interval> seconds
    Each merged spectrogram is made up of individual spectrograms each having a length of <length>
    A final excerpt shorter than <interval> is skipped
    Returns False when filename is not an existing .wav file or save_directory is not a directory, True otherwise
    """
    if not (os.path.exists(filename) and filename.endswith('.wav')):
        return False
    if not os.path.isdir(save_directory):
        return False

    samples, rate = librosa.load(filename, sr=sr)
    seconds = librosa.get_duration(y=samples, sr=rate) #length of the song in seconds
    n_excerpts = int(np.floor(seconds / interval))     #number of merged spectrograms to create

    for index in range(n_excerpts):
        get_merged_spec(
            filename,
            save_directory,
            offset=index*interval,
            duration=interval,
            sr=sr,
            n_fft=n_fft,
            hop_length=hop_length,
            length=length,
        )

    return True

def process_data_to_spec(filename, save_directory, sr=None, n_fft=2048, hop_length=512, length=20, interval=60, cleanup=True):
    """
    Creates the merged spectrograms of an entire song with filename <filename>; each spectrogram having a length of <interval>
    Each merged spectrogram is made up of individual spectrograms each having a length of <length>
    The spectrograms are saved in the directory <save_directory>; if cleanup==True, every .jpg file in that
    directory whose name contains '_temp_' is deleted afterwards
    Prints an error and returns False when save_directory is not a directory, returns True otherwise
    """
    if not os.path.isdir(save_directory):
        print("ERROR: Invalid Directory")
        return False

    get_merged_specs_of_song(filename, save_directory, sr=sr, n_fft=n_fft, hop_length=hop_length, length=length, interval=interval)

    if cleanup == True:
        for item in os.scandir(save_directory):
            is_jpg_file = item.path.endswith(".jpg") and item.is_file()
            if is_jpg_file and '_temp_' in os.path.basename(item)[0:-4]:
                os.remove(item.path) #remove the temp files
    return True

In [None]:
class MusicNetDataset(torch.utils.data.Dataset):
    """
    Dataset of fixed-length excerpts of the MusicNet songs
    Every sample is a dictionary with keys filenum, start_time, end_time, spectrogram and transcription
    """
    #Note that this class inherits from torch.utils.data.Dataset
    def __init__(self, dataset_type='train', create_spec=True, create_labels=False, no_saved_labels=True):
        """
        INPUTS:
            ::string:: dataset_type           #the dataset type, either 'train', 'test', or 'val' (not yet supported)
            ::boolean:: create_spec           #if True, then merged spectrographs will be created and saved; old spectrographs will be overridden
            ::boolean:: create_labels         #if True and if no_saved_labels==False, then will create json files of the labels; old json files will be overridden
            ::boolean:: no_saved_labels       #if True, then will not use or create json files. Labels are loaded and processed from the csv files directly and are not saved
        RAISES:
            NameError   #dataset_type is neither 'train' nor 'test'
            Exception   #the images directory is empty while create_spec==False, a json label file is missing, or a spectrogram image cannot be read
        """
        self.filenums = [] #list of base filenames (without extension); default type of each element is int
        self.data_dir_name = None
        self.labels_dir_name = None
        self.images_dir_name = None

        #Dictionary that stores the initial dataset; each entry contains the full song path and an intervaltree.intervaltree.IntervalTree object for that song
        self.unprocessed_dataset = dict()

        #list of processed data, every entry is a dictionary
        self.processed_data_list = []
        #Pandas DataFrame that stores the processed dataset
        self.processed_dataset = None

        self.interval = 10 #interval of each spectrogram, in seconds
        self.spectrogram_width = 0 #width of each spectrogram, in pixels

        #check if we want train, test, or val
        if dataset_type == 'train':
            self.filenums = getfilenums(train_labels_dir)
            self.data_dir_name = train_data_dir
            self.labels_dir_name = train_labels_dir
            self.images_dir_name = train_images_dir
        elif dataset_type == 'test':
            self.filenums = getfilenums(test_labels_dir)
            self.data_dir_name = test_data_dir
            self.labels_dir_name = test_labels_dir
            self.images_dir_name = test_images_dir
        else:
            raise NameError("Invalid dataset_type")

        #Check if self.images_dir_name is empty
        if create_spec == False and len(os.listdir(self.images_dir_name)) == 0:
            raise Exception("ERROR: There are no files in the images directory. Set create_spec=True to create spectrograms.")

        self.unprocessed_dataset = get_dataset(dataset_type=dataset_type) #load the dictionary

        ########################################################################################################################
        #create and save new merged spectrograms, overriding the old ones in self.images_dir_name
        if create_spec == True:
            self.music_filenames = get_music_filenames(self.data_dir_name)
            for filenum in self.filenums:
                process_data_to_spec(self.unprocessed_dataset[filenum][0], self.images_dir_name, n_fft=5000, length=10, interval=self.interval, cleanup=True)

        self.spectrogram_width = self.__get_image_width(self.images_dir_name) #get the width of each spectrogram

        if no_saved_labels == False:
            if create_labels == True:
                #create and save new json files that represent the labels, overriding the old ones in self.labels_dir_name
                for filenum in self.filenums:
                    song_path, song_tree = self.unprocessed_dataset[filenum]
                    save_json_label_for_song(song_path, self.labels_dir_name, song_tree, width=self.spectrogram_width, interval=self.interval)

            #Look the json label files up by song number. Pairing them with self.filenums by position
            #only works if two separate directory listings happen to come back in the same order.
            label_paths = dict()
            for entry in os.scandir(self.labels_dir_name):
                if entry.path.endswith(".json") and entry.is_file():
                    try:
                        label_paths[int(os.path.basename(entry.path)[0:-5])] = entry.path
                    except ValueError:
                        continue #json file whose name is not a song number

            for filenum in self.filenums:
                if filenum not in label_paths:
                    raise Exception("ERROR: Missing file or label number " + str(filenum))
                self.__append_samples(filenum, JsonToDictWithNumpy(label_paths[filenum]))
        else:
            #build the labels straight from the interval trees, without touching json files
            for filenum in self.filenums:
                song_path, song_tree = self.unprocessed_dataset[filenum]
                song_labels_dict = get_label_dict_for_song(song_path, song_tree, width=self.spectrogram_width, interval=self.interval)
                self.__append_samples(filenum, song_labels_dict)

        self.processed_dataset = pd.DataFrame(data=self.processed_data_list, columns=['filenum','start_time','end_time','spectrogram','transcription'])

    ########################################################################################################################
    # Built-in functions in torch.utils.data.Dataset that must be overridden accordingly

    def __len__(self):
        """
        Number of samples (rows of self.processed_dataset)
        """
        return self.processed_dataset.shape[0]

    def __getitem__(self, idx):
        """
        Returns the idx-th sample as a dictionary keyed by the column names of self.processed_dataset
        """
        return self.processed_dataset.iloc[idx,:].to_dict()

    ########################################################################################################################

    def __append_samples(self, filenum, song_labels_dict):
        """
        Private method. Appends one sample to self.processed_data_list for every entry of song_labels_dict.
        Each key has the form <base name>_<start>_<end>; it also names the spectrogram image
        <key>.jpg inside self.images_dir_name.
        """
        for key in song_labels_dict: #note that key is a string
            start_and_end_time = key.split('_')
            image_path = os.path.join(self.images_dir_name,key)+'.jpg'
            spectrogram = cv.imread(image_path, cv.IMREAD_GRAYSCALE)
            if spectrogram is None:
                #imread returns None instead of raising when the image is missing or unreadable
                raise Exception("ERROR: Could not read spectrogram " + image_path)
            current_sample = dict()
            current_sample['filenum'] = filenum
            current_sample['start_time'] = float(start_and_end_time[1])
            current_sample['end_time'] = float(start_and_end_time[2])
            current_sample['spectrogram'] = spectrogram.astype(np.float32)
            current_sample['transcription'] = song_labels_dict[key]
            self.processed_data_list.append(current_sample)

    def __get_image_width(self, directory):
        """
        Private method. Returns the size of the first axis of the first .jpg image found in directory.
        Raises an Exception if directory holds no readable .jpg image.
        """
        list_of_files = []
        for entry in os.scandir(directory):
            if entry.path.endswith(".jpg") and entry.is_file():
                list_of_files += [entry.path]
        if len(list_of_files) == 0:
            raise Exception("ERROR: There are no spectrogram images in " + str(directory))

        img = cv.imread(list_of_files[0], -1)
        if img is None:
            raise Exception("ERROR: Could not read spectrogram " + list_of_files[0])
        #NOTE(review): shape[0] is the number of rows of the image, which looks like its height rather than
        #its width; kept as is because the label arrays are sized from this value -- confirm which axis is intended
        return img.shape[0]

    def visualize_sample(self, idx):
        """
        Call this function to visualize the idx-th sample in this dataset
        Prints filenum, start_time and end_time, then shows the spectrogram and the transcription
        """
        sample = self.__getitem__(idx)

        display_dict = dict()
        display_dict['filenum'] = [sample['filenum']]
        display_dict['start_time'] = [sample['start_time']]
        display_dict['end_time'] = [sample['end_time']]
        display_df = pd.DataFrame.from_dict(display_dict)
        print("\nInformation about sample number " + str(idx) + ":\n", display_df)

        print("\nInput and Label:")
        fig = plt.figure(figsize=(18,6), dpi=57)
        plt.imshow(sample['spectrogram'], cmap='gray')
        plot_label_3D_array(sample['transcription'])
        

Create an instance of the `MusicNetDataset` dataset class to get the dataset. Note that the `MusicNetDataset` class inherits from `torch.utils.data.Dataset`. If you are running the code for the first time, set `create_spec=True` when initializing an instance of `MusicNetDataset`. This will create and save spectrograms of each song.<br><br>
Suppose the variable `dataset` is an instance of `MusicNetDataset`, that is, `dataset = MusicNetDataset(...)`. Every sample in `dataset` is a dictionary representing an interval of a song. This dictionary has the keys filenum, start_time, end_time, spectrogram, and transcription. The `i`th sample can be accessed via `dataset[i]`. The length of the dataset can be found by calling `len(dataset)`. The `i`th sample can be visualized by calling `dataset.visualize_sample(i)`.

In [None]:
train_set = MusicNetDataset(dataset_type='train', create_spec=False) #loading the data, reusing the spectrogram images that are already saved

train_loader = torch.utils.data.DataLoader(train_set, batch_size=256, shuffle=True)

print(train_set[100]) #see what the sample at index 100 looks like
print(len(train_set)) #length of train_set
#TODO: make sure inputs are 0-1 (the spectrograms are loaded as float32 without any rescaling)

In [None]:
# Print the shape of the training set's `processed_dataset` attribute
print(train_set.processed_dataset.shape)


In [None]:
# Build the test split the same way as the training split
test_set = MusicNetDataset(dataset_type='test', create_spec=False)

# NOTE(review): this loader is also what train_single_instrument reports as
# "Validation"; shuffle=True does not matter for evaluation but is kept as is.
test_loader = torch.utils.data.DataLoader(test_set, batch_size=256, shuffle=True)

In [None]:
def get_instrument_onehot(transcription):
    """Flag which entries remain non-zero after summing over axes 1 and 2.

    Args:
        transcription: tensor with at least three axes. Callers in this
            notebook pass a batch and then index the result with
            `[:, instrument]`, i.e. they treat the last remaining axis as the
            instrument axis.
    Returns:
        Integer (long) tensor holding 1 where the sum over axes 1 and 2 is
        non-zero and 0 elsewhere.
    """
    totals = transcription.sum(dim=(1, 2))
    return totals.ne(0).long()

In [None]:
# Check the dtype of the spectrogram stored in the first training sample
print(train_set[0]['spectrogram'].dtype)

# Check the shape of the label tensor for one batch only (hence the break)
for sample in train_loader:
    print(get_instrument_onehot(sample['transcription']).shape)
    break

In [None]:
# Print the metadata and plot the input/label of the training sample at index 100
train_set.visualize_sample(100)

Baseline Model

In [None]:
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision.transforms as transforms

In [None]:
#spectrogram resolution: 453x1395


class Baseline_Net(nn.Module):
    """Two-layer fully connected baseline that outputs one logit per sample.

    The input image is flattened, passed through a hidden linear layer with
    ReLU, and reduced to a single value by a second linear layer.

    Args:
        input_height: height of the input spectrogram (default 453)
        input_width: width of the input spectrogram (default 1395)
        hidden_size: number of units in the hidden layer (default 100)
    """
    def __init__(self, input_height=453, input_width=1395, hidden_size=100):
        super().__init__()
        self.name = "baseline"
        # Kept in one place so that __init__ and forward cannot drift apart
        self.input_size = input_height * input_width
        self.layer1 = nn.Linear(self.input_size, hidden_size)
        self.layer2 = nn.Linear(hidden_size, 1)

    def forward(self, img):
        """Return a tensor of shape [batch_size] holding one raw output per sample."""
        # reshape (rather than view) also accepts non-contiguous inputs
        flattened = img.reshape(-1, self.input_size)
        hidden = F.relu(self.layer1(flattened))
        return self.layer2(hidden).squeeze(1)

In [None]:

class Primary_Net(nn.Module):
    """Convolutional network that outputs one raw value per spectrogram.

    Three 5x5 convolutions, each followed by leaky ReLU and 2x2 max pooling,
    feed two fully connected layers. The flattened size 53 * 170 * 5 is what
    a 453x1395 input is reduced to by those three conv/pool stages.

    NOTE(review): batch1, batch2 and batch3 are constructed (and therefore
    appear in the state dict) but forward() never calls them.
    """
    def __init__(self):
        super().__init__()
        self.name = "primary"
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=5, kernel_size=5)
        self.batch1 = nn.BatchNorm2d(num_features=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=5, out_channels=10, kernel_size=5)
        self.batch2 = nn.BatchNorm2d(num_features=10)
        self.conv3 = nn.Conv2d(in_channels=10, out_channels=5, kernel_size=5)
        self.batch3 = nn.BatchNorm2d(num_features=5)
        # in_features = height x width x number_of_channels after the last pooling
        self.fc1 = nn.Linear(in_features=53 * 170 * 5, out_features=32)
        self.fc2 = nn.Linear(in_features=32, out_features=1)

    def forward(self, x):
        # Insert the channel axis expected by Conv2d
        features = x.unsqueeze(1)
        for conv in (self.conv1, self.conv2, self.conv3):
            features = self.pool(F.leaky_relu(conv(features)))
        flat = features.view(-1, 53 * 170 * 5)
        hidden = F.leaky_relu(self.fc1(flat))
        # Drop the trailing singleton axis so the result is [batch_size]
        return self.fc2(hidden).squeeze(1)
    
#add more layers
#change conv2d to 1d

In [None]:
# Instantiate the baseline model and list the shape of every parameter tensor
baseline_net = Baseline_Net()

for param in baseline_net.parameters():
    print ("Baseline_Net", param.shape)

In [None]:
def plot_training_curve(path):
    """ Plots the training curves for a model run, given the csv files
    containing the train/validation error/loss.

    Two figures are shown one after the other: error per epoch and loss per
    epoch, each with a "Train" and a "Validation" line.

    Args:
        path: The base path of the csv files produced during training; the
            files read are "<path>_train_err.csv", "<path>_val_err.csv",
            "<path>_train_loss.csv" and "<path>_val_loss.csv"
    """
    import matplotlib.pyplot as plt

    def _load_curve(metric):
        # ndmin=1 keeps a single-epoch run as a length-1 array; without it
        # np.loadtxt returns a 0-d array and len() below would raise.
        return np.loadtxt("{}_{}.csv".format(path, metric), ndmin=1)

    # Load everything up front so a missing file fails before any figure is shown
    train_err = _load_curve("train_err")
    val_err = _load_curve("val_err")
    train_loss = _load_curve("train_loss")
    val_loss = _load_curve("val_loss")

    epochs = range(1, len(train_err) + 1)  # epochs are numbered from 1
    curves = (("Error", train_err, val_err), ("Loss", train_loss, val_loss))
    for metric_name, train_curve, val_curve in curves:
        plt.title("Train vs Validation {}".format(metric_name))
        plt.plot(epochs, train_curve, label="Train")
        plt.plot(epochs, val_curve, label="Validation")
        plt.xlabel("Epoch")
        plt.ylabel(metric_name)
        plt.legend(loc='best')
        plt.show()

In [None]:
def get_model_name(instrument, name, batch_size, learning_rate, epoch):
    """ Build the file name used for a model checkpoint from its settings

    Args:
        instrument: Index of the instrument the model is trained for
        name: Name of the network (e.g. its `name` attribute)
        batch_size: Batch size to record in the file name
        learning_rate: Learning rate to record in the file name
        epoch: Epoch index to record in the file name
    Returns:
        path: A string of the form
            "instrument_<instrument>_model<name>_bs<batch_size>_lr<learning_rate>_epoch<epoch>"
    """
    segments = [
        "instrument_{}".format(instrument),
        "model{}".format(name),
        "bs{}".format(batch_size),
        "lr{}".format(learning_rate),
        "epoch{}".format(epoch),
    ]
    return "_".join(segments)

def evaluate(instrument, net, loader, criterion):
    """ Evaluate the network for one instrument on the given data loader.

    The network is put in evaluation mode and run without gradient tracking;
    its previous train/eval mode is restored before returning.

    Args:
        instrument: Index of the label column to pick from the output of
            get_instrument_onehot
        net: PyTorch neural network object
        loader: PyTorch data loader yielding dicts with 'spectrogram' and
            'transcription' entries
        criterion: The loss function
    Returns:
        err: Fraction of samples whose thresholded output differs from the label
        loss: Mean of the per-batch loss values
    Raises:
        ValueError: If the loader yields no batches
    """
    total_loss = 0.0
    total_err = 0
    total_samples = 0
    num_batches = 0
    was_training = net.training
    net.eval()
    try:
        # No gradients are needed here; skipping them avoids building the graph
        with torch.no_grad():
            for data in loader:
                inputs = data['spectrogram']
                labels = get_instrument_onehot(data['transcription'])[:, instrument]
                outputs = net(inputs)
                loss = criterion(outputs, labels.float())
                # Outputs are thresholded at 0 to obtain the 0/1 prediction
                mismatches = (outputs > 0.0).squeeze().long() != labels
                total_err += int(mismatches.sum())
                total_loss += loss.item()
                total_samples += len(labels)
                num_batches += 1
    finally:
        # Leave the network in the mode the caller had it in
        net.train(was_training)
    if num_batches == 0:
        raise ValueError("evaluate() received a loader that yields no batches")
    err = float(total_err) / total_samples
    loss = float(total_loss) / num_batches
    return err, loss


def train_single_instrument(instrument, net, batch_size=16, learning_rate=0.01, num_epochs=30,
                            train_data_loader=None, val_data_loader=None):
    """ Train `net` as a binary classifier for a single instrument.

    After every epoch the state dict is saved under the name returned by
    get_model_name; after the last epoch the per-epoch train/validation
    error and loss are written to csv files and plotted.

    Args:
        instrument: Index of the label column to pick from the output of
            get_instrument_onehot
        net: PyTorch neural network object; must have a `name` attribute
        batch_size: Only recorded in the checkpoint file name. The actual
            batch size is whatever the data loader was built with.
        learning_rate: Learning rate for the Adam optimizer
        num_epochs: Number of passes over the training loader (at least 1)
        train_data_loader: Loader for training batches; defaults to the
            notebook-level `train_loader`
        val_data_loader: Loader used for the reported "Validation" numbers;
            defaults to the notebook-level `test_loader`
    Returns:
        model_path: Checkpoint path of the last epoch, which is also the base
            path of the csv files
    Raises:
        ValueError: If num_epochs < 1 or the training loader yields no batches
    """
    if num_epochs < 1:
        raise ValueError("num_epochs must be at least 1")
    if train_data_loader is None:
        train_data_loader = train_loader
    if val_data_loader is None:
        val_data_loader = test_loader
    ########################################################################
    # Fixed PyTorch random seed for reproducible result
    torch.manual_seed(1000)
    ########################################################################
    # Define the loss function and optimizer
    # The loss function is Binary Cross Entropy (BCE). BCEWithLogitsLoss
    # takes the unnormalized output of the network and a scalar label.
    # The optimizer is Adam.
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)
    ########################################################################
    # Arrays that store the training/validation loss/error of every epoch
    train_err = np.zeros(num_epochs)
    train_loss = np.zeros(num_epochs)
    val_err = np.zeros(num_epochs)
    val_loss = np.zeros(num_epochs)
    ########################################################################
    # Train the network
    start_time = time.time()
    for epoch in range(num_epochs):  # loop over the dataset multiple times
        net.train()  # make sure mode-dependent layers are in training mode
        total_train_loss = 0.0
        total_train_err = 0
        total_samples = 0
        num_batches = 0
        for data in train_data_loader:
            inputs = data['spectrogram']
            labels = get_instrument_onehot(data['transcription'])[:, instrument]
            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass, backward pass, and optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels.float())
            loss.backward()
            optimizer.step()

            # Statistics: a logit above 0 counts as a positive prediction
            mismatches = (outputs > 0.0).squeeze().long() != labels
            total_train_err += int(mismatches.sum())
            total_train_loss += loss.item()
            total_samples += len(labels)
            num_batches += 1
        if num_batches == 0:
            raise ValueError("the training loader yields no batches")
        train_err[epoch] = float(total_train_err) / total_samples
        train_loss[epoch] = float(total_train_loss) / num_batches
        val_err[epoch], val_loss[epoch] = evaluate(instrument, net, val_data_loader, criterion)
        print(("Epoch {}: Train err: {}, Train loss: {} |"+
               "Validation err: {}, Validation loss: {}").format(
                   epoch + 1,
                   train_err[epoch],
                   train_loss[epoch],
                   val_err[epoch],
                   val_loss[epoch]))
        # Save the current model (checkpoint) to a file
        model_path = get_model_name(instrument, net.name, batch_size, learning_rate, epoch)
        torch.save(net.state_dict(), model_path)
    print('Finished Training')
    end_time = time.time()
    elapsed_time = end_time - start_time
    print("Total time elapsed: {:.2f} seconds".format(elapsed_time))
    # Write the train/validation loss/err into csv files for plotting later;
    # they share the base name of the last checkpoint
    np.savetxt("{}_train_err.csv".format(model_path), train_err)
    np.savetxt("{}_train_loss.csv".format(model_path), train_loss)
    np.savetxt("{}_val_err.csv".format(model_path), val_err)
    np.savetxt("{}_val_loss.csv".format(model_path), val_loss)
    plot_training_curve(model_path)
    return model_path

def train_nets(nets, batch_size=16, learning_rate=0.01, num_epochs=30):
    """ Train one network per instrument, in the order given.

    Args:
        nets: Iterable of networks; the position of a network in `nets` is
            passed to train_single_instrument as its instrument index
        batch_size: Forwarded to train_single_instrument
        learning_rate: Forwarded to train_single_instrument
        num_epochs: Forwarded to train_single_instrument
    Returns:
        A list with the path returned by train_single_instrument for each
        network, in the same order as `nets`
    """
    model_paths = []
    for i, net in enumerate(nets):
        print("training instrument: ", i)
        model_paths.append(
            train_single_instrument(i, net, batch_size, learning_rate, num_epochs))
    return model_paths

In [None]:
# NOTE(review): testnet1 is not used by the cells visible here
testnet1 = Baseline_Net()
# One baseline network per instrument index 0..10
nets = [Baseline_Net() for i in range(11)]
# Uses the default batch_size=16 (for file naming) and learning_rate=0.01
train_nets(nets, num_epochs=20)
# TODO: change learning rate

In [None]:
# Plot the saved curves of the baseline model for instrument index 10.
# NOTE(review): the csv files are named after the settings of the last training
# epoch. batch_size=32 / epoch=9 do not match the visible training call above
# (default batch_size=16, num_epochs=20, i.e. last epoch index 19) — confirm.
model_path = get_model_name(10,"baseline", batch_size=32,learning_rate=0.01, epoch=9)
plot_training_curve(model_path)

In [None]:
# NOTE(review): testnet2 is not used by the cells visible here
testnet2 = Primary_Net()
# One convolutional network per instrument index 0..10
nets = [Primary_Net() for i in range(11)]
# batch_size=128 only affects the checkpoint file names; 10 epochs of training
train_nets(nets, batch_size = 128, num_epochs=10)

In [None]:
def get_instrument_model(num, batch_size=128, learning_rate=0.01, epoch=9):
    """ Return the checkpoint file name of the trained Primary_Net for one instrument.

    The name is built by get_model_name so that it always matches the files
    written during training (which include the "_bs<batch_size>" segment).
    The defaults correspond to the Primary_Net training call in this
    notebook: batch_size=128, learning_rate=0.01, 10 epochs (last index 9).

    Args:
        num: Instrument index
        batch_size: Batch size recorded in the checkpoint name
        learning_rate: Learning rate recorded in the checkpoint name
        epoch: Epoch index of the checkpoint to load
    Returns:
        The checkpoint file name as a string
    """
    return get_model_name(num, "primary", batch_size, learning_rate, epoch)

# Fresh networks that receive the saved weights, one per instrument index
nets = [Primary_Net() for i in range(11)]

for i, net in enumerate(nets):
    # Load the checkpoint for instrument i and switch the network to eval mode
    net.load_state_dict(torch.load(get_instrument_model(i)))
    net.eval()

In [None]:
# Load and show one hold-out spectrogram image.
# NOTE(review): absolute path on a local machine — adjust before running elsewhere.
image = Image.open("/Users/katherineliang/PycharmProjects/aps360/musicnet/holdout_images/1742_0770_0780.jpg")
display(image)

# Convert the image to a tensor that can be fed to the networks
spectrogram = ToTensor()(image)

In [None]:
# Display names, indexed by the position of the corresponding network in `nets`
names = ['piano', 'violin', 'viola', 'cello', 'clarinet', 'bassoon', 'horn', 'oboe', 'flute', 'harpsichord', 'string bass']

In [None]:
# Print the raw (unthresholded) output of every instrument network for the image
for i, net in enumerate(nets):
    result = net(spectrogram).item()
    print(names[i], result)

In [None]:
# Rebinds `nets` to 128 newly constructed networks (the cells below treat the
# index as a note number).
# NOTE(review): no weights are loaded into these networks in the visible code —
# confirm whether trained checkpoints should be loaded first.
nets = [Primary_Net() for i in range(128)]

In [None]:
# Collect the indices of all networks whose output for the image is positive
notes = []

for i, net in enumerate(nets):
    result = net(spectrogram).item()
    if result>0:
        notes.append(i)
print(notes)

In [None]:
# Load and show a second hold-out image (file name suffix "_56").
# NOTE(review): absolute path on a local machine — adjust before running elsewhere.
image = Image.open("/Users/katherineliang/PycharmProjects/aps360/musicnet/holdout_images/1742_0770_0780_56.jpg")
display(image)

# Replace `spectrogram` with the tensor of the newly loaded image
spectrogram = ToTensor()(image)

In [None]:
# Same check as before, now on the second image: indices of all networks
# whose output is positive
notes = []

for i, net in enumerate(nets):
    result = net(spectrogram).item()
    if result>0:
        notes.append(i)
print(notes)