In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import tensorflow as tf
import numpy as np

from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
from tf_agents.environments import wrappers
from tf_agents.environments import suite_gym
from tf_agents.trajectories import time_step as ts

tf.compat.v1.enable_v2_behavior()

In [2]:
class PyEnvironment(object):

  def reset(self):
    """Return initial_time_step."""
    self._current_time_step = self._reset()
    return self._current_time_step

  def step(self, action):
    """Apply action and return new time_step."""
    if self._current_time_step is None:
        return self.reset()
    self._current_time_step = self._step(action)
    return self._current_time_step

  def current_time_step(self):
    return self._current_time_step

  def time_step_spec(self):
    """Return time_step_spec."""

  @abc.abstractmethod
  def observation_spec(self):
    """Return observation_spec."""

  @abc.abstractmethod
  def action_spec(self):
    """Return action_spec."""

  @abc.abstractmethod
  def _reset(self):
    """Return initial_time_step."""

  @abc.abstractmethod
  def _step(self, action):
    """Apply action and return new time_step."""

In [3]:
environment = suite_gym.load('CartPole-v0')
print('action_spec:', environment.action_spec())
print('time_step_spec.observation:', environment.time_step_spec().observation)
print('time_step_spec.step_type:', environment.time_step_spec().step_type)
print('time_step_spec.discount:', environment.time_step_spec().discount)
print('time_step_spec.reward:', environment.time_step_spec().reward)

action_spec: BoundedArraySpec(shape=(), dtype=dtype('int64'), name='action', minimum=0, maximum=1)
time_step_spec.observation: BoundedArraySpec(shape=(4,), dtype=dtype('float32'), name='observation', minimum=[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], maximum=[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38])
time_step_spec.step_type: ArraySpec(shape=(), dtype=dtype('int32'), name='step_type')
time_step_spec.discount: BoundedArraySpec(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
time_step_spec.reward: ArraySpec(shape=(), dtype=dtype('float32'), name='reward')


In [4]:
action = np.array(1, dtype=np.int32)
time_step = environment.reset()
print(time_step)
while not time_step.is_last():
  time_step = environment.step(action)
  print(time_step)

TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([-0.02136502,  0.01631076,  0.04721795,  0.03403996], dtype=float32),
 'reward': array(0., dtype=float32),
 'step_type': array(0)})
TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([-0.0210388 ,  0.2107249 ,  0.04789874, -0.24337931], dtype=float32),
 'reward': array(1., dtype=float32),
 'step_type': array(1)})
TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([-0.01682431,  0.40513113,  0.04303116, -0.5205774 ], dtype=float32),
 'reward': array(1., dtype=float32),
 'step_type': array(1)})
TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([-0.00872168,  0.5996217 ,  0.03261961, -0.799396  ], dtype=float32),
 'reward': array(1., dtype=float32),
 'step_type': array(1)})
TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([ 0.00327075,  0.7942814 ,  0.01663169, -1.0816417 ], dtype=float32),
 'reward': array(1., dtype=float32),
 'step_typ

In [5]:
class CardGameEnv(py_environment.PyEnvironment):

  def __init__(self):
    self._action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=1, name='action')
    self._observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=0, name='observation')
    self._state = 0
    self._episode_ended = False

  def action_spec(self):
    return self._action_spec

  def observation_spec(self):
    return self._observation_spec

  def _reset(self):
    self._state = 0
    self._episode_ended = False
    return ts.restart(np.array([self._state], dtype=np.int32))

  def _step(self, action):

    if self._episode_ended:
      # The last action ended the episode. Ignore the current action and start
      # a new episode.
      return self.reset()

    # Make sure episodes don't go on forever.
    if action == 1:
      self._episode_ended = True
    elif action == 0:
      new_card = np.random.randint(1, 11)
      self._state += new_card
    else:
      raise ValueError('`action` should be 0 or 1.')

    if self._episode_ended or self._state >= 21:
      reward = self._state - 21 if self._state <= 21 else -21
      return ts.termination(np.array([self._state], dtype=np.int32), reward)
    else:
      return ts.transition(
          np.array([self._state], dtype=np.int32), reward=0.0, discount=1.0)

In [6]:

environment = CardGameEnv()
utils.validate_py_environment(environment, episodes=5)

In [7]:
get_new_card_action = np.array(0, dtype=np.int32)
end_round_action = np.array(1, dtype=np.int32)

environment = CardGameEnv()
time_step = environment.reset()
print(time_step)
cumulative_reward = time_step.reward

for _ in range(3):
  time_step = environment.step(get_new_card_action)
  print(time_step)
  cumulative_reward += time_step.reward

time_step = environment.step(end_round_action)
print(time_step)
cumulative_reward += time_step.reward
print('Final Reward = ', cumulative_reward)

TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([0]),
 'reward': array(0., dtype=float32),
 'step_type': array(0)})
TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([5]),
 'reward': array(0., dtype=float32),
 'step_type': array(1)})
TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([15]),
 'reward': array(0., dtype=float32),
 'step_type': array(1)})
TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([16]),
 'reward': array(0., dtype=float32),
 'step_type': array(1)})
TimeStep(
{'discount': array(0., dtype=float32),
 'observation': array([16]),
 'reward': array(-5., dtype=float32),
 'step_type': array(2)})
Final Reward =  -5.0


In [8]:

