In [15]:
import numpy as np
from numpy.random import default_rng
import itertools
import os
import random
import matplotlib.pyplot as plt
import pandas as pd
import sklearn
# from sklearn.kernel_approximation import RBFSampler
# from sklearn.pipeline import Pipeline
import gym
from gym import Env
from gym.spaces import Discrete, Box, Dict

## undrestanding the environment

In [None]:
env = gym.make('MountainCarContinuous-v0')


print('env action space:')
print(f'\t action space: \t\t\t{env.action_space}')
print(f'\t a sample of action space: \t{env.action_space.sample()}')
print(f'\t action space shape: \t\t{env.action_space.shape}')
print(f'\t action space lower bound: \t{env.action_space.low}')
print(f'\t action space upper bound: \t{env.action_space.high}')

print('-----'*20)
print('env observation space (state space):')
print(f'\t observation space: \t\t\t{env.observation_space}')
print(f'\t a sample of observation space: \t{env.observation_space.sample()}')
print(f'\t observation space shape: \t\t{env.observation_space.shape}')
print(f'\t observation space lower bound: \t{env.observation_space.low}')
print(f'\t observation space upper bound: \t{env.observation_space.high}')

print('-----'*20)
print(''' env reward:   \t\tA negative reward of -0.1 * action2 is received at each timestep to penalise for taking actions of large 
                        magnitude. If the mountain car reaches the goal then a positive reward of +100 is added to the negative 
                        reward for that timestep.''')

## A2c from scratch

From there, I borrowed a little https://github.com/dennybritz/reinforcement-learning/blob/master/PolicyGradient/Continuous%20MountainCar%20Actor%20Critic%20Solution.ipynb 

In [16]:
def rbf_state_coder(*, s, rbf_c, rbf_cov):
    ''' based on algorithm in a book named: "Reinforcement Learning: An Introduction second edition. Richard S. Sutton and Andrew G. Barto."
        used to code the state by RBF method. ach RBF has a center(=rbf_c[0, :]) and a covariance(=rbf_cov[0, :]).
        ** d : number of the rbfs in the state space

        input
        --------
            s : state : shape (2, 1) : each shape has a value in range of [0, 1]

            rbf_c : a np array : shape (d, 2) : center of each rbf is stored in each row
                rbf_c_0 : rbf_c[0, :] : 1st row of rbf_c : determines the center of the 1st rbf; 
                state space is 2D, so shape(rbf_c_0) =(1, 2)

            rbf_cov : np array : shape (d, 2) : covariance (diagonal elements) of each rbf is stored in each row
                rbf_cov[0, :] : first row of rbf_cov : detemines the diagonal element of the covariance amtrix in the 1st rbf.

        output
        --------
            x_s : shape (d, 1) : coded value of the state : featurized representation of the space : on the Nili's notation it is Φ
    '''
    d = rbf_c.shape[0]                 # number of the rbf functions
    x_s = np.empty((d, 1))             # define the x matrix
    # calculate each row of the x based on the formula x_i(s) = exp(-(s - c_i)^T * Σ_i * (s - c_i))
    for row in range(d):
        x_s[row, 0] = np.exp(-0.5 * (s-rbf_c[row].reshape((2, 1))).T @ np.linalg.inv(np.diag(rbf_cov[row])) @ (s-rbf_c[row].reshape((2, 1))))[0, 0]
    return x_s

