### Lunar Lander Environment
Observation Space: 8-dimensional vector: the coordinates of the lander, its linear velocities, its angle, angular velocity, and bools representing whether each leg is touching the ground  
[x, y, vx, vy, angle, angle_vel, left_leg_on_ground, right_leg_on_ground]  
Action Space: 4 Discrete actions: do nothin, fire left orinetation engine, fire main engine, fire right orientation engine.



### Theory
V(s) = max(Q(s,a)  
Q(s,a) = R(s,a) + gV(s')  
p(s) = max Q(s, a)

In [1]:
import gym

### Imports


In [2]:
import numpy as np
import gym
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import sys
from collections import deque
import random

In [9]:
class LLAgent:
    """A learning Agent for the Lunar Landar environment"""
    
    def __init__(self, env = gym.make("LunarLander-v2", render_mode = "human"), max_memory: int = 10_000):
        """
        Create a new LLAgent. Initialize the q_network and the target_q_network
        
        Params:
        env:
            the environment that the agent will learn from
        max_memory: int
            the size of the memory space for the learning algorithm
        """
        
        self.env = env
        self.q_function_input_size = env.observation_space.shape[0] + 1
        self.q_function_output_size = 1
        
        self.q_function = self.get_q_function(self.q_function_input_size,
                                              self.q_function_output_size)
        self.q_target_function = self.get_q_function(self.q_function_input_size,
                                              self.q_function_output_size)
        self.replay_memory = deque(maxlen = max_memory) # Memory of items stored in the format [state, action, reward, next state, terminated]
        
    def get_optimum_action(self, state: np.ndarray, q_function: keras.Model) -> int:
        """
        Return an optimal action based on the state
        
        Params:
        state: np.ndarray
            the numpy array representing the state for wchich an optimal action is needed
        q_function: keras.Model
            the model that will be used to predict the reward of each action
        
        Return:
        int:
            the int that is the optimal action to take
        """
        states_actions = np.tile(np.append(state, [0]), (4, 1))
        states_actions[:, -1] = np.array([0, 1, 2, 3]) * 0.25
        return np.argmax(q_function.predict(states_actions))
        
    
    def train_model(self, training_batch_size: int = 32):
        """
        Based on new history data, train the model
        
        Params:
        training_batch_size: int
            the number of items to train over
        """
        
        indices = np.random.choice(len(self.replay_memory), training_batch_size)
        xs = []
        ys = []
        for index in indices:
            # xs.append(self.replay_memory[index][0].append(self.replay_memory[index][1]))
            xs.append(np.append(self.replay_memory[index][0][0], np.array([self.replay_memory[index][1]])))
            if self.replay_memory[index][4] == True:
                r = 0
            else:
                r = self.replay_memory[index][2]
            
            predicted_reward = self.q_target_function.predict(np.array([np.append(self.replay_memory[index][0][0], np.array([self.replay_memory[index][1]]))]), verbose = 0)[0] + r
            ys.append(np.array([predicted_reward]))

        self.q_function.fit(np.array(xs), np.array(ys))
        
        
        
    
    def train(self, trajectories: int, max_timesteps: int = None, steps_random: int = 20, epsilon = 0.4, training_period: int = 4, training_batch_size: int = 32, update_target_period: int = 200):
        """
        Run the model for a specified number of trajectories, training the model as it goes.
        
        Params:
        trajectories: int
            the number of trajectories to run
        max_timesteps: int
            the max number of steps any single trajectory should take. If set to None, the trajectory will go till it has reached a terminal state
        steps_random: int
            the number of steps for which the agent should act completly randomly at the start of each episode
        epsilon: int
            the probability of making a random 
        training_period: int
            the number of steps between each training
        training_batch_size: int
            the number of samples that will be trained over each training session
        """
        
        current_total_step = 0
        
        for _ in range(trajectories):
            state = self.env.reset()
            generate_trajectory = True
            current_step = 0
            
            while generate_trajectory:
                # Generate action
                if current_step < steps_random:
                    action = self.random_action()
                else:
                    make_random = self.epsilon_greedy_policy(epsilon)
                    if make_random:
                        action = self.random_action()
                    else:
                        action = self.get_optimum_action(state = state, q_function = self.q_function)
                
                next_state, reward, terminated, truncated, info = self.env.step(action)
                self.replay_memory.append([np.array(state), action, reward, next_state, terminated])
                self.env.render()
                
                if current_step % training_period == 0 and len(self.replay_memory) > training_batch_size:
                    # Train
                    self.train_model(training_batch_size = training_batch_size)
                state = next_state
                current_step += 1
                print(current_step)
                
                generate_trajectory = (max_timesteps is None or current_step < max_timesteps) and  not terminated

                
                if current_total_step % update_target_period == 0:
                    self.q_target_function = keras.models.clone_model(self.q_function)
                
    
    # HELPER METHODS
    def get_q_function(self, 
                         input_size: int, 
                         output_size: int,
                         num_layers: int = 3,
                         layer_sizes: list[int] = [64, 32, 16],
                         activation: str = "relu") -> keras.Model:
        """
        Create a neural net to represent the q-function
        
        Params:
        input_size: int
            the size/dimensions of the function input (should be the shape of the observation space)
        output_size: int
            the size/dimensions of the function output (should be the shape of the action space)
        num_layers: int
            the number of hidden layers in the neural network
        layer_sizes: list[int]
            the sizes of each hidden layer: [hidden layer 1 size, hiddden layer 2 size...hidden layer -num-layyers- size]
        activation: str
            the activation function of the neural network
        """
        # Assertions
        assert num_layers == len(layer_sizes), f"Number of layers must be the same as the length of layer sizes: num: {num_layers} != sizes: {len(layer_sizes)}"
        
        # Build Neural Net
        inputs = layers.Input(shape=(input_size,)) 
        layer = inputs
        for layer_num in range(len(layer_sizes)):
            layer = layers.Dense(layer_sizes[layer_num], activation = "relu")(layer)
        output = layers.Dense(output_size, activation = "sigmoid")(layer)
        
        model = keras.Model(inputs = inputs, outputs = output)
        model.compile(optimizer = "adam", loss = "mean_squared_error")
        # model.summary()
        return model
    
    def random_action(self) -> int:
        """Return a random number in the range [0, 3], representing a random action"""
        return random.randint(0, 3)
    
    def epsilon_greedy_policy(self, epsilon: float) -> bool:
        """
        Return true epsilon% of the time, otherwise return false
        epsilon: float
            the percent of time that the function should return random
        """
        if random.uniform(0, 1) < epsilon:
            return True
        return False
        
        
    
        

In [10]:

a = LLAgent()
a.train(100)

  self.replay_memory.append([np.array(state), action, reward, next_state, terminated])


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32


ValueError: in user code:

    File "C:\Users\purs0007\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\training.py", line 2041, in predict_function  *
        return step_function(self, iterator)
    File "C:\Users\purs0007\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\training.py", line 2027, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\purs0007\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\training.py", line 2015, in run_step  **
        outputs = model.predict_step(data)
    File "C:\Users\purs0007\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\training.py", line 1983, in predict_step
        return self(x, training=False)
    File "C:\Users\purs0007\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\Users\purs0007\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\keras\engine\input_spec.py", line 295, in assert_input_compatibility
        raise ValueError(

    ValueError: Input 0 of layer "model_6" is incompatible with the layer: expected shape=(None, 9), found shape=(None, 2)


In [None]:
generate_trajectory