In [1]:
# imports

from collections import defaultdict
import gymnasium as gym
import numpy as np
from tqdm import tqdm
import pprint
import random

In [2]:
class Mountaincar:
    """Tabular epsilon-greedy Q-learning agent for a MountainCar-style environment.

    The environment's observations are continuous, so they are mapped onto a
    regular grid (``n_bins`` cells per observation dimension) before they are
    used as Q-table keys. Without that step almost every observation would be
    a distinct key and no learned value would ever be reused.

    Args:
        env: Environment to act in. The agent reads ``env.action_space.n``,
            calls ``env.action_space.sample()`` and, for discretisation,
            reads ``env.observation_space.low`` / ``.high``.
        initial_epsilon: Starting probability of taking a random action.
        final_epsilon: Lower bound that ``decay_epsilon`` never goes below.
        epsilon_decay: Amount subtracted from epsilon by each
            ``decay_epsilon`` call.
        learning_rate: Step size applied to the temporal-difference error.
        discount_factor: Weight of the best next-state value in the TD target.
        n_bins: Grid cells per observation dimension. ``None`` disables
            discretisation and keys the table on the raw observation values.
            Discretisation is also skipped when the environment exposes no
            finite observation bounds.

    Raises:
        ValueError: If ``n_bins`` is given and smaller than 1.
    """

    def __init__(
            self,
            env: "gym.Env",
            initial_epsilon: float,
            final_epsilon: float,
            epsilon_decay: float,
            learning_rate: float,
            discount_factor: float,
            n_bins: "int | None" = 20,
    ):
        if n_bins is not None and n_bins < 1:
            raise ValueError("n_bins must be at least 1 (or None to disable binning)")

        self.env = env
        self.epsilon = initial_epsilon
        self.final_epsilon = final_epsilon
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.n_bins = n_bins

        # Interior bin edges per observation dimension, or None for raw keys.
        self._bin_edges = self._build_bin_edges(n_bins)

        # Q-table: state key -> array with one value per action, created lazily.
        n_actions = env.action_space.n
        self.q_value = defaultdict(lambda: np.zeros(n_actions))

        if self._debug_enabled():
            print("Created q-value")
            # Show a default row without inserting a junk key into the table.
            print(np.max(self.q_value.default_factory()))

        # One temporal-difference error per update_q_value call.
        self.training_error = []

    @staticmethod
    def _debug_enabled() -> bool:
        """Return the module-level ``DEBUG`` flag, or False when it is not defined."""
        return bool(globals().get("DEBUG", False))

    def _build_bin_edges(self, n_bins):
        """Return per-dimension interior bin edges, or None if binning is unavailable."""
        if n_bins is None:
            return None

        space = getattr(self.env, "observation_space", None)
        low = getattr(space, "low", None)
        high = getattr(space, "high", None)
        if low is None or high is None:
            return None

        low = np.asarray(low, dtype=float).reshape(-1)
        high = np.asarray(high, dtype=float).reshape(-1)
        if not (np.all(np.isfinite(low)) and np.all(np.isfinite(high))):
            return None

        # Only interior edges are kept, so np.digitize yields 0 .. n_bins - 1
        # and out-of-range values fall into the first or last cell.
        return [
            np.linspace(lo, hi, n_bins + 1)[1:-1]
            for lo, hi in zip(low, high)
        ]

    def _state_key(self, obs) -> tuple:
        """Convert an observation (tuple, list or array) into a hashable Q-table key."""
        flat = np.asarray(obs, dtype=float).reshape(-1)
        if self._bin_edges is None:
            return tuple(flat.tolist())
        return tuple(
            int(np.digitize(value, edges))
            for value, edges in zip(flat, self._bin_edges)
        )

    def get_action(
            self,
            obs: tuple[float, float]
    ) -> int:
        """Pick an action epsilon-greedily.

        With probability ``self.epsilon`` a random action is sampled from the
        action space; otherwise the index of the highest Q-value for the
        current state is returned.
        """
        state = self._state_key(obs)

        # random.random() is uniform on [0, 1), so the comparison is true with
        # probability epsilon.
        if random.random() < self.epsilon:
            if self._debug_enabled():
                print("random sampling")
            return int(self.env.action_space.sample())

        if self._debug_enabled():
            print("sampling from Q-function")
        # argmax gives the action index; max would give the value itself.
        return int(np.argmax(self.q_value[state]))

    def get_action_final(
            self,
            obs: tuple[float, float]
    ) -> int:
        """Return the greedy action for ``obs`` with no exploration."""
        return int(np.argmax(self.q_value[self._state_key(obs)]))

    def update_q_value(
        self,
        obs: tuple[float, float],
        reward: float,
        action: int,
        terminated: bool,
        next_obs: tuple[float, float],
    ):
        """Apply one Q-learning update for the transition and record its TD error.

        Args:
            obs: Observation the action was taken in.
            reward: Reward received for the transition.
            action: Index of the action that was taken.
            terminated: True if the episode terminated; the next-state value
                is then excluded from the target.
            next_obs: Observation reached after the action.
        """
        state = self._state_key(obs)
        next_state = self._state_key(next_obs)

        # No bootstrapping from a terminal state.
        future_q_value = (not terminated) * np.max(self.q_value[next_state])

        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_value[state][action]
        )

        self.q_value[state][action] = (
            self.q_value[state][action] + self.learning_rate * temporal_difference
        )

        # The TD error is the gap between the bootstrapped target and the
        # current estimate, which is why it is logged as the training error.
        self.training_error.append(temporal_difference)

    def decay_epsilon(
        self
    ):
        """Lower epsilon by ``epsilon_decay``, never going below ``final_epsilon``."""
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)


