In [27]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd

import numpy as np
import random
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader

import torch.optim as optim

In [28]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(MultiHeadAttention, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.head_dim = embed_size // heads

        assert (
            self.head_dim * heads == embed_size
        ), "Embedding size needs to be divisible by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size)

    def forward(self, values, keys, query, mask):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        # Split the embedding into self.heads different pieces
        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = query.reshape(N, query_len, self.heads, self.head_dim)

        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(queries)

        # Einsum does matrix multiplication for query * keys for each training example
        # with every other training example, don't be confused by einsum
        # it's just a way to do batch matrix multiplication
        energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])
        # Mask padded indices so their weights become 0
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))

        attention = torch.softmax(energy / (self.embed_size ** (1 / 2)), dim=3)

        out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
            N, query_len, self.heads * self.head_dim
        )

        out = self.fc_out(out)
        return out


In [29]:
class FeedForward(nn.Module):
    def __init__(self, embed_size, hidden_size):
        super(FeedForward, self).__init__()
        self.fc1 = nn.Linear(embed_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, embed_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [30]:
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads, dropout, forward_expansion):
        super(TransformerBlock, self).__init__()
        self.attention = MultiHeadAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

        self.feed_forward = FeedForward(embed_size, forward_expansion * embed_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        attention = self.attention(value, key, query, mask)

        # Add skip connection, run through normalization and finally dropout
        x = self.dropout(self.norm1(attention + query))
        forward = self.feed_forward(x)
        out = self.dropout(self.norm2(forward + x))
        return out


In [31]:
class MusicTransformer(nn.Module):
    def __init__(self, chord_vocab_size, embed_size, num_layers, heads, device, forward_expansion, dropout, max_length):
        super(MusicTransformer, self).__init__()
        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(chord_vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.layers = nn.ModuleList(
            [
                TransformerBlock(
                    embed_size,
                    heads,
                    dropout=dropout,
                    forward_expansion=forward_expansion,
                )
                for _ in range(num_layers)
            ]
        )
        self.dropout = nn.Dropout(dropout)
        # Output layer to predict two notes (assuming each note is represented by a single number)
        self.fc_out = nn.Linear(embed_size, 128)

    def forward(self, x, mask):
        N, sequence_length, chord_size = x.size()

        x = x.view(N, sequence_length * chord_size)
        positions = torch.arange(0, sequence_length * chord_size).expand(N, sequence_length * chord_size).to(self.device)

        out = self.dropout(self.word_embedding(x) + self.position_embedding(positions))
        for layer in self.layers:
            out = layer(out, out, out, mask)

        # Use the output of the first token for classification or apply pooling
        out = out.mean(dim=1)  # Average pooling over the sequence

        out = self.fc_out(out)
        return out


In [32]:
chords = pd.read_csv("output_chords.csv", header=None)
# chords.head

chords_sorted = chords.apply(lambda x: sorted(x), axis=1)

# Drop duplicates to get unique chords
# Count the unique chords
# vocab_size = len(chords_sorted.drop_duplicates())
# print("Vocabulary Size (Unique Chords):", vocab_size)
# # vocab is 10950 for /content/output_chords.csv
vocab_size = 10950


In [33]:

import numpy as np
import random
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import torch

batch_size=128

# Load the dataset
dataset = pd.read_csv('output_chords.csv')

def create_sequences(dataset, sequence_length=4):
    input_sequences = []
    output_notes = []

    for i in range(round(len(dataset)*1 - sequence_length)):
        sequence = dataset[i:i + sequence_length].values.tolist()

        # Selecting 2 random notes from the 4th chord
        fourth_chord = sequence[-1]
        output = random.sample(fourth_chord, 2)

        # Removing the selected notes from the 4th chord in the input
        for note in output:
            fourth_chord.remove(note)

        input_sequences.append(sequence)
        output_notes.append(output)

    return input_sequences, output_notes

# Create input-output pairs
input_sequences, output_notes = create_sequences(dataset)

# Split the data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(input_sequences, output_notes, test_size=0.2, random_state=42,shuffle=False)

# Preparing PyTorch dataset
class ChordDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        # Prepare the input sequence as a 4x3 matrix
        sequence_matrix = []
        for chord in self.sequences[idx]:
            # Pad the chord with zeros if less than 3 notes
            padded_chord = chord + [0] * (3 - len(chord))
            sequence_matrix.append(padded_chord)

        return {
            'sequence': torch.tensor(sequence_matrix, dtype=torch.long),
            'label': torch.tensor(self.labels[idx], dtype=torch.float32)
        }

# Adjust the create_sequences function if needed to ensure correct formatting


# Creating data loaders for training and validation
train_dataset = ChordDataset(X_train, y_train)
val_dataset = ChordDataset(X_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# The train_loader and val_loader are now ready to be used for training and validation


In [34]:
import torch
import torch.nn as nn
import torch.optim as optim

# Assuming the definition of the MusicTransformer model is available from model.py
# Initialize the Music Transformer model
note_vocab_size = 128  # As specified earlier
embed_size = 384  # Example size, adjust as needed
num_layers = 6  # Example value, adjust as needed
heads = 6  # Example value, adjust as needed
device = torch.device("cpu" if torch.cuda.is_available() else "cpu")
forward_expansion = 4  # Example value, adjust as needed
dropout = 0.3 # Example dropout rate, adjust as needed
max_length = 100  # Maximum sequence length, adjust as needed
epochs=30
learning_rate= 0.0001

# torch.seed(1337)

print(device)
model = MusicTransformer(
    note_vocab_size,
    embed_size,
    num_layers,
    heads,
    device,
    forward_expansion,
    dropout,
    max_length
).to(device)


# for name, param in model.named_parameters():
#   print(f"{name}: {param.size()}")

# Loss Function and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.0001)

# Training Loop
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=1):
    model.train()
    for epoch in range(epochs):
        for batch in train_loader:
            optimizer.zero_grad()

            # Extract data and move tensors to the appropriate device
            input_data = batch['sequence'].to(device)
            targets = batch['label'].to(device)
            # Ensure the tensor is of integer type
            targets = targets.long()  # Convert to long if not already an integer type


            # Select the appropriate column for one-hot encoding
            # Adjust this based on your data's structure
            indices = targets[:, 0]  # or targets[:, 1]

            one_hot_col1 = F.one_hot(targets[:, 0], num_classes=128)
            one_hot_col2 = F.one_hot(targets[:, 1], num_classes=128)

            # Concatenate along the last dimension
            one_hot_combined = torch.cat((one_hot_col1, one_hot_col2), dim=-1)

            one_hot_final = one_hot_combined[:, :128]
            # Check if indices are in the valid range

            outputs = model(input_data, None)  # Assuming no mask for simplicity
            # out1, out2 = outputs.split();
            loss = criterion(outputs, one_hot_final.float())
            # Backward pass and optimize
            loss.backward()
            optimizer.step()

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

        # Validation Loop
        model.eval()
        with torch.no_grad():
            for batch in val_loader:
                input_data = batch['sequence'].to(device)
                targets = batch['label'].to(device)
                targets = targets.long()
                one_hot_col1 = F.one_hot(targets[:, 0], num_classes=128)
                one_hot_col2 = F.one_hot(targets[:, 1], num_classes=128)

                # Concatenate along the last dimension
                one_hot_combined = torch.cat((one_hot_col1, one_hot_col2), dim=-1)

                one_hot_final = one_hot_combined[:, :128]

                outputs = model(input_data, None)
                val_loss = criterion(outputs, one_hot_final.float())

                # Here, you can also calculate accuracy or other metrics as needed

        print(f"Validation Loss: {val_loss.item():.4f}")

    # Save the model
    print('saving model_state_dict')

    hyperparameters = {
      'learning_rate': learning_rate,
      'batch_size': batch_size,
      'num_epochs': epochs,
      'embed_size': embed_size,
      'num_layers': num_layers,
      'heads': heads,
      'forward_expansion': forward_expansion,
      'dropout': dropout,
      'max_length': max_length,
      'note_vocab_size': note_vocab_size,
    }

    # Combine the model's state dict and the hyperparameters in a single dictionary
    save_dict = {
        'model_state_dict': model.state_dict(),
        'hyperparameters': hyperparameters
    }


    torch.save(save_dict, 'model_state_dict.pth')

# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, epochs=epochs)

# torch.save(model, './music_transformer_complete_model.pt')
# print("Model saved successfully.")

# sample_input = val_dataset.sequences[0]
# # Move input to the same device as the model
# sample_input = sample_input.to(device)
# # Make a prediction
# with torch.no_grad():
#     output = model(sample_input, None)  # Assuming no mask for simplicity

# # The output is likely logits; to get the predicted indices:
# predicted_indices = output.argmax(dim=-1)

# # Convert indices to notes/chords (depends on your specific mapping)
# predicted_notes = [index_to_note[index] for index in predicted_indices[0]]

# print("Predicted Notes/Chords:", predicted_notes)


cpu
Epoch [1/1], Loss: 3.2473
Validation Loss: 3.4490
saving model_state_dict


In [35]:
# Choose a random sample from the validation set
sample_idx = np.random.randint(0, len(val_dataset))
print(sample_idx)
sample_input = val_dataset[sample_idx]['sequence']


print(sample_input)
# Move input to the same device as the model
sample_input = sample_input.unsqueeze(0).to(device)  # Add a batch dimension

# Make a prediction
model.eval()  # Ensure the model is in evaluation mode
with torch.no_grad():
    raw_output = model(sample_input, None)  # Assuming no mask for simplicity


# The output is logits; convert to probabilities
probabilities = torch.softmax(raw_output, dim=-1)

# Get the top 2 predicted notes
top_probabilities, top_indices = torch.topk(probabilities, 2, dim=-1)
top_indices = top_indices.cpu().numpy().flatten()  # Move back to CPU and flatten

# Print the raw output
print("Raw Output from the Transformer Model:", top_indices)



19283
tensor([[77, 62, 53],
        [77, 62, 53],
        [69, 62, 53],
        [69,  0,  0]])
Raw Output from the Transformer Model: [72 62]


In [36]:
num_windows = 4
window_size = 4  # The number of samples in each window
val_dataset_size = len(val_dataset)

for _ in range(num_windows):
    # Randomly select a start index, ensuring the window doesn't exceed dataset bounds
    start_idx = np.random.randint(0, val_dataset_size - window_size + 1)

    print(f"\nWindow starting from sample {start_idx + 1}:")

    for j in range(window_size):
        # Get the sample
        sample_idx = start_idx + j
        sample_input, true_output = val_dataset[sample_idx]['sequence'], val_dataset[sample_idx]['label']

        # Move input to the same device as the model
        sample_input = sample_input.unsqueeze(0).to(device)  # Add a batch dimension

        # Make a prediction
        with torch.no_grad():
            output = model(sample_input, None)  # Assuming no mask for simplicity
            probabilities = torch.softmax(output, dim=-1)
            top_probabilities, top_indices = torch.topk(probabilities, 2, dim=-1)
            top_indices = top_indices.cpu().numpy().flatten()  # Move back to CPU and flatten

        print(f"Sample {sample_idx + 1}:")
        print(sample_input)
        print("Predicted Notes/Chords:", top_indices)

    # Add a separator for readability
    if _ < num_windows - 1:
        print("\n" + "-"*50)


Window starting from sample 27615:
Sample 27615:
tensor([[[76, 70, 60],
         [77, 70, 60],
         [76, 70, 62],
         [57,  0,  0]]])
Predicted Notes/Chords: [72 62]
Sample 27616:
tensor([[[77, 70, 60],
         [76, 70, 62],
         [79, 72, 57],
         [79,  0,  0]]])
Predicted Notes/Chords: [72 62]
Sample 27617:
tensor([[[76, 70, 62],
         [79, 72, 57],
         [79, 72, 60],
         [72,  0,  0]]])
Predicted Notes/Chords: [72 62]
Sample 27618:
tensor([[[79, 72, 57],
         [79, 72, 60],
         [78, 72, 62],
         [78,  0,  0]]])
Predicted Notes/Chords: [72 62]

--------------------------------------------------

Window starting from sample 19886:
Sample 19886:
tensor([[[77, 69, 55],
         [77, 69, 62],
         [77, 69, 62],
         [62,  0,  0]]])
Predicted Notes/Chords: [72 62]
Sample 19887:
tensor([[[77, 69, 62],
         [77, 69, 62],
         [69, 62, 57],
         [62,  0,  0]]])
Predicted Notes/Chords: [72 62]
Sample 19888:
tensor([[[77, 69, 62],