In [0]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random
%matplotlib inline

In [2]:
env = gym.make("CartPole-v0").env
env.reset()
action_space = env.action_space.n
state_space = env.observation_space.shape[0]

env.close()



In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [0]:
#neural network for dqn
class Net(nn.Module):
    def __init__(self,state_size,action_size):
        super(Net, self).__init__()

        self.layers = nn.Sequential(
            nn.Linear(state_size,256),
            nn.ReLU(),
            nn.Linear(256,128),
            nn.ReLU(),
            nn.Linear(128,64),
            nn.ReLU(),
            nn.Linear(64,action_size)


        )

    def forward(self,x):

        output = self.layers(x)
        return output

In [0]:
GAMMA = 0.99
BUFFER_SIZE = 100000
class DQNAgent():
    def __init__(self, state_size,action_size,batch_size,GAMMA):
        self.GAMMA = GAMMA
        self.state_size = state_size
        self.action_size = action_size
        self.batch_size = batch_size
        #neural network(only one NN for now)
        self.q_net = Net(state_size,action_size)
        self.train_flag=False    

        self.optimizer = optim.Adam(self.q_net.parameters(),lr=0.001)
        self.memory = ReplayBuffer(10000)

    #writes SARS' to memory and makes backprop
    def step(self,state,action,reward,next_state,done):
        self.memory.push(state,action,reward,next_state,done)

        if self.memory.size()>self.batch_size:
            experience = self.memory.sample(self.batch_size)
            loss = self.learn(experience)
            self.train_flag=True
            return loss

    #returns action for given state
    def act(self,state,epsilon):
        # state = torch.tensor(state).float()
        state = torch.FloatTensor(state)
        # self.q_net.eval()
        # with torch.no_grad():
        #     action_values = self.q_net(state)
        # self.q_net.train()
        action_values = self.q_net(state)
        if random.random()>epsilon:
            return(np.argmax(action_values.detach().numpy()))
        else:
            return np.random.choice(range(self.action_size))

    #backprop
    def learn(self,experiences):
        GAMMA = 0.99

        #experiences - batch from memory SARS'
        states,actions,rewards,next_states,dones = experiences

        #converting to tensors to work with pytorch
        states = torch.FloatTensor(states)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)
        rewards = torch.FloatTensor(rewards)

        #loss function is MSE
        criterion = nn.MSELoss()
        #training mod for neural network
        self.q_net.train()

        #predicted q_values for batch of states and actions
        q_vals=[]
        for state,action in zip(states,actions):
            q_vals.append(self.q_net(state)[action])
        q_vals = torch.stack(q_vals)

        #predicted q_values for next_states
        next_q_vals = torch.max(self.q_net(next_states),1)[0]

        targets = rewards + self.GAMMA * (next_q_vals*(1-dones))

        #loss = (q_values - target_q_values)**2
        loss = criterion(q_vals,targets)
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        
        return loss


In [0]:
class ReplayBuffer:

    def __init__(self,max_size):
        self.max_size = max_size
        self.buffer = []
        self.last_exp=0

    def push(self,state,action,reward,next_state,done):
        experience = (state,action,reward,next_state,done)
        if self.last_exp>self.max_size:
            index = self.last_exp%self.max_size
            self.buffer[index] = experience
        else:
            self.buffer.append(experience)
        self.last_exp+=1

    def sample(self,batch_size):
        state_batch=[]
        action_batch=[]
        reward_batch=[]
        next_state_batch=[]
        done_batch=[]

        batch = random.sample(self.buffer,batch_size)

        for experience in batch:
            state,action,reward,next_state,done = experience
            
            state_batch.append(state)
            action_batch.append(action)
            reward_batch.append(reward)
            next_state_batch.append(next_state)
            done_batch.append(done)

        return (state_batch,action_batch,reward_batch,next_state_batch,done_batch)

    def size(self):
        return len(self.buffer)
        

In [7]:
agent = DQNAgent(4,2,batch_size=32,GAMMA = 0.95)

def training(n_episodes=200, max_t = 2000,eps_start = 1.0,eps_end=0.01,eps_decay=0.996):
    scores=[]
    losses=[]
    epsilon = eps_start

    for epoch in range(max_t):
        state = env.reset()
        epoch_score=0
        for episode in range(n_episodes):
            action = agent.act(state,epsilon)
            next_state,reward,done,info = env.step(action)
            loss = agent.step(state,action,reward,next_state,done)
            
            if agent.train_flag:
                losses.append(loss)
            
            state =next_state
            epoch_score+=reward

            #epsilon_decay
            if epsilon>eps_end:
                epsilon*=eps_decay

            if done:
                scores.append(epoch_score)
                break

        if epoch %100==0 and epoch>0:
            
            mean_loss = np.sum(losses)/len(losses)
            mean_reward = np.sum(scores)/len(scores)
            losses=[]
            scores=[]            
            print('epoch: {0}, mean_loss: {1},mean_reward:{2}, memory_size: {3}, epsilon: {4}'.format(epoch,mean_loss,mean_reward,agent.memory.size(),epsilon))
        scores.append(epoch_score)

    return scores,losses

scores,losses = training()

epoch: 100, mean_loss: 0.05511687323451042,mean_reward:147.609756097561, memory_size: 10001, epsilon: 0.009999536012924389


KeyboardInterrupt: ignored

In [0]:
len(scores)