# Reactive Robot Programming

In this notebook we will develop a library for reactive programming of the Aldebaran Nao robot. We will first let the robot look around for red balls, and make the robot point towards red balls which are within target range.  

First we import the contents of Reactive Extensions (rx), naoqi and the Reactive Extensions for Qi libraries.

In [None]:
from rx import *
from naoqi import *
import datetime
from rx.subjects import Subject
from threading import Thread
import sys, time

Stream = Observable

We will be connecting to a real Nao, for which we need to configure the IP address. Change the line below to match the IP address of your Nao robot.

In [None]:
nao_address = "tcp://192.168.11.162:9559"

We use the Qi framework to create a session for this robot.

In [None]:
session = qi.Session(nao_address)

To debug our code, we can print the last value. An easy way to print a value in Rx is to subscribe a print function to a stream. In Python 2 however we cannot directly use print as a function. We can however import a print function from the future library.

In [None]:
from __future__ import print_function

## Bridging naoqi and Reactive Extensions

The Qi framework provides us with some features for responding to memory events. ReactiveX has its own model for reactive programming. To bridge Qi and Rx, we create a Subject.

Because the subject has to continue monitoring the memory after it has been created, we run it in its own thread.

In [None]:
class QiMemorySubject(Thread):
    def __init__(self, field, session):
        Thread.__init__(self)
        self.session = session
        self.field = field
        self.stopped = False
        self.subject = Subject()
        
    def run(self):
        mem = self.session.service("ALMemory")
        self.subscriber = mem.subscriber(self.field)
        self.signal = self.subscriber.signal.connect(self.subject.on_next)
        while not self.stopped:
            time.sleep(1)
        
    def stop(self):
        self.stopped = True
        self.subscriber.signal.disconnect(self.signal)
        print("Stopping")

## Listening for a red ball

We can now easily create subjects listening to various sensors of the robot, such as red balls and blobs.

In [None]:
class RedBallSubject(QiMemorySubject):
    def __init__(self, session):
        QiMemorySubject.__init__(self, "redBallDetected", session)

Then start a red ball subject which uses the camera of Nao to look for red balls. Because we designed the red ball subject class to extend the Qi memory subject, and Qi memory subject runs in its own thread, we have to start the thread to start listening.

In [None]:
redBallThread = RedBallSubject(session)
redBallThread.daemon = True
redBallThread.start()  # start the thread

Nao should now be listening for red balls. To check if it is actually working, we can sample the subject. Because we do not want to spam our notebook, we set up a special stream which only returns the first red ball seen.

In [None]:
firstRedBall = redBallThread.subject.first()

Let's debug our first red ball stream using print.

In [None]:
firstRedBallSub = firstRedBall.subscribe(lambda red_ball: print("Red ball spotted: " + str(red_ball)))

In robotics, printing values is used mainly for quick testing and debugging. A production program running on an autonomous robot should contain almost no calls to print, as there is no-one controlling the robot who would be interested in reading those prints. 

Before we go on, let's dispose of our temporary print function. We can do this by disposing the subscription we made.

In [None]:
firstRedBallSub.dispose()

## Processing the data

RedBallDetection gives us a list containing the following values:
* Timestamp (seconds, microseconds)
* Ball info (center x, center y, size x, size y, all in radians)
* Camera pose in torso frame
* Camera pose in robot frame (average of both feet)
* Camera id

For convenience we create a class called Ball to hold the ball information. We overwrite the string magic method for pretty printing.

In [None]:
class Ball():
    def __init__(self, x, y, width, height):
        self.x = x
        self.y = y
        self.width = width
        self.height = height
    
    def __str__(self):
        template = "Location of ball: (center x: %s rad, center y: %s rad, size x: %s rad, size y: %s rad)"
        return template % (self.x, self.y, self.width, self.height)

Using a map operator we can get just the information we need, e.g. the ball info. We use our ball class for converting the list to an object. 

In [None]:
ball = redBallThread.subject.map(lambda ballInfo: Ball(*(ballInfo[1])))

We debug the ball stream using a print subscription.

In [None]:
firstBall = ball.first()

firstBallSubscription = firstBall.subscribe(print)

To avoid too much spam, let's dispose of our print function.

In [None]:
firstBallSubscription.dispose()

The ball detector produces a lot of events. For our use case we can instead sample the stream to reduce the amount of events which we need to handle.

In [None]:
ballSamples = ball.sample(500)

To find out the distance between the robot and the ball, we have to do some simple trigonometry. We know that the formula for an angle $\theta$ given the opposite side $A$ and adjacent side $B$ is given by the tangent.

