# OpenGym CartPole-v0
# -------------------
#
# This code demonstrates use of a basic Q-network (without target network)
# to solve OpenGym CartPole-v0 problem.
#
# Made as part of blog series Let's make a DQN, available at:
# https://jaromiru.com/2016/10/03/lets-make-a-dqn-implementation/
#
# author: Jaromir Janisch, 2016
#--- enable this to run on GPU
# import os
# os.environ['THEANO_FLAGS'] = "device=gpu,floatX=float32"
import random, numpy, math, gym
#-------------------- BRAIN ---------------------------
from keras.models import Sequential
from keras.layers import *
from keras.optimizers import *
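# note: this script targets the Keras 1.x API (output_dim, nb_epoch); newer Keras
# versions use units= and epochs= instead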
class Brain:
    def __init__(self, stateCnt, actionCnt):
        self.stateCnt = stateCnt
        self.actionCnt = actionCnt

        self.model = self._createModel()
        # self.model.load_weights("cartpole-basic.h5")

    def _createModel(self):
        model = Sequential()

        model.add(Dense(output_dim=64, activation='relu', input_dim=self.stateCnt))
        model.add(Dense(output_dim=self.actionCnt, activation='linear'))

        opt = RMSprop(lr=0.00025)
        model.compile(loss='mse', optimizer=opt)

        return model

    def train(self, x, y, epoch=1, verbose=0):
        self.model.fit(x, y, batch_size=64, nb_epoch=epoch, verbose=verbose)

    def predict(self, s):
        return self.model.predict(s)

    def predictOne(self, s):
        return self.predict(s.reshape(1, self.stateCnt)).flatten()
#-------------------- MEMORY --------------------------
class Memory:   # stored as ( s, a, r, s_ )
    samples = []

    def __init__(self, capacity):
        self.capacity = capacity

    def add(self, sample):
        self.samples.append(sample)

        if len(self.samples) > self.capacity:
            self.samples.pop(0)

    def sample(self, n):
        n = min(n, len(self.samples))
        return random.sample(self.samples, n)
#-------------------- AGENT ---------------------------
MEMORY_CAPACITY = 100000
BATCH_SIZE = 64
GAMMA = 0.99
MAX_EPSILON = 1
MIN_EPSILON = 0.01
LAMBDA = 0.001 # speed of decay
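
# epsilon (the exploration rate) decays exponentially from MAX_EPSILON towards
# MIN_EPSILON at rate LAMBDA per observed step (see Agent.observe below)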
class Agent:
    steps = 0
    epsilon = MAX_EPSILON

    def __init__(self, stateCnt, actionCnt):
        self.stateCnt = stateCnt
        self.actionCnt = actionCnt

        self.brain = Brain(stateCnt, actionCnt)
        self.memory = Memory(MEMORY_CAPACITY)

    def act(self, s):
        if random.random() < self.epsilon:
            return random.randint(0, self.actionCnt-1)
        else:
            return numpy.argmax(self.brain.predictOne(s))

    def observe(self, sample):  # in (s, a, r, s_) format
        self.memory.add(sample)

        # slowly decrease epsilon based on our experience
        self.steps += 1
        self.epsilon = MIN_EPSILON + (MAX_EPSILON - MIN_EPSILON) * math.exp(-LAMBDA * self.steps)
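
    # replay(): sample a random minibatch from memory and fit the network towards the
    # Q-learning target: r for terminal transitions, r + GAMMA * max_a' Q(s_, a') otherwise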
    def replay(self):
        batch = self.memory.sample(BATCH_SIZE)
        batchLen = len(batch)

        no_state = numpy.zeros(self.stateCnt)

        states = numpy.array([ o[0] for o in batch ])
        states_ = numpy.array([ (no_state if o[3] is None else o[3]) for o in batch ])

        p = self.brain.predict(states)
        p_ = self.brain.predict(states_)

        x = numpy.zeros((batchLen, self.stateCnt))
        y = numpy.zeros((batchLen, self.actionCnt))

        for i in range(batchLen):
            o = batch[i]
            s = o[0]; a = o[1]; r = o[2]; s_ = o[3]

            t = p[i]
            if s_ is None:
                t[a] = r
            else:
                t[a] = r + GAMMA * numpy.amax(p_[i])

            x[i] = s
            y[i] = t

        self.brain.train(x, y)
#-------------------- ENVIRONMENT ---------------------
class Environment:
    def __init__(self, problem):
        self.problem = problem
        self.env = gym.make(problem)

    def run(self, agent):
        s = self.env.reset()
        R = 0

        while True:
            self.env.render()

            a = agent.act(s)
            s_, r, done, info = self.env.step(a)

            if done:  # terminal state
                s_ = None

            agent.observe( (s, a, r, s_) )
            agent.replay()

            s = s_
            R += r

            if done:
                break

        print("Total reward:", R)
#-------------------- MAIN ----------------------------
PROBLEM = 'CartPole-v0'
env = Environment(PROBLEM)
stateCnt = env.env.observation_space.shape[0]
actionCnt = env.env.action_space.n
agent = Agent(stateCnt, actionCnt)
try:
    while True:
        env.run(agent)
finally:
    agent.brain.model.save("cartpole-basic.h5")