In [None]:
!pip install pretty_midi

In [None]:
# library for understanding music
from torch.utils.data import Dataset
import numpy as np
import pretty_midi


# define the Pitch Dataset object for Pytorch
class ImprovPitchDataset(Dataset):
    
    """
    --> DataLoader can do the batch computation for us

    Implement a custom Dataset:
    inherit Dataset
    implement __init__ , __getitem__ , and __len__
    """
    
    def __init__(self):
        #for listing down the file names
        import os
        
        #specify the path
        path='data/w_jazz_augmented/'
        #read all the filenames
        files=[i for i in os.listdir(path) if i.endswith(".mid")]
        #reading each midi file
        notes_array = np.array([np.array(readMIDI(path+i)[0]) for i in files])
        
        #converting 2D array into 1D array
        notes_ = [element for note_ in notes_array for element in note_]
        #No. of unique notes
        unique_notes = list(set(notes_))
        print("number of unique pithces: " + str(len(unique_notes)))
        
        from collections import Counter
        #computing frequency of each note
        freq = dict(Counter(notes_))
              
        # the threshold for frequent notes can change 
        threshold = 20 # this threshold is the number of classes that have to be predicted
        frequent_notes = [note_ for note_, count in freq.items() if count>=threshold]
        print("number of frequent pithces (more than 50 times): " + str(len(frequent_notes)))
        self.num_frequent_notes = len(frequent_notes)
        self.vocab = frequent_notes
        
        # prepare new musical files which contain only the top frequent notes
        new_music=[]
        for notes in notes_array:
            temp=[]
            for note_ in notes:
                if note_ in frequent_notes:
                    temp.append(note_)            
            new_music.append(temp)
        new_music = np.array(new_music) # same solos but with only most frequent notes

        self.x = new_music

    # support indexing such that dataset[i] can be used to get i-th sample
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    # we can call len(dataset) to return the size
    def __len__(self):
        return self.n_samples
    
    def getData(self):
        return self.x

# define the Duration Dataset object for Pytorch
class ImprovDurationDataset(Dataset):
    
    """
    --> DataLoader can do the batch computation for us

    Implement a custom Dataset:
    inherit Dataset
    implement __init__ , __getitem__ , and __len__
    """
    
    def __init__(self):
        #for listing down the file names
        import os
        
        #specify the path
        path='data/w_jazz_augmented/'
        #read all the filenames
        files=[i for i in os.listdir(path) if i.endswith(".mid")]
        #reading each midi file
        notes_array = np.array([np.array(readMIDI(path+i)[1]) for i in files])
        
        #converting 2D array into 1D array
        notes_ = [element for note_ in notes_array for element in note_]
        #No. of unique notes
        unique_notes = list(set(notes_))
        print("number of unique durations: " + str(len(unique_notes)))
        
        from collections import Counter
        #computing frequency of each note
        freq = dict(Counter(notes_))
              
        # the threshold for frequent notes can change 
        threshold = 10 
        frequent_notes = [note_ for note_, count in freq.items() if count>=threshold]
        print("number of frequent durations (more than 10 times): " + str(len(frequent_notes)))
        self.num_frequent_notes = len(frequent_notes)
        self.vocab = frequent_notes
        
        # prepare new musical files which contain only the top frequent notes
        new_music=[]
        for notes in notes_array:
            temp=[]
            for note_ in notes:
                if note_ in frequent_notes:
                    temp.append(note_)            
            new_music.append(temp)
        new_music = np.array(new_music) # same solos but with only most frequent notes

        self.x = new_music

    # support indexing such that dataset[i] can be used to get i-th sample
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    # we can call len(dataset) to return the size
    def __len__(self):
        return self.n_samples
    
    def getData(self):
        return self.x