In [17]:
class ValueEstimator():
    """Value Function approximator. """
    def __init__(self, *, w=None, alpha_w=0.1, state_coding= None, rbf_c=None, rbf_cov=None):
        '''
            input
            -------
                w : 2D np array shape (..., 1); shape depends x_s : weight vector in value formula; value formula is v = w^T @ x_s ; 
                alpha_w : learning rate for w in update method
                state_coding : the function which is used to code the state. state_coding ∈ {'rbf_state_coder', 'featurize_state'}
                rbf_c : the centers in rbf state coding : shape(d, 2)
                rbf_cov : the covariance in rbf state coding : shape (d, 2)
        '''
        if w == None:
            self.w = np.zeros((rbf_c.shape[0], 1))            # weight vector. v_hat is a linear function of the weight vector
        else:
            self.w = w
        self.alpha_w = alpha_w
        self.rbf_c = rbf_c                      # rbf parameter for state coding
        self.rbf_cov = rbf_cov                  # rbf parameter for state coding
        self.x_s = None
        self.state_coding = state_coding
         
    def predict(self, state):
        '''
            take the state and give the value of the state (v_hat) based on below formula
                        v_hat(s, w) = w^T x_s
            input
            ------
                state : shape (2, 1)
            output
            -------
                v_hat : scalar
        '''
        # TODO : later change the state_coding input argumnet from string to function
        if self.state_coding == 'rbf_state_coder':        # code the state using rbf_state_coder
            x_s = rbf_state_coder(s=state, rbf_c=self.rbf_c, rbf_cov=self.rbf_cov)
            
        # calculate the v_hat by fomula -->>  v_hat(s, w) = w^T x_s ; then by .item() we change it to scalar 
        v_hat = (self.w.T @ x_s).item()   
        return v_hat

    def update(self, *, state, value_state, target):
        '''
            take state, value_state and target and update the weight vector using below formula
                    w_new = w_old + alpha_w*(target-value_state)*x_s 

            input
            ---------
                state : np array : shape(2, 1) : 
                value_state : scalar 
                target : scalar ; target = reward + (discount_factor * value_next)
        '''
        # w_new = w_old + alpha_w*(target-value_state)*x_s
        if self.state_coding == 'rbf_state_coder': 
            self.w = self.w + self.alpha_w*(target-value_state)*rbf_state_coder(s=state, rbf_c=self.rbf_c, rbf_cov=self.rbf_cov)  
        elif self.state_coding == 'featurize_state':
            self.w = self.w + self.alpha_w*(target-value_state)*featurize_state(state)
 

