# Lunar Lander !

## Policy gradient

(c) Fabrice Mulotti

In [4]:
from pathlib import Path

current_path = Path.cwd()
print("Current path:", current_path)
venv_path = Path(sys.prefix)
print("Virtual environment path:", venv_path)

Current path: /home/renato/Git_Ia/ReinforcementLearnig/reinf_learn_work_copy/tp8-PolicyGradient
Virtual environment path: /home/renato/PythonVenvs/venvPolicyGrd


In [8]:
import sys
import time
import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense,Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers.legacy import Adam

import matplotlib.pyplot as plt

import os
import sys
sys.path.append("../lib")
import tools

In [9]:
tf.compat.v1.disable_eager_execution()
tf.config.list_physical_devices()

2024-10-30 21:08:27.503651: E external/local_xla/xla/stream_executor/cuda/cuda_driver.cc:152] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: UNKNOWN ERROR (303)


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]

In [11]:
class PolicyGradient():
    def __init__(self,nb_obs,nb_action,learning_rate=0.001,gamma=0.99, layer1=64,layer2=32):
        self.nb_obs=nb_obs
        self.nb_action=nb_action
        self.learning_rate=learning_rate
        self.gamma=gamma
        
        # création du réseau
        self.policy, self.predict = self.buildNN(layer1,layer2)
        
        # mémoire des transitions
        self.histo_states = []
        self.histo_actions =[]
        self.histo_rewards = []
        

        
    def buildNN(self,layer1,layer2):
        """
        Objet : création du réseau neural
        entrée : couches cachées 1 et 2
        sortie : réseau pour entrainement et prédiction
        """

        InputReward = Input(shape=[1])
        InputOBS    = Input(shape=(self.nb_obs,))

         # Hidden layers with ReLU activation
        dense1 = Dense(layer1, activation='relu', name="Dense1")(InputOBS)
        dense2 = Dense(layer2, activation='relu', name="Dense2")(dense1)

        # Output layer with softmax activation for probability output
        proba = Dense(self.nb_actions, activation='softmax', name="OutputProba")(dense2)


        # Model for training (with reward as input)
        policy = Model(inputs=[InputOBS, InputReward], outputs=[proba])
        policy.compile(optimizer=Adam(learning_rate=self.learning_rate), loss=self.loss_function)

        # Model for prediction (without reward as input)
        predict = Model(inputs=[InputOBS], outputs=[proba])

        return policy, predict
    

        
        def loss_function(y_true, y_pred):
            """
            Objet : fonction de perte
            Entrée : y réel et y prédit
            Sortie : loss
            """
            out = tf.keras.backend.clip(y_pred,1e-8,1-1e-8) # on élimine les valeurs extrèmes
            loss_brut = y_true * tf.keras.backend.log(out)
            loss_reward = -loss_brut * InputReward
            print("Loss reward ",loss_reward)
            return tf.keras.backend.sum(loss_reward)
    
        policy=Model(inputs=[InputOBS,InputReward], outputs=[proba])
        policy.compile(optimizer=Adam(learning_rate=self.learning_rate),loss=loss_function)
        predict=Model(inputs=[InputOBS], outputs=[proba])
        
        return policy,predict
                       
    def act(self,state):
        """
        Objet : choisi une action en fonction d'un état et de la distribution des prob
        Entrée : état
        Sortie : action
        """
        # Votre code
         # Get the action probabilities from the model's prediction
        action_probabilities = self.policy.predict(np.array([state]))[0]
        
        # Choose an action based on the probabilities (using np.random.choice)
        action = np.random.choice(len(action_probabilities), p=action_probabilities)
        
        return action
    
    def discount_reward(self,histo_reward):
        """
        Objet : renvoie le gain d'une trajectoire 
        Entrée : historique des récompenses pour chaque transition 
        Sortie : Gain avec dépréciation gamma : np.array pour tracer le gain à chaque step
        """
        # creation d un array a zero memes dimensions que rewards
        discounted = np.zeros_like(histo_reward)
        # votre code
         # Create an array of zeros with the same shape as the input rewards
        discounted = np.zeros_like(histo_reward, dtype=np.float32)
        
        # Initialize the cumulative reward
        cumulative_reward = 0.0
        
        # Calculate the discounted reward for each step in reverse order
        for t in reversed(range(len(histo_reward))):
            cumulative_reward = histo_reward[t] + self.gamma * cumulative_reward
            discounted[t] = cumulative_reward
        
        return discounted
    
    def memory(self,state,action,reward):
        """
        Objet : historisation des transitions
        Entrée : état, action, récompense
        Sortie : none
        """
        self.histo_states.append(state[0])
        self.histo_actions.append(action)
        self.histo_rewards.append(reward)
        
    def train(self):
        """
        Objet : entrainement du model en fonction de la dernière trajectoire
        """
        states_history = np.array(self.histo_states)
        actions_history = np.array(self.histo_actions)
        rewards_history = np.array(self.histo_rewards)

        # transformation de l'historique des action en matrice 0/1
        actions=np.zeros([len(actions_history),self.nb_action])
        actions[ np.arange(len(actions_history)), actions_history] = 1

        # calcul du Gain normalisé
        discounted_reward = self.discount_reward(rewards_history)
        discounted_reward -= np.mean(discounted_reward)
        discounted_reward /= np.std(discounted_reward)
        # print(discounted_reward.shape)
        
        # entrainement
        self.policy.train_on_batch([ states_history, discounted_reward ] , actions)
        self.histo_states = []
        self.histo_actions =[]
        self.histo_rewards = []
        
        

