In [1]:
import ast
import random
import time

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset, random_split
from sklearn.model_selection import KFold

## Import Dataset and Definition of Useful functions

### Import Dataset

In [2]:
def safe_parse_all_columns_df(df):
    """
    Parse all columns in a DataFrame to numeric, coercing errors.
    """
    df['notes'] = df['notes'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
    df['chords'] = df['chords'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
    df['velocities'] = df['velocities'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
    df['durations'] = df['durations'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
    df['offsets'] = df['offsets'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
    df['ordered_events'] = df['ordered_events'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
    return df

def load_dataframe_from_two_csvs(file1, file2):
    """
    Load and concatenate two CSV files into a single pandas DataFrame.
    """
    df1 = pd.read_csv(file1)
    df2 = pd.read_csv(file2)
    full_df = pd.concat([df1, df2], ignore_index=True)
    full_df = safe_parse_all_columns_df(full_df)

    return full_df

def save_dataframe_to_two_csvs(df, file1, file2):
    """
    Split a DataFrame in half and save it into two CSV files.
    """
    halfway = len(df) // 2
    df.iloc[:halfway].to_csv(file1, index=False)
    df.iloc[halfway:].to_csv(file2, index=False)

def load_dataframe_from_one_csv(file):
    """
    Load a DataFrame from a single CSV file.
    """
    df = pd.read_csv(file)
    
    return df

def save_dataframe_to_one_csv(df, file):
    """
    Save a DataFrame to a single CSV file.
    """
    df.to_csv(file, index=True)

def load_reconstructed_events(file):
    """
    Loads the reconstructed events CSV and safely parses the 'sequence' column,
    converting notes to integers and chords to lists of integers.
    """
    df = pd.read_csv(file)

    def safe_parse(seq_str):
        try:
            parsed = ast.literal_eval(seq_str)
            if not isinstance(parsed, list):
                raise ValueError("Parsed sequence is not a list")

            normalized = []
            for el in parsed:
                if isinstance(el, list):
                    normalized.append([int(x) for x in el])
                else:
                    normalized.append(int(el))
            return normalized

        except Exception as e:
            print(f"Error parsing sequence: {seq_str}")
            raise e

    df['sequence'] = df['sequence'].apply(safe_parse)
    return df

In [3]:
root = 'data_processed/'
file1 = root + 'data_part1.csv'
file2 = root + 'data_part2.csv'

df = load_dataframe_from_two_csvs(file1, file2)

### Useful functions

In [4]:
def parse_chord_to_list(chord):
    """
    Convert a chord string to a list of integers.
    """
    if isinstance(chord, str):
        print([int(x) for x in chord.split(',') if x.isdigit()])
        return [int(x) for x in chord.split(',') if x.isdigit()]
    return []

In [5]:
def reconstruct_ordered_events(df):
    """
    Reconstruct the ordered list of events (notes and chords) for each song.
    """
    sequences  = []

    for i in range(len(df)):
        idx_note = 0
        idx_chord = 0
        reconstructed = []

        for element in df['ordered_events'][i]:
            if element == 'n':
                reconstructed.append(df['notes'][i][idx_note])
                idx_note += 1
            elif element == 'c':
                parsed_chord = parse_chord_to_list(df['chords'][i][idx_chord])
                reconstructed.append(df['chords'][i][idx_chord])
                idx_chord += 1
            else:
                raise ValueError(f"Unknown event type: {e}")
        
        sequences.append(reconstructed)

    reconstructed_dataset = pd.DataFrame({'sequence': sequences})
    reconstructed_dataset.index.name = 'index'

    return reconstructed_dataset

In [6]:
save_dataframe_to_one_csv(reconstruct_ordered_events(df), root + 'reconstructed_ordered_events.csv')

## Predict only Events (Notes and Chords)

### Creating the data: Fixed number of events 

Idea for creating the input sequences:
- we take subsets of the list of events representing each song 
- we take the next event of each subset as corresponding training output sequences

This is easy to implement and we will have a consistent sequence lenght for batching, but we are ignoring the timing aspect.

In [7]:
class Vocabulary:
    def __init__(self, reconstructed_df):
        """
        Build vocabulary of unique single notes only.
        """
        self.notes = set()
        for i in range(len(reconstructed_df)):
            sequence = reconstructed_df['sequence'][i]
            for event in sequence:
                if isinstance(event, list):
                    for note in event:
                        self.notes.add(note)
                else:
                    self.notes.add(event)

        self.notes = sorted(self.notes)
        self.note_to_idx = {note: idx for idx, note in enumerate(self.notes)}
        self.idx_to_note = {idx: note for idx, note in enumerate(self.notes)}
        self.vocab_size = len(self.notes)

    def encode_event(self, event):
        """
        Encode an event as a multi-hot vector over single notes.
        """
        vec = np.zeros(self.vocab_size, dtype=np.float32)
        if isinstance(event, list):
            for note in event:
                vec[self.note_to_idx[note]] = 1.0
        else:
            vec[self.note_to_idx[event]] = 1.0
        return vec

    def decode_event(self, vec, threshold=0.5):
        """
        Decode multi-hot vector to list of notes.
        """
        indices = np.where(vec >= threshold)[0]
        notes = [self.idx_to_note[idx] for idx in indices]
        if len(notes) == 1:
            return notes[0]
        else:
            return notes

    def __len__(self):
        return self.vocab_size


Create Dataset object

In [8]:
class MusicEventDataset(Dataset):
    def __init__(self, reconstructed_df, vocab, seq_length=50):
        """
        Constructs all valid (input_seq, target_event) pairs from each song in the dataset.

        Args:
            reconstructed_df: DataFrame with 'sequence' column where each entry is a list of events
            vocab: Vocabulary object to encode events
            seq_length: Length of each training input sequence (target is the next event)
        """
        self.samples = []
        self.seq_length = seq_length
        self.vocab = vocab

        for row_index in range(len(reconstructed_df)):
            sequence = reconstructed_df['sequence'][row_index]
            n_events = len(sequence)

            if n_events <= seq_length:
                continue

            for i in range(n_events - seq_length):
                input_seq = sequence[i : i + seq_length]
                target_event = sequence[i + seq_length]
                self.samples.append((input_seq, target_event))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        input_seq, target_event = self.samples[idx]

        input_encoded = np.array([self.vocab.encode_event(event) for event in input_seq], dtype=np.float32)
        input_tensor = torch.tensor(input_encoded)

        target_encoded = self.vocab.encode_event(target_event)
        target_tensor = torch.tensor(target_encoded, dtype=torch.float32)

        return input_tensor, target_tensor


In [9]:
reconstructed_dataset = load_reconstructed_events(root + 'reconstructed_ordered_events.csv')

vocab = Vocabulary(reconstructed_dataset)

dataset = MusicEventDataset(reconstructed_dataset, vocab=vocab, seq_length=16)

x, y = dataset[0]

print("Input sequence shape:", x.shape)
print("Next event shape:", y.shape)
print("Input sequence (multi-hot vectors):", x)
print("Next event (multi-hot vector):", y)


Input sequence shape: torch.Size([16, 88])
Next event shape: torch.Size([88])
Input sequence (multi-hot vectors): tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])
Next event (multi-hot vector): tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])


In [10]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MusicVAE(nn.Module):
    def __init__(self, input_dim, hidden_dim=256, latent_dim=64, seq_length=16):
        super(MusicVAE, self).__init__()
        self.seq_length = seq_length
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim

        # Encoder
        self.encoder_rnn = nn.GRU(input_dim, hidden_dim, batch_first=True)
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)

        # Decoder
        self.decoder_input = nn.Linear(latent_dim, hidden_dim)
        self.decoder_rnn = nn.GRU(input_dim, hidden_dim, batch_first=True)
        self.decoder_output = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        _, h = self.encoder_rnn(x)  # h: (1, batch, hidden_dim)
        h = h.squeeze(0)
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z, seq_length):
        batch_size = z.size(0)
        h0 = self.decoder_input(z).unsqueeze(0)  # (1, batch, hidden_dim)

        # Start token: zero vector or learnable embedding
        start_input = torch.zeros((batch_size, 1, self.input_dim), device=z.device)
        outputs = []

        # Autoregressive decoding (can be parallelized with teacher forcing)
        x = start_input
        for _ in range(seq_length):
            out, h0 = self.decoder_rnn(x, h0)
            out_step = self.decoder_output(out)  # (batch, 1, input_dim)
            outputs.append(out_step)
            x = out_step.detach()  # next input is current output

        return torch.cat(outputs, dim=1)  # (batch, seq_length, input_dim)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z, self.seq_length)
        return x_recon, mu, logvar


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MusicVAE(nn.Module):
    def __init__(self, input_dim, hidden_dim=256, latent_dim=64, seq_length=16):
        super(MusicVAE, self).__init__()
        self.seq_length = seq_length
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim

        # Encoder
        self.encoder_rnn = nn.GRU(input_dim, hidden_dim, batch_first=True)
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)

        # Decoder
        self.decoder_input = nn.Linear(latent_dim, hidden_dim)
        self.decoder_rnn = nn.GRU(input_dim, hidden_dim, batch_first=True)
        self.decoder_output = nn.Linear(hidden_dim, input_dim)

    def encode(self, x):
        _, h = self.encoder_rnn(x)  # h: (1, batch, hidden_dim)
        h = h.squeeze(0)
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z, seq_length):
        batch_size = z.size(0)
        h0 = self.decoder_input(z).unsqueeze(0)  # (1, batch, hidden_dim)

        # Start token: zero vector or learnable embedding
        start_input = torch.zeros((batch_size, 1, self.input_dim), device=z.device)
        outputs = []

        # Autoregressive decoding (can be parallelized with teacher forcing)
        x = start_input
        for _ in range(seq_length):
            out, h0 = self.decoder_rnn(x, h0)
            out_step = self.decoder_output(out)  # (batch, 1, input_dim)
            outputs.append(out_step)
            x = out_step.detach()  # next input is current output

        return torch.cat(outputs, dim=1)  # (batch, seq_length, input_dim)

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        x_recon = self.decode(z, self.seq_length)
        return x_recon, mu, logvar


In [12]:
def vae_loss(x_recon, x, mu, logvar):
    recon_loss = F.binary_cross_entropy_with_logits(x_recon, x, reduction='sum')
    kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kl_div


In [15]:
from torch.utils.data import DataLoader
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


batch_size = 32  # or whatever you prefer
num_epochs = 3  # or any number of epochs you want
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
model = MusicVAE(input_dim=vocab.vocab_size, seq_length=16)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)   

for epoch in range(num_epochs):
    print (f'epoch {epoch}')
    for batch_x, _ in dataloader:
        batch_x = batch_x.to(device)
        x_recon, mu, logvar = model(batch_x)
        loss = vae_loss(x_recon, batch_x, mu, logvar)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


epoch 0
epoch 1


KeyboardInterrupt: 