Instalando o tf_agents caso não esteja instalado

In [None]:
!pip3 install -q tf-agents

Funções para serem utilizadas durante o teste

In [None]:
import numpy as np


# === Multi-modal ===
# D dimensões - Múltiplos mínimos
def ackley(x: np.ndarray, a=20, b=0.2, c=2 * np.pi):
    """Evaluate the Ackley benchmark function.

    Args:
        x: Array whose first axis indexes the D coordinates of a point. Any
            further axes are treated as a batch, because every reduction is
            taken along axis 0.
        a: Amplitude of the exponential terms.
        b: Decay rate applied to the root-mean-square of the coordinates.
        c: Angular frequency of the cosine term.

    Returns:
        The function value(s); 0 (up to rounding) at the origin.
    """
    # np.pi / np.e are used instead of np.math.*: the `np.math` alias was
    # removed in NumPy 2.0 and would make this definition fail on import.
    d = x.shape[0]
    mean_square = np.sum(x * x, axis=0) / d
    mean_cosine = np.sum(np.cos(c * x), axis=0) / d
    return -a * np.exp(-b * np.sqrt(mean_square)) - np.exp(mean_cosine) + a + np.e


# D dimensões - Múltiplos mínimos
def griewank(x: np.ndarray):
    """Evaluate the Griewank benchmark function.

    Args:
        x: Array whose first axis indexes the D coordinates of a point. Any
            further axes are treated as a batch (reductions use axis 0).

    Returns:
        ``sum(x_i^2) / 4000 - prod(cos(x_i / sqrt(i))) + 1`` with ``i``
        running from 1 to D.
    """
    d = x.shape[0]
    quadratic = np.sum(x ** 2, axis=0) / 4000.0

    index = np.arange(start=1, stop=(d + 1), dtype=x.dtype)
    # Shape the 1..D index as (D, 1, ..., 1) so it lines up with axis 0 of x.
    # A flat (D,) vector would broadcast against the LAST axis, which fails
    # (or silently pairs the wrong coordinates) for batched input.
    index = index.reshape((d,) + (1,) * (x.ndim - 1))

    oscillation = np.prod(np.cos(x / np.sqrt(index)), axis=0)
    return quadratic - oscillation + 1


# D dimensões - Múltiplos mínimos
def rastrigin(x: np.ndarray):
    """Evaluate the Rastrigin benchmark function.

    Args:
        x: Array whose first axis indexes the D coordinates of a point. Any
            further axes are treated as a batch (the reduction uses axis 0).

    Returns:
        ``10 * D + sum(x_i^2 - 10 * cos(2 * pi * x_i))``.
    """
    # np.pi replaces np.math.pi: the `np.math` alias was removed in NumPy 2.0.
    d = x.shape[0]
    return 10 * d + np.sum(x ** 2 - 10 * np.cos(x * 2 * np.pi), axis=0)


def levy(x: np.ndarray):
    """Evaluate the Levy benchmark function.

    Args:
        x: Array whose first axis indexes the D coordinates of a point. Any
            further axes are treated as a batch (the reduction uses axis 0).

    Returns:
        The sum of three parts computed on ``w = 1 + (x - 1) / 4``: a term on
        the first coordinate, a sum over all coordinates but the last, and a
        term on the last coordinate.
    """
    # np.pi replaces np.math.pi: the `np.math` alias was removed in NumPy 2.0.
    last = x.shape[0] - 1
    w = 1 + (x - 1) / 4

    first_term = np.sin(np.pi * w[0]) ** 2
    last_term = (w[last] - 1) ** 2 * (1 + np.sin(2 * np.pi * w[last]) ** 2)

    # Every coordinate except the last one contributes to the middle sum.
    inner = w[0:last]
    middle_sum = np.sum((inner - 1) ** 2 * (1 + 10 * np.sin(np.pi * inner + 1) ** 2), axis=0)
    return first_term + middle_sum + last_term


# === Valley-shaped ===
# D dimensões
def rosenbrock(x: np.ndarray):
    """Evaluate the Rosenbrock (valley-shaped) benchmark function.

    Accumulates ``100 * (x[i+1] - x[i]^2)^2 + (x[i] - 1)^2`` over every pair
    of consecutive entries along the first axis. With fewer than two entries
    the result is the float ``0.0``.
    """
    pair_count = x.shape[0] - 1
    total = 0.0

    # Walk (x[i], x[i + 1]) pairs in index order.
    for current, following in zip(x[:pair_count], x[1:]):
        total += 100 * (following - current ** 2) ** 2 + (current - 1.0) ** 2

    return total


