### **Checking for tf-agents installation**

In [58]:
!pip install tf-agents

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### **Importing the required libraries**

In [59]:
import numpy as np
import tensorflow as tf

import abc

from tf_agents.agents import tf_agent
from tf_agents.drivers import driver
from tf_agents.bandits.environments.bandit_py_environment import BanditPyEnvironment
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.policies import tf_policy
from tf_agents.specs import array_spec
from tf_agents.specs import tensor_spec
from tf_agents.trajectories import time_step as ts
from tf_agents.trajectories import trajectory
from tf_agents.trajectories import policy_step

nest = tf.nest

### **Simple Environment :**

- **a)**  which  the  observation  is  a  random  integer  between -5  and  5,  there  are  3 possible actions (0, 1, 2), and the reward is the product of the action and the observation.

- **b)** Define an optimal policy manually. The action only depends on the sign of the observation, 0 when is negative and 2 when is positive.

- **c)** Request  for  50  observations  from  the  environment,  compute  and  print  the total reward.

> **Creating the environment**

In [60]:
class CustomBanditEnvironment(BanditPyEnvironment):

  def __init__(self):
    action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
    observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=-5, maximum=5, name='observation')
    super(CustomBanditEnvironment, self).__init__(observation_spec, action_spec)

  def _observe(self):
    self._observation = np.random.randint(-5, 6, (1,), dtype='int32')
    return self._observation

  def _apply_action(self, action):
    return action * self._observation

In [61]:
environment = CustomBanditEnvironment()
observation = environment.reset().observation
print("observation: %d" % observation)

action = 2

print("action: %d" % action)
reward = environment.step(action).reward
print("reward: %f" % reward)

observation: -2
action: 2
reward: -4.000000


> **Wrapping the environment into a TensorFlow environment**

In [62]:
tf_environment = tf_py_environment.TFPyEnvironment(environment)

> **Designing an optimal policy manually**

In [63]:
class SignPolicy(tf_policy.TFPolicy):
  def __init__(self):
    observation_spec = tensor_spec.BoundedTensorSpec(
        shape=(1,), dtype=tf.int32, minimum=-5, maximum=5)
    time_step_spec = ts.time_step_spec(observation_spec)

    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype=tf.int32, minimum=0, maximum=2)

    super(SignPolicy, self).__init__(time_step_spec=time_step_spec,
                                     action_spec=action_spec)
  def _distribution(self, time_step):
    pass

  def _variables(self):
    return ()

  def _action(self, time_step, policy_state, seed):
    observation_sign = tf.cast(tf.sign(time_step.observation[0]), dtype=tf.int32)
    action = observation_sign + 1
    return policy_step.PolicyStep(action, policy_state)

> **Computing Total Rewards based on 50 observations**

In [71]:
sign_policy = SignPolicy()

current_time_step = tf_environment.reset()
reward = []
for _ in range(50):
  
  print ('Observation : %d' % current_time_step.observation)
  action = sign_policy.action(current_time_step).action
  print('Action: %d' % action)
  reward.append((tf_environment.step(action).reward).numpy()[0][0])

print('\nTotal Reward : ')
print(np.sum(reward))

Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation : 5
Action: 2
Observation 

### **Complex Environment :**
- **a)** Define an environment will either always give reward = observation * action or reward = -observation * action. This will be decided when the environment is initialized.

- **b)** Define a policy that detects the behaviorof the underlying environment. There are three situations that the policy needs to handle:
 - i. The agent has not detected know yet which version of the environment is running.

 - ii. The  agent  detected  that  the  original  version  of  the  environment  is running.

 - iii. The  agent  detected  that  the  flipped  version  of  the  environment  is running.

- **c)** Define the agent that detects the sign of the environment and sets the policy appropriately.

>**Creating the environment**

In [72]:
class TwoWayPyEnvironment(BanditPyEnvironment):

  def __init__(self):
    action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
    observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=-2, maximum=2, name='observation')

    # Flipping the sign with probability 1/2.
    self._reward_sign = 2 * np.random.randint(2) - 1
    print("reward sign:")
    print(self._reward_sign)

    super(TwoWayPyEnvironment, self).__init__(observation_spec, action_spec)

  def _observe(self):
    self._observation = np.random.randint(-2, 3, (1,), dtype='int32')
    return self._observation

  def _apply_action(self, action):
    return self._reward_sign * action * self._observation[0]

two_way_tf_environment = tf_py_environment.TFPyEnvironment(TwoWayPyEnvironment())

reward sign:
1


> **Defining a policy that detects the underlying behaviour of the environment**