In [12]:
# transformation des données pour compatabilité avec l'alimentation du réseau de neurones (1,s)`
def trans_state(s):
    return  np.reshape(s, [1, nb_obs])

In [None]:
# graf=tools.Grafana("/var/www/html/files/train.csv")

In [14]:
env=gym.make('LunarLander-v3')
nb_obs=env.observation_space.shape[0]
nb_action=env.action_space.n
print(f"Nombre de carateristiques des états : {nb_obs}, nombre d actions possibles : {nb_action}")

Nombre de carateristiques des états : 8, nombre d actions possibles : 4


In [15]:
agent=PolicyGradient(nb_obs,nb_action, learning_rate=0.0001, gamma=0.99,layer1=64,layer2=32 ) # 0.98 l2 96

AttributeError: 'PolicyGradient' object has no attribute 'nb_actions'

In [None]:
checkpoint_path=os.path.join("poids","pp_lunar_save_weights.hp5")

In [None]:
num_episodes = 20000
time_step = 0  # comptage du nombre total de mouvement
histoReturn=[] # pour graphique sur historique récompense

seuilWin=180
win=False

for i in range(num_episodes):
    # somme de la récompense total pour une cycle
    Return = 0
    
    # reset env et conversion state
    state = trans_state(env.reset()[0])
    done=False
    truncated=False
    startTime=time.time()
    moveCount=0
    while not(done or truncated):
        
        # décommenter pour affichage
        # env.render()
    
        time_step += 1
        moveCount += 1
                
        # sélection d'une action selon notre politique
        action = agent.act(state)
 
        # jouer l'action
        next_state, reward, done, truncated , _ = env.step(action)
        next_state=trans_state(next_state) # reformatage

        # on met en memoire
        agent.memory(state,action,reward)
        
        # et shift d'état
        state = trans_state(next_state)
        
        # cumul du retour G
        Return += reward

       
        # Done ?
        if done or truncated:
            # affichage du résultat du cyle
            duration=time.time()-startTime
            print('Episode: ',i, ',' 'Return', np.round(Return,2),', durée ',np.round(duration,2),' seconde en ',moveCount, ' mouvements, end=',truncated)
            # graf.add_record(i,Return,0,moveCount,duration)
            if np.mean(histoReturn[-5:]) > seuilWin:
                print("Yeah ! Gagné !")
                win=True
                
            histoReturn.append(Return)

            agent.train()
            
            # Sauvegarde des poids tous les 50 cycles
            #if i % 50 == 0:
            #    agent.policy.save_weights(checkpoint_path)
                
            break
    if win:
        break
        

In [None]:
plt.figure()
plt.plot(histoReturn)
plt.show()