In [None]:
import numpy as np
import torch
import tensorflow as tf
import gym
import sys
from collections import deque
import random

from explorerl.agents import BaseAgent
from explorerl.utils import *
from explorerl.REINFORCE import *

# Lunar Lander Actor Critic (TD learning)

In [None]:
# Create the LunarLander-v2 Gym environment and wrap it in an EnvRunner;
# the runner object `e` is what the later cells call train()/test() on.
env = gym.make("LunarLander-v2")
e = EnvRunner(env)

## TensorFlow

In [None]:
class DualPolicyValueHeadTf(tf.keras.Model):
    """Keras model with two separate heads applied to the same input.

    ``valuehead`` maps the input to a single output; ``policyhead`` maps it to
    ``output_space`` outputs. Each head is a ``Sequential`` around one layer
    produced by ``create_linear_tf(..., relu=False)``. If ``"softmax"`` is a
    member of ``configs``, the policy head is wrapped in a second
    ``Sequential`` that appends a ``Softmax`` layer.
    """
    def __init__(self,input_space,output_space,configs={}):
        super().__init__()
        # Value head: a single output.
        value_layers = [create_linear_tf(input_dims=input_space,output_dims=1,relu=False)]
        self.valuehead = tf.keras.models.Sequential(value_layers)
        # Policy head: one output per action.
        policy_layers = [create_linear_tf(input_dims=input_space,output_dims=output_space,relu=False)]
        self.policyhead = tf.keras.models.Sequential(policy_layers)
        if "softmax" in configs:
            # Wrap (rather than extend) the existing head so the outputs are normalised.
            normalised = [self.policyhead,tf.keras.layers.Softmax()]
            self.policyhead = tf.keras.models.Sequential(normalised)

    def call(self,x,training=True):
        """Apply both heads to ``x`` and return ``(value, policy)``.

        ``training`` is accepted but not forwarded to the heads.
        """
        return self.valuehead(x), self.policyhead(x)


In [None]:
class ActorCriticTf(BaseTfAgent):
    """One-step TD actor-critic agent (TensorFlow).

    The estimator stored in ``self.model["estimator"]`` must expose
    ``policyhead`` and ``valuehead`` sub-models and return ``(value, policy)``
    when called. For each transition the critic is regressed onto the TD
    target ``reward + gamma * V(next_obs)`` and the actor minimises
    ``-log pi(action | obs) * (target - V(obs))``.
    """
    def __init__(self,estimator=None,gamma=0.995,learning_rate=0.001, featurizer=None,scaler=None,configs={"softmax"},replay_size=500):
        # Every argument is forwarded unchanged to BaseTfAgent.
        super(ActorCriticTf, self).__init__(estimator,gamma,learning_rate,featurizer,scaler,configs,replay_size=replay_size)
        self.name = "ActorCriticTf"

    def initialize_model(self,observation_space,action_space):
        """Run the base initialiser, then attach the losses and the training op.

        Stores the policy loss under ``self.model["loss"]`` and the
        per-transition update function under ``self.model["training_op"]``.
        """
        super(ActorCriticTf, self).initialize_model(observation_space,action_space)

        def extra_losses(model):
            # tf.add_n raises on an empty list, so fall back to 0 when the
            # sub-model has registered no additional losses.
            registered = model.losses
            return tf.add_n(registered) if registered else 0.0

        def log_loss(model,predictions,targets):
            # -sum(log(pi) * weights); `targets` carries the advantage in the
            # slot of the action that was taken and zeros elsewhere.
            return -1*(tf.reduce_sum(tf.multiply(tf.math.log(predictions),targets))) + extra_losses(model)

        def value_loss_fn(model,predictions,targets):
            return tf.losses.mean_squared_error(targets,predictions) + extra_losses(model)

        optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)

        def train_step(model,inputs,target,action):
            """Apply one actor and one critic update for a single transition.

            inputs: observation fed to the model (the (1, n_actions) weight
                array below fixes the batch size to 1).
            target: TD target for the value head.
            action: index of the action that was taken.
            """
            # The tape is persistent because two gradients are taken from it.
            with tf.GradientTape(persistent=True) as tape:
                value,predictions = model(inputs)
                # self.action_space is presumably set by the base
                # initialize_model -- TODO confirm.
                arr = np.zeros((1,self.action_space))
                # Advantage estimate for the chosen action only.
                arr[0][action] = target - np.max(value)
                arr = tf.stop_gradient(np.array(arr,dtype="float32"))
                policy_loss = log_loss(model.policyhead,predictions,arr)
                value_loss = value_loss_fn(model.valuehead,value,tf.cast(tf.stop_gradient(target),dtype="float32"))
            policy_variables = model.policyhead.trainable_variables
            value_variables = model.valuehead.trainable_variables
            policy_gradients = tape.gradient(policy_loss,policy_variables)
            value_gradients = tape.gradient(value_loss,value_variables)
            # One apply_gradients call over both (disjoint) variable sets keeps
            # the optimizer's step counter at one increment per update and
            # avoids applying a single optimizer to two different variable lists.
            optimizer.apply_gradients(zip(
                list(policy_gradients) + list(value_gradients),
                list(policy_variables) + list(value_variables),
            ))
            del tape

        self.model["loss"] = log_loss
        self.model["training_op"] = train_step
        print("Model Created!")

    def train_policy(self):
        """Policy used during training: sample from the policy head."""
        return self.stochastic()

    def test_policy(self):
        """Policy used during testing: also samples from the policy head."""
        return self.stochastic()

    def stochastic(self):
        """Return ``act(obs) -> (action, probs)`` that samples an action from the policy output."""
        def act(obs):
            estimator = self.model["estimator"]
            _, probs = estimator(obs)
            # NOTE(review): when "continuous" is in self.configs this falls
            # through and returns None; no continuous-action path exists here.
            if "continuous" not in self.configs:
                return np.random.choice(self.action_space,p=np.array(probs[0])) , probs
        return act

    def greedy(self):
        """Return ``act(obs) -> (action, probs)`` that picks the most probable action."""
        def act(obs):
            estimator = self.model["estimator"]
            # The estimator returns (value, policy); the argmax must be taken
            # over the policy output, not over the value output.
            _, probs = estimator(obs)
            return np.argmax(probs[0]) , probs
        return act

    def _td_target(self,next_obs,reward,done):
        """Return the one-step TD target for a transition.

        This is ``reward`` when the episode has ended, otherwise
        ``reward + gamma * V(next_obs)`` using the current value head.
        """
        if done:
            return reward
        next_value = self.model["estimator"].valuehead(next_obs)
        return reward + self.gamma*np.max(next_value)

    def episodal_train_iter(self,policy):
        """Replay every stored transition once, in order, then empty the buffer.

        ``policy`` is accepted but not used.
        """
        training_op = self.model["training_op"]
        estimator = self.model["estimator"]
        for obs, action, next_obs, reward, done in self.experience_replay:
            # The target is computed before the update for this transition.
            target = self._td_target(next_obs,reward,done)
            training_op(estimator,obs,target,action)
        self.experience_replay = deque(maxlen=self.replay_size)

    def train_iter(self,policy,action,values,obs,next_obs,reward,done):
        """Perform one online update from a single transition.

        ``policy`` and ``values`` are accepted but not used.
        """
        training_op = self.model["training_op"]
        target = self._td_target(next_obs,reward,done)
        training_op(self.model["estimator"],obs,target,action)
        
        

