In [1]:
import gym
import math
import random
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as img

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [2]:
def reward(obs, r):
    """Return the environment reward as a single-precision float.

    Args:
        obs: Observation that accompanied the reward. Accepted so that the
            signature leaves room for reward shaping, but not used here.
        r: Raw reward value returned by the environment step.

    Returns:
        np.float32: `r` cast to 32-bit float.
    """
    shaped = np.float32(r)
    return shaped

In [3]:
def getAction(obs):
    """Heuristic policy: push the cart toward the side the pole is leaning.

    Note that this notebook defines `getAction` in several cells; whichever
    cell ran last is the one in effect.

    Args:
        obs: Indexable observation; only obs[2] (the pole angle) is read.

    Returns:
        int: 0 (left) when the angle is negative, otherwise 1 (right).
    """
    pole_angle = obs[2]
    return 0 if pole_angle < 0 else 1


In [4]:
def getAction(obs):
    """Heuristic policy combining pole angle and cart position.

    The decision value is `10*angle - position`; a negative value selects
    left, anything else selects right. This notebook defines `getAction` in
    several cells; whichever cell ran last is the one in effect.

    Args:
        obs: Indexable observation; obs[0] (cart position) and obs[2]
            (pole angle) are read.

    Returns:
        int: 0 (left) or 1 (right).
    """
    cart_x, pole_angle = obs[0], obs[2]
    decision = 10*pole_angle - cart_x
    return 0 if decision < 0 else 1


In [5]:
def getAction(obs):
    """Heuristic policy combining pole angle, cart position and cart velocity.

    The decision value is `10*angle + position - velocity`; a negative value
    selects left, anything else selects right. This notebook defines
    `getAction` in several cells; whichever cell ran last is the one in effect.

    Args:
        obs: Indexable observation; obs[0] (cart position), obs[1] (cart
            velocity) and obs[2] (pole angle) are read.

    Returns:
        int: 0 (left) or 1 (right).
    """
    cart_x, cart_v, pole_angle = obs[0], obs[1], obs[2]
    decision = 10*pole_angle + cart_x - cart_v
    return 0 if decision < 0 else 1

In [6]:
def simpleHeuristic(env, render=True):
    """Play one episode with the hand-written `getAction` policy and print the score.

    Args:
        env: Environment exposing the classic gym interface used below:
            `reset()` returning an observation, `step(action)` returning
            `(obs, reward, done, info)`, and `render()`.
        render: When True (the default, matching the previous behaviour) the
            environment is rendered before every step. Pass False to run
            headless.

    Returns:
        None. The accumulated reward is printed as ``Score = <value>``.
    """
    obs = env.reset()
    # obs = [x_pos, x_vel, angle, pole tip speed]
    # Per the original note the episode ends (done = True) once the pole angle
    # obs[2] passes 0.20944 rad (12 degrees); the environment may apply further
    # termination conditions -- TODO confirm against the gym CartPole docs.
    done = False
    ret = 0
    while not done:
        if render:
            env.render()
        action = getAction(obs)
        # For a random baseline, use env.action_space.sample() instead.
        obs, r, done, info = env.step(action)
        ret += reward(obs, r)
    print (f"Score = {ret}")

In [7]:
# `.unwrapped` hands back the raw environment without gym's wrappers
# (presumably including the episode step cap -- the recorded score exceeds 200).
env = gym.make('CartPole-v0').unwrapped
try:
    simpleHeuristic(env)
finally:
    # Always release the render window, even if the episode raises.
    env.close()

Score = 347.0


In [8]:
# Repeats the close already issued at the end of the previous cell; presumably
# kept so a leftover render window can be dismissed by running this cell alone.
env.close()

