## Checking tensorflow version

In [1]:
# Report the installed TensorFlow version and whether eager execution is active.
import tensorflow as tf
print(tf.__version__)
print(tf.executing_eagerly())

2.0.0-rc1
True


Note that we're now in eager mode by default!

---

# Simple Environment

In [2]:
import numpy as np

class Env:
    """Tiny deterministic grid world used to exercise the agent.

    The state is a 2-element float array starting at ``[1., 1.]``.
    Action 0 increments the first coordinate ("down") and action 1
    increments the second coordinate ("right"); any other action leaves
    the position unchanged.

    Every array handed back to the caller is a copy, so later calls to
    ``step`` never alter a state the caller has kept.
    """

    def __init__(self):
        # Number of discrete actions (0 = down, 1 = right).
        self.action_dim = 2
        self.state = np.array([1., 1.])

    def reset(self):
        """Move back to ``[1., 1.]`` and return a copy of that start state."""
        self.state = np.array([1., 1.])
        return self.obs()

    def obs(self):
        """Return a copy of the current state."""
        return np.array(self.state)

    def step(self, action):
        """Apply ``action`` and return ``(state_copy, reward, done)``."""
        # move down
        if action == 0:
            self.state[0] += 1
        # move right
        elif action == 1:
            self.state[1] += 1
        reward, done = self.calc_reward()
        # Return a copy: self.state is mutated in place by the next step().
        return self.obs(), reward, done

    def calc_reward(self):
        """Return ``(reward, done)`` for the current state.

        * any coordinate greater than 3: -1, episode ends
        * positions (1, 3) and (3, 1):   -1, episode continues
        * position (3, 3):               +1, episode ends
        * everything else:               +1, episode continues
        """
        row, col = self.state[0], self.state[1]
        if row > 3 or col > 3:
            return -1, True
        elif row == 1 and col == 3:
            return -1, False
        elif row == 3 and col == 1:
            return -1, False
        elif row == 3 and col == 3:
            return 1, True
        else:
            return 1, False

### Env test

In [3]:
e = Env()

# First episode: alternate down (0) and right (1) to reach the goal at (3, 3).
print(e.reset())
for action in (0, 1, 0, 1):
    print(e.step(action))
print()

# Second episode: keep moving down (0) until the agent leaves the board.
print(e.reset())
for action in (0, 0, 0):
    print(e.step(action))

[1. 1.]
(array([2., 1.]), 1, False)
(array([2., 2.]), 1, False)
(array([3., 2.]), 1, False)
(array([3., 3.]), 1, True)

[1. 1.]
(array([2., 1.]), 1, False)
(array([3., 1.]), -1, False)
(array([4., 1.]), -1, True)


---

# A2C

## 모델 구성

In [4]:
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras

class ProbabilityDistribution(tf.keras.Model):
    """Samples one categorical action per row of logits."""

    def call(self, logits):
        # Draw a single sample per row, then drop the trailing sample axis.
        sampled = tf.random.categorical(logits, 1)
        return tf.squeeze(sampled, axis=-1)

class Model(tf.keras.Model):
    """Two-headed MLP producing policy logits and a state-value estimate.

    Each head has its own 128-unit ReLU hidden layer, and both hidden
    layers read the same input tensor.
    """

    def __init__(self, num_actions):
        # The model name has to be passed by keyword; a lone positional
        # argument is not interpreted as the name by tf.keras.Model.
        super().__init__(name='mlp_policy')
        # no tf.get_variable(), just simple Keras API
        self.hidden1 = keras.layers.Dense(128, activation='relu')
        self.hidden2 = keras.layers.Dense(128, activation='relu')
        self.value = keras.layers.Dense(1, name='value')
        # logits are unnormalized log probabilities
        self.logits = keras.layers.Dense(num_actions, name='policy_logits')
        self.dist = ProbabilityDistribution()

    def call(self, inputs):
        """Forward pass: return ``(policy_logits, value)`` for ``inputs``."""
        # inputs is a numpy array, convert to Tensor
        x = tf.convert_to_tensor(inputs, dtype=tf.float32)
        # separate hidden layers from the same input tensor
        hidden_logs = self.hidden1(x)
        hidden_vals = self.hidden2(x)

        return self.logits(hidden_logs), self.value(hidden_vals)

    def action_value(self, obs):
        """Sample an action for ``obs`` and return ``(logits, action, value)``.

        The leading axis of the logits is squeezed away, so ``obs`` must be
        a batch containing exactly one observation.
        """
        # executes call() under the hood
        logits, value = self.predict(obs)
        action = self.dist.predict(logits)
        # a simpler option, will become clear later why we don't use it
        # action = tf.random.categorical(logits, 1)

        return np.squeeze(logits, axis=0), np.squeeze(action, axis=-1), np.squeeze(value, axis=-1)

기본적으로 구성한 모델이 잘 동작하는지 확인하기 위해서 테스트(feeding), 학습이 되지 않은 모델임

