In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
print(torch.cuda.is_available())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

True


In [3]:
class PreContextRewardModel(nn.Module):
  def __init__(self, input_size, hidden_size, output_size, num_layers=1):
    super(PreContextRewardModel, self).__init__()
    self.hidden_size = hidden_size
    self.num_layers = num_layers

    # Embedding layers for human and machine pre-context
    self.embedding_human = nn.Embedding(input_size, hidden_size)
    self.embedding_machine = nn.Embedding(input_size, hidden_size)

    # GRU layer
    self.gru = nn.GRU(hidden_size * 2, hidden_size, num_layers, batch_first=True)

    # Output layer
    self.fc = nn.Linear(hidden_size, output_size)
    # self.softmax = nn.Softmax(dim=1)

  def forward(self, x_human, x_machine, hidden):
    # Embed the input tokens for both human and machine part
    x_human = self.embedding_human(x_human)
    x_machine = self.embedding_machine(x_machine)

    # Concatenate the embeddings
    x = torch.cat((x_human, x_machine), dim=2)

    # Pass through GRU
    out, hidden = self.gru(x, hidden)

    # Take the output of the last time step
    out = out[:, -1, :]

    # Pass through fully connected layer
    out = self.fc(out)

    # Apply softmax to get probabilities
    # out = self.softmax(out)

    return out, hidden

  def init_hidden(self, batch_size):
    return torch.zeros(self.num_layers, batch_size, self.hidden_size, device=device)

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
import json, os

In [6]:
dataset_path = "/content/drive/MyDrive/ECE570/data/"

In [7]:
def create_duet_pairs(parts):
  """Create duet pairs from the four parts."""
  duet_pairs = [
    (parts["soprano"], parts["alto"]),
    (parts["soprano"], parts["tenor"]),
    (parts["soprano"], parts["bass"]),
    (parts["alto"], parts["tenor"]),
    (parts["alto"], parts["bass"]),
    (parts["tenor"], parts["bass"]),
  ]
  return duet_pairs

In [8]:
def tokenize_part(part):
  """Convert a part into a sequence of tokenized notes."""
  tokens = []
  for n in part:
    if n == "hold":
      tokens.append(128)
    else:
      tokens.append(n)
  return tokens

In [9]:
duet_data = []
for filename in os.listdir(dataset_path):
  if filename.endswith(".json"):  # Check if the file is a JSON file
    filepath = os.path.join(dataset_path, filename)
    with open(filepath, "r", encoding="utf-8") as file:
      chorale_data = json.load(file)
      duet_pairs = create_duet_pairs(chorale_data)
      for human_part, machine_part in duet_pairs:
        human_tokens = tokenize_part(human_part)
        machine_tokens = tokenize_part(machine_part)
        duet_data.append((human_tokens, machine_tokens))

In [10]:
def prepare_training_data(duet_data, window_size=16):
  """Prepare training data for the reward model."""
  inputs_human = []
  inputs_machine = []
  targets = []
  for human_tokens, machine_tokens in duet_data:
    # Ensure both parts have the same length
    min_length = min(len(human_tokens), len(machine_tokens))
    human_tokens = human_tokens[:min_length]
    machine_tokens = machine_tokens[:min_length]
    # Create input-target pairs
    for i in range(window_size, min_length):
      # input_seq = [human_tokens[i-window_size:i], machine_tokens[i-window_size:i]]
      inputs_human.append(human_tokens[i-window_size:i])
      inputs_machine.append(machine_tokens[i-window_size:i])
      targets.append(machine_tokens[i])

  return inputs_human, inputs_machine, targets

In [11]:
from sklearn.model_selection import train_test_split

# Prepare dataset
inputs_human, inputs_machine, targets = prepare_training_data(duet_data)

# Split into train and test sets (e.g., 80% train, 20% test)
# train_inputs, test_inputs, train_targets, test_targets = train_test_split(
#     inputs, targets, test_size=0.2, random_state=42
# )
(
    train_inputs_human, test_inputs_human,
    train_inputs_machine, test_inputs_machine,
    train_targets, test_targets
) = train_test_split(inputs_human, inputs_machine, targets, test_size=0.2, random_state=42)


In [18]:
# Hyperparameters
input_size = 129  # Number of unique tokens (MIDI pitches + rest)
hidden_size = 256
output_size = 129  # Same as input_size
num_layers = 2
learning_rate = 0.0005
num_epochs = 10
batch_size = 64

In [13]:
# Initialize model, loss, and optimizer
model = PreContextRewardModel(input_size, hidden_size, output_size, num_layers).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [14]:
train_inputs_human = torch.tensor(train_inputs_human, dtype=torch.long).to(device)
train_inputs_machine = torch.tensor(train_inputs_machine, dtype=torch.long).to(device)
train_targets = torch.tensor(train_targets, dtype=torch.long).to(device)

In [19]:
# Training loop
for epoch in range(num_epochs):
  model.train()

  for i in range(0, len(train_inputs_human), batch_size):
    # Get batch
    batch_inputs_human = train_inputs_human[i:i+batch_size]
    batch_inputs_machine = train_inputs_machine[i:i+batch_size]
    batch_targets = train_targets[i:i+batch_size]

    # Dynamically initialize hidden state
    hidden = model.init_hidden(len(batch_inputs_human))

    # Forward pass
    hidden = hidden.detach()  # Detach hidden state to avoid backprop through time
    train_outputs, hidden = model(batch_inputs_human, batch_inputs_machine, hidden)

    # Compute loss
    loss = criterion(train_outputs, batch_targets)
    # Backward pass and optimize
    optimizer.zero_grad()
    loss.backward()

    # Apply gradient clipping (since GRU gradients can explode)
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

    optimizer.step()

  print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")


Epoch [1/10], Loss: 0.6050
Epoch [2/10], Loss: 0.5764
Epoch [3/10], Loss: 0.4232
Epoch [4/10], Loss: 0.4290
Epoch [5/10], Loss: 0.5742
Epoch [6/10], Loss: 0.4625
Epoch [7/10], Loss: 0.3621
Epoch [8/10], Loss: 0.5798
Epoch [9/10], Loss: 0.3163
Epoch [10/10], Loss: 0.2485


In [20]:
model_path = "/content/drive/MyDrive/ECE570/models/pre_context_reward_model.pth"
torch.save(model.state_dict(), model_path)

In [21]:
test_inputs_human = torch.tensor(test_inputs_human, dtype=torch.long).to(device)
test_inputs_machine = torch.tensor(test_inputs_machine, dtype=torch.long).to(device)
test_targets = torch.tensor(test_targets, dtype=torch.long).to(device)

In [22]:
def evaluate_model(model, test_inputs_human, test_inputs_machine, test_targets):
    model.eval()  # Set to evaluation mode
    with torch.no_grad():  # Disable gradient computation
        hidden = model.init_hidden(len(test_inputs_human))
        test_outputs, _ = model(test_inputs_human, test_inputs_machine, hidden)

    # Compute loss
    loss = criterion(test_outputs, test_targets)
    print(f"Test Loss: {loss.item()}")

    # Convert outputs to probabilities
    probs = torch.softmax(test_outputs, dim=1)
    predicted = torch.argmax(probs, dim=1)

    # Compute accuracy
    accuracy = (predicted == test_targets).float().mean().item()
    print(f"Test Accuracy: {accuracy * 100:.2f}%")

# Run evaluation
evaluate_model(model, test_inputs_human, test_inputs_machine, test_targets)


Test Loss: 0.6871500015258789
Test Accuracy: 79.07%