In [18]:
class PolicyEstimator():
    """The object contain alpha_theta (learning_rate) and policy parameter (theta_mu and theta_sigma). used to predict and upda"""
    # TODO : later change the state_coding input argumnet from string to function
    def __init__(self, theta_mu=None, theta_sigma=None, 
                 alpha_theta=0.001, state_coding= None, rbf_c=None, rbf_cov=None):
        '''
            input
            ------- 
                theta_mu : the parametr of the mu function. the mu formula is mu = theta_mu.T @ x_mu_s : shape(d, 1)
                theta_sigma : the parametr of the sigma function. the sigma formula is sigma = exp(theta_sigma.T @ x_sigma_s)
                alpha_theta : scalar : learning rate in learning of the theta (theta is the policy parameter)
                state_coding : the function which is used to code the state. state_coding ∈ {'rbf_state_coder', 'featurize_state'}
                rbf_c, rbf_cov : if state_coding=='rbf_state_coder' these arguments are used in state coding function.
        '''
        if (theta_mu == None) and (theta_sigma == None):
            self.theta_mu = np.zeros((rbf_c.shape[0], 1))                       # policy parameter for theta
            self.theta_sigma = np.zeros((rbf_c.shape[0], 1))
        else:
            self.theta_mu = theta_mu                # policy parameter for theta
            self.theta_sigma = theta_sigma
        
        self.alpha_theta = alpha_theta
        self.state_coding = state_coding
        self.rbf_c = rbf_c                      # rbf parameter for state coding
        self.rbf_cov = rbf_cov                  # rbf parameter for state coding
        self.x_s = None
        
    def predict(self, state):
        '''
            take the state and provide the action; 
            action is a smaple from a normal distribution with parametrized mu and sigma with respect to theta_mu and theta_sigma. 
            input
            -----
                state : shape (2, 1) 
            ouput
            ------
                action : shape (1,)
            '''
        # coding of the state for mu and sigma
        if self.state_coding == 'rbf_state_coder':        # code the state using rbf_state_coder
            x_mu_s = rbf_state_coder(s=state, rbf_c=self.rbf_c, rbf_cov=self.rbf_cov)
            x_sigma_s = x_mu_s
        elif self.state_coding == 'featurize_state':      # code the state using featurize_state
            x_mu_s = featurize_state(state)
            x_sigma_s = x_mu_s
        else:
            raise Exception("state_coding is not valid.")
            
        # calculate the mu and the sigma in the state
        self.mu = self.theta_mu.T @ x_mu_s
        self.sigma = np.exp(self.theta_sigma.T @ x_sigma_s)     
                # self.mu.shape = (1, 1) and self.sigma.shape = (1, 1)
            
        # take a sample from the normal dist; the sample is our action; the normal dist is the parametrized policy; 
        action = default_rng().normal(loc=self.mu[0], scale=self.sigma[0], size=1)
        # clip the the in range [0, 1] and save it on the action
        np.clip(action, a_min=0, a_max=1, out=action)
        return action

    def update(self, *, state, td_error, action):
        '''
            update policy parameters theta_mu and theta_sigma
            Note: because in the actor critic function, update method is called after predict method on same state, 
                  I use mu and sigma  which have been calculated in the predict method.
            
            input
            --------
                state : np array : shape(2, 1) : 
                td_error : scalar ; td_error = reward + discount_factor * value_next - value_state
                action : shape (1,) : action which has been taken
        '''
        # coding of the state for mu and sigma
        if self.state_coding == 'rbf_state_coder':        # code the state using rbf_state_coder
            x_mu_s = rbf_state_coder(s=state, rbf_c=self.rbf_c, rbf_cov=self.rbf_cov)
            x_sigma_s = x_mu_s
        elif self.state_coding == 'featurize_state':      # code the state using featurize_state
            x_mu_s = featurize_state(state)
            x_sigma_s = x_mu_s
            
        # update theta_mu and theta_sigma based on formula: θ = θ + α_θ * I * 𝛿 * ∇ln(π(A|S, θ)),  𝛿 = td_error
        self.theta_mu = self.theta_mu + self.alpha_theta * td_error * (1/(self.sigma[0, 0])**2)*(action[0] - self.mu[0, 0]) * x_mu_s
        self.theta_sigma = self.theta_sigma+self.alpha_theta*td_error*(((action[0] - self.mu[0, 0])**2/self.sigma[0, 0]**2) - 1) * x_sigma_s
   

In [25]:
def actor_critic(*, env, estimator_policy, estimator_value, num_episodes, discount_factor=0.97):
    """
        Actor Critic Algorithm. Optimizes the policy 
        function approximator using policy gradient.

        input:
        -------
            env: OpenAI environment.
            estimator_policy: an instance of PolicyEstimatory class
            estimator_value: an instance of ValueEstimator calss
            num_episodes: Number of episodes to run
            discount_factor : 

        output:
        -------
            sum_reward_per_episode : np array : shape(num_episodes,) : each row contains sum of reward for corresponding episode
    """
    sum_reward_per_episode = np.zeros(num_episodes)   
    avg_reward_per_episode = np.zeros(num_episodes)
    
    # iterate over episodes
    for i_episode in range(num_episodes):
        info_per_episode = {'episod_number': i_episode, 'reward_episode': [], 'avg_reward': None}
        # Reset the environment and pick the first action
        state = env.reset()
        
        # One step in the environment
        for t in itertools.count():            
            # Take a step
            action = estimator_policy.predict(state)
            next_state, reward, done, info = env.step(action)
        
            # Update statistics            
            sum_reward_per_episode[i_episode] += reward
            info_per_episode['reward_episode'].append(reward)
            
            # Calculate TD Target
            value_state = estimator_value.predict(state)
            value_next = estimator_value.predict(next_state)
            td_target = reward + discount_factor * value_next
            
            td_error = td_target - value_state
            
            # Update the value estimator
            estimator_value.update(state=state, value_state=value_state, target=td_target)
            
            # Update the policy estimator;   using the td error as our advantage estimate
            estimator_policy.update(state=state, td_error=td_error, action=action)
        
            print(f"\rStep {t} @ Episode {i_episode+1}/{num_episodes} -- sum_Episode_reward ({sum_reward_per_episode[i_episode]:.2f})", end='')
            if done:
                # update the avg_reward in info_per_episode
                info_per_episode['avg_reward'] = sum(info_per_episode['reward_episode'])/len(info_per_episode['reward_episode'])
                print(f'----> Episode {i_episode+1} -- avg reward {info_per_episode["avg_reward"]:.2f}')
                avg_reward_per_episode[i_episode] = info_per_episode['avg_reward']
                # plot the episdoe details and save the picture
                # plot_episode_info_2(info_per_episode)
                break
                
            state = next_state

    return sum_reward_per_episode

