In [5]:
#https://www.janisklaise.com/post/rl-policy-gradients/

import gym
import numpy as np

In [13]:
class LogisticPolicy:
    """Two-action stochastic policy parameterised by a logistic function.

    The probability of action 0 is ``sigmoid(x . teta)`` and the probability
    of action 1 is its complement. ``update`` performs one gradient-ascent
    step on ``teta`` using the log-probability gradients of the actions taken
    in an episode, weighted by the discounted returns.
    """

    def __init__(self, teta, alfa, gamma):
        """Store the policy parameters.

        Args:
            teta: weight vector; dotted with an observation in ``probs``.
                It is updated in place by ``update``, so the caller's array
                is modified as well.
            alfa: learning rate used in ``update``.
            gamma: discount factor used in ``discount_rewards``.
        """
        self.teta = teta
        self.alfa = alfa
        self.gamma = gamma

    def logistic(self, y):
        """Return the logistic (sigmoid) function ``1 / (1 + exp(-y))``."""
        return 1 / (1 + np.exp(-y))

    def probs(self, x):
        """Return ``[P(action 0), P(action 1)]`` for observation ``x``.

        The two entries sum to 1 because the second is the complement of
        the first.
        """
        y = np.dot(x, self.teta)
        prob_zero = self.logistic(y)
        return np.array([prob_zero, 1 - prob_zero])

    def act(self, x):
        """Sample an action for observation ``x``.

        Returns:
            A tuple ``(action, prob)`` where ``action`` is 0 or 1 and
            ``prob`` is the probability the policy assigned to that action.
        """
        probs = self.probs(x)
        action = np.random.choice([0, 1], p=probs)
        return action, probs[action]

    def grad_log_p(self, x):
        """Return the gradients of ``log P(action)`` w.r.t. ``teta``.

        With ``s = sigmoid(x . teta)``, ``P(0) = s`` and ``P(1) = 1 - s``:

        * ``d/dteta log(s)     = x * (1 - s) = x - x * s``
        * ``d/dteta log(1 - s) = -x * s``

        Returns:
            A tuple ``(grad_for_action_0, grad_for_action_1)``, so the
            result can be indexed directly by the action taken.
        """
        y = np.dot(x, self.teta)
        scaled = x * self.logistic(y)
        grad_log_p_zero = x - scaled
        grad_log_p_one = -scaled
        return grad_log_p_zero, grad_log_p_one

    def grad_log_p_dot_rewards(self, grad_log_p, actions, discounted_rewards):
        """Weight each step's gradient by its discounted return and sum.

        Args:
            grad_log_p: array with one gradient row per step.
            actions: unused; kept so existing callers keep working.
            discounted_rewards: one discounted return per step.

        Returns:
            ``grad_log_p.T @ discounted_rewards``.
        """
        return grad_log_p.T @ discounted_rewards

    # NOTE: an earlier three-argument ``discount_rewards`` duplicated
    # ``grad_log_p_dot_rewards`` and was shadowed by the definition below,
    # so it was never reachable; it has been removed.
    def discount_rewards(self, rewards):
        """Return the discounted return from each step onwards.

        Entry ``i`` equals ``rewards[i] + gamma * rewards[i + 1] + ...``.
        The episode is walked backwards so each return reuses the next one.
        """
        discounted_rewards = np.zeros(len(rewards))
        cumulative_rewards = 0

        for i in reversed(range(len(rewards))):
            cumulative_rewards = cumulative_rewards * self.gamma + rewards[i]
            discounted_rewards[i] = cumulative_rewards

        return discounted_rewards

    def update(self, rewards, obs, actions):
        """Apply one gradient-ascent step to ``teta`` from a finished episode.

        Args:
            rewards: per-step rewards.
            obs: per-step observations, aligned with ``actions``.
            actions: per-step actions (0 or 1), used to pick the matching
                gradient from ``grad_log_p``.
        """
        # Gradient of log P(chosen action) for every step of the episode.
        grads = [self.grad_log_p(ob)[action] for ob, action in zip(obs, actions)]
        grad_log_p = np.array(grads)

        # Temporally adjusted, discounted rewards.
        discounted_rewards = self.discount_rewards(rewards)

        dot = self.grad_log_p_dot_rewards(grad_log_p, actions, discounted_rewards)

        # Gradient ascent (in place on the stored weight array).
        self.teta += self.alfa * dot

In [None]:
def run_episode(env, policy, render=False):
    """Run one episode of ``env`` under ``policy`` and record the trajectory.

    Uses the environment interface where ``env.reset()`` returns the first
    observation and ``env.step(action)`` returns the 4-tuple
    ``(ob, reward, done, info)``.

    Args:
        env: environment exposing ``reset``, ``step`` and ``render``.
        policy: object whose ``act(ob)`` returns ``(action, prob)``.
        render: when true, ``env.render()`` is called before every step.

    Returns:
        A tuple ``(total_reward, rewards, obs, actions, probs)``. The last
        four are NumPy arrays with one entry per step; ``obs`` holds the
        observation each action was chosen from.
    """
    ob = env.reset()
    total_reward = 0

    obs = []
    actions = []
    rewards = []
    probs = []

    done = False

    while not done:
        if render:
            env.render()

        # Record the observation the upcoming action is based on.
        obs.append(ob)

        action, prob = policy.act(ob)

        # Advance the environment to get the next observation and reward.
        ob, reward, done, info = env.step(action)

        total_reward += reward
        rewards.append(reward)
        actions.append(action)
        probs.append(prob)

    # The original returned the undefined names `totalreward` and
    # `observations`, which raised NameError at the end of every episode.
    return (
        total_reward,
        np.array(rewards),
        np.array(obs),
        np.array(actions),
        np.array(probs),
    )