In [5]:
env = Env()
model = Model(num_actions=env.action_dim)

# Add a leading batch axis to the start state before feeding the model.
state = np.expand_dims(env.reset(), axis=0)
# no feed_dict or tf.Session() needed at all
logits, action, value = model.action_value(state)
print(logits, action, value)

[-0.03893859 -0.22745085] 1 [0.1200963]


## Random Agent
위 학습이 되지 않은 모델을 이용하여 어느 단계까지 가는지 확인(즉, 학습이 되지 않았을 때의 성능 테스트)

In [6]:
class A2CAgent:
    """Minimal agent wrapper that can only evaluate a model on an environment."""

    def __init__(self, model):
        self.model = model

    def test(self, env):
        """Play one episode on ``env`` and return the summed reward."""
        total = 0
        obs = env.reset()
        finished = False
        while not finished:
            # The model expects a batch axis in front of the observation.
            batch = obs[None, :]
            _, chosen, _ = self.model.action_value(batch)
            obs, step_reward, finished = env.step(chosen)
            total += step_reward
        return total

100 번 정도 테스트를 수행해보면 에러가 많이 발생하는 것을 알 수 있음.

In [10]:
agent = A2CAgent(model)
iter_cnt = 100
succ_cnt = 0
# An episode counts as a success when it collects the maximum total reward of 4.
for _ in range(iter_cnt):
    rewards_sum = agent.test(env)
    if rewards_sum == 4:
        succ_cnt += 1
# Multiply before dividing so the percentage carries no float noise
# (e.g. 29 / 100 * 100 evaluates to 28.999999999999996).
print(100 * succ_cnt / iter_cnt, "%")

24.0 %


## Loss / Objective Function
## Agent Training Loop

In [11]:
class A2CAgent:
    """A2C agent: compiles the model with policy and value losses and can evaluate it."""

    def __init__(self, model):
        # hyperparameters for loss terms
        self.params = {
            'value': 0.5,
            'entropy': 0.001}
        self.model = model
        self.model.compile(
            # `learning_rate` is the supported argument name; `lr` is a deprecated alias
            optimizer=keras.optimizers.RMSprop(learning_rate=0.0007),
            # define separate losses for policy logits and value estimate
            loss=[self._logits_loss, self._value_loss]
        )

    def test(self, env):
        """Play one episode on ``env`` and return the summed reward."""
        state, done, ep_reward = env.reset(), False, 0
        while not done:
            _, action, _ = self.model.action_value(state[None, :])
            state, reward, done = env.step(action)
            ep_reward += reward
        return ep_reward

    def _value_loss(self, returns, value):
        """Scaled MSE between the value estimates and the returns."""
        # value loss is typically MSE between value estimates and returns
        return self.params['value'] * keras.losses.mean_squared_error(returns, value)

    def _logits_loss(self, acts_and_advs, logits):
        """Policy-gradient loss minus the scaled entropy bonus.

        ``acts_and_advs`` carries actions and advantages side by side on its
        last axis; it is split into two equal parts here.
        """
        # a trick to input actions and advantages through same API
        actions, advantages = tf.split(acts_and_advs, 2, axis=-1)
        # sparse categorical CE loss obj that supports sample_weight arg on call()
        # from_logits argument ensures transformation into normalized probabilities
        weighted_sparse_ce = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        # policy loss is defined by policy gradients, weighted by advantages
        # note: we only calculate the loss on the actions we've actually taken
        actions = tf.cast(actions, tf.int32)
        policy_loss = weighted_sparse_ce(actions, logits, sample_weight=advantages)
        # entropy is the cross-entropy of the action distribution with itself,
        # so both arguments must be probabilities rather than raw logits
        probs = tf.nn.softmax(logits)
        entropy_loss = keras.losses.categorical_crossentropy(probs, probs)
        # here signs are flipped because optimizer minimizes
        return policy_loss - self.params['entropy'] * entropy_loss

And we're done with the objective functions! Note how compact the code is: there are almost more comment lines than lines of code.

## Agent Training Loop

