In [1]:
#import statements 
import glob
import random
from typing import List
from collections import defaultdict

import numpy as np
from numpy.random import choice

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from symusic import Score
from miditok import REMI, TokenizerConfig
from midi2audio import FluidSynth # Import library
from IPython.display import Audio, display

  from .autonotebook import tqdm as notebook_tqdm


Task 1 <br>
This assignment focuses on symbolic music modeling. The goal is to train a model that learns a distribution \( p(x) \) over symbolic music data (e.g., MIDI)  specifically within the EDM genre. In addition it is capable of sampling new sequences from this learned distribution unconditionally. We will be using the LSTM model for this task. <br>

Get list of files for training/test sets

In [2]:
import os
import glob
print("CWD:", os.getcwd())
print("Train directory exists?", os.path.exists("./train"))
print("Train files (glob):", glob.glob("./train/*.midi"))

train_files = glob.glob("./train/*.midi")
test_files = glob.glob("./test/*.midi")

CWD: c:\Users\sammy\Downloads\cse 153_task1\cse153_task1
Train directory exists? True
Train files (glob): ['./train\\MIDI-Unprocessed_01_R1_2006_01-09_ORIG_MID--AUDIO_01_R1_2006_01_Track01_wav.midi', './train\\MIDI-Unprocessed_01_R1_2006_01-09_ORIG_MID--AUDIO_01_R1_2006_02_Track02_wav.midi', './train\\MIDI-Unprocessed_01_R1_2006_01-09_ORIG_MID--AUDIO_01_R1_2006_03_Track03_wav.midi', './train\\MIDI-Unprocessed_01_R1_2006_01-09_ORIG_MID--AUDIO_01_R1_2006_04_Track04_wav.midi', './train\\MIDI-Unprocessed_01_R1_2006_01-09_ORIG_MID--AUDIO_01_R1_2006_05_Track05_wav.midi', './train\\MIDI-Unprocessed_01_R1_2008_01-04_ORIG_MID--AUDIO_01_R1_2008_wav--1.midi', './train\\MIDI-Unprocessed_01_R1_2008_01-04_ORIG_MID--AUDIO_01_R1_2008_wav--2.midi', './train\\MIDI-Unprocessed_01_R1_2008_01-04_ORIG_MID--AUDIO_01_R1_2008_wav--3.midi', './train\\MIDI-Unprocessed_01_R1_2009_01-04_ORIG_MID--AUDIO_01_R1_2009_01_R1_2009_01_WAV.midi', './train\\MIDI-Unprocessed_01_R1_2009_01-04_ORIG_MID--AUDIO_01_R1_2009_01_R1_

Construct a PyTorch Dataset

In [3]:
class MIDIDataset(Dataset):
    def __init__(self, file_paths: List[str], tokenizer):
        self.tokenizer = tokenizer
        self.file_paths = file_paths
        
    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        midi = Score(self.file_paths[idx])
        MAX_LEN = 512  # or even 512 to be safe
        tokens = self.tokenizer(midi)
        tokens = tokens[:MAX_LEN]
        # Return as dictionary to match training function expectations
        return {'input_ids': torch.tensor(tokens, dtype=torch.long)}

Configure the Tokenizer in order to be use to 

In [None]:
config = TokenizerConfig(
    num_velocities=32,           # Classical dynamics (was 1)
    use_chords=True,            # Essential for harmony (was False)
    use_programs=False,         # Piano only (was True)
    use_time_signatures=True,   # Classical changes time sigs
    use_rests=True, 
    use_tempos=True            # Important in classical
)
tokenizer = REMI(config)
tokenizer.train(vocab_size=1500, files_paths=train_files)
tokenizer.save("tokenizer.json")

Define PyTorch datasets and dataloaders

In [None]:
from torch.nn.utils.rnn import pad_sequence
from miditok.pytorch_data import DatasetMIDI, DataCollator
dataset = MIDIDataset(train_files, tokenizer)


train_dataset = DatasetMIDI(
    files_paths=train_files,
    tokenizer=tokenizer,
    max_seq_len=1024,
    bos_token_id=tokenizer["BOS_None"],
    eos_token_id=tokenizer["EOS_None"],
)
test_dataset = DatasetMIDI(
    files_paths=test_files,
    tokenizer=tokenizer,
    max_seq_len=1024,
    bos_token_id=tokenizer["BOS_None"],
    eos_token_id=tokenizer["EOS_None"],
)
print(f"# Train files loaded: {len(train_dataset)}")
print(f"# Test files loaded: {len(test_dataset)}")

collator = DataCollator(tokenizer.pad_token_id)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collator)
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collator)