def readMIDI(file):
    '''
    
    Parameters
    ----------
    file : path to a midi file (string)
        the midi file is read by the function.

    Returns
    -------
    notes : list of notes in midi number format
        this list contains all of the notes and the rests in the song.
        The duration of each note is in the correspondent index of the durations array
    durations : list of durations in musical terms (string)
        this list contains the durations of each note and rest in the song.
        The pitch of each note is in the correspondent index of the durations array
    dur_dict : python dictionary
        the keys of this dictionary are the time in seconds 
        of each possible note/rest duration for this song.
    song_properties : python dictionary
        this dictionary contains some basic properties of the midi file.

    '''

    pm = pretty_midi.PrettyMIDI(file)
    print("Loading Music File:",file)
    
    # Get and downbeat times
    beats = pm.get_beats()
    downbeats = pm.get_downbeats()
    
    song_properties = {}
    song_properties['tempo'] = pm.estimate_tempo()
    song_properties['beat duration'] = beats[1] - beats[0]
    song_properties['measure duration'] = downbeats[2] - downbeats[0]
    
    # sampling of the measure
    unit = song_properties['measure duration'] / 96.
    # possible note durations in seconds 
    # (it is possible to add representations - include 32nds, quintuplets...):
    # [full, half, quarter, 8th, 16th, dot half, dot quarter, dot 8th, dot 16th, half note triplet, quarter note triplet, 8th note triplet]
    possible_durations = [unit * 96, unit * 48, unit * 24, unit * 12, unit * 6, unit * 72, 
                          unit * 36, unit * 18, unit * 9, unit * 32, unit * 16, unit * 8]
    
    # Define durations dictionary
    dur_dict = {}
    dur_dict[possible_durations[0]] = 'full'
    dur_dict[possible_durations[1]] = 'half'
    dur_dict[possible_durations[2]] = 'quarter'
    dur_dict[possible_durations[3]] = '8th'
    dur_dict[possible_durations[4]] = '16th'
    dur_dict[possible_durations[5]] = 'dot half'
    dur_dict[possible_durations[6]] = 'dot quarter'
    dur_dict[possible_durations[7]] = 'dot 8th'
    dur_dict[possible_durations[8]] = 'dot 16th'
    dur_dict[possible_durations[9]] = 'half note triplet'
    dur_dict[possible_durations[10]] = 'quarter note triplet'
    dur_dict[possible_durations[11]] = '8th note triplet'
    
    # compile the lists of pitchs and durations
    notes = []
    durations = []
    for instrument in range(len(pm.instruments)):
        for note in range(len(pm.instruments[instrument].notes)-1):
            # append pitch
            notes.append(str(pm.instruments[instrument].notes[note].pitch))
            # calculate note duration in secods
            duration_sec = pm.instruments[instrument].notes[note].end - pm.instruments[instrument].notes[note].start
            # calculate distance from each duration
            distance = np.abs(np.array(possible_durations) - duration_sec)
            idx = distance.argmin()
            durations.append(dur_dict[possible_durations[idx]])
            
            # check for rests
            intra_note_time = pm.instruments[instrument].notes[note+1].start - pm.instruments[instrument].notes[note].end
            # if the interval between notes is greater than the smallest duration ('16th')
            # and smaller than the greatest duration ('full') then there is a rest
            if intra_note_time >= possible_durations[4]:
                # there is a rest!
                
                # handle the possibility of pauses longer than a full note
                while intra_note_time > possible_durations[0]:
                    notes.append('R')
                    # calculate distance from each duration
                    distance = np.abs(np.array(possible_durations) - intra_note_time)
                    idx = distance.argmin()
                    durations.append(dur_dict[possible_durations[idx]])
                    intra_note_time -= possible_durations[idx]
                
                notes.append('R')
                # calculate distance from each duration
                distance = np.abs(np.array(possible_durations) - intra_note_time)
                idx = distance.argmin()
                durations.append(dur_dict[possible_durations[idx]])
    return notes, durations, dur_dict, song_properties

