In [1]:
# Standard library imports
import collections
import glob
import os

# Third party imports
from IPython import display
import numpy as np
import pandas as pd
import pretty_midi
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [2]:
_SAMPLING_RATE = 16000
midi_dir = 'maestro-v3.0.0'
npy_dir = 'maestro-numpy'
filenames = glob.glob(f'{midi_dir}/**/*.mid*')

sample_file = filenames[1]
pm = pretty_midi.PrettyMIDI(sample_file)

def display_audio(pm: pretty_midi.PrettyMIDI, seconds=30):
    waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
    waveform_short = waveform[:seconds*_SAMPLING_RATE]
    return display.Audio(waveform_short, rate=_SAMPLING_RATE)

display_audio(pm)



# MIDI to NumPy
Extract sequences from the MIDI files and store them as NumPy arrays in npy files.

In [3]:
seq_length = 25
vocab_size = 128

In [4]:
def midi_to_notes(midi_file):
    pm = pretty_midi.PrettyMIDI(midi_file)
    instrument = pm.instruments[0]
    notes = collections.defaultdict(list)

    sorted_notes = sorted(instrument.notes, key=lambda note: note.start)
    prev_start = sorted_notes[0].start

    for note in sorted_notes:
        start = note.start
        end = note.end
        notes['pitch'].append(note.pitch)
        notes['start'].append(start)
        notes['end'].append(end)
        notes['step'].append(start - prev_start)
        notes['duration'].append(end - start)
        prev_start = start

    return pd.DataFrame({name: np.array(value) for name, value in notes.items()})

def get_sequences():
    all_notes = []
    for f in filenames:
        notes = midi_to_notes(f)
        all_notes.append(notes)
    all_notes = pd.concat(all_notes)
    key_order = ['pitch', 'step', 'duration']
    data = np.stack([all_notes[key] for key in key_order], axis=1)
    data = torch.tensor(data)

    dividend = torch.tensor([vocab_size, 1., 1.]).reshape((1, 3, 1))
    sequences = data.unfold(0, seq_length + 1, 1) / dividend

    inputs = sequences[:, :, :-1].numpy()
    labels = sequences[:, :, -1].numpy()
    return inputs, labels

In [None]:
if not os.path.exists(npy_dir):
    os.mkdir(npy_dir)

inputs, labels = get_sequences()
with open(os.path.join(npy_dir, 'inputs.npy'), 'wb') as f:
    np.save(f, inputs)
with open(os.path.join(npy_dir, 'labels.npy'), 'wb') as f:
    np.save(f, labels)

# PyTorch Dataset and Dataloader
Loads the extract sequences.

In [4]:
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'
print(device)

cuda:0


In [5]:
class MusicDataset(Dataset):
    def __init__(self, X, y, split='train'):
        train_end = val_start = round(X.shape[0] * 0.8)
        val_end = test_start = round(y.shape[0] * 0.9)
        if split == 'train':
            X = X[:train_end]
            y = y[:train_end]
        elif split == 'val':
            X = X[val_start:val_end]
            y = y[val_start:val_end]
        elif split == 'test':
            X = X[test_start:]
            y = y[test_start:]
        else:
            raise NotImplementedError()
            
        self.X = torch.tensor(X, device=device)
        self.y = torch.tensor(y, device=device)

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [6]:
# Load X, y out here instead of in the dataset class to save memory/time
with open(os.path.join(npy_dir, 'inputs.npy'), 'rb') as f:
    X = np.load(f)
with open(os.path.join(npy_dir, 'labels.npy'), 'rb') as f:
    y = np.load(f)

# Construct datasets and dataloaders
train_ds = MusicDataset(X, y, split='train')
train_dl = DataLoader(train_ds, batch_size=64, shuffle=True)
val_ds = MusicDataset(X, y, split='val')
val_dl = DataLoader(val_ds, batch_size=64, shuffle=True)

# Free up memory
del X; del y

Convert tensor back to playable audio file.

In [7]:
def notes_to_midi(notes, out_file=None, instr_name='Acoustic Grand Piano', velocity=100):
    pm = pretty_midi.PrettyMIDI()
    instr = pretty_midi.Instrument(
        program=pretty_midi.instrument_name_to_program(instr_name)
    )
    prev_start = 0
    for i, note in notes.iterrows():
        start = float(prev_start + note['step'])
        end = float(start + note['duration'])
        note = pretty_midi.Note(
            velocity=velocity,
            pitch=int(note['pitch']),
            start=start,
            end=end
        )
        instr.notes.append(note)
        prev_start = start

    pm.instruments.append(instr)
    if out_file:
        pm.write(out_file)
    return pm

