# Music Generation using RNN

#### Training an RNN for music generation using PyTorch. Songs written in ABC notation are used as the training data: because ABC is a plain-text format, it is well suited to training a character-level neural network.

### 1.1 Dependencies

In [1]:
# COMET ML to track model experiments
import comet_ml

# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim

# Remaining imports
import numpy as np
import os
import time
import functools
from IPython.display import display
from tqdm import tqdm
from utils import *
from dotenv import load_dotenv

### 1.2 COMET API KEY

In [None]:
# Load settings from the local .env file, then read the Comet API key from the environment
load_dotenv()
comet_api_key = os.getenv('COMET_API_KEY')

# Validation: stop right away if a prerequisite is missing
assert comet_api_key is not None, "Please set COMET_API_KEY in .env file"
# TODO: Need to setup CUDA
# NOTE(review): this makes CUDA mandatory, although the device selection in the
# model-instantiation cell falls back to "cpu" -- confirm whether CPU-only runs
# should be allowed.
assert torch.cuda.is_available(), "CUDA not available"

### 1.3 Dataset

In [None]:
# Fetch the collection of songs (one string per song)
s = load_data()

# Show the first song as text ...
example = s[0]
print("Example of a sample:", example, sep="\n")

# ... and play it back
play_song(example)

In [None]:
# Concatenate every song into one long string, with a blank line between songs
all_lyrics = "\n\n".join(s)

# Sorted list of the distinct characters that occur in the corpus
vocab = sorted(set(all_lyrics))

print(f"Unique characters in the dataset: {len(vocab)}")


#### 1.3.1 Vectorize text

In [5]:
# Lookup table: character -> integer index (position in the sorted vocab)
char_to_index = dict(zip(vocab, range(len(vocab))))

# Reverse lookup: integer index -> character, as a dict and as a NumPy array
index_to_char = dict(enumerate(vocab))
idx2char = np.array(vocab)

#print(f"index_to_char: {len(index_to_char)}")
#print(f"idx2char: {len(idx2char)}")

In [None]:
# Peek at the first ten entries of the character -> index mapping
print('{')
for char in list(char_to_index)[:10]:
    print(f'  {char!r}: {char_to_index[char]}')
print('  ...\n}')

In [7]:
### Vectorize the songs string ###

def vectorize_string(string):
    """Convert a string into a NumPy array of vocabulary indices.

    Each character is looked up in the module-level `char_to_index` mapping.

    Args:
        string: text made only of characters present in `char_to_index`.

    Returns:
        A 1-D np.ndarray of dtype int64 with one element per input character.

    Raises:
        KeyError: if `string` contains a character that is not in the mapping.
    """
    # An explicit integer dtype keeps the result integral even for an empty
    # string (np.array([]) would otherwise default to float64).
    return np.array([char_to_index[c] for c in string], dtype=np.int64)


vectorized_songs = vectorize_string(all_lyrics)
assert isinstance(vectorized_songs, np.ndarray), "returned result should be a np array"

In [None]:
# Show the first ten characters of the corpus next to their integer encoding
print(f'{all_lyrics[:10]!r} ---- characters mapped to int ----> {vectorized_songs[:10]}')

In [None]:
# Shape of the encoded corpus (the notebook displays the value of a cell's last expression)
vectorized_songs.shape

#### 1.3.2 Function to create batches of data based on sequence length

In [None]:
### Batch definition to create training examples ###

def get_batch(vectorized_songs, seq_length, batch_size):
  """Sample a random batch of (input, target) sequences for training.

  Every target sequence is its input sequence shifted one position to the
  right, so the target at step t is the element that follows input step t.

  Args:
    vectorized_songs: 1-D array of character indices.
    seq_length: number of elements in every input/target sequence.
    batch_size: number of sequences to sample (start positions are drawn
      with replacement).

  Returns:
    (x_batch, y_batch): two torch.long tensors of shape
    (batch_size, seq_length).

  Raises:
    ValueError: if `vectorized_songs` holds fewer than `seq_length + 1`
      elements, i.e. not even one input/target pair fits.
  """
  songs = np.asarray(vectorized_songs)

  # A training window needs seq_length inputs plus one extra element for the
  # shifted target, so valid start positions are 0 .. len - seq_length - 1.
  # (The previous bound was one smaller and never sampled the final window.)
  num_starts = songs.shape[0] - seq_length
  if num_starts < 1:
    raise ValueError(
      f"need at least {seq_length + 1} elements to build a sequence of "
      f"length {seq_length}, got {songs.shape[0]}"
    )

  # randomly choose the starting indices for the examples in the training batch
  starts = np.random.choice(num_starts, batch_size)

  # Gather every window of seq_length + 1 elements in one indexing operation:
  # row r holds songs[starts[r] : starts[r] + seq_length + 1].
  windows = songs[starts[:, None] + np.arange(seq_length + 1)]

  # Inputs drop the last element of each window, targets drop the first
  x_batch = torch.tensor(windows[:, :-1], dtype=torch.long)
  y_batch = torch.tensor(windows[:, 1:], dtype=torch.long)

  return x_batch, y_batch