In [26]:
# the matrix which includes the rbf's centers. each row is one center
rbf_c = np.array(list(itertools.product(
    [-1.2, -1.1, -1, -0.9, -0.8, -0.75, -0.7, -0.65, 0.6, -0.55, -0.5, -0.45, -0.4, -0.35, -0.3, 0.25, -0.2, -0.15, -0.1, -0.05, 0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6], 
    [-0.07, -0.0652, -0.0604, -0.0556, -0.0508, -0.046, -0.0412, -0.0364, -0.0316, -0.0268, -0.022, -0.0172, -0.0124, -0.0076, -0.0028, 0.002, 0.0068, 0.0116, 0.0164, 0.0212, 0.026, 0.0308, 0.0356, 0.0404, 0.0452, 0.05, 0.0548, 0.0596, 0.0644, 0.0692])))
# the covariance matrix which includes the rbf's covariances. each row is the covariance for the coresponding center.
rbf_cov = np.array([[0.06, 0.0001]]*rbf_c.shape[0])

# used to turn of 
gym.logger.set_level(40)
num_episodes = 2
# make the environment
env = gym.make('MountainCarContinuous-v0')
policy_estimator = PolicyEstimator(state_coding='rbf_state_coder', rbf_c=rbf_c, rbf_cov=rbf_cov)
value_estimator = ValueEstimator(state_coding='rbf_state_coder', rbf_c=rbf_c, rbf_cov=rbf_cov)
sum_reward_per_episode = actor_critic(env=env, estimator_policy=policy_estimator, estimator_value=value_estimator, 
                                      num_episodes=num_episodes, discount_factor=0.98)

Step 998 @ Episode 1/2 -- sum_Episode_reward (-24.89)Episode 1 --- avg reward -0.024916755542655097
Step 998 @ Episode 2/2 -- sum_Episode_reward (-26.43)Episode 2 --- avg reward -0.026453537003343016


In [None]:
# save w and theta_mu and theta_sigma in a .npy file

parameters = {'theta_mu': policy_estimator.theta_mu, 'theta_sigma': policy_estimator.theta_sigma,
                'weight': value_estimator.w, 'sum_R_per_E': sum_reward_per_episode}

if not os.path.exists('Actor-critic/value-policy-params/'):
    os.makedirs('Actor-critic/value-policy-params/')

cwd2 = os.getcwd()
path2 = os.path.join(cwd2, 'Actor-critic/value-policy-params/')
# save parameter in a .npy file 
np.save(path2+f'theta_&_w.npy', parameters)

## A2c stable-baselines3

In [None]:
from stable_baselines3 import A2C

# make the A2C model
a2c_model = A2C("MlpPolicy", env, n_steps=50000, verbose=1)
a2c_model.learn(total_timesteps=100000)
a2c_model.save("a2c_mountain_car_continuous")

In [None]:
# test the trained model
obs = env.reset()
while True:
    action, _states = a2c_model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    env.render()