<h1> Chord Accompaniment using RL - Agent Training </h1>

\
**POC:** Nina Rauscher (nr2861@columbia.edu)

\
In this notebook, we will **train an RL agent** to **create a chord accompaniment for a given melody**. 

Previously, we built a chord transition matrix based on a subset of Beethoven sonatas (saved in the *transition_matrix.csv* file) to serve as empirical reward for our RL problem.

\
**Our main steps are:**
1. Create the environment using Open AI Gym
2. Define a PPO class corresponding to the selected algorithm
3. Prepare for training by building the appropriate wrappers to visualize the rewards on Weights and Biases
4. Train the PPO model
5. Evaluate the performance of our model vs a baseline model (random actions)

<h2> Necessary imports </h2>

In [None]:
# Operational libraries
import numpy as np
import pandas as pd

In [None]:
# Open AI Gym - useful to create the environment and step function
import gym

In [None]:
# PyTorch to design the PPO algorithm
import torch
import torch.nn.functional as F
from torch.distributions import Categorical
from torch.optim import Adam

In [None]:
# Model performance visualization libraries
!pip install wandb -qqq
import os
import wandb
os.environ["WANDB__SERVICE_WAIT"] = "300" # Wait 300s before timeout

# Login to weights and biases and initialization of the session (run)
wandb.login()
run=wandb.init()

<h2> Step 1: Environment creation with Open AI Gym </h2>

We consider a setting of discrete states and actions:
* Each **state** is represented by a **tuple (Chord, Melody note)**
* The agent's **action** is to **select the next chord**. 

As we restricted the potential chords to major, minor, dominant 7th, or minor 7th, there are **576 states and 48 discrete actions**, which remains reasonable in terms of calculations.

<img src="Chord%20accompaniment%20loop%20schema.png" alt="Chord accompaniment loop schema" style="width: 60%;">

When it comes to the most important part of the environment, which is the reward structure, we decided to consider an **empirical reward** that reflects how good the action is based on **frequencies of chord transitions** we’ve collected from musical datasets, and a **legal reward** that captures the consistency of the action with respect to **music theory rules**.

