In [None]:
!apt-get update
!apt-get install -y build-essential 
!pip install pybind11 torch matplotlib tqdm gymnasium 
!cd ..; make  # Compile the shared library 

In [None]:
import sys
import os

# Add the root folder or 'lib' folder where the shared library is located
sys.path.append(os.path.abspath('../lib'))  # Adjust according to your setup

import Rocket as rck  # Import the Rocket class from the C++ library
import torch
from torch import nn
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import copy
from tqdm import tqdm
import math

# Define PyTorch device: 'cuda' when a CUDA device is available, otherwise 'cpu'.
# Later cells move the networks and every input tensor to this device.
device = 'cuda' if torch.cuda.is_available() else 'cpu'


In [None]:
class Red(nn.Module):
    """Fully connected network with two hidden ReLU layers.

    Args:
        il: Number of input features.
        hl: Width of each of the two hidden layers.
        ol: Number of output values.
    """

    def __init__(self, il=4, hl=10, ol=2):
        super().__init__()

        # Layers are created in input -> output order and stored under
        # ``network`` so parameter names stay ``network.<index>.*``.
        stack = [
            nn.Linear(il, hl),
            nn.ReLU(),
            nn.Linear(hl, hl),
            nn.ReLU(),
            nn.Linear(hl, ol),
        ]
        self.network = nn.Sequential(*stack)

    def forward(self, x):
        """Return the output of the layer stack applied to ``x``."""
        output = self.network(x)
        return output


In [None]:


class ReplayMemory():
    """Bounded FIFO store of transitions that is served in shuffled mini-batches.

    Attributes are kept public because other cells may inspect them:
    ``deque`` holds the stored transitions (oldest dropped first once ``D``
    is exceeded) and ``deque_shuffle`` holds the shuffled transitions that
    still have to be served by ``minibatch``.
    """

    def __init__(self,D=10000):
        """Create an empty memory that keeps at most ``D`` transitions."""
        self.deque=deque(maxlen=D)
        self.deque_shuffle=deque(maxlen=D)

    def append(self,x):
        """Store one transition, evicting the oldest one when the memory is full."""
        self.deque.append(x)

    def minibatch(self,size=32):
        """Yield shuffled mini-batches covering every stored transition once.

        If a previous pass was abandoned half-way, the remaining shuffled
        transitions are served first; a new shuffle only happens once the
        pending queue is empty.

        Args:
            size: Maximum number of transitions per batch; the last batch may
                be smaller.

        Yields:
            ``np.array`` built from up to ``size`` transitions.

        Raises:
            ValueError: If ``size`` is smaller than 1 (the generator would
                otherwise yield empty batches forever).
        """
        if size<1:
            raise ValueError("size must be at least 1")

        if len(self.deque_shuffle)==0:
            # Shuffle a plain list: random.shuffle swaps by index, which is
            # O(1) on a list but O(n) on a deque. A shallow copy is enough
            # because np.array() below copies the rows into a new array.
            pending=list(self.deque)
            random.shuffle(pending)
            self.deque_shuffle=deque(pending,maxlen=self.deque.maxlen)

        while len(self.deque_shuffle)>0:
            take=min(size,len(self.deque_shuffle))
            yield np.array([self.deque_shuffle.popleft() for _ in range(take)])

    def reset(self,D=10000):
        """Discard all stored and pending transitions and set the capacity to ``D``."""
        self.deque=deque(maxlen=D)
        self.deque_shuffle=deque(maxlen=D)



