# Introduction to `py_trees` library

Behaviour trees are a decision making engine often used in the gaming and robotics industries. [py_trees](https://github.com/splintered-reality/py_trees) is a python implementation of a Behavior Trees engine that you can use in your projects.

## Documentation

All the py_trees documnetation can be found [here](https://py-trees.readthedocs.io/en/devel/index.html).

As a summary, `py_trees` combine the following elements:

* **Behaviors**: Behaviours are the leaves of the behaviour tree. This is where actions and conditions are done. Some generic behaviors are already implemented in the `py_trees` library. However, most of the particular behaviors must be implemented by inheriting from the `Behavior` class.
* **Composites**: Control flow nodes. Three basic composite nodes are used: *Sequence*, *Selector*, and *Parallel*.
* **Decorators**: Nodes with a single childres that modify their underlying child behaviour. Several decorators are available in `py_trees`. Some of the most popular are: *Count*, *Inverter*, *OneShot*, *Repeat*, *Timeout*, ...

## Example

Next, a super basic example is provided.
We want to implement a behavior tree that checks if we are busy or not. If we are not busy then we can take a beer for a limited time. The whole process can be repeated as many times as necessary.

This task is implemented following this behavior tree.

![](./life_1.png)

It shows a behavior tree with one condition (i.e., *Busy*) and one action (i.e., *Have a Beer*), both implementd as custom behaviors, a couple of decorators (i.e., *Inverter* and *Timeout*), and one composite (i.e., *Sequence*).

### Define the behaviors

The two custom behaviors are implemented as follows:

* The *Busy* condition may return only `SUCCESS` or `FAILURE`. In this naive implementation this is selected randomly.
* The *Have a Beer* action could return `SUCCESS`, `FAILURE`, or `RUNNING` but for simplicity it will always return `RUNNING`.

Because both Behaviors inherit from the `py_trees.behaviour.Behaviour` class, several methods must be implemented. Check the documentation to understand why is each function used for.

In [13]:
import py_trees
import random

# Example of a behavior
class Busy(py_trees.behaviour.Behaviour):
    def __init__(self, name):
        super(Busy, self).__init__(name)

    def setup(self):
        self.logger.debug("  %s [Busy::setup()]" % self.name)
        
    def initialise(self):
        self.logger.debug("  %s [Busy::initialise()]" % self.name)
        
    def update(self):
        self.logger.debug("  %s [Busy::update()]" % self.name)
        ready_to_make_a_decision = random.choice([True, False])
        decision = random.choice([True, False])
        if not ready_to_make_a_decision:
            self.logger.debug("  %s [Busy::update() RUNNING]" % self.name)
            return py_trees.common.Status.RUNNING
        elif decision:
            self.feedback_message = "We are not bar!"
            self.logger.debug("  %s [Busy::update() SUCCESS]" % self.name)
            return py_trees.common.Status.SUCCESS
        else:
            self.feedback_message = "Uh oh"
            self.logger.debug("  %s [Busy::update() FAILURE]" % self.name)
            return py_trees.common.Status.FAILURE

    def terminate(self, new_status):
        self.logger.debug("  %s [Busy::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))


class HaveABeer(py_trees.behaviour.Behaviour):
    def __init__(self, name):
        super(HaveABeer, self).__init__(name)

    def setup(self):
        self.logger.debug("  %s [HaveABeer::setup()]" % self.name)
        
    def initialise(self):
        self.logger.debug("  %s [HaveABeer::initialise()]" % self.name)
        
    def update(self):
        self.logger.debug("  %s [HaveABeer::update()]" % self.name)
        self.logger.debug("  %s [HaveABeer::update() RUNNING]" % self.name)
        return py_trees.common.Status.RUNNING
        
    def terminate(self, new_status):
        self.logger.debug("  %s [HaveABeer::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))


### Create the behavior tree

Now, we are going to connect the previously generated behaviors using decorators and composite structures to build the ddesired tree. Check the available documentation for more information.

In [14]:
import py_trees.decorators
import py_trees.display

def create_tree():
    busy = Busy(name="Busy?")
    
    inverter = py_trees.decorators.Inverter(
        name="Inverter",
        child=busy
    )
    
    have_beer = HaveABeer(name="Have a Beer!")
    
    timeout = py_trees.decorators.Timeout(
        name="Timeout",
        duration=3,
        child=have_beer
    )

    root = py_trees.composites.Sequence(name="Life", memory=True)
    root.add_children([inverter, timeout])
    
    return root

root = create_tree()
py_trees.display.render_dot_tree(root) # generates a figure for the 'root' tree.

Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.dot
Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.png
Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.svg


{'dot': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.dot',
 'png': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.png',
 'svg': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.svg'}

## Run the behavior Tree

To run the behavior, first we need to call the `setup()` method for all the behaviors in the tree. Then we can call the `tick_once()` method to play it.

In [3]:
import time
py_trees.logging.level = py_trees.logging.Level.DEBUG

def run(it=10):
    root = create_tree()

    try:
        print("Call setup for all tree children")
        root.setup_with_descendants() 
        print("Setup done!\n\n")
        py_trees.display.ascii_tree(root)
        
        for _ in range(it):
            root.tick_once()
            time.sleep(1)
    except KeyboardInterrupt:
        pass


In [5]:
# Run the bBehavior Tree for 10 steps
run()

Call setup for all tree children
[DEBUG] Busy?                :   Busy? [Busy::setup()]
[DEBUG] Have a Beer!         :   Have a Beer! [HaveABeer::setup()]
Setup done!


[DEBUG] Life                 : Sequence.tick()
[DEBUG] Inverter             : Inverter.tick()
[DEBUG] Busy?                : Busy.tick()
[DEBUG] Busy?                :   Busy? [Busy::initialise()]
[DEBUG] Busy?                :   Busy? [Busy::update()]
[DEBUG] Busy?                :   Busy? [Busy::update() RUNNING]
[DEBUG] Life                 : Sequence.tick()
[DEBUG] Inverter             : Inverter.tick()
[DEBUG] Busy?                : Busy.tick()
[DEBUG] Busy?                :   Busy? [Busy::update()]
[DEBUG] Busy?                :   Busy? [Busy::update() SUCCESS]
[DEBUG] Busy?                : Busy.stop(Status.RUNNING->Status.SUCCESS)
[DEBUG] Busy?                :   Busy? [Busy::terminate().terminate()][Status.RUNNING->Status.SUCCESS]
[DEBUG] Inverter             : Inverter.stop(Status.FAILURE)
[DEBUG] Life        

## Introduction to the Blackboard

Blackboards are a common mechanism for sharing data between behaviours in the tree.
In `py_trees`, nodes have to register which vars they want to `READ` or `WRITE`.

```python
self.blackboard = self.attach_blackboard_client(name=self.name)
self.blackboard.register_key("var_name", access=py_trees.common.Access.WRITE)
self.blackboard.register_key("var_name", access=py_trees.common.Access.READ)
```

Lets modify the previous behavior tree to count how many beers we have had, and just have a beer if we have had less than 3 beers.

In [9]:

# Modify the HaveABeer behavior to store in tha black Board how many beers it have had.
class HaveABeer(py_trees.behaviour.Behaviour):
    def __init__(self, name):
        super(HaveABeer, self).__init__(name)
        self.blackboard = self.attach_blackboard_client(name=self.name)
        self.blackboard.register_key("n_beers", access=py_trees.common.Access.WRITE)
        self.blackboard.register_key("n_beers", access=py_trees.common.Access.READ)
        self.beers = None
        
    def setup(self):
        self.logger.debug("  %s [HaveABeer::setup()]" % self.name)
        self.blackboard.n_beers = 0
        
    def initialise(self):
        self.logger.debug("  %s [HaveABeer::initialise()]" % self.name)
        self.beers = self.blackboard.n_beers
        print("Beers already had ", self.beers)

    def update(self):
        self.logger.debug("  %s [HaveABeer::update()]" % self.name)
        return py_trees.common.Status.RUNNING
        
    def terminate(self, new_status):
        self.logger.debug("  %s [HaveABeer::terminate().terminate()][%s->%s]" % (self.name, self.status, new_status))
        self.blackboard.n_beers = self.beers + 1

Now, lets modify the tree structure to check how many beers it have had, according to the value in the Black Board.

In [15]:
import operator 

def create_tree():

    busy = Busy(name="Busy?")
    inverter = py_trees.decorators.Inverter(
        name="Inverter",
        child=busy
    )

    # Special py_trees behavior
    n_beers_lt_2 = py_trees.behaviours.CheckBlackboardVariableValue(
        name="n_beers_lt_2",
        check=py_trees.common.ComparisonExpression(
            variable="n_beers",
            value=2,
            operator=operator.lt
        )
    )

    have_beer = HaveABeer(name="Have a Beer!")
    timeout = py_trees.decorators.Timeout(
        name="Timeout",
        duration=3,
        child=have_beer
    )

    check_n_beers = py_trees.composites.Sequence(name="check_n_beers", memory=True)
    check_n_beers.add_children([n_beers_lt_2, timeout])
    
    root = py_trees.composites.Sequence(name="Life", memory=True)    
    root.add_children([inverter, check_n_beers])
    
    return root


root = create_tree()
py_trees.display.render_dot_tree(root)

Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.dot
Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.png
Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.svg


{'dot': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.dot',
 'png': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.png',
 'svg': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.svg'}

In [2]:
import py_trees
import rospy
import tf
import numpy as np
import operator 

from std_srvs.srv import Trigger, TriggerRequest
from nav_msgs.msg import Odometry
from geometry_msgs.msg import PoseStamped


# Behavior for calling `check_object` task and if True, store object name to Blackboard
class CheckObject(py_trees.behaviour.Behaviour):
    def __init__(self, name):
        super(CheckObject, self).__init__(name)
        self.blackboard = self.attach_blackboard_client(name=self.name)
        self.blackboard.register_key(
            "object_name", access=py_trees.common.Access.WRITE)

    def setup(self):
        self.logger.debug("  %s [CheckObject::setup()]" % self.name)
        rospy.wait_for_service('/manage_objects/check_object')
        try:
            self.server = rospy.ServiceProxy(
                '/manage_objects/check_object', Trigger)
            self.logger.debug(
                "  %s [CheckObject::setup() Server connected!]" % self.name)
        except rospy.ServiceException as e:
            self.logger.debug("  %s [CheckObject::setup() ERROR!]" % self.name)

    def initialise(self):
        self.logger.debug("  %s [CheckObject::initialise()]" % self.name)

    def update(self):
        try:
            self.logger.debug(
                "  {}: call service /manage_objects/check_object".format(self.name))
            resp = self.server(TriggerRequest())
            if resp.success:
                self.blackboard.object_name = resp.message
                print("set to blackboard: ", resp.message)
                return py_trees.common.Status.SUCCESS
            else:
                return py_trees.common.Status.FAILURE
        except:
            self.logger.debug(
                "  {}: Error calling service /manage_objects/check_object".format(self.name))
            return py_trees.common.Status.FAILURE

    def terminate(self, new_status):
        self.logger.debug("  %s [CheckObject::terminate().terminate()][%s->%s]" %
                          (self.name, self.status, new_status))

# Behavior for calling `get_object`
class GetObject(py_trees.behaviour.Behaviour):
    def __init__(self, name):
        super(GetObject, self).__init__(name)

    def setup(self):
        self.logger.debug("  %s [GetObject::setup()]" % self.name)
        rospy.wait_for_service('/manage_objects/get_object')
        try:
            self.server = rospy.ServiceProxy(
                '/manage_objects/get_object', Trigger)
            self.logger.debug(
                "  %s [GetObject::setup() Server connected!]" % self.name)
        except rospy.ServiceException as e:
            self.logger.debug("  %s [GetObject::setup() ERROR!]" % self.name)

    def initialise(self):
        self.logger.debug("  %s [GetObject::initialise()]" % self.name)

    def update(self):
        try:
            self.logger.debug(
                "  {}: call service /manage_objects/get_object".format(self.name))
            resp = self.server(TriggerRequest())
            if resp.success:
                return py_trees.common.Status.SUCCESS
            else:
                return py_trees.common.Status.FAILURE
        except:
            self.logger.debug(
                "  {}: Error calling service /manage_objects/get_object".format(self.name))
            return py_trees.common.Status.FAILURE

    def terminate(self, new_status):
        self.logger.debug("  %s [GetObject::terminate().terminate()][%s->%s]" %
                          (self.name, self.status, new_status))


# Behavior for calling `let_object`
class LetObject(py_trees.behaviour.Behaviour):
    def __init__(self, name):
        super(LetObject, self).__init__(name)

    def setup(self):
        self.logger.debug("  %s [LetObject::setup()]" % self.name)
        rospy.wait_for_service('/manage_objects/let_object')
        try:
            self.server = rospy.ServiceProxy(
                '/manage_objects/let_object', Trigger)
            self.logger.debug(
                "  %s [LetObject::setup() Server connected!]" % self.name)
        except rospy.ServiceException as e:
            self.logger.debug("  %s [LetObject::setup() ERROR!]" % self.name)

    def initialise(self):
        self.logger.debug("  %s [LetObject::initialise()]" % self.name)

    def update(self):
        try:
            self.logger.debug(
                "  {}: call service /manage_objects/let_object".format(self.name))
            resp = self.server(TriggerRequest())
            if resp.success:
                return py_trees.common.Status.SUCCESS
            else:
                return py_trees.common.Status.FAILURE
        except:
            self.logger.debug(
                "  {}: Error calling service /manage_objects/let_object".format(self.name))
            return py_trees.common.Status.FAILURE

    def terminate(self, new_status):
        self.logger.debug("  %s [LetObject::terminate().terminate()][%s->%s]" %
                          (self.name, self.status, new_status))

# TODO: Create any other required behavior like those to move the robot to a point, 
#       add or check elements in the blackboard, ...

# Behavior for calling `let_object`
class MoveRobot(py_trees.behaviour.Behaviour):
    def __init__(self, name):
        super(MoveRobot, self).__init__(name)

    def setup(self):
        self.logger.debug("  %s [MoveRobot::setup()]" % self.name)

        # PUBLISHERS
        # Publisher for sending goal position to the planning node
        self.goal_publisher = rospy.Publisher('/move_base_simple/goal', PoseStamped, queue_size=10)

        # SUBSCRIBERS
        #subscriber to robot pose from odometry  
        self.odom_sub = rospy.Subscriber('/odom', Odometry, self.get_odom) 

        
        try:
            self.server = rospy.ServiceProxy(
                '/manage_objects/let_object', Trigger)
            self.logger.debug(
                "  %s [LetObject::setup() Server connected!]" % self.name)
        except rospy.ServiceException as e:
            self.logger.debug("  %s [LetObject::setup() ERROR!]" % self.name)

    def initialise(self):
        self.logger.debug("  %s [MoveRobot::initialise()]" % self.name)

        # Define goal position
        self.goal = PoseStamped()
        self.goal.header.frame_id = "map"
        self.goal.pose.position.x = 1.5          # Example x coordinate
        self.goal.pose.position.y = 1.5          # Example y coordinate
        # goal.pose.orientation.w = 1.0       # Example orientation (no rotation)

        # Publish goal position
        self.goal_publisher.publish(self.goal)
        rospy.loginfo("Goal position published")

    def update(self):

        if np.linalg.norm(self.current_pose[0:2] - np.array([self.goal.pose.position.x, self.goal.pose.position.y])) < 0.1:
            self.logger.debug("  %s [MoveRobot::Update() SUCCESS]" % self.name)
            return py_trees.common.Status.SUCCESS
        else:
            self.logger.debug("  %s [MoveRobot::Update() RUNNING]" % self.name)
            return py_trees.common.Status.RUNNING


    def terminate(self, new_status):
        self.logger.debug("  %s [LetObject::terminate().terminate()][%s->%s]" %
                          (self.name, self.status, new_status))
        
    # Odometry callback: Gets current robot pose and stores it into self.current_pose
    def get_odom(self, odom):
        _, _, yaw = tf.transformations.euler_from_quaternion([odom.pose.pose.orientation.x, 
                                                            odom.pose.pose.orientation.y,
                                                            odom.pose.pose.orientation.z,
                                                            odom.pose.pose.orientation.w])
        self.current_pose = np.array([odom.pose.pose.position.x, odom.pose.pose.position.y, yaw])



In [20]:
import operator 

def create_tree():
    # Create Behaviors
    move_to_point = MoveRobot(name="move_to_point")

    check_object = CheckObject(name="check_object")

    get_object = GetObject(name="get_object")

    move_to_destination = MoveRobot(name="move_to_destination")

    let_object = LetObject(name="let_object")

    # inverter = py_trees.decorators.Inverter(
    #     name="Inverter",
    #     child=object_found
    # )

    # Special py_trees behavior
    n_points_gt_5 = py_trees.behaviours.CheckBlackboardVariableValue(
        name="n_point_gt_5",
        check=py_trees.common.ComparisonExpression(
            variable="n_points",
            value=5,
            operator=operator.gt
        )
    )


    n_collected_objects_gt_2 = py_trees.behaviours.CheckBlackboardVariableValue(
        name="n_collected_objects_gt_2",
        check=py_trees.common.ComparisonExpression(
            variable="n_collected_objects_gt_2",
            value=2,
            operator=operator.gt
        )
    )

    check_end = py_trees.composites.Sequence(name="check_end", memory=True)
    check_end.add_children([n_collected_objects_gt_2, n_points_gt_5])

    root = py_trees.composites.Sequence(name="Life", memory=True)    
    root.add_children([check_end, move_to_point, check_object, get_object, move_to_destination, let_object])
    
    return root


root = create_tree()
py_trees.display.render_dot_tree(root)

Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.dot
Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.png
Writing /home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.svg


{'dot': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.dot',
 'png': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.png',
 'svg': '/home/leopham/catkin_ws/src/pick_up_objects_task/notebooks/life.svg'}

In [18]:
# Run the Behavior Tree for 30 steps
run(it=30)

Call setup for all tree children
[DEBUG] Busy?                :   Busy? [Busy::setup()]
[DEBUG] Have a Beer!         :   Have a Beer! [HaveABeer::setup()]
Setup done!


[DEBUG] Life                 : Sequence.tick()
[DEBUG] Inverter             : Inverter.tick()
[DEBUG] Busy?                : Busy.tick()
[DEBUG] Busy?                :   Busy? [Busy::initialise()]
[DEBUG] Busy?                :   Busy? [Busy::update()]
[DEBUG] Busy?                :   Busy? [Busy::update() SUCCESS]
[DEBUG] Busy?                : Busy.stop(Status.INVALID->Status.SUCCESS)
[DEBUG] Busy?                :   Busy? [Busy::terminate().terminate()][Status.INVALID->Status.SUCCESS]
[DEBUG] Inverter             : Inverter.stop(Status.FAILURE)
[DEBUG] Life                 : Sequence.tick()
[DEBUG] Inverter             : Inverter.stop(Status.INVALID)
[DEBUG] Busy?                : Busy.stop(Status.SUCCESS->Status.INVALID)
[DEBUG] Busy?                :   Busy? [Busy::terminate().terminate()][Status.SUCCESS->Status.IN

In [48]:
import py_trees
from py_trees.blackboard import Blackboard
import time

class SetKeyBehavior(py_trees.behaviour.Behaviour):
    def __init__(self, name="Set Key Behavior"):
        super(SetKeyBehavior, self).__init__(name)
        self.blackboard = self.attach_blackboard_client(name="Foo Global")
        # Register the key 'my_key' in the attached Blackboard client
        self.blackboard.register_key(key="my_key", access=py_trees.common.Access.READ)
        self.blackboard.register_key(key="my_key", access=py_trees.common.Access.WRITE)

    def setup(self):
        self.logger.debug("  %s [LetObject::setup()]" % self.name)
        self.blackboard.my_key = 0

    # def initialise(self):
    #     self.blackboard.my_key += 1

    def update(self):
        # Set the value of the key 'my_key' in the attached Blackboard client
        # self.blackboard.set('my_key', self.blackboard.get('my_key') + 1)
        # print(self.blackboard)
        return py_trees.common.Status.SUCCESS

class ReadKeyBehavior(py_trees.behaviour.Behaviour):
    def __init__(self, name="Read Key Behavior"):
        super(ReadKeyBehavior, self).__init__(name)
        self.blackboard = self.attach_blackboard_client(name="Foo1 Global")
        # Register the key 'my_key' in the attached Blackboard client
        self.blackboard.register_key(key="my_key", access=py_trees.common.Access.WRITE)

    # def initialise(self):
        

    def update(self):
        # Read the value of the key 'my_key' from the attached Blackboard client
        # print(self.blackboard)
        self.blackboard.my_key += 1
        if self.blackboard.my_key is not None:
            print("Value of 'my_key':", self.blackboard.my_key)
        else:
            print("Key 'my_key' not found in the Blackboard")

        return py_trees.common.Status.SUCCESS

# Create a behavior tree
root = py_trees.composites.Sequence("Root", memory=True)
set_key_behavior = SetKeyBehavior()
read_key_behavior = ReadKeyBehavior()

n_points_lt_2 = py_trees.behaviours.CheckBlackboardVariableValue(
        name="n_points_lt_2",
        check=py_trees.common.ComparisonExpression(
            variable="my_key",
            value=2,
            operator=operator.lt
        )
    )

root.add_children([set_key_behavior, n_points_lt_2, read_key_behavior])

root.setup_with_descendants() 
print("Setup done!\n\n")
# py_trees.display.render_dot_tree(root)
# Execute the behavior tree
# tree = py_trees.trees.BehaviourTree(root)
# tree.setup(timeout=15)
for _ in range(5):
    root.tick_once()
    time.sleep(1)




Setup done!


Value of 'my_key': 1
Value of 'my_key': 2
