In [1]:
import tensorflow as tf
import numpy as np

initializer = tf.keras.initializers.RandomUniform(minval=-0.5, maxval=0.5)
model = tf.keras.models.Sequential([
    tf.keras.layers.InputLayer(input_shape=[4]),
    tf.keras.layers.Dense(20, activation="elu", kernel_initializer=initializer,
                          kernel_regularizer=tf.keras.regularizers.l2(0.01)),
    tf.keras.layers.Dense(5, activation="sigmoid", kernel_initializer=initializer,
                          kernel_regularizer=tf.keras.regularizers.l2(0.01)),
])



Совершаем 100 действий. Получаем 100 массивов длины 5 (`outputs`), содержащих прогнозы действий. Каждому действию соответствует отдача `dr`. Часть из них положительна, а часть отрицательна. На основе знаков формируем 100 массивов с эталонными значениями: берется спрогнозированный массив, и его максимум обнуляеся, если отдача отрицательна, либо, приравнивается 1, если отдача положительна.

In [36]:
def get_targets(outputs, drs):
    targets = np.array(outputs)
    for i in range(len(drs)):
        index = int(tf.argmax(outputs[i], axis=0))
        targets[i, index] = 0. if drs[i] < 0.5 else 1.
    return tf.Variable(targets)

x = tf.Variable([[0.1, 0.4, 0.5, 0.3],
                 [0.1, 0.4, 0.5, 0.3]])

get_targets(x, [-2, 3])

<tf.Variable 'Variable:0' shape=(2, 4) dtype=float32, numpy=
array([[0.1, 0.4, 0. , 0.3],
       [0.1, 0.4, 1. , 0.3]], dtype=float32)>

In [39]:
model(x)

<tf.Tensor: shape=(2, 5), dtype=float32, numpy=
array([[0.50738156, 0.5092559 , 0.47957334, 0.51712924, 0.4854553 ],
       [0.50738156, 0.5092559 , 0.47957334, 0.51712924, 0.4854553 ]],
      dtype=float32)>

In [40]:
# @tf.function
def get_grads(model, input_, loss_fn):
    with tf.GradientTape() as tape:
        output = model(input_)
        action = int(tf.argmax(output, axis=1))
        target = np.array(output)
        target[0, action] = 1.
        target = tf.Variable(target)
        loss = tf.reduce_mean(loss_fn(target, output))
    grads = tape.gradient(loss, model.trainable_variables)
    return grads

In [41]:
f = tf.keras.losses.mse
inputs = np.array([[1.3, 2.1, -0.3, 4.2]], dtype=np.float32)
grad = get_grads(model, inputs, f)
grad