In [None]:
class WrapAgent():
    """Epsilon-greedy action selector with a linearly decaying exploration rate."""

    def __init__(self,epsilon=0.7,decay=10):
        """Store the given settings and create the seeded random generator.

        ``epsilon`` and ``decay`` are only stored; ``choose_action`` receives
        its exploration schedule through its own arguments.
        """
        self.epsilon=epsilon
        self.decay=decay

        # Fixed seed so the explore/exploit draws are reproducible.
        self.rng=np.random.default_rng(234343)


    def choose_action(self,env,state,repetition,Qnet,epsilon_o,step_epsilon,cont_epsilon):
        """Pick a random action with probability epsilon, otherwise the greedy one.

        The exploration probability is
        ``max(epsilon_o - step_epsilon * cont_epsilon, 0.01)``, i.e. it decays
        linearly with ``cont_epsilon`` and never drops below 1 %.

        Args:
            env: Object whose ``sample()`` returns a random action.
            state: Current observation as a NumPy array.
            repetition: Unused; kept so existing callers keep working.
            Qnet: Callable mapping a ``(1, n)`` float tensor to action values.
            epsilon_o: Initial exploration probability.
            step_epsilon: Amount subtracted from ``epsilon_o`` per decay step.
            cont_epsilon: Number of decay steps taken so far.

        Returns:
            ``env.sample()`` when exploring, otherwise a 0-d NumPy array with
            the index of the largest value returned by ``Qnet``.
        """
        # Use the epsilon_o argument (not a notebook-level global) so the
        # method works regardless of which cells have been executed.
        explore_prob=max(epsilon_o-step_epsilon*cont_epsilon,0.01)

        if self.rng.uniform(0,1)<explore_prob:
            return env.sample()

        with torch.inference_mode():
            # `device` is the notebook-level device string defined with the imports.
            obs=torch.from_numpy(state).to(device).unsqueeze(dim=0).type(torch.float32)
            return torch.argmax(Qnet(obs)).detach().to("cpu").numpy()

In [None]:
# ---- Learning hyper-parameters ----
gamma = 0.9999         # discount applied to the bootstrapped next-state value
epsilon = 1            # initial exploration probability
decay = 100            # passed to WrapAgent, which only stores it
step_epsilon = 0.001   # epsilon reduction per training episode
cont_epsilon = 0       # number of epsilon reductions applied so far
tau = 0.05             # weight of Qnet in the soft target-network update
batch_size = 128

# Episodes collected before the first optimisation pass.
start_train = 100

# ---- Run length ----
max_steps = 6000
total_repetitions = 100
episodes = 5000

# Number of entries in one environment observation.
state_length = 9

# ---- Replay memory and agent ----
rp_len = 15000
rp = ReplayMemory(rp_len)
agent = WrapAgent(epsilon, decay)

# ---- Networks: Qnet is built first, then target, which starts as a copy of Qnet ----
Qnet, target = (Red(state_length, 90, 4).to(device) for _ in range(2))
target.load_state_dict(Qnet.state_dict())

# ---- Optimisation objects, random generator and environment ----
optimizer = torch.optim.SGD(Qnet.parameters(), lr=3e-4)
loss = nn.MSELoss()
rng = np.random.default_rng(33234)
env = rck.Rocket()




In [None]:
# Resume from a previously saved checkpoint when one exists. On a fresh
# checkout the file is absent (the training cell is what writes it), so the
# networks simply keep their freshly initialised weights.
checkpoint_path="../models/net.pth"

if os.path.exists(checkpoint_path):
    # Read the file once and initialise both networks from the same weights.
    checkpoint=torch.load(checkpoint_path,map_location=torch.device(device))
    Qnet.load_state_dict(checkpoint)
    target.load_state_dict(checkpoint)
else:
    print(f"No checkpoint found at {checkpoint_path}; keeping the freshly initialised weights.")



In [None]:
#epsilon=0.01
#max_steps=10000

In [None]:
rew=[]       # total reward collected in each episode
t_steps=[]   # number of environment steps taken in each episode
error_t=[]   # summed mini-batch loss of each training episode

# Column layout of one replay-memory row, as built by np.hstack below:
#   [ state (state_length) | reward | action | next state (state_length) | done ]
reward_col=state_length
action_col=state_length+1
next_state_cols=slice(state_length+2,-1)


