In [3]:
import gym
import numpy as np

# Create the MountainCar-v0 environment; the cells below drive it via reset/step/render/close.
env = gym.make('MountainCar-v0')

In [4]:
class Pile_layer():
    '''
    Tile coder for a two-dimensional continuous state.

    Each axis is separated into a grid; "dimension" gives the number of grid cells
    per axis. "amount" overlapping grids (tilings) are built, and tiling number k
    is shifted by k * offset[axis] grid-cell widths along each axis.

    dimension: [cells_x, cells_y]
    input_dim: [[x_low, x_high], [y_low, y_high]] bounds of the state
    amount:    number of overlapping tilings
    offset:    [x_offset, y_offset], shift per tiling in units of one cell width

    After construction:
    x_ranges / y_ranges: arrays of shape (amount, dimension[axis] + 2) holding the
                         tile boundaries of every tiling
    total_dimension:     length of the feature vector returned by get_vector,
                         (dimension[0]+1) * (dimension[1]+1) * amount
    '''
    def __init__(self, dimension, input_dim, amount, offset):
        self.dimension = dimension
        self.amount = amount
        self.offset = offset
        self.input_dim = input_dim
        self.initialize_layer()
        self.total_dimension = (dimension[0]+1)*(dimension[1]+1)*amount

    def _axis_edges(self, axis, num):
        '''
        Return the list of tile boundaries of tiling "num" along "axis".

        The list always has dimension[axis] + 2 entries so that all tilings can be
        stacked into one rectangular array:
        - unshifted tiling: low, the inner boundaries, high, and a -inf filler
          (the filler can never be selected by get_vector)
        - shifted tiling:   low, the shifted boundaries, high
        '''
        # Use the bounds stored on the instance, not a notebook-level global.
        low = self.input_dim[axis][0]
        high = self.input_dim[axis][1]
        cells = self.dimension[axis]
        width = (high - low) / cells
        shift = self.offset[axis] * width * num

        if shift == 0:
            inner = [low + shift + width * i for i in range(1, cells)]
            return [low] + inner + [high, -np.inf]

        # A shifted tiling gets an extra, narrower first tile [low, low + shift).
        inner = [low + shift + width * i for i in range(cells)]
        return [low] + inner + [high]

    def initialize_layer(self):
        '''Compute the tile boundaries of every tiling and store them in x_ranges / y_ranges.'''
        self.x_ranges = np.asarray([self._axis_edges(0, num) for num in range(self.amount)])
        self.y_ranges = np.asarray([self._axis_edges(1, num) for num in range(self.amount)])

    @staticmethod
    def _tile_index(edges, value):
        '''
        Index of the tile that contains "value": position of the smallest boundary
        strictly greater than "value", minus one.

        A value outside [low, high) yields -1, which addresses the last tile.
        '''
        gaps = edges - value
        # Boundaries at or below the value are not candidates for the upper edge.
        return np.argmin(np.where(gaps <= 0, np.inf, gaps)) - 1

    def get_vector(self, state):
        '''
        Encode "state" (state[0] = x, state[1] = y) as a flat binary feature vector.

        Returns an array of length total_dimension that contains exactly one 1 per
        tiling, at the tile the state falls into, and 0 everywhere else.
        '''
        state_matrix = np.zeros((self.amount, self.dimension[0]+1, self.dimension[1]+1))
        for num in range(self.amount):
            x_tile = self._tile_index(self.x_ranges[num], state[0])
            y_tile = self._tile_index(self.y_ranges[num], state[1])
            state_matrix[num, x_tile, y_tile] = 1

        return state_matrix.flatten()
            
# [low, high] bounds of the two observation components, one pair per axis.
input_dim = [
    [env.observation_space.low[axis], env.observation_space.high[axis]]
    for axis in range(2)
]
# 5 overlapping 15x15 tilings; tiling k is shifted by k * 0.2 cell widths per axis.
piler = Pile_layer([15, 15], input_dim, 5, [0.2, 0.2])

In [7]:
# SARSA with linear function approximation over the tile features from `piler`.
alpha = 0.1 #Learning rate
gamma = 0.95 #Discount factor
epsilon_start = 0.15 #epsilon at the start of training
num_episodes = 1000

actions = [0,1,2]
num_actions = len(actions)
# One weight row per action, one column per tile feature; every weight starts at 1.
theta = np.full((num_actions, piler.total_dimension), 1, dtype=float) #weights

rewards = []  # sum of rewards of every training episode


def _epsilon_greedy(q_values, epsilon):
    '''
    Draw an action epsilon-greedily from "q_values" (one value per action).

    Ties for the best value are broken uniformly at random. The greedy action gets
    probability 1 - epsilon + epsilon/num_actions, every other action
    epsilon/num_actions. Action values double as indices into q_values / theta.
    '''
    a_star = np.random.choice(np.flatnonzero(q_values == q_values.max()))
    probabilities = np.full(num_actions, epsilon / num_actions)
    probabilities[a_star] += 1 - epsilon
    return np.random.choice(actions, p=probabilities)


for episode_id in range(1, num_episodes + 1):
    # Linearly reduce epsilon. Written as a product so the last episode gets exactly
    # 0 and round-off can never produce a negative probability.
    epsilon = epsilon_start * (num_episodes - episode_id) / num_episodes

    this_episode_rewards = []
    this_state = piler.get_vector(env.reset())
    this_action = _epsilon_greedy(theta.dot(this_state), epsilon)

    while True:
        (raw_next_state, reward, terminal, info) = env.step(this_action)

        # Choose the next action from the current policy (on-policy).
        next_state = piler.get_vector(raw_next_state)
        next_action = _epsilon_greedy(theta.dot(next_state), epsilon)

        # A genuinely terminal state has value 0, so the target must not bootstrap
        # from it. gym's TimeLimit wrapper flags a step-limit cut-off through
        # info['TimeLimit.truncated'] (on versions that set it); in that case the
        # episode was merely interrupted and bootstrapping stays correct. Without
        # the flag every episode end is treated as terminal.
        reached_terminal_state = terminal and not info.get('TimeLimit.truncated', False)
        if reached_terminal_state:
            target = reward
        else:
            target = reward + gamma * theta[next_action].dot(next_state)

        # Evaluate Q(s, a) with the current weights: a value cached from the previous
        # step would be stale because the previous update may have changed it.
        this_q = theta[this_action].dot(this_state)
        theta[this_action] = theta[this_action] + alpha * (target - this_q) * this_state

        this_state = next_state
        this_action = next_action
        this_episode_rewards.append(reward)

        if terminal:
            break

    rewards.append(sum(this_episode_rewards))

In [8]:
# Watch the result of the training: run rendered episodes with the learned weights.
epsilon = 0  # no exploration; ties between equally good actions are still broken at random
num_episodes = 10

try:
    for episode_id in range(1, num_episodes + 1):
        this_state = piler.get_vector(env.reset())
        terminal = False

        while not terminal:
            # Epsilon-greedy choice for the current state (purely greedy while epsilon is 0).
            q_all_a = np.array([np.sum(theta[x]*this_state) for x in actions])
            a_star = np.random.choice(np.flatnonzero(q_all_a == q_all_a.max()))
            this_action = np.random.choice(actions, p=[epsilon/num_actions if x!=a_star else 1-epsilon+epsilon/num_actions for x in actions])

            (next_state, reward, terminal, _) = env.step(this_action)
            this_state = piler.get_vector(next_state)
            env.render()
finally:
    # Always release the render window, even if the loop raises or is interrupted.
    env.close()