<a href="https://colab.research.google.com/github/the-crHack/email/blob/main/HW4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt install unzip
!pip install py_midicsv==4.1.2
!pip install midi_player==0.5.1
!unzip /content/sample_data/train-20241205T181153Z-001.zip -d /content/sample_data

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import random
import glob
import numpy as np
import torch.nn.functional as F

In [9]:

class SongsDataset():
    def __init__(self, files, context_window=64, stride=4, max_samples_per_song=100):
        self.data = []  # List to store input sequences (X)
        self.labels = []  # List to store corresponding labels (Y)

        # Iterate over each song file
        for file in files:
            # Load the song data (assuming it's stored as a tensor)
            song_data = torch.load(file)  # song_data should be a tensor with shape (num_events, 4)

            # Create indices for sliding window
            indices = range(0, len(song_data) - context_window, stride)
            sampled_indices = random.sample(list(indices), min(max_samples_per_song, len(indices)))

            # Extract data slices and labels
            for i in sampled_indices:
                # Input sequence: slice of notes with size `context_window`
                self.data.append(song_data[i:i + context_window])  # Shape: (context_window, 4)
                # Label: the next note after the context window
                self.labels.append(song_data[i + context_window])  # Shape: (1, 4)

    def __len__(self):
        # Return the number of samples in the dataset
        return len(self.data)

    def __getitem__(self, idx):
        # Return the input (X) and label (Y) for the given index
        return torch.tensor(self.data[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.float32)


# --- Model Definition ---

class NotePredictionModel(nn.Module):
    def __init__(self):
        super(NotePredictionModel, self).__init__()

        # Define layers: A simple 2-layer LSTM followed by a fully connected layer
        self.lstm = nn.LSTM(input_size=4, hidden_size=128, batch_first=True)
        self.fc = nn.Linear(128, 134)  # We predict 134 values: [µ_t, σ_t, µ_d, σ_d, log(π0), ..., log(π127), µ_v, σ_v]

    def forward(self, x):
      # LSTM layer
      lstm_out, (h_n, c_n) = self.lstm(x)

      # Take the last LSTM output
      last_output = lstm_out[:, -1, :]  # (batch_size, hidden_size)

      # Fully connected layer to predict the required values
      output = self.fc(last_output)  # Shape: (batch_size, 134)

      # Split the output into the predicted values
      # Expecting 134 values for [µ_t, σ_t, µ_d, σ_d, log(π0), ..., log(π127), µ_v, σ_v]
      µ_t, σ_t, µ_d, σ_d, logits_n, µ_v, σ_v = output.split([1, 1, 1, 1, 128, 1, 1], dim=-1)


      # Return the predicted values as a tensor of shape (batch_size, 134)
      return torch.cat((µ_t, σ_t, µ_d, σ_d, logits_n, µ_v, σ_v), dim=-1)


def nll_loss_continuous(pred_mu, pred_sigma, target):
    # Avoid division by zero and log(0) errors
    epsilon = 1e-6
    pred_sigma = torch.max(pred_sigma, torch.tensor(epsilon))  # Prevent log(0)

    # Calculate the NLL for normal distribution
    loss = 0.5 * torch.log(2 * torch.tensor(torch.pi)) + torch.log(pred_sigma) + (target - pred_mu) ** 2 / (2 * pred_sigma ** 2)
    return loss.mean()


def train_model(model, data_loader, epochs, learning_rate):
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for step, (context, target) in enumerate(data_loader):

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            output = model(context)

            # Split the output into predicted values (this should be 134 values)
            µ_t_pred, σ_t_pred, µ_d_pred, σ_d_pred, logits_n_pred, µ_v_pred, σ_v_pred = output.split([1, 1, 1, 1, 128, 1, 1], dim=-1)
            logits_n_pred = F.softmax(logits_n_pred, dim=-1)

            # Calculate the loss for continuous distributions using NLL
            t_loss = nll_loss_continuous(µ_t_pred, σ_t_pred, target[:, 0])
            d_loss = nll_loss_continuous(µ_d_pred, σ_d_pred, target[:, 1])
            v_loss = nll_loss_continuous(µ_v_pred, σ_v_pred, target[:, 2])

            # Calculate the categorical cross entropy for the note value logits
            nll_loss = nn.CrossEntropyLoss()(logits_n_pred, target[:, 3].long())

            # Total loss
            loss = t_loss + d_loss + v_loss + nll_loss

            # Print loss every 100 steps
            if step % 100 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Step {step}/{len(data_loader)}, Loss: {loss.item()}")


            # Backward pass
            loss.backward()

            # Optimize the model
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(data_loader)}")