# Smoke test: ask for two sequences of ten characters each and check the shapes
args = (vectorized_songs, 10, 2)
x_batch, y_batch = get_batch(*args)
print("Input: ", x_batch)
print("Target: ", y_batch)

assert tuple(x_batch.shape) == (2, 10), "Incorrect batch shape"
assert tuple(y_batch.shape) == (2, 10), "Incorrect batch shape"
print("Passed!")

In [None]:
# Walk through one short example: for each step show the input character and
# the character the model is expected to predict next
x_batch, y_batch = get_batch(vectorized_songs, seq_length=5, batch_size=1)

inputs, targets = x_batch[0], y_batch[0]
for i, (input_idx, target_idx) in enumerate(zip(inputs, targets)):
    input_char = idx2char[input_idx.item()]
    target_char = idx2char[target_idx.item()]
    print(f"Step {i}")
    print(f"   input: {input_idx} ({input_char!r})")
    print(f"   expected output: {target_idx} ({target_char!r})")

### 1.4 RNN model

#### 1.4.1 Define the model

In [12]:
#### Defining the RNN ####

class LSTMModel(nn.Module):
  """Sequence model: embedding -> single-layer LSTM -> linear projection.

  Takes integer index tensors of shape (batch, seq_len) and produces logits
  of shape (batch, seq_len, vocab_size).
  """

  def __init__(self, vocab_size, embedding_dim, hidden_size):
    super().__init__()
    self.hidden_size = hidden_size

    # Layer 1: turns each index into a dense vector of size embedding_dim
    self.embedding = nn.Embedding(vocab_size, embedding_dim)

    # Layer 2: LSTM with hidden_size units; batch_first puts batch on axis 0
    self.lstm = nn.LSTM(embedding_dim, hidden_size, batch_first=True)

    # Layer 3: projects every LSTM output onto one logit per vocabulary entry
    self.linear = nn.Linear(hidden_size, vocab_size)

  def init_hidden(self, batch_size, device):
    """Return an all-zero (hidden, cell) state pair located on `device`."""
    state_shape = (1, batch_size, self.hidden_size)
    hidden = torch.zeros(state_shape, device=device)
    cell = torch.zeros(state_shape, device=device)
    return (hidden, cell)

  def forward(self, x, state=None, return_state=False):
    """Compute logits for `x`.

    Args:
      x: index tensor of shape (batch, seq_len).
      state: optional (hidden, cell) tuple; zeros are used when omitted.
      return_state: when true, return `(logits, state)` instead of `logits`.
    """
    embedded = self.embedding(x)

    # Start from a zero state unless the caller supplies one
    if state is None:
      state = self.init_hidden(embedded.size(0), embedded.device)

    lstm_out, state = self.lstm(embedded, state)
    logits = self.linear(lstm_out)

    if return_state:
      return (logits, state)
    return logits

#### 1.4.2 Instantiate model

In [None]:
## Instantiate the model with some hyperparameters

vocab_size = len(vocab)
embedding_dim, hidden_size = 256, 1024
batch_size = 8

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LSTMModel(vocab_size, embedding_dim, hidden_size)
model = model.to(device)

# Model summary
print(model)

#### 1.4.3 Test the model

In [None]:
## Test model with sample data

# Draw a sample batch and place both tensors on the model's device
x, y = get_batch(vectorized_songs, seq_length=100, batch_size=16)
x, y = x.to(device), y.to(device)

# One forward pass, then report the shapes and dtype involved
pred = model(x)
print(f"Input shape: {x.shape} # (batch_size, seq_length)")
print(f"Prediction shape: {pred.shape} # (batch_size, seq_length, vocab_size)")
print(f'Pred data type: {pred.dtype}')

In [None]:
# For the first sequence in the batch, turn the logits of every time step into
# probabilities and draw one character index per step
sampled_indices = torch.multinomial(pred[0].softmax(dim=-1), num_samples=1)
sampled_indices = sampled_indices.squeeze(-1).cpu().numpy()
sampled_indices

In [None]:
# Decode the first input sequence and the sampled indices back into text.
# The model has not been trained yet at this point in the notebook.
print(f"Input \n {repr(''.join(idx2char[x[0].cpu()]))}")
print()
print(f"Next char predictions \n {repr(''.join(idx2char[sampled_indices]))}")

#### 1.4.4 Training the model

In [31]:
## Loss function

cross_entropy_loss = nn.CrossEntropyLoss()

def compute_loss(label, logits):
  """Cross-entropy loss between target indices and predicted logits.

  Inputs:
    label: target tensor (batch_size, seq_len)
    logits: model's prediction (batch_size, seq_len, vocab_size)

  Outputs:
    loss: scalar tensor (CrossEntropyLoss is used with its default settings)
  """
  # Flatten batch and time into one axis. `reshape` is used instead of `view`
  # so that non-contiguous tensors (e.g. the result of a transpose) are
  # accepted as well; `view` raises a RuntimeError for those.
  batched_labels = label.reshape(-1)                     # (batch_size * seq_len)
  batched_logits = logits.reshape(-1, logits.size(-1))   # (batch_size * seq_len, vocab_size)

  return cross_entropy_loss(batched_logits, batched_labels)