# === Plate-shaped ===
# D dimensões
def zakharov(x: np.ndarray):
    """Evaluate the Zakharov (plate-shaped) benchmark function.

    Args:
        x: Array whose first axis indexes the D coordinates of a point. Any
            further axes are treated as a batch (reductions use axis 0).

    Returns:
        ``s1 + s2^2 + s2^4`` where ``s1 = sum(x_i^2)`` and
        ``s2 = sum(i * x_i / 2)`` with ``i`` running from 1 to D.
    """
    d = x.shape[0]

    weights = np.arange(start=1, stop=(d + 1), dtype=x.dtype)
    # Shape the 1..D weights as (D, 1, ..., 1) so they line up with axis 0 of
    # x; a flat (D,) vector would broadcast against the last axis instead and
    # break (or silently corrupt) batched input.
    weights = weights.reshape((d,) + (1,) * (x.ndim - 1))

    squared_sum = np.sum(x * x, axis=0)
    weighted_sum = np.sum(x * weights / 2, axis=0)
    return squared_sum + weighted_sum ** 2 + weighted_sum ** 4


# === Bowl-shaped ===
# 2 Dimensões
def bohachevsky(x: np.ndarray):
    """Evaluate the two-dimensional Bohachevsky (bowl-shaped) function.

    The first axis of ``x`` must have exactly two entries; an ``assert``
    enforces this.
    """
    assert x.shape[0] == 2

    x0, x1 = x[0], x[1]
    bowl = x0 ** 2 + 2 * (x1 ** 2)
    ripple_x0 = 0.3 * np.cos(3 * np.pi * x0)
    ripple_x1 = 0.4 * np.cos(4 * np.pi * x1)
    return bowl - ripple_x0 - ripple_x1 + 0.7


# D dimensões
def sum_squares(x: np.ndarray):
    """Evaluate the Sum Squares (bowl-shaped) benchmark function.

    Args:
        x: Array whose first axis indexes the D coordinates of a point. Any
            further axes are treated as a batch (the reduction uses axis 0).

    Returns:
        ``sum(i * x_i^2)`` with ``i`` running from 1 to D.
    """
    d = x.shape[0]
    weights = np.arange(start=1, stop=(d + 1), dtype=x.dtype)
    # Shape the 1..D weights as (D, 1, ..., 1) so they line up with axis 0 of
    # x; a flat (D,) vector would broadcast against the last axis instead and
    # break (or silently corrupt) batched input.
    weights = weights.reshape((d,) + (1,) * (x.ndim - 1))
    return np.sum((x ** 2) * weights, axis=0)


# D dimensões
def sphere(x: np.ndarray):
    """Evaluate the Sphere function: the sum of squared entries along axis 0."""
    squared = x * x
    return squared.sum(axis=0)


# D dimensões:
def rotated_hyper_ellipsoid(x: np.ndarray):
    """Evaluate the Rotated Hyper-Ellipsoid (bowl-shaped) function.

    For every prefix ``x[:k]`` (k = 1..D) the squared entries are summed along
    axis 0; the prefix sums are then added up and returned as float32.
    """
    prefix_lengths = range(1, x.shape[0] + 1)
    prefix_sums = [np.sum(x[:length] ** 2, axis=0) for length in prefix_lengths]
    return np.sum(prefix_sums, axis=0, dtype=np.float32)


# === Funções Utilitárias ===
# Recebe uma função como argumento
# Retorna o 'limite inferior e superior' da função
def get_low_and_high(function):
    """Return the ``(lower, upper)`` search bounds used for a benchmark.

    ``function`` is compared by identity against the benchmark functions of
    this file. ``None`` is returned when it matches none of them.
    """
    known_domains = (
        ((sphere, rastrigin), (-5.12, 5.12)),
        ((ackley,), (-32.768, 32.768)),
        ((rosenbrock, zakharov), (-5.0, 10.0)),
        ((bohachevsky,), (-100.0, 100.0)),
        ((sum_squares, levy), (-10.0, 10.0)),
        ((griewank,), (-600.0, 600.0)),
        ((rotated_hyper_ellipsoid,), (-65.536, 65.536)),
    )

    for benchmarks, bounds in known_domains:
        for benchmark in benchmarks:
            if function is benchmark:
                return bounds

    return None

# Bom dia