def getKey(val, dict_to_ix): 
    for key, value in dict_to_ix.items(): 
        if val == value: 
            return key

def convertMIDI(notes, durations, tempo, dur_dict):
    '''
    
    Parameters
    ----------
    pitches : list of pitches
        a list of all the pitches and the rests in the song to be exported.
        Each pitch/rest should have its own duration 
        at the same index of the durations list.
    durations : list of durations
        a list of all the durations of the pitches/rests in the song to be exported.
        Each duration should have its own pitch/rest 
        at the same index of the pitches list.
    tempo : integer
        tempo of the song.

    Returns
    -------
    pm : pretty_midi object
        this pretty midi can be exported to midi and saved.

    '''
    # pitches and durations must be of equal lenght
    # still does not include rests
    
    # Construct a PrettyMIDI object.
    pm = pretty_midi.PrettyMIDI(initial_tempo=tempo)
    # Add a piano instrument
    inst = pretty_midi.Instrument(program=1, is_drum=False, name='piano')
    pm.instruments.append(inst)
    # Let's add a few notes to our instrument
    velocity = 100
    offset = 0
    # for each note
    for i in range(len(notes)):
        if notes[i] != '<pad>' and notes[i] != '<sos>' and notes[i] != '<eos>' and durations[i] != '<pad>' and durations[i] != '<sos>' and durations[i] != '<eos>':
            if notes[i] == 'R':
                duration = getKey(durations[i],dur_dict)
            else:
                pitch = int(notes[i])
                duration = getKey(durations[i],dur_dict)
                start = offset
                end = offset + duration
                inst.notes.append(pretty_midi.Note(velocity, pitch, start, end))
            offset += duration
    return pm


In [None]:
# library for understanding music
import pretty_midi
import torch
import torch.nn as nn
import numpy as np
import math
from dataset_funct import ImprovDurationDataset, ImprovPitchDataset, readMIDI, convertMIDI

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch.manual_seed(1)

# TRANSFORMER MODEL
class TransformerModel(nn.Module):

    def __init__(self, ntoken, ninp, nhead, nhid, nlayers, dropout=0.5):
        super(TransformerModel, self).__init__()
        from torch.nn import TransformerEncoder, TransformerEncoderLayer
        self.model_type = 'Transformer'
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        encoder_layers = TransformerEncoderLayer(ninp, nhead, nhid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(ntoken, ninp)
        self.ninp = ninp
        self.decoder = nn.Linear(ninp, ntoken)

        self.init_weights()

    def _generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def init_weights(self):
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src):
        if self.src_mask is None or self.src_mask.size(0) != src.size(0):
            device = src.device
            mask = self._generate_square_subsequent_mask(src.size(0)).to(device)
            self.src_mask = mask

        src = self.encoder(src) * math.sqrt(self.ninp)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, self.src_mask)
        output = self.decoder(output)
        return output

# POSITIONAL ENCODING
class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

import time
def train(model, vocab, train_data, criterion, optimizer):
    model.train() # Turn on the train mode
    total_loss = 0.
    start_time = time.time()
    ntokens = len(vocab)
    for batch, i in enumerate(range(0, train_data.size(0) - 1, bptt)):
        data, targets = get_batch(train_data, i)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output.view(-1, ntokens), targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

        total_loss += loss.item()
        log_interval = 200
        if batch % log_interval == 0 and batch > 0:
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | '
                  'lr {:02.2f} | ms/batch {:5.2f} | '
                  'loss {:5.2f} | ppl {:8.2f}'.format(
                    epoch, batch, len(train_data) // bptt, scheduler.get_lr()[0],
                    elapsed * 1000 / log_interval,
                    cur_loss, math.exp(cur_loss)))
            total_loss = 0
            start_time = time.time()