for i in range(episodes):

    # ---- Roll out one episode with the current policy (no gradients needed) ----
    with torch.inference_mode():
        reward=0
        time_steps=0

        # Start a new episode
        state=env.reset()

        for j in range(max_steps):

            action=agent.choose_action(env,state,i,Qnet,epsilon,step_epsilon,cont_epsilon)

            # env.step returns the next state, then the reward, with the done flag last
            new=env.step(action)

            state_n=new[0:state_length]
            r=new[state_length]
            done=new[-1]

            reward+=r
            time_steps+=1

            # Store the transition (s, r, a, s', done), including the final one
            rp.append(
                np.hstack((state,r,np.array(action),state_n,done))
            )

            # The flag is 0 on the transition that ends the episode
            if(done==0):
                break

            state=state_n

    rew.append(reward)
    t_steps.append(time_steps)

    error=0

    if(i>=start_train):

        # ---- One pass over the whole replay memory in shuffled mini-batches ----
        Qnet.train()

        for batch in rp.minibatch(batch_size):

            batch_t=torch.from_numpy(batch).type(torch.float32).to(device)

            states_b=batch_t[:,:state_length]
            rewards_b=batch_t[:,reward_col]
            actions_b=batch_t[:,action_col].long()
            next_states_b=batch_t[:,next_state_cols]
            # 0 for terminal transitions, which removes the bootstrap term below
            not_finished_b=batch_t[:,-1]

            # Q-value the online network assigns to the action that was actually taken
            q_values_qnet=Qnet(states_b).gather(1,actions_b.unsqueeze(dim=1)).squeeze(dim=1)

            # Bellman target from the target network. It is a constant for the
            # optimiser, so no graph is built and no gradients reach `target`.
            with torch.no_grad():
                q_values_target=rewards_b+gamma*torch.max(target(next_states_b),dim=1)[0]*not_finished_b

            err_train=loss(q_values_qnet,q_values_target)

            error+=err_train.item()

            optimizer.zero_grad()
            err_train.backward()

            optimizer.step()

        Qnet.eval()

        error_t.append(error)

        print(f"Process, {i*100/episodes:03.2f} %,  Episode {i:04d}, Steps {t_steps[-1]:03.0f}, Reward {rew[-1]:03.0f}, Error {error_t[-1]:09.5f}, Epsilon {epsilon-step_epsilon*cont_epsilon if epsilon-step_epsilon*cont_epsilon>0.01 else 0.01:03.3f}")

        # Runs every training episode; raise the modulus to update less often.
        if(i%1==0):
            # Soft update: move the target weights a fraction tau towards Qnet
            # (a hard copy would be target.load_state_dict(Qnet.state_dict())).
            Qnet_dict=Qnet.state_dict()
            target_dict=target.state_dict()

            for entrada in target_dict.keys():
                target_dict[entrada]=tau*Qnet_dict[entrada]+(1-tau)*target_dict[entrada]

            target.load_state_dict(target_dict)
            torch.save(Qnet.state_dict(),f="../models/net.pth")

        # One epsilon decay step per training episode
        cont_epsilon+=1

In [None]:
# Write the current Qnet weights to the checkpoint file (same path the training loop saves to).
torch.save(Qnet.state_dict(),f="../models/net.pth")

In [None]:
# Ask PyTorch to release the cached GPU memory it is no longer using.
torch.cuda.empty_cache()

In [None]:
# Force a garbage-collection pass to free unreachable Python objects.
import gc
gc.collect()

In [None]:

# Drop the training-only objects. NOTE: after this cell the training loop cannot
# be re-run until the hyper-parameter cell has recreated `loss` and `optimizer`.
del loss
del optimizer
#del rp

In [None]:
# Training-loss curve: one point per training episode (sum of its mini-batch losses).
# A fresh figure is created instead of reusing figure number 1, so this plot can
# never be drawn on top of another cell's figure.
fig_error,ax_error=plt.subplots(figsize=(10,6))

ax_error.plot(np.arange(len(error_t)),error_t)
ax_error.set_xlabel("Training episode")
ax_error.set_ylabel("Summed mini-batch MSE loss")
ax_error.set_title("Training loss per episode")

plt.show()