def tensor_to_midi(tensor):
    prev_start = 0
    notes = []
    for i in range(tensor.shape[1]):
        pitch, step, duration = tensor[:, i]
        start = prev_start + step
        end = start + duration
        note = (pitch * vocab_size, step, duration)
        notes.append((*note, start, end))
        prev_start = start
    notes = pd.DataFrame(notes, columns=('pitch', 'step', 'duration', 'start', 'end'))
    pm = notes_to_midi(notes)
    return pm

In [8]:
display_audio(tensor_to_midi(train_ds[0][0]))



# Transformer model

In [26]:
model = nn.Transformer(d_model=3, nhead=3, batch_first=True)
model.to(device)
model = model.double()

Sanity check.

In [27]:
x, y = train_ds[0]
x = x.T.unsqueeze(0)
y = y.T.unsqueeze(0).unsqueeze(0)
print(x.shape, y.shape)
with torch.no_grad():
    print(model(x, y))

torch.Size([1, 25, 3]) torch.Size([1, 1, 3])
tensor([[[ 0.5321, -1.4008,  0.8687]]], device='cuda:0', dtype=torch.float64)


In [28]:
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()

# Training loop
for epoch in range(10):
    running_loss = 0.0
    model.train()
    for i, (x, y) in enumerate(train_dl):        
        optimizer.zero_grad()
        x = x.moveaxis(-1, 1)
        y = y.unsqueeze(1)
        outputs = model(x, y)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 2000 == 1999:    # print every 2000 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0
            
    val_loss = 0.0
    model.eval()
    with torch.no_grad():
        for i, (x, y) in enumerate(val_dl):
            x = x.moveaxis(-1, 1)
            y = y.unsqueeze(1)
            outputs = model(x, y)
            loss = criterion(outputs, y)
            val_loss += loss.item()
    print(f'[{epoch + 1}] val loss: {val_loss:.3f}')

print('Finished Training')

[1,  2000] loss: 0.152
[1,  4000] loss: 0.075
[1,  6000] loss: 0.069
[1,  8000] loss: 0.076


KeyboardInterrupt: 

Save model.

In [None]:
torch.save(model.state_dict(), 'music_transformer.pt')

To load,
```
model = nn.Transformer(d_model=3, nhead=3, batch_first=True)
model.load_state_dict(torch.load('music_transformer.pt'))
```

# Generate music

In [29]:
model.eval()

Transformer(
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=3, out_features=3, bias=True)
        )
        (linear1): Linear(in_features=3, out_features=2048, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=2048, out_features=3, bias=True)
        (norm1): LayerNorm((3,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((3,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
      (1): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=3, out_features=3, bias=True)
        )
        (linear1): Linear(in_features=3, out_features=2048, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
     

In [45]:
def predict_next_notes(notes, model):    
    x = notes.T.unsqueeze(0)
    y = torch.zeros((1, 1, 3), device=device, dtype=torch.double)
    with torch.no_grad():
        preds = model(x, y)
    pitch = preds[:, :, 0].squeeze()
    step = preds[:, :, 1].squeeze()
    duration = preds[:, :, 2].squeeze()
    return pitch.item(), step.item(), duration.item()

In [None]:
# Make room in memory
del train_ds, train_dl, val_ds, val_dl

In [38]:
# Load X, y out here instead of in the dataset class to save memory/time
with open(os.path.join(npy_dir, 'inputs.npy'), 'rb') as f:
    X = np.load(f)
with open(os.path.join(npy_dir, 'labels.npy'), 'rb') as f:
    y = np.load(f)

# Load testing dataset
test_ds = MusicDataset(X, y, split='test')
test_dl = DataLoader(test_ds, batch_size=64, shuffle=True)

del X; del y

In [53]:
generated_notes = []
num_preds = 120
prev_start = 0
input_notes, _ = test_ds[1]
input_notes = input_notes.to(device=device)

for _ in range(num_preds):
    pitch, step, duration = predict_next_notes(input_notes, model)
    start = prev_start + step
    end = start + duration
    input_note = (pitch * seq_length, step, duration)
    generated_notes.append((*input_note, start, end))
    input_notes = input_notes[:, 1:]
    input_note = torch.tensor(input_note) / torch.tensor([seq_length, 1, 1])
    input_note = input_note.to(device=device).unsqueeze(1)
    input_notes = torch.cat((input_notes, ), dim=1)
    prev_start = start

generated_notes = pd.DataFrame(generated_notes, columns=('pitch', 'step', 'duration', 'start', 'end'))
pm = notes_to_midi(generated_notes)
display_audio(pm)

