# Test de l'environnement Tracking avec CACLA

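Reproduction de l'environnement de test « Tracking » de l'article de van Hasselt et Wiering (2007), *Reinforcement Learning in Continuous Action Spaces* : https://dspace.library.uu.nl/bitstream/handle/1874/25514/wiering_07_reinforcementlearning.pdf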

In [1]:
import random
import sys

import gym
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm

# Make the parent directory importable so the local Tracking class can be found.
sys.path.insert(0, '../')
from Tracking import Tracking

## Hyperparamètres

In [None]:
# Network interface sizes: the actor and the critic both take
# `observation_space` inputs; the actor produces `action_space` outputs.
observation_space = 4
action_space = 2

# Discount applied to the bootstrapped next-state value in the TD error.
discount_factor = 0.99
# Adam step sizes for the critic and the actor optimizers.
learning_rate_critic =   2.5e-4  # alternative value noted by the author: 3e-2
learning_rate_actor =  8.5e-4
# Standard deviation (one entry per action dimension) of the Gaussian
# exploration noise added around the actor's output during training.
sigma = [0.1,0.1]
# NOTE(review): `beta` is never read in this notebook and `var_0` is only
# copied into an unused `var` at the start of each episode; presumably
# leftovers of a variance-based CACLA variant — confirm before removing.
beta = 0.001
var_0 = 1
# Number of training episodes; the actor is evaluated every `test_frequency`
# episodes using `nb_tests` noise-free rollouts.
nb_episode = 2000
test_frequency = 10
nb_tests = 1

## Apprentissage

In [None]:
# Single environment instance shared by training, evaluation and the demo cells.
env = Tracking()
# NOTE(review): every rollout calls env.reset() again before stepping, so this
# initial reset looks redundant — confirm against the Tracking implementation.
env.reset()

class ActorNetwork(nn.Module):
    """Deterministic policy network.

    Maps an observation vector of size ``observation_space`` to an action
    vector of size ``action_space`` through two sigmoid-activated hidden
    layers (64 then 12 units). The output layer is linear.
    """

    def __init__(self) -> None:
        super().__init__()

        hidden_widths = (64, 12)
        layers = []
        in_features = observation_space
        for width in hidden_widths:
            layers.append(nn.Linear(in_features, width))
            layers.append(nn.Sigmoid())
            in_features = width
        layers.append(nn.Linear(in_features, action_space))

        # Stored under ``net`` as a Sequential so that the parameter names
        # (net.0.*, net.2.*, net.4.*) stay the same.
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the action predicted for observation tensor ``x``."""
        predicted_action = self.net(x)
        return predicted_action
    
    
class CriticNetwork(nn.Module):
    """State-value network.

    Maps an observation vector of size ``observation_space`` to a single
    scalar value through one sigmoid-activated hidden layer of 12 units.
    """

    def __init__(self) -> None:
        super().__init__()

        hidden_units = 12
        input_layer = nn.Linear(observation_space, hidden_units)
        value_head = nn.Linear(hidden_units, 1)
        # Stored under ``net`` as a Sequential so that the parameter names
        # (net.0.*, net.2.*) stay the same.
        self.net = nn.Sequential(input_layer, nn.Sigmoid(), value_head)

    def forward(self, x):
        """Return the estimated value of observation tensor ``x``."""
        state_value = self.net(x)
        return state_value

def test(actor):
    """Run one noise-free episode with ``actor`` and return its mean reward.

    The module-level ``env`` is reset, then stepped with the actor's raw
    output (no exploration noise) until the environment reports ``done``.

    Returns:
        The sum of the rewards collected divided by the number of steps.
    """
    reward_total = 0
    step_count = 0
    observation = env.reset()
    finished = False
    while not finished:
        observation_t = torch.as_tensor(observation, dtype=torch.float32)
        greedy_action = actor(observation_t).detach().numpy()
        observation, step_reward, finished = env.step(greedy_action)
        reward_total = reward_total + step_reward
        step_count += 1
    return reward_total / step_count


# --- CACLA training -------------------------------------------------------
# The critic is trained with one-step TD learning. The actor is updated only
# when the TD error is positive, and is then pulled toward the explored action
# that produced that improvement.

actor = ActorNetwork()
critic = CriticNetwork()

# Snapshot of the best-performing actor seen during the periodic evaluations.
best_model = ActorNetwork()
best_value = -1e10

optimizer_critic = torch.optim.Adam(critic.parameters(), lr=learning_rate_critic)
# Created once, outside the loops, so that Adam's running moment estimates
# persist across actor updates instead of being reset at every step.
optimizer_actor = torch.optim.Adam(actor.parameters(), lr=learning_rate_actor)

# Mean / standard deviation of the evaluation reward, one entry per evaluation.
list_rewards_mean_4 = list()
list_rewards_std_4 = list()

for episode in tqdm(range(nb_episode)):

    # NOTE(review): `var` is assigned but never read below (and `beta` is
    # unused); kept in case another cell relies on it — confirm before removing.
    var = var_0

    state = env.reset()
    done = False

    while not done:

        state_t = torch.as_tensor(state, dtype=torch.float32)

        # Gaussian exploration centred on the actor's current output.
        with torch.no_grad():
            mean_action = actor(state_t).numpy()
        action = np.random.normal(
            loc=mean_action, scale=sigma, size=action_space
        ).astype(np.float32)

        new_state, reward, done = env.step(action)

        new_state_t = torch.as_tensor(new_state, dtype=torch.float32)
        reward_t = torch.as_tensor(reward, dtype=torch.float32)

        # One-step TD error; the bootstrap term is dropped on terminal steps.
        # Computed without gradients so it acts as a constant in both losses.
        with torch.no_grad():
            td_error = reward_t + discount_factor * (1 - done) * critic(new_state_t) - critic(state_t)

        # Critic update: the gradient of this loss is -td_error * dV/dtheta,
        # so a descent step moves V(state) in the direction of the TD error.
        loss_critic = -td_error * critic(state_t)

        optimizer_critic.zero_grad()
        loss_critic.backward()
        optimizer_critic.step()

        # Actor update only when the explored action did better than expected.
        if td_error.item() > 0:
            action_t = torch.as_tensor(action, dtype=torch.float32)

            # Squared distance to the explored action: its gradient is
            # -(action - actor(state)) * d actor/d theta, so a descent step
            # moves the actor's output TOWARD the action (the CACLA rule).
            # The previous loss had the opposite sign and pushed it away.
            loss_actor = 0.5 * ((actor(state_t) - action_t) ** 2).mean()

            optimizer_actor.zero_grad()
            loss_actor.backward()
            optimizer_actor.step()

        state = new_state

    # Periodic noise-free evaluation of the current actor.
    if episode % test_frequency == 0:
        list_tests = np.array([test(actor) for _ in range(nb_tests)])
        mean_value = list_tests.mean()

        print(f"test episode : {episode} - mean value : {mean_value} - best value : {best_value}")

        # Keep a copy of the weights of the best actor evaluated so far.
        if mean_value > best_value:
            best_value = mean_value
            best_model.load_state_dict(actor.state_dict())

        list_rewards_mean_4.append(mean_value)
        list_rewards_std_4.append(list_tests.std())

## Affichage des rewards

In [None]:
# Plot the mean evaluation reward with a +/- one standard deviation band.
mean_rewards = np.asarray(list_rewards_mean_4)
std_rewards = np.asarray(list_rewards_std_4)
list_mean1_4 = mean_rewards + std_rewards
list_mean2_4 = mean_rewards - std_rewards

# Episode index of each evaluation, derived from the number of evaluations
# actually recorded, so the plot still works after an interrupted or partial
# training run. For a complete run this equals
# np.arange(0, nb_episode, test_frequency).
eval_episodes = np.arange(len(mean_rewards)) * test_frequency

plt.figure(figsize=(10,7))
plt.fill_between(eval_episodes, list_mean1_4, list_mean2_4, color='salmon', label='std reward')
plt.plot(eval_episodes, mean_rewards, c='r', label='mean reward')
plt.legend()
plt.xlabel('episode')
plt.ylabel('reward')
plt.title('Continuous Actor Critic (CACLA) - Tracking - Rewards')
plt.show()

## Démonstration de la simulation de l'agent dans l'environnement

In [None]:
# Roll out the best saved actor for one episode (no exploration noise) and
# plot the positions visited by the agent and by the target.
state = env.reset()
done = False

# x / y coordinates read from env.agent and env.target after every step.
list_x_agent = list()
list_y_agent = list()
list_x_target = list()
list_y_target = list()

nb_iter = 0
r = 0.0
while not done:
    state_t = torch.as_tensor(state, dtype=torch.float32)
    action = best_model(state_t).detach().numpy()
    new_state, reward, done = env.step(action)
    r += reward
    list_x_agent.append(env.agent[0])
    list_y_agent.append(env.agent[1])
    list_x_target.append(env.target[0])
    list_y_target.append(env.target[1])
    state = new_state
    nb_iter += 1

# Mean reward per step, using the number of steps actually executed rather
# than a hard-coded episode length (same definition as in `test`).
print(f"iteration : {nb_iter}, reward : ",(r/nb_iter))
plt.figure()
plt.scatter(list_x_agent, list_y_agent, label="agent")
plt.scatter(list_x_target, list_y_target, label='target')
plt.legend()
plt.show()
# Squared Euclidean distance between agent and target at the end of the episode.
print((env.agent[0] - env.target[0])**2 + (env.agent[1] - env.target[1])**2)

## Étape par étape

In [None]:
# Replay one episode with the best saved actor and draw one figure per step
# showing the current agent and target positions.
# Requires `matplotlib.patches` imported as `mpatches` (see the import cell).
state = env.reset()
done = False

iteration = 0
while not done:
    state_t = torch.as_tensor(state, dtype=torch.float32)
    action = best_model(state_t).detach().numpy()
    new_state, reward, done = env.step(action)
    state = new_state
    iteration += 1

    plt.figure(figsize=(4,4))
    plt.scatter(env.agent[0], env.agent[1], label="agent")
    plt.scatter(env.target[0], env.target[1], label='target')
    # Unfilled rectangle anchored at (4, 5), 5 wide and 1 high.
    # NOTE(review): presumably the obstacle of the Tracking task — confirm
    # against the Tracking class.
    rect = mpatches.Rectangle((4,5), 5, 1,
                              fill=False,
                              color="purple",
                              linewidth=2)
    plt.gca().add_patch(rect)
    plt.xticks([0, 2, 4, 6, 8, 10])
    plt.yticks([0, 2, 4, 6, 8, 10])
    plt.legend()
    plt.show()