def evaluate(eval_model, data_source, vocab):
    eval_model.eval() # Turn on the evaluation mode
    total_loss = 0.
    ntokens = len(vocab)
    with torch.no_grad():
        for i in range(0, data_source.size(0) - 1, bptt):
            data, targets = get_batch(data_source, i)
            output = eval_model(data)
            output_flat = output.view(-1, ntokens)
            total_loss += len(data) * criterion(output_flat, targets).item()
    return total_loss / (len(data_source) - 1)


if __name__ == '__main__':

    # DATA LOADING
    
    # LOAD PITCH DATASET
    datasetPitch = ImprovPitchDataset()
    X_pitch = datasetPitch.getData()
    # set vocabulary for conversion
    vocabPitch = datasetPitch.vocab
    # Add padding tokens to vocab
    vocabPitch.append('<pad>')
    vocabPitch.append('<sos>')
    vocabPitch.append('<eos>')
    pitch_to_ix = {word: i for i, word in enumerate(vocabPitch)}
    #print(X_pitch[:3])
    
    # Divide pitch into train, validation and test
    train_pitch = X_pitch[:int(len(X_pitch)*0.7)]
    val_pitch = X_pitch[int(len(X_pitch)*0.7)+1:int(len(X_pitch)*0.7)+1+int(len(X_pitch)*0.1)]
    test_pitch = X_pitch[int(len(X_pitch)*0.7)+1+int(len(X_pitch)*0.1):]
    
    # LOAD DURATION DATASET
    datasetDuration = ImprovDurationDataset()
    X_duration = datasetDuration.getData()
    # set vocabulary for conversion
    vocabDuration = datasetDuration.vocab
    # Add padding tokens to vocab
    vocabDuration.append('<pad>')
    vocabDuration.append('<sos>')
    vocabDuration.append('<eos>')
    duration_to_ix = {word: i for i, word in enumerate(vocabDuration)}
    #print(X_duration[:3])
    
    # Divide duration into train, validation and test
    train_duration = X_duration[:int(len(X_duration)*0.7)]
    val_duration = X_duration[int(len(X_duration)*0.7)+1:int(len(X_duration)*0.7)+1+int(len(X_duration)*0.1)]
    test_duration = X_duration[int(len(X_duration)*0.7)+1+int(len(X_duration)*0.1):]
    
    
    
    #%% DATA PREPARATION

    # pad data to max_lenght of sequences, prepend <sos> and append <eos>
    def pad(data):
        # from: https://pytorch.org/text/_modules/torchtext/data/field.html
        data = list(data)
        # calculate max lenght
        max_len = max(len(x) for x in data)
        # Define padding tokens
        pad_token = '<pad>'
        init_token = '<sos>'
        eos_token = '<eos>'
        # pad each sequence in the data to max_lenght
        padded, lengths = [], []
        for x in data:
            padded.append(
                ([init_token])
                + list(x[:max_len])
                + ([eos_token])
                + [pad_token] * max(0, max_len - len(x)))
        lengths.append(len(padded[-1]) - max(0, max_len - len(x)))
        return padded
    
    # divide into batches of size bsz and converts notes into numbers
    def batchify(data, bsz, dict_to_ix):
        
        padded = pad(data)
        padded_num = [[dict_to_ix[x] for x in ex] for ex in padded]
        
        data = torch.tensor(padded_num, dtype=torch.long)
        data = data.contiguous()
        
        # Divide the dataset into bsz parts.
        nbatch = data.size(0) // bsz
        # Trim off any extra elements that wouldn't cleanly fit (remainders).
        data = data.narrow(0, 0, nbatch * bsz)
        # Evenly divide the data across the bsz batches.
        data = data.view(bsz, -1).t().contiguous()
        return data.to(device)
    
    batch_size = 20
    eval_batch_size = 10
    
    train_data_pitch = batchify(train_pitch, batch_size, pitch_to_ix)
    val_data_pitch = batchify(val_pitch, eval_batch_size, pitch_to_ix)
    test_data_pitch = batchify(test_pitch, eval_batch_size, pitch_to_ix)
    
    train_data_duration = batchify(train_duration, batch_size, duration_to_ix)
    val_data_duration = batchify(val_duration, eval_batch_size, duration_to_ix)
    test_data_duration = batchify(test_duration, eval_batch_size, duration_to_ix)
    
    # divide into target and input sequence of lenght bptt
    # --> obtain matrices of size bptt x batch_size
    bptt = 35 # lenght of a sequence of data (IMPROVEMENT HERE!!)
    def get_batch(source, i):
        seq_len = min(bptt, len(source) - 1 - i)
        data = source[i:i+seq_len] # input 
        target = source[i+1:i+1+seq_len].view(-1) # target (same as input but shifted by 1)
        return data, target
    
    
    #%% PITCH MODEL TRAINING
    
    # HYPERPARAMETERS
    ntokens_pitch = len(vocabPitch) # the size of vocabulary
    emsize = 200 # embedding dimension
    nhid = 200 # the dimension of the feedforward network model in nn.TransformerEncoder
    nlayers = 2 # the number of nn.TransformerEncoderLayer in nn.TransformerEncoder
    nhead = 2 # the number of heads in the multiheadattention models
    dropout = 0.2 # the dropout value
    modelPitch = TransformerModel(ntokens_pitch, emsize, nhead, nhid, nlayers, dropout).to(device)
    
    # LOSS FUNCTION
    criterion = nn.CrossEntropyLoss()
    lr = 5.0 # learning rate
    optimizer = torch.optim.SGD(modelPitch.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)
    
    # TRAIN AND EVALUATE LOSS
    best_val_loss = float("inf")
    epochs = 10 # The number of epochs
    best_model = None
    
    # TRAINING LOOP
    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()
        train(modelPitch, vocabPitch, train_data_pitch, criterion, optimizer)
        val_loss = evaluate(modelPitch, val_data_pitch, vocabPitch)
        print('-' * 89)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | '
              'valid ppl {:8.2f}'.format(epoch, (time.time() - epoch_start_time),
                                         val_loss, math.exp(val_loss)))
        print('-' * 89)
    
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_pitch = modelPitch
    
        scheduler.step()
    
    # TEST THE MODEL
    test_loss = evaluate(best_model_pitch, test_data_pitch, vocabPitch)
    print('=' * 89)
    print('| End of training | test loss {:5.2f} | test ppl {:8.2f}'.format(
        test_loss, math.exp(test_loss)))
    print('=' * 89)
    
    savePATHpitch = 'modelsPitch/modelPitch_'+ str(epochs) + 'epochs_padding.pt'
    state_dictPitch = best_model_pitch.state_dict()
    torch.save(state_dictPitch, savePATHpitch)
    

    #%% DURATION MODEL TRAINING
    
    # HYPERPARAMETERS
    ntokens_duration = len(vocabDuration) # the size of vocabulary
    emsize = 200 # embedding dimension
    nhid = 200 # the dimension of the feedforward network model in nn.TransformerEncoder
    nlayers = 2 # the number of nn.TransformerEncoderLayer in nn.TransformerEncoder
    nhead = 2 # the number of heads in the multiheadattention models
    dropout = 0.2 # the dropout value
    modelDuration = TransformerModel(ntokens_duration, emsize, nhead, nhid, nlayers, dropout).to(device)
    
    # LOSS FUNCTION
    criterion = nn.CrossEntropyLoss()
    lr = 5.0 # learning rate
    optimizer = torch.optim.SGD(modelDuration.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)
    
    # TRAIN AND EVALUATE LOSS
    best_val_loss = float("inf")
    epochs = 10 # The number of epochs
    best_model = None
    
    # TRAINING LOOP
    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()
        train(modelDuration, vocabDuration, train_data_duration, criterion, optimizer)
        val_loss = evaluate(modelDuration, val_data_duration, vocabDuration)
        print('-' * 89)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | '
              'valid ppl {:8.2f}'.format(epoch, (time.time() - epoch_start_time),
                                         val_loss, math.exp(val_loss)))
        print('-' * 89)
    
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_duration = modelDuration
    
        scheduler.step()
    
    # TEST THE MODEL
    test_loss = evaluate(best_model_duration, test_data_duration, vocabDuration)
    print('=' * 89)
    print('| End of training | test loss {:5.2f} | test ppl {:8.2f}'.format(
        test_loss, math.exp(test_loss)))
    print('=' * 89)
    
    savePATHduration = 'modelsDuration/modelDuration_'+ str(epochs) + 'epochs_padding.pt'
    state_dictDuration = best_model_duration.state_dict()
    torch.save(state_dictDuration, savePATHduration)
    

    #%% SAMPLES GENERATION

    def getNote(val, dict_to_ix): 
        for key, value in dict_to_ix.items(): 
             if val == value: 
                 return key

    def generate(model, melody4gen, dict_to_ix, next_notes=10):
        melody4gen = melody4gen.tolist()
        for i in range(0,next_notes):
            x_pred = torch.tensor([dict_to_ix[w] for w in melody4gen], dtype=torch.long)
            y_pred = model(x_pred)
            last_word_logits = y_pred[0][-1]
            p = torch.nn.functional.softmax(last_word_logits, dim=0).detach().numpy()
            word_index = np.random.choice(len(last_word_logits), p=p)
            melody4gen.append(getNote(word_index, dict_to_ix))
        return melody4gen
    
    # Remove characters who are not in the dictionary
    def onlyDict(pitchs, durations, vocabPitch, vocabDuration):
        # takes an array and a dictionary and gives the same array without
        # the elements who are not in the dictionary
        new_pitch = []
        new_duration = []
        for i in range(len(pitchs)):
            if pitchs[i] in vocabPitch and durations[i] in vocabDuration:
                new_pitch.append(pitchs[i]) 
                new_duration.append(durations[i]) 
        new_pitch = np.array(new_pitch) # same solos but with only most frequent notes
        new_duration = np.array(new_duration) # same solos but with only most frequent notes
        return new_pitch, new_duration
    
    
    #specify the path
    f = 'data/w_jazz/JohnColtrane_Mr.P.C._FINAL.mid'
    melody4gen_pitch, melody4gen_duration, dur_dict, song_properties = readMIDI(f)
    melody4gen_pitch, melody4gen_duration = onlyDict(melody4gen_pitch, melody4gen_duration, vocabPitch, vocabDuration)
    melody4gen_pitch = melody4gen_pitch[:80]
    melody4gen_duration = melody4gen_duration[:80]
    #print(melody4gen_pitch)
    #print(melody4gen_duration)
    
    notes2gen = 40 # number of new notes to generate
    new_melody_pitch = generate(modelPitch, melody4gen_pitch, pitch_to_ix, next_notes=notes2gen)
    new_melody_duration = generate(modelDuration, melody4gen_duration, duration_to_ix, notes2gen)

    converted = convertMIDI(new_melody_pitch, new_melody_duration, song_properties['tempo'], dur_dict)
    converted.write('output/music.mid')
    
    # For plotting
    import mir_eval.display
    import librosa.display
    import matplotlib.pyplot as plt
    def plot_piano_roll(pm, start_pitch, end_pitch, fs=100):
        # Use librosa's specshow function for displaying the piano roll
        librosa.display.specshow(pm.get_piano_roll(fs)[start_pitch:end_pitch],
                                 hop_length=1, sr=fs, x_axis='time', y_axis='cqt_note',
                                 fmin=pretty_midi.note_number_to_hz(start_pitch))
    
    plt.figure(figsize=(8, 4))
    plot_piano_roll(converted, 0, 127)
