forked from gsurma/cartpole
-
Notifications
You must be signed in to change notification settings - Fork 11
/
cartpole.py
104 lines (85 loc) · 3.26 KB
/
cartpole.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import random
import gym
import numpy as np
from collections import deque
from sklearn.multioutput import MultiOutputRegressor
from lightgbm import LGBMRegressor
from scores.score_logger import ScoreLogger
from sklearn.model_selection import train_test_split
ENV_NAME = "CartPole-v1"
GAMMA = 0.95
LEARNING_RATE = 0.001
MEMORY_SIZE = 1000
BATCH_SIZE = 20
EXPLORATION_MAX = 1.0
EXPLORATION_MIN = 0.05
EXPLORATION_DECAY = 0.96
class DQNSolver:
def __init__(self, observation_space, action_space):
self.exploration_rate = EXPLORATION_MAX
self.action_space = action_space
self.memory = deque(maxlen=MEMORY_SIZE)
self.model = MultiOutputRegressor(LGBMRegressor(n_estimators=100, n_jobs=-1))
self.isFit = False
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
def act(self, state):
if np.random.rand() < self.exploration_rate:
return random.randrange(self.action_space)
if self.isFit == True:
q_values = self.model.predict(state)
else:
q_values = np.zeros(self.action_space).reshape(1, -1)
return np.argmax(q_values[0])
def experience_replay(self):
if len(self.memory) < BATCH_SIZE:
return
batch = random.sample(self.memory, int(len(self.memory)/1))
X = []
targets = []
for state, action, reward, state_next, terminal in batch:
q_update = reward
if not terminal:
if self.isFit:
q_update = (reward + GAMMA * np.amax(self.model.predict(state_next)[0]))
else:
q_update = reward
if self.isFit:
q_values = self.model.predict(state)
else:
q_values = np.zeros(self.action_space).reshape(1, -1)
q_values[0][action] = q_update
X.append(list(state[0]))
targets.append(q_values[0])
self.model.fit(X, targets)
self.isFit = True
self.exploration_rate *= EXPLORATION_DECAY
self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)
def cartpole():
env = gym.make(ENV_NAME)
score_logger = ScoreLogger(ENV_NAME)
observation_space = env.observation_space.shape[0]
action_space = env.action_space.n
dqn_solver = DQNSolver(observation_space, action_space)
run = 0
while True:
run += 1
state = env.reset()
state = np.reshape(state, [1, observation_space])
step = 0
while True:
step += 1
#env.render()
action = dqn_solver.act(state)
state_next, reward, terminal, info = env.step(action)
reward = reward if not terminal else -reward
state_next = np.reshape(state_next, [1, observation_space])
dqn_solver.remember(state, action, reward, state_next, terminal)
state = state_next
if terminal:
print("Run: " + str(run) + ", exploration: " + str(dqn_solver.exploration_rate) + ", score: " + str(step))
score_logger.add_score(step, run)
break
dqn_solver.experience_replay()
if __name__ == "__main__":
cartpole()