In [1]:
#######################################################################
# Following are some utilities for tile coding from Rich.
# To make each file self-contained, I copied them from
# http://incompleteideas.net/tiles/tiles3.py-remove
# with some naming convention changes
#
# Tile coding starts
from math import floor

class IHT:
    "Structure to handle collisions"
    def __init__(self, size_val):
        self.size = size_val
        self.overfull_count = 0
        self.dictionary = {}

    def count(self):
        return len(self.dictionary)

    def full(self):
        return len(self.dictionary) >= self.size

    def get_index(self, obj, read_only=False):
        d = self.dictionary
        if obj in d:
            return d[obj]
        elif read_only:
            return None
        size = self.size
        count = self.count()
        if count >= size:
            if self.overfull_count == 0: print('IHT full, starting to allow collisions')
            self.overfull_count += 1
            return hash(obj) % self.size
        else:
            d[obj] = count
            return count

def hash_coords(coordinates, m, read_only=False):
    if isinstance(m, IHT): return m.get_index(tuple(coordinates), read_only)
    if isinstance(m, int): return hash(tuple(coordinates)) % m
    if m is None: return coordinates

def tiles(iht_or_size, num_tilings, floats, ints=None, read_only=False):
    """returns num-tilings tile indices corresponding to the floats and ints"""
    if ints is None:
        ints = []
    qfloats = [floor(f * num_tilings) for f in floats]
    tiles = []
    for tiling in range(num_tilings):
        tilingX2 = tiling * 2
        coords = [tiling]
        b = tiling
        for q in qfloats:
            coords.append((q + b) // num_tilings)
            b += tilingX2
        coords.extend(ints)
        tiles.append(hash_coords(coords, iht_or_size, read_only))
    return tiles
# Tile coding ends
#######################################################################


In [2]:
import numpy as np
import gym
from operator import mul
from tqdm import tqdm

POSITION_MIN = -1.2
POSITION_MAX = 0.5
VELOCITY_MIN = -0.07
VELOCITY_MAX = 0.07

max_size = 2048

iht = IHT(max_size)
num_of_tilings = 8

position_scale = num_of_tilings / (POSITION_MAX - POSITION_MIN)
velocity_scale = num_of_tilings / (VELOCITY_MAX - VELOCITY_MIN)

In [3]:
# Tiling
import random

def get_active_tiles(state, action):
    return tiles(iht, num_of_tilings, state, [action])

def state_action_value(state, action, weights):
    return sum(weights[get_active_tiles(state, action)])

def epsilon_greedy(state, actions, weights, epsilon):
    if random.random() < epsilon:
        return random.choice(actions)
    else :
        action_values = [(action, state_action_value(state, action, weights)) for action in actions]
        max_action_value = None
        max_actions = []
        for a, v in action_values:
            if max_action_value == None or v > max_action_value:
                max_action_value = v
                max_actions = [a]
            elif v == max_action_value:
                max_actions.append(a)
        return random.choice(max_actions)

In [4]:
env = gym.make('MountainCar-v0')
env._max_episode_steps = 2000

EPISODES = 10000
NUM_LOGS = 10

POLICY = epsilon_greedy

ALPHA = 0.5 / num_of_tilings
EPSILON = 0
GAMMA = 1
ACTIONS = [0, 1, 2]



weights = np.zeros(max_size)

total_reward = 0

bar = tqdm(range(EPISODES))
last_10_rewards = []

for e in bar:
    s = env.reset()
    a = POLICY(s, ACTIONS, weights, EPSILON)
    done = False
    episode_reward = 0
    step = 0
    while not done:
#         env.render()
        s_prime, r, done, info = env.step(a)
#         done = False
        if done:
            current_value = state_action_value(s, a, weights)
            weights[get_active_tiles(s, a)] += ALPHA * (r - current_value)
        else:
            a_prime = POLICY(s_prime, ACTIONS, weights, EPSILON)
            current_value = state_action_value(s, a, weights)
            next_value = state_action_value(s_prime, a_prime, weights)
            weights[get_active_tiles(s, a)] += ALPHA * (r + GAMMA * next_value - current_value)
            s = s_prime
            a = a_prime
        
        episode_reward += r
        total_reward += r
        step += 1
        
    if (len(last_10_rewards)) >= 10:
        last_10_rewards.pop(0)
        
    last_10_rewards.append(episode_reward)
    bar.set_description("Average reward: %f" % (sum(last_10_rewards) / len(last_10_rewards)))


Average reward: -130.100000: 100%|██████████| 10000/10000 [03:12<00:00, 51.99it/s]


In [56]:
weights

array([ -93.9461783 , -178.69544956, -154.47866136, ...,    0.        ,
          0.        ,    0.        ])

In [9]:
from math import floor

def hash_coords(coordinates, m, read_only=False):
    if isinstance(m, int): return hash(tuple(coordinates)) % m
    if m is None: return coordinates

def tiles(iht_or_size, num_tilings, floats, ints=None, read_only=False):
    """returns num-tilings tile indices corresponding to the floats and ints"""
    if ints is None:
        ints = []
    qfloats = [floor(f * num_tilings) for f in floats]
    tiles = []
    for tiling in range(num_tilings):
        tilingX2 = tiling * 2
        coords = [tiling]
        b = tiling
        for q in qfloats:
            coords.append((q + b) // num_tilings)
            b += tilingX2
        coords.extend(ints)
        tiles.append(hash_coords(coords, iht_or_size, read_only))
    return tiles

tiles(10, 2, [1.2, 0.02], [1])

[1, 9]