In [3]:
# Set to True for verbose tracing in the agent and the training loop.
DEBUG = False

# Seed for the exploration randomness so repeated runs pick the same random
# actions. Environment resets are not seeded here.
SEED = 42

n_episodes = 100
learning_rate = 0.1
start_epsilon = 1.0
final_epsilon = 0.1
# Linear schedule: epsilon drops by this much per episode, which would reach
# zero after n_episodes / 2 episodes; decay_epsilon clamps it at final_epsilon.
epsilon_decay = start_epsilon / (n_episodes / 2)
discount_factor = 0.95

# "human" opens a window and renders every step, which slows training down;
# use None to train headless.
RENDER_MODE = "human"

# The agent's epsilon test uses the stdlib `random` module and its exploratory
# actions come from env.action_space.sample(), so both generators are seeded.
random.seed(SEED)

env = gym.make("MountainCar-v0", render_mode=RENDER_MODE)
env.action_space.seed(SEED)

agent = Mountaincar(env, start_epsilon, final_epsilon, epsilon_decay, learning_rate, discount_factor)

In [None]:

# Train the agent for n_episodes episodes. Per-episode statistics are shown on
# the progress bar; step-level tracing is printed only when DEBUG is on, so a
# normal run does not flood the cell output.
progress = tqdm(range(n_episodes))

try:
    for episode in progress:

        obs, info = env.reset()
        done = False

        # The observation arrives as an array; the Q-table needs a hashable key.
        obs = tuple(np.reshape(obs, (2)))

        if DEBUG:
            print("================================")
            print("Starting car position: ", obs[0])
            print("Starting car velocity: ", obs[1])

        idx = 0
        episode_return = 0.0

        while not done:
            action = agent.get_action(obs)

            next_obs, reward, terminated, truncated, info = env.step(action)
            next_obs = tuple(np.reshape(next_obs, (2)))

            # Only `terminated` is passed on: a time-limit truncation must not
            # be treated as a terminal state by the update.
            agent.update_q_value(obs, reward, action, terminated, next_obs)

            done = terminated or truncated

            obs = next_obs
            idx += 1
            episode_return += float(reward)

            if DEBUG:
                print("action: ", action)
                print("reward: ", reward)
                print("Current car position: ", obs[0])
                print("Current car velocity: ", obs[1])
                print("idx: ", idx)
                print("-------------------------------------")

        if DEBUG:
            print("=======================================")

        agent.decay_epsilon()
        progress.set_postfix(steps=idx, episode_return=episode_return, epsilon=agent.epsilon)
finally:
    # Release the environment (and any render window) even if training is
    # interrupted or raises.
    env.close()

In [5]:
# env = gym.make("MountainCar-v0", render_mode="human")

# obs, info = env.reset()
# obs = tuple(np.reshape(obs, (2)))
# done = False

# while not done:
#     action = agent.get_action_final(obs)

#     next_obs, reward, terminated, truncated, info = env.step(action)

#     done = terminated or truncated
    
#     obs = next_obs

# env.close()

In [6]:
# obs, info = env.reset()

# obs = tuple(np.reshape(obs, (2)))
# print(obs[1])

# if DEBUG:
#     print("Starting state")
#     print("obs: ", obs)
#     print("=================")

# for _ in range(20):
#     # action = env.action_space.sample()
#     action = 2
#     next_obs, reward, terminated, truncated, info = env.step(action)
#     next_obs = tuple(np.reshape(next_obs,(2)))

#     if DEBUG:
#         print("action: ", action)
#         print("obs: ", obs)
#         print("reward: ", reward)
#         print("terminated: ", terminated)
#         print("truncated: ", truncated)
#         print("info: ", info)
#         print("-----------------------------")