In [9]:
class ReplayMemory(object):
    """Fixed-capacity ring buffer of (s, a, r, sp, done) transitions.

    New transitions are appended until `capacity` entries are stored; after
    that each push overwrites the oldest entry.

    Attributes are plain instance fields:
        capacity -- maximum number of stored transitions
        memory   -- list holding the transition tuples
        position -- index the next push will write to
    """

    def __init__(self, capacity):
        """Create an empty buffer.

        Args:
            capacity: Maximum number of transitions to keep; must be positive.

        Raises:
            ValueError: If `capacity` is not positive (the ring arithmetic in
                `push` cannot work with zero or negative sizes).
        """
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity}")
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def segTuple(self, s, a, r, sp, done):
        """Pack one transition into the tuple layout stored in the buffer."""
        return (s, a, r, sp, done)

    def push(self, s, a, r, sp, done):
        """Store a transition, overwriting the oldest one once the buffer is full."""
        if len(self.memory) < self.capacity:
            # Grow by one slot; it is filled on the next line.
            self.memory.append(None)
        self.memory[self.position] = self.segTuple(s, a, r, sp, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return `batch_size` distinct stored transitions chosen at random.

        Raises:
            ValueError: From `random.sample` when `batch_size` exceeds the
                number of stored transitions.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def clear(self):
        """Discard every stored transition and restart writing at index 0.

        The stored list is emptied as well as the write position; resetting
        only the position would leave stale transitions visible to `len()`
        and `sample()` and would break `getLast()`.
        """
        self.memory.clear()
        self.position = 0

    def getLast(self):
        """Return the most recently pushed transition.

        Raises:
            IndexError: If the buffer is empty.
        """
        if not self.memory:
            raise IndexError("getLast() called on an empty ReplayMemory")
        # position points at the *next* write slot, so step back one with wrap.
        return self.memory[(self.position - 1) % len(self.memory)]

In [10]:
class AgentBase:
    """Common scaffolding for value-based agents acting in a gym-style environment.

    Provides epsilon-greedy and softmax action selection, an episode runner
    that can fill a replay buffer, and a rendering test loop.

    `getQ` reads `self.Q`, which this class never assigns -- presumably a
    subclass supplies it or overrides `getQ`.
    """

    def __init__(self, env):
        """Record the environment and set default hyper-parameters.

        Args:
            env: Environment exposing `observation_space.shape`,
                `action_space.n`, `reset`, `step` and `render`.
        """
        self.env = env
        self.eps = 0.5  # exploration probability for getAction
        self.dimState = env.observation_space.shape[0]
        self.dimAction = env.action_space.n
        self.gamma = 0.99
        self.rb = ReplayMemory(50000)
        self.nStep = 0  # step count of the most recent episode

    def getStateRep(self, obs):
        """Map a raw observation to the agent's state representation (identity here)."""
        return obs

    def getQ(self, s):
        """Return the action values for state `s` by indexing `self.Q`."""
        return self.Q[s]

    def piGreedy(self, state):
        """Return the action with the highest value in `getQ(state)`."""
        q = self.getQ(state)
        a = q.argmax()
        return a

    def getAction(self, state):  # eps-greedy policy
        """Pick an action epsilon-greedily.

        With probability `self.eps` a uniformly random action is returned and
        `self.eps` is multiplied by 0.99; note that the decay is applied only
        on these exploratory moves. Otherwise the greedy action is returned.
        """
        if np.random.random() < self.eps:
            a = np.random.choice(self.dimAction)
            self.eps *= 0.99
        else:
            a = self.piGreedy(state)
        return a

    @staticmethod
    def getSigmoid(z):
        """Logistic sigmoid 1 / (1 + exp(-z)), evaluated without overflow.

        Computed as exp(-log(1 + exp(-z))) via `np.logaddexp`, so large
        positive `z` yields 1.0 instead of the nan that exp(z)/(1+exp(z))
        produces once exp(z) overflows to inf.

        Args:
            z: Scalar or array-like of reals.

        Returns:
            NumPy scalar or array of the same shape with values in [0, 1].
        """
        return np.exp(-np.logaddexp(0, np.negative(z)))

    @staticmethod
    def getSoftmax(z, tau=1):
        """Softmax of `z` with temperature `tau`.

        The maximum of `z` is subtracted before exponentiating so the
        exponent is never positive.
        """
        exp_z = np.exp((z - np.max(z)) / tau)
        return exp_z / exp_z.sum()

    def piSoftmax(self, state):
        """Sample an action with probabilities given by the softmax of `getQ(state)`."""
        q = self.getQ(state)
        p = self.getSoftmax(q).reshape(-1)
        a = np.random.choice(self.dimAction, p=p)
        return a

    def runEpisode1(self, saveRB=True, maxStep=200, render=False):
        """Run a single episode with the epsilon-greedy policy.

        Args:
            saveRB: When True every transition is pushed to `self.rb`.
            maxStep: Step budget; the loop stops once this many steps ran,
                even if the environment has not signalled `done`.
            render: When True the environment is rendered before each step.

        Returns:
            The sum of the (shaped) rewards collected. The number of steps
            taken is stored in `self.nStep`.
        """
        obs = self.env.reset()
        s = self.getStateRep(obs)
        done = False
        ret = 0
        nStep = 0
        while not done:
            if render:
                self.env.render()
            a = self.getAction(s)
            # Alternative policy: a = self.piSoftmax(s)
            obs, r, done, info = self.env.step(a)
            r = reward(obs, r)
            ret += r
            sp = self.getStateRep(obs)
            if saveRB:
                self.rb.push(s, a, r, sp, done)
            s = sp
            nStep += 1
            if nStep >= maxStep:  # 500 for v1
                break
        self.nStep = nStep
        return ret

    def runTest(self, nEpisode=1, maxStep=1000):
        """Render `nEpisode` greedy episodes and print each return.

        Exploration is switched off (`eps = 0`) for the duration and nothing
        is written to the replay buffer. The previous `eps` is restored even
        if an episode raises, so a failed test run cannot leave the agent
        permanently greedy.
        """
        saved_eps = self.eps
        self.eps = 0
        try:
            for i in range(1, nEpisode + 1):
                ret = self.runEpisode1(saveRB=False, maxStep=maxStep, render=True)
                print(f"Test episode {i}, return = {ret} in {self.nStep} steps")
        finally:
            self.eps = saved_eps

In [11]:
class MLP(torch.nn.Module):
    """Fully connected network: Linear layers with ReLU between them.

    `lsize` lists the layer widths, so `[4, 64, 2]` builds
    Linear(4, 64) -> ReLU -> Linear(64, 2). No activation follows the final
    layer.
    """

    def __init__(self, lsize):
        """Build one Linear layer per consecutive pair of widths in `lsize`."""
        super().__init__()
        self.n_layers = len(lsize) - 1
        self.layers = nn.ModuleList(
            [torch.nn.Linear(n_in, n_out)
             for n_in, n_out in zip(lsize[:-1], lsize[1:])]
        )

    def forward(self, x):
        """Apply the network to a batch.

        Args:
            x: Tensor whose first dimension is the batch; all remaining
                dimensions are flattened into one feature axis.

        Returns:
            Tensor of raw outputs from the last Linear layer (no softmax or
            other normalisation is applied here).
        """
        x = x.reshape(x.size(0), -1)
        last = self.n_layers - 1
        for i, layer in enumerate(self.layers):
            x = layer(x)
            if i < last:
                x = F.relu(x)
        return x

    def save(self, fn):
        """Write the model's state dict to `fn`."""
        torch.save(self.state_dict(), fn)

    def load(self, fn, map_location=None):
        """Load a state dict previously written by `save`.

        Args:
            fn: Path or file-like object accepted by `torch.load`.
            map_location: Forwarded to `torch.load`; pass e.g. "cpu" to load
                a checkpoint that was saved from another device. The default
                None keeps the previous behaviour.
        """
        # NOTE(review): torch.load unpickles its input -- only load
        # checkpoints from trusted sources.
        self.load_state_dict(torch.load(fn, map_location=map_location))