[<tf.Tensor: shape=(4, 5), dtype=float32, numpy=
 array([[-0.00412505, -0.0157588 ,  0.00377326,  0.0240509 , -0.00125274],
        [-0.00666355, -0.02545653,  0.00609527,  0.03885146, -0.00202366],
        [ 0.00095194,  0.00363665, -0.00087075, -0.00555021,  0.00028909],
        [-0.0133271 , -0.05091306,  0.01219054,  0.07770292, -0.00404731]],
       dtype=float32)>,
 <tf.Tensor: shape=(5,), dtype=float32, numpy=
 array([-0.00317312, -0.01212216,  0.00290251,  0.0185007 , -0.00096365],
       dtype=float32)>,
 <tf.Tensor: shape=(5, 5), dtype=float32, numpy=
 array([[ 0.        ,  0.        ,  0.02506815,  0.        ,  0.        ],
        [ 0.        ,  0.        , -0.05141634,  0.        ,  0.        ],
        [ 0.        ,  0.        ,  0.02633543,  0.        ,  0.        ],
        [ 0.        ,  0.        , -0.00256117,  0.        ,  0.        ],
        [ 0.        ,  0.        ,  0.03359893,  0.        ,  0.        ]],
       dtype=float32)>,
 <tf.Tensor: shape=(5,), dtype=f

In [3]:
# @tf.function
def play_one_step(env, obs, model, loss_fn):
    prisoner_inputs = tf.convert_to_tensor([obs["prisoner"]])
    with tf.GradientTape() as tape:
        output = model(prisoner_inputs)
        probabilities = np.array([0.075, 0.075, 0.075, 0.075, 0.075])
        predicted_action = int(tf.argmax(output, axis=1))
        # add randomness for exploration:
        probabilities[predicted_action] = 0.7
        possible_actions = np.array([0, 1, 2, 3, 4])
        prisoner_action = np.random.choice(possible_actions, p=probabilities)
        print(f"{output = }")
        target = np.array(output)
        target[0, prisoner_action] = 1.
        target = tf.Variable(target)
        loss = tf.reduce_mean(loss_fn(target, output))
        
    grads = tape.gradient(loss, model.trainable_variables)

    actions = {"prisoner": prisoner_action}
    # print(f"{actions = }")
    obs, rewards, term, trunc, infos = env.step(actions)
    return obs, rewards, term, trunc, grads

# @tf.function
def play_one_episode(env, model, loss_fn):
    obs, info = env.reset()
    # obs, info = env.reset(seed=24)
    rewards_list, grads_list = [], []

    while env.agents:
        obs, rew, term, trunc, grads = play_one_step(env, obs, model, loss_fn)
        rewards_list.append(rew["prisoner"])
        grads_list.append(grads)
    return rewards_list, grads_list

def discount_rewards(rewards, discount_factor):
    """Функция возвращает отдачу (return)"""
    discounted = np.array(rewards)
    for step in range(len(rewards)-2, -1 , -1):
        discounted[step] += discounted[step+1] * discount_factor
    return discounted

def get_final_grads(grads, advantages):
    mean_grads = []
    for layer in range(len(grads[0])):
        layer_grads = []
        for iter in range(len(grads)):
            layer_grads.append(grads[iter][layer] * advantages[iter])
        mean_grads.append(tf.reduce_mean(layer_grads, axis=0))
    return mean_grads

In [4]:
from escape import env

n_episodes = 1000000
max_cycles = 102
discount_factor = 0.92
lr = 0.03
loss_fn = tf.keras.losses.mse
optimizer = tf.optimizers.Adam(learning_rate=lr)

pygame 2.3.0 (SDL 2.24.2, Python 3.11.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [19]:
environment = env(render_mode=None, max_cycles=max_cycles)
for i in range(n_episodes):
    # print(f"\r{(100 * i / n_episodes)}%", end='')
    print(f"\r{(i)}", end='')
    rewards, grads = play_one_episode(environment, model, loss_fn)
    # dr = discount_rewards(rewards, discount_factor)
    advantages = discount_and_normalize_rewards(rewards, 0.92)
    mean_grads = get_final_grads(grads, advantages)
    # print(f"{mean_grads = }")
    optimizer.apply_gradients(zip(mean_grads, model.trainable_variables))

0output = <tf.Tensor: shape=(1, 5), dtype=float32, numpy=
array([[0.95682096, 0.13627857, 0.99812907, 0.8855009 , 0.73919064]],
      dtype=float32)>
output = <tf.Tensor: shape=(1, 5), dtype=float32, numpy=
array([[0.9519197 , 0.1430323 , 0.9978276 , 0.87988055, 0.7255219 ]],
      dtype=float32)>
output = <tf.Tensor: shape=(1, 5), dtype=float32, numpy=
array([[0.9449668 , 0.15029228, 0.9973517 , 0.87105846, 0.70863897]],
      dtype=float32)>
output = <tf.Tensor: shape=(1, 5), dtype=float32, numpy=
array([[0.9353292 , 0.1448773 , 0.99653774, 0.8487347 , 0.69236946]],
      dtype=float32)>
output = <tf.Tensor: shape=(1, 5), dtype=float32, numpy=
array([[0.92301625, 0.15062931, 0.99543047, 0.82815564, 0.6723224 ]],
      dtype=float32)>
output = <tf.Tensor: shape=(1, 5), dtype=float32, numpy=
array([[0.9049183 , 0.15622164, 0.99351466, 0.79693085, 0.6483759 ]],
      dtype=float32)>
output = <tf.Tensor: shape=(1, 5), dtype=float32, numpy=
array([[0.8796697 , 0.16208905, 0.9902735 , 0.75

KeyboardInterrupt: 

In [11]:
environment = env(render_mode="human", max_cycles=600)
r, g = play_one_episode(environment, model, loss_fn)

KeyboardInterrupt: 

In [6]:
def discount_and_normalize_rewards(rewards, discount_factor):
    """Функция возвращает преимущества действий (action advantages)"""
    discounted_rewards = discount_rewards(rewards, discount_factor)
    reward_mean = discounted_rewards.mean()
    reward_std = discounted_rewards.std()
    
    return (discounted_rewards - reward_mean) / reward_std

////

In [2]:
def play_one_step(env, obs, model, loss_fn):
    prisoner_inputs = tf.convert_to_tensor([obs["prisoner"]])
    with tf.GradientTape() as tape:
        output = model(prisoner_inputs)
        probabilities = np.array([0.075, 0.075, 0.075, 0.075, 0.075])
        predicted_action = int(tf.argmax(output, axis=1))
        # add randomness for exploration:
        probabilities[predicted_action] = 0.7
        possible_actions = np.array([0, 1, 2, 3, 4])
        prisoner_action = np.random.choice(possible_actions, p=probabilities)
        # print(f"{output = }")
        target = np.array(output)
        target[0, prisoner_action] = 1.
        target = tf.Variable(target)
        loss = tf.reduce_mean(loss_fn(target, output))
        
    grads = tape.gradient(loss, model.trainable_variables)

    actions = {"prisoner": prisoner_action}
    # print(f"{actions = }")
    obs, rewards, term, trunc, infos = env.step(actions)
    return obs, rewards, term, trunc, grads

def play_one_episode(env, model, loss_fn):
    # print("episode!")
    obs, info = env.reset()
    # obs, info = env.reset(seed=24)
    rewards_list, grads_list = [], []

    while env.agents:
        obs, rew, term, trunc, grads = play_one_step(env, obs, model, loss_fn)
        rewards_list.append(rew["prisoner"])
        grads_list.append(grads)
    return rewards_list, grads_list

def play_multiple_episodes(env, model, loss_fn, n_episodes):
    all_rewards, all_grads = [], []
    for episode in range(n_episodes):
        rewards, grads = play_one_episode(env, model, loss_fn)
        all_rewards.append(rewards)
        all_grads.append(grads)
    return all_rewards, all_grads

In [3]:
def discount_rewards(rewards, discount_factor):
    """Функция возвращает отдачу (return)"""
    discounted = np.array(rewards)
    for step in range(len(rewards)-2, -1 , -1):
        discounted[step] += discounted[step+1] * discount_factor
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_factor):
    """Функция возвращает преимущества действий (action advantages)"""
    all_discounted_rewards = [discount_rewards(rewards, discount_factor)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

def get_final_grads(grads, advantages):
    mean_grads = []
    for layer in range(len(grads[0])):
        layer_grads = []
        for iter in range(len(grads)):
            layer_grads.append(grads[iter][layer] * advantages[iter])
        mean_grads.append(tf.reduce_mean(layer_grads, axis=0))
    return mean_grads

In [6]:
from escape import env

n_iterations = 15000
n_episodes_per_update = 10
max_cycles = 200
discount_factor = 0.95

optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)
loss_fn = tf.keras.losses.binary_crossentropy

In [7]:
def modify_grads(all_grads, all_final_rewards):
    all_mean_grads = []
    for var_idx in range(len(model.trainable_variables)):
        modified_grads = []
        for episode_idx, final_rewards in enumerate(all_final_rewards):
            for step, final_rwd in enumerate(final_rewards):
                modified_grad = final_rwd * all_grads[episode_idx][step][var_idx]
                modified_grads.append(modified_grad)
        mean_grads = tf.reduce_mean(modified_grads)
    all_mean_grads.append(mean_grads)
    return all_mean_grads

In [8]:
environment = env(render_mode=None, max_cycles=max_cycles)

for iteration in range(n_iterations):
    # print(f"\r{iteration+1}", end='')
    all_rewards, all_grads = play_multiple_episodes(
        environment, model, loss_fn, n_episodes_per_update)

    # extra code – displays some debug info during training
    total_rewards = sum(map(sum, all_rewards))
    print(f"\rIteration: {iteration + 1}/{n_iterations},"
          f" mean rewards: {total_rewards / n_episodes_per_update:.1f}", end="")

    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                       discount_factor)
    # all_mean_grads = []
    # for var_index in range(len(model.trainable_variables)):
    #     mean_grads = tf.reduce_mean(
    #         [final_reward * all_grads[episode_index][step][var_index]
    #          for episode_index, final_rewards in enumerate(all_final_rewards)
    #              for step, final_reward in enumerate(final_rewards)], axis=0)
    #     all_mean_grads.append(mean_grads)
    all_mean_grads = modify_grads(all_grads, all_final_rewards)

    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

  warn("You are calling render method without specifying any render mode. "


Iteration: 1245/15000, mean rewards: -312.2

KeyboardInterrupt: 

In [12]:
environment = env(render_mode="human", max_cycles=600)
r, g = play_one_episode(environment, model, loss_fn)

In [73]:
discount_and_normalize_rewards(r, 0.92)

array([ 0.18812776,  0.03559174, -0.11694428, -0.2694803 , -0.57455234,
       -0.87962438, -1.18469642, -1.48976845, -1.79484049, -2.25244855,
        0.64573581,  0.4931998 ,  0.34066378,  0.18812776,  0.03559174,
       -0.11694428, -0.42201632, -0.72708836, -1.0321604 , -1.33723243,
        1.71348795,  1.56095193,  1.40841591,  1.25587989,  1.10334387,
        0.95080785,  0.79827183,  0.64573581,  0.4931998 ,  0.34066378])

In [29]:
model.layers[1].get_weights()

[array([[-0.18837774, -0.45670196, -0.286776  , -0.44300035, -0.0597306 ],
        [ 0.1053976 , -0.22743143, -0.57153386, -1.0661036 ,  0.33794   ],
        [ 0.6171772 ,  0.47629446,  1.1461575 , -0.01090528,  0.79948384],
        [ 0.8132222 , -0.50877386, -0.53381246, -0.72459054,  0.905814  ],
        [-0.61633545, -0.33643255, -1.0380995 , -0.92383546, -0.27554572]],
       dtype=float32),
 array([-0.21911143, -0.90269554, -0.11212421,  0.09738877, -0.4334309 ],
       dtype=float32)]