$tan(\theta)=\dfrac{A}{B}$

In our case, we are given the angle $\theta$. Given a $size$ which is half the height or width of the ball, i.e. the radius, we can calculate the $distance$ to the ball by using the tangent function:

$distance = \dfrac{\dfrac{size}{2}}{tan(\theta)} = \dfrac{size}{2*tan(\theta)}$

In Python this is implemented as follows:

In [None]:
from math import tan 

def distance_to_ball(ball_size, angle):
    return ball_size / (2 * tan(angle))

Let's test this for a ball with a size of 4 cm, with a reported angle of 45 degrees. If we fill these values into our formula we get:

$distance = \dfrac{4 cm}{2*tan(45^{\circ})}=2cm$

Note that the Python tangent function by default takes as parameter the angle in radians and not in degrees. Hence instead of using 45 degrees, we use $\pi/4 rad$. 

In [None]:
from math import pi, radians

%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np

plt.plot(range(1,46), [distance_to_ball(5, x) for x in [radians(l) for l in range(1,46)]])

plt.xlabel('Angle (degree)')
plt.ylabel('Distance (cm)')
plt.title('Distance according to angle')
plt.grid(True)
plt.show()

To improve the accuracy of the distance measurement we will use the average of the observed width and height. We write a function called dist2 which performs this for us.

In [None]:
def dist2(width, height): 
    return (1/4.) * (size/tan(width) + size/tan(height))

Then we write a function ball_distance which uses dist2 for calculating the distance to the ball.

In [None]:
def ball_distance(ball): 
    return dist2(ball.width, ball.height)

Using our new distance function, let's create a stream which gives us the distance to the ball.

In [None]:
distance = ballSamples.map(ball_distance)

We can test our stream using print:

In [None]:
firstRedBallDistanceSub = distance.first().subscribe(lambda distance: print("Distance: %s cm" % (distance)))

Again, let's dispose of our print function.

In [None]:
firstRedBallDistanceSub.dispose()

## Controlling LEDs based on ball distance
We can now calculate the distance to the ball. We can make the robot react differently depending on this distance. For example, let's change the light color of Nao's eyes based on the distance of the ball.

We use the ALLeds module for controlling the LEDs of the robot.

In [None]:
leds = session.service("ALLeds")
leds.off("FaceLeds")

First we write a function to determine the brightness of the LEDs based on distance:

In [None]:
def brightness(distance):
    if distance > 20:
        return 0
    elif distance > 5:
        return (-(distance-5)/15.) + 1
    else:
        return 1

Then we write a function which sets the LED brightness based on the input.

In [None]:
def led_brightness(intensity):
    leds.setIntensity("RightFaceLedsRed", intensity)
    leds.setIntensity("LeftFaceLedsRed", intensity)

We wrote the above as two separate functions so we can test the intensity function separately from the robot. Let's plot our intensity function.

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np

plt.plot(range(0,31), [get_intensity(x) for x in range(0,31)])

plt.xlabel('distance (cm)')
plt.ylabel('intensity')
plt.title('LED intensity over distance')
plt.grid(True)
plt.show()

We can now subscribe our brightness function.

In [None]:
ballSamplesSub = ballSamples.subscribe(led_brightness)

We can dispose of our subscription if we don't want our brightness function to remain active.

In [None]:
ballSamplesSub.dispose()

## Tracking the ball
Aldebaran offers a red ball tracker. We will make our own using reactive programming. This tracker gives us more flexibility than the build in tracker, for example we can make the robot transition from looking at the ball to pointing at it. We will still use some functionality of the original tracker.

In [None]:
tracker = session.service("ALTracker")
tracker.registerTarget("RedBall", 0.04)
tracker.track("RedBall")

We initialize the robot in the crouching position and get some sensor readings which we later need to reset the arm after tracking.

In [None]:
posture = session.service("ALRobotPosture")
posture.goToPosture("Crouch", 1.0)

motion = session.service("ALMotion")
sensor_angles = motion.getAngles(["LArm"], True)

We set up a stream for detecting whether balls are close or far using a map operation.

In [None]:
def point_or_track(close):
    if close:
        target = tracker.getTargetPosition(0)
        tracker.pointAt("LArm", target, 0, 0.5)
    else:
        tracker.setMode("Head")
        motion.setAngles(["LArm"], sensor_angles, 0.2)
        
pointRangeBalls = redBallDistanceSamples.map(lambda distance: distance < 20)
pointRangeBallsSub = pointRangeBalls.subscribe(point_or_track)

