In [None]:
import matplotlib.pyplot as plt
import gym
import time
import numpy as np
import torch
from collections import deque
import torch.nn as nn
import random
import plotly.graph_objects as go
import pandas as pd

In [None]:
class DeepQAgent:
    """Deep Q-learning agent with an MLP Q-network and an experience-replay buffer.

    Transitions are stored as ``(state, action, reward, next_state, done)``
    tuples and actions are selected epsilon-greedily.

    Args:
        env: Environment handle. It is stored on the agent but no method of
            this class reads it.
        state_size: Length of a state vector (input width of the network).
        action_size: Number of discrete actions (output width of the network).
        discount_factor: Discount applied to the bootstrapped next-state value.
        epsilon_greedy: Initial exploration probability.
        epsilon_min: Lower bound for the exploration probability.
        epsilon_decay: Multiplicative factor applied by ``adjust_epsilon``.
        learning_rate: Learning rate handed to the Adam optimizer.
        max_memory_size: Capacity of the replay buffer; once full, appending a
            transition discards the oldest one.
    """

    def __init__(self, env, state_size=4, action_size=2, discount_factor=0.95,
                 epsilon_greedy=1, epsilon_min=0.01, epsilon_decay=0.995,
                 learning_rate=1e-3, max_memory_size=500):
        self.env = env
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = discount_factor
        self.epsilon = epsilon_greedy
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate
        # One entry per network output so the random branch of choose_action
        # can never pick an action the Q-network has no value for.
        self.actions = list(range(action_size))
        self.memory = deque(maxlen=max_memory_size)
        self.build_model()

    def build_model(self):
        """Create the Q-network, its MSE loss and its Adam optimizer."""
        self.model = nn.Sequential(
            nn.Linear(self.state_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, self.action_size),
        )
        self.loss_fn = nn.MSELoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), self.learning_rate)

    def remember(self, transition):
        """Append one ``(state, action, reward, next_state, done)`` tuple to the buffer."""
        self.memory.append(transition)

    def choose_action(self, state):
        """Pick an action epsilon-greedily and return it as a plain ``int``.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the index of the largest Q-value predicted for ``state``.
        """
        if np.random.rand() <= self.epsilon:
            # int(...) keeps the return type identical to the greedy branch.
            return int(np.random.choice(self.actions))
        with torch.no_grad():
            q_values = self.model(torch.as_tensor(state, dtype=torch.float32))
        return torch.argmax(q_values).item()

    def learn(self, batch_samples):
        """Run one gradient step on a batch of transitions and return the loss.

        The regression target equals the network's current prediction for
        every action except the one actually taken, whose entry is replaced by
        ``reward`` (terminal transition) or
        ``reward + gamma * max_a Q(next_state, a)`` (non-terminal).

        Args:
            batch_samples: Iterable of ``(state, action, reward, next_state,
                done)`` tuples.

        Returns:
            The MSE loss of the step as a Python float.
        """
        states, actions, rewards, next_states, dones = zip(*batch_samples)
        # Stack into single ndarrays first: building a tensor directly from a
        # list of ndarrays goes through a slow element-wise path.
        states_t = torch.from_numpy(np.asarray(states, dtype=np.float32))
        next_states_t = torch.from_numpy(np.asarray(next_states, dtype=np.float32))
        actions_t = torch.from_numpy(np.asarray(actions, dtype=np.int64))
        rewards_t = torch.from_numpy(np.asarray(rewards, dtype=np.float32))
        dones_t = torch.from_numpy(np.asarray(dones, dtype=np.bool_))

        with torch.no_grad():
            targets = self.model(states_t)
            next_best = self.model(next_states_t).max(dim=1).values
            # torch.where (rather than multiplying by a 0/1 mask) keeps a
            # non-finite next-state value from leaking into terminal targets.
            taken_targets = torch.where(
                dones_t, rewards_t, rewards_t + self.gamma * next_best
            )
            targets[torch.arange(actions_t.shape[0]), actions_t] = taken_targets

        # Exploration decays once per replayed transition, i.e. len(batch)
        # times per call.
        # NOTE(review): that makes the effective decay per gradient step
        # epsilon_decay ** batch_size -- confirm this schedule is intended.
        for _ in range(actions_t.shape[0]):
            self.adjust_epsilon()

        self.optimizer.zero_grad()
        predictions = self.model(states_t)
        loss = self.loss_fn(predictions, targets)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def adjust_epsilon(self):
        """Decay ``epsilon`` by ``epsilon_decay`` without dropping below ``epsilon_min``."""
        if self.epsilon > self.epsilon_min:
            # Clamp so the final multiplication cannot undershoot the floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def replay(self, batch_size):
        """Sample ``batch_size`` stored transitions uniformly and learn from them.

        Returns:
            The loss reported by ``learn``.

        Raises:
            ValueError: If the buffer holds fewer than ``batch_size``
                transitions (raised by ``random.sample``).
        """
        batch = random.sample(self.memory, batch_size)
        return self.learn(batch)