In [None]:
# Instantiate the TensorFlow actor-critic agent, passing the dual-head model class as its estimator.
a = ActorCriticTf(estimator=DualPolicyValueHeadTf,learning_rate=0.001)

In [None]:
# Train for 2500 episodes. train_episodal=False presumably selects per-step
# updates (train_iter) rather than end-of-episode replay -- confirm in EnvRunner.
e.train(a,episodes=2500,train_episodal=False)

In [None]:
# Run the agent through the runner's test routine.
e.test(a)

## PyTorch

In [None]:
class DualPolicyValueHeadTorch(torch.nn.Module):
    """Torch module with two separate heads applied to the same input.

    ``valuehead`` produces a single output and ``policyhead`` produces
    ``output_space`` outputs. Each is a ``Sequential`` around one layer built
    by ``create_linear_torch(..., relu=False)`` whose input size is
    ``input_space[0]``. If ``"softmax"`` is a member of ``configs``, the
    policy head is wrapped in an outer ``Sequential`` followed by
    ``Softmax(dim=-1)``.
    """
    def __init__(self,input_space,output_space,configs={}):
        super().__init__()
        in_features = input_space[0]
        # Value head: a single output.
        self.valuehead = torch.nn.Sequential(
            create_linear_torch(input_dims=in_features,output_dims=1,relu=False)
        )
        # Policy head: one output per action.
        policy_net = torch.nn.Sequential(
            create_linear_torch(input_dims=in_features,output_dims=output_space,relu=False)
        )
        if "softmax" in configs:
            # Nest the linear head inside an outer Sequential ending in Softmax.
            policy_net = torch.nn.Sequential(policy_net,torch.nn.Softmax(dim=-1))
        self.policyhead = policy_net

    def forward(self,x):
        """Apply both heads to ``x`` and return ``(value, policy)``."""
        return self.valuehead(x), self.policyhead(x)

In [None]:
# Build the torch dual-head module with input_space=[4] and 2 policy outputs.
# NOTE(review): this rebinds `a`, which previously held the ActorCriticTf agent.
a = DualPolicyValueHeadTorch([4],2)