$$
r^{\text{emp}}_t 
=
\left\{
\begin{align}
  &\text{If $\hat{\mathbb{P}}$($C_t$, $M_t$) > 0,} \quad \text{MaxiReward}*\text{$\hat{\mathbb{P}}$($C_t$, $M_t$)} \nonumber \\
  & \text{Otherwise,} \quad \text{MiniReward} \nonumber
\end{align}
\right.
$$

$$
\begin{align}
    &\text{and}
\end{align}
$$

$$
r^{\text{leg}}_t 
=
\left\{
\begin{align}
  &\text{If rules are respected,} \quad \text{BonusReward} \nonumber \\
  &\text{Otherwise,} \quad \text{NegReward} \nonumber
\end{align}
\right.
$$

To compute the empirical reward for a given state and action, we need to retrieve the data from our analysis of famous songs datasets (e.g., a subset of Beethoven sonatas). Let's start by doing that!

In [None]:
frequencies_matrix = pd.read_csv('transition_matrix.csv', delimiter = ',', index_col = 0)

In [None]:
# You can check the format of the matrix by taking a look at its first 10 rows
frequencies_matrix.head(10)

Now that we have all the data we need, we can create the environment by defining the `MusicEnv` class:

In [None]:
class MusicEnv(gym.Env):
    def __init__(self, initial_state, melody_notes, frequencies_matrix, MaxiReward, MiniReward, BonusReward, NegReward):
      super(MusicEnv, self).__init__()
      self.obs_space = gym.spaces.MultiDiscrete([48, 12]) # 48 chords x 12 melody notes
      self.act_space = gym.spaces.Discrete(48) # we choose to respect this order M - m - 7M - 7m
      self.start_pos = initial_state
      self.current_state = initial_state


      self.melody = melody_notes # Only the downbeats of a given melody
      self.current_melody_index = 0
      self.horizon = len(melody_notes)-1 # The number of actions to take is equal to the length of the input melody - 1


      self.frequencies = frequencies_matrix # Matrix from the empirical chord transition frequencies
      self.MaximuxReward = MaxiReward # Hyperparameter : Reward if the transition was already seen on data
      self.MinimumReward = MiniReward # Hyperparameter : Reward to be given if the transition was not seen on data
      self.BonusReward = BonusReward # Hyperparameter: Bonus for respecting the rules
      self.NegReward = NegReward # Hyperparameter: Negative reward for not respecting the rules


      self.CHORDS_to_INT = {'C':0, 'C#':1, 'D':2, 'D#':3, 'E':4,'F':5,'F#':6,'G':7, 'G#':8, 'A':9, 'A#':10, 'B':11}
      self.INT_to_CHORDS = {0:'C', 1:'C#', 2:'D', 3:'D#', 4:'E', 5:'F', 6:'F#', 7:'G', 8:'G#', 9:'A', 10:'A#', 11:'B'}
      self.TYPES_to_INT = {'M':0, 'm':1, 'M7':2, 'm7':3}
      self.INT_to_TYPES = {0:'M', 1:'m', 2:'M7', 3:'m7'}


    def reset(self):
      self.current_chord = self.start_pos
      self.current_melody_index = 0
      return self.current_chord

    def _integer_to_chord(self, integer):
      """
      Returns chord, type given an integer between 0 and 47
      """
      q, r = integer//12, integer%12
      return self.INT_to_CHORDS[r], self.INT_to_TYPES[q]

    def _chord_to_integer(self, chord, chord_type):
      """
      Returns an integer between 0 and 47 given a chord and chord_type
      """
      return self.TYPES_to_INT[chord_type]*12 + self.CHORDS_to_INT[chord]

    def _melody_note_to_integer(self, melody_note):
      """
      Returns an integer between 0 and 11 given a melody note
      """
      return self._chord_to_integer(melody_note, 'M')

    def _possible_legal_transitions(self, chord, chord_type='M'):
      """
      chord_type could take values in {'M', 'm', 'M7', 'm7'}
      Returns legal melody notes to be played with the chord `chord`
      """
      x = self.CHORDS_to_INT[chord]
      if chord_type=='M': # (X, X+4, (X+5), X+7, X+9) are legal
          legal_melodies = [c for c, idx in self.CHORDS_to_INT.items() if idx in {x, (x+4)%12, (x+5)%12, (x+7)%12, (x+9)%12}]
      if chord_type=='m': # (X, X+3, (X+5), X+7, X+8) are legal
          legal_melodies = [c for c, idx in self.CHORDS_to_INT.items() if idx in {x, (x+3)%12, (x+5)%12, (x+7)%12, (x+8)%12}]
      if chord_type=='M7': # (X, X+4, X+7, X+10) are legal
          legal_melodies = [c for c, idx in self.CHORDS_to_INT.items() if idx in {x, (x+4)%12, (x+7)%12, (x+10)%12}]
      if chord_type=='m7': # (X, X+3, X+7, X+10) are legal
          legal_melodies = [c for c, idx in self.CHORDS_to_INT.items() if idx in {x, (x+3)%12, (x+7)%12, (x+10)%12}]
      return legal_melodies

    def _is_legal_transition(self, chord, chord_type, melody_note):
      legal_transitions = self._possible_legal_transitions(chord, chord_type)
      return (melody_note in legal_transitions)

    def step(self, action):
      "action is an integer between 0-47"

      if self.current_melody_index >= self.horizon:
          print(f"Action taken after episode end: Action: {action}, State: {self.current_state}")
          # Episode has ended, no further actions should be taken
          return np.array([0, 0]), 0, True, {}  # Example default values

      next_chord, next_chord_type = self._integer_to_chord(action)
      curr_chord, curr_chord_type = self._integer_to_chord(self.current_state[0])
      empirical_freq = self.frequencies.loc[curr_chord+curr_chord_type, next_chord+next_chord_type]
      # print("Empirical_freq:", empirical_freq)

      if (empirical_freq > 0) :
          empirical_reward = self.MaximuxReward * empirical_freq
      else:
          empirical_reward = self.MinimumReward

      # Check if the next index is within the range of the melody list
      legal_reward = 0
      if self.current_melody_index + 1 < self.horizon:
            next_melody_note = self.melody[self.current_melody_index + 1]

            is_legal_transition = self._is_legal_transition(next_chord, next_chord_type, next_melody_note)
            legal_reward = self.BonusReward * is_legal_transition + self.NegReward * (1 - is_legal_transition)
            next_melody_int = self._melody_note_to_integer(next_melody_note)

      # Give extra reward if we are in the last action and it is equal to the first chord
      if (self.current_melody_index + 1 == self.horizon):
        if (action == self.start_pos[0]):
            legal_reward += self.BonusReward*3
        next_melody_int = self._melody_note_to_integer(self.melody[-1])

      # Debug information for early steps of an episode
      # if self.current_melody_index < 5:
      #       print(f"Early step debugging: Action: {action}, Current State: {self.current_state}")

      # Update the current state
      self.current_state = np.array([action, next_melody_int])
      reward = empirical_reward + legal_reward

      self.current_melody_index += 1
      done = (self.current_melody_index +1 >= self.horizon)


      # Debug: Print the current state, action, and next state

      # print(f"Current state: {self.current_state}, Action: {action}, Reward: {reward}, Done: {done}")

      return self.current_state, reward, done, {}

      #except Exception as e:
      #    print(f"Error in step method: {e}")
          # Return a default tuple on error
      #    return np.array([0, 0]), 0, True, {}


<h2> Step 2: Define a PPO class corresponding to the selected algorithm </h2>

Now that we have created the environment, we need to implement an algorithm whose objective will be to choose the best action to take at each step.

\
Our chosen algorithm, **the proximal policy optimization (PPO) with a clipped objective**, has numerous advantages. 

First, PPO is **simple to implement**, with no requirement of analytical results or second-order derivatives. Instead, it relies on updates via SGD algorithms, namely ADAM in this case. 

Besides, PPO exhibits far more **stable training** than a vanilla policy gradient method (such as
REINFORCE), since it discourages large policy updates by clipping the surrogate advantage. 

Moreover, the **complexity of the spaces can be increased** without requiring any modifications to our algorithm. This was an important point at the beginning of our work as we were likely to modify the definition of our state and action spaces.

\
As literature review has shown the **relevancy of actor-critic methods** for our problem, we incorporated one in our PPO implementation. Both the actor and the critic are structured as **deep neural networks with three hidden layers**, each containing 64 units:

The **actor** takes in the current state as input and **outputs the probability distribution over possible actions**, while the **critic** estimates the value of the current state and **predicts the total expected reward**, which helps to guide the actor’s decisions.

\
Finally, to ensure that our results are satisfying, we have implemented a **baseline model** where actions are selected randomly from a **uniform random distribution** (see the results later).

\
ℹ️ More information on the theory can be found in the README file!

In [None]:
class PPO:

  def __init__(self, env, obs_dim, act_dim, lr=0.005):

        self.env = env
        self.obs_dim = env.obs_space.shape[0]
        self.act_dim = env.act_space.n
        self.lr = lr

        # Actor Network
        self.actor = torch.nn.Sequential(
            torch.nn.Linear(self.obs_dim, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, self.act_dim) # output: probability distribution over all possible actions
        )

        # Critic Network
        self.critic = torch.nn.Sequential(
            torch.nn.Linear(self.obs_dim, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1) # output: value function estimation
        )

        self._init_hyperparameters()

        self.optimizer = torch.optim.Adam([
            {'params': self.actor.parameters()},
            {'params': self.critic.parameters()}
        ], lr=self.lr)


  def _init_hyperparameters(self):
        self.steps_per_batch = 4800
        self.max_steps_per_episode = 1600
        self.gamma = 0.95
        self.n_updates_per_iteration = 5
        self.clip = 0.2

  def get_action(self, obs):
        # If obs is already a tensor, clone and detach it. Otherwise, create a new tensor.
        if isinstance(obs, torch.Tensor):
          obs = obs.clone().detach()
        else:
          obs = torch.tensor(obs, dtype=torch.float)
        action_probs = F.softmax(self.actor(obs), dim=-1)
        dist = Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        if action is None or log_prob is None:
          print(f"Invalid action or log_prob: action={action}, log_prob={log_prob}, obs={obs}")

        return action.item(), log_prob

  def get_action_random(self, obs):
        action_probs = torch.tensor([1/self.act_dim] * self.act_dim)

        dist = Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        if action is None or log_prob is None:
          print(f"Invalid action or log_prob: action={action}, log_prob={log_prob}, obs={obs}")

        return action.item(), log_prob

  def evaluate(self, batch_obs, batch_acts):
        action_probs = F.softmax(self.actor(batch_obs), dim=-1)
        dist = Categorical(action_probs)

        action_logprobs = dist.log_prob(batch_acts)
        dist_entropy = dist.entropy()
        state_values = self.critic(batch_obs).squeeze()

        return action_logprobs, state_values, dist_entropy

  def learn(self, max_steps):
        step = 0

        while step < max_steps:
            batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens = self.rollout()

            step += np.sum(batch_lens)

            for _ in range(self.n_updates_per_iteration):
                curr_log_probs, V, _ = self.evaluate(batch_obs, batch_acts)
                V = V.squeeze()

                ratios = torch.exp(curr_log_probs - batch_log_probs) # importance sampling ratio
                A_k = batch_rtgs - V.detach() # advantage
                surr1 = ratios * A_k # surrogate advantage 1
                surr2 = torch.clamp(ratios, 1 - self.clip, 1 + self.clip) * A_k # surrogate advantage 2

                actor_loss = (-torch.min(surr1, surr2)).mean()
                critic_loss = torch.nn.MSELoss()(V, batch_rtgs)

                self.optimizer.zero_grad()
                total_loss = actor_loss + 0.5 * critic_loss
                total_loss.backward()
                self.optimizer.step()

            # Update steps
            step += np.sum(batch_lens)

  def rollout(self):
        batch_obs = []
        batch_acts = []
        batch_log_probs = []
        batch_rews  =[]
        batch_rtgs = []
        batch_lens = []

        step = 0
        while step < self.steps_per_batch:

          ep_rews = []
          obs = self.env.reset()
          done = False

          for ep_t in range(self.max_steps_per_episode):
            step += 1
            batch_obs.append(obs)

            action, log_prob = self.get_action(obs)
            obs, rew, done, _ = self.env.step(action)

            ep_rews.append(rew)
            batch_acts.append(action)
            batch_log_probs.append(log_prob)

            if done:
              break

          batch_lens.append(ep_t + 1)
          batch_rews.append(ep_rews)

        batch_obs = torch.tensor(batch_obs, dtype=torch.float)
        batch_acts = torch.tensor(batch_acts, dtype=torch.float)
        batch_log_probs = torch.tensor(batch_log_probs, dtype=torch.float)

        batch_rtgs = self.compute_rtgs(batch_rews)

        return  batch_obs, batch_acts, batch_log_probs, batch_rtgs, batch_lens

  def compute_rtgs(self, batch_rews):

        """ Computes the "return-to-go" for each timestep in each episode"""

        batch_rtgs = []

        for ep_rews in reversed(batch_rews):
          discounted_reward = 0

          for rew in reversed(ep_rews):
            discounted_reward = rew + discounted_reward * self.gamma
            batch_rtgs.insert(0, discounted_reward)

        batch_rtgs = torch.tensor(batch_rtgs,  dtype=torch.float)

        return batch_rtgs


<h2> Step 3: Prepare for training by building the appropriate wrappers to visualize the rewards on Weights and Biases </h2>

For proper accounting of rewards while the agent learns, we build a wrapper around `env.step()` and `env.reset()`. In an episode, every time you take an action the reward will be appended to the episode reward, and whenever the environment is reset (at the end of an epsiode), the episode reward is reset to 0.

In [None]:
class Memory:
    def __init__(self):
        self.actions = []
        self.states = []
        self.logprobs = []
        self.rewards = []
        self.is_terminals = []

    def clear_memory(self):
        del self.actions[:]
        del self.states[:]
        del self.logprobs[:]
        del self.rewards[:]
        del self.is_terminals[:]

In [None]:
def train_ppo(env, ppo_model, num_episodes, max_steps_per_episode, config):
    wandb.init(project="ppo-training", config=config)
    memory = Memory()
    rrecord = []  # Record of rewards for each episode
    fixedWindow = 10

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0

        for step in range(max_steps_per_episode):
            # Convert state to a NumPy array if it's not already one, then to tensor
            state_array = np.array(state) if isinstance(state, list) or isinstance(state, tuple) else state
            state_tensor = torch.from_numpy(state_array).float().unsqueeze(0)

            # Get action and log probability from PPO policy
            action, log_prob = ppo_model.get_action(state_tensor)

            # Debug: Print the output of env.step(action)
            if env.step(action) is None:
                print(f"Episode: {episode}, Step: {step}, Received None action. State tensor: {state_tensor}")
                continue  # Skip this step or handle it as needed

            try:
                next_state, reward, done, _ = env.step(action)
            except Exception as e:
                print(f"Exception in env.step(): {e}, Episode: {episode}, Step: {step}, Action: {action}, State: {state}")
                break

            episode_reward += reward

            # Save data in memory
            memory.states.append(state_tensor)
            memory.actions.append(torch.tensor([action]))
            memory.logprobs.append(log_prob)
            memory.rewards.append(reward)
            memory.is_terminals.append(done)

            if done:
                break

            # Update state
            state = next_state

        # Update PPO policy after each episode
        total_steps = 4800  # Number of steps to collect before each update, can be tuned further
        ppo_model.learn(total_steps)

        rrecord.append(episode_reward)

        # Printing functions for debugging purposes. Feel free to add more if necessary
        if episode % 10 == 0 and episode != 0:
            print(f"Episode: {episode}, Action: {action}, Reward: {reward}, Done: {done}")
            print('episode {} ave training returns {}'.format(episode, np.mean(rrecord[-10:])))

        # Log episode reward

        # Calculate average of the last 'fixedWindow' elements. If less than 'fixedWindow' episodes, average of all so far
        movingAverage = np.mean(rrecord[-fixedWindow:]) if len(rrecord) >= fixedWindow else np.mean(rrecord)

        wandb.log({"training reward": episode_reward, "training reward moving average": movingAverage})


    wandb.run.summary["number of training episodes"] = num_episodes


    print("Training completed.")

<h2> Step 4: Train the PPO model </h2>

In [None]:
# Let's create training melody notes first 
text = """
Ab Ab F F
G C E G
Bb Bb G G
Ab Ab F F
Bb Bb G G
C C C C
Ab Eb Db Db
Db Bb Ab Ab
G Eb Db Db
Ab Db C C
F F F F
G Eb Db C
C Bb Bb Ab
G Eb Db C
C Bb Bb Ab
G G G E
Eb Db Bb G
E E Ab E
Eb Db Bb G
E E Ab E
Eb Db Bb G
C Bb G G
Ab Db F Ab
Eb Eb F Ab
Eb Eb Db E
Ab Ab Ab Ab
"""

# Define replacements
replacements = {'Db': 'C#', 'Bb': 'A#', 'Eb': 'D#', 'Ab': 'G#'}

# Apply replacements and create a list
train_data = [replacements.get(chord, chord) for line in text.strip().split('\n') for chord in line.split()]

In [None]:
%%wandb

# PPO hyperparameters and training configuration
config = {
    "learning_rate": 0.008,
    "gamma": 0.95,
    "num_episodes": 100,
    "max_steps_per_episode": 100,
    # ... any other hyperparameters or settings ...
}

initial_state = np.array([5,1]) # Equivalent to (chord, chord_type) = Fm

melody_notes = train_data

# Rewards hyperparameters (potential to tune them more but these values already lead to pretty good results)
MaxiReward = 100
MiniReward = -5
BonusReward = 15
NegReward = -10

# Initialize the environment
env = MusicEnv(initial_state, melody_notes, frequencies_matrix, MaxiReward, MiniReward, BonusReward, NegReward)  # Initialize with appropriate parameters

# PPO hyperparameters
obs_dim = env.obs_space.shape[0]
act_dim = env.act_space.n
ppo_model = PPO(env, obs_dim, act_dim, lr=config['learning_rate'])

# Start training
train_ppo(env, ppo_model, config["num_episodes"], config["max_steps_per_episode"], config)

<h2> Step 5: Evaluate the performance of our model vs baseline (random actions) </h2>

In [None]:
def evaluate_ppo(ppo_model, initial_state, melody_notes, frequencies_matrix, MaxiReward, MiniReward, BonusReward, NegReward, max_steps_per_melody, num_episodes=100, baseline=True):
    rList = []  # List to store rewards of each episode
    movingAverageArray = []  # List to store moving average
    best_reward = -float('inf')  # Initialize best_reward with a very low value

    for episode in range(num_episodes):
        env = MusicEnv(initial_state, melody_notes, frequencies_matrix, MaxiReward, MiniReward, BonusReward, NegReward)
        state = env.reset()
        episode_reward = 0
        generated_chords = []  # List to store generated chords for this episode

        for step in range(max_steps_per_melody):
            state_array = np.array(state) if isinstance(state, list) else state
            state_tensor = torch.from_numpy(state_array).float().unsqueeze(0)
            if baseline == True:
              action, log_prob = ppo_model.get_action_random(state_tensor) # Baseline model
            else:
              action, log_prob = ppo_model.get_action(state_tensor)

            next_state, reward, done, _ = env.step(action)
            chord, chord_type = env._integer_to_chord(action)  # Convert action to chord
            generated_chords.append(chord + chord_type)  # Combine chord and type

            episode_reward += reward
            if done:
                break
            state = next_state

        # Check if this episode has the highest reward so far
        if episode_reward > best_reward:
            best_reward = episode_reward
            best_chords = generated_chords.copy()

        # Log episode reward per 10 episodes and compute moving average
        rList.append(episode_reward)
        movingAverage = np.mean(rList[-10:]) if len(rList) >= 10 else np.mean(rList)
        movingAverageArray.append(movingAverage)

        # Log to Wandb
        if baseline == True:
          wandb.log({"evaluation reward_baseline": episode_reward, "evaluation reward moving average_baseline": movingAverage, "evaluation episode_baseline": episode})

        else:
          wandb.log({"evaluation reward": episode_reward, "evaluation reward moving average": movingAverage, "evaluation episode": episode})

    # Calculate the final score
    score = max(movingAverageArray[-10:]) if len(movingAverageArray) >= 10 else max(movingAverageArray)
    wandb.run.summary["score"] = score

    return best_chords, best_reward

<h3> 5.1. Evaluate a baseline model taking random actions </h3>

In [None]:
%%wandb
# Baseline model
initial_state = [5, 1]  # Define your initial state: Fm
test_melody_notes = ['D', 'D', 'C', 'C', 'A', 'A', 'A', 'F', 'D', 'D', 'C', 'C', 'G', 'G', 'A', 'A']
max_steps_per_melody = len(test_melody_notes)

generated_chords, best_reward = evaluate_ppo(ppo_model, initial_state, test_melody_notes, frequencies_matrix, MaxiReward, MiniReward, BonusReward, NegReward, max_steps_per_melody, num_episodes=100, baseline=True)
print("Generated chords:", generated_chords, "with highest average reward = ", best_reward)

<h3> 5.2. Evaluate our PPO model </h3>

In [None]:
%%wandb
# Our model
initial_state = [5, 1]  # Define your initial state: Fm
test_melody_notes = ['D', 'D', 'C', 'C', 'A', 'A', 'A', 'F', 'D', 'D', 'C', 'C', 'G', 'G', 'A', 'A']
max_steps_per_melody = len(test_melody_notes)

generated_chords, best_reward = evaluate_ppo(ppo_model, initial_state, test_melody_notes, frequencies_matrix, MaxiReward, MiniReward, BonusReward, NegReward, max_steps_per_melody, num_episodes=100, baseline=False)
print("Generated chords:", generated_chords, "with highest average reward = ", best_reward)

In [None]:
run.finish()

Once the `ppo_model` is tuned, you can obtain the generated chords for any given melody notes and initial chord using the `evaluate_ppo` method.

We thus decided to apply it to 3 melodies to assess the performance as human listeners. The results are available in the README file along with the corresponding audios and more detailed explanations on our training process.

If after reading the README file you have any questions, feel free to reach out at nr2861@columbia.edu ✨