In [None]:
# --- Experiment configuration ---
SEED=42           # seed shared by every random number generator below
episodes=35       # number of training episodes
batch_size=32     # transitions sampled per replay step
memory_size=500   # replay-buffer capacity

# The agent draws from NumPy's global RNG (exploration), the stdlib `random`
# module (replay sampling) and torch (weight initialisation); seed all three
# so a fresh "Restart & Run All" reproduces the same run.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

env=gym.make('CartPole-v1',render_mode='human')
# Size the network from the environment and hand the configured buffer
# capacity to the agent so `memory_size` is actually honoured.
agent=DeepQAgent(
    env,
    state_size=env.observation_space.shape[0],
    action_size=int(env.action_space.n),
    max_memory_size=memory_size,
)
# reset() returns (observation, info); only the observation is kept.
state=env.reset(seed=SEED)[0]

In [None]:
# Warm-up: store `memory_size` transitions in the replay buffer before any
# training step runs.
print("Filling memory")
for _ in range(memory_size):
    action=agent.choose_action(state)
    # step() returns (observation, reward, terminated, truncated, info).
    next_state,reward,terminated,truncated,_=env.step(action)
    # Only `terminated` is stored as the done flag: a time-limit truncation is
    # not a terminal state, so its target should still bootstrap.
    agent.remember((state,action,reward,next_state,terminated))
    # Either flag ends the episode; stepping a finished episode without a
    # reset is invalid, so start a new one in both cases.
    if terminated or truncated:
        state=env.reset()[0]
    else:
        state=next_state
# Per-episode rewards and per-step losses collected by the training loop.
total_rewards,losses=[],[]

In [None]:
# Train for `episodes` episodes, doing one replay (gradient) step after every
# environment step that does not end the episode.
print("Training")
max_steps_per_episode=500
for e in range(episodes):
    state=env.reset()[0]
    rewards=0
    for j in range(max_steps_per_episode):
        action=agent.choose_action(state)
        # step() returns (observation, reward, terminated, truncated, info).
        next_state,reward,terminated,truncated,_=env.step(action)
        # Store `terminated` as the done flag so a time-limit truncation is
        # not treated as a terminal state by the learner.
        agent.remember((state,action,reward,next_state,terminated))
        state=next_state
        rewards+=reward
        if terminated or truncated:
            break
        loss=agent.replay(batch_size)
        losses.append(loss)
    # Log after the inner loop so episodes that run the full step budget are
    # recorded too, and report the accumulated reward rather than the loop
    # index (which was one less than the number of steps taken).
    total_rewards.append(rewards)
    print(f'Episode: {e} Total reward:{rewards} Epsilon:{agent.epsilon}')
env.close()

In [None]:
import plotly.graph_objects as go
import pandas as pd

def moving_average(x, span=100):
    """Return the exponentially weighted moving average of ``x``.

    Args:
        x: Sequence of numbers to smooth.
        span: Span of the exponential window passed to pandas' ``ewm``.

    Returns:
        NumPy array with one smoothed value per element of ``x``.
    """
    frame = pd.DataFrame({'x': np.asarray(x)})
    smoothed = frame['x'].ewm(span=span).mean()
    return smoothed.values
# Plot the per-episode reward together with its exponentially smoothed trend.
fig=go.Figure(
    data=[
        go.Scatter(y=total_rewards,mode='lines+markers',name='Recompensa'),
        go.Scatter(y=moving_average(total_rewards),name='Tendencia'),
    ]
)
fig.show()