### Data extraction and feature extraction 

In [1]:
import pretty_midi
import pandas as pd
from pathlib import Path
from collections import defaultdict

In [2]:
def extract_note_features_with_instrument(midi_file):
    try:
        pm = pretty_midi.PrettyMIDI(midi_file)
        notes = defaultdict(list)
        
        if not pm.instruments:
            raise ValueError("No instruments found in MIDI file.")

        for instrument in pm.instruments:
            program_num = instrument.program if not instrument.is_drum else 127  # Assign drum as 128，now change to 127
            #program_num = instrument.program  # <- Extract instrument ID
            for note in instrument.notes:
                notes["pitch"].append(note.pitch)
                notes["velocity"].append(note.velocity)
                notes["note_name"].append(pretty_midi.note_number_to_name(note.pitch))  # e.g., 'C#4'
                notes["octave"].append(note.pitch // 12 - 1)  # Convert MIDI pitch to octave number
                notes["start"].append(note.start)
                notes["end"].append(note.end)
                notes["duration"].append(note.end - note.start)
                notes["instrument"].append(program_num)  # <- Add this line

        return pd.DataFrame(notes)
    
    except Exception as e:
        print(f"Failed to parse {midi_file} due to error: {e}")
        return pd.DataFrame()

In [3]:
def extract_advanced_note_features(midi_file):
    try:
        pm = pretty_midi.PrettyMIDI(midi_file)
        instrument = pm.instruments[0]  # Assuming single instrument for now

        notes = defaultdict(list)
        for note in instrument.notes:
            notes["pitch"].append(note.pitch)
            notes["velocity"].append(note.velocity)  # Extract actual velocity (1-127)
            notes["note_name"].append(pretty_midi.note_number_to_name(note.pitch))  # e.g., 'C#4'
            notes["octave"].append(note.pitch // 12 - 1)  # Convert MIDI pitch to octave number
            notes["start"].append(note.start)
            notes["end"].append(note.end)
            notes["duration"].append(note.end - note.start)            

        return pd.DataFrame(notes)

    except Exception as e:
        print(f"Failed to process {midi_file}: {e}")
        return pd.DataFrame()  # Return empty if failed
    
    
def extract_all_midi_files(folder, max_files=500):
    paths = list(Path(folder).rglob("*.[mM][iI][dD]*"))[:max_files]
    print(f"Found {len(paths)} MIDI files")

    all_dfs = []
    for path in paths:
        df = extract_advanced_note_features(str(path))
        if not df.empty:
            df["filename"] = path.name
            all_dfs.append(df)

    if all_dfs:
        return pd.concat(all_dfs, ignore_index=True)
    else:
        return pd.DataFrame()

In [4]:
# read data and extract pitch, velocity, note_name, octave, start_time, end, and duration 
data_dir = "/Users/yang/Desktop/Yale Spring 2025/CPSC 552 Deep learning theory and applications /DeepL project - music generation /Data set/Maestro/maestro-v3.0.0"
all_notes_df = extract_all_midi_files(data_dir)
all_notes_df.head()

#all_notes_df.to_csv("/Users/yang/Documents/processed_notes_lakh.csv", index=False)



Found 500 MIDI files


Unnamed: 0,pitch,velocity,note_name,octave,start,end,duration,filename
0,77,56,F5,5,0.994792,1.08724,0.092448,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...
1,73,58,C#5,5,1.108073,1.173177,0.065104,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...
2,68,58,G#4,4,1.207031,1.268229,0.061198,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...
3,73,62,C#5,5,1.315104,1.375,0.059896,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...
4,49,32,C#3,3,0.998698,1.401042,0.402344,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...


In [5]:
all_notes_df["filename"].nunique() # check number of unique files in our code 

500

## Version 2: try to extract instrument information correctly

In [6]:
import os

def extract_all_midi_files_3(folder, max_files=500): # this works 
    paths = list(Path(folder).rglob("*.[mM][iI][dD]*"))[:max_files]
    print(f"Found {len(paths)} MIDI files")

    all_dfs = []
    for path in paths:
        df = extract_note_features_with_instrument(str(path))
        if not df.empty:
            df["filename"] = path.name
            all_dfs.append(df)

    if all_dfs:
        return pd.concat(all_dfs, ignore_index=True)
    else:
        return pd.DataFrame()

**Successfully extract the instrument information now**

In [7]:
# second version that also extract information related to instrument used 

data_dir = "/Users/yang/Desktop/Yale Spring 2025/CPSC 552 Deep learning theory and applications /DeepL project - music generation /Data set/Maestro/maestro-v3.0.0"
extracted_mae_df = extract_all_midi_files_3(data_dir)
extracted_mae_df.head()

Found 500 MIDI files


Unnamed: 0,pitch,velocity,note_name,octave,start,end,duration,instrument,filename
0,77,56,F5,5,0.994792,1.08724,0.092448,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...
1,73,58,C#5,5,1.108073,1.173177,0.065104,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...
2,68,58,G#4,4,1.207031,1.268229,0.061198,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...
3,73,62,C#5,5,1.315104,1.375,0.059896,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...
4,49,32,C#3,3,0.998698,1.401042,0.402344,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...


In [None]:
extracted_mae_df.to_csv("/Users/yang/Documents/processed_notes_lakh_instru.csv", index=False) 

### Data preparation & Preprocessing 


In [11]:
# Full pipeline for symbolic MusicGen-style training using extracted note features (with chord conditioning)

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import numpy as np
from collections import Counter

# Configuration
D_MODEL = 256
NUM_PITCHES = 128
NUM_VELOCITIES = 32
NUM_DURATIONS = 32
NUM_CHORDS = 12  # 12 pitch classes (C, C#, D, ..., B)
NUM_INSTRUMENTS = 128
SEQ_LEN = 64

In [12]:
# Chord Estimation from Notes (chromagram-inspired)
def estimate_chords(df):
    df = df.copy()
    df["chord"] = -1
    filenames = df["filename"].unique()
    for fname in filenames:
        song = df[df["filename"] == fname].copy()
        song = song.sort_values("start")
        chords = []
        for i in range(0, len(song), SEQ_LEN):
            segment = song.iloc[i:i+SEQ_LEN]
            pitch_classes = [p % 12 for p in segment["pitch"]]
            if len(pitch_classes) == 0:
                chord_id = 0
            else:
                chord_id = Counter(pitch_classes).most_common(1)[0][0]
            chords += [chord_id] * len(segment)
        df.loc[df["filename"] == fname, "chord"] = chords
    return df

In [13]:
# data preparation 
def discretize_velocity(velocity):
    return min(int(velocity // 4), NUM_VELOCITIES - 1)

def discretize_duration(duration):
    
    idx = np.floor(duration / 0.1).astype(int)
    # return min(int(duration / 0.1), NUM_DURATIONS - 1) # original version, change to avoid overflow
    return np.clip(idx, 0, NUM_DURATIONS - 1)

def build_sequence_tensor(df, max_seq_len=SEQ_LEN):
    sequences = []
    grouped = df.groupby("filename")
    
    # For debug: record all values
    all_durations = []
    all_velocities = []

    for _, group in grouped:
        group = group.sort_values("start")
        for i in range(0, len(group) - max_seq_len, max_seq_len):
            chunk = group.iloc[i:i+max_seq_len]
            pitch = torch.tensor(chunk["pitch"].values, dtype=torch.long)
            velocity = torch.tensor(chunk["velocity"].apply(discretize_velocity).values, dtype=torch.long)
            duration = torch.tensor(chunk["duration"].apply(discretize_duration).values, dtype=torch.long)
            chord = torch.tensor(chunk["chord"].values, dtype=torch.long)
            instrument = torch.tensor(chunk["instrument"].values, dtype=torch.long)
            sequences.append((pitch, velocity, duration, chord, instrument)) # cannot successfully obtain instru
            
            # save for debug
            all_durations.extend(duration.tolist())
            all_velocities.extend(velocity.tolist())
            
            
    # After discretization inside build_sequence_tensor
    print("Duration max:", max(all_durations))
    print("Duration min:", min(all_durations))
    print("Velocity max:", max(all_velocities))
    print("Velocity min:", min(all_velocities))

    
    return sequences

In [14]:

# Dataset Wrapper
class SequenceDataset(Dataset):
    def __init__(self, sequences):
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return self.sequences[idx]

In [15]:
#df = estimate_chords(all_notes_df)
df = estimate_chords(extracted_mae_df)
sequences = build_sequence_tensor(df)
dataset = SequenceDataset(sequences)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

Duration max: 31
Duration min: 0
Velocity max: 31
Velocity min: 0


In [16]:
df.head()

Unnamed: 0,pitch,velocity,note_name,octave,start,end,duration,instrument,filename,chord
0,77,56,F5,5,0.994792,1.08724,0.092448,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...,1
1,73,58,C#5,5,1.108073,1.173177,0.065104,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...,1
2,68,58,G#4,4,1.207031,1.268229,0.061198,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...,1
3,73,62,C#5,5,1.315104,1.375,0.059896,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...,1
4,49,32,C#3,3,0.998698,1.401042,0.402344,0,ORIG-MIDI_01_7_7_13_Group__MID--AUDIO_12_R1_20...,1


## Import and use REMI tokenizer

In [17]:
import miditok

# Initialize tokenizer
tokenizer = miditok.REMI()  # you can also try TSD, CPWord, Octuple, etc.

In [None]:
# 1. Create tokenizer
from miditok import REMI, TokenizerConfig
config = TokenizerConfig(num_velocities=16, use_chords=True, use_programs=True) # change the use_program = False for 1 instrument 
tokenizer = REMI(config)


  super().__init__(tokenizer_config, params)


In [19]:
from miditok import REMI, TokenizerConfig
from miditoolkit import MidiFile
from pathlib import Path
from symusic import Score

In [20]:
# 1. Create REMI tokenizer
config = TokenizerConfig(
    num_velocities=16,
    use_chords=True,
    use_programs=True
)
tokenizer = REMI(config)

# 2. Extract all MIDI files
data_dir = Path("/Users/yang/Desktop/Yale Spring 2025/CPSC 552 Deep learning theory and applications /DeepL project - music generation /Data set/Maestro/maestro-v3.0.0")
paths = list(data_dir.rglob("*.[mM][iI][dD]*"))
print(f"Found {len(paths)} MIDI files.")

Found 1276 MIDI files.


In [21]:
# tokenize midis
token_seqs = []
for path in paths:
    midi = Score(str(path))   # <=== MUST use Score now
    tokens = tokenizer(midi)
    token_seqs.append(tokens.ids)

# Revising model (complex version)

In [23]:
import torch
import torch.nn as nn
import math


'''
d_model = dimension of my model embeddings = size of the vector for each token at every time step

'''

# updated model 
# ==== Relative Multihead Attention ====
class RelativeMultiHeadAttn(nn.Module):
    def __init__(self, d_model, n_head):
        super().__init__()
        self.n_head = n_head
        self.d_head = d_model // n_head  # divide so that each head work on subvector 
        self.qkv = nn.Linear(d_model, 3 * d_model, bias=False) # project input vectors of size d_model in to qkv
        self.o = nn.Linear(d_model, d_model, bias=False) # project concatenated multi-head back to original embedding 
        self.r_r_bias = nn.Parameter(torch.randn(n_head, self.d_head)) # initialize with random small value 
        self.r_w_bias = nn.Parameter(torch.randn(n_head, self.d_head))
        self.rel_embed = nn.Embedding(512, self.d_head)  # relative position embedding

    def forward(self, h, mask=None):
        B, T, D = h.shape
        qkv = self.qkv(h).view(B, T, 3, self.n_head, self.d_head)
        q, k, v = qkv.unbind(dim=2)  # shapes: (B, T, n_head, d_head)

        AC = torch.einsum('bthd,bThd->bhtT', (q + self.r_w_bias, k))

        positions = torch.arange(T, device=h.device)
        rel = positions[None, :] - positions[:, None]
        rel = rel.clamp(min=0, max=511)
        r = self.rel_embed(rel)
        BD = torch.einsum('bthd,Ttd->bhtT', (q + self.r_r_bias, r))

        scores = (AC + BD) / math.sqrt(self.d_head)

        if mask is not None:
            scores = scores.masked_fill(mask[:, None, None, :], -1e9)

        attn = torch.softmax(scores, dim=-1)
        out = torch.einsum('bhtT,bThd->bthd', attn, v)
        out = out.contiguous().view(B, T, D)
        return self.o(out)

# ==== FiLM Layer ====
class FiLMLayer(nn.Module):
    def __init__(self, d_model):
        super().__init__()
        self.to_gamma = nn.Linear(d_model, d_model)
        self.to_beta = nn.Linear(d_model, d_model)

    def forward(self, x, chord_emb):
        if chord_emb is None:
            return x  # Skip if no chord conditioning (during generation)
        gamma = torch.tanh(self.to_gamma(chord_emb))
        beta = self.to_beta(chord_emb)
        return gamma * x + beta

# ==== Decoder Layer ====
class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_head, d_ff):
        super().__init__()
        self.attn = RelativeMultiHeadAttn(d_model, n_head)
        self.ln1 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Linear(d_ff, d_model)
        )
        self.ln2 = nn.LayerNorm(d_model)
        self.film = FiLMLayer(d_model)

    def forward(self, x, chord_emb, mask=None):
        h = self.attn(self.ln1(x), mask)
        x = x + h
        x = self.film(x, chord_emb)
        h2 = self.ff(self.ln2(x))
        return x + h2

# ==== Chord Encoder ====
class ChordEncoder(nn.Module):
    def __init__(self, d_model=256, n_layer=2, n_head=4):
        super().__init__()
        self.emb = nn.Embedding(13, d_model)  # 12 roots + NO_CHORD
        self.layers = nn.ModuleList([
            nn.TransformerEncoderLayer(d_model, n_head, d_model * 4, batch_first=True)
            for _ in range(n_layer)
        ])
        self.pool = nn.AdaptiveAvgPool1d(1)

    def forward(self, chord_seq):
        x = self.emb(chord_seq)
        for layer in self.layers:
            x = layer(x)
        pooled = self.pool(x.transpose(1, 2)).squeeze(-1)
        return pooled

# ==== Full Music Transformer ====
class MusicTransformer(nn.Module):
    def __init__(self, vocab_size, d_model=512, n_layer=8, n_head=8, d_ff=2048):
        super().__init__()

        self.token_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Parameter(torch.randn(1, 1024, d_model))
        self.token_ln = nn.LayerNorm(d_model)

        self.decoder = nn.ModuleList([
            DecoderLayer(d_model, n_head, d_ff)
            for _ in range(n_layer)
        ])

        self.ln_final = nn.LayerNorm(d_model)
        self.output_head = nn.Linear(d_model, vocab_size)

        self.chord_encoder = ChordEncoder(d_model=d_model // 2)
        self.proj_chord = nn.Linear(d_model // 2, d_model)

    def forward(self, tokens, chord_seq=None, mask=None):
        B, T = tokens.shape

        x = self.token_emb(tokens) + self.pos_emb[:, :T, :]
        x = self.token_ln(x)

        chord_emb = None
        if chord_seq is not None:
            chord_emb = self.proj_chord(self.chord_encoder(chord_seq)).unsqueeze(1)  # (B, 1, d_model)

        for layer in self.decoder:
            x = layer(x, chord_emb, mask)

        h = self.ln_final(x)
        return self.output_head(h)

In [24]:
from torch.utils.data import Dataset

class SequenceDataset(Dataset):
    def __init__(self, df, seq_len=128):
        self.seq_len = seq_len
        self.samples = []

        # Group by filename (each file is a sequence)
        for fname, group in df.groupby('filename'):
            pitch = torch.tensor(group['pitch'].values, dtype=torch.long)
            velocity = torch.tensor(group['velocity'].values, dtype=torch.long)
            duration = torch.tensor(group['duration'].values, dtype=torch.long)  # make sure it is already int mapped
            instrument = torch.tensor(group['instrument'].values, dtype=torch.long)
            chord = torch.tensor(group['chord'].values, dtype=torch.long)

            # Create sliding windows
            total_len = pitch.shape[0]
            if total_len >= seq_len + 1:
                for i in range(0, total_len - seq_len):
                    self.samples.append((
                        pitch[i:i+seq_len],
                        velocity[i:i+seq_len],
                        duration[i:i+seq_len],
                        instrument[i:i+seq_len],
                        chord[i:i+seq_len],
                        pitch[i+1:i+1+seq_len],
                        velocity[i+1:i+1+seq_len],
                        duration[i+1:i+1+seq_len],
                        instrument[i+1:i+1+seq_len],
                    ))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]

In [25]:
dataset = SequenceDataset(df, seq_len=128)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

In [31]:
def train_model(model, train_loader, num_epochs=5, lr=1e-4, device='cuda'):
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in train_loader:
            # Now batch is simple (B, T)
            tokens = batch.to(device)

            optimizer.zero_grad()

            # Predict next token for each position
            logits = model(tokens[:, :-1])  # input tokens except last
            target = tokens[:, 1:]          # target tokens shifted by one

            B, T, vocab_size = logits.shape

            loss = criterion(logits.reshape(B*T, vocab_size), target.reshape(B*T))
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}")

    return model

## Try to train on REMI

In [28]:
import torch
from torch.utils.data import Dataset

class MusicTokenDataset(Dataset):
    def __init__(self, token_seqs, max_seq_len=1024):
        self.samples = []
        for tokens in token_seqs:
            # break long tokens into smaller chunks
            for i in range(0, len(tokens) - 1, max_seq_len):
                chunk = tokens[i:i+max_seq_len+1]
                if len(chunk) > 1:  # at least (input, output)
                    self.samples.append(chunk)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        tokens = torch.tensor(self.samples[idx], dtype=torch.long)
        return tokens[:-1], tokens[1:]  # (input, target)

In [29]:
from torch.utils.data import DataLoader

dataset = MusicTokenDataset(token_seqs, max_seq_len=1024)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

In [33]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = MusicTransformer(
    vocab_size=tokenizer.vocab_size,   # single unified vocab size!
    d_model=512,
    n_layer=8,
    n_head=8,
    d_ff=2048
).to(device)

In [34]:
import torch

def generate_music(model, tokenizer, max_steps=512, device='cpu', start_tokens=None):
    model.eval()
    generated = []

    if start_tokens is None:
        # random start: pick a valid token
        start_token = torch.randint(0, tokenizer.vocab_size, (1,), device=device).item()
        generated = [start_token]
    else:
        generated = start_tokens

    generated = torch.tensor(generated, device=device).unsqueeze(0)  # shape (1, T)

    for _ in range(max_steps):
        logits = model(generated)  # (B, T, vocab_size)
        logits = logits[:, -1, :]  # Take the last time step's logits
        probs = torch.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)
        generated = torch.cat([generated, next_token], dim=1)

    return generated.squeeze(0).tolist()  # return as list of token ids

In [35]:
tokens = generate_music(model, tokenizer, max_steps=512, device=device)

In [37]:
from miditok import TokSequence

# Create a TokSequence
generated_seq = TokSequence(ids=tokens)

# Decode back to MIDI Score
generated_score = tokenizer(generated_seq)

# Save to MIDI file
generated_score.dump_midi("generated_music_10.mid")

In [38]:
import torch

def generate_music_conditioned(model, tokenizer, chord_seq, max_steps=512, device='cpu', start_tokens=None):
    model.eval()

    generated = []
    if start_tokens is None:
        start_token = torch.randint(0, tokenizer.vocab_size, (1,), device=device).item()
        generated = [start_token]
    else:
        generated = start_tokens

    generated = torch.tensor(generated, device=device).unsqueeze(0)  # (1, T)

    chord_seq = torch.tensor(chord_seq, device=device).unsqueeze(0)  # (1, T_chords)

    for step in range(max_steps):
        # Slice the chord token to feed
        if step < chord_seq.shape[1]:
            current_chord = chord_seq[:, step].unsqueeze(1)  # (B=1, T=1)
        else:
            current_chord = chord_seq[:, -1].unsqueeze(1)  # Repeat last chord if out of range

        logits = model(generated, chord_seq=current_chord)  # forward with chord conditioning
        logits = logits[:, -1, :]  # (B, vocab_size)

        probs = torch.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)

        generated = torch.cat([generated, next_token], dim=1)

    return generated.squeeze(0).tolist()

In [39]:
# Example chord sequence: [C major, G major, A minor, F major]
# Assume chord classes are like {0: C, 1: C#, 2: D, ..., 11: B, 12: no-chord}

chord_progression = [0, 7, 9, 5]  # C, G, A, F

# Generate music conditioned on this chord progression
tokens = generate_music_conditioned(
    model,
    tokenizer,
    chord_seq=chord_progression,   # your given chord list
    max_steps=512,
    device=device
)

# Then decode the same way:
from miditok import TokSequence
generated_seq = TokSequence(ids=tokens)
generated_score = tokenizer(generated_seq)
generated_score.dump_midi("generated_conditioned_1.mid")