# Train a model to generate structure of a piece (MELONS-inspired)

1. Read structure dataset from POP909_structure
2. Pre-process str into graph format
3. Setup transformer model
4. Train-val split, data loader
5. Evaluate model predictions

In [103]:
import torch
import os
import re

In [72]:
structure_path = "POP909_structure"

In [82]:
labels = []
for folder in os.listdir(structure_path):
    try:
        f = open(f"{structure_path}/{folder}/human_label1.txt", "r")
        # print(f.read())
        labels.append(f.read())
    except:
        continue

In [84]:
labels[:5]

['i8A8A8B8C4C4b4b4x2A8B8C4C4C4C4X1o1\n',
 'i4A4A4B4B4C4C4C4D4x4B4B4C4C4C4D4X3\n',
 'i4A8A8B4C4b5x1b5A8B4x1C4C4\n',
 'i18A8A8A9x14B8A10B8A10o4\n',
 'X4b4A8B12b4A8B12b4b4B12o4\n']

In [102]:
def split_string(s):
    # This regex pattern matches a letter followed by one or more digits
    pattern = re.compile(r'[a-zA-Z]\d+')
    # Find all matches in the string
    matches = pattern.findall(s)
    return matches

In [107]:
split_string(labels[3])

['i18', 'A8', 'A8', 'A9', 'x14', 'B8', 'A10', 'B8', 'A10', 'o4']

In [90]:
# def split_into_pairs(input_string):
#     # Initialize an empty list to hold the pairs
#     pairs = []
    
#     # Iterate over the string in steps of 2
#     for i in range(0, len(input_string), 2):
#         next_pair = input_string[i:i+2]
#         if next_pair != "\n":
#             # Append the substring of the next two characters to the list
#             pairs.append(next_pair)
    
#     return pairs

In [91]:
split_into_pairs("i4A4A4B9b4A4B9b4B9X5o2")

['i4', 'A4', 'A4', 'B9', 'b4', 'A4', 'B9', 'b4', 'B9', 'X5', 'o2']

In [108]:
all_phrases = []
for label in labels:
    all_phrases.append(split_string(label))

In [109]:
all_phrases[:3]

[['i8',
  'A8',
  'A8',
  'B8',
  'C4',
  'C4',
  'b4',
  'b4',
  'x2',
  'A8',
  'B8',
  'C4',
  'C4',
  'C4',
  'C4',
  'X1',
  'o1'],
 ['i4',
  'A4',
  'A4',
  'B4',
  'B4',
  'C4',
  'C4',
  'C4',
  'D4',
  'x4',
  'B4',
  'B4',
  'C4',
  'C4',
  'C4',
  'D4',
  'X3'],
 ['i4',
  'A8',
  'A8',
  'B4',
  'C4',
  'b5',
  'x1',
  'b5',
  'A8',
  'B4',
  'x1',
  'C4',
  'C4']]

In [44]:
def get_phrase_edge_type(prev_phrase, curr_phrase, prev_phrase_idx, curr_phrase_idx):
    """
    Edge types:
    0: Intro to Any
    1: Any to Outro
    2: Repeated phrase
    3: Melody to Melody
    4: Melody to Non-Melody
    5: Non-Melody to Melody
    6: Non-Melody to Non-Melody
    """
    # print(prev_phrase_idx, curr_phrase_idx)
    
    prev_phrase_type = prev_phrase[0]
    curr_phrase_type = curr_phrase[0]
    
    if prev_phrase == curr_phrase:
            return 2
    
    if prev_phrase_idx + 1 == curr_phrase_idx:
        # print(prev_phrase_type)
    
        if prev_phrase_type == "i":
            return 0
        elif curr_phrase_type == "o":
            return 1
        elif prev_phrase_type.isupper() & curr_phrase_type.isupper():
            return 3
        elif prev_phrase_type.isupper() & curr_phrase_type.islower():
            return 4
        elif prev_phrase_type.islower() & curr_phrase_type.isupper():
            return 5
        elif prev_phrase_type.islower() & curr_phrase_type.islower():
            return 6
        else:
            return None

In [28]:
get_phrase_edge_type("B4", "B9", 0, 3)

In [99]:
all_phrases[0]

['i8',
 'A8',
 'A8',
 'B8',
 'C4',
 'C4',
 'b4',
 'b4',
 'x2',
 'A8',
 'B8',
 'C4',
 'C4',
 'C4',
 'C4',
 'X1',
 'o1']

In [118]:
def create_sequence(phrases):
    # Create sequence of edges from phrase, where each item is a tuple (i, j, edge type, num bars in i, num bars in j)
    seq = []
    for i, phrase_from in enumerate(phrases):
        for j, phrase_to in enumerate(phrases[i+1:]):
            phrase_to_idx = j+i+1
            edge_type = get_phrase_edge_type(phrase_from, phrase_to, i, phrase_to_idx)
            if edge_type is not None:
                phrase_from_len = int(phrase_from[1])
                phrase_to_len = int(phrase_to[1])
                seq.append((i, phrase_to_idx, edge_type, phrase_from_len, phrase_to_len))
    
    # Append END token
    seq.append((len(phrases)-1, len(phrases), 7, 0, 0))
    return seq