# Example of file list with paths (you need to replace these with actual paths)
train_files = glob.glob("/content/sample_data/train/*/*/*/*.pt")  # Replace with your actual data files

# Initialize the dataset and dataloader
context_window = 64  # Size of the context window
stride = 4  # Step size
max_samples_per_song = 300  # Max samples per song

dataset = SongsDataset(train_files, context_window=context_window, stride=stride, max_samples_per_song=max_samples_per_song)
data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# --- Model Training ---

# Initialize the model
model = NotePredictionModel()

# Train the model
train_model(model, data_loader, epochs=10, learning_rate=0.01)


  song_data = torch.load(file)  # song_data should be a tensor with shape (num_events, 4)
  return torch.tensor(self.data[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.float32)


Epoch 1/10, Step 0/3137, Loss: 2.734126788509696e+16
Epoch 1/10, Step 100/3137, Loss: 26808.259765625
Epoch 1/10, Step 200/3137, Loss: 32950.75390625
Epoch 1/10, Step 300/3137, Loss: 21294.27734375
Epoch 1/10, Step 400/3137, Loss: 11856.4541015625
Epoch 1/10, Step 500/3137, Loss: 12486.376953125
Epoch 1/10, Step 600/3137, Loss: 3511.6005859375
Epoch 1/10, Step 700/3137, Loss: 4153.23876953125
Epoch 1/10, Step 800/3137, Loss: 1770.8980712890625
Epoch 1/10, Step 900/3137, Loss: 2149.334716796875
Epoch 1/10, Step 1000/3137, Loss: 2367.224609375
Epoch 1/10, Step 1100/3137, Loss: 1040.6025390625
Epoch 1/10, Step 1200/3137, Loss: 848.9593505859375
Epoch 1/10, Step 1300/3137, Loss: 1823.048828125
Epoch 1/10, Step 1400/3137, Loss: 838.4359130859375
Epoch 1/10, Step 1500/3137, Loss: 1144.5040283203125
Epoch 1/10, Step 1600/3137, Loss: 416.8633728027344
Epoch 1/10, Step 1700/3137, Loss: 498.9329833984375
Epoch 1/10, Step 1800/3137, Loss: 2997.6513671875
Epoch 1/10, Step 1900/3137, Loss: 1015.388

In [None]:
!apt install unzip
!unzip /content/sample_data/train-20241205T181153Z-001.zip -d /content/sample_data

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import random
import glob
import numpy as np
import matplotlib.pyplot as plt

# --- Dataset Definition ---

class SongsDataset(Dataset):
    def __init__(self, files, context_window=64, stride=4, max_samples_per_song=250):
        self.data = []  # List to store input sequences (X)
        self.labels = []  # List to store corresponding labels (Y)

        # Iterate over each song file
        for file in files:
            # Load the song data (assuming it's stored as a tensor)
            song_data = torch.load(file)  # song_data should be a tensor with shape (num_events, 4)

            # Create indices for sliding window with dynamic stride and sampling
            indices = range(0, len(song_data) - context_window, stride)

            # Sample a few indices for training (to avoid memory overload)
            sampled_indices = random.sample(list(indices), min(max_samples_per_song, len(indices)))

            # Extract data slices and labels
            for i in sampled_indices:
                # Input sequence: slice of notes with size `context_window`
                self.data.append(song_data[i:i + context_window])  # Shape: (context_window, 4)
                # Label: the next note after the context window
                self.labels.append(song_data[i + context_window])  # Shape: (1, 4)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.float32)



class NotePredictionModel(nn.Module):
    def __init__(self):
        super(NotePredictionModel, self).__init__()

        # Define layers: A simple 2-layer LSTM followed by a fully connected layer
        self.lstm = nn.LSTM(input_size=4, hidden_size=128, batch_first=True)
        self.fc = nn.Linear(128, 134)  # We predict 134 values: [µ_t, σ_t, µ_d, σ_d, log(π0), ..., log(π127), µ_v, σ_v]

    def forward(self, x):
      # LSTM layer
      lstm_out, (h_n, c_n) = self.lstm(x)

      # Take the last LSTM output
      last_output = lstm_out[:, -1, :]  # (batch_size, hidden_size)

      # Fully connected layer to predict the required values
      output = self.fc(last_output)  # Shape: (batch_size, 134)

      # Split the output into the predicted values
      # Expecting 134 values for [µ_t, σ_t, µ_d, σ_d, log(π0), ..., log(π127), µ_v, σ_v]
      µ_t, σ_t, µ_d, σ_d, logits_n, µ_v, σ_v = output.split([1, 1, 1, 1, 128, 1, 1], dim=-1)


      # Return the predicted values as a tensor of shape (batch_size, 134)
      return torch.cat((µ_t, σ_t, µ_d, σ_d, logits_n, µ_v, σ_v), dim=-1)


# --- Loss Function ---

def nll_loss_continuous(pred_mu, pred_sigma, target):
    epsilon = 1e-6
    pred_sigma = torch.max(pred_sigma, torch.tensor(epsilon))  # Prevent log(0)

    loss = 0.5 * torch.log(2 * torch.tensor(torch.pi)) + torch.log(pred_sigma) + (target - pred_mu) ** 2 / (2 * pred_sigma ** 2)
    return loss.mean()


# --- Training Function ---

def train_model(model, train_loader, val_loader, epochs, learning_rate, scheduler=None):
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    training_losses = []
    validation_losses = []

    for epoch in range(12):
        model.train()
        running_loss = 0.0

        for step, (context, target) in enumerate(train_loader):
            optimizer.zero_grad()

            # Forward pass
            output = model(context)

            # Split the output into predicted values
            µ_t_pred, σ_t_pred, µ_d_pred, σ_d_pred, logits_n_pred, µ_v_pred, σ_v_pred = output.split([1, 1, 1, 1, 128, 1, 1], dim=-1)
            logits_n_pred = F.softmax(logits_n_pred, dim=-1)

            # Calculate the loss for continuous distributions using NLL
            t_loss = nll_loss_continuous(µ_t_pred, σ_t_pred, target[:, 0])
            d_loss = nll_loss_continuous(µ_d_pred, σ_d_pred, target[:, 1])
            v_loss = nll_loss_continuous(µ_v_pred, σ_v_pred, target[:, 2])

            # Calculate the categorical cross entropy for the note value logits
            nll_loss = nn.CrossEntropyLoss()(logits_n_pred, target[:, 3].long())

            # Total loss
            loss = t_loss + d_loss + v_loss + nll_loss

            if step % 100 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Step {step}/{len(train_loader)}, Loss: {loss.item()}")


            # Backward pass
            loss.backward()

            # Optimize the model
            optimizer.step()

            running_loss += loss.item()

        # Log training loss
        training_losses.append(running_loss / len(train_loader))

        # Validate after every epoch
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for context, target in val_loader:
                output = model(context)
                µ_t_pred, σ_t_pred, µ_d_pred, σ_d_pred, logits_n_pred, µ_v_pred, σ_v_pred = output.split([1, 1, 1, 1, 128, 1, 1], dim=-1)
                logits_n_pred = F.softmax(logits_n_pred, dim=-1)

                # Calculate the loss for validation
                t_loss = nll_loss_continuous(µ_t_pred, σ_t_pred, target[:, 0])
                d_loss = nll_loss_continuous(µ_d_pred, σ_d_pred, target[:, 1])
                v_loss = nll_loss_continuous(µ_v_pred, σ_v_pred, target[:, 2])

                nll_loss = nn.CrossEntropyLoss()(logits_n_pred, target[:, 3].long())
                val_loss += t_loss + d_loss + v_loss + nll_loss

        validation_losses.append(val_loss / len(val_loader))

        print(f"Epoch [{epoch+1}/{epochs}], Training Loss: {running_loss / len(train_loader)}, Validation Loss: {val_loss / len(val_loader)}")

    return training_losses, validation_losses


# --- Dataset Preparation ---

# Example of file list with paths (you need to replace these with actual paths)
train_files = glob.glob("/content/sample_data/train/*/*/*/*.pt")  # Replace with your actual data files

# Initialize the dataset
context_window = 64  # Size of the context window
max_samples_per_song = 250  # Max samples per song
stride = 4

dataset = SongsDataset(train_files, context_window=context_window, stride = stride, max_samples_per_song=max_samples_per_song)

# Split the dataset into training and validation sets
train_size = int(0.8 * len(dataset))  # 80% for training
val_size = len(dataset) - train_size  # 20% for validation
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# DataLoader for training and validation
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# --- Model Initialization and Training ---

# Initialize the model
model = NotePredictionModel()  # Adding dropout for regularization

# Train the model
training_losses, validation_losses = train_model(model, train_loader, val_loader, epochs=12, learning_rate=0.01)

# --- Plotting the Training and Validation Loss ---

plt.plot(training_losses, label='Training Loss')
plt.plot(validation_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss over Epochs')
plt.show()


  song_data = torch.load(file)  # song_data should be a tensor with shape (num_events, 4)
  return torch.tensor(self.data[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.float32)


Epoch 1/12, Step 0/2391, Loss: 4.860358938329088e+16
Epoch 1/12, Step 100/2391, Loss: 2385.017333984375
Epoch 1/12, Step 200/2391, Loss: 2411.20654296875
Epoch 1/12, Step 300/2391, Loss: 1727.6552734375
Epoch 1/12, Step 400/2391, Loss: 1961.35498046875
Epoch 1/12, Step 500/2391, Loss: 2580.5673828125
Epoch 1/12, Step 600/2391, Loss: 1045.5284423828125
Epoch 1/12, Step 700/2391, Loss: 1213.0501708984375
Epoch 1/12, Step 800/2391, Loss: 1309.9677734375
Epoch 1/12, Step 900/2391, Loss: 1297.3577880859375
Epoch 1/12, Step 1000/2391, Loss: 1541.7164306640625
Epoch 1/12, Step 1100/2391, Loss: 709.4432373046875
Epoch 1/12, Step 1200/2391, Loss: 1057.7952880859375
Epoch 1/12, Step 1300/2391, Loss: 837.0156860351562
Epoch 1/12, Step 1400/2391, Loss: 883.5498046875
Epoch 1/12, Step 1500/2391, Loss: 1008.971435546875
Epoch 1/12, Step 1600/2391, Loss: 757.1150512695312
Epoch 1/12, Step 1700/2391, Loss: 423.9794921875
Epoch 1/12, Step 1800/2391, Loss: 465.5028381347656
Epoch 1/12, Step 1900/2391, L

In [10]:
# --- Predicting on Test Data ---

def predict_on_test_data(model, test_file, context_window=64):
    # Load the test data (Xte)
    Xte = torch.load(test_file)  # Xte shape: (M, C, 4), where M is the number of test instances, C is context window size

    model.eval()  # Set the model to evaluation mode
    predictions = []

    with torch.no_grad():  # No need to track gradients for inference
        for i in range(Xte.shape[0]):  # Loop through each test instance
            test_instance = Xte[i:i + 1, :, :]  # Shape: (1, C, 4)
            output = model(test_instance.float())  # Predict distribution parameters (shape: (1, 134))

            predictions.append(output.squeeze(0))  # Remove the batch dimension (1, 134) -> (134)

    # Convert list of predictions to a tensor of shape (M, 134)
    predictions_tensor = torch.stack(predictions)  # Shape: (M, 134)

    # Save the predictions to a file
    torch.save(predictions_tensor, "note_predictions.pt")
    print("Predictions saved to 'note_predictions.pt'.")

# Example usage:
test_file = "/content/sample_data/test.pt"  # Path to your test data file
predict_on_test_data(model, test_file)


  Xte = torch.load(test_file)  # Xte shape: (M, C, 4), where M is the number of test instances, C is context window size


Predictions saved to 'note_predictions.pt'.


4) To ensure an adequate and diverse dataset, we extracted multiple overlapping context windows from each song with the following considerations:

I used Multiple Instances Per Song to better capture the variations in musical patterns. This approach avoided under-utilizing the available data. I used a stride of 4 to avoid overlapping windows, increasing the effective dataset size.I limited max_samples_per_song to ensure that no single song dominated the dataset due to repeated patterns.
Random Sampling: Random sampling of context windows ensured diversity in the extracted data, making the dataset more robust.

5a) Input: (Batch, Context_Window, 4)
        |
    LSTM (Input=4, Hidden=128, Layers=2)
        |
    Fully Connected Layer (Input=128, Output=134)
        |
    Outputs: [μ_t, σ_t, μ_d, σ_d, log(π0)...log(π127), μ_v, σ_v]





5b) Design Choices:

**LSTM Layers:** We used a 2-layer LSTM to capture temporal dependencies in the sequence of notes. LSTM is ideal for handling sequential data such as music.

**Fully Connected Layer:** The FC layer outputs the parameters of the probability distributions, ensuring alignment with the task’s requirements.

**Hyperparameters**: We used a hidden size of 128 to balance model capacity and computational efficiency.

6)

 **Partitioning:**

Training Set: 80% of the data.
Validation Set: 20% of the data, stratified to ensure diversity.

**Loss Function:**

Continuous Attributes (t, d, v): Negative Log-Likelihood (NLL) loss for normal distributions.
Categorical Attribute (n): Cross-entropy loss.

**Optimizer:**

Adam Optimizer: Used for its adaptive learning rates.
Learning Rate: Set to 0.01 based on initial experiments.

**Hyperparameters:**

**Batch Size**: 32
**Epochs** : 10

**Regularization**: L2 weight decay was considered but found unnecessary due to no overfitting in initial runs.

No data augmentation was used since music patterns are inherently structured.
Implementation:
