# <div align="left">**`agentflow` tutorial**</div>
# <div align="left">[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/deepmind/dm_robotics/blob/main/py/agentflow/tutorial.ipynb)</div>

> <p><small><small>Copyright 2021 The dm_robotics Authors.</small></small></p>
> <p><small><small>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at <a href="http://www.apache.org/licenses/LICENSE-2.0">http://www.apache.org/licenses/LICENSE-2.0</a>.</small></small></p>
> <p><small><small>Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.</small></small></p>

# Preliminaries

To run this notebook you will have to install agentflow (possibly in a [virtualenv](https://docs.python.org/3/library/venv.html)), and then start a Jupyter notebook server.

See [here](https://www.dataquest.io/blog/jupyter-notebook-tutorial/) for a tutorial on how to work with Jupyter notebooks, and [here](https://janakiev.com/blog/jupyter-virtual-envs/) for using virtualenv with Jupyter.


# Imports

In [None]:
import abc
from typing import Mapping, Optional

import dm_env
from dm_env import specs
from dm_robotics.agentflow import action_spaces
from dm_robotics.agentflow import core
from dm_robotics.agentflow import subtask
from dm_robotics.agentflow.loggers import print_logger
from dm_robotics.agentflow.loggers import subtask_logger
from dm_robotics.agentflow.meta_options.control_flow import cond
from dm_robotics.agentflow.meta_options.control_flow import loop_ops
from dm_robotics.agentflow.meta_options.control_flow import sequence
from dm_robotics.agentflow.meta_options.control_flow.examples import common
from dm_robotics.agentflow.rendering import graphviz_renderer
from dm_robotics.agentflow.rendering import intermediate
from dm_robotics.agentflow.rendering import subgraph

import numpy as np
from IPython.display import Image
from pprint import pprint

# Introduction

`AgentFlow` is the task-specification framework for DeepMind Robotics. Fundamentally, it's a library for composing Reinforcement-Learning agents. The core features that AgentFlow provides are:

1.  tools for slicing, transforming, and composing *specs*.
2.  tools for encapsulating and composing *RL-tasks*.

Unlike the standard RL setup, which assumes a single environment and an agent,
`AgentFlow` is designed for the single-embodiment, multiple-task regime. This
was motivated by the robotics use-case, which frequently requires training RL
modules for various skills, and then composing them (possibly with non-learned
controllers too).

Instead of having to implement a separate RL environment for each skill and
combine them ad hoc, with `AgentFlow` you can define one or more `SubTasks`
which *modify* a timestep from a single top-level environment, e.g. adding
observations and defining rewards, or isolating a particular sub-system of the
environment, such as a robot arm.

You then *compose* SubTasks with regular RL-agents to form modules, and use a
set of graph-building operators to define the flow of these modules over time
(hence the name `AgentFlow`).

The graph-building step is entirely optional, and is intended only for use-cases
that require something like a (possibly learnable, possibly stochastic)
state-machine.



# Core Concepts
This colab introduces the core concepts in AgentFlow, and illustrates the two common ways to use AgentFlow to build a robotics experiment loop.  

## Policies and Options

A Policy is the base-class for all agents in AgentFlow. Its primary method is:

```python
def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
  ...
```

All other methods on `Policy` are for book-keeping and visualization.

*** 

An `Option` is a `Policy` which can also:

*   Decide if it's eligible to stop (expressed via a termination-probability `pterm`)
*   Consume a runtime-argument (e.g. a pose to reach towards)
*   Produce a result (`OptionResult`)

These enable it to be used within an AgentFlow graph by other `MetaOptions`,
which all know about the basic life-cycle semantics of options. E.g. `af.While` will repeat a given option until the condition is `False`, or the option's `pterm` samples to `True`.  Composition of multiple agentflow Options will be discussed below.
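The life-cycle described above can be sketched with a small pure-Python model. This is not AgentFlow's implementation: `ToyOption` and `run_while` are hypothetical names, the observation is a plain float, and the rule "stop when `pterm` exceeds a uniform draw" is an assumption that mirrors the run-loop later in this notebook, which compares `pterm` against `np.random.rand()`.

```python
import random
from typing import Callable, Tuple


class ToyOption:
  """Hypothetical option exposing step, pterm and result (not AgentFlow's)."""

  def __init__(self, name: str, max_steps: int):
    self.name = name
    self.max_steps = max_steps
    self.steps_taken = 0

  def on_first(self) -> None:
    # Stand-in for receiving a FIRST timestep: reset per-episode state.
    self.steps_taken = 0

  def step(self, observation: float) -> float:
    self.steps_taken += 1
    return -observation  # A trivial "controller".

  def pterm(self) -> float:
    # Eligible to stop (probability 1) once the step budget is used up.
    return 1.0 if self.steps_taken >= self.max_steps else 0.0

  def result(self) -> str:
    return "SUCCESS"


def run_while(condition: Callable[[int], bool], option: ToyOption,
              rng: random.Random) -> Tuple[int, str]:
  """Re-runs `option` while `condition(iteration)` holds.

  Returns the total number of steps taken and the last option result.
  """
  total_steps, last_result, iteration = 0, "NONE", 0
  while condition(iteration):
    option.on_first()
    while True:
      option.step(observation=1.0)
      total_steps += 1
      # The option stops once its pterm "samples to True".
      if option.pterm() > rng.random():
        break
    last_result = option.result()
    iteration += 1
  return total_steps, last_result


toy_total, toy_result = run_while(
    lambda i: i < 3, ToyOption("reach", max_steps=4), random.Random(0))
print(toy_total, toy_result)  # 12 SUCCESS
```

Because `pterm` is either 0 or 1 here, the outcome is deterministic: three runs of a four-step option.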

The name "Option" is inspired by the seminal work of Sutton, Precup, & Singh on temporal abstraction in Reinforcement Learning:

`Sutton, Richard S., Doina Precup, and Satinder Singh. "Between MDPs and semi-MDPs: A framework for temporal abstraction in reinforcement learning." Artificial Intelligence 112.1-2 (1999): 181-211.`

AgentFlow's `Option` is compatible with this notion of option*, but agentflow does not implement any learning algorithms. Instead it should be thought of as an API abstraction that allows expressing various algorithms involving options, along with tools for composing them to solve larger tasks.

*Modulo the omission of a `pinit` method defining the initiation-set, which may be added in the future.

## SubTask

`SubTask` is the AgentFlow mechanism for defining an RL problem *in-situ*,
i.e. in the context of a larger problem on the same underlying embodiment.

The `SubTask` defines everything that an `Option` needs to run, except the policy itself. This decoupling of option-methods from policy-methods allows a subtask to train an arbitrary RL agent, which doesn't have to know it's actually living
in an AgentFlow graph. From the agent's perspective, it just wakes up in a particular state and runs an episode until seeing a LAST timestep signifying the end of episode.

**Specs.** Like a regular RL task, a `SubTask` must define the observation and action specs seen by the agent it runs against. There is also an `arg_spec` mechanism on `SubTask`, which defines a protocol for passing signals to children via the observation, but this is outside the scope of this colab (details can be found [here](https://github.com/deepmind/dm_robotics/blob/main/py/agentflow/docs/components.md)).

**Timestep transformation.** Rather than having to generate observations from
scratch, a `SubTask` receives the timestep from the parent (or base
environment), and modifies it via `parent_to_agent_timestep` to suit the needs
of the child `Policy` it is training.

After stepping the policy with this timestep, it post-processes the policy's
action with `agent_to_parent_action` to comply with the action_spec of the
context in which the `SubTask` is defined.
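A minimal sketch of these two hooks, using a stand-in named tuple with the same fields as `dm_env.TimeStep` so that it runs without AgentFlow. `ToyReachSubTask`, the `arm_pos` observation key, and the reward and padding choices are hypothetical; a real `SubTask` also declares specs and `pterm`, and its `parent_to_agent_timestep` takes an extra `own_arg_key` argument.

```python
from typing import Dict, List, NamedTuple, Optional


class ToyTimeStep(NamedTuple):
  """Stand-in with the same fields, in the same order, as dm_env.TimeStep."""
  step_type: int  # dm_env.StepType values: FIRST=0, MID=1, LAST=2.
  reward: Optional[float]
  discount: Optional[float]
  observation: Dict[str, List[float]]


class ToyReachSubTask:
  """Hypothetical subtask: defines a reward and pads the agent's action."""

  def __init__(self, target: List[float]):
    self._target = target

  def parent_to_agent_timestep(self, parent: ToyTimeStep) -> ToyTimeStep:
    # The base environment provides raw observations only; the subtask adds
    # a reward (negative squared distance to the target).
    pos = parent.observation["arm_pos"]
    reward = -sum((p - t) ** 2 for p, t in zip(pos, self._target))
    return parent._replace(reward=reward, discount=1.0)

  def agent_to_parent_action(self, agent_action: List[float]) -> List[float]:
    # The agent controls 6 arm joints; append a fixed gripper command so the
    # action matches a 7-DOF parent action spec.
    return list(agent_action) + [0.0]


toy_subtask = ToyReachSubTask(target=[0.0, 0.0])
toy_parent_ts = ToyTimeStep(step_type=1, reward=None, discount=None,
                            observation={"arm_pos": [3.0, 4.0]})
toy_agent_ts = toy_subtask.parent_to_agent_timestep(toy_parent_ts)
toy_parent_action = toy_subtask.agent_to_parent_action([0.1] * 6)
print(toy_agent_ts.reward, len(toy_parent_action))  # -25.0 7
```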

A SubTask can be learned or engineered, or a mixture of both. E.g. in the
[DPGfD](https://sites.google.com/corp/view/dpgfd-insertion/home) paper we used a
SubTask that had a learned CNN-based reward classifier, injected bottleneck
activations from this model as visual features (both via
`TimestepPreprocessors`), and a hand-defined Cartesian-velocity action space.

**Composition.**
A `SubTask` and `Policy` are then composed via `SubTaskOption` into an option that can be used like a regular `Option` in an AgentFlow graph (see below).

**A note on logging.** An agentflow agent typically includes one or more `SubTaskOption`s, each of which defines an RL problem. This top-level agent is run against a ["base" environment](https://github.com/deepmind/dm_robotics/blob/main/py/moma/base_task.py) which exposes raw sensor observations but no rewards or termination.

There are important implications of this design choice in how data are logged. Agentflow defines data-logging at the *subtask level* via `SubTaskLogger`, which logs the *raw sensor data* that the subtask sees, i.e. `parent_timestep`, along with the action returned by the `Policy`.
This allows researchers to iterate on the subtask and agent, e.g. changing the reward function, features, etc, all without changing the underlying dataset.  This has proven to be a powerful feature, in particular for offline-RL workflows.
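To illustrate why logging raw sensor data at the subtask level helps, the sketch below relabels one logged trajectory with two different reward functions, without collecting new data. The log format (a list of `(raw_observation, action)` pairs) and both reward functions are hypothetical; this is not the format `SubTaskLogger` writes.

```python
from typing import Callable, List, Tuple

# Hypothetical log entries: (raw distance-sensor reading, action taken).
LoggedStep = Tuple[float, float]

toy_log: List[LoggedStep] = [(0.9, 0.1), (0.5, 0.3), (0.05, 0.0)]


def relabel(log: List[LoggedStep],
            reward_fn: Callable[[float], float]) -> List[float]:
  """Recomputes rewards from raw logged observations."""
  return [reward_fn(obs) for obs, _ in log]


def sparse_reward(distance: float) -> float:
  return 1.0 if distance < 0.1 else 0.0


def dense_reward(distance: float) -> float:
  return -distance


print(relabel(toy_log, sparse_reward))  # [0.0, 0.0, 1.0]
print(relabel(toy_log, dense_reward))   # [-0.9, -0.5, -0.05]
```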


## SubTaskOption

`SubTaskOption` is a container that combines a `SubTask` and a `Policy` into a full-fledged `Option` that can be run in an AgentFlow graph.

Given a subtask and a policy, a SubTaskOption can be built and used as follows:
```python
learned_skill = SubTaskOption(subtask, agent)
Sequence([scripted_action, learned_skill, ...])
```

In this example we used `Sequence` to define a linear sequence of options. Control-flow over options will be discussed in greater detail below.

## Action Spaces

A `SubTask` allows the action space of the policy it interacts with to differ from the action space of the parent task. This is achieved by mapping each action the agent returns to an action that the larger task (environment) can consume.

In AgentFlow the role of action-mapping in a SubTask is handled by an `ActionSpace`. This class has a `project` method which works as follows:

`parent_action = space.project(child_action)`.
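As an illustration of `project`, here is a hypothetical action space in which the agent acts in `[-1, 1]` and `project` rescales each dimension into the parent's bounds. `ToyScaledActionSpace` is not part of the AgentFlow library; it only shows the child-to-parent mapping in plain Python.

```python
from typing import List


class ToyScaledActionSpace:
  """Hypothetical action space: agent acts in [-1, 1], parent in [lo, hi]."""

  def __init__(self, minimum: List[float], maximum: List[float]):
    self._minimum = minimum
    self._maximum = maximum

  def project(self, child_action: List[float]) -> List[float]:
    # Affine map from [-1, 1] to [minimum, maximum], per dimension.
    return [lo + (a + 1.0) * 0.5 * (hi - lo)
            for a, lo, hi in zip(child_action, self._minimum, self._maximum)]


toy_space = ToyScaledActionSpace(minimum=[0.0, -2.0], maximum=[1.0, 2.0])
print(toy_space.project([-1.0, 0.5]))  # [0.0, 1.0]
```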

AgentFlow comes with a small but powerful [action_space library](https://github.com/deepmind/dm_robotics/blob/main/py/agentflow/action_spaces.py) designed to work with action_specs that contain a tab-delimited `name` field listing all the actuated DOFs (as we get from [Composer](https://github.com/deepmind/dm_control/tree/master/dm_control/composer)).  These actions can be sliced and recombined using `prefix_slicer` and `CompositeActionSpace`.
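The mechanism behind `prefix_slicer` and `CompositeActionSpace` can be modeled in a few lines of plain Python: the tab-delimited `name` determines which indices a prefix selects, the sliced action is scattered into a full-size list padded with NaN, and equal-length projections are merged by filling the NaN gaps. The helper names are hypothetical, and resolving overlapping non-NaN entries by taking the first one is an assumption of this sketch, not documented AgentFlow behavior.

```python
import math
from typing import List


def prefix_indices(name: str, prefix: str) -> List[int]:
  """Indices of the tab-delimited DOF names that start with `prefix`."""
  return [i for i, dof in enumerate(name.split("\t")) if dof.startswith(prefix)]


def project_slice(child_action: List[float], indices: List[int],
                  size: int) -> List[float]:
  """Scatters a child action into a full-size action, NaN elsewhere."""
  full = [math.nan] * size
  for value, i in zip(child_action, indices):
    full[i] = value
  return full


def compose(*actions: List[float]) -> List[float]:
  """Merges equal-length actions, taking the first non-NaN value per dim."""
  return [next((v for v in values if not math.isnan(v)), math.nan)
          for values in zip(*actions)]


toy_name = "arm/l0\tarm/l1\tgripper"
toy_arm_idx = prefix_indices(toy_name, "arm")          # [0, 1]
toy_gripper_idx = prefix_indices(toy_name, "gripper")  # [2]
toy_arm_full = project_slice([0.2, 0.2], toy_arm_idx, 3)     # [0.2, 0.2, nan]
toy_gripper_full = project_slice([0.5], toy_gripper_idx, 3)  # [nan, nan, 0.5]
print(compose(toy_arm_full, toy_gripper_full))  # [0.2, 0.2, 0.5]
```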

In [None]:
full_spec = specs.BoundedArray(
    shape=(7,),
    minimum=[-1] * 7,
    maximum=[1] * 7,
    dtype=np.float32,
    name='arm/l0\tarm/l1\tarm/l2\tarm/l3\tarm/l4\tarm/l5\tgripper')

# Define a sub-space for the arm.
arm_space = action_spaces.prefix_slicer(full_spec, prefix='arm')

# The spec for this space is just 6DOF.
print(f'Action spec for just the arm:\n\t{arm_space.spec()}')

# We can take a 6D action satisfying this spec, and project it to the original
# space. Note the NaN where the gripper action is missing.
arm_action = np.ones(6) * 0.2
arm_action_in_full_space = arm_space.project(arm_action)
print(f'Arm action in full space: {arm_action_in_full_space}')

# Now let's define another sub-space for the gripper.
gripper_space = action_spaces.prefix_slicer(full_spec, prefix='gripper')
# Note the gripper's action_spec is just 1-DOF
print(f'Gripper action shape: {gripper_space.spec().shape}')

# We can now project the gripper action in the full space. Note that it is
# properly aligned at the end of the array, padded by NaNs for the arm dims.
gripper_action = np.ones(1) * 0.5
gripper_action_in_full_space = gripper_space.project(gripper_action)
print(f'Gripper action in full space: {gripper_action_in_full_space}')

# How do we put these together? That's where `CompositeActionSpace` comes in.
# It composes action spaces that project into the same parent space, using
# `NaN` as a mask.
composite_space = action_spaces.CompositeActionSpace(
    [arm_space, gripper_space],  # Order of spaces determines order of actions.
    'full_action_space')
full_action = composite_space.project(
    np.concatenate((arm_action, gripper_action)).astype(np.float32))
print(f'Full action: {full_action}')


## Parameterized SubTask

The `SubTask` API has a single method, `parent_to_agent_timestep`, for transforming a timestep from the parent (or environment) into a timestep for the agent.

This is sufficient for defining a workflow for one or more agents, but it often leads to a proliferation of subtasks to handle small changes in the observation or reward.  AgentFlow therefore includes a special subtask called `ParameterizedSubTask`, which allows users to build subtasks out of simpler, reusable components.

### Timestep Preprocessors
"Timestep Preprocessor" is a mechanism for defining timestep-transformations via a chain of simpler timestep-transformations. AgentFlow comes with a library of useful preprocessors, and new ones can be added by subclassing `TimestepPreprocessor`

In conjunction with `SequentialActionSpace` (which defines the same concept for actions), we can build a rich space of subtasks without ever subclassing `agentflow.ParameterizedSubTask`.
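The idea of a preprocessor chain can be sketched with plain functions over observation dicts. This deliberately skips AgentFlow's `TimestepPreprocessor` class, operates on observations rather than full timesteps, and uses hypothetical names (`add_feature`, `drop_key`, `chain`); it only shows how small reusable transformations compose into one subtask-specific view.

```python
from functools import reduce
from typing import Callable, Dict, List

Observation = Dict[str, List[float]]
Preprocessor = Callable[[Observation], Observation]


def add_feature(obs: Observation) -> Observation:
  """Hypothetical preprocessor: appends the Euclidean norm of `pos`."""
  return {**obs, "pos_norm": [sum(x * x for x in obs["pos"]) ** 0.5]}


def drop_key(key: str) -> Preprocessor:
  """Hypothetical preprocessor factory: hides one observation from the agent."""
  def _drop(obs: Observation) -> Observation:
    return {k: v for k, v in obs.items() if k != key}
  return _drop


def chain(*steps: Preprocessor) -> Preprocessor:
  """Composes preprocessors left to right into a single transformation."""
  return lambda obs: reduce(lambda acc, step: step(acc), steps, obs)


toy_pipeline = chain(add_feature, drop_key("camera"))
print(toy_pipeline({"pos": [3.0, 4.0], "camera": [0.0]}))
# {'pos': [3.0, 4.0], 'pos_norm': [5.0]}
```

Swapping, adding, or removing a stage changes the agent's view without writing a new subtask class, which is the reuse argument made above.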

## Control-Flow

One of the main design requirements of AgentFlow is the ability to define multiple `Options`, all of which adhere to the `dm_env` API, and then compose them to produce a larger agent.  

In AgentFlow this is achieved by a "MetaOption", which is simply an option that can control the execution of one or more children.

AgentFlow ships with a small but expressive set of control-flow "operators" including `Cond`, `Sequence`, `While`, and `Repeat`, which can be used to build [behavior-tree](https://en.wikipedia.org/wiki/Behavior_tree_(artificial_intelligence,_robotics_and_control))-like agents. It is also possible to define more general state-machine `MetaOption`s, but these are not included in the core library.
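The semantics of these operators can be approximated with a toy behavior tree in which each option runs to completion in a single `run` call and records its steps in a trace. Real AgentFlow meta-options are stepped one timestep at a time and use `pterm` and `OptionResult`; `Leaf`, `Seq`, `RepeatN`, and `CondOp` are hypothetical stand-ins, and propagating the last child's success flag is a choice made for this sketch.

```python
from typing import Callable, List


class Leaf:
  """Hypothetical primitive option that runs for a fixed number of steps."""

  def __init__(self, name: str, steps: int, success: bool = True):
    self.name, self.steps, self.success = name, steps, success

  def run(self, trace: List[str]) -> bool:
    trace.extend([self.name] * self.steps)  # One trace entry per step.
    return self.success


class Seq:
  """Runs children in order; reports the last child's success flag."""

  def __init__(self, children: list):
    self.children = children

  def run(self, trace: List[str]) -> bool:
    ok = True
    for child in self.children:
      ok = child.run(trace)
    return ok


class RepeatN:
  """Runs a child a fixed number of times."""

  def __init__(self, n: int, child):
    self.n, self.child = n, child

  def run(self, trace: List[str]) -> bool:
    ok = True
    for _ in range(self.n):
      ok = self.child.run(trace)
    return ok


class CondOp:
  """Picks one branch via a predicate; finishes when that branch finishes."""

  def __init__(self, predicate: Callable[[], bool], true_branch, false_branch):
    self.predicate = predicate
    self.true_branch, self.false_branch = true_branch, false_branch

  def run(self, trace: List[str]) -> bool:
    branch = self.true_branch if self.predicate() else self.false_branch
    return branch.run(trace)


def toy_near_socket() -> bool:
  return False  # Hypothetical predicate: never close enough to insert.


# An insertion-like flow: reset, then two reach-or-insert attempts.
toy_tree = Seq([
    Leaf("reset", 1),
    RepeatN(2, CondOp(toy_near_socket, Leaf("insert", 2), Leaf("reach", 1))),
])
toy_trace: List[str] = []
toy_tree.run(toy_trace)
print(toy_trace)  # ['reset', 'reach', 'reach']
```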

### Example
Let's say we're modeling an insertion problem, and we have a learned `SubTaskOption` which we only want to run if we're close to the socket.  We can define a `Cond` which branches on this condition, and calls the agent only if the predicate `near_socket` is true:
```python
reach_or_insert_op = af.Cond(
    cond=near_socket,
    true_branch=learned_insert_option,
    false_branch=reach_option,
    name='Reach or Insert')
```
This option will terminate when whichever branch is selected terminates.

Let's say we then want to try this option 5 times.  We can achieve this by using a `Repeat` option:
```python
reach_and_insert_5x = af.Repeat(
    5, reach_or_insert_op, name='Retry Loop')
```

Finally, we can compose this with a scripted reset policy and define our overall run-loop:
```python
loop_body = af.Sequence([
    scripted_reset,
    reach_and_insert_5x,
    af.Cond(
        cond=last_option_successful,
        true_branch=extract_option,
        false_branch=recovery_option,
        name='post-insert')
])
main_loop = af.While(lambda _: True, loop_body)
```

This option can be run against a "base" RL environment, which typically provides raw sensor observations but doesn't otherwise define rewards or termination; these are instead defined by our `SubTask`.


## Graph Rendering

AgentFlow ships with a small graphviz-based rendering library for visualizing agent-graphs.

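The cell below builds the control-flow example above from dummy components and, if `render_graph` is set to `True`, visualizes it.  For the rendering to work, `pydot` and the Graphviz `dot` binary must be installed on the machine running the notebook.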

In [None]:
# @title {form-width: "25%"}

def near_socket(unused_timestep: dm_env.TimeStep,
                unused_result: Optional[core.OptionResult]) -> bool:
  return False


def last_option_successful(unused_timestep: dm_env.TimeStep,
                           result: Optional[core.OptionResult]) -> bool:
  return (result is not None and
          result.termination_reason == core.TerminationType.SUCCESS)

env = common.DummyEnv()

# Define a subtask that exposes the desired RL-environment view on `env`.
my_subtask = common.DummySubTask(env.observation_spec(), 'Insertion SubTask')

# Define a regular RL agent against this task-spec.
my_policy = common.DummyPolicy(my_subtask.action_spec(),
                                my_subtask.observation_spec(), 'My Policy')

# Compose the policy and subtask to form an Option module.
learned_insert_option = subtask.SubTaskOption(
    my_subtask, my_policy, name='Learned Insertion')

reach_option = common.DummyOption(env.action_spec(), env.observation_spec(),
                                  'Reach for Socket')
scripted_reset = common.DummyOption(env.action_spec(), env.observation_spec(),
                                    'Scripted Reset')
extract_option = common.DummyOption(env.action_spec(), env.observation_spec(),
                                    'Extract')
recovery_option = common.DummyOption(env.action_spec(),
                                      env.observation_spec(), 'Recover')

# Use some AgentFlow operators to embed the agent in a bigger agent.
# First use Cond to run the learned agent only when sufficiently close.
reach_or_insert_op = cond.Cond(
    cond=near_socket,
    true_branch=learned_insert_option,
    false_branch=reach_option,
    name='Reach or Insert')

# Loop the insert-or-reach option 5 times.
reach_and_insert_5x = loop_ops.Repeat(
    5, reach_or_insert_op, name='Retry Loop')

loop_body = sequence.Sequence([
    scripted_reset,
    reach_and_insert_5x,
    cond.Cond(
        cond=last_option_successful,
        true_branch=extract_option,
        false_branch=recovery_option,
        name='post-insert')
])
main_loop = loop_ops.While(lambda _: True, loop_body)

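# Render the agentflow graph.
render_graph = False  # @param {type: "boolean"}
if render_graph:
  # `pydot` and the Graphviz `dot` binary must be installed for this to work.
  from IPython.display import display
  graph = intermediate.render(main_loop)
  graphviz_graph = graphviz_renderer.render(graph)
  # An `Image` is only auto-displayed when it is a cell's final top-level
  # expression, so display it explicitly from inside the `if` block.
  display(Image(graphviz_renderer.to_png(graphviz_graph)))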

# Direct-Dispatch Example

This example demonstrates a way to build scripted and learned AgentFlow components that directly wrap arbitrary python callables for communicating with an *external* system (e.g. a robot).

The primary advantage of this workflow, compared with the environment-dispatch workflow below, is simplicity and modularity.  It's entirely up to the user how to obtain observations and dispatch actions, and all
data is local to the individual agentflow modules (Policy, Option).

For the alternative "environment-dispatch" model, see the Environment-Dispatch Example below (`environment_dispatch_workflow.py`).

High level workflow:
1. Implement stubs for receiving observations and sending actions.
1. Create an AgentFlow policy that generates valid actions, e.g. from a neural-net.
1. Create a subtask that holds the agent and the I/O callbacks, and allows us to attach a logging observer to record data.
1. Create a logging observer and attach it to the agent.
1. Create an AgentFlow `Option` implementing the desired reset behaviour.
1. Create a run loop and go.

Notes:
  * This workflow is blocking iff the observation or action stubs block.  For an
    RPC-style interface consider dm_env_rpc (useful if env and agent live in
    different processes) or a custom-RPC service.
  * In this example the `ActionCallback` is invoked by the SubTask and by the
    scripted Option, but it could easily be moved elsewhere, e.g. into an
    ActionSpace or the Policy itself.

## I/O Stubs
We begin by defining callbacks which send actions and receive sensor observations. These are stubs in the example, but are where a user would typically interface with the robot.

In [None]:
def observation_update_stub() -> np.ndarray:
  observation = np.random.rand(4)
  print(f"observation_update_stub called! Returning observation {observation}")
  return observation


def send_action_stub(action: np.ndarray) -> None:
  print(f"send_action_stub called with {action}!")

## Callbacks

Here we wrap these callbacks with types to associate specs with the input and output.

In [None]:
class ObservationCallback(abc.ABC):
  """Base class for observation-callbacks which pull data from the world."""

  @abc.abstractmethod
  def __call__(self) -> Mapping[str, np.ndarray]:
    pass

  @abc.abstractmethod
  def observation_spec(self) -> Mapping[str, specs.Array]:
    pass


class ActionCallback(abc.ABC):
  """Base class for action-callbacks which send actions to the world."""

  @abc.abstractmethod
  def __call__(self, action: np.ndarray) -> None:
    pass

  @abc.abstractmethod
  def action_spec(self) -> specs.BoundedArray:
    pass


class ExampleObservationUpdater(ObservationCallback):
  """Example Observation-Update callback."""

  def __call__(self) -> Mapping[str, np.ndarray]:
    return {"stub_observation": observation_update_stub()}

  def observation_spec(self) -> Mapping[str, specs.Array]:
    return {
        "stub_observation":
            specs.Array((4,), dtype=np.float64, name="stub_observation")
    }


class ExampleActionSender(ActionCallback):
  """Example SendAction callback."""

  def __call__(self, action: np.ndarray) -> None:
    send_action_stub(action)

  def action_spec(self) -> specs.BoundedArray:
    return specs.BoundedArray((2,),
                              dtype=np.float64,
                              minimum=-np.ones(2),
                              maximum=np.ones(2),
                              name="stub action")

## Defining a Scripted Behavior

Now that we have action and observation callbacks we are ready to build our first "agent".  This agent will be a scripted module that generates random actions; in practice the control logic would be whatever the task requires (e.g. a "reset" behavior).

In [None]:
class ExampleScriptedOption(core.Option):
  """Stub option for running scripted controller."""

  def __init__(self,
               observation_cb: ObservationCallback,
               action_cb: ActionCallback,
               name: str,
               max_steps: int):
    super().__init__(name)
    self._observation_cb = observation_cb
    self._action_cb = action_cb
    self._action_spec = action_cb.action_spec()
    self._max_steps = max_steps
    self._step_idx = 0

  def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
    # In the direct-dispatch case the timestep argument serves only to indicate
    # first/mid/last step.  The observation (and rewards & discount) should be
    # overridden by the user.
    obs = self._observation_cb()
    timestep = timestep._replace(observation=obs)

    if timestep.first():
      self._step_idx = 0

    self._step_idx += 1

    # Run controller.
    # << write scripted agent logic here >>
    action = np.random.rand(2).astype(self._action_spec.dtype)

    # Dispatch action.
    self._action_cb(action)

    return action  # Return value unused in direct-dispatch case.

  def pterm(self, timestep: dm_env.TimeStep) -> float:
    del timestep
    if self._step_idx >= self._max_steps:
      print(f"Terminating option because max_steps reached {self._step_idx}.")
      return 1.
    return 0.

  def result(self, unused_timestep: dm_env.TimeStep) -> core.OptionResult:
    return core.OptionResult(termination_reason=core.TerminationType.SUCCESS)



## Define a SubTask

We want to build a trainable policy next, but first we need to build a subtask for it.  You can think of a subtask as a modular version of `dm_env.Environment` -- like an Environment it determines the observations that the agent sees, and consumes its actions.  However, unlike an Environment, SubTasks *modify* observations from a higher-level environment or subtask, and *postprocess* actions to be returned to that environment.  By combining a subtask with a policy into a `SubTaskOption`, the policy can be used like any other agentflow Option.  In this fashion, users are free to build complex agents containing mixtures of scripted and learned policies, each of which see different parts of the observation space and control different parts of the action space.

In [None]:
class ExampleSubTask(subtask.SubTask):
  """A subtask that pulls state and sends actions directly via callbacks."""

  def __init__(self,
               observation_cb: ObservationCallback,
               action_cb: ActionCallback,
               max_steps: int):
    super().__init__()
    self._observation_cb = observation_cb
    self._action_cb = action_cb
    self._observation_spec = observation_cb.observation_spec()
    self._action_spec = action_cb.action_spec()
    self._max_steps = max_steps
    self._step_idx = 0.

  def observation_spec(self) -> Mapping[str, specs.Array]:
    """Defines the observation seen by agents trained on this subtask."""
    return self._observation_spec

  def reward_spec(self) -> specs.Array:
    return specs.Array(shape=(), dtype=np.float64, name="reward")

  def discount_spec(self) -> specs.Array:
    return specs.BoundedArray(
        shape=(), dtype=np.float64, minimum=0., maximum=1., name="discount")

  def arg_spec(self) -> Optional[specs.Array]:
    """Defines the arg to be passed by the parent task during each step."""
    return  # This example doesn't use parameterized-options.

  def action_spec(self) -> specs.BoundedArray:
    """Defines the action spec seen by agents that run on this subtask."""
    return self._action_spec

  def agent_to_parent_action(self, agent_action: np.ndarray) -> np.ndarray:
    """Receives agent action and dispatches to the action callback."""
    self._action_cb(agent_action)
    return agent_action  # Return value unused in direct-dispatch case.

  def parent_to_agent_timestep(self, parent_timestep: dm_env.TimeStep,
                               own_arg_key: str) -> dm_env.TimeStep:
    """Pulls the latest observation and packs to timestep for the agent."""
    if parent_timestep.first():
      self._step_idx = 0.

    obs = self._observation_cb()
    agent_timestep = parent_timestep._replace(
        observation=obs, reward=self._step_idx, discount=1.)

    self._step_idx += 1
    return agent_timestep

  def pterm(self, parent_timestep: dm_env.TimeStep,
            own_arg_key: str) -> float:
    if self._step_idx >= self._max_steps:
      print(f"Terminating subtask because max_steps reached {self._step_idx}.")
      return 1.
    return 0.



## Define a Policy

A `Policy` is the lowest-level agent type in AgentFlow.  It only defines a `step` function, and has no notion of "pterm", "subtask", "option", etc.
A policy is typically where a user would implement a learning algorithm, e.g. an [acme](https://github.com/deepmind/acme) Actor that logs timesteps to replay and runs a Learner to update parameters.

This learner need not know its context in an agentflow graph.  From its perspective, it "wakes up" when its SubTaskOption is invoked, seeing a FIRST timestep, and runs until a LAST timestep is seen.

It is only by combining a `Policy` with a `SubTask` that we can build `Options` which can be composed using agentflow graph operators, e.g. `Cond`, `Sequence`, etc.
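To illustrate this from the policy's side, the sketch below tracks episode returns using nothing but the FIRST/MID/LAST step types, with no knowledge of subtasks or options. `ToyStepType` copies the values of `dm_env.StepType` so the example runs without dm_env, and `ReturnTrackingPolicy` is hypothetical.

```python
import enum
from typing import List, Tuple


class ToyStepType(enum.IntEnum):
  """Same values as dm_env.StepType: FIRST=0, MID=1, LAST=2."""
  FIRST = 0
  MID = 1
  LAST = 2


class ReturnTrackingPolicy:
  """Hypothetical policy that relies only on FIRST/LAST episode boundaries."""

  def __init__(self):
    self.episode_returns: List[float] = []
    self._running = 0.0

  def step(self, step_type: ToyStepType, reward: float) -> float:
    if step_type == ToyStepType.FIRST:
      self._running = 0.0  # "Wakes up": a new episode starts.
    else:
      self._running += reward
    if step_type == ToyStepType.LAST:
      self.episode_returns.append(self._running)  # The episode ends.
    return 0.0  # Dummy action.


toy_policy = ReturnTrackingPolicy()
toy_stream: List[Tuple[ToyStepType, float]] = [
    (ToyStepType.FIRST, 0.0), (ToyStepType.MID, 1.0), (ToyStepType.LAST, 2.0),
    (ToyStepType.FIRST, 0.0), (ToyStepType.LAST, 5.0),
]
for toy_step_type, toy_reward in toy_stream:
  toy_policy.step(toy_step_type, toy_reward)
print(toy_policy.episode_returns)  # [3.0, 5.0]
```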

In [None]:
class ExamplePolicy(core.Policy):
  """Stub policy for running learning machinery."""

  def __init__(self, action_spec: specs.BoundedArray, name: str):
    super().__init__(name)
    self._action_spec = action_spec

  def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
    return np.random.rand(2).astype(self._action_spec.dtype)

## Dummy Environment
Any RL workflow needs an environment, but in the direct-dispatch workflow it's rather superfluous. All the observations and actions are handled by callables that sit inside the agentflow graph.  This environment exists purely to satisfy the run-loop contract.

In [None]:
class DummyEnvironment(dm_env.Environment):
  """A dummy environment to use in a run-loop."""

  def reset(self) -> dm_env.TimeStep:
    """Returns the first `TimeStep` of a new episode."""
    return dm_env.restart({})

  def step(self, unused_action: np.ndarray) -> dm_env.TimeStep:
    """Updates the environment according to the action."""
    return dm_env.transition(0., {})

  def observation_spec(self) -> Mapping[str, specs.Array]:
    """Returns the observation spec."""
    return {}

  def action_spec(self) -> specs.Array:
    """Returns the action spec."""
    return specs.Array((), dtype=np.float64, name="dummy_action")

## Main Loop

We can finally build our graph and run it as follows:

In [None]:
# Stubs for pulling observation and sending action to some external system.
observation_cb = ExampleObservationUpdater()
action_cb = ExampleActionSender()

# Create a dummy environment; it only exists to satisfy the run-loop contract.
env = DummyEnvironment()

# Stub policy that runs the desired agent.
policy = ExamplePolicy(action_cb.action_spec(), "agent")

# Wrap policy into an agent that logs to the terminal.
task = ExampleSubTask(observation_cb, action_cb, 10)
logger = print_logger.PrintLogger()
aggregator = subtask_logger.EpisodeReturnAggregator()
logging_observer = subtask_logger.SubTaskLogger(logger, aggregator)
agent = subtask.SubTaskOption(task, policy, [logging_observer])

reset_op = ExampleScriptedOption(observation_cb, action_cb, "reset", 3)
main_loop = loop_ops.Repeat(5, sequence.Sequence([reset_op, agent]))

# Run the episode.
timestep = env.reset()
while True:
  action = main_loop.step(timestep)
  timestep = env.step(action)

  # Terminate if the environment or main_loop requests it.
  if timestep.last() or (main_loop.pterm(timestep) > np.random.rand()):
    if not timestep.last():
      termination_timestep = timestep._replace(step_type=dm_env.StepType.LAST)
      main_loop.step(termination_timestep)
    break


# Environment-Dispatch Example


This example demonstrates a way to build an AgentFlow experiment that combines a scripted and an RL policy.  In this workflow, all robot I/O is handled by the `Environment`, which is backed by arbitrary python
callables.  This workflow may be more familiar to users accustomed to [`dm_env`](https://github.com/deepmind/dm_env), and is the way [MoMa](https://github.com/deepmind/dm_robotics/tree/main/py/moma) is written.

The primary advantage of this workflow is that all I/O with the robot is
centralized in the `ProxyEnvironment`.  This can simplify logging and debugging,
and also allow for system designs in which agents can operate on the output of
other agents (by nesting).
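A minimal sketch of such a proxy environment, assuming the same kind of observation and action callbacks used in this notebook. `ToyProxyEnvironment` and the fake robot are hypothetical, and the environment returns bare observation dicts; a real `dm_env.Environment` would wrap them in `TimeStep`s and declare specs.

```python
from typing import Callable, Dict, List

ToyObs = Dict[str, List[float]]


class ToyProxyEnvironment:
  """Hypothetical proxy env: all robot I/O goes through two callbacks."""

  def __init__(self, read_cb: Callable[[], ToyObs],
               write_cb: Callable[[List[float]], None]):
    self._read_cb = read_cb
    self._write_cb = write_cb

  def reset(self) -> ToyObs:
    return self._read_cb()

  def step(self, action: List[float]) -> ToyObs:
    self._write_cb(action)  # Dispatch the action to the (fake) robot...
    return self._read_cb()  # ...then read back the resulting state.


# A fake robot whose position advances by one unit per received action.
toy_robot = {"position": 0.0, "received": []}


def toy_read() -> ToyObs:
  return {"position": [toy_robot["position"]]}


def toy_write(action: List[float]) -> None:
  toy_robot["received"].append(action)
  toy_robot["position"] += 1.0


toy_env = ToyProxyEnvironment(toy_read, toy_write)
toy_first_obs = toy_env.reset()
toy_next_obs = toy_env.step([0.1, -0.1])
print(toy_first_obs, toy_next_obs)  # {'position': [0.0]} {'position': [1.0]}
```

Since every agent sees the robot only through this one object, a logging or debugging hook added to `step` covers all agents at once.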

For the alternative "direct-dispatch" model, see the Direct-Dispatch Example above (`direct_dispatch_workflow.py`).

High level steps:
1. Implement stubs for receiving state and sending actions.
2. Provide stubs to a proxy env that manages I/O.
3. Create an AgentFlow policy that generates valid actions, e.g. from a neural-network.
4. Create a no-op subtask to hold the agent so we can attach a logging observer.
5. Create a logging observer and attach it to the agent.
6. Create an AgentFlow `Option` implementing the desired reset behaviour.
7. Create a run loop and go.

Notes:
  * This workflow is blocking iff the state or action stubs block.  For an
    RPC-style interface consider dm_env_rpc (useful if env and agent live in
    different processes) or a custom-RPC service.
  * The `ActionCallback` currently lives in the environment, but it could easily
    be moved closer to the agent, e.g. in the SubTask (as an ActionSpace) or the
    Policy itself.

## Callbacks

The callback mechanism in this example is identical to the one in the direct-dispatch example; the two differ only in how the callbacks are used below.

In [None]:
class ObservationCallback(abc.ABC):
  """Base class for state-callbacks."""

  @abc.abstractmethod
  def __call__(self) -> Mapping[str, np.ndarray]:
    pass

  @abc.abstractmethod
  def observation_spec(self) -> Mapping[str, specs.Array]:
    pass


class ActionCallback(abc.ABC):
  """Base class for action-callbacks."""

  @abc.abstractmethod
  def __call__(self, action: np.ndarray) -> None:
    pass

  @abc.abstractmethod
  def action_spec(self) -> specs.BoundedArray:
    pass


def observation_update_stub() -> np.ndarray:
  observation = np.random.rand(4)
  print(f"observation_update_stub called! Returning observation {observation}")
  return observation


def send_action_stub(action: np.ndarray) -> None:
  print(f"send_action_stub called with {action}!")


class ExampleObservationUpdater(ObservationCallback):
  """Example State-Update callback."""

  def __call__(self) -> Mapping[str, np.ndarray]:
    return {"stub_observation": observation_update_stub()}

  def observation_spec(self) -> Mapping[str, specs.Array]:
    return {
        "stub_observation":
            specs.Array((4,), dtype=np.float64, name="stub_observation")
    }


class ExampleActionSender(ActionCallback):
  """Example SendAction callback."""

  def __call__(self, action: np.ndarray) -> None:
    send_action_stub(action)

  def action_spec(self) -> specs.BoundedArray:
    return specs.BoundedArray((2,),
                              dtype=np.float64,
                              minimum=-np.ones(2),
                              maximum=np.ones(2),
                              name="stub action")


## Define a Subtask

A subtask's job is to generate observations for the agent and receive its actions. The direct-dispatch workflow achieved this by passing the I/O callbacks directly to the subtask, but in the environment-dispatch model the environment owns these callbacks. Instead, the SubTask acts as an intermediary between the environment and the agent, allowing the agent's *view* of the environment to be customized. In a typical use case this involves defining various timestep transformations, and perhaps slicing a subset of the action space. In this example, however, we simply forward the environment's observation and action specs.

In [None]:
class ExampleSubTask(subtask.SubTask):
  """A No-op subtask that allows us to attach a logging observer."""

  def __init__(self, observation_spec: Mapping[str, specs.Array],
               action_spec: specs.BoundedArray,
               max_steps: int):
    super().__init__()
    self._observation_spec = observation_spec
    self._action_spec = action_spec
    self._max_steps = max_steps
    self._step_idx = 0

  def observation_spec(self) -> Mapping[str, specs.Array]:
    """Defines the observation seen by agents trained on this subtask."""
    return self._observation_spec

  def reward_spec(self) -> specs.Array:
    return specs.Array(shape=(), dtype=np.float64, name="reward")

  def discount_spec(self) -> specs.Array:
    return specs.BoundedArray(
        shape=(), dtype=np.float64, minimum=0., maximum=1., name="discount")

  def arg_spec(self) -> Optional[specs.Array]:
    """Defines the arg to be passed by the parent task during each step."""
    return  # This example doesn't use parameterized-options.

  def action_spec(self) -> specs.BoundedArray:
    """Defines the action spec seen by agents that run on this subtask."""
    return self._action_spec

  def agent_to_parent_action(self, agent_action: np.ndarray) -> np.ndarray:
    """Convert an action from the agent to the parent task."""
    return agent_action

  def parent_to_agent_timestep(self, parent_timestep: dm_env.TimeStep,
                               own_arg_key: str) -> dm_env.TimeStep:
    # Count steps (restarting on a FIRST timestep) so pterm can end the
    # subtask after max_steps.
    if parent_timestep.first():
      self._step_idx = 0
    self._step_idx += 1
    return parent_timestep

  def pterm(self, parent_timestep: dm_env.TimeStep,
            own_arg_key: str) -> float:
    if self._step_idx >= self._max_steps:
      print(f"Terminating subtask because max_steps reached {self._step_idx}.")
      return 1.
    return 0.
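The paragraph introducing this subtask mentions slicing a subset of the action space as one way a SubTask can customize the agent's view. Below is a minimal, dependency-free sketch of that idea. It assumes the parent expects a 2-element action while the agent controls only the first element, with the other held at a fixed default, and it also filters the observation dict down to the keys the agent should see. `SlicedView` and its parameters are hypothetical and not part of agentflow; in a real `SubTask` this logic would sit in `agent_to_parent_action` and `parent_to_agent_timestep`.

```python
from typing import Dict, List, Sequence


class SlicedView:
  """Hypothetical agent view exposing a subset of the parent's action."""

  def __init__(self, parent_size: int, agent_indices: Sequence[int],
               default: float, observation_keys: Sequence[str]):
    self._parent_size = parent_size
    self._agent_indices = list(agent_indices)
    self._default = default
    self._observation_keys = list(observation_keys)

  def agent_action_size(self) -> int:
    return len(self._agent_indices)

  def agent_to_parent_action(self, agent_action: Sequence[float]) -> List[float]:
    if len(agent_action) != len(self._agent_indices):
      raise ValueError("agent action has the wrong size")
    # Fill every parent dimension with the default, then scatter the agent's
    # values into the dimensions it controls.
    parent_action = [self._default] * self._parent_size
    for idx, value in zip(self._agent_indices, agent_action):
      parent_action[idx] = value
    return parent_action

  def parent_to_agent_observation(
      self, observation: Dict[str, List[float]]) -> Dict[str, List[float]]:
    # Keep only the observation entries this agent is meant to see.
    return {key: observation[key] for key in self._observation_keys}


view = SlicedView(parent_size=2, agent_indices=[0], default=0.0,
                  observation_keys=["stub_observation"])
parent_action = view.agent_to_parent_action([0.7])  # [0.7, 0.0]
```

Keeping the mapping inside the subtask leaves both the environment and the policy unchanged: the environment still sees its full 2-element action, while the agent only ever sees a 1-element one.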

## Define a Policy

The policy and scripted behavior are no different in this workflow than in the direct-dispatch workflow.

In [None]:
class ExamplePolicy(core.Policy):
  """Stub policy for running learning machinery."""

  def __init__(self, action_spec: specs.BoundedArray, name: str):
    super().__init__(name)
    self._action_spec = action_spec

  def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
    return np.random.rand(2).astype(self._action_spec.dtype)  # Stub: in [0, 1).



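`ExamplePolicy.step` draws each action element with `np.random.rand`, i.e. from [0, 1). That lies inside the spec's [-1, 1] bounds but never produces a negative action. If a stub should explore the whole box, one option is to scale a uniform sample into [minimum, maximum] per dimension. Here is a minimal stdlib sketch; `sample_in_bounds` is a hypothetical helper, and lists stand in for the spec's `minimum` and `maximum` arrays.

```python
import random
from typing import List, Optional, Sequence


def sample_in_bounds(minimum: Sequence[float], maximum: Sequence[float],
                     rng: Optional[random.Random] = None) -> List[float]:
  """Draws one action uniformly from the box [minimum, maximum]."""
  if len(minimum) != len(maximum):
    raise ValueError("minimum and maximum must have the same length")
  if rng is None:
    rng = random.Random()
  # random.uniform(lo, hi) returns lo + (hi - lo) * random(), a value in
  # [lo, hi] (hi itself is reachable only through float rounding).
  return [rng.uniform(lo, hi) for lo, hi in zip(minimum, maximum)]


rng = random.Random(0)
action = sample_in_bounds([-1.0, -1.0], [1.0, 1.0], rng)
```

With numpy and a `specs.BoundedArray`, the same idea can be written as `spec.minimum + (spec.maximum - spec.minimum) * np.random.rand(*spec.shape)`.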
In [None]:
class ExampleScriptedOption(core.Option):
  """Stub option for running scripted controller."""

  def __init__(self, action_spec: specs.BoundedArray,
               name: str,
               max_steps: int):
    super().__init__(name)
    self._action_spec = action_spec
    self._max_steps = max_steps
    self._step_idx = 0

  def step(self, timestep: dm_env.TimeStep) -> np.ndarray:
    if timestep.first():
      self._step_idx = 0
    self._step_idx += 1
    return np.random.rand(2).astype(self._action_spec.dtype)  # Run controller

  def pterm(self, timestep: dm_env.TimeStep) -> float:
    del timestep
    if self._step_idx >= self._max_steps:
      print(f"Terminating option because max_steps reached {self._step_idx}.")
      return 1.
    return 0.

  def result(self, unused_timestep: dm_env.TimeStep) -> core.OptionResult:
    return core.OptionResult(termination_reason=core.TerminationType.SUCCESS)
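`ExampleSubTask` and `ExampleScriptedOption` follow the same termination pattern: a step counter restarts on a FIRST timestep, increments on every step, and `pterm` returns 1.0 once the counter reaches `max_steps`. The stdlib sketch below isolates that pattern so its behavior can be traced step by step. `MaxStepsTermination` and `pterm_trace` are hypothetical helpers, and the boolean `is_first` stands in for `timestep.first()`.

```python
from typing import List, Sequence


class MaxStepsTermination:
  """Hypothetical helper mirroring the examples' step-count termination."""

  def __init__(self, max_steps: int):
    self._max_steps = max_steps
    self._step_idx = 0

  def on_step(self, is_first: bool) -> None:
    # A FIRST timestep starts a new episode, so the counter restarts.
    if is_first:
      self._step_idx = 0
    self._step_idx += 1

  def pterm(self) -> float:
    # Termination probability: 1.0 once max_steps steps have been taken.
    return 1.0 if self._step_idx >= self._max_steps else 0.0


def pterm_trace(max_steps: int, first_flags: Sequence[bool]) -> List[float]:
  """Returns the pterm value observed after each step."""
  counter = MaxStepsTermination(max_steps)
  trace = []
  for is_first in first_flags:
    counter.on_step(is_first)
    trace.append(counter.pterm())
  return trace


trace = pterm_trace(3, [True, False, False, False, True, False])
# [0.0, 0.0, 1.0, 1.0, 0.0, 0.0]
```

For `max_steps=3`, `pterm` first returns 1.0 on the third step and stays there until a new FIRST timestep resets the count.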


## Define Environment

Unlike in the direct-dispatch workflow, the environment plays an important role here. We pass the observation and action callbacks to a `ProxyEnvironment`, which uses them to provide observation and action specs for our agent, to fetch observations, and to dispatch actions.

`ProxyEnvironment` is nearly a complete RL environment, except that it always returns a reward of 0, never ends an episode, and its `reset` does not reset the external system (it only fetches a fresh observation).

In [None]:
class ProxyEnvironment(dm_env.Environment):
  """An environment that receives observation from a callback."""

  def __init__(self,
               observation_cb: ObservationCallback,
               action_cb: Optional[ActionCallback] = None):
    """Initializes ProxyEnvironment.

    Args:
      observation_cb: An object for retrieving observations from an external
        source.
      action_cb: An object for dispatching agent actions. If `None`, actions
        are discarded.
    """
    self._observation_cb = observation_cb
    self._action_cb = action_cb

  def reset(self) -> dm_env.TimeStep:
    """Returns the first `TimeStep` of a new episode."""
    return dm_env.restart(observation=self._observation_cb())

  def step(self, action: np.ndarray) -> dm_env.TimeStep:
    """Updates the environment according to the action."""
    if self._action_cb:
      self._action_cb(action)
    return dm_env.transition(reward=0.,
                             discount=1.,
                             observation=self._observation_cb())

  def observation_spec(self) -> Mapping[str, specs.Array]:
    """Returns the observation spec."""
    return self._observation_cb.observation_spec()

  def action_spec(self) -> specs.BoundedArray:
    """Returns the action spec."""
    if self._action_cb:
      return self._action_cb.action_spec()
    # An empty (zero-length) action; the shape must match the empty bounds.
    return specs.BoundedArray((0,),
                              minimum=[],
                              maximum=[],
                              dtype=np.float64,
                              name="dummy_action")

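As noted above, `ProxyEnvironment` never ends an episode and always returns a reward of 0. One way to fill that gap is a proxy that ends the episode after a fixed number of steps and computes a reward from the observation. The sketch below stays dependency-free by using a hypothetical `TimeStep` named tuple and `StepType` enum that only mimic the layout of `dm_env.TimeStep`. `StepLimitProxy`, `reward_fn`, and `max_steps` are illustrative names, not agentflow or dm_env API.

```python
import enum
from typing import Any, Callable, Dict, List, NamedTuple, Optional


class StepType(enum.IntEnum):
  FIRST = 0
  MID = 1
  LAST = 2


class TimeStep(NamedTuple):
  step_type: StepType
  reward: Optional[float]
  discount: Optional[float]
  observation: Dict[str, Any]


class StepLimitProxy:
  """Forwards I/O to callbacks; ends each episode after `max_steps` steps."""

  def __init__(self, observation_cb: Callable[[], Dict[str, Any]],
               action_cb: Optional[Callable[[Any], None]],
               reward_fn: Callable[[Dict[str, Any]], float],
               max_steps: int):
    self._observation_cb = observation_cb
    self._action_cb = action_cb
    self._reward_fn = reward_fn
    self._max_steps = max_steps
    self._num_steps = 0

  def reset(self) -> TimeStep:
    # Like dm_env.restart: no reward or discount on the FIRST step.
    self._num_steps = 0
    return TimeStep(StepType.FIRST, None, None, self._observation_cb())

  def step(self, action: Any) -> TimeStep:
    if self._action_cb is not None:
      self._action_cb(action)
    self._num_steps += 1
    observation = self._observation_cb()
    reward = self._reward_fn(observation)
    # A time limit truncates rather than terminates, so the discount stays 1.0.
    step_type = (StepType.LAST if self._num_steps >= self._max_steps
                 else StepType.MID)
    return TimeStep(step_type, reward, 1.0, observation)


sent_actions: List[Any] = []
proxy_sketch = StepLimitProxy(lambda: {"x": 2.0}, sent_actions.append,
                              lambda obs: -abs(obs["x"]), max_steps=3)
```

In real dm_env code, `dm_env.truncation` (discount 1.0 by default) and `dm_env.termination` (discount 0.0) build the corresponding LAST timesteps.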

## Main Loop

We can finally build our graph and run it as follows:

In [None]:
# Stubs for pulling observation and sending action to some external system.
observation_cb = ExampleObservationUpdater()
action_cb = ExampleActionSender()

# Create an environment that forwards the observation and action calls.
env = ProxyEnvironment(observation_cb, action_cb)

# Stub policy that runs the desired agent.
policy = ExamplePolicy(action_cb.action_spec(), "agent")

# Wrap policy into an agent that logs to the terminal.
task = ExampleSubTask(env.observation_spec(), action_cb.action_spec(), 10)
logger = print_logger.PrintLogger()
aggregator = subtask_logger.EpisodeReturnAggregator()
logging_observer = subtask_logger.SubTaskLogger(logger, aggregator)
agent = subtask.SubTaskOption(task, policy, [logging_observer])

reset_op = ExampleScriptedOption(action_cb.action_spec(), "reset", 3)
main_loop = loop_ops.Repeat(5, sequence.Sequence([reset_op, agent]))

# Run the episode.
timestep = env.reset()
while True:
  action = main_loop.step(timestep)
  timestep = env.step(action)

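  # Terminate if the environment ends the episode, or with probability
  # main_loop.pterm(timestep).
  if timestep.last() or (main_loop.pterm(timestep) > np.random.rand()):
    if not timestep.last():
      # Step main_loop once more with a LAST timestep so its options observe
      # the end of the episode.
      termination_timestep = timestep._replace(step_type=dm_env.StepType.LAST)
      main_loop.step(termination_timestep)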
    break
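`loop_ops.Repeat(5, sequence.Sequence([reset_op, agent]))` roughly alternates a 3-step scripted reset with a 10-step agent phase, five times. The stdlib sketch below approximates that schedule and the loop's termination test. It is a simplification, not agentflow's implementation: it assumes each child acts for exactly its `max_steps` and the next child takes over on the following step, ignoring hand-off details, so real step counts may differ. It also shows why `main_loop.pterm(timestep) > np.random.rand()` terminates with probability `pterm`. `flatten_schedule` and `should_terminate` are hypothetical helpers.

```python
import random
from typing import List, Optional, Sequence, Tuple


def flatten_schedule(children: Sequence[Tuple[str, int]],
                     num_repeats: int) -> List[str]:
  """Names the child acting at each step of a simplified Repeat(Sequence).

  Simplification: each child is (name, max_steps), acts for exactly
  max_steps steps, and the next child takes over on the following step.
  """
  schedule: List[str] = []
  for _ in range(num_repeats):
    for name, max_steps in children:
      schedule.extend([name] * max_steps)
  return schedule


def should_terminate(pterm: float, rng: Optional[random.Random] = None) -> bool:
  """Returns True with probability `pterm`, like `pterm > np.random.rand()`."""
  if rng is None:
    rng = random.Random()
  # rng.random() is uniform in [0, 1), so P(pterm > u) == pterm for pterm in
  # [0, 1]: pterm == 1.0 always terminates and pterm == 0.0 never does.
  return pterm > rng.random()


schedule = flatten_schedule([("reset", 3), ("agent", 10)], num_repeats=5)
```

Because the options in this tutorial only return 0.0 or 1.0 from `pterm`, the sampled check behaves deterministically for them; fractional values would make termination genuinely stochastic.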