In [119]:
seqs = []
for phrases in all_phrases:
    # print(phrases)
    seqs.append(create_sequence(phrases))

In [120]:
seqs[0]

[(0, 1, 0, 8, 8),
 (1, 2, 2, 8, 8),
 (1, 9, 2, 8, 8),
 (2, 3, 3, 8, 8),
 (2, 9, 2, 8, 8),
 (3, 4, 3, 8, 4),
 (3, 10, 2, 8, 8),
 (4, 5, 2, 4, 4),
 (4, 11, 2, 4, 4),
 (4, 12, 2, 4, 4),
 (4, 13, 2, 4, 4),
 (4, 14, 2, 4, 4),
 (5, 6, 4, 4, 4),
 (5, 11, 2, 4, 4),
 (5, 12, 2, 4, 4),
 (5, 13, 2, 4, 4),
 (5, 14, 2, 4, 4),
 (6, 7, 2, 4, 4),
 (7, 8, 6, 4, 2),
 (8, 9, 5, 2, 8),
 (9, 10, 3, 8, 8),
 (10, 11, 3, 8, 4),
 (11, 12, 2, 4, 4),
 (11, 13, 2, 4, 4),
 (11, 14, 2, 4, 4),
 (12, 13, 2, 4, 4),
 (12, 14, 2, 4, 4),
 (13, 14, 2, 4, 4),
 (14, 15, 3, 4, 1),
 (15, 16, 1, 1, 1),
 (16, 17, 7, 0, 0)]

In [52]:
seq

[(0, 1, 0, 4, 4),
 (1, 2, 2, 4, 4),
 (1, 5, 2, 4, 4),
 (2, 3, 3, 4, 9),
 (2, 5, 2, 4, 4),
 (3, 4, 4, 9, 4),
 (3, 6, 2, 9, 9),
 (3, 8, 2, 9, 9),
 (4, 5, 5, 4, 4),
 (4, 7, 2, 4, 4),
 (5, 6, 3, 4, 9),
 (6, 7, 4, 9, 4),
 (6, 8, 2, 9, 9),
 (7, 8, 5, 4, 9),
 (8, 9, 3, 9, 5),
 (9, 10, 1, 5, 2)]

In [54]:
# add END token
seq.append((10, 11, 7, 0, 0))

In [124]:
def create_input_output_pairs(seq):
    # Create input and output pairs
    input_seqs = []
    output_seqs = []
    for idx in range(1, len(seq)):
        input_seqs.append(seq[:idx])
        output_seqs.append(seq[idx:])
    return input_seqs, output_seqs

In [125]:
inputs = []
outputs = []

for seq in seqs:
    input_seqs, output_seqs = create_input_output_pairs(seq)
    inputs.append(input_seqs)
    outputs.append(output_seqs)

In [129]:
inputs[0][3]

[(0, 1, 0, 8, 8), (1, 2, 2, 8, 8), (1, 9, 2, 8, 8), (2, 3, 3, 8, 8)]

In [130]:
outputs[0][3]

[(2, 9, 2, 8, 8),
 (3, 4, 3, 8, 4),
 (3, 10, 2, 8, 8),
 (4, 5, 2, 4, 4),
 (4, 11, 2, 4, 4),
 (4, 12, 2, 4, 4),
 (4, 13, 2, 4, 4),
 (4, 14, 2, 4, 4),
 (5, 6, 4, 4, 4),
 (5, 11, 2, 4, 4),
 (5, 12, 2, 4, 4),
 (5, 13, 2, 4, 4),
 (5, 14, 2, 4, 4),
 (6, 7, 2, 4, 4),
 (7, 8, 6, 4, 2),
 (8, 9, 5, 2, 8),
 (9, 10, 3, 8, 8),
 (10, 11, 3, 8, 4),
 (11, 12, 2, 4, 4),
 (11, 13, 2, 4, 4),
 (11, 14, 2, 4, 4),
 (12, 13, 2, 4, 4),
 (12, 14, 2, 4, 4),
 (13, 14, 2, 4, 4),
 (14, 15, 3, 4, 1),
 (15, 16, 1, 1, 1),
 (16, 17, 7, 0, 0)]

In [161]:
inputs_flat = [seq for seqs in inputs for seq in seqs]
outputs_flat = [seq for seqs in outputs for seq in seqs]

In [162]:
len(inputs_flat)

19982

In [163]:
max(len(x) for x in inputs_flat)

150

In [164]:
len(outputs_flat)

19982

In [165]:
max(len(x) for x in outputs_flat)

150

In [166]:
len(tokens)

31

In [167]:
len(set([token for tokens in inputs_flat for token in tokens]))

2932

In [168]:
len(set([token for tokens in outputs_flat for token in tokens]))

2897

## Dataloader

In [179]:
import torch
from torch.nn.utils.rnn import pad_sequence