env = suite_gym.load('Pendulum-v0')
print('Action Spec:', env.action_spec())

discrete_action_env = wrappers.ActionDiscretizeWrapper(env, num_actions=5)
print('Discretized Action Spec:', discrete_action_env.action_spec())

Action Spec: BoundedArraySpec(shape=(1,), dtype=dtype('float32'), name='action', minimum=-2.0, maximum=2.0)
Discretized Action Spec: BoundedArraySpec(shape=(), dtype=dtype('int32'), name='action', minimum=0, maximum=4)


In [9]:
class TFEnvironment(object):

  def time_step_spec(self):
    """Describes the `TimeStep` tensors returned by `step()`."""

  def observation_spec(self):
    """Defines the `TensorSpec` of observations provided by the environment."""

  def action_spec(self):
    """Describes the TensorSpecs of the action expected by `step(action)`."""

  def reset(self):
    """Returns the current `TimeStep` after resetting the Environment."""
    return self._reset()

  def current_time_step(self):
    """Returns the current `TimeStep`."""
    return self._current_time_step()

  def step(self, action):
    """Applies the action and returns the new `TimeStep`."""
    return self._step(action)

  @abc.abstractmethod
  def _reset(self):
    """Returns the current `TimeStep` after resetting the Environment."""

  @abc.abstractmethod
  def _current_time_step(self):
    """Returns the current `TimeStep`."""

  @abc.abstractmethod
  def _step(self, action):
    """Applies the action and returns the new `TimeStep`."""

In [10]:
env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)

print(isinstance(tf_env, tf_environment.TFEnvironment))
print("TimeStep Specs:", tf_env.time_step_spec())
print("Action Specs:", tf_env.action_spec())

True
TimeStep Specs: TimeStep(
{'discount': BoundedTensorSpec(shape=(), dtype=tf.float32, name='discount', minimum=array(0., dtype=float32), maximum=array(1., dtype=float32)),
 'observation': BoundedTensorSpec(shape=(4,), dtype=tf.float32, name='observation', minimum=array([-4.8000002e+00, -3.4028235e+38, -4.1887903e-01, -3.4028235e+38],
      dtype=float32), maximum=array([4.8000002e+00, 3.4028235e+38, 4.1887903e-01, 3.4028235e+38],
      dtype=float32)),
 'reward': TensorSpec(shape=(), dtype=tf.float32, name='reward'),
 'step_type': TensorSpec(shape=(), dtype=tf.int32, name='step_type')})
Action Specs: BoundedTensorSpec(shape=(), dtype=tf.int64, name='action', minimum=array(0, dtype=int64), maximum=array(1, dtype=int64))


In [11]:
env = suite_gym.load('CartPole-v0')

tf_env = tf_py_environment.TFPyEnvironment(env)
# reset() creates the initial time_step after resetting the environment.
time_step = tf_env.reset()
num_steps = 3
transitions = []
reward = 0
for i in range(num_steps):
  action = tf.constant([i % 2])
  # applies the action and returns the new TimeStep.
  next_time_step = tf_env.step(action)
  transitions.append([time_step, action, next_time_step])
  reward += next_time_step.reward
  time_step = next_time_step

np_transitions = tf.nest.map_structure(lambda x: x.numpy(), transitions)
print('\n'.join(map(str, np_transitions)))
print('Total reward:', reward.numpy())

[TimeStep(
{'discount': array([1.], dtype=float32),
 'observation': array([[ 0.00688582, -0.02496718,  0.02946392,  0.02648936]],
      dtype=float32),
 'reward': array([0.], dtype=float32),
 'step_type': array([0])}), array([0]), TimeStep(
{'discount': array([1.], dtype=float32),
 'observation': array([[ 0.00638647, -0.22049901,  0.02999371,  0.32832092]],
      dtype=float32),
 'reward': array([1.], dtype=float32),
 'step_type': array([1])})]
[TimeStep(
{'discount': array([1.], dtype=float32),
 'observation': array([[ 0.00638647, -0.22049901,  0.02999371,  0.32832092]],
      dtype=float32),
 'reward': array([1.], dtype=float32),
 'step_type': array([1])}), array([1]), TimeStep(
{'discount': array([1.], dtype=float32),
 'observation': array([[ 0.00197649, -0.02581661,  0.03656013,  0.04524551]],
      dtype=float32),
 'reward': array([1.], dtype=float32),
 'step_type': array([1])})]
[TimeStep(
{'discount': array([1.], dtype=float32),
 'observation': array([[ 0.00197649, -0.02581661, 

In [12]:
env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)

time_step = tf_env.reset()
rewards = []
steps = []
num_episodes = 5

for _ in range(num_episodes):
  episode_reward = 0
  episode_steps = 0
  while not time_step.is_last():
    action = tf.random.uniform([1], 0, 2, dtype=tf.int32)
    time_step = tf_env.step(action)
    episode_steps += 1
    episode_reward += time_step.reward.numpy()
  rewards.append(episode_reward)
  steps.append(episode_steps)
  time_step = tf_env.reset()

num_steps = np.sum(steps)
avg_length = np.mean(steps)
avg_reward = np.mean(rewards)

print('num_episodes:', num_episodes, 'num_steps:', num_steps)
print('avg_length', avg_length, 'avg_reward:', avg_reward)

num_episodes: 5 num_steps: 107
avg_length 21.4 avg_reward: 21.4