In [73]:
class TwoWaySignPolicy(tf_policy.TFPolicy):
  def __init__(self, situation):
    observation_spec = tensor_spec.BoundedTensorSpec(
        shape=(1,), dtype=tf.int32, minimum=-2, maximum=2)
    action_spec = tensor_spec.BoundedTensorSpec(
        shape=(), dtype=tf.int32, minimum=0, maximum=2)
    time_step_spec = ts.time_step_spec(observation_spec)
    self._situation = situation
    super(TwoWaySignPolicy, self).__init__(time_step_spec=time_step_spec,
                                           action_spec=action_spec)
  def _distribution(self, time_step):
    pass

  def _variables(self):
    return [self._situation]

  def _action(self, time_step, policy_state, seed):
    sign = tf.cast(tf.sign(time_step.observation[0, 0]), dtype=tf.int32)
    def case_unknown_fn():
      # Choose 1 so that we get information on the sign.
      return tf.constant(1, shape=(1,))

    # Choose 0 or 2, depending on the situation and the sign of the observation.
    def case_normal_fn():
      return tf.constant(sign + 1, shape=(1,))
    def case_flipped_fn():
      return tf.constant(1 - sign, shape=(1,))

    cases = [(tf.equal(self._situation, 0), case_unknown_fn),
             (tf.equal(self._situation, 1), case_normal_fn),
             (tf.equal(self._situation, 2), case_flipped_fn)]
    action = tf.case(cases, exclusive=True)
    return policy_step.PolicyStep(action, policy_state)

> **Creating the Agent**

In [74]:
class SignAgent(tf_agent.TFAgent):
  def __init__(self):
    self._situation = tf.Variable(0, dtype=tf.int32)
    policy = TwoWaySignPolicy(self._situation)
    time_step_spec = policy.time_step_spec
    action_spec = policy.action_spec
    super(SignAgent, self).__init__(time_step_spec=time_step_spec,
                                    action_spec=action_spec,
                                    policy=policy,
                                    collect_policy=policy,
                                    train_sequence_length=None)

  def _initialize(self):
    return tf.compat.v1.variables_initializer(self.variables)

  def _train(self, experience, weights=None):
    observation = experience.observation
    action = experience.action
    reward = experience.reward

    # We only need to change the value of the situation variable if it is
    # unknown (0) right now, and we can infer the situation only if the
    # observation is not 0.
    needs_action = tf.logical_and(tf.equal(self._situation, 0),
                                  tf.not_equal(reward, 0))


    def new_situation_fn():
      """This returns either 1 or 2, depending on the signs."""
      return (3 - tf.sign(tf.cast(observation[0, 0, 0], dtype=tf.int32) *
                          tf.cast(action[0, 0], dtype=tf.int32) *
                          tf.cast(reward[0, 0], dtype=tf.int32))) / 2

    new_situation = tf.cond(needs_action,
                            new_situation_fn,
                            lambda: self._situation)
    new_situation = tf.cast(new_situation, tf.int32)
    tf.compat.v1.assign(self._situation, new_situation)
    return tf_agent.LossInfo((), ())

sign_agent = SignAgent()

> **Defining the Trajectory**

In [75]:
# We need to add another dimension here because the agent expects the
# trajectory of shape [batch_size, time, ...], but in this tutorial we assume
# that both batch size and time are 1. Hence all the expand_dims.

def trajectory_for_bandit(initial_step, action_step, final_step):
  return trajectory.Trajectory(observation=tf.expand_dims(initial_step.observation, 0),
                               action=tf.expand_dims(action_step.action, 0),
                               policy_info=action_step.info,
                               reward=tf.expand_dims(final_step.reward, 0),
                               discount=tf.expand_dims(final_step.discount, 0),
                               step_type=tf.expand_dims(initial_step.step_type, 0),
                               next_step_type=tf.expand_dims(final_step.step_type, 0))

> **Training the agent to learn from experience over 50 timesteps**

In [76]:
step = two_way_tf_environment.reset()
for _ in range(10):
  action_step = sign_agent.collect_policy.action(step)
  next_step = two_way_tf_environment.step(action_step.action)
  experience = trajectory_for_bandit(step, action_step, next_step)
  print(experience)
  sign_agent.train(experience)
  step = next_step

Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[1]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[2]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[2.]], dtype=float32)>,
 'step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[0]], dtype=int32)>})
Trajectory(
{'action': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'discount': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.]], dtype=float32)>,
 'next_step_type': <tf.Tensor: shape=(1, 1), dtype=int32, numpy=array([[2]], dtype=int32)>,
 'observation': <tf.Tensor: shape=(1, 1, 1), dtype=int32, numpy=array([[[2]]], dtype=int32)>,
 'policy_info': (),
 'reward': <tf.Tensor: shape=(1, 1), 