In [None]:
# Histogram of how many environment steps each episode lasted.
# A fresh figure is created instead of reusing figure number 1.
fig_steps,ax_steps=plt.subplots(figsize=(10,6))

ax_steps.hist(t_steps,bins=100)
ax_steps.set_xlabel("Steps per episode")
ax_steps.set_ylabel("Number of episodes")
ax_steps.set_title("Distribution of episode lengths")

plt.show()

In [None]:
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Rectangle
# Render figures inline and display animations as interactive JavaScript HTML.
%matplotlib inline
plt.rcParams["animation.html"] = "jshtml"
plt.rcParams['figure.dpi'] = 150
# Very large embed limit so that long animations are not cut off when embedded.
plt.rcParams['animation.embed_limit'] = 2**128
# Interactive mode off: the figure is only shown through the animation object.
plt.ioff()

# Single square figure/axes pair that animate() clears and redraws on every frame.
fig, ax = plt.subplots(figsize=(5,5))

# Globals updated by animate(): the current observation, and whether the red
# outline below the rocket is drawn (animate() toggles it whenever action 1 is chosen).
state=env.reset()
flag=True

def animate(t):
    """Advance the environment one greedy step and redraw the rocket.

    Reads and updates the notebook globals ``state`` (current observation)
    and ``flag`` (whether the red outline below the rocket is drawn). When
    the environment reports the end of an episode the state is reset, so the
    following frames show a new episode, mirroring the training loop.

    Args:
        t: Frame index supplied by FuncAnimation (unused).
    """
    global state
    global flag

    ax.clear()
    xlim=10
    ylim=10
    fig.tight_layout()

    # Greedy action: index of the largest Q-value for the current observation.
    with torch.inference_mode():
        obs=torch.from_numpy(state[:state_length]).unsqueeze(dim=0).type(torch.float32).to(device)
        action=torch.argmax(Qnet(obs)).item()

    new=env.step(action)

    # Same layout as in the training loop: state entries first, done flag last.
    state=new[0:state_length]
    done=new[-1]

    x_cm=state[0]
    y_cm=state[1]
    phi=state[2]

    # Action 1 toggles whether the red outline is drawn.
    if(action==1):
        flag=not flag

    ax.set_ylim(-ylim,ylim)
    ax.set_xlim(-xlim,xlim)

    # Rocket body: 0.5 x 4 rectangle centred on (x_cm, y_cm), rotated by phi.
    rect = Rectangle((x_cm-0.25, y_cm-2), width=0.5, height=4,angle=phi*180/np.pi,rotation_point="center", edgecolor='blue', facecolor='lightblue')

    ax.add_patch(rect)

    if(flag):
        # Closed red triangle: the two bottom corners of the rotated body and a
        # tip 2.5 units from the centre along the body axis.
        sin_phi=np.sin(phi)
        cos_phi=np.cos(phi)

        outline_x=x_cm+np.array([-0.25*cos_phi+2*sin_phi,
                                 0.25*cos_phi+2*sin_phi,
                                 2.5*sin_phi,
                                 -0.25*cos_phi+2*sin_phi])
        outline_y=y_cm+np.array([-0.25*sin_phi-2*cos_phi,
                                 0.25*sin_phi-2*cos_phi,
                                 -2.5*cos_phi,
                                 -0.25*sin_phi-2*cos_phi])

        ax.plot(outline_x,outline_y,color="red",linewidth=1.0)

    # The flag is 0 on the step that ends an episode (the training loop breaks
    # on it); start a new episode for the next frame instead of stepping on.
    if(done==0):
        state=env.reset()


# Build the animation: animate() is called for each of the 1000 frames, 40 ms apart when played.
anim=FuncAnimation(fig, animate, frames=1000,interval=40)

In [None]:
# Export the animation as a GIF in the current working directory using the
# Pillow writer at 20 frames per second.
anim.save(
    "animacion.gif",
    writer="pillow",
    fps=20
)


In [None]:
# Drop the reference to the animation object so its memory can be reclaimed.
del anim