# Train files loaded: 938
# Test files loaded: 105


In [None]:
#for rhythm
def extract_position_ids(token_sequence, tokenizer):
    position_ids = []
    current_pos = 0  # default if no position is found yet

    for token in token_sequence:
        token_str = tokenizer.vocab[token]
        if token_str.startswith("Position_"):
            try:
                current_pos = int(token_str.split("_")[1])
            except:
                current_pos = 0
        position_ids.append(current_pos)

    return torch.tensor(position_ids, dtype=torch.long)



LSTM Model<br>

In [None]:
class MusicRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim=256, hidden_dim=512, num_layers=2,
                 dropout=0.3, bidirectional=False, max_position_embeddings=1024):
        super(MusicRNN, self).__init__()

        self.token_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.position_embedding = nn.Embedding(max_position_embeddings, embedding_dim)

        self.rnn = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True,
            bidirectional=bidirectional
        )

        rnn_output_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.layer_norm = nn.LayerNorm(rnn_output_dim)
        self.fc = nn.Linear(rnn_output_dim, vocab_size)

    def forward(self, x, position_ids, hidden=None):
        """
        x: (batch_size, seq_len)
        position_ids: (batch_size, seq_len)
        """
        tok_emb = self.token_embedding(x)               # (B, T, D)
        pos_emb = self.position_embedding(position_ids) # (B, T, D)
        x = tok_emb + pos_emb

        out, hidden = self.rnn(x, hidden)
        out = self.layer_norm(out)
        out = self.fc(out)

        return out, hidden

Training<br>

In [None]:
def train(model, train_loader, val_loader, vocab_size, num_epochs=5, lr=1e-4, device='cpu'):
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0

        for batch in train_loader:
            batch = batch['input_ids'].to(device)  # (batch_size, seq_len)

            input_ids = batch[:, :-1]
            target_ids = batch[:, 1:]

            # Create position_ids: simply range from 0 to seq_len-1
            position_ids = torch.arange(input_ids.size(1), device=device).unsqueeze(0).expand_as(input_ids)

            optimizer.zero_grad()
            outputs, _ = model(input_ids, position_ids)
            outputs = outputs.reshape(-1, vocab_size)
            targets = target_ids.reshape(-1)

            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(train_loader)

        # --- Validation ---
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                batch = batch['input_ids'].to(device)

                input_ids = batch[:, :-1]
                target_ids = batch[:, 1:]
                position_ids = torch.arange(input_ids.size(1), device=device).unsqueeze(0).expand_as(input_ids)

                outputs, _ = model(input_ids, position_ids)
                outputs = outputs.reshape(-1, vocab_size)
                targets = target_ids.reshape(-1)

                loss = criterion(outputs, targets)
                total_val_loss += loss.item()

        avg_val_loss = total_val_loss / len(val_loader)
        print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")


# Example usage
if __name__ == "__main__":
    vocab_size = tokenizer.vocab_size
    embedding_dim = 256
    hidden_dim = 512
    num_layers = 2

    model = MusicRNN(
    vocab_size=tokenizer.vocab_size,
    embedding_dim=256,
    hidden_dim=512,
    num_layers=2,
    dropout=0.3,
    bidirectional=False,   # try True if you want to experiment with bidirectional context
    max_position_embeddings=1024
)

    train(model, train_loader, test_loader, vocab_size)

KeyboardInterrupt: 

Sampling<br>

In [None]:

