In [8]:
import sys
%matplotlib qt5
import pylab as plb
from matplotlib import pyplot as plt
import numpy as np
import mountaincar
from tqdm import tqdm

In [39]:
class DummyAgent():
    """Softmax agent with Gaussian basis-function features for the mountain-car task.

    The state s = (x, v) is read from ``mountain_car.x`` and
    ``mountain_car.x_d``. It is encoded by a grid of Gaussian "neurons"
    centred on ``x_values`` x ``v_values``. The action values are linear in
    those activations, ``Q(s, .) = w @ r(s)``, and actions are sampled from
    a softmax over Q with temperature ``tau``.

    An action index ``a`` in {0, 1, 2} is passed to the environment as
    ``apply_force(a - 1)`` (presumably -1 / 0 / +1 = left / none / right --
    confirm against the ``mountaincar`` module).
    """

    def __init__(self, mountain_car = None, x_linspace = (-150, 30, 20),
                 v_linspace = (-15, 15, 20), w = None, tau = 1, gamma = 0.95,
                 eta = 0.01, lambda_ = 0.5):
        """Initialize the agent.

        Parameters
        ----------
        mountain_car -- environment object; a fresh ``mountaincar.MountainCar()``
                        is created when None
        x_linspace   -- (start, stop, num) passed to ``np.linspace`` for the
                        neuron centres along x
        v_linspace   -- (start, stop, num) passed to ``np.linspace`` for the
                        neuron centres along v
        w            -- weight matrix of shape (n_actions, n_neurons); drawn
                        from a standard normal when None. A supplied array is
                        used as-is and is modified in place by learning.
        tau          -- softmax temperature
        gamma        -- reward discount factor
        eta          -- learning rate
        lambda_      -- eligibility trace decay parameter
        """

        # environment object
        if mountain_car is None:
            self.mountain_car = mountaincar.MountainCar()
        else:
            self.mountain_car = mountain_car

        # neuron centres along x and v
        self.x_values = np.linspace(*x_linspace)
        self.v_values = np.linspace(*v_linspace)

        # grid spacing along x and v
        self.delta_x = self.x_values[1] - self.x_values[0]
        self.delta_v = self.v_values[1] - self.v_values[0]

        # Gaussian widths: one per centre, all equal to the grid spacing
        self.sigma_x = np.full(len(self.x_values), self.delta_x)
        self.sigma_v = np.full(len(self.v_values), self.delta_v)

        # number of actions
        self.n_actions = 3

        # number of neurons (full x-by-v grid)
        self.n_neurons = len(self.x_values) * len(self.v_values)

        # weight matrix, one row per action
        if w is None:
            self.w = np.random.randn(self.n_actions, self.n_neurons)
        else:
            self.w = w

        # sampling softmax temperature
        self.tau = tau

        # reward discount factor
        self.gamma = gamma

        # learning rate
        self.eta = eta

        # eligibility trace parameter
        self.lambda_ = lambda_

    def r(self, x, v):
        """Get neuron activations for s = (x, v).

        Returns an array of shape (len(x_values), len(v_values)): x varies
        along rows, v along columns.
        """
        dist_x = ((self.x_values - x) / self.sigma_x) ** 2
        dist_v = ((self.v_values - v) / self.sigma_v) ** 2

        # broadcast the column of x-distances against the row of v-distances
        return np.exp(-(dist_x[:, np.newaxis] + dist_v[np.newaxis, :]))

    def get_Q(self, x, v):
        """Get the Q-values of all actions at s = (x, v) as a 1-D vector."""

        return self.w @ np.reshape(self.r(x, v), -1)

    def get_action_probas(self, Q):
        """Get softmax action probabilities for the Q-values ``Q`` as a vector."""

        logits = np.asarray(Q, dtype = float) / self.tau

        # Shifting by the maximum leaves the softmax unchanged but keeps
        # exp() from overflowing to inf (and the result from becoming NaN)
        # once the Q-values grow large.
        weights = np.exp(logits - np.max(logits))
        return weights / np.sum(weights)

    def get_action_index(self, x, v):
        """Sample an action index for s = (x, v) from the softmax policy."""

        action_probas = self.get_action_probas(self.get_Q(x, v))
        return np.random.choice(self.n_actions, p = action_probas)

    def update_w(self, x, v, a_index, delta):
        """Add ``delta * r(x, v)`` to the weights of action ``a_index``.

        Since Q(s, a) is linear in w[a], r(s) is the gradient of Q(s, a) with
        respect to w[a]. ``self.w`` is modified in place.
        """

        dQ_dwa = np.reshape(self.r(x, v), -1)
        self.w[a_index, :] += delta * dQ_dwa

    def visualize_trial(self, n_steps = 200):
        """Do a trial without learning, with display.

        The car is driven by the current softmax policy; the weights are not
        changed. The trial stops early once a positive reward is observed.

        Parameters
        ----------
        n_steps -- number of steps to simulate for
        """

        # prepare for the visualization
        plb.ion()
        mv = mountaincar.MountainCarViewer(self.mountain_car)
        mv.create_figure(n_steps, n_steps)
        plb.draw()
        plb.pause(1e-3)

        # make sure the mountain-car is reset
        self.mountain_car.reset()

        for n in range(n_steps):
            # current state (previously `s` was printed without being defined)
            s = (self.mountain_car.x, self.mountain_car.x_d)

            print('\rt =', s, self.mountain_car.t)
            sys.stdout.flush()

            # act according to the current policy and advance the simulation;
            # without this the car never moves during the trial
            action_index = self.get_action_index(*s)
            self.mountain_car.apply_force(action_index - 1)
            self.mountain_car.simulate_timesteps(100, 0.01)

            # update the visualization
            mv.update_figure()
            plb.draw()
            plb.pause(1e-3)

            # check for rewards
            if self.mountain_car.R > 0.0:
                print("\rreward obtained at t = ", self.mountain_car.t)
                break

    def learn(self, max_steps = 10000):
        """Run one learning episode, without display.

        The car is reset and driven by the softmax policy until a positive
        reward is observed or ``max_steps`` steps have elapsed. When the
        reward is observed, every visited (state, action) pair is updated
        once, weighted by its eligibility ``(gamma * lambda_) ** age``, and
        the episode ends. If no reward is observed within ``max_steps`` the
        weights are left unchanged.

        Parameters
        ----------
        max_steps -- maximum number of steps to simulate for
        """

        # make sure the mountain-car is reset
        self.mountain_car.reset()

        # (state, action index) pairs visited in this episode, oldest first
        history = []

        for _ in range(max_steps):
            # get current state
            s = (self.mountain_car.x, self.mountain_car.x_d)

            # select the current action based on softmax
            action_index = self.get_action_index(*s)
            history.append((s, action_index))

            # perform the action and simulate the timestep
            self.mountain_car.apply_force(action_index - 1)
            self.mountain_car.simulate_timesteps(100, 0.01)

            # check for rewards (runs at the end once)
            if self.mountain_car.R > 0.0:
                # print the obtained reward
                print("\rreward obtained at t = ", self.mountain_car.t)

                # TD error of the transition that produced the reward,
                # (s, action_index) -> goal. The episode stops here, so
                # there is no successor value to bootstrap from. Using the
                # current pair also makes a reward on the very first step
                # work (there is no previous pair to look at in that case).
                td_error = self.mountain_car.R - self.get_Q(*s)[action_index]
                update = self.eta * td_error

                # eligibilities [xi^(T-1), ..., xi, 1] with xi = gamma * lambda:
                # the most recent pair gets weight 1, older ones decay
                xi = self.gamma * self.lambda_
                eligibility_trace = xi ** np.arange(len(history) - 1, -1, -1)

                # loop over history
                for ((x0, v0), a0), trace in zip(history, eligibility_trace):
                    self.update_w(x0, v0, a0, update * trace)

                # no steps after the reward
                break

In [None]:
# build an agent that samples actions from a softmax with unit temperature
d = DummyAgent(tau=1)

# train for 100 episodes; learn() resets the car at the start of every call
for episode in range(100):
    d.learn()

reward obtained at t =  562.0
reward obtained at t =  290.0
reward obtained at t =  728.0
reward obtained at t =  366.0
reward obtained at t =  615.0
reward obtained at t =  1366.0
reward obtained at t =  226.0
