# Week 5 - AI Lab

Author: Khushee Kapoor

Registration Number: 200968052


## Setting Up

To set up, we first install tf-agents using pip.

In [None]:
# installing tf-agents
!pip install tf-agents

Next, we import the following libraries:
- abc: to create abstract classes
- numpy: for computations
- tensorflow: for tensor operations and for the TensorFlow-side environments, policies, and agents

In [2]:
# importing libraries
import abc
import numpy as np
import tensorflow as tf

After that, we import the packages, classes, and functions required to create the environments.

In [3]:
# importing packages, classes, and functions to create the environments
from tf_agents.agents import tf_agent
from tf_agents.drivers import driver
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.policies import tf_policy
from tf_agents.specs import array_spec
from tf_agents.specs import tensor_spec
from tf_agents.trajectories import time_step as ts
from tf_agents.trajectories import trajectory
from tf_agents.trajectories import policy_step
nest = tf.nest

To create the custom environments, first we create the BanditPyEnvironment which the custom environments will inherit.

The BanditPyEnvironment class inherits from the py_environment.PyEnvironment class, which is a base class provided by TF-Agents for implementing custom environments. The __ init __ method initializes the observation and action specifications of the environment.

The action_spec and observation_spec methods return the respective specifications for the environment. The _empty_observation method returns an empty observation with the same shape and data type as the observation specification.

The _reset and _step methods are abstract in the py_environment.PyEnvironment class. BanditPyEnvironment implements them here, so its subclasses should not override them. _reset returns a restart time step containing an observation, and _step returns a termination time step containing the next observation and the reward for the action taken, so every episode lasts exactly one step. The _observe and _apply_action methods are abstract methods that must be implemented in subclasses.

The _observe method returns the current observation of the environment. The _apply_action method applies the given action to the environment and returns the corresponding reward.

Overall, this class provides a framework for defining a custom reinforcement learning environment that can be used with TF-Agents. Subclasses of this class can implement their own _observe and _apply_action methods to define the specific behavior of the environment.

In [4]:
# creating the MultiArmedBandit Environment
class BanditPyEnvironment(py_environment.PyEnvironment):

  # init function to initialize the observation and action specifications
  def __init__(self, observation_spec, action_spec):
    self._observation_spec = observation_spec
    self._action_spec = action_spec
    super(BanditPyEnvironment, self).__init__()

  # returns the action specification
  def action_spec(self):
    return self._action_spec

  # returns the observation specification
  def observation_spec(self):
    return self._observation_spec

  # returns an empty observation
  def _empty_observation(self):
    return tf.nest.map_structure(lambda x: np.zeros(x.shape, x.dtype),
                                 self.observation_spec())

  # returns a time step containing an observation
  def _reset(self):
    return ts.restart(self._observe(), batch_size=self.batch_size)

  # returns a time step containing the reward for the action taken
  def _step(self, action):
    reward = self._apply_action(action)
    return ts.termination(self._observe(), reward)

  # abstract method returns an observation
  @abc.abstractmethod
  def _observe(self):
    pass

  # applies action to the Environment and returns the corresponding reward
  @abc.abstractmethod
  def _apply_action(self, action):
    pass
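
The same template structure can be sketched without TF-Agents. The following is only an illustration under stated assumptions, not the library's API: MiniBandit and ConstantBandit are hypothetical names, and plain tuples stand in for TimeStep objects.

```python
import abc


class MiniBandit(abc.ABC):
    """Hypothetical stand-in for BanditPyEnvironment (not TF-Agents API)."""

    def reset(self):
        # A bandit episode starts by showing one observation.
        return ("FIRST", self._observe(), 0.0)

    def step(self, action):
        # Every step ends the episode: apply the action, then observe again.
        reward = self._apply_action(action)
        return ("LAST", self._observe(), reward)

    @abc.abstractmethod
    def _observe(self):
        """Return the current observation."""

    @abc.abstractmethod
    def _apply_action(self, action):
        """Apply the action and return the reward."""


class ConstantBandit(MiniBandit):
    """Toy subclass: the observation is always 3, the reward is action * 3."""

    def _observe(self):
        return 3

    def _apply_action(self, action):
        return action * 3


env = ConstantBandit()
first = env.reset()
last = env.step(2)
print(first, last)
```

Because both hooks are marked abstract, MiniBandit itself cannot be instantiated; only subclasses that define _observe and _apply_action can, which mirrors how the environments in the exercises build on BanditPyEnvironment.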

## Exercise 1

To create the custom environment for Exercise 1, we create a subclass of the BanditPyEnvironment class defined previously. The Environment1 class represents a simple reinforcement learning environment in which the agent observes a random integer between -5 and 5 and chooses an action from the set {0, 1, 2}. The reward for each action is the product of the observation and the action value.

The __ init __ method initializes the observation and action specifications for this environment using the BoundedArraySpec class from TF-Agents. The observation specification is a 1D array with a single integer element between -5 and 5, while the action specification is a scalar integer between 0 and 2.

The _observe method generates a random observation within the range [-5, 5] and returns it as a 1D array. The _apply_action method takes an action as input, multiplies it by the current observation, and returns the result as the reward for the action.

Overall, this class provides a simple environment for testing reinforcement learning algorithms that operate on discrete actions and integer observations. Subclasses of this class can modify the observation and action specifications and implement their own _observe and _apply_action methods to define custom environments for their specific use case.

In [5]:
# creating a custom environment
class Environment1(BanditPyEnvironment):

  # initializing the action and observation specifications
  def __init__(self):
    action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
    observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=-5, maximum=5, name='observation')
    super(Environment1, self).__init__(observation_spec, action_spec)

  # generates a random observation
  def _observe(self):
    self._observation = np.random.randint(-5, 6, (1,), dtype='int32')
    return self._observation

  # returns the reward = action * observation
  def _apply_action(self, action):
    return action * self._observation

Next, we define a TensorFlow policy called SignPolicy, which is used to map observations to actions in an RL environment.

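The __ init __ method initializes the observation and action specifications with the same shapes and bounds as Environment1, using BoundedTensorSpec instead of BoundedArraySpec, and derives the time step specification from the observation specification.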

The _distribution method is left as a stub that returns None. In general, this method defines the probability distribution over actions given the current observation. Because this policy is deterministic and only its action method is called here, no distribution is needed.

The _variables method returns the variables of the policy, which can be used to save or restore the policy. Since this policy does not have any variables, the method returns an empty tuple.

The _action method takes as input a time step, which contains the current observation of the environment, a policy state, and a seed. It computes the sign of the observation using tf.sign and casts it to an integer using tf.cast. It then adds 1 to the sign to obtain the action. Specifically, if the observation is negative, the action is 0, if it is positive, the action is 2, and if it is zero, the action is 1. Finally, it returns a PolicyStep object containing the action and the policy state.

Overall, this policy defines a simple mapping between observations and actions, which is based on the sign of the observation. The policy is deterministic, meaning that it always maps the same observation to the same action. The policy can be used to control the behavior of an agent in an RL environment.

In [6]:
# creating an optimal policy
class SignPolicy(tf_policy.TFPolicy):

  # initializing the observation, timestep, and action specifications
  def __init__(self):
    observation_spec = tensor_spec.BoundedTensorSpec(
        shape=(1,), dtype=tf.int32, minimum=-5, maximum=5)
    time_step_spec = ts.time_step_spec(observation_spec)

    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype=tf.int32, minimum=0, maximum=2)

    super(SignPolicy, self).__init__(time_step_spec=time_step_spec,
                                     action_spec=action_spec)
    
  # return the probability distribution
  def _distribution(self, time_step):
    pass

  # to return variables of the policy
  def _variables(self):
    return ()

  # returns action and policy state
  def _action(self, time_step, policy_state, seed):
    observation_sign = tf.cast(tf.sign(time_step.observation[0]), dtype=tf.int32)
    action = observation_sign + 1
    return policy_step.PolicyStep(action, policy_state)
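
As a sanity check on this mapping, here is a plain-Python sketch (no TensorFlow) that compares the sign rule against a brute-force search over the three actions. The helper names sign_action and best_reward are illustrative only, and the reward is assumed to be action * observation as in Environment1.

```python
def sign_action(observation):
    """Map an integer observation to 0, 1, or 2 via sign(observation) + 1."""
    sign = (observation > 0) - (observation < 0)
    return sign + 1


def best_reward(observation):
    """Largest reward available for this observation over actions 0, 1, 2."""
    return max(action * observation for action in (0, 1, 2))


# Check every observation the environment can produce.
for observation in range(-5, 6):
    action = sign_action(observation)
    assert action * observation == best_reward(observation)

print([sign_action(o) for o in (-4, 0, 4)])
```

The loop passes for all eleven observations, which is why the sign rule can be called optimal for this environment: no other choice of action earns more for any observation.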

An instance of Environment1 is created and wrapped as a TensorFlow environment using tf_py_environment.TFPyEnvironment(). An instance of SignPolicy is also created.

Then, a for loop runs 50 iterations. In each iteration, the environment is reset using tf_environment.reset(), which returns a time_step object that contains the current observation. This time_step object is passed to the sign_policy object using sign_policy.action(current_time_step).action, which returns an action for the current observation. The action is then passed to the environment using tf_environment.step(action), which returns a time_step object that contains the new observation and the reward obtained from taking the action. The loop calculates the total reward by summing up the reward earned in each iteration.

In [7]:
# creating instance of environment
environment = Environment1()
tf_environment = tf_py_environment.TFPyEnvironment(environment)

# creating instance of policy
sign_policy = SignPolicy()

# calculating total reward for 50 observations
total_reward = 0
for i in range(50):
  current_time_step = tf_environment.reset()
  action = sign_policy.action(current_time_step).action
  reward = tf_environment.step(action).reward
  total_reward += reward

print('Total Reward: ', total_reward)

Total Reward:  tf.Tensor([[136.]], shape=(1, 1), dtype=float32)


In this run, the total reward is 136. The exact value varies between runs because the observations are random.
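
The total of 136 is close to what the sign policy earns on average. A short calculation makes this concrete, assuming observations are uniform over the integers -5 to 5 (as drawn by np.random.randint(-5, 6)) and the reward is action * observation. The function name sign_policy_reward is a hypothetical helper for this sketch.

```python
from fractions import Fraction

observations = range(-5, 6)  # 11 equally likely values


def sign_policy_reward(observation):
    """Reward earned by action = sign(observation) + 1 in Environment1."""
    sign = (observation > 0) - (observation < 0)
    return (sign + 1) * observation


# Exact expected reward for a single step.
expected_per_step = Fraction(
    sum(sign_policy_reward(o) for o in observations), len(observations))

# Expected total over the 50 iterations of the loop.
expected_total = 50 * expected_per_step

print(expected_per_step, float(expected_total))
```

Negative and zero observations earn 0, and a positive observation earns twice its value, so the expected reward per step is 30/11 and the expected total over 50 steps is roughly 136.4.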

## Exercise 2

To create a custom environment for Exercise 2, we define a custom environment class called Environment2, which is a subclass of BanditPyEnvironment. It defines an environment where the reward for taking an action depends on the action and the current observation.

The __ init __ method initializes the environment by defining the action_spec and observation_spec. The observation_spec is a bounded array of shape (1,) with integer dtype and minimum value of -5 and maximum value of 5. The action_spec is a bounded array of shape () with integer dtype and minimum value of 0 and maximum value of 2.

The _observe method generates a random observation within the range of -5 to 5 (inclusive) and returns it.

The _apply_action method applies the action taken by the agent to the environment and returns the corresponding reward. The reward is computed by multiplying the observation with the action and a random reward sign generated during initialization. The reward sign is set to be either -1 or 1 with equal probability.

Finally, an Environment2 instance is wrapped in a TensorFlow environment using tf_py_environment.TFPyEnvironment and assigned to the variable two_way_tf_environment.

In [8]:
# creating a custom environment
class Environment2(BanditPyEnvironment):

  # initializing the action and observation specifications, and the reward sign
  def __init__(self):
    action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
    observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=-5, maximum=5, name='observation')

    self._reward_sign = 2 * np.random.randint(2) - 1
    print("reward sign:")
    print(self._reward_sign)

    super(Environment2, self).__init__(observation_spec, action_spec)

  # generating a random observation
  def _observe(self):
    self._observation = np.random.randint(-5, 6, (1,), dtype='int32')
    return self._observation

  # returns reward = action * observation * reward sign
  def _apply_action(self, action):
    return self._reward_sign * action * self._observation[0]

# creating instance of environment
two_way_tf_environment = tf_py_environment.TFPyEnvironment(Environment2())

reward sign:
-1


As we can see, this instance of the environment was assigned the negative sign by chance.
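
The expression 2 * np.random.randint(2) - 1 maps the two possible draws, 0 and 1, to -1 and +1. A small sketch of that mapping follows, with the standard library's random.randrange standing in for NumPy's generator (an assumption made only to keep the example dependency-free). The name draw_reward_sign is hypothetical.

```python
import random


def draw_reward_sign(rng):
    """Return -1 or +1: rng.randrange(2) gives 0 or 1, then 2 * x - 1."""
    return 2 * rng.randrange(2) - 1


rng = random.Random(0)  # fixed seed so the sketch is repeatable
signs = [draw_reward_sign(rng) for _ in range(1000)]

# Only the two values -1 and +1 should ever appear.
print(sorted(set(signs)))
```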

We define a custom policy class called TwoWaySignPolicy that extends tf_policy.TFPolicy. This policy class takes in a situation parameter, which encodes what is currently known about the reward sign: 0 means unknown, 1 means normal, and 2 means flipped. The __ init __ method sets up the observation and action specs, as well as the time step spec. It also stores the situation attribute.

The _distribution method is a stub that returns None, and the _variables method returns a list containing the situation variable.

The _action method is the most important method in this policy. It takes in the current time step, policy state, and random seed, and returns an action. The action is determined based on the situation attribute and the sign of the observation. If the situation is 0, the policy chooses action 1 (unknown). If the situation is 1 and the observation is positive, the policy chooses action 2 (positive). If the situation is 1 and the observation is negative, the policy chooses action 0 (negative). If the situation is 2, the policy chooses the opposite action to what it would choose in situation 1.

Overall, this policy is designed to handle a two-way bandit environment where the reward sign can be flipped. It takes into account the current situation and the sign of the observation to make decisions about which action to take.

In [9]:
# creating a custom policy
class TwoWaySignPolicy(tf_policy.TFPolicy):

  # initializing the observation, action, timestep specifications
  def __init__(self, situation):
    observation_spec = tensor_spec.BoundedTensorSpec(
        shape=(1,), dtype=tf.int32, minimum=-5, maximum=5)
    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype=tf.int32, minimum=0, maximum=2)
    time_step_spec = ts.time_step_spec(observation_spec)
    self._situation = situation
    super(TwoWaySignPolicy, self).__init__(time_step_spec=time_step_spec,
                                           action_spec=action_spec)
  # returns the probability distribution
  def _distribution(self, time_step):
    pass

  # returns the variables
  def _variables(self):
    return [self._situation]

  # determines and returns the action based on the sign
  def _action(self, time_step, policy_state, seed):
    sign = tf.cast(tf.sign(time_step.observation[0, 0]), dtype=tf.int32)
    def case_unknown_fn():
      return tf.constant(1, shape=(1,))
    def case_normal_fn():
      return tf.constant(sign + 1, shape=(1,))
    def case_flipped_fn():
      return tf.constant(1 - sign, shape=(1,))
    cases = [(tf.equal(self._situation, 0), case_unknown_fn),
             (tf.equal(self._situation, 1), case_normal_fn),
             (tf.equal(self._situation, 2), case_flipped_fn)]
    action = tf.case(cases, exclusive=True)
    return policy_step.PolicyStep(action, policy_state)
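
The three cases can also be written out in plain Python to see the full mapping at a glance. This is an illustrative analogue only: two_way_action is a hypothetical helper, and it works on Python integers rather than tensors.

```python
def two_way_action(situation, observation):
    """Plain-Python analogue of TwoWaySignPolicy._action (illustrative only)."""
    sign = (observation > 0) - (observation < 0)
    if situation == 0:   # sign unknown: play 1 to probe the reward
        return 1
    if situation == 1:   # normal: same rule as SignPolicy
        return sign + 1
    if situation == 2:   # flipped: mirror the rule
        return 1 - sign
    raise ValueError("situation must be 0, 1, or 2")


for situation in (0, 1, 2):
    print(situation, [two_way_action(situation, o) for o in (-3, 0, 3)])
```

In situation 0 the action is always 1, which is the only non-zero action whose reward reveals the sign without doubling the loss if the guess would have been wrong.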

Next, we define a reinforcement learning agent called SignAgent which inherits from the TFAgent class. The agent uses a policy called TwoWaySignPolicy to choose actions based on observations from the environment. The TwoWaySignPolicy class is initialized with a situation variable that is used to determine the behavior of the policy.

The SignAgent constructor initializes the TwoWaySignPolicy policy and sets the time_step_spec and action_spec to match the policy's specs. It also sets the policy, collect_policy, and train_sequence_length attributes by calling the parent class constructor.

The _initialize method initializes the variables of the SignAgent.

The _train method updates the SignAgent from a batch of experience. While the situation is still 0 (unknown) and the reward is non-zero, it infers the reward sign from the signs of the observation, action, and reward, and sets the situation to 1 (normal) or 2 (flipped). Otherwise, the situation is left unchanged. Finally, the method returns a LossInfo object whose loss and extra fields are empty tuples, since no loss is computed.

In [10]:
# creating an agent
class SignAgent(tf_agent.TFAgent):
  def __init__(self):
    self._situation = tf.Variable(0, dtype=tf.int32)
    policy = TwoWaySignPolicy(self._situation)
    time_step_spec = policy.time_step_spec
    action_spec = policy.action_spec
    super(SignAgent, self).__init__(time_step_spec=time_step_spec,
                                    action_spec=action_spec,
                                    policy=policy,
                                    collect_policy=policy,
                                    train_sequence_length=None)

  # initializing the variables of the agent
  def _initialize(self):
    return tf.compat.v1.variables_initializer(self.variables)

  # updating the agent by training it
  def _train(self, experience, weights=None):
    observation = experience.observation
    action = experience.action
    reward = experience.reward
    needs_action = tf.logical_and(tf.equal(self._situation, 0),
                                  tf.not_equal(reward, 0))

    def new_situation_fn():
      return (3 - tf.sign(tf.cast(observation[0, 0, 0], dtype=tf.int32) *
                          tf.cast(action[0, 0], dtype=tf.int32) *
                          tf.cast(reward[0, 0], dtype=tf.int32))) / 2

    new_situation = tf.cond(needs_action,
                            new_situation_fn,
                            lambda: self._situation)
    new_situation = tf.cast(new_situation, tf.int32)
    tf.compat.v1.assign(self._situation, new_situation)
    return tf_agent.LossInfo((), ())

sign_agent = SignAgent()
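
The update rule inside _train is compact, so a worked example may help. The sketch below is a plain-Python analogue under the assumption that observation, action, and reward are single numbers rather than batched tensors. The name infer_situation is hypothetical.

```python
def infer_situation(situation, observation, action, reward):
    """Plain-Python analogue of the update in SignAgent._train."""
    if situation != 0 or reward == 0:
        # Already decided, or a zero reward carries no sign information.
        return situation
    product = observation * action * int(reward)
    sign = (product > 0) - (product < 0)
    return (3 - sign) // 2  # +1 -> 1 (normal), -1 -> 2 (flipped)


print(infer_situation(0, -5, 1, 5.0))   # reward opposes the observation
print(infer_situation(0, 4, 1, 4.0))    # reward follows the observation
print(infer_situation(0, 0, 1, 0.0))    # nothing learned yet
```

With an observation of -5, action 1, and reward 5, the product is negative, so the formula gives (3 + 1) / 2 = 2, the flipped situation. With an observation of 4 and reward 4, the product is positive and the result is 1, the normal situation.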

The function trajectory_for_bandit below creates a Trajectory object for a bandit problem given an initial time step, an action step, and a final time step.

The Trajectory object is a container for the different elements that make up a sequence of observations, actions, rewards, and other information during an interaction with an environment. In this case, the Trajectory object is created with the following arguments:

- observation: The initial observation, of shape (1, 1, 1) after tf.expand_dims adds a leading dimension to the (1, 1) batch of observations.
- action: The action taken at the action step, of shape (1, 1) after tf.expand_dims.
- policy_info: Any additional information the policy returned with the action; here it is an empty tuple.
- reward: The reward received at the final step, of shape (1, 1).
- discount: The discount factor at the final step, of shape (1, 1).
- step_type: The type of the initial step (i.e., whether it is a FIRST or MID step), of shape (1, 1).
- next_step_type: The type of the final step (i.e., whether it is a MID or LAST step), of shape (1, 1).

By creating a Trajectory object, the function creates a container that can be used to store and manipulate the data associated with a single interaction with the environment. This can be useful for constructing datasets for training reinforcement learning agents.

In [11]:
# returns a trajectory for the bandit
def trajectory_for_bandit(initial_step, action_step, final_step):
  return trajectory.Trajectory(observation=tf.expand_dims(initial_step.observation, 0),
                               action=tf.expand_dims(action_step.action, 0),
                               policy_info=action_step.info,
                               reward=tf.expand_dims(final_step.reward, 0),
                               discount=tf.expand_dims(final_step.discount, 0),
                               step_type=tf.expand_dims(initial_step.step_type, 0),
                               next_step_type=tf.expand_dims(final_step.step_type, 0))

To test out the environment, we simulate the interaction between the SignAgent and the Environment2 for 10 steps.

- The initial step is obtained by resetting the environment, and stored in the variable step.
- The agent's collect_policy is used to select an action based on the current step.
- The action is taken in the environment using the step function, and the resulting next_step is obtained.
- A Trajectory object is created using the current step, the selected action, and the resulting next_step.
- The agent's train method is called with the Trajectory object as input.
- The next_step becomes the new step, and the process repeats from the action selection for the next iteration.

The purpose of this code is to train the SignAgent to learn a policy for selecting actions in the Environment2. The experience generated from each interaction is used to update the agent's policy.

In [12]:
# simulating the trajectory of the agent in the environment
step = two_way_tf_environment.reset()
for _ in range(10):
  action_step = sign_agent.collect_policy.action(step)
  next_step = two_way_tf_environment.step(action_step.action)
  experience = trajectory_for_bandit(step, action_step, next_step)
  print(experience)
  sign_agent.train(experience)
  step = next_step

Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[1]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-5]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[5.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[-1]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1)