After testing our behavior we can dispose of the subscription.

In [None]:
pointRangeBallsSub.dispose()

We want to move towards grabbing the ball if it is within reach. Let's try a naive implementation which closes the hand if the ball is very close, and opens it otherwise.

In [None]:
def open_or_close(graspable):
    if graspable:
        motion.closeHand('LHand')
    else:
        motion.openHand('LHand')

graspRangeBalls = redBallDistanceSamples.map(lambda distance: distance < 15)
graspRangeBallsSub = graspRangeBalls.subscribe(open_or_close)

Again, we should dispose after testing.

In [None]:
graspRangeBallsSub.dispose()

### Determining which effector to use

We want to track objects to the left with the left effector and objects to the right with the right effector. To determine which effector to use, we take the sum of the angle of the head and the angle in the camera frame. We first have to define the means to get the head yaw angle. For this we define a subject which pulls the memory at a predefined interval.

In [None]:
class QiPullingMemorySubject(Thread):
    def __init__(self, field, session, pull_rate):
        Thread.__init__(self)
        self.session = session
        self.field = field
        self.pull_rate = pull_rate / 1000.0
        self.stopped = False
        self.subject = Subject()
        
    def run(self):
        mem = self.session.service("ALMemory")
        #latched_data = 0
        while not self.stopped:
            data = mem.getData(self.field)
            
            #if data < latched_data - 0.02 or data > latched_data + 0.02:
            self.subject.on_next(data)
            #    latched_data = data
            time.sleep(self.pull_rate)
        
    def stop(self):
        self.stopped = True
        print("Stopping")

We then define a specific subject for pulling data from the head yaw position sensor.

In [None]:
class HeadYawSubject(QiPullingMemorySubject):
    def __init__(self, session):
        QiPullingMemorySubject.__init__(self, "Device/SubDeviceList/HeadYaw/Position/Sensor/Value", session, 1000)

And we start the thread for listening to the head yaw sensor.

In [None]:
headYawThread = HeadYawSubject(session)
headYawThread.daemon = True
headYawThread.start()
head_yaw = headYawThread.subject

The logic for defining tracking is as follows:
* Objects further than 50 cm away are only tracked with the head
* Objects within 20 cm and 50 cm are tracked with the head and body
* Objects less than 20 cm away are tracked by pointing at them with the end effector

We first define the behavior for head tracking.

In [None]:
head = distance.filter(lambda d: d >= 50)
head_subscription = head.subscribe(head_tracker)

Then we define the behavior for head and body tracking.

In [None]:
head_body = distance.filter(lambda d: d >= 20 and d < 50)
head_body_subscription = head_body.subscribe(head_body_tracker)

The behavior for end effector is harder to define. We first want to determine which effector is closest to the ball. To realize this we define a function for both storing the angle of the ball and calculating the distance to the ball.

In [None]:
def angle_dist_helper(ball):
    return (ball.x, dist2(ball.width, ball.height))

We use the helper function on the stream of ball samples.

In [None]:
cam_angle_dist = ballSamples.map(angle_dist_helper)

We want to filter out balls which are over 20 cm away.

In [None]:
cam_angle_dist_filtered = cam_angle_dist.filter(lambda pair: pair[1] < 20)

After filtering we can discard the distance information.

In [None]:
cam_angle = cam_angle_dist_filtered.map(lambda pair: pair[0])

Then we define a function which combines data from the ball stream and the head yaw stream. We also want to filter out events which happened to long ago. To do this we first apply a timestamp to events in bith the camera angle and head yaw streams.

In [None]:
cam_angle_timestamped = cam_angle.timestamp()
head_yaw_timestamped = head_yaw.timestamp()

Then we define a function which will be used for combining both streams.

In [None]:
combinator = lambda headYaw, ballAngle: (headYaw.value+ballAngle.value, 
                                         headYaw.timestamp-ballAngle.timestamp)

We combine both streams and apply the combinator function.

In [None]:
robot_angle_timestamped = Stream.combine_latest(head_yaw_timestamped, 
                                                cam_angle_timestamped, 
                                                combinator)

Finally we filter out events which happened too far from each other. We first define a recent function.

In [None]:
recent = lambda pair: pair[1].total_seconds() < 1

Then we apply this function to the combined stream.

In [None]:
robot_angle = robot_angle_timestamped.filter(recent)

Finally we determine which effector to use and subscribe our tracker.

In [None]:
arm = robot_angle.map(lambda a: "LArm" if a[0] > 0 else "RArm")
arm_subscription = arm.subscribe(arm_tracker)