def sample_top_k(model, start_token, max_length, temperature, k, tokenizer, device='cpu'):
    model = model.to(device)
    model.eval()

    generated = [start_token]
    position_ids = [0]  # start with position 0
    input_token = torch.tensor([[start_token]], device=device)
    input_pos = torch.tensor([[0]], device=device)  # position ID

    hidden = None
    current_position = 0

    for _ in range(max_length):
        output, hidden = model(input_token, input_pos, hidden)  # (1, 1, vocab_size)
        logits = output[:, -1, :] / temperature

        top_k_logits, top_k_indices = torch.topk(logits, k)
        top_k_probs = F.softmax(top_k_logits, dim=-1)

        next_token_idx = torch.multinomial(top_k_probs, 1).item()
        next_token = top_k_indices[0, next_token_idx].item()
        generated.append(next_token)

        # --- Update position ---
        token_str = tokenizer.vocab[next_token]
        if token_str.startswith("Position_"):
            try:
                current_position = int(token_str.split("_")[1])
            except:
                current_position = 0
        position_ids.append(current_position)

        # Stop if EOS or PAD token is hit
        if next_token == tokenizer["EOS_None"] or next_token == tokenizer.pad_token_id:
            break

        # Prepare next input
        input_token = torch.tensor([[next_token]], device=device)
        input_pos = torch.tensor([[current_position]], device=device)

    return generated


start_token = tokenizer["BOS_None"]
generated_sequence = sample_top_k(
    model=model,
    start_token=start_token,
    max_length=1024,
    temperature=0.7,
    k=5,
    tokenizer=tokenizer,
    device='cuda' if torch.cuda.is_available() else 'cpu'
)


KeyError: 329

Generation output of midi files<br>

In [None]:
def generate_midi(tokenizer, generated_sequence, output_filename="rnn.mid"):
    try:
        vocab_size = tokenizer.vocab_size
        valid_sequence = [token for token in generated_sequence if 0 <= token < vocab_size]
        
        print(f"Original sequence length: {len(generated_sequence)}")
        print(f"Valid sequence length: {len(valid_sequence)}")
        
        if len(valid_sequence) < 2:
            print("Sequence too short or no valid tokens found")
            return None
            
        # Fix: Wrap the sequence in another list
        output_score = tokenizer.decode([valid_sequence])
        
        if len(output_score.tracks) == 0:
            print("Generated MIDI has no tracks")
            return None
            
        output_score.dump_midi(output_filename)
        print(f"Successfully generated {output_filename}")
        return output_score
        
    except Exception as e:
        print(f"Error during MIDI generation: {e}")
        print(f"Sequence sample: {generated_sequence[:20]}...")
        return None


# Usage
start_token = tokenizer.special_tokens_ids[1] if len(tokenizer.special_tokens_ids) > 1 else 1
generated_sequence = sample_top_k(model, start_token, max_length=1024, temperature=0.7, k=5)

# Generate MIDI safely
output_score = generate_midi(tokenizer, generated_sequence, "rnn.mid")

import pretty_midi

def remove_clashing_notes(midi_path_in, midi_path_out):
    pm = pretty_midi.PrettyMIDI(midi_path_in)
    
    for instrument in pm.instruments:
        instrument.notes.sort(key=lambda n: n.start)
        filtered_notes = []
        
        for note in instrument.notes:
            if not filtered_notes:
                filtered_notes.append(note)
            else:
                last_note = filtered_notes[-1]
                # If overlapping and pitch close (within 1 semitone)
                if note.start < last_note.end and abs(note.pitch - last_note.pitch) < 2:
                    # Keep louder note
                    if note.velocity > last_note.velocity:
                        filtered_notes[-1] = note
                else:
                    filtered_notes.append(note)
        
        instrument.notes = filtered_notes
    
    pm.write(midi_path_out)
    print(f"Saved cleaned MIDI to {midi_path_out}")

remove_clashing_notes("rnn.mid", "cleanedrnn.mid")



Original sequence length: 1025
Valid sequence length: 1025
Error during MIDI generation: File not found file (error:13): rnn.mid
Sequence sample: [1, 410, 371, 205, 32, 102, 143, 206, 44, 101, 142, 209, 49, 103, 139, 214, 44, 101, 140, 218]...