In [None]:
## Compute loss on the prediction from the untrained model

# Uses the targets `y` and predictions `pred` produced by the test cell above
example_batch_loss = compute_loss(y, pred)

print(f'Prediction shape: {pred.shape} # (batch size, seq_len, vocab_size)')
print(f'Scalar loss: {example_batch_loss.mean().item()}')

#### 1.4.5 Define Hyperparameters

In [24]:
## Hyperparameter setting and optimization

vocab_size = len(vocab)

# Model params
params = {
  'num_training_iters': 3000,  # Increase to train longer
  'batch_size': 8,             # Experiment between 1 and 64
  'seq_length': 100,           # Experiment between 50 and 100
  'learning_rate': 5e-3,       # Experiment between 1e-5 and 1e-1
  'embedding_dim': 256,
  'hidden_size': 1024,         # Experiment between 1 and 2048
}

# Checkpoint location: periodic checkpoints and the final weights share one directory
checkpoint_dir = 'training_checkpoint'
os.makedirs(checkpoint_dir, exist_ok=True)
chk_point_prefix, final_model = (
  os.path.join(checkpoint_dir, file_name)
  for file_name in ("chk_point", "final_model")
)

### 1.5 Experiment Tracking with Comet

In [20]:
## Create function to track experiments

def create_experiment():
  """Start a fresh Comet experiment and log the hyperparameters to it.

  If the notebook's global namespace already holds an `experiment` from an
  earlier run, that experiment is ended first.

  Returns:
    The newly created comet_ml.Experiment, with every entry of `params`
    logged as a parameter.
  """
  # stop prev experiments
  # The earlier experiment lives in the notebook's global namespace. Checking
  # locals() here could never find it: inside the function the local
  # namespace is still empty at this point.
  # NOTE(review): assumes end() is harmless on an experiment that has already
  # been ended -- confirm against the Comet documentation.
  previous = globals().get('experiment')
  if previous is not None:
    previous.end()

  # Initiate the comet experiment for tracking
  experiment = comet_ml.Experiment(
    api_key=comet_api_key,
    project_name='py-music_'
  )
  # Log params (defined above) to the experiment
  for param, value in params.items():
    experiment.log_parameter(param, value)
  experiment.flush()

  return experiment


In [21]:
## Build a fresh, untrained model from the hyperparameters in `params`

model = LSTMModel(vocab_size, params['embedding_dim'], params['hidden_size'])

In [30]:
# Put the model on the selected device before the optimizer is built from its
# parameters; the freshly created model would otherwise be trained on the CPU.
model.to(device)

# Define the optimizer
optimizer = optim.Adam(model.parameters(), lr=params['learning_rate'])

def train_step(x, y):
  """Run one optimisation step on a single batch and return its loss.

  Args:
    x: input index tensor (batch_size, seq_len)
    y: target index tensor (batch_size, seq_len)

  Returns:
    The loss tensor computed by `compute_loss` for this batch.
  """
  # set model to train mode
  model.train()

  # Batches may arrive on the CPU: move them to wherever the model lives so
  # the forward pass does not fail with a device mismatch.
  model_device = next(model.parameters()).device
  x = x.to(model_device)
  y = y.to(model_device)

  # Zero gradients for every step
  optimizer.zero_grad()

  # Forward pass
  y_hat = model(x)

  # Compute loss
  loss = compute_loss(y, y_hat)

  # Backward pass
  loss.backward()
  optimizer.step()

  return loss

### 2. Training the model

In [None]:
history = []

plotter = Plotter(sec=2, xlabel='Iterations', ylabel='Loss')
experiment = create_experiment()

# Clear the progress bars tqdm still tracks from an earlier run of this cell
# (the call parentheses were missing, so nothing was cleared before)
if hasattr(tqdm, '_instances'):
  tqdm._instances.clear()

try:
  for step in tqdm(range(params["num_training_iters"])):

    # Grab a batch and propagate it through the network.
    # get_batch already returns torch.long tensors, so no conversion is needed.
    x_batch, y_batch = get_batch(vectorized_songs, params['seq_length'], params['batch_size'])

    # Take a train step
    loss = train_step(x_batch, y_batch)

    # Read the scalar once and reuse it below
    loss_value = loss.item()

    # Log loss to the Comet interface
    experiment.log_metric("loss", loss_value, step=step)

    # Update the progress bar and visualize in notebook
    history.append(loss_value)
    plotter.plot(history)

    # Save the model (the same checkpoint file is overwritten every 100 steps)
    if step % 100 == 0:
      torch.save(model.state_dict(), chk_point_prefix)

  # Save the final trained model
  torch.save(model.state_dict(), final_model)
finally:
  # Close the experiment even when training is interrupted or fails
  experiment.flush()
  experiment.end()