Definindo o ambiente de otimização de funções como py_environment



In [None]:
import numpy as np

from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts
from collections import namedtuple

# Axis-aligned search domain: the same (min, max) bound applies to every dimension.
hypercube = namedtuple(typename='hypercube', field_names=('min', 'max'))


class FunctionEnv(py_environment.PyEnvironment):
    """Function-minimization task exposed as a TF-Agents ``PyEnvironment``.

    The state is a float32 point inside the search domain. On every step the
    action is scaled and added to the current point, and the result is clipped
    back into the domain. The reward of a step is the negated objective value
    at the new point.
    """

    # Factor applied to an action before it is added to the current position.
    _ACTION_SCALE = 2.5
    # The episode is flagged as ended once more than this many steps were taken.
    _MAX_STEPS = 2000

    def __init__(self, function, domain: hypercube, dims) -> None:
        """Create the environment.

        Args:
            function: Objective to minimize; called with the current state.
            domain: Lower/upper bound shared by every dimension.
            dims: Number of dimensions of the search space.
        """
        super().__init__()
        self._function = function
        self._domain = domain
        self._dims = dims
        # Lowest objective value seen so far; starts at the largest float32 so
        # that the first evaluated point always replaces it.
        self._best_solution = np.finfo(np.float32).max
        self._episode_ended = False
        self._steps_taken = 0
        self._state = self._sample_state()

        self._action_spec = array_spec.BoundedArraySpec(
            shape=(dims,), dtype=np.float32,
            minimum=-1.0, maximum=1.0, name='action')

        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(dims,), dtype=np.float32,
            minimum=domain.min, maximum=domain.max, name='observation')

    def _sample_state(self):
        """Draw a uniformly random float32 point inside the domain."""
        point = np.random.uniform(size=(self._dims,), low=self._domain.min, high=self._domain.max)
        return point.astype(dtype=np.float32, copy=False)

    def action_spec(self):
        """Return the spec of the actions accepted by ``step``."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec of the observations (points of the domain)."""
        return self._observation_spec

    def get_info(self):
        """Return the lowest objective value observed since construction."""
        return self._best_solution

    def get_state(self):
        """Return ``(state, steps_taken, episode_ended)`` as a tuple."""
        return self._state, self._steps_taken, self._episode_ended

    def set_state(self, state):
        """Restore a tuple previously produced by ``get_state``."""
        self._state, self._steps_taken, self._episode_ended = state

    def _step(self, action):
        """Move the current point by the scaled action and score the new point."""
        if self._episode_ended:
            return self.reset()

        moved = self._state + self._ACTION_SCALE * action
        clipped = np.clip(moved, a_min=self._domain.min, a_max=self._domain.max)
        # Keep the dtype promised by the observation spec even if the caller
        # hands in an action of a wider dtype.
        self._state = clipped.astype(dtype=np.float32, copy=False)

        self._steps_taken += 1
        if self._steps_taken > self._MAX_STEPS:
            self._episode_ended = True

        # Track the best point on the objective itself. Comparing the reward
        # (the negated objective) with `<` would record the worst point.
        objective = self._function(self._state)
        if objective < self._best_solution:
            self._best_solution = objective
        reward = -objective

        if self._episode_ended:
            return ts.termination(self._state, reward)
        return ts.transition(self._state, reward)

    def _reset(self):
        """Start a new episode from a fresh random point of the domain."""
        self._state = self._sample_state()
        self._episode_ended = False
        self._steps_taken = 0
        return ts.restart(self._state)

    def _render(self):
        # TODO: implement a rendering method
        pass


Função para testar o agente treinado (Plota fitness x iteração)

In [None]:
import matplotlib.pyplot as plt

