In [1]:
import numpy as np
import pandas as pd

from collections import defaultdict
from tqdm import tqdm

import random

import gym
import adaptive_tutor

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:
import numpy as np
# Load the serialized action set used by the agents below.
# NOTE(review): `np` is already imported in the first cell; this import is redundant.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python objects,
# which can execute code embedded in the file -- only load an .npy file you trust.
action_space = np.load('adaptive_tutor/action_space.npy', allow_pickle=True)

In [12]:
class TutorStatefulAgent:
    """Tabular, epsilon-greedy Q-learning agent with a bucketed state.

    The agent state is derived from ``obs['themes_covered']``: every value in
    that sequence is mapped to one of three coarse bracket labels and the
    resulting tuple of labels is used as the key into the Q-table.
    """

    def __init__(
        self,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: The amount subtracted from epsilon by ``decay_epsilon``
            final_epsilon: The lower bound epsilon is clamped to
            discount_factor: The discount factor for computing the Q-value
        """
        # NOTE(review): allow_pickle=True unpickles arbitrary objects; the file must be trusted.
        self.action_space = list(np.load('adaptive_tutor/action_space.npy', allow_pickle=True))
        # One row of Q-values per visited state, one column per action (by position
        # in self.action_space). Rows are created lazily, initialised to zero.
        self.q_values = defaultdict(lambda: np.zeros(len(self.action_space)))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        # Scalar temporal-difference error of every update, in call order.
        self.training_error = []

    def _create_rating_bracket(self, row):
        """Map a numeric value to a coarse bracket label.

        Returns ``'lt_1300'`` below 1300, ``'1300-1800'`` for 1300 <= row < 1800
        and ``'gt_1800'`` otherwise (which includes exactly 1800).
        """
        if row < 1300:
            return 'lt_1300'
        elif 1300 <= row < 1800:
            # Label now matches the actual upper bound of this bracket.
            return '1300-1800'
        else:
            return 'gt_1800'

    def _get_agent_state(self, obs: dict) -> tuple:
        """Turn an observation into the hashable Q-table key.

        Args:
            obs: Mapping with a ``'themes_covered'`` entry holding an iterable
                of numeric values.

        Returns:
            Tuple with one bracket label per value in ``obs['themes_covered']``.
        """
        return tuple(self._create_rating_bracket(val) for val in obs['themes_covered'])

    def get_action(self, obs: dict):
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.

        The returned value is an element of ``self.action_space`` (not an index).
        """
        state = self._get_agent_state(obs)

        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return random.choice(self.action_space)

        # with probability (1 - epsilon) act greedily (exploit)
        return self.action_space[int(np.argmax(self.q_values[state]))]

    def update(
        self,
        obs: dict,
        action,
        reward: float,
        terminated: bool,
        next_obs: dict,
    ):
        """Updates the Q-value of an action.

        Args:
            obs: Observation the action was taken in.
            action: The action taken, as returned by ``get_action`` (an element
                of ``self.action_space``).
            reward: Reward received for the transition.
            terminated: Whether the transition ended the episode; if so no
                future value is bootstrapped.
            next_obs: Observation reached after the action.

        Raises:
            ValueError: If ``action`` is not an element of ``self.action_space``.
        """
        state = self._get_agent_state(obs)
        next_state = self._get_agent_state(next_obs)

        # Q-rows are indexed by position, while callers pass the action itself.
        action_idx = self.action_space.index(action)

        future_q_value = (not terminated) * np.max(self.q_values[next_state])
        # The TD error is a scalar: compare against the Q-value of the action
        # actually taken, not against the whole row.
        temporal_difference = (
            reward
            + self.discount_factor * future_q_value
            - self.q_values[state][action_idx]
        )

        self.q_values[state][action_idx] += self.lr * temporal_difference
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Lower epsilon by ``epsilon_decay``, never going below ``final_epsilon``."""
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

In [13]:
class TutorAgent:
    """Stateless, epsilon-greedy agent keeping a single Q-value per action.

    Observations are ignored: ``q_values`` is one flat array with one entry per
    element of ``action_space``.
    """

    def __init__(
        self,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Reinforcement Learning agent with zeroed action values
        (q_values), a learning rate and an epsilon.

        Args:
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: The amount subtracted from epsilon by ``decay_epsilon``
            final_epsilon: The lower bound epsilon is clamped to
            discount_factor: The discount factor for computing the Q-value
        """
        # NOTE(review): allow_pickle=True unpickles arbitrary objects; the file must be trusted.
        self.action_space = list(np.load('adaptive_tutor/action_space.npy', allow_pickle=True))
        # action -> position in action_space / q_values, for O(1) lookups in update().
        self.action_space_dict = {action: i for i, action in enumerate(self.action_space)}
        self.q_values = np.zeros(len(self.action_space))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        # Temporal-difference error of every update, in call order.
        self.training_error = []

    def _action_index(self, action) -> int:
        """Return the position of ``action`` in ``q_values``.

        Raises:
            ValueError: If ``action`` is not part of the action space (same
                exception type ``list.index`` would raise).
        """
        try:
            return self.action_space_dict[action]
        except KeyError:
            raise ValueError(f"{action!r} is not in the action space") from None

    def get_action(self):
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.

        The returned value is an element of ``self.action_space`` (not an index).
        """
        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return random.choice(self.action_space)

        # with probability (1 - epsilon) act greedily (exploit)
        return self.action_space[int(np.argmax(self.q_values))]

    def update(
        self,
        obs,
        action,
        reward: float,
        terminated: bool,
        next_obs,
    ):
        """Updates the Q-value of an action.

        Args:
            obs: Unused; accepted so the signature matches the stateful agent.
            action: The action taken, as returned by ``get_action`` (an element
                of ``self.action_space``).
            reward: Reward received for the transition.
            terminated: Whether the transition ended the episode; if so no
                future value is bootstrapped.
            next_obs: Unused; accepted so the signature matches the stateful agent.

        Raises:
            ValueError: If ``action`` is not part of the action space.
        """
        # Resolve the action's slot once instead of rescanning the list per access.
        action_idx = self._action_index(action)

        future_q_value = (not terminated) * np.max(self.q_values)
        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_values[action_idx]
        )

        self.q_values[action_idx] += self.lr * temporal_difference
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Lower epsilon by ``epsilon_decay``, never going below ``final_epsilon``."""
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

In [14]:
# Training configuration
n_episodes = 1000
learning_rate = 0.01

# Exploration schedule: epsilon starts at start_epsilon, loses epsilon_decay on
# every agent.decay_epsilon() call and is clamped at final_epsilon. The step is
# sized so that, without the clamp, epsilon would hit zero after n_episodes / 2 calls.
start_epsilon = 0.5
final_epsilon = 0.1
epsilon_decay = start_epsilon / (n_episodes / 2)

agent = TutorStatefulAgent(
    final_epsilon=final_epsilon,
    epsilon_decay=epsilon_decay,
    initial_epsilon=start_epsilon,
    learning_rate=learning_rate,
)

In [15]:
# Load custom environment we created
env = gym.make('adaptive_tutor/PuzzleTutorEnv-v0', render_mode=None)

for episode in tqdm(range(n_episodes)):
    # Every episode starts from a fresh reset, so no separate reset is needed
    # before the loop.
    obs, info = env.reset()
    done = False
    step_counter = 0

    # Play one episode. Stop on truncation as well as termination: looping on
    # `terminated` alone never ends an episode the environment truncates.
    while not done:
        step_counter += 1
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)

        # Update the agent. Only `terminated` is passed so that a truncated
        # episode still bootstraps from the value of the next state.
        agent.update(obs, action, reward, terminated, next_obs)

        # Update the current observation, and whether the environment is done
        done = terminated or truncated
        obs = next_obs

    print("Number of steps: ", step_counter)

    agent.decay_epsilon()

  logger.warn(f"{pre} is not within the observation space.")
  0%|          | 0/1000 [00:00<?, ?it/s]


TypeError: cannot unpack non-iterable int object