def sequences_to_tensor(sequences, padding_value=0):
    """
    Convert a list of sequences of different lengths to a padded tensor.

    Args:
        sequences (list of list of tuples): List of sequences where each sequence is a list of tuples.
        padding_value (int, optional): Value to use for padding. Defaults to 0.

    Returns:
        torch.Tensor: Padded tensor of shape (batch_size, max_length, tuple_length)
    """
    # Convert each sequence to a tensor
    tensor_sequences = [torch.tensor(seq) for seq in sequences]

    # Pad sequences to the length of the longest sequence
    padded_sequences = pad_sequence(tensor_sequences, batch_first=True, padding_value=padding_value)

    return padded_sequences


In [182]:
padded_input = sequences_to_tensor(inputs_flat, padding_value=0)
padded_output = sequences_to_tensor(outputs_flat, padding_value=0)

print("Padded input shape:", padded_input.shape)
print("Padded output shape:", padded_output.shape)

Padded input shape: torch.Size([19982, 150, 5])
Padded output shape: torch.Size([19982, 150, 5])


In [185]:
# Define the custom dataset
class TupleSequenceDataset(Dataset):
    def __init__(self, input_sequences, output_sequences):
        self.input_sequences = input_sequences
        self.output_sequences = output_sequences

    def __len__(self):
        return len(self.input_sequences)

    def __getitem__(self, idx):
        return [self.input_sequences[idx], self.output_sequences[idx]]

# Parameters
batch_size = 32
shuffle = True

# Create the dataset
dataset = TupleSequenceDataset(padded_input, padded_output)

# Create the DataLoader
dataloader = DataLoader(input_dataset, batch_size=batch_size, shuffle=shuffle)


## Autoregression transformer

In [206]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence


In [203]:
# Define the Autoregressive Transformer Model
class AutoregressiveTransformer(nn.Module):
    def __init__(self, vocab_size, embed_size, num_layers, num_heads, hidden_size, ff_size, dropout_rate, batch_size, tuple_size):
        super(AutoregressiveTransformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.positional_encoding = nn.Parameter(torch.zeros(batch_size, max_seq_length, tuple_size, embed_size))
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_size,
            nhead=num_heads,
            dim_feedforward=ff_size,
            dropout=dropout_rate
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, src_mask):
        # Add positional encoding to the input
        seq_length = x.size(1)
        print(self.embedding(x).shape)
        print(self.positional_encoding[:, :seq_length, :].shape)
        x = self.embedding(x) + self.positional_encoding[:, :seq_length, :]
        x = self.dropout(x)
        x = self.transformer_encoder(x, src_mask)
        logits = self.fc_out(x)
        return logits

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask


In [213]:
class PositionalEncoding(nn.Module):
    def __init__(self, tuple_length, max_length=max_seq_length):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_length, tuple_length)
        position = torch.arange(0, max_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, tuple_length, 2).float() * (-torch.log(torch.tensor(10000.0)) / tuple_length))
        
        print(pe.shape)
        print(position.shape)
        print(div_term.shape)
        print((position * div_term).shape)
        print(pe[:, 1::2].shape)

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch_size, seq_len, tuple_length)
        seq_len = x.size(1)
        return x + self.pe[:seq_len, :].unsqueeze(0)

class AutoregressiveTransformer(nn.Module):
    def __init__(self, vocab_size, embed_size, num_layers, num_heads, hidden_size, ff_size, dropout_rate):
        super(AutoregressiveTransformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.positional_encoding = PositionalEncoding(embed_size)
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_size,
            nhead=num_heads,
            dim_feedforward=ff_size,
            dropout=dropout_rate
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, src_mask):
        # x: (batch_size, seq_len, tuple_length)
        x = self.embedding(x)  # Convert token IDs to embeddings
        x = self.positional_encoding(x)
        x = self.dropout(x)
        x = x.permute(1, 0, 2)  # (batch_size, seq_len, embed_size) -> (seq_len, batch_size, embed_size)
        x = self.transformer_encoder(x, src_mask)
        x = x.permute(1, 0, 2)  # (seq_len, batch_size, embed_size) -> (batch_size, seq_len, embed_size)
        logits = self.fc_out(x)
        return logits

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask


In [215]:
# Hyperparameters
vocab_size = 2932  # Example vocab size
embed_size = 6
num_layers = 4
num_heads = 4
hidden_size = embed_size
ff_size = 1024
dropout_rate = 0.1
learning_rate = 1e-4
max_seq_length = 150

# Initialize the model, optimizer, and loss function
model = AutoregressiveTransformer(vocab_size, embed_size, num_layers, num_heads, hidden_size, ff_size, dropout_rate)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

torch.Size([150, 6])
torch.Size([150, 1])
torch.Size([3])
torch.Size([150, 3])
torch.Size([150, 3])


AssertionError: embed_dim must be divisible by num_heads

In [216]:
## TODO: Figure out shapes!!

In [None]:
# Example input sequence
input_sequence = torch.randint(0, vocab_size, (32, max_seq_length))  # batch size of 32 for illustration
src_mask = model.generate_square_subsequent_mask(max_seq_length)

# Training loop (simplified)
model.train()
for epoch in range(10):  # number of epochs
    optimizer.zero_grad()
    output = model(input_sequence, src_mask)
    loss = criterion(output.view(-1, vocab_size), input_sequence.view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")

print("Training complete.")