def evaluate_agent(eval_env, policy_eval, function, dims):
    """Run one episode with ``policy_eval`` and plot the best value per step.

    After the episode the best objective value, the iteration at which it was
    found and the corresponding position are printed.

    Args:
        eval_env: Environment to run; indexing with ``[0]`` assumes a batched
            environment with a single entry (TODO confirm against callers).
        policy_eval: Policy whose ``action`` method picks every step.
        function: Objective function; scores the initial position and names
            the plot.
        dims: Number of dimensions, only shown in the plot title.
    """
    time_step = eval_env.reset()

    # The starting point is the first candidate for the best solution.
    pos = time_step.observation.numpy()[0]
    best_solution = function(pos)
    history = [best_solution]
    best_it = 0
    it = 0

    while not time_step.is_last():
        it += 1
        time_step = eval_env.step(policy_eval.action(time_step).action)

        # The reward is the negated objective, so flip the sign back.
        candidate = -time_step.reward.numpy()[0]
        if candidate < best_solution:
            best_solution, best_it = candidate, it
            pos = time_step.observation.numpy()[0]

        history.append(best_solution)

    fig, ax = plt.subplots()
    ax.plot(range(len(history)), history)
    ax.set(xlabel="Iteration", ylabel="Best objective value", title="TD3 on {0} ({1} Dims)".format(function.__name__, dims))
    ax.grid()
    plt.show()

    print('best_solution: ', best_solution)
    print('found at it: ', best_it)
    print('at position: ', pos)

Imports para Main (Agente, Redes, etc)

In [None]:
import tensorflow as tf
from tf_agents.environments.wrappers import TimeLimit
from tf_agents.environments import tf_py_environment
from tf_agents.agents import Td3Agent
from tf_agents.agents.ddpg.actor_network import ActorNetwork
from tf_agents.agents.ddpg.critic_network import CriticNetwork
from tf_agents.drivers import dynamic_step_driver
from tf_agents.policies.random_tf_policy import RandomTFPolicy
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.utils import common

Hiperparâmetros

In [None]:
# Training hyperparameters
num_episodes = 800 # @param {type:"integer"}
initial_collect_episodes = 10 # @param {type:"integer"}
collect_steps_per_iteration = 1 # @param {type:"integer"}

# Replay memory hyperparameters
buffer_size = 1000000 # @param {type:"integer"}
batch_size = 64 # @param {type:"number"}

# Agent hyperparameters
actor_lr = 1e-4 # @param {type:"number"}
critic_lr = 2e-4 # @param {type:"number"}
tau = 5e-4 # @param {type:"number"}
discount = 0.99 # @param {type:"number"}
exploration_noise_std = 0.2 # @param {type:"number"}
target_policy_noise = 0.2 # @param {type:"number"}
target_policy_noise_clip = 0.5 # @param {type:"number"}
actor_update_period = 2 # @param {type:"integer"}
target_update_period = 2 # @param {type:"integer"}
reward_scale_factor = 0.75 # @param {type:"number"}

# --- Network architecture ---
# Actor
fc_layer_params = [400, 300] # Actor's fully connected layers
# Critic
observation_fc_layer_params = [400] # Fully connected layers applied to the observation only
joint_fc_layer_params=[300] # Fully connected layers after concatenating (observation, action)



Criando o Env

In [None]:
# Environments
steps = 500  # @param {type:"integer"}
steps_eval = 2000 # @param {type:"integer"}
dims = 2  # @param {type:"integer"}
function = ackley # @param ["sphere", "ackley", "griewank", "levy", "zakharov", "rotated_hyper_ellipsoid", "rosenbrock"]{type: "raw"}
low, high = get_low_and_high(function)
domain = hypercube(min=low, max=high)

# Training and evaluation each get their own FunctionEnv. If both TimeLimit
# wrappers shared one instance, resetting or stepping one of them would also
# move the other one's position and step counter.
env = FunctionEnv(function=function, domain=domain, dims=dims)
env_for_eval = FunctionEnv(function=function, domain=domain, dims=dims)

# TimeLimit bounds the episode length of each environment.
env_training = TimeLimit(env=env, duration=steps)
env_eval = TimeLimit(env=env_for_eval, duration=steps_eval)
tf_env_training = tf_py_environment.TFPyEnvironment(environment=env_training)
tf_env_eval = tf_py_environment.TFPyEnvironment(environment=env_eval)

# Specs used below to build the networks and the agent.
obs_spec = tf_env_training.observation_spec()
act_spec = tf_env_training.action_spec()
time_spec = tf_env_training.time_step_spec()


Criando as redes

In [None]:
# Actor network: takes the observation spec as input and the action spec as output.
actor_network = ActorNetwork(
    input_tensor_spec=obs_spec,
    output_tensor_spec=act_spec,
    fc_layer_params=fc_layer_params,
    activation_fn=tf.keras.activations.relu,
)

# Critic network: takes an (observation, action) pair as input.
critic_network = CriticNetwork(
    input_tensor_spec=(obs_spec, act_spec),
    observation_fc_layer_params=observation_fc_layer_params,
    joint_fc_layer_params=joint_fc_layer_params,
    activation_fn=tf.keras.activations.relu,
    output_activation_fn=tf.keras.activations.linear,
)