In [16]:
class A2CAgent:
    """A2C agent: collects rollouts, trains the model on them and can evaluate it."""

    def __init__(self, model):
        # hyperparameters for loss terms and the discount factor
        self.params = {
            'value': 0.5,
            'entropy': 0.001,
            'gamma': 0.99}
        self.model = model
        self.model.compile(
            # `learning_rate` is the supported argument name; `lr` is a deprecated alias
            optimizer=keras.optimizers.RMSprop(learning_rate=0.0007),
            # define separate losses for policy logits and value estimate
            loss=[self._logits_loss, self._value_loss]
        )

    def train(self, env, batch_size=32, updates=500):
        """Run ``updates`` training steps of ``batch_size`` environment steps each.

        Returns the list of per-episode reward sums; the last entry belongs
        to the episode still in progress when training stops.
        """
        # storage helpers for a single batch of data
        actions = np.empty((batch_size,), dtype=np.int32)
        rewards, dones, values = np.empty((3, batch_size))
        states = np.empty((batch_size, ) + env.state.shape)
        # training loop: collect samples, send to optimize, repeat updates times
        ep_rews = [0.0]
        next_state = env.reset()
        for _ in range(updates):
            for step in range(batch_size):
                states[step] = next_state.copy()
                _, action, value = self.model.action_value(next_state[None, :])
                actions[step] = action
                # the value comes back as a size-1 array; store it as a scalar
                values[step] = np.squeeze(value)
                next_state, rewards[step], dones[step] = env.step(actions[step])

                ep_rews[-1] += rewards[step]
                if dones[step]:
                    ep_rews.append(0.0)
                    # start the next episode from the reset state, otherwise the
                    # following step would record and act on the terminal state
                    next_state = env.reset()

            _, _, next_value = self.model.action_value(next_state[None, :])
            returns, advs = self._returns_advantages(rewards, dones, values, next_value)
            # a trick to input actions and advantages through same API
            acts_and_advs = np.concatenate([actions[:, None], advs[:, None]], axis=-1)
            # performs a full training step on the collected batch
            # note: no need to mess around with gradients, Keras API handles it
            self.model.train_on_batch(states, [acts_and_advs, returns])
        return ep_rews

    def _returns_advantages(self, rewards, dones, values, next_value):
        """Return ``(returns, advantages)`` for one batch of transitions."""
        # next_value is the bootstrap value estimate of a future state (the critic)
        returns = np.append(np.zeros_like(rewards), next_value, axis=-1)
        # returns are calculated as discounted sum of future rewards;
        # (1 - dones[t]) cuts the sum at episode boundaries
        for t in reversed(range(rewards.shape[0])):
            returns[t] = rewards[t] + self.params['gamma'] * returns[t+1] * (1-dones[t])
        returns = returns[:-1]
        # advantages are returns - baseline, value estimates in our case
        advantages = returns - values
        return returns, advantages

    def test(self, env):
        """Play one episode on ``env`` and return the summed reward."""
        state, done, ep_reward = env.reset(), False, 0
        while not done:
            _, action, _ = self.model.action_value(state[None, :])
            state, reward, done = env.step(action)
            ep_reward += reward
        return ep_reward

    def _value_loss(self, returns, value):
        """Scaled MSE between the value estimates and the returns."""
        # value loss is typically MSE between value estimates and returns
        return self.params['value'] * keras.losses.mean_squared_error(returns, value)

    def _logits_loss(self, acts_and_advs, logits):
        """Policy-gradient loss minus the scaled entropy bonus.

        ``acts_and_advs`` carries actions and advantages side by side on its
        last axis; it is split into two equal parts here.
        """
        # a trick to input actions and advantages through same API
        actions, advantages = tf.split(acts_and_advs, 2, axis=-1)
        # sparse categorical CE loss obj that supports sample_weight arg on call()
        # from_logits argument ensures transformation into normalized probabilities
        weighted_sparse_ce = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        # policy loss is defined by policy gradients, weighted by advantages
        # note: we only calculate the loss on the actions we've actually taken
        actions = tf.cast(actions, tf.int32)
        policy_loss = weighted_sparse_ce(actions, logits, sample_weight=advantages)
        # entropy is the cross-entropy of the action distribution with itself,
        # so both arguments must be probabilities rather than raw logits
        probs = tf.nn.softmax(logits)
        entropy_loss = keras.losses.categorical_crossentropy(probs, probs)
        # here signs are flipped because optimizer minimizes
        return policy_loss - self.params['entropy'] * entropy_loss

## Training & Results

### Training

In [14]:
import time

# Wall-clock timing around environment/agent construction plus the training run.
st_time = time.time()
env = Env()
agent = A2CAgent(model)
rewards_history = agent.train(env)
print("Training Runtime: %f sec" % (time.time() - st_time))

Training Runtime: 109.562141 sec


### Results

In [15]:
iter_cnt = 100
succ_cnt = 0

# An episode counts as a success when it collects the maximum total reward of 4.
for _ in range(iter_cnt):
    rewards = agent.test(env)
    if rewards == 4:
        succ_cnt += 1

# Multiply before dividing: succ_cnt / iter_cnt * 100 can land just below the
# true integer (29 -> 28.999999999999996) and %d would then truncate it to 28.
print("%d %% succecss" % (100 * succ_cnt / iter_cnt))

55 % succecss


In [176]:
# Repeat the train/test pipeline with a new graph installed as the default graph.
with tf.Graph().as_default():
    print(tf.executing_eagerly()) # False
    
    # New Model and agent instances created inside the graph context.
    model = Model(num_actions=2)
    agent = A2CAgent(model)
    
    # `env` is not created here; it is reused from an earlier cell.
    rewards_history = agent.train(env)
    print("Finished trainning, testing...")
    print("%d out of 4" % agent.test(env))

False
Finished trainning, testing...
4 out of 4
