In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt

env = gym.make("MountainCar-v0")
print(env.reset())  # calling env.reset() will give us a random state, e.g. [-0.59585701  0.        ]

# Q-Learning settings
LEARNING_RATE = 0.1  # [0.1, 0.0001]
DISCOUNT = 0.95      # [0.95, 0.99]
EPISODES = 25_000     # 25_000
SHOW_EVERY = 1000    # 1000

# Quantization settings
# DISCRETE_OS_SIZE = [20, 20]  # use 20 groups/buckets for each range. (20 units) 
DISCRETE_OS_SIZE = [40] * len(env.observation_space.high)
# [(0.6 - (-1.2)) / 20, (0.07 - (-0.07)) / 20] == [1.8 / 20, 0.14 / 20] == [0.09, 0.007]
discrete_os_window_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OS_SIZE

# Epsilon-Greedy Exploration settings
epsilon = 1  # not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES // 2 
epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING + 1)

# For stats 
epi_rewards = [] 
aggr_epi_rewards = {'epi': [], 'avg': [], 'max': [], 'min': []}
STATS_EVERY = 100
SAVE_QTABLE_EVERY = 100



def retrieve_name(var):
    import inspect
    callers_local_vars = inspect.currentframe().f_back.f_back.f_locals.items()
    return [var_name for var_name, var_val in callers_local_vars if var_val is var]

def log(*argv):
    import torch
    for arg in argv:
        print(f"-"*75)
        print(f"{retrieve_name(arg)}")
        print(f"content: ")
        print(arg)
        print(f"type: {type(arg)}")
        if isinstance(arg, np.ndarray) or isinstance(arg, torch.Tensor): 
            print(f"shape: {arg.shape}")
        elif isinstance(arg, list) or isinstance(arg, str) or isinstance(arg, dict):
            print(f"len: {len(arg)}")




In [None]:
print(f"number of actions: {env.action_space.n}")  # 3 
# there are "3" actions we can pass: 0 means push left, 1 is stay still, and 2 means push right
print(f"random action: {env.action_space.sample()}")  # 0, 1, or 2

# we can query the enviornment to find out the possible ranges for each of these state values
print(f"state values range: ") 
print(f"  {env.observation_space.high}")  # [0.6  0.07]
print(f"  {env.observation_space.low}")   # [-1.2  -0.07]

print(f"len(env.observation_space.high): {len(env.observation_space.high)}")  # 
print(f"discrete observation space window size: {discrete_os_window_size}")   # [0.09  0.007]

size = DISCRETE_OS_SIZE + [env.action_space.n]
print(f"q_table size: {size}")  # [20, 20, 3] is a 20x20x3 shape


In [None]:
q_table = np.random.uniform(low=-2, high=0, size=DISCRETE_OS_SIZE + [env.action_space.n])
# log(q_table)

def get_discrete_state(state):
    discrete_state = (list(state) - env.observation_space.low) / discrete_os_window_size
    # log(discrete_state, state, env.observation_space.low, discrete_os_window_size)
    # we use this tuple to look up the 3 Q-values for the available actions in the 'q_table'
    return tuple(discrete_state.astype(np.int8)) 


In [None]:

test_state = get_discrete_state(env.observation_space.sample())
print(type(test_state))                       # <class 'tuple'>
print(test_state)                             # (6, 10)
test_action = np.argmax(q_table[test_state])  
print(type(test_state + (test_action, )))     # <class 'tuple'>
print(test_state + (test_action, ))           # (6, 10, 2)


In [None]:
env.reset()

for episode in range(EPISODES):
    episode_reward = 0  # current episode reward

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False
    
    # Get the initial state values from env.reset() and store it to 'discrete_state'
    discrete_state = get_discrete_state(env.observation_space.sample())
    done = False
    while not done:
        # Take an action
        if epsilon > 0 and np.random.random() > epsilon:
            # Get action from Q-table
            action = np.argmax(q_table[discrete_state])  # get the index of the greatest Q-value in the q_table
        else:
            # Get random action
            action = np.random.randint(0, env.action_space.n)
        
        new_state, reward, done, info = env.step(action=action)
        # e.g. reward: -1.0, state := [position, velocity] == [-0.5519343  -0.01300341]
        episode_reward += reward

        new_discrete_state = get_discrete_state(new_state)

        if episode % SHOW_EVERY == 0:
            # print(reward, new_state)  
            env.render()

        # If simulation did not end yet after last step - update Q table
        if not done:

            # Maximum possible Q-value in next step (for new state)
            max_future_q = np.max(q_table[new_discrete_state])  # get the max value, not the index

            # Current Q value (for current state and performed action)
            current_q = q_table[discrete_state + (action,)]

            # And here's our equation for a new Q value for current state and action:
            #   Q_{new}(s_t, a_t) <-- (1 - \alpha) \cdot Q(s_t, a_t) +    \alpha      \cdot (  r_t   +    \gamma      \cdot \max_{a} Q(s_{t + 1}, a))
            #       new-Q-value                            old-value    learning-rate         reward   discount-fator       estimate-of-future-value
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

            # Update Q table with new Q value
            q_table[discrete_state + (action,)] = new_q


        # Simulation ended (for any reson) - if goal position is achived - update Q-value with reward directly
        elif new_state[0] >= env.goal_position:
            # q_table[discrete_state + (action, )] = reward
            q_table[discrete_state + (action,)] = 0
        
        # Updating the old state with the new state
        discrete_state = new_discrete_state
    
    # Decaying is being done every episode if episode number is within decaying range
    if START_EPSILON_DECAYING <= episode <= END_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value

    # Save stats for further analysis
    epi_rewards.append(episode_reward)
    if not episode % STATS_EVERY:
        average_reward = sum(epi_rewards[-STATS_EVERY:]) / STATS_EVERY  # running average of past 'STATS_EVERY' number of rewards 
        aggr_epi_rewards['epi'].append(episode)
        aggr_epi_rewards['avg'].append(average_reward)
        aggr_epi_rewards['max'].append(max(epi_rewards[-STATS_EVERY:]))
        aggr_epi_rewards['min'].append(min(epi_rewards[-STATS_EVERY:]))
        # ':>5d' pad decimal with zeros (left padding, width 5), ':>4.1f' format float 1 decimal places (left padding, width 4)
        print(f'Episode: {episode:>5d}, average reward: {average_reward:>4.1f}, current epsilon: {epsilon:>1.2f}')

    # Save the q_table for every SAVE_QTABLE_EVERY number of episodes
    # if episode % SAVE_QTABLE_EVERY == 0:
    #     np.save(f"qtables/{episode}-qtable.npy", q_table) 

env.close()


In [None]:
# Plot the reward figure 
plt.plot(aggr_epi_rewards['epi'], aggr_epi_rewards['avg'], label="avg rewards")
plt.plot(aggr_epi_rewards['epi'], aggr_epi_rewards['max'], label="max rewards")
plt.plot(aggr_epi_rewards['epi'], aggr_epi_rewards['min'], label="min rewards")

plt.legend(loc='best')
plt.xlabel(f"episodes")
plt.ylabel(f"rewards")
plt.grid(True)

plt.show()
# plt.savefig(f"figures/avg-max-min-rewards.png")