Criando o agente

In [None]:
# One Adam optimizer per network, each with its own learning rate.
actor_optimizer = tf.keras.optimizers.Adam(learning_rate=actor_lr)
critic_optimizer = tf.keras.optimizers.Adam(learning_rate=critic_lr)

# TD3 agent assembled from the specs, networks, optimizers and hyperparameters.
agent = Td3Agent(
    time_step_spec=time_spec,
    action_spec=act_spec,
    actor_network=actor_network,
    critic_network=critic_network,
    actor_optimizer=actor_optimizer,
    critic_optimizer=critic_optimizer,
    gamma=discount,
    target_update_tau=tau,
    target_update_period=target_update_period,
    actor_update_period=actor_update_period,
    exploration_noise_std=exploration_noise_std,
    target_policy_noise=target_policy_noise,
    target_policy_noise_clip=target_policy_noise_clip,
    reward_scale_factor=reward_scale_factor,
    train_step_counter=tf.Variable(0),
)

agent.initialize()

Replay Buffer

In [None]:
# Uniform replay buffer holding the trajectories produced by the collect policy.
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=tf_env_training.batch_size,
    max_length=buffer_size,
)

Criando o Driver e realizando coleta inicial

In [None]:
# Driver used during training: runs the collect policy and stores every
# trajectory in the replay buffer.
driver = dynamic_step_driver.DynamicStepDriver(
    env=tf_env_training,
    policy=agent.collect_policy,
    observers=[replay_buffer.add_batch],
    num_steps=collect_steps_per_iteration,
)
driver.run = common.function(driver.run)

# Separate driver with the same configuration, used only to pre-fill the buffer.
initial_collect_driver = dynamic_step_driver.DynamicStepDriver(
    env=tf_env_training,
    policy=agent.collect_policy,
    observers=[replay_buffer.add_batch],
    num_steps=collect_steps_per_iteration,
)
initial_collect_driver.run = common.function(initial_collect_driver.run)

# Collect whole episodes: keep running the driver until a final step shows up.
for _episode in range(initial_collect_episodes):
    while True:
        time_step, _policy_state = initial_collect_driver.run()
        if time_step.is_last():
            break

Criando o dataset

In [None]:
# Dataset view over the replay buffer: each element is a batch of
# `batch_size` samples spanning two consecutive steps.
dataset = replay_buffer.as_dataset(num_steps=2, sample_batch_size=batch_size)

# Iterator consumed by the training loop.
iterator = iter(dataset)

Treinamento do Agente

In [None]:
# Training: wrap the train step with common.function and restart the counter.
agent.train = common.function(agent.train)
agent.train_step_counter.assign(0)

for ep in range(num_episodes):
    best_solution = tf.float32.max
    ep_rew = 0.0

    while True:
        # Collect with the driver, then train on one sampled batch.
        time_step, _ = driver.run()
        experience, unused_info = next(iterator)
        agent.train(experience)

        # The reward is the negated objective, so flip the sign back.
        obj_value = -time_step.reward.numpy()[0]

        # Steps flagged as "first" are not considered for the best value.
        improved = obj_value < best_solution
        if improved and not time_step.is_first():
            best_solution = obj_value

        ep_rew -= obj_value
        if time_step.is_last():
            break

    print('episode = {0} Best solution on episode: {1} Return on episode: {2}'.format(ep, best_solution, ep_rew))

Realizando os testes do agente após o treinamento

In [None]:
# Evaluate the agent's main policy (agent.policy) on the evaluation environment.
evaluate_agent(
    eval_env=tf_env_eval,
    policy_eval=agent.policy,
    function=function,
    dims=dims,
)

In [None]:
# Evaluate the collect policy (agent.collect_policy) on the evaluation environment.
evaluate_agent(
    eval_env=tf_env_eval,
    policy_eval=agent.collect_policy,
    function=function,
    dims=dims,
)

Salvando ambas as policies do agente (policy e collect_policy)

In [None]:
from tf_agents.policies.policy_saver import PolicySaver

# One saver per policy: the main policy and the collect policy.
tf_policy_saver = PolicySaver(policy=agent.policy)
tf_policy_collect_saver = PolicySaver(policy=agent.collect_policy)

# Export each policy to its own directory.
tf_policy_saver.save(export_dir='policy')
tf_policy_collect_saver.save(export_dir='policy_collect')