# Section VII. ROBOTIC SYSTEMS IN PRACTICE

# Chapter 26. System Integration

Almost nothing brings roboticists more quickly to heated conversation than system infrastructure. System infrastructure is the "language" by which we integrate bits of computer code into coherent robot behaviors, and hence roboticists will hold passionate debates about what middleware, communication paradigms, and logging systems one should use.  These debates resemble those held about programming languages and coding style, and everyone seems to have an opinion. (Seasoned veterans will tell you that these discussions don't really matter as much as system architecture, but the arguments continue nevertheless...)

This chapter will give an overview of the general goals and issues faced during system integration, and will present several infrastructure paradigms without being too prescriptive about which infrastructure to use.  There is a dizzying array of integration frameworks out there: ROS 1, ROS 2, LCM, OROCOS, Google Protocol Buffers, RabbitMQ, Redis, CORBA, OpenJAUS, Websockets, raw TCP/IP or UDP sockets... what do they all mean? How do we use them to build robots?  Which one is the best?  Just like choosing the "best" programming language, every integration framework has its own strengths, weaknesses, and limits to its functionality, and it is not the point of an education in system integration to pick the best.  In fact, over a career in robotics you will likely face several frameworks and may need to integrate them together.  So, it's more important to understand the concepts and design patterns used in such systems than all of the particulars of a given package.  In any case, examples in this chapter will be provided in ROS as well as simpler infrastructures.

Programming robots requires that we address the problem of *real-time* performance.  Real-time programming is a bit foreign compared to standard procedural programming, because you can't step through a program in a debugger, read through print statements (easily), or use a notebook as is so popular in machine learning nowadays.  So, in the [first section](#real-time-systems) of this chapter we'll provide explicit training and examples of how to [program such systems](#real-time-systems).  Next, we turn to the question of [how our programs *communicate*](#inter-process-communication) with other programs.  Although we would like to think of communication simply as "data D goes from program A to program B", there are several caveats and implementation details that matter when choosing an infrastructure system.  Finally, we will discuss [development tools](#system-development-tools) that make system integration more convenient and systematic.

## Real-time systems

In real-time systems, a program completes repetitive computations, and the value of a program depends on meeting real (wall-clock) time demands.  Nearly every component of the robotics software stack involves real-time programming to some extent, including motor controllers, state estimators, model predictive controllers, and even user interfaces.  The rate at which these calculations must run varies, with low-level control loops operating at hundreds of Hz, whereas mission planners may only need to respond to input every few seconds.  

Such systems can be categorized at a high-level into hard and soft real-time:
- *Hard real-time*: failing to meet a deadline has serious consequences
- *Soft real-time*: failing to meet a deadline is disadvantageous (and consequences get worse as delays grow)

For most work in robotics we deal with soft real-time systems, unless you are working on very low-level motor controllers and sensor drivers.  That is, completing a computation late is better than never providing the result at all, but being late could have negative consequences such as the robot losing responsiveness or freezing up.  So, our examples will be focused on this case.  If you are indeed working on hard real-time systems, you will want to consult more information about real-time operating systems (RTOS) and compiled languages like C or C++.

### Programming with time

Many microcontrollers like Arduinos are uniprocessors in the sense used here: nothing runs on the device except your code, which is executed over and over in a loop.  This gives you tight control over timing.

All other computers you are likely to run into are multiprocessors in the same loose sense (more precisely, multitasking systems), which allow running multiple processes at once, including the terminal, the operating system (OS) services, the OS graphical user interface (GUI), hardware drivers, etc.  Unlike uniprocessors, your code does not have complete control over its execution. Instead, the OS handles all aspects of timing the execution of your code, and your code only makes requests to sleep or delay computations and make system calls.  The manner in which the OS chooses to execute program code is known as *scheduling*.   The scheduler has a certain scheduling precision (approximately 1ms on Linux, variable on Windows), handles contention between multiple processes for CPU and other resources, and orders processes according to their priority levels.  As a result, your code may not be able to have tight control over its timing.

<a id="processes"></a>
(A bit of definition: technically, a *process* is an instantiated *program*, and the OS can run multiple processes corresponding to the same program. Complicating things further, a process can run multiple concurrent *threads*, and a process can spawn multiple *sub-processes*.  For now let's assume that we are generating a single process of a single program that runs a single thread and no sub-processes. )

<a id="execution-loop"></a>
Let's try to write a simple program that performs a repetitive task at 20Hz.  Let's encapsulate the task in a single function called `do_work`.  Now, you will need to write an *execution loop* that repeatedly calls the task.  To get this code to run, your first attempt might be something like this.

In [1]:
import time  # time module

dt = 1.0/20.0     # run at 20Hz
iteration_count = 0

def do_work():
    print(".",end='',flush=True)

def done():
    return iteration_count >= 20

start_time = time.time()

#main execution loop
while not done():
    do_work()
    time.sleep(dt)
    iteration_count += 1

end_time = time.time()  
print(" finished in",end_time-start_time,"seconds.")

.................... finished in 1.0458383560180664 seconds.



This completed 20 iterations and finished in approximately 1 second, as desired.  However, our function did very little computation (just printing a dot!), so it finished essentially instantaneously.  And even so, the loop did not finish perfectly on time!  Let's address these two issues one by one.

First, let's assume that our work takes a bit more time, say, doing some object recognition.  Let's simulate this by sorting a bunch of random numbers.

In [2]:
import random

def do_work():
    elements = [random.random() for i in range(50000)]
    elements_sorted = sorted(elements)
    print(".",end='',flush=True)

iteration_count = 0
start_time = time.time()  
while not done():
    do_work()
    time.sleep(dt)
    iteration_count += 1
end_time = time.time()  
print(" finished in",end_time-start_time,"seconds.")

.................... finished in 1.3899846076965332 seconds.


Ah... the timing is getting worse. As you may have anticipated, the problem here is that we are sleeping 1/20 s regardless of how long `do_work` takes.  The solution here is to determine how long `do_work` takes and only sleep the remainder of the 1/20 s that is left, so that the next iteration of the loop starts at approximately the right time.  

In [3]:
iteration_count = 0
start_time = time.time()  
while not done():
    t1 = time.time()
    do_work()
    t2 = time.time()
    time.sleep(max(dt-(t2-t1),0))  #determine how much time we have left
    iteration_count += 1
end_time = time.time()  
print(" finished in",end_time-start_time,"seconds.")

.................... finished in 1.0905873775482178 seconds.


Great, now we are back to where we were when `do_work` was nearly instantaneous.  But, we are still off the 1s target by 10-100ms (depending on your OS).  What is happening here?

The culprit is that the `time.sleep(secs)` call does not return exactly `secs` seconds after it is called.  This *timing jitter* then adds up over a large number of iterations.  So, if we want to run this loop more precisely, we must calculate the ideal time at which we should be finishing a loop (minus the jitter), and then sleep a duration to get as close to that time as possible.
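To see the jitter directly, the following sketch times a series of `time.sleep` calls and reports how much longer each one took than requested. The helper name `measure_sleep_overshoot` and the 10 ms request are illustrative choices, and the numbers you get will depend on your OS and system load.

```python
import time

def measure_sleep_overshoot(requested=0.01, trials=20):
    """Return how much longer each time.sleep(requested) call took than requested."""
    overshoots = []
    for _ in range(trials):
        t0 = time.perf_counter()
        time.sleep(requested)
        overshoots.append(time.perf_counter() - t0 - requested)
    return overshoots

overshoots = measure_sleep_overshoot()
print("mean overshoot: %.3f ms" % (1000 * sum(overshoots) / len(overshoots)))
print("max overshoot:  %.3f ms" % (1000 * max(overshoots)))
```

Because every call overshoots by a small positive amount, these errors do not cancel. They accumulate across iterations unless the loop corrects for them.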

In the following implementation of the execution loop, we maintain a target time `t_sleep`.  This is advanced by `dt` every iteration, and we sleep the difference between this target time and the time after `do_work()` is called. 

In [9]:
iteration_count = 0
start_time = time.time()  
t_sleep = time.time()+dt
while not done():
    do_work()
    t = time.time()
    if t <= t_sleep:
        time.sleep(t_sleep-t)
        t_sleep += dt
    elif t <= t_sleep + dt:  #no sleep, try to catch up on next iteration
        t_sleep += dt
    else: #overran target by 2dt or more.  Need to shift the desired sleep time
        print("Significant time overrun")
        t_sleep = t+dt
    iteration_count += 1
end_time = time.time()  
print(" finished in",end_time-start_time,"seconds.")

.................... finished in 1.0072121620178223 seconds.


Now, this is pretty good! We are as close to running the code at 20Hz as we can get with our `time.sleep()` call.

Note that there is some more complex logic here about time *overruns*.  When the task is just a little too slow for the execution loop (the first `elif` condition), we don't sleep at all, and instead let `do_work` run again.  We hope that this was just a fluke and we can catch up on the next iteration.  Now, when the task is significantly delayed (the `else` condition), then it is likely that it will continue to be delayed.  So, we print a bit of a warning and shift the sleep target entirely to `dt` plus the current time.

The reason why we do this is that sometimes a program will encounter a burst of slow work.  If you just shift by `k*dt` where `k` is the number of slow iterations, the amount of time elapsed will be `k*dt_slow` >> `k*dt`.  In response, unless we shift the target time entirely, the loop will run many times immediately in succession rather than recovering to the desired rate.  The below code simulates a burst of `k=5` slow bits of work, followed by a recovery to normal speeds.  You can try switching the comment in the below code to enable `t_sleep += dt`.  You will see that not only will the program think there's a large time overrun for all of the remaining fast bits of work, but it will try to complete all the rest without sleeping.

In [25]:
def do_work_bursty():
    N = 50000 if iteration_count >= 5 else 500000
    elements = [random.random() for i in range(N)]
    elements_sorted = sorted(elements)
    print(".",end='',flush=True)

iteration_count = 0
num_sleeps = 0
start_time = time.time()  
t_sleep = time.time()+dt
while not done():
    do_work_bursty()
    t = time.time()
    if t <= t_sleep:
        time.sleep(t_sleep-t)
        num_sleeps += 1
        t_sleep += dt
    elif t <= t_sleep + dt:  #no sleep, try to catch up on next iteration
        t_sleep += dt
    else: #overran target by 2dt or more.  Need to shift the desired sleep time
        print("Significant time overrun")
        t_sleep = t+dt
        #t_sleep += dt   #if we only advance the target by dt, then when we recover from the burst, a bunch of calls will be performed in immediate succession
    iteration_count += 1
end_time = time.time()  
print(" finished in",end_time-start_time,"seconds.")
print(num_sleeps,"sleeps called")

.Significant time overrun
.Significant time overrun
.Significant time overrun
.Significant time overrun
.Significant time overrun
.............Significant time overrun
.. finished in 1.811042308807373 seconds.
14 sleeps called
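Because wall-clock results vary from run to run, it can help to replay the same logic on a virtual clock. The sketch below is an idealized simulation, not a measurement: the function `simulate`, the made-up task durations, and the assumption that a sleep ends exactly on target are all illustrative.

```python
def simulate(work_durations, dt, reset_on_overrun):
    """Replay the execution loop on a virtual clock.

    work_durations: simulated time taken by each call to the task.
    Returns the number of iterations in which the loop slept.
    """
    t = 0.0            # virtual clock
    t_sleep = t + dt   # target end time of the current iteration
    num_sleeps = 0
    for duration in work_durations:
        t += duration                  # "run" the task
        if t <= t_sleep:
            t = t_sleep                # an ideal sleep ends exactly on target
            num_sleeps += 1
            t_sleep += dt
        elif t <= t_sleep + dt:
            t_sleep += dt              # small overrun: skip the sleep
        elif reset_on_overrun:
            t_sleep = t + dt           # large overrun: re-anchor the target
        else:
            t_sleep += dt              # large overrun: target falls further behind
    return num_sleeps

dt = 0.05
# 5 slow iterations (0.5 s each) followed by 15 fast ones (0.01 s each)
durations = [0.5] * 5 + [0.01] * 15

print(simulate(durations, dt, reset_on_overrun=True))    # 15
print(simulate(durations, dt, reset_on_overrun=False))   # 0
```

With re-anchoring, every fast iteration after the burst sleeps as intended. Without it, the target time is still about 2.2 s behind the clock when the burst ends, so none of the 15 fast iterations sleeps at all.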


Now, this is all very well and good, and hopefully you have learned something from working through this process.  However, it's a lot of boilerplate code to rewrite every time that you want to write an execution loop.  Luckily, most systems provide a timing primitive that will let you work with loops more straightforwardly.
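To give a sense of what such a primitive does internally, here is a minimal sketch that wraps the target-time bookkeeping in a class. `SimpleRate` is a hypothetical name for illustration only; it is not the ROS or Klampt implementation.

```python
import time

class SimpleRate:
    """Hypothetical helper that wraps the target-time bookkeeping shown above."""
    def __init__(self, hz):
        self.dt = 1.0 / hz
        self.t_next = time.time() + self.dt
        self.overruns = 0

    def sleep(self):
        """Sleep until the next target time, re-anchoring after a large overrun."""
        t = time.time()
        if t <= self.t_next:
            time.sleep(self.t_next - t)
            self.t_next += self.dt
        elif t <= self.t_next + self.dt:
            self.t_next += self.dt          # small overrun: skip the sleep
        else:
            self.overruns += 1              # large overrun: shift the target
            self.t_next = t + self.dt

rate = SimpleRate(20)
start = time.time()
for _ in range(10):
    # do_work() would go here
    rate.sleep()
elapsed = time.time() - start
print("10 iterations took %.3f seconds" % elapsed)
```

The loop body shrinks to the task plus a single `sleep()` call, which is the shape of the framework primitives.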

In ROS, you would use the `rospy.Rate()` object.  The equivalent of the above code would look something like this in ROS 1:

```python
r = rospy.Rate(20) # 20hz 
while not rospy.is_shutdown():
    do_work()
    r.sleep()
```

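whereas in ROS 2 you would use `rclpy` instead of `rospy`: create the rate using `r = node.create_rate(20)` and test `rclpy.ok()` in place of `not rospy.is_shutdown()`.  Note that an `rclpy` rate is driven by a timer on the node, so `r.sleep()` only returns if the node is being spun, typically in a separate thread.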

In Klampt, which you hopefully have installed on your system to run the notebooks in this book, we have the `TimedLooper` object.  The above code would look something like this:

In [27]:
from klampt.control import TimedLooper

iteration_count = 0
start_time = time.time()  
looper = TimedLooper(dt=dt)  #or TimedLooper(rate=20)
while looper and not done():
    do_work()
    iteration_count += 1
end_time = time.time()  
print(" finished in",end_time-start_time,"seconds.")

.................... finished in 1.0119831562042236 seconds.


Operating systems, GUI frameworks, and asynchronous programming frameworks will all have some variant of these timing primitives.  However, you will probably need to work in an event-driven programming framework, which introduces a little added complexity.

One final remark is that you may be concerned that our timing is limited by the resolution of the system scheduler.  You might ask, well, why don't I get better timing out of a loop like this?

In [39]:
def mysleep(dt):
    t_start = time.time()
    i = 0
    while time.time()-t_start < dt:
        i += 1

This is known as **busy-waiting** and is generally bad practice.  There are two reasons why it is bad. First, it uses CPU resources rather than relinquishing them to the OS.  Second, it actually doesn't work very well! The reason is that the OS can take control away from your program at any point, and then return to it at an arbitrary point in the future.  So unless you do a lot of scheduler priority hacking, busy-waiting will not give you reliably precise timing.  Try swapping `mysleep` for `time.sleep` in the test below to see for yourself.

In [42]:
for test in range(20):
    t1 = time.time()
    #time.sleep(0.001)
    mysleep(0.001)
    t2 = time.time()
    print("Slept 0.001, actual time",t2-t1)

Slept 0.001, actual time 0.0019893646240234375
Slept 0.001, actual time 0.001993894577026367
Slept 0.001, actual time 0.0010018348693847656
Slept 0.001, actual time 0.001131296157836914
Slept 0.001, actual time 0.002009153366088867
Slept 0.001, actual time 0.0012111663818359375
Slept 0.001, actual time 0.001001596450805664
Slept 0.001, actual time 0.0010042190551757812
Slept 0.001, actual time 0.0010008811950683594
Slept 0.001, actual time 0.001990795135498047
Slept 0.001, actual time 0.001013040542602539
Slept 0.001, actual time 0.001997232437133789
Slept 0.001, actual time 0.0013000965118408203
Slept 0.001, actual time 0.0010004043579101562
Slept 0.001, actual time 0.0016357898712158203
Slept 0.001, actual time 0.0019927024841308594
Slept 0.001, actual time 0.0012860298156738281
Slept 0.001, actual time 0.0013346672058105469
Slept 0.001, actual time 0.0011472702026367188
Slept 0.001, actual time 0.013626813888549805
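The first drawback, wasted CPU, can be checked with `time.process_time()`, which counts only the CPU time consumed by the current process. The sketch below compares the two ways of waiting; the helper names `busy_wait` and `cpu_seconds_used` are illustrative.

```python
import time

def busy_wait(duration):
    """Spin until `duration` seconds of wall-clock time have passed."""
    t_start = time.time()
    while time.time() - t_start < duration:
        pass

def cpu_seconds_used(wait_fn, duration):
    """Return the CPU time this process consumed while waiting."""
    c0 = time.process_time()
    wait_fn(duration)
    return time.process_time() - c0

cpu_sleep = cpu_seconds_used(time.sleep, 0.2)
cpu_busy = cpu_seconds_used(busy_wait, 0.2)
print("CPU time while sleeping:     %.4f s" % cpu_sleep)
print("CPU time while busy-waiting: %.4f s" % cpu_busy)
```

On an otherwise idle machine you should expect the sleeping version to use almost no CPU time, while the busy-waiting version uses close to the full 0.2 s.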



### Interacting with other processes 

A process usually runs concurrently with other processes and will exchange data, signals, and function calls with them.  This could include I/O to sensor and motor drivers, system calls, computer displays, and other threads and processes as part of your behavior stack.  A system integration framework will provide some standardized way of making such exchanges, and we will discuss these in more detail when we cover [inter-process communication](#inter-process-communication).  But, we still need to structure our code so that it can interleave computations and interactions.  In order to do so, we need to know which kinds of interaction *paradigms* are supported by the integration framework. We will consider three here.

#### Blocking requests

The first interaction paradigm we consider is **blocking** requests.  In this paradigm, our process makes an API function call that initiates communication with a receiving process, that process completes a function, and then the result is returned to our process.  This is conceptually very simple, because the interaction is treated just like a normal function call. As an example, if the API provides a function called `get_next_input` that handles all of the communication for you, then the execution loop would look like this:

```python
while not done():
    data = get_next_input()
    do_work(data)
    time.sleep(dt)
```

You might also be able to provide arguments to the function call, so that interacting with the other process is really just like calling a native function.  This is the basis of many [remote procedure call (RPC)](#remote-procedure-call) frameworks.  Here, the `call_remote_foo` call is a blocking call that initiates contact with the other process, calls the `foo` function with arguments `arg1` and `arg2`, and then sends the result back.

```python
while not done():
    data = call_remote_foo(arg1,arg2)
    do_work(data)
    time.sleep(dt)
```

We can also chain these together with multiple interacting processes. Suppose `foo` is implemented on remote process 1, and `bar` is implemented on remote process 2, then we can arbitrarily chain calls like so:

```python
baz = call_remote1_foo(arg1,arg2)   #handles communication with remote process 1
data = call_remote2_bar(baz,arg3)   #handles communication with remote process 2
do_work(data)
```

Simple as can be, right?  Well, there are two concerns here.  The first is that a number of complex things can go wrong during communication, such as disconnections, the remote process going down, errors in the remote function, and incorrect argument types.  Proper error handling requires your program to continue sensibly under these anomalous cases. 

Second, our process is essentially paused ("blocked") while the request is being handled.  If the request takes a long time due to a slow network or a complex remote function call, then the performance of our execution loop will falter.  Worse yet, there are cases in which the interaction framework does not know when the request will terminate, and hence it can hang for an arbitrarily long amount of time.  As an example, see the [`socket.recv`](https://docs.python.org/3/library/socket.html#socket.socket.recv) low-level network function provided in Python. Deep inside the documentation, you will see that this is a blocking call by default.  For these cases, it is helpful for the framework to provide a *timeout* parameter that lets the request wait for a certain number of seconds; if no response has arrived by then, a timeout error is returned.  For the socket case, you would use [`socket.settimeout`](https://docs.python.org/3/library/socket.html#socket.socket.settimeout).  Setting appropriate timeouts becomes especially problematic when several different requests need to be made simultaneously.

For this reason, blocking is generally a problematic paradigm for tight execution loops unless you can essentially guarantee that the receiver will respond quickly.  However, due to its convenience it can be a good choice for slower processes communicating over local networks.
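As a small illustration of a timeout on a blocking call, the sketch below uses `socket.socketpair()` to create two connected sockets inside one process. This setup is an assumption made so the example is self-contained: one end stands in for our process and the other for a remote process that never replies.

```python
import socket

# Two connected sockets in this process: `ours` plays our process, and
# `remote` plays a remote process that never sends anything.
ours, remote = socket.socketpair()
ours.settimeout(0.1)   # give up on a blocking call after 0.1 s

try:
    data = ours.recv(1024)      # blocks: nothing has been sent
    timed_out = False
except socket.timeout:
    data = None
    timed_out = True
    print("No response within 0.1 s, continuing the loop")
finally:
    ours.close()
    remote.close()
```

Without the `settimeout` call, the `recv` in this example would never return.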

#### Polling loops

The second paradigm we consider is **polling**.  In polling, our process repeatedly checks for the current value of an external variable, or whether a new value has been updated.  This is a popular paradigm for low-level hardware drivers because functions can interact with (read from, write to) hardware registers directly.

A polling check is performed by calling an API function provided by the interaction framework, and this function will return immediately.   As an example, suppose that the API gives us a function `check_for_input` that returns `None` if no input is available, and otherwise returns the input.  We would then implement a polling loop that looks like the following:

```python
while not done():
    data = check_for_input()
    if data is not None:
        do_work(data)
    time.sleep(dt)
```

Here, our loop checks for a new value every `dt` seconds, and if a value is present, then it calls a *handler* `do_work` that performs some unit of application-specific work.  You will define the handler function to implement your program logic.

The smaller we make `dt`, the more frequently we check, and the more quickly our program can respond to the arrival of an input.  If our process is entirely service-driven and simply responds to input, we will typically want to make this as small as possible.  However, if our process is normally busy doing something in its execution loop and just needs to respond to input when it arrives, then it is better to set `dt` according to the normal loop rate.  As an example, if the input affects the configuration of the normal work to be done, we would structure a polling loop as follows:

```python
settings = {'some_setting':5}  #some kind of settings used in do_work

def do_work():
    global settings
    print("Setting is",settings['some_setting'],'doing work')
    ...

looper = TimedLooper(dt)   #as before, this is more consistent than manual sleeping
while looper and not done():
    new_setting = check_for_settings_input() 
    if new_setting is not None:
        settings['some_setting'] = new_setting
    do_work()
```

Polling has a strong advantage over blocking calls in that our process is free to do work while the communication framework handles whatever it needs to do to receive the incoming message.  As an example, we can set [non-blocking mode](https://docs.python.org/3/library/socket.html#socket.socket.setblocking) on a network socket, and then `socket.recv` will raise a `BlockingIOError` immediately, rather than waiting, if no data is available.  
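A polling function in the style of `check_for_input` can be sketched on top of a non-blocking socket. In Python, a non-blocking `recv` with nothing to read raises `BlockingIOError`, which the wrapper below converts to `None`. The local socket pair is an assumption made to keep the example self-contained.

```python
import socket

def check_for_input(sock, max_bytes=1024):
    """Return received bytes, or None if nothing is waiting (never blocks)."""
    try:
        return sock.recv(max_bytes)
    except BlockingIOError:
        return None

# A connected pair of local sockets stands in for two communicating processes
receiver, sender = socket.socketpair()
receiver.setblocking(False)

first = check_for_input(receiver)    # nothing has been sent yet
sender.sendall(b"hello")
second = check_for_input(receiver)   # data is now waiting
print(first, second)
receiver.close()
sender.close()
```

A real wrapper would also need to treat an empty byte string from `recv` as a closed connection, which this sketch does not handle.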

Now if we have multiple possible interactions with multiple other processes, we would need to poll for all of these interactions in some order.  If we are interacting with two processes, plus doing background work, the polling loop would look something like this:

```python
while not done():
    data = check_for_input1()
    if data is not None:
        do_work1(data)
    data = check_for_input2()
    if data is not None:
        do_work2(data)
    do_background_work()
    time.sleep(dt)
```

This might be fine if each of our work units was lightweight, but if we have long-running tasks, or requests coming in quickly from one of the interacting processes, then we won't necessarily have an optimal distribution of work. We will always serve input 1, then input 2, then background work, in that order.  Another option would be to prioritize which tasks to attend to, using a nested handler as follows:

```python
while not done():
    data = check_for_input1()
    if data is not None:
        do_work1(data)
    else:
        data = check_for_input2()
        if data is not None:
            do_work2(data)
        else:
            do_background_work()
    time.sleep(dt)
```

Now, this could work reasonably well if one of the inputs doesn't overwhelm our loop, but let's suppose that both input 1 and input 2 are sending information more quickly than our process can handle.  What will happen is that responding to input 1 will *starve* the handler for input 2 as well as the background task.  For cases like these, a better approach is to use an event-driven paradigm, described below.
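Starvation is easy to reproduce in a toy simulation. In the sketch below, two in-memory queues stand in for the inputs (an assumption for illustration), each source produces two messages per loop iteration, and the prioritized loop handles only one item per iteration.

```python
from collections import deque

queue1, queue2 = deque(), deque()
handled = {"input1": 0, "input2": 0, "background": 0}

def check_for_input(queue):
    """Non-blocking poll: return the oldest item, or None if the queue is empty."""
    return queue.popleft() if queue else None

for step in range(100):
    # Both sources produce two messages per loop iteration, but the loop
    # can only handle one item per iteration.
    queue1.extend([step, step])
    queue2.extend([step, step])

    data = check_for_input(queue1)
    if data is not None:
        handled["input1"] += 1
    else:
        data = check_for_input(queue2)
        if data is not None:
            handled["input2"] += 1
        else:
            handled["background"] += 1

print(handled)       # {'input1': 100, 'input2': 0, 'background': 0}
print(len(queue2))   # 200
```

Input 2 and the background task are never served, and the backlog for input 2 grows without bound.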

Another typical issue with multiple polling is that some items should be polled at a different frequency than others.  The design pattern that should govern this case is that you take small steps according to the least common multiple of the desired polling frequencies (equivalently, the greatest common divisor of the polling periods), and check at each iteration whether each item is due to be polled.  The code would look something like this:

```python
dt1 = 0.25     #4Hz
dt2 = 0.1      #10Hz
dt_bg = 0.1    #10Hz
dt_step = 0.05 #dt corresponding to least common multiple of 4Hz and 10Hz
next_trigger1 = time.time()
next_trigger2 = time.time()
next_trigger_bg = time.time()
while not done():
    t = time.time()
    if t >= next_trigger1:
        data = check_for_input1()
        if data is not None: do_work1(data)
        next_trigger1 += dt1
    if t >= next_trigger2:
        data = check_for_input2()
        if data is not None: do_work2(data)
        next_trigger2 += dt2
    if t >= next_trigger_bg:
        do_background_work()
        next_trigger_bg += dt_bg
    time.sleep(dt_step)
```

Here too, an event-driven paradigm would make this logic much easier to program.
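The step size used in the multi-rate loop can be computed rather than worked out by hand. This sketch assumes integer rates in Hz, and `common_step` is a hypothetical helper name.

```python
from math import gcd

def common_step(rates_hz):
    """Step size (seconds) for integer polling rates given in Hz."""
    lcm_rate = 1
    for r in rates_hz:
        lcm_rate = lcm_rate * r // gcd(lcm_rate, r)   # running least common multiple
    return 1.0 / lcm_rate

dt_step = common_step([4, 10, 10])
print(dt_step)   # 0.05
# Number of base steps between consecutive polls of each item
print(round(0.25 / dt_step), round(0.1 / dt_step))   # 5 2
```

The 4Hz item is therefore due every 5 base steps and the 10Hz items every 2 base steps.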

#### Event loops

Finally, we will consider the **event-driven** paradigm.  Here, the framework tells you which kinds of events have occurred and the execution loop will respond to them.  This is a very popular paradigm for GUIs, high-performance networking servers, and is used in ROS as well.  In the *dispatcher* paradigm, it is your program's job to write an execution loop and then implement the appropriate response to whatever event has occurred.  As an example, in an event-driven framework the above program would be rewritten as follows:

```python
while not done():
    event_type,event_data = check_for_event()
    if event_type is None:
        do_background_work()
    elif event_type == 'input1':
        do_work1(event_data)
    elif event_type == 'input2':
        do_work2(event_data)
```

Here the `check_for_event` function returns both the event type and the event data, with an event type of `None` if no event is available.   This program now runs the background task as quickly as possible whenever no event is pending.  If the framework has a timer event, you can instead run the background task at a fixed rate by creating a timer event and blocking until the next event arrives.  The event loop now looks like the following: 

```python
begin_timer_event(dt)
while not done():
    event_type,event_data = wait_for_event()
    if event_type == 'timer':
        do_background_work()
    elif event_type == 'input1':
        do_work1(event_data)
    elif event_type == 'input2':
        do_work2(event_data)
```

For full-featured event loop frameworks, this lets you put pretty much your entire program into the event loop!
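For readers who want to run a dispatcher loop, here is a self-contained sketch in which a thread-safe `queue.Queue` stands in for the framework. The `events` queue, the `timer_source` thread, and the `'quit'` event are assumptions made for illustration and do not correspond to any particular framework's API.

```python
import queue
import threading
import time

events = queue.Queue()   # thread-safe event queue standing in for the framework

def wait_for_event(timeout=1.0):
    """Block until an event arrives; return (None, None) on timeout."""
    try:
        return events.get(timeout=timeout)
    except queue.Empty:
        return None, None

def timer_source(dt, count):
    """Background thread that posts a 'timer' event every dt seconds."""
    for _ in range(count):
        time.sleep(dt)
        events.put(("timer", None))
    events.put(("quit", None))

log = []
events.put(("input1", "hello"))   # an input that is already waiting
threading.Thread(target=timer_source, args=(0.02, 3), daemon=True).start()

while True:
    event_type, event_data = wait_for_event()
    if event_type == "quit":
        break
    elif event_type == "timer":
        log.append("background")
    elif event_type == "input1":
        log.append("work1:" + event_data)

print(log)   # ['work1:hello', 'background', 'background', 'background']
```

The loop never sleeps explicitly. It blocks inside `wait_for_event` and wakes only when there is something to do.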

<a id="callback"></a>
In the *callback* paradigm, the framework allows you to *bind* an event to perform a *callback function* that gets triggered whenever the event occurs.  We will indeed bind events to our handlers, so "handler" and "callback" are synonymous in this paradigm.  ROS and many GUI packages use callbacks to implement their event loops.  The general implementation will look something like this, where the `set_callback` and  `run_event_loop` functions are provided by the framework.

```python
begin_timer_event(dt,'timer')
set_callback('timer',do_background_work)
set_callback('input1',do_work1)
set_callback('input2',do_work2)
run_event_loop()
```

This looks quite elegant!  However, this example is too simple to represent most real-world processes. The two big downsides to having a full-featured event-loop framework are that 1) if you need to rearrange callbacks and keep track of the state of the system, this leads to very complex and hard-to-parse logic (callback spaghetti) [characteristic of JavaScript networking code](http://callbackhell.com/), and 2) tracking the execution of code requires intimate knowledge of the event-loop framework. 
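To make the second point concrete, here is a minimal sketch of what a callback-based framework might do internally. The functions `set_callback` and `run_event_loop` mirror the hypothetical names used above, while `post_event` and the internal queue are additional assumptions. Real frameworks are far more elaborate.

```python
from collections import deque

_callbacks = {}          # event type -> handler function
_pending = deque()       # events waiting to be dispatched

def set_callback(event_type, handler):
    """Bind a handler to an event type, replacing any previous binding."""
    _callbacks[event_type] = handler

def post_event(event_type, data=None):
    """Queue an event for later dispatch."""
    _pending.append((event_type, data))

def run_event_loop():
    """Dispatch pending events to their bound handlers until none remain."""
    while _pending:
        event_type, data = _pending.popleft()
        handler = _callbacks.get(event_type)
        if handler is not None:
            handler(data)

calls = []
set_callback("input1", lambda data: calls.append(("work1", data)))
set_callback("input2", lambda data: calls.append(("work2", data)))
post_event("input2", 7)
post_event("input1", 3)
post_event("unbound", 0)   # silently ignored: no handler is bound
run_event_loop()
print(calls)   # [('work2', 7), ('work1', 3)]
```

Even in this tiny version, the order in which handlers run is decided by the queue rather than by the layout of your code, and unbound events vanish without a trace.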

As an example, consider building a process that waits for another process to start it or stop it, and starting means that the external process calls `start(items)` to ask our program to do a batch of work.  The desired calculation is `[foo(item) for item in items]`, where `foo()` is a remote procedure call that can run up to 4 worker processes.  To provide the response to a third process, our program should call `return_outputs` when `stop` is called.  Should be straightforward enough, right?  But can you trace what's going on by reading this code?

```python
inputs_left = []
outputs = []
NUM_WORKERS = 4

def foo_called(args,result):
    global outputs
    (index,data) = args
    outputs[index] = result
    
def do_work(): 
    global inputs_left
    for w in range(NUM_WORKERS):
        if len(inputs_left) > 0:
            index,data = inputs_left.pop(0)
            call_remote('foo',(index,data)).callback(foo_called)   #in our hypothetical framework, call_remote generates an RPC call and triggers the callback when done

def start(items):
    global inputs_left,outputs
    inputs_left = list(enumerate(items))
    outputs = [None]*len(items)
    set_callback('handle_rpc_response',handle_rpc_response)
    set_callback('timer',do_work)

def stop():
    global inputs_left,outputs
    call_remote('return_outputs',outputs)
    inputs_left = []
    outputs = []

set_callback('start',start)
set_callback('stop',stop)
run_event_loop()
```

Now, for the real professionals out there... *Can you spot the bug?* (It's not obvious from reading the code at all, but there's a potential access error when an RPC response is returned after another program has called `stop`.  The `stop()` function clears the outputs, but if another RPC response comes in after stop was called, the callback attempts to fill in the stale output. ) 
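One possible guard against this kind of stale response, sketched under assumptions, is to tag every request with a generation counter and drop responses whose tag no longer matches. The names `generation` and `request_generation` are hypothetical, and the RPC machinery is replaced by direct function calls so that the example runs on its own.

```python
outputs = []
generation = 0   # incremented on every start/stop so old responses can be recognized

def start(items):
    global outputs, generation
    generation += 1
    outputs = [None] * len(items)
    return generation          # the caller tags each RPC with this value

def stop():
    global outputs, generation
    generation += 1            # invalidates every response still in flight
    outputs = []

def foo_called(args, result):
    index, data, request_generation = args
    if request_generation != generation:
        return False           # stale response from an earlier batch: drop it
    outputs[index] = result
    return True

gen = start(["a", "b"])
accepted_first = foo_called((0, "a", gen), "A")   # arrives in time
stop()
accepted_late = foo_called((1, "b", gen), "B")    # arrives after stop()
print(accepted_first, accepted_late, outputs)     # True False []
```

Without the generation check, the late response would index into the emptied `outputs` list and raise an `IndexError`. The guard works, but it adds yet another piece of shared state for the callbacks to keep consistent.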

Now, perhaps you would like to have proper error handling when a worker fails to complete an RPC call.  To do this, you'd need to define other callbacks that are invoked during an error.  It would look something like this:

```python
def foo_error(args,error):
    global inputs_left
    #return to the worker queue
    index,data = args
    inputs_left.append((index,data))

...

            call_remote('foo',(index,data)).callback(foo_called).error(foo_error)
```

Adding further to the mayhem, perhaps you'd like to try returning the outputs up to 5 times before quitting.  An implementation would look like the following, adding to a total of 7 functions and an extra global variable to maintain countdown state:

```python
...

error_countdown = 0
def return_outputs_error():
    global outputs,error_countdown
    error_countdown -= 1
    if error_countdown <= 0:
        print("Couldn't return the outputs")
        return_outputs_success() #just clear the response
    else:
        time.sleep(1.0)  #wait a second before trying again
        call_remote('return_outputs',outputs).error(return_outputs_error)

def return_outputs_success():
    global inputs_left,outputs
    inputs_left = []
    outputs = []

def stop():
    global inputs_left,outputs,error_countdown
    error_countdown = 5
    call_remote('return_outputs',outputs).error(return_outputs_error)
```

So much for elegant!


#### Separation of application and communication

One thing that you will have noticed in the above examples is that we are making heavy use of global variables to share data between our handlers.  If you've taken even basic computer science courses they will typically tell you that this is very bad practice.  To a large extent, they are right.  After the number of global variables exceeds a small handful, you are dealing with near-spaghetti code that is hard to debug and maintain. 

The best practice here, whether you are working with ROS, socket code, XML-RPC, LCM, or whatever, is *separation of application and communication*. Specifically, the best thing to do is to keep your shared variables, tasks, and handlers in a single class object that handles all calculations.  With such a framework, your class defines the API by which your program operates.  You will also be able to develop unit tests and switch communication frameworks more easily when the time comes.

As a concrete example, consider an application that reads images from another process and extracts a target point from each one, controlling a robot to servo to the extracted point.  The extraction process can be configured by some settings, which should also be read from an external process.  To implement this functionality, you should define a single class as follows (in a separate file from the main file):

```python
import numpy as np

class MyProgram:
    def __init__(self):
        self.settings = {'some_setting': 5}
        self.target = None
    def background_task(self):
        if self.target is not None:
            # Send motor commands to servo to the extracted point
            ... #do something...
    def set_setting(self, new_setting):
        self.settings['some_setting'] = new_setting
    def extract_target(self, img: np.ndarray):
        # Run some algorithm to extract `new_target` from `img`
        new_target = ... #do something...
        self.target = new_target
    def done(self):
        ... #do something...
```

Then, in the main file, you should do the following (assuming a polling-based paradigm):

```python
import time
from myprogram import MyProgram

dt = 0.01   # polling period in seconds (an illustrative value)
program = MyProgram()
while not program.done():
    new_settings = check_for_settings()
    if new_settings is not None:
        program.set_setting(new_settings)
    img_bytes = check_for_image()
    if img_bytes is not None:
        img_numpy = decode_image(img_bytes)
        program.extract_target(img_numpy)
    program.background_task()
    time.sleep(dt)
```

Note here that the program's methods were named *independently* of the communication channels or other processes that invoke them, and instead are named *for the function they perform*.  This is a very good practice, and is a mark of well-organized code.  Note that our methods in this case are not 1-to-1 mapped to handlers: the communication processing required to turn an incoming byte string into a numpy array representing an image is handled in the main file since the implementation of `MyProgram` shouldn't necessarily be working with the details of how the image is encoded.

You might also imagine in the future working with someone who specializes in the workings of some algorithm, while you specialize in system integration.  You would ask your partner to write a class providing familiar interfaces to their algorithm, while your job would be writing the communication "wrapper" around it.  With this kind of organization, you allow the partner to specialize in their part without needing to know the ins and outs of the integration framework, while you won't need to know all the details of the algorithm, just its top level interface!  Separation of application and communication is just the start of sensible code organization for working on teams, as we shall see later in the [system engineering chapter](SystemsEngineering.ipynb#software-organization).
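The claim that this organization makes unit testing easier can be made concrete.  The following is a minimal sketch, assuming a hypothetical `TargetTracker` class that stands in for `MyProgram`.  To keep it free of dependencies, images are represented as nested lists of brightness values and the "target" is taken to be the brightest pixel; both are illustrative choices rather than part of the example above.

```python
from typing import List, Optional, Tuple

class TargetTracker:
    """Hypothetical stand-in for MyProgram: no sockets, topics, or byte strings."""
    def __init__(self):
        self.settings = {'threshold': 5}
        self.target: Optional[Tuple[int, int]] = None

    def set_setting(self, new_threshold: int) -> None:
        self.settings['threshold'] = new_threshold

    def extract_target(self, img: List[List[int]]) -> None:
        """Sets the target to the (row, col) of the brightest pixel, or to
        None if no pixel reaches the threshold."""
        best = None
        best_value = self.settings['threshold']
        for i, row in enumerate(img):
            for j, value in enumerate(row):
                if value >= best_value:
                    best, best_value = (i, j), value
        self.target = best

def test_extract_target():
    tracker = TargetTracker()
    tracker.extract_target([[0, 1, 0],
                            [0, 9, 2],
                            [0, 0, 0]])
    assert tracker.target == (1, 1)
    # Change the setting through the same method the main loop would call
    tracker.set_setting(10)
    tracker.extract_target([[0, 1, 0],
                            [0, 9, 2]])
    assert tracker.target is None

test_extract_target()
```

Because the test never opens a communication channel, it runs instantly and keeps working if the surrounding infrastructure is later swapped for a different one.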


#### Mixed-paradigm loops and multithreading

In sufficiently complex systems you will inevitably come to a point in which you may need to write code that connects multiple components together, and yet their frameworks won't match.  In such a case, you may need to mix blocking requests, polling, and event-based execution loops within your code. 

*Polling within an event loop* is fairly straightforward.  The best practice is to set a timer, whose callback function performs the polling request.  Simply enough, your code would look something like this:

```python
begin_timer_event(dt,'timer')
set_callback('timer',do_polling)
set_callback('input1',do_work1)
set_callback('input2',do_work2)
run_event_loop()
```

*Handling events within a polling loop* can sometimes be done, but this requires that the event-driven framework provide you with the ability to drive the event loop.  Supposing it provides a function `handle_events()`, you can run this in your polling loop.

```python
while not done():
    #do polling work
    data = check_for_input1()
    if data is not None:
        do_work1(data)
    ...
    do_background_work()
    #do event-loop handling
    handle_events()
    time.sleep(dt)
```

Unfortunately, many event-driven frameworks do not provide such functionality, and instead ask you to implement your entire program within the `run_event_loop` executor function.  In this case, embedding polling into the event loop is a better option.
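For frameworks that do let the caller drive the loop, the pattern can be sketched as follows.  The `EventQueue` class and its `post`, `set_callback`, and `handle_events` methods are hypothetical names chosen to mirror the pseudocode above; real frameworks differ in naming and in how events arrive.

```python
import queue
from typing import Any, Callable, Dict

class EventQueue:
    """A toy event framework whose loop is driven by the caller."""
    def __init__(self):
        self._pending = queue.Queue()   # thread-safe FIFO of (name, data) pairs
        self._callbacks: Dict[str, Callable[[Any], None]] = {}

    def set_callback(self, name: str, fn: Callable[[Any], None]) -> None:
        self._callbacks[name] = fn

    def post(self, name: str, data: Any) -> None:
        self._pending.put((name, data))

    def handle_events(self) -> int:
        """Dispatches all currently pending events without blocking.
        Returns the number of events that were passed to a callback."""
        handled = 0
        while True:
            try:
                name, data = self._pending.get_nowait()
            except queue.Empty:
                return handled
            fn = self._callbacks.get(name)
            if fn is not None:
                fn(data)
                handled += 1

events = EventQueue()
received = []
events.set_callback('input1', received.append)

# A polling loop that also services events on each iteration
for step in range(3):
    events.post('input1', step)   # stands in for an external event source
    # ... polling work would go here ...
    events.handle_events()
print(received)   # prints [0, 1, 2]
```

The important property is that `handle_events()` returns promptly when nothing is pending, so it does not stall the polling work.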

<a id="threading"></a>
To mix blocking calls or accomplish very complex integrations between polling and event-based loops, you may need to resort to *multithreading*.  Multithreading uses the OS to interweave the computations between multiple execution loops in a single program.  In this way, you can implement separate loops, and integrate their interactions in your handlers.  Threads share memory and can access the same variables in your program, so inter-thread communication is a simple matter of changing variable values.  However, the tricky thing with multithreading is to ensure that reads and writes do not cause *race conditions*, which are unexpected behavior due to the thread scheduler interleaving code between multiple threads in interesting ways.  A full discussion of multithreading is beyond the scope of this book, but suffice it to say that this is usually handled by *locking* critical sections.  If you are using the separation of application and communication paradigm that we described above, this can be handled globally using a shared lock and wrapping handlers in the lock, as follows:

```python
import threading
import time
from myprogram import MyProgram

program = MyProgram()
lock = threading.Lock()

def polling_main(program,lock):
    while not program.done():
        data = check_for_input1()
        if data is not None:
            with lock:
                program.set_setting(data)
        data = check_for_input2()
        if data is not None:
            with lock:
                program.perform_query(data)
        with lock:
            program.background_task()
        time.sleep(dt)

def blocking_main(program,lock):
    while not program.done():
        data = request_input3()  #blocks
        #DO NOT LOCK AROUND A BLOCKING CALL! only lock handlers
        with lock:
            program.send_another_input(data)

polling_thread = threading.Thread(target=polling_main,args=(program,lock))

polling_thread.start() # Start a child thread to execute the polling loop
blocking_main(program,lock) # Run the blocking loop in the continuation of the parent thread
```

This architecture allows for the interweaving of the two loops, and can effectively overlap the latency incurred by polling for data.
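To illustrate why the lock matters, the following sketch has several threads perform a read-modify-write on a shared counter.  The `Counter` class is a hypothetical stand-in for an application object.  With the lock held around each update the final count is deterministic, whereas without it updates can be lost depending on how the scheduler interleaves the threads.

```python
import threading

class Counter:
    def __init__(self):
        self.value = 0
    def increment(self):
        v = self.value       # read
        self.value = v + 1   # write; without a lock, another thread may run in between

def worker(counter, lock, n):
    for _ in range(n):
        with lock:           # critical section: one thread at a time
            counter.increment()

counter = Counter()
lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(counter, lock, 10000))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)   # prints 40000
```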

NOTE: In many implementations of Python (CPython, PyPy), the [Global Interpreter Lock (GIL)](https://wiki.python.org/moin/GlobalInterpreterLock) prevents multiple threads from executing Python code simultaneously, which can create bottlenecks in compute-bound workloads. I/O operations take place outside the GIL, so their latency can be overlapped using Python threads. To achieve truly parallel execution across several CPU cores, a separate [process](https://docs.python.org/3/library/multiprocessing.html) (rather than thread) must be spawned. This incurs much more startup and communication overhead than using threads.

### State machines

Now, the last thing we will talk about is how to implement procedures that have multiple stages within an execution loop.  A *state machine* is the classical way of doing this.  It may seem unnatural at first, but this is an extremely common design pattern and it should be quite natural for robot system integrators to whip up state machines within their code.

Let's take a simple example of a stop light controller.  It operates on a 40 second cycle, and should stay on red for 21 seconds, green for 16 seconds, and yellow for 3 seconds.  (We are ignoring the coordination logic for the cross street light, but this gives a buffer of 2 seconds in which our light and the cross street light are both red.)  Our program has access to a call `set_light(x)` to set the light to one of the three colors.  Our first implementation might look like the following:

```python
set_light('red')
while True:
    sleep(21.0)
    set_light('green')
    sleep(16.0)
    set_light('yellow')
    sleep(3.0)
    set_light('red')
```

This should work reasonably well.  Now, let's consider that the road controlled by our light is a small road crossing a larger busy road, and so we add a magnetic loop that detects whether a car is waiting at the light.  If there is no car, we let our light wait for 61 seconds on red rather than 21.  The binary input from the loop is provided by a call `loop_detects_car()`, which returns the instantaneous detection.  An attempt at writing this logic may look like the following:

```python
set_light('red')
while True:
    if loop_detects_car():
        sleep(21.0)
    else:
        sleep(61.0)
    set_light('green')
    sleep(16.0)
    set_light('yellow')
    sleep(3.0)
    set_light('red')
```

But there's a serious problem here.  The detection call is only run at the instant that the light turns red!  So even if a car arrives a split second after the light turns red, it will wait the full minute until it can proceed.  A better approach is to write a state machine that has separate logic for each phase of this light. 

A state machine maintains a state variable, and the state-dependent logic is handled by a switch statement with multiple cases (if-else branches in Python).  In each branch of the switch statement, there is logic that performs I/O and possibly *transitions* to another state.  Our loop now looks like this:

```python
set_light('red')
state = 'red_long'     # can be 'red_long', 'red_short', 'green', or 'yellow'
dt = 0.5               # run at 2Hz
time_in_state = 0      # an auxiliary variable that will be accumulated in each state
while True:
    if state == 'red_long':
        if loop_detects_car() or time_in_state >= 40:  #car is waiting or we have 21s left
            state = 'red_short'
            time_in_state = 0
    elif state=='red_short':
        if time_in_state >= 21:
            set_light('green')
            state = 'green'
            time_in_state = 0
    elif state == 'green':
        if time_in_state >= 16:
            set_light('yellow')
            state = 'yellow'
            time_in_state = 0
    else: #state == 'yellow'
        if time_in_state >= 3:
            set_light('red')
            state = 'red_long'
            time_in_state = 0
    time_in_state += dt
    sleep(dt)
```

Note that the `time_in_state` auxiliary variable provides the very common utility of transitioning states after a given duration has elapsed.  It must be incremented on every iteration and reset on every state transition, and it is easy to forget one of these steps, or to call `continue` and short-circuit the increment and sleep.  Also, we might make a typo and set `state` to an invalid value.  So, it may be better to rely on a library of subroutines that handles this bookkeeping and helps catch bugs.  Such *state machine libraries* usually have a bit of a learning curve, but for large state machines it is often worthwhile to use them.
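Short of adopting a full library, two lightweight safeguards are to use an `Enum` for the state, so that a misspelled state name raises an error immediately, and to isolate the transition rules in a pure function that can be tested without sleeping.  The sketch below applies both to the stop light; the names `Light` and `next_state` are illustrative.

```python
from enum import Enum

class Light(Enum):
    RED_LONG = 'red_long'
    RED_SHORT = 'red_short'
    GREEN = 'green'
    YELLOW = 'yellow'

def next_state(state: Light, time_in_state: float, car_waiting: bool) -> Light:
    """Returns the state to be in on the next tick (possibly unchanged)."""
    if state is Light.RED_LONG:
        if car_waiting or time_in_state >= 40:
            return Light.RED_SHORT
    elif state is Light.RED_SHORT:
        if time_in_state >= 21:
            return Light.GREEN
    elif state is Light.GREEN:
        if time_in_state >= 16:
            return Light.YELLOW
    elif state is Light.YELLOW:
        if time_in_state >= 3:
            return Light.RED_LONG
    return state

# The rules can now be checked instantly, with no sleeping involved
assert next_state(Light.RED_LONG, 0.5, car_waiting=True) is Light.RED_SHORT
assert next_state(Light.RED_LONG, 39.5, car_waiting=False) is Light.RED_LONG
assert next_state(Light.GREEN, 16.0, car_waiting=False) is Light.YELLOW
```

The execution loop then only needs to compare the returned state against the current one, resetting `time_in_state` and calling `set_light` when they differ.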

As a small example, you can use the following code to define simple state machines.  (If you like it and want to use it in your projects, you can find this code in `rsbook_code.integration.statemachine`.)

In [27]:
import time
from typing import List,Tuple,Callable,Optional

class StateMachine:
    """A simple state machine class.  Add your states, transitions, and auxiliary data.
    Then, repeatedly call step(). """
    def __init__(self,states : List[str]):
        self.states = states
        self.state_logic = [[] for s in states]
        self.state_transitions = [[] for s in states]
        self.current_state = 0
        self.current_entry_time = None
        self.auxiliary_data = {'start_time':None,'time':None,'state':None,'duration_in_state':None}
    
    def reset(self, init_state : Optional[str] = None):
        """Sets the state machine back to an initial state."""
        if init_state is None:
            self.current_state = 0
        else:
            s = self.states.index(init_state)
            if s < 0:
                raise ValueError("Invalid state")
            self.current_state = s
        self.current_entry_time = None
        self.auxiliary_data['state']=None
        self.auxiliary_data['start_time']=None
        self.auxiliary_data['time']=None
        self.auxiliary_data['duration_in_state']=None

    def add_data(self, name : str, value) -> None:
        """Adds a new item of data to be passed to callbacks."""
        self.auxiliary_data[name] = value

    def add_transition(self,source : str, target : str, test : Callable) -> None:
        """Adds a transition from source to target.  test is a function
        f(data) where data is a dict of names (added by add_data()) mapped
        to values.  'state', 'time', 'start_time', and 'duration_in_state'
        are also available.

        The transition is taken when test(data) returns True.  test should
        not modify data.
        """
        s = self.states.index(source)
        if s < 0:
            raise ValueError("Invalid source state")
        t = self.states.index(target)
        if t < 0:
            raise ValueError("Invalid target state")
        self.state_transitions[s].append((t,test))
    
    def add_logic(self, state : str,
                  enter : Optional[Callable]=None,
                  loop : Optional[Callable]=None,
                  exit : Optional[Callable]=None) -> None:
        """Adds enter, loop, or exit callbacks to a state.  The format of a callback
        f(data) is the same as in add_transition.  However, these functions are allowed
        to modify the data dictionary. 
        
        Note: Changes to the defaults 'state', 'time', etc do not actually change
        these quantities in the state machine.

        Note: each callback should not block or take longer than your desired
        time step, so it is not appropriate for long-running tasks.
        """
        s = self.states.index(state)
        if s < 0:
            raise ValueError("Invalid state")
        self.state_logic[s].append((enter,loop,exit))
    
    def all_logic_all(self, enter : Optional[Callable]=None,
                      loop : Optional[Callable]=None,
                      exit : Optional[Callable]=None) -> None:
        """Adds a callback to every state. See add_logic()"""
        for l in self.state_logic:
            l.append((enter,loop,exit))

    def step(self, current_time = None) -> None:
        """Steps forward the state machine logic.  If current_time is given, you
        can control the internal timer.  Otherwise, it uses time.time().
        """
        if current_time is None:
            current_time = time.time()
        if self.current_entry_time is None:
            self.current_entry_time = current_time
            self.auxiliary_data['start_time'] = current_time
        self.auxiliary_data['state'] = self.states[self.current_state]
        self.auxiliary_data['time'] = current_time
        self.auxiliary_data['duration_in_state'] = current_time - self.current_entry_time
        for en,l,ex in self.state_logic[self.current_state]:
            if l is not None:
                l(self.auxiliary_data)
        for (t,test) in self.state_transitions[self.current_state]:
            if test(self.auxiliary_data):
                #transition from s to t
                for en,l,ex in self.state_logic[self.current_state]:
                    if ex is not None:
                        ex(self.auxiliary_data) 
                for en,l,ex in self.state_logic[t]:
                    if en is not None:
                        en(self.auxiliary_data) 
                self.current_entry_time = current_time
                self.current_state = t
                break
    
    def absorbing_states(self) -> List[str]:
        """Returns the list of absorbing (terminal) states."""
        return [self.states[s] for s in range(len(self.states)) if len(self.state_transitions[s])==0]

    def transitions(self) -> List[Tuple[str,str]]:
        """Gathers all transitions in a list of state pairs"""
        res = [set() for s in self.states]
        for s in range(len(self.states)):
            for t,test in self.state_transitions[s]:
                res[s].add(t)
        reslist = []
        for s,ts in enumerate(res):
            for t in ts:
                reslist.append((self.states[s],self.states[t]))
        return reslist

    def duration(self):
        """Returns how long the state machine has been running"""
        if self.auxiliary_data['start_time'] is None:
            return 0
        return self.auxiliary_data['time'] - self.auxiliary_data['start_time']

    def duration_in_state(self):
        """Returns how long the state machine has been in the current state."""
        if self.auxiliary_data['duration_in_state'] is None:
            return 0
        return self.auxiliary_data['duration_in_state']

An implementation of our friendly stop light with this library is now given below.

In [28]:
#simulating I/O, but these would usually be defined externally
def set_light(color,data):
    time_since_start = data['time'] - data['start_time']
    print("Setting light to",color,"at time",time_since_start)

def loop_detects_car(data):
    #fake a loop detection between 100 and 125 s
    time_since_start = data['time'] - data['start_time']
    if time_since_start > 100 and time_since_start <= 125:
        return True
    return False

In [32]:
#now we define our "program"
light = StateMachine(['init','red_long','red_short','green','yellow'])
light.add_logic('yellow',enter=lambda data:set_light('yellow',data))
light.add_logic('red_long',enter=lambda data:set_light('red',data))
light.add_logic('green',enter=lambda data:set_light('green',data))
light.add_transition('green','yellow', lambda data:data['duration_in_state'] >= 16)
light.add_transition('yellow','red_long', lambda data:data['duration_in_state'] >= 3)
light.add_transition('red_long','red_short', lambda data:data['duration_in_state'] >= 40 or loop_detects_car(data))
light.add_transition('red_short','green', lambda data:data['duration_in_state'] >= 21)
light.add_transition('init','red_long', lambda data:True)

print("Absorbing states:", light.absorbing_states())
print("Possible transitions:", ', '.join('->'.join(ab) for ab in light.transitions()))

Absorbing states: []
Possible transitions: init->red_long, red_long->red_short, red_short->green, green->yellow, yellow->red_long


In [30]:
#now, here's a simulation of the stop light running for 200s.  

dt = 0.5
fake_time = 0.0
light.reset()
while fake_time < 200:
    if round(fake_time,1) == 100:
        print("A car is detected")
    elif round(fake_time,1) == 125:
        print("The car is gone")
    
    light.step(fake_time)
    
    fake_time += dt

Setting light to red at time 0.0
Setting light to green at time 61.0
Setting light to yellow at time 77.0
Setting light to red at time 80.0
A car is detected
Setting light to green at time 121.5
The car is gone
Setting light to yellow at time 137.5
Setting light to red at time 140.5


In [31]:
#now, here's a simulation of the stop light running for 200s, running at 100x real time speed 
from klampt.control import TimedLooper

dt = 0.5
time_scale = 100
looper = TimedLooper(dt/time_scale)
light.reset()
while light.duration() < 200 and looper:
    light.step(time.time()*time_scale)


Setting light to red at time 0.0
Setting light to green at time 61.05499267578125
Setting light to yellow at time 77.56610107421875
Setting light to red at time 81.02090454101562
Setting light to green at time 121.06982421875
Setting light to yellow at time 137.57403564453125
Setting light to red at time 140.63671875


Now this is a pretty simple state machine "framework", and more sophisticated frameworks like [SMACH](https://wiki.ros.org/smach) in ROS 1 can do much more, such as allow hierarchical state machines, visually display the state machine graph, generate logs, etc.

Of course, every sophisticated library takes some time to get used to.  For example, from the first line of the documentation: "For all SMACH containers, the interface to contained states is defined via state outcomes. A state's potential outcomes are a property of the state instance, and must be declared before it is executed. If a SMACH plan is written by hand, all potential outcomes are declared on construction and the consistency of the state transitions can easily be checked without executing it."  Um. Ok. Not the most user-friendly documentation. Basically, this means that the possible transitions out of a state are defined by the state as "outcomes", they are returned by the state's logic, and when you define your state machine you specify how an outcome transitions to another state.  Also, SMACH allows your logic to block, whereas our above implementation requires the logic to "tick" every time the `step()` function is called.  As of writing, SMACH has not yet been ported to ROS 2.  Fortunately, for simple state machines, it's relatively easy to write your own.
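The outcome-based design described above can be illustrated without SMACH itself.  The following plain-Python sketch is not the SMACH API: each state is a function that returns an outcome string, and a separate table maps (state, outcome) pairs to the next state, so the consistency of the table can be checked before anything runs.  All names here are hypothetical.

```python
from typing import Callable, Dict, Tuple

# Each state's logic returns one of its declared outcomes
def approach(data: dict) -> str:
    data['distance'] -= 1
    return 'arrived' if data['distance'] <= 0 else 'moving'

def grasp(data: dict) -> str:
    return 'succeeded'

# state name -> (logic, declared outcomes)
states: Dict[str, Tuple[Callable[[dict], str], Tuple[str, ...]]] = {
    'APPROACH': (approach, ('moving', 'arrived')),
    'GRASP': (grasp, ('succeeded', 'failed')),
}
# (state, outcome) -> next state; 'DONE' and 'ABORTED' are terminal
transitions = {
    ('APPROACH', 'moving'): 'APPROACH',
    ('APPROACH', 'arrived'): 'GRASP',
    ('GRASP', 'succeeded'): 'DONE',
    ('GRASP', 'failed'): 'ABORTED',
}
terminal = {'DONE', 'ABORTED'}

def check_consistency() -> None:
    """Verifies every declared outcome leads somewhere valid, without running any state."""
    for name, (_, outcomes) in states.items():
        for outcome in outcomes:
            target = transitions[(name, outcome)]   # KeyError if an outcome is unhandled
            assert target in states or target in terminal, target

def run(start: str, data: dict) -> str:
    """Executes states until a terminal state is reached, and returns it."""
    current = start
    while current not in terminal:
        logic, _ = states[current]
        current = transitions[(current, logic(data))]
    return current

check_consistency()
print(run('APPROACH', {'distance': 3}))   # prints DONE
```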

### Launching concurrent processes

TODO

Multiple terminal windows: Terminator

GUI programs: Windowed OS 

ROS launch files

Process managers: [Procman/Sheriff](https://github.com/ashuang/procman)


## Inter-process communication

In the prior section, we assumed there were some "calls" available that your process could use to communicate with another process.  The umbrella term for this process is *inter-process communication* (IPC), and there is a massive diversity of protocols and paradigms that implement IPC in robotics. A good communication package like ROS will abstract a lot of the details of IPC away so you can focus on writing programs. However, you will find that you won't be able to escape bugs, issues, and performance drops without a deeper understanding of an IPC stack.  Also, when you are rushing to make a deadline, you don't have the luxury of waiting for someone else to provide ROS bindings to whatever device or program that you want to use.  You may also be the programmer responsible for providing those bindings.  If any of these cases sound familiar, read on!  This section gives a rapid tour of the basic topics that you'd study in a computer science networking class, to give you enough understanding to master IPC.

We'll focus on the programming side of communication, because this is what concerns most system integrators, and we assume that you'll purchase whatever networking equipment or cables will be necessary to connect the hardware pieces of your system. Now, there are several physical mechanisms over which a process on your computer can communicate with processes running on the same computer, running on devices attached to your computer, or running on devices over the Internet.  The most common mechanisms are: 

- *APIs*: a program imports another program as a library and directly calls its functions.
- *Shared memory*: two nodes on the same machine that have access to the same RAM.
- *Pipes*: two nodes on the same machine communicate as though they are reading from stdin / writing to stdout.
- *Serial port*: a device directly connected to another by a USB cable, RS-XXX, or similar interface.  The port can then be connected to all sorts of devices, such as Arduinos, a CAN bus, a radio transmitter, or almost anything else.
- *System bus*: you're likely to encounter this in CPU - GPU program communication, but also used in supercomputers.
- *Ethernet network*: used to connect machines on a local network.
- *Internet network*: used to connect any two machines across the world. 
- *Loopback network*: two nodes on the same machine communicating over the same socket interface used in Ethernet and Internet networks.

(Note that we speak of *nodes*, which are similar to processes, but are more general to encompass any communication endpoint.)

The mechanism can be abstracted away by a communication *interface*. Most notably, the standard (POSIX) socket interface can be used to communicate between two nodes on the same machine, two nodes connected over Ethernet, or two extremely distant nodes over the Internet. For this reason, sockets are very popular. 

The key question in system integration is this: how do I send an object in my application's native language, such as a Numpy RGB-D image, to another node, such as a 3D mapping algorithm, and receive something that I can use, such as an Open3D mesh representing the map?  How does my program get connected with that node, and what impact do these choices have?  Also, each communication operation triggers a cascade of real-time processing at different layers from the network interface to the OS down to the network hardware, and lots can go wrong.  If you want to confidently choose IPC strategies, program with IPC, and debug problems that may occur, you'll need to understand how these layers operate.

### OSI communication model

The Open Systems Interconnection (OSI) model describes every major interprocess communication system.  It consists of the following 7 system layers, listed from lowest-level to highest-level:

1. Physical layer
2. Data link layer
3. Network layer
4. Transport layer
5. Session layer
6. Presentation layer
7. Application layer

The first three layers refer to how bytes are transmitted as electrical signals over the network, and we will not cover them much here.  But we will touch on layers 4-7, since these will affect our application. A system architect will need to make several choices at these levels to configure their communication system. 

Layer 4 lies on top of the network interface and implements the *transport protocol*, which defines how connections are established and data is sent across the network.  Usually, a large amount of data is split into smaller *packets* transmitted one at a time.  Protocols handle errors differently: in UDP, lost packets are simply dropped, whereas TCP detects lost packets and retransmits them. At layer 5, you may need to know a bit about establishing sessions (such as clients, servers, IP addresses, and ports in socket programming) but some mechanisms make this easy for you (such as a properly configured ROS system). In layer 6, a communication system may perform *serialization*, which turns objects in your programming language of choice into raw bytes, and back again on the other side.  Layers 6-7 also implement the communication *paradigms*, such as publish-subscribe or remote procedure call, by which your application invokes code that calls the communication system. 
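As a small illustration of the serialization step in layer 6, the sketch below converts a Python object to raw bytes and back in two common ways: a self-describing text encoding (JSON) and a compact fixed-layout binary encoding (`struct`).  The message contents and the field layout are made up for the example.

```python
import json
import struct

msg = {'joint': 2, 'position': 1.25, 'velocity': -0.5}

# Text-based: flexible and human-readable, but larger on the wire
json_bytes = json.dumps(msg).encode('utf-8')
assert json.loads(json_bytes.decode('utf-8')) == msg

# Binary: compact, but both sides must agree on the layout in advance.
# '<' = little-endian without padding, 'i' = 32-bit int, 'd' = 64-bit float
LAYOUT = '<idd'
packed = struct.pack(LAYOUT, msg['joint'], msg['position'], msg['velocity'])
joint, position, velocity = struct.unpack(LAYOUT, packed)
assert (joint, position, velocity) == (2, 1.25, -0.5)

print(len(json_bytes), "bytes as JSON,", len(packed), "bytes packed")
```

The trade-off shown here, readability and flexibility versus size and speed, recurs in most serialization choices.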

We will elaborate on these layers in the following sections, starting at the top and moving down the stack.

### Application-level paradigms

In communicating with another node, we program our application to construct and use a communication *channel*.  Your program and the other node become the *endpoints* of the channel, and we typically speak of the other node as *the endpoint*.  Your application needs to be constructed to perform 4 tasks.
1. Open the channel.
2. Perform communication, which could involve reading data, writing data, or both (bidirectional).
3. Handle errors that might occur during communication.
4. Close the channel.

The lifetime of the communication channel is typically a single *session* involving one open and one close operation in steps 1 and 4, but more sophisticated interfaces offer automatic "reconnect" or "keepalive" functionality that attempts to re-open the session if it is closed due to a bad connection or the endpoint shuts down. 
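The four tasks can be sketched with Python's standard `socket` module.  To keep the example self-contained, it uses `socket.socketpair()` to create two already-connected endpoints in one process; in a real system, step 1 would instead connect to the address of a remote endpoint.

```python
import socket

# 1. Open the channel (here: a connected pair of local endpoints)
ours, theirs = socket.socketpair()
reply = None
try:
    # 2. Perform communication
    ours.sendall(b'ping')
    request = theirs.recv(1024)       # the other endpoint reads...
    theirs.sendall(request.upper())   # ...and responds
    reply = ours.recv(1024)
except OSError as e:
    # 3. Handle errors (broken connection, timeout, etc.)
    print("Communication failed:", e)
finally:
    # 4. Close the channel, whether or not an error occurred
    ours.close()
    theirs.close()
print(reply)
```

Placing the close operations in a `finally` clause ensures that step 4 happens even when step 3 is triggered.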

Most paradigms assign one of the endpoints to the role of the *server* and the other endpoint is the *client*.  The client needs to know the *address* of the server, whereas the converse may or may not hold. To open the connection, there are a variety of mechanism-specific ways that the client can specify the address of the server, such as a device file for serial ports like `/dev/ttyUSB0` on Linux, or an IP address for a socket like `192.168.0.10`, a world-wide-web URL like `www.google.com`, or a ROS topic like `/move_base/global_costmap/costmap`.  Servers, on the other hand, "listen" for client connections and then start communication channels dynamically. 

For the main communication functionality, there are generally five different communication paradigms in use in modern systems: 
1. *Stream*: one endpoint writes data as an unlimited series of items, either raw bytes or structured objects, and the other endpoint reads those items, usually in the order in which they were written.  Examples: POSIX sockets, Websockets, serial protocols like USB, pipes.
2. *Publish-subscribe* (pub-sub): one endpoint, the publisher, publishes an object and the other endpoint, the subscriber, is notified via a callback when the object arrives. Communication is one-way.  Examples: ROS, LCM.
3. *Request-reply* (req-rep): one endpoint (usually the client) writes a request to the channel, and the other (usually the server) replies to the request with a certain response.
4. *Get-set*: one endpoint (usually the server) stores a collection of data, and the other (usually the client) reads or writes data from a specific index (such as a memory address, database key, or record ID) into the collection.  A form of request-reply. Examples: databases, shared memory, I2C registers.
5. *Remote procedure call* (RPC): one endpoint (usually the client) performs a remote function invocation on the other endpoint (usually the server), and the other endpoint (often) returns the result of the function to the caller.  A form of request-reply. Examples: ROS services, gRPC, XML-RPC, SOAP, Apache Thrift.

Each paradigm has certain strengths and weaknesses that make it more or less suited for different tasks.  Streams and pub-sub do not anticipate a response or action on the endpoint, so these are good choices for sending or receiving large amounts of data with low latency.  Streams are often more convenient if the endpoint occasionally needs to respond to the sender, e.g., to acknowledge delivery, report error conditions, or control what data is being sent.  They are also closer to how the network stack actually transmits data, with the other paradigms typically implemented on top of streams.  Get-set is good for storing collections of data on a server in which one or more clients access portions of the data.  However, it does require a round-trip to retrieve values, which can limit the data transfer rate.  Finally, RPC is a good model for interacting with another node as though it implements a class with many methods.  When everything goes well, it offers a very familiar object-oriented design pattern where other nodes can be thought of as "remote objects" that support method invocation.

Let's look at some pseudocode as well as real example code.

#### Stream paradigm

In the stream paradigm, the communication channel is typically bidirectional and can support reads and writes.  At the lowest level, you will be reading and writing *bytes* rather than objects, and so [serialization](#serialization) and [framing](#framing) will be up to the protocol.  If you wish to stream objects, read ahead to those sections, but for now let us assume that we know that we simply want to communicate raw bytes.

A writer's code will be structured as follows:

```python
stream = stream_open(endpoint_address)
while not done():
    data = generate_data()  #something application-specific
    print("Writing",len(data),"bytes")
    stream_write(stream,data)
stream_close(stream)        #optional, but good practice
```

A reader's code will be structured as follows:

```python
stream = stream_open(endpoint_address)
N = 1000  #some buffer size
while not done():
    data = stream_read(stream,N)   #note: take care to understand whether this is blocking or nonblocking
    if len(data) > 0:
        print("Read",len(data),"bytes")
        ingest_data(data)   #something application-specific
stream_close(stream)        #optional, but good practice
```

You can also mix reading and writing on the same stream, if it supports it:
```python
stream = stream_open(endpoint_address)
N = 1000  #some buffer size
while not done():
    data = stream_read(stream,N)   #note: take care to understand whether this is blocking or nonblocking
    if len(data) > 0:
        print("Read",len(data),"bytes")
        result = ingest_data(data)       #something application-specific
        if result is not None and len(result) > 0:
            stream_write(stream,result)  #write response
stream_close(stream)        #optional, but good practice
```

Note that the `stream_read` API may be nonblocking and return nothing if no data is available (e.g., Arduino serial port), or it can block until something arrives (default settings for POSIX sockets).  Sockets can also be configured to be nonblocking or to have a timeout, and this is very important to configure properly.
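To make these modes concrete, here is a minimal sketch using Python's standard `socket` module.  It uses `socket.socketpair()` to create two connected stream sockets inside one process, which is only a convenient stand-in for a real client/server connection; the 0.2s and 1.0s timeouts are arbitrary values chosen for illustration.

```python
import socket

# A connected pair of stream sockets, standing in for a client/server connection.
a, b = socket.socketpair()

# Blocking with a timeout: recv waits at most 0.2s, then raises socket.timeout.
b.settimeout(0.2)
try:
    b.recv(1000)
    timed_out = False
except socket.timeout:
    timed_out = True

# Nonblocking: recv returns immediately, raising BlockingIOError if nothing is queued.
b.setblocking(False)
try:
    b.recv(1000)
    would_block = False
except BlockingIOError:
    would_block = True

# Once data has been written, a read succeeds.
a.sendall(b"hello")
b.settimeout(1.0)
data = b.recv(1000)

a.close()
b.close()
print(timed_out, would_block, data)
```

In both the timeout and nonblocking cases the "no data yet" condition is reported as an exception, so a real-time loop needs to catch it and carry on with its other work rather than treating it as a failure.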

As far as setup goes, this can be very simple in the case of a serial port (e.g., Arduino), or more complex in the case of sockets.  For a device connected by a serial port, the CPU code needs to identify the right port (e.g., `/dev/ttyUSB0`) and configure the baud rate, while the device just needs to configure a compatible baud rate.

For POSIX sockets, a bit more work needs to be done to set up an appropriate *port* for a server to listen on.  The client will identify the server's IP address and the port to communicate with.  Note that in the POSIX socket convention, reads and writes are called `recv` and `send`.

The boilerplate server code for TCP/IP sockets looks like the following `run_server` function:

In [2]:
import socket
import threading
import time
from typing import Callable

# Let's implement a simple application that just handles messages by printing them
TIME_TO_RUN = 30.0

class MyServerLogic:
    def __init__(self):
        self.t_start = None
    def on_message(self, addr, msg):
        print("Server: Received chunk",msg,"from",addr,flush=True)
    def done(self):   # run for 30s
        if self.t_start is None:
            self.t_start = time.time()
        return time.time() - self.t_start > TIME_TO_RUN
    def reset(self):
        self.t_start = None

# The rest is communication code

def run_server(port : int, client_handler : Callable, done : Callable):
    """Boilerplate code.  client_handler is a function f(socket) that handles 
    all communication with the client.  done() returns True if the server should
    stop. 
    
    (for a clean shutdown, the client handler should also return if done() is True)"""
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.bind(('localhost', port))                       #accepts connections only from localhost
    #serversocket.bind((socket.gethostname(), port))    #use gethostname if you want to expose your server to the outside world
    # become a server socket
    serversocket.listen(1)                              #queue at most 1 pending client connection
    serversocket.settimeout(1.0)                        #lets accept() wake up periodically so that done() gets checked
    while not done():
        # accept connections from outside -- this call blocks for up to 1s
        try:
            (clientsocket, address) = serversocket.accept()
        except socket.timeout:
            continue
        print("Server: Accepted client from address",address,flush=True)
        # now start a thread to handle communication with the clientsocket
        ct = threading.Thread(target=client_handler,args=(clientsocket,))
        ct.daemon = True
        ct.start()                                      #start(), not run(): run() would execute the handler in this thread
    serversocket.close()

PORT = 5678
server_logic = MyServerLogic()
server_comm_error = False
    
def handle_server_communication(clientsocket : socket.socket):
    global server_comm_error
    addr = clientsocket.getpeername()
    while not server_comm_error and not server_logic.done():
        N = 1000
        chunk = clientsocket.recv(N)  #this will need to be modified to properly handle framing
        if len(chunk)==0:
            print("Server: Client socket broken",flush=True)
            server_comm_error = True
            break
        else:
            server_logic.on_message(addr, chunk.decode('utf8'))
    clientsocket.close()

def server_reset():
    global server_comm_error
    server_logic.reset()
    server_comm_error = False

def server_start():
    """Handles all of the server running."""
    server_reset()
    run_server(PORT,handle_server_communication, lambda : server_logic.done() or server_comm_error)


And the boilerplate code for running a client looks like the following:

In [1]:
import socket
import time
from typing import Callable

# Let's implement a simple application that samples strings
import random
TIME_TO_RUN = 30.0
ANIMAL_STRINGS = ['dog','cat','alligator','horse']

class MyClientLogic:
    def __init__(self):
        self.t_start = None
    def rate(self):
        return 1.0   #runs at 1hz
    def update(self) -> str:
        if self.t_start is None:
            self.t_start = time.time()
        return random.choice(ANIMAL_STRINGS)
    def done(self):   # run for 30s
        if self.t_start is None:
            self.t_start = time.time()
        return time.time() - self.t_start > TIME_TO_RUN
    def reset(self):
        self.t_start = None


# The rest is communication code

def run_client(port : int, comm_handler : Callable):
    """Boilerplate code.  comm_handler is a function f(socket) that handles 
    the communication loop. 
    """
    print("Client: Connecting to port",port,flush=True)
    clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    clientsocket.connect(('localhost',port))
    comm_handler(clientsocket)
    clientsocket.close()

PORT = 5678
client_logic = MyClientLogic()

def handle_client_communication(clientsocket : socket.socket):
    client_logic.reset()
    client_comm_error = False
    dt = 1.0/client_logic.rate()
    while not client_logic.done() and not client_comm_error:
        t0 = time.time()
        msg = client_logic.update()
        if msg is not None:
            print("Client: Sending",msg,flush=True)
            try:
                clientsocket.send(msg.encode('utf8'))
            except Exception as e:
                print("Client: socket broken",flush=True)
                client_comm_error = True 
        t1 = time.time()
        time.sleep(max(dt - (t1-t0),0))

def client_start():
    """Handles all of the client running."""
    run_client(PORT,handle_client_communication)

You can copy the above cells into separate files, and call `server_start()` and `client_start()` at the bottom.  This is already done for you in `rsbook_code/integration/tcp_server_example.py` and `rsbook_code/integration/tcp_client_example.py`.  

If you run the server example in one terminal window and the client example in another terminal window, you will get server output like the following:

```
Server: Accepted client from address ('127.0.0.1', 63871)
Server: Received chunk horse
Server: Received chunk alligator
Server: Received chunk horse
Server: Received chunk cat
...
```
and the server exits after 30 seconds.  You will get client output like the following:

```
Client: Connecting to port 5678
Client: Sending horse
Client: Sending alligator
Client: Sending horse
Client: Sending cat
...
Client: socket broken
```

Observe that the termination of the client occurs when the server closes its socket (normally) and then the client gets an exception when it tries to write to it.

Although in this example, every message sent from the client was received exactly as a single "chunk" on the server, this will not always be the case.  Depending on the timing of messages and the network implementation, the network read queue may group together the data from multiple send calls, so that the server may receive multiple messages together (e.g., `horsealligator`) or partial messages (e.g., `allig`).   This is a common occurrence in serial port protocols, but this is unlikely to happen with small messages sent slowly across a TCP/IP or UDP network, because each message will typically be transmitted in a single *packet* along the network.  However, it will occur with large messages sent rapidly.  To solve this problem and guarantee that the reader processes full messages, a protocol will need to use some [framing](#framing) strategy.  Low-level message queuing libraries, like ZeroMQ, will often provide you with framing functionality while also providing socket-like granularity.

This example should also give you a sense that using raw socket streams to connect to multiple nodes is a rather complex procedure.  Here, the `accept` call and each `recv` call were blocking, and the server has to create a new thread to allow for that blocking.  If the server also had to send messages to other nodes, it would need to create a thread to process those messages, or take care to configure the server and client sockets to work in non-blocking mode.  Python's [`socketserver`](https://docs.python.org/3/library/socketserver.html) library provides some default implementations of such servers but, in practice, most roboticists use pre-existing communication libraries when available, and resort to low-level socket code only when absolutely necessary.  
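To make the message-boundary issue described above concrete, the sketch below reassembles messages from chunks that do not line up with message boundaries.  It assumes a newline delimiter is appended to every message, which is just one simple convention chosen for illustration (the [framing](#framing) section covers the options properly), and `split_frames` is a hypothetical helper, not part of any library.  The chunks are hard-coded to mimic what successive `recv` calls might return.

```python
def split_frames(buffer: bytes, delimiter: bytes = b"\n"):
    """Splits all complete, delimiter-terminated messages off of buffer.
    Returns (messages, remainder), where remainder is the incomplete tail."""
    *messages, remainder = buffer.split(delimiter)
    return messages, remainder

# What three successive reads might return: merged and partial messages
chunks = [b"horse\nalli", b"gator\nca", b"t\n"]

buffer = b""
received = []
for chunk in chunks:
    buffer += chunk                           # accumulate everything read so far
    messages, buffer = split_frames(buffer)   # keep the incomplete tail for later
    received.extend(m.decode("utf8") for m in messages)

print(received)   # ['horse', 'alligator', 'cat']
```

The key design point is that the reader keeps a persistent buffer between reads, so a partial message such as `alli` is held back until the rest of it arrives.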

#### Publish-subscribe paradigm

Pub-sub is quite a common paradigm in robotics due to its popularization through ROS and other packages like LCM.  The general idea is for the publisher endpoint to write to a channel and the subscriber endpoint will receive notifications whenever a message is written.  The pub-sub paradigm is message-based, which means that the message objects sent by the publisher are reconstructed in their entirety by the subscriber.  Also, a single publisher can have multiple subscribers, so that each message from the publisher gets sent to all subscribers.

Pub-sub systems usually have a notion of *topics* (sometimes called *keys*) that indicate a system-wide address for exchanging messages of a certain type.  This provides a convenient way to name channels in a *semantically meaningful* way, rather than making references to IP addresses.  This *semantic naming* approach lets us provide names to channels that describe *what information they contain* rather than a meaningless address.  This also makes it easier to test different node implementations -- as long as a new program uses the same topics as the old one, it can be used as a drop-in replacement! 

**Pub-sub pseudocode.** Pseudocode for a publisher would look like the following, where `publisher(topic)` is the function provided by the system to generate a publishing channel, `publish(channel,msg)` publishes a message on the given topic, and `generate_foo` is some application-specific logic.  

```python
foo_pub = publisher('/some/node/foo')
while not done():
    msg = generate_foo()
    publish(foo_pub,msg)
    pubsub_system_update()
    sleep(dt)
```

Behind the scenes, the communication system would need to perform some work each cycle, which we have assumed to be given by the function `pubsub_system_update`.  

The corresponding subscriber would look like the following:

```python
def handle_foo(msg):
    ... # do something when a message arrives

subscribe('/some/node/foo',handle_foo)
while not done():
    pubsub_system_update()
    sleep(dt)
```

Here `subscribe(topic,callback)` would register `callback(msg)` as the callback function to be called whenever a message arrives on the topic.    Note that the execution loop is almost empty, and any application logic that we would need to perform would need to be placed in the `handle_foo` callback.  What does this remind you of?  That's right, an event-driven execution loop.

Note that if you prefer a polling-style loop, it's a fairly simple matter to just store the last message, like so:

```python

def handle_foo(msg):   #the real message handler
    ... # do something when a message arrives

last_foo_msg = None
new_foo_msg = False
def handle_foo_cb(msg):   # a wrapper callback passed to the subscriber
    global last_foo_msg,new_foo_msg
    last_foo_msg = msg
    new_foo_msg = True

subscribe('/some/topic',handle_foo_cb)
while not done():    #we now perform polling in the execution loop
    if new_foo_msg:  #polling!  Here, this triggers `handle_foo` only if a new message has arrived
        handle_foo(last_foo_msg)  #handle it for real
        new_foo_msg = False
    #if last_foo_msg is not None:   #this is an alternative that triggers `handle_foo` every cycle
    #   handle_foo(last_foo_msg)
    pubsub_system_update()
    sleep(dt)
```

See that the actual callback passed to the subscriber only stores the last message, while the execution of the handler happens in the main loop.  ROS programs abound with code like this!

**ROS 1 example.** Now, let's see what a basic publisher and subscriber would look like in ROS 1 Python code.  We will go through a few characteristics of the ROS system.  The first is that we will need to import the ROS Python bindings `rospy` and its message types.

In [None]:
import random
import time
import rospy
from std_msgs.msg import String   # the ROS message type corresponding to strings

# Same logic as the client before

TIME_TO_RUN = 30.0
ANIMAL_STRINGS = ['dog','cat','alligator','horse']

class MyPubLogic:
    def __init__(self):
        self.t_start = None
    def rate(self):
        return 1.0   #runs at 1hz
    def update(self) -> str:
        if self.t_start is None:
            self.t_start = time.time()
        return random.choice(ANIMAL_STRINGS)
    def done(self):   # run for 30s
        if self.t_start is None:
            self.t_start = time.time()
        return time.time() - self.t_start > TIME_TO_RUN
    def reset(self):
        self.t_start = None

# Communication code follows

pub_logic = MyPubLogic()
rospy.init_node('animal_publisher')
rate = rospy.Rate(pub_logic.rate())
my_topic_pub = rospy.Publisher('/animal_generator', String)
while not rospy.is_shutdown():
    msg = String(pub_logic.update())   #Note: String is a ROS message type and you may pass a str to the constructor
    my_topic_pub.publish(msg)
    rate.sleep()
    if pub_logic.done():
        rospy.signal_shutdown("Program decided to stop")


...and now the subscriber...

In [None]:
import time
import rospy
from std_msgs.msg import String   # the ROS message type corresponding to strings

# Same logic as the server before
TIME_TO_RUN = 30.0

class MySubLogic:
    def __init__(self):
        self.t_start = None
    def on_message(self, addr, msg):
        print("Subscriber: Received message",msg,"from",addr,flush=True)
    def done(self):   # run for 30s
        if self.t_start is None:
            self.t_start = time.time()
        return time.time() - self.t_start > TIME_TO_RUN
    def reset(self):
        self.t_start = None

# Communication code follows

sub_logic = MySubLogic()
rospy.init_node('animal_subscriber')
rate = rospy.Rate(10.0)   #how often the loop below checks done(); messages themselves are handled in the callback

def process_message(msg : String):
    #Note: String is a ROS message type and contains a single attribute `data` of str type 
    sub_logic.on_message('/animal_generator',msg.data)

rospy.Subscriber('/animal_generator', String, process_message, queue_size=10)
while not rospy.is_shutdown():
    rate.sleep()
    if sub_logic.done():
        rospy.signal_shutdown("Program decided to stop")

If you have ROS on your system, you can run this code in three terminal windows.  In each window, you will need to `source` the `/opt/ros/[ROS_VERSION]/setup.bash` according to your installation, if you haven't already set this up in your `.bashrc` file.  Then, run `roscore` in one of the terminal windows, and run `rsbook_code/integration/ros_pub_example.py` in a second window, and run `rsbook_code/integration/ros_sub_examples.py` in the third window. 

Now let's see a few things that differ between this and our pseudocode.  First, we had to run some code to set up the ROS node.  No big deal.  Second, we run the loop using a `rospy.Rate` object.  This isn't strictly necessary but it's best practice to do so (in advanced usage, you might be stepping through a simulation, and ROS time might not be the same as system time).  Third, we didn't have any calls to `pubsub_system_update`.  What gives?  Well, in `rospy.init_node` ROS actually spins up a thread to perform all of its internal updating.  This is why we need to explicitly call `rospy.signal_shutdown` to stop the node ourselves -- otherwise you would need to manually press Ctrl+C to kill the node when you are done.  Fourth, we needed to specify several arguments when creating the publishing and subscribing channels. 

To construct a `Publisher`, you specify the topic and the message type (in this case String).  In the [documentation](http://docs.ros.org/en/melodic/api/rospy/html/rospy.topics.Publisher-class.html) there are some other arguments available, but the most useful one is `latch`.  When `latch=True`, each subscriber will receive the last sent message when it first connects. This is especially useful for topics that report some aspect of system state and are infrequently published.  Note that the `Publisher` class also lets you query whether a subscriber is connected using `Publisher.get_num_connections()`, which lets you perform more advanced functionality like avoiding eating up CPU when nothing has subscribed, or error handling when subscribers disconnect.

To construct a `Subscriber`, you specify the topic, message type (which must also match the publisher's message type, in this case String), a callback function, and some [optional arguments](http://docs.ros.org/en/melodic/api/rospy/html/rospy.topics.Subscriber-class.html).  We showed one argument here, `queue_size`, which is arguably the most important to understand.  This is usually set to 1 if you want to always process the most up-to-date message from the publisher, and something relatively large if you want to process all published messages, as close to in order as possible.  Note that the docs indicate that you can set this to `None` which indicates an infinite queue size. I never recommend infinite queues, and we will discuss the nuances of [message queuing below](#message-queues).
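The effect of the queue size can be mimicked with a bounded `collections.deque`.  The sketch below is an analogy rather than rospy's actual implementation: it assumes that a full queue discards its oldest entry when a new message arrives, and `run_subscriber` is a hypothetical function that pretends a burst of messages arrives while the callback is busy.

```python
from collections import deque

def run_subscriber(queue_size, n_published=10, n_processed=3):
    """Queues a burst of n_published messages, then processes up to n_processed."""
    queue = deque(maxlen=queue_size)
    for i in range(n_published):     # burst arrives while the callback is busy
        queue.append(i)              # a full deque silently drops its oldest item
    return [queue.popleft() for _ in range(min(n_processed, len(queue)))]

print(run_subscriber(1))      # [9]: only the most recent message survives
print(run_subscriber(100))    # [0, 1, 2]: nothing dropped, but processing lags behind
```

With a queue size of 1 the subscriber always works on fresh data at the cost of skipping messages; with a large queue it sees everything, but each message it handles may already be stale.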

**Bidirectional communication**. Now, if you are used to sockets which let you send or receive bidirectionally, you might ask the question, what if I need to communicate from the subscriber back to the publisher?  For example, I might like to tell the publisher that a message was successfully received, to change the resolution of images that it's sending, or to slow down its rate of sending because my processing can't keep up.  Well, the only way to do that is to establish a separate channel in which the publisher and subscriber roles are flipped.  In other words, given node A that is publishing to node B on topic X, we would like for B to provide some kind of responses to A.  So, we create a new topic Y in which B is the publisher and A is the subscriber.  Ideally, Y will be named appropriately so it becomes easy to associate X to Y (such as Y=X_response).

One potential pitfall here is synchronization.  If A is publishing messages quickly or B is processing messages slowly, we cannot guarantee that A will receive a response immediately after a message is sent.  For example, suppose A publishes messages at 20Hz and B acknowledges whether the message is received correctly.  Node A should resend a message if it was not received correctly.  Suppose A just published message #103, and B takes 0.15s to perform some processing before it sends an error response.  In this case, A will have already published message #104 before it receives the response from B.  So, if A implements naïve logic for determining which message should be resent, it will resend message #104!  To remedy this problem, you would need to either keep track of which messages have been sent and are awaiting responses (a sent message queue), or to include an ID for each published message which should be returned in B's response.  These schemes can become fairly complex, so message-level synchronization is not well-suited for publish-subscribe.  It is better to use request-reply or RPC paradigms for this use case.
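If you do need acknowledgments over pub-sub, the ID-based bookkeeping mentioned above might be sketched as follows.  The `PendingTracker` class and the dictionary message layout (`id`, `payload`, `ok`) are hypothetical choices made for this example; the actual publishing on X and subscribing on Y are left out so that only the bookkeeping is shown.

```python
class PendingTracker:
    """Bookkeeping on node A for messages that still await a response from B."""
    def __init__(self):
        self.next_id = 0
        self.pending = {}          # message ID -> payload awaiting acknowledgment

    def on_publish(self, payload):
        """Wraps a payload with a fresh ID; the returned dict is what A publishes on X."""
        msg_id = self.next_id
        self.next_id += 1
        self.pending[msg_id] = payload
        return {'id': msg_id, 'payload': payload}

    def on_response(self, response):
        """Handles B's response {'id': ..., 'ok': ...} received on Y.
        Returns a new message to publish if a resend is needed, else None."""
        payload = self.pending.pop(response['id'], None)
        if payload is None or response['ok']:
            return None                   # unknown ID, or delivered correctly
        return self.on_publish(payload)   # resend the right payload under a new ID

tracker = PendingTracker()
first = tracker.on_publish('frame A')     # gets ID 0
second = tracker.on_publish('frame B')    # gets ID 1, published before B responds
resend = tracker.on_response({'id': 0, 'ok': False})   # late error report for ID 0
print(resend)   # {'id': 2, 'payload': 'frame A'}
```

Because the response carries the ID of the message it refers to, A resends `frame A` even though `frame B` was the most recently published message.  A production version would also need to expire entries whose responses never arrive.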

**Brokers and roscore**.  Pub-sub as typically implemented doesn't ask the application developer to establish server and client roles.  Instead, communication happens in a peer-to-peer manner, where the server is a separate *broker* node and the clients are the publisher and the subscriber.  When the publisher or subscriber connects to the broker, the broker tells each of them how to find the other.  (Under the hood, each peer-to-peer connection will be set up with server and client roles for the underlying sockets, but the application developer doesn't have to think about the IP addresses of different nodes.)

Every ROS session involves starting `roscore` on your master machine, which allows ROS to connect subscribers to publishers and vice versa.  Subscribers and publishers find the broker at the address given by `ROS_MASTER_URI`, and then ask the master for information about a given topic.  Through some exchanges of information, the broker tells the subscriber and publisher the IP address of the appropriate endpoint.  If you forget to run `roscore` or `ROS_MASTER_URI` isn't set properly to an IP address that's accessible on the local network, this whole operation falls apart.  

The information that is sent from the broker to the publisher and subscriber includes the IP address and port of the other endpoint.  Note that if the publisher and subscriber cannot reach each other's addresses on the local network, for example because of a firewall configuration, the connection can also fail to be established.

**Computation graph**. As a system grows more complex there will be many nodes publishing and subscribing on many topics, and it can be difficult to debug issues that might arise from topic naming mistakes, message typing, and node shutdowns and error conditions.  Typical ROS systems will have dozens of nodes and hundreds of topics running simultaneously. It is, in general, a very good idea to document all topics and nodes being used in your system, and companies with strong software engineering teams will provide robust and easily accessible documentation.  Sadly, the status quo is that documentation is left in a sorry state, and it is extremely common to be delivered a robot with minimal documentation. In this case, you will need to do some forensic software engineering to discover how to operate and build on top of a provided system.

In order to do so, you will make frequent use of the `rostopic` and `rqt_graph` tools, which provide topic lists, information, and computation graph visualizations.  The computation graph connects computational nodes with directed edges corresponding to topics.  There are several types of errors that can be diagnosed in this fashion:
- Broken channel. Either a subscriber to a topic without a publisher, or a publisher to a topic without subscribers.  Often caused by topic naming mistakes.
- Multiple publishers for a single topic.  For example, during testing you may have a simulated camera publishing images to the same topic that a real camera is publishing to.  If you forget to turn one off, your subscribers will get a mix of simulated and real images.
- Terminated nodes. If some obscure process in a large launch file terminates due to an error condition, it can be extremely hard to find the error.  This is especially so if the implementer just printed out the error and quit rather than using [logging best practices](http://wiki.ros.org/rospy/Overview/Logging) that would let you [inspect error conditions](http://wiki.ros.org/rqt_console).
- Node naming conflicts.  Most common in lazy copying of boilerplate code, or initial attempts at implementing a "[worker node](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool)" paradigm common in parallel / multiprocessing applications.  ROS doesn't natively support such paradigms, so you can't just launch a program K times to subscribe to every K'th message for processing.  First of all, the node name provided to `rospy.init_node` must be unique.  Second, every subscriber to a topic gets every message published to the topic.  If you want to use ROS for such a use case, you'd need to initialize each node with a unique node name (like `worker_[i]` where `i` is the index of the worker) and write the publisher to divide its messages up and publish to K separate topics.
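For the worker-node workaround in the last item, the publisher's side of the split could be as simple as a round-robin rule that maps each message index to one of K per-worker topics.  The `worker_topic` helper and the `/images/worker_[i]` naming scheme below are hypothetical, shown without any ROS calls so that only the assignment logic is visible.

```python
def worker_topic(base_topic: str, message_index: int, num_workers: int) -> str:
    """Returns the per-worker topic that should carry the given message."""
    return base_topic + "/worker_" + str(message_index % num_workers)

K = 3
assignments = [worker_topic('/images', i, K) for i in range(5)]
print(assignments)
# ['/images/worker_0', '/images/worker_1', '/images/worker_2', '/images/worker_0', '/images/worker_1']
```

Each worker would then initialize its node as `worker_[i]` and subscribe only to its own topic, so that worker `i` receives every K'th message.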

#### Remote procedure call

RPC is an extremely common paradigm that corresponds very well with our mental models of using functions implemented in external libraries.  To make a library API call, we import the library, then call the library function with appropriate arguments.  In basic RPC, the process is exactly the same: we connect to the server, then call the server's function with appropriate arguments. 

**Pseudocode**. First, let's look at pseudocode for the client, in which `rpc_client_connect` returns a "proxy" of the server, which acts like a standard Python object:

```python
server=rpc_client_connect(server_address)
print("The server advertises",server.list_methods())   #an RPC API may have such a function (this is called introspection); if so this will print ["foo"]
res = server.foo(3,['bar','baz'])  #foo is a function implemented in the server's API.  This is a blocking call.
print("The server returned",res)   #will print ['bar','baz','bar','baz','bar','baz']
```

The server code is a bit more complex, in that the server needs to be initialized, then its methods will be added (advertised), and finally the execution loop will begin and process RPC calls:

```python
def foo(n_times,string_list):
    return string_list*n_times

rpc_server_init()                         #tells the RPC system that this process is a server
rpc_server_advertise('foo',foo)           #adds a method to the server's API 
while not done():                         #runs the server until done() returns True
    rpc_server_process()                  #handle any pending calls  
```

**XML-RPC example**.  Let's now move on to some real code.  Instead of [ROS Services](http://wiki.ros.org/rospy/Overview/Services) or [Google's gRPC](https://grpc.io) which require an annoyingly long list of steps to define interface files, compile them, transfer them across package boundaries, etc., we will use XML-RPC, which has handy existing bindings in the standard Python libraries.

First, the very straightforward client code:

In [None]:
from xmlrpc.client import ServerProxy

PORT = 5678
server_connection = ServerProxy('http://127.0.0.1:'+str(PORT))

print("The server advertises:",server_connection.system.listMethods())

res = server_connection.foo(3,['bar','baz'])
print("The server returned",res)

print("Trying an incorrect call!")
res = server_connection.foo(3.5,['bar','baz'])
print("The server returned",res)

...and then the server code...

In [None]:
from xmlrpc.server import SimpleXMLRPCServer
import threading
import time

def foo(n_times,string_list):
    """Replicates a string list n_times"""
    if not isinstance(n_times,int):
        raise ValueError("Invalid n_times parameter")
    if not isinstance(string_list,list):
        raise ValueError("Invalid string_list parameter")
    if not all(isinstance(s,str) for s in string_list):
        raise ValueError("Invalid string_list contents")
    return string_list*n_times

TIME_TO_RUN = 30.0
start_time = None
def done():
    global start_time
    if start_time is None:
        start_time = time.time()
    return time.time()-start_time > TIME_TO_RUN

PORT = 5678
server = SimpleXMLRPCServer(('127.0.0.1',PORT))
server.register_function(foo,"foo")   #advertise "foo"
server.register_introspection_functions()  #need this for listMethods to work

#the SimpleXMLRPCServer event loop is rather complex, so instead,
#the "done" checking is run in a separate thread.  If you don't want to
#kill the server with an internal signal, you can eliminate all this
#done_checker logic.
def check_for_done(server):
    while True:
        if done():
            print("Stopping server")
            server.shutdown()
            break
        time.sleep(0.1)

done_checker = threading.Thread(target=check_for_done,args=(server,))
done_checker.daemon = True   #this will kill the thread if the main thread is killed
done_checker.start()

print("Starting XML-RPC server on localhost:"+str(PORT))
server.serve_forever()
done_checker.join()

Observe that our pseudocode was followed moderately well, except that we added some more error checking in `foo`, and rather than performing polling we run the `SimpleXMLRPCServer.serve_forever` method.  As its name suggests, it runs forever and blocks all the remaining code... which is OK if we want our server to run until it is manually terminated. 

If you ever want to stop the server, you must call the `SimpleXMLRPCServer.shutdown()` method... but how can this be done if `serve_forever` never exits?  There are two ways to do this: either have an advertised function trigger `server.shutdown()` so that the client terminates the server (external termination), or create a separate thread that monitors the done condition and shuts down the server (internal termination).  We choose the latter strategy here.  In either case, `shutdown()` must be invoked from a different thread than the one running `serve_forever`, because it blocks until the serving loop exits and would otherwise deadlock.  Note that the thread has to be created and started after the server is created but before `serve_forever` is called.  


Now, everything works beautifully in the ideal case, but many things can go wrong:
1. The client might not be able to connect to the server.
2. The client may have called a method that doesn't exist.
3. The client may not have specified valid arguments.
4. The server may have shut down.
5. The server may encounter an exception when calling the indicated method.
6. The server may be overloaded (like in a denial-of-service attack).
7. Blocking the client may take too long, either due to a long round-trip time over the network or the method triggering a long-running process on the server.

For errors 1-5, the client's connection to the server can either throw an exception when `foo` is called or return some error condition in `res`.  Robust server implementations can also determine overload conditions (error 6) and begin denying requests.  However, the problem of blocking (error 7) still remains.
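With Python's `xmlrpc` library, these conditions surface as exceptions on the client.  The sketch below shows one way a client might sort them into categories; `safe_call` is a hypothetical helper, and the small server started in a background thread (on an OS-assigned port, with request logging turned off) exists only to make the example self-contained.

```python
import threading
import xmlrpc.client
from xmlrpc.server import SimpleXMLRPCServer

def foo(n_times, string_list):
    if not isinstance(n_times, int):
        raise ValueError("Invalid n_times parameter")
    return string_list * n_times

# Port 0 asks the operating system for any free port
server = SimpleXMLRPCServer(('127.0.0.1', 0), logRequests=False)
server.register_function(foo, "foo")
port = server.server_address[1]
threading.Thread(target=server.serve_forever, daemon=True).start()

def safe_call(proxy, method, *args):
    """Calls a remote method, returning (result, error) where one of them is None."""
    try:
        return getattr(proxy, method)(*args), None
    except xmlrpc.client.Fault as e:           # server-side problem (errors 2, 3, 5)
        return None, "server fault: " + e.faultString
    except xmlrpc.client.ProtocolError as e:   # HTTP-level failure
        return None, "protocol error: " + e.errmsg
    except OSError as e:                       # could not connect (errors 1, 4)
        return None, "connection error: " + str(e)

proxy = xmlrpc.client.ServerProxy('http://127.0.0.1:' + str(port))
ok_result, ok_error = safe_call(proxy, 'foo', 2, ['bar'])
bad_result, bad_error = safe_call(proxy, 'foo', 3.5, ['bar'])   # invalid argument
missing_result, missing_error = safe_call(proxy, 'nope')        # nonexistent method

server.shutdown()        # stop the serving loop...
server.server_close()    # ...and release the listening socket
dead_result, dead_error = safe_call(proxy, 'foo', 2, ['bar'])   # server is gone

for err in (ok_error, bad_error, missing_error, dead_error):
    print(err)
```

Returning an error value instead of letting the exception propagate is a design choice; in a long-running robot program it keeps one failed call from taking down the whole execution loop.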

Many synchronous RPC systems like ROS services offer a *timeout* keyword to avoid stalling the client too long.  If a response from the server is not received within the timeout period, then an error will be returned.  However, this is still an unsatisfactory solution for high-performance communication because it still blocks the main thread.  To fully address this issue, it is necessary to work with *asynchronous* RPC systems.

**Asynchronous RPC**.  Asynchronous RPC systems let our client perform non-blocking RPC calls, but the client will need to be written in a much more sophisticated manner. Specifically, our client must remember which requests were performed and process replies appropriately when they occur, whether they are normal replies or error conditions. 

One option, exhibited most notably by popular JavaScript frameworks like jQuery, is to pass callbacks into RPC calls that handle normal and error responses. An underlying execution framework keeps track of all outstanding calls and will trigger the appropriate callbacks when the response is processed.  This is an instance of an event-based execution paradigm, and for simple program logic such as displaying values or passing RPC responses to other programs, adapting a program to this model is fairly straightforward.  A word of caution, however: once you start using asynchronous RPC for more complex program logic, you will often need to drastically reshape your program in complex ways, battling the "callback spaghetti" phenomenon all along the way.

Another common mechanism to implement asynchronous RPC is that of a [*promise*](https://en.wikipedia.org/wiki/Futures_and_promises) (aka "future" or "deferred").  A promise is a temporary object that is returned immediately from an RPC call and acts as a stand-in for an eventual result.  Promises support some combination of our three major real-time programming paradigms: 1. blocking until the result is returned, 2. polling for a returned result, and/or 3. event-based callbacks that are invoked when the result is returned.  As an example, the XML-RPC mechanism in the [Twisted framework](https://docs.twisted.org/en/twisted-18.7.0/web/howto/xmlrpc.html) provides a promise-style object called a [Deferred](https://docs.twisted.org/en/twisted-18.9.0/core/howto/defer.html) that uses callbacks for successful returns and error conditions. 
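Python's standard library has a promise-like object, `concurrent.futures.Future`, that supports all three styles.  The sketch below does not use Twisted or a real RPC system: `slow_remote_foo` is a hypothetical stand-in for a blocking RPC call with a long round trip, and a thread pool is used to turn it into a call that returns a future immediately.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_remote_foo(n_times, string_list):
    """Stand-in for a blocking RPC call with a long round trip."""
    time.sleep(0.2)
    return string_list * n_times

executor = ThreadPoolExecutor(max_workers=2)

# 1. Blocking: wait (up to a timeout) for the result
future = executor.submit(slow_remote_foo, 2, ['bar'])   # returns immediately
blocking_result = future.result(timeout=5.0)

# 2. Polling: check for completion from the main loop
future = executor.submit(slow_remote_foo, 2, ['baz'])
polls = 0
while not future.done():
    polls += 1           # the main loop is free to do other work here
    time.sleep(0.01)
polled_result = future.result()

# 3. Event-based: register a callback that fires when the result is ready
callback_results = []
future = executor.submit(slow_remote_foo, 1, ['qux'])
future.add_done_callback(lambda f: callback_results.append(f.result()))

executor.shutdown(wait=True)   # wait for outstanding calls before exiting
print(blocking_result, polled_result, callback_results, polls)
```

Note that the callback in the third style is invoked from a worker thread rather than the main thread, so any state it shares with the main loop needs the usual care around concurrent access.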


### Communication characteristics and error handling

Now that we've gone through some of the interfaces that communication systems provide at the application programming level, let's start peeking a bit more under the hood at how communication is actually implemented.  A deeper understanding is crucial if you wish to successfully implement high-performance systems: at the limits of processing speeds, communication speeds, or with unreliable networks like WiFi, cellular, and radio, the basic abstraction of "message goes in, message comes out" starts to break down.  

#### Message queuing 

Even in the ideal case where communication is perfectly reliable and instantaneous, there's a major pitfall to be aware of when concurrently running multiple communicating programs.  Nearly all communication protocols are asynchronous, which means that the receiver and sender of messages do not need to perform specific steps in a fixed temporal order.  A message can be written to the communication channel while a prior message is being received.  Or, a message can be processed by an application while the sender is generating more messages.  This in general is a very good thing, because due to the nature of the Internet and other networks it is difficult to ensure reliable and lock-step execution amongst multiple concurrent processes.

The underlying mechanism for handling concurrent messages being sent and received is that of a *message queue*.  Each communicator, whether using a stream, pub-sub, or RPC paradigm, will include a queue of pending messages.  A read queue will store a sequence of messages to be read by the application, while a write queue will store a sequence of messages that the application has written but that have not yet been sent over the network.  We don't usually need to fret much about write queues, since message delivery is usually handled automatically by the operating system, but the read queue is important to consider in our design.  

TODO: message queue figure

The key issue is that we cannot control the rate at which our application receives messages.  So, the *fast sender problem*  (or the *slow receiver problem*) arises when the sender sends messages faster than our application can handle them.  Here, the read queue starts to grow, and if our application doesn't get some relief, the queue can grow and grow!  This starts eating up memory, and our application starts processing older and older messages.  The most common symptom of this problem in robot systems is a "laggy" behavior in which the robot seems to be responding to old inputs.

There are a few strategies for addressing this problem. The first is good old-fashioned manual tuning of execution loop rates, processing speeds, or dropping every N'th message.  The second, which is implemented in ROS, is to define a maximum queue size and begin dropping messages when the queue size is exceeded.  Note that ROS drops the oldest message first.   The maximum queue size could be set to several meaningful values:
- 1: makes the most sense for "streaming" communication in which your application only wants to respond to the most recent message.
- 2-10: this might make sense if your communication network or processing demand is "bursty".  If several messages arrive quickly, then you would expect relief within a few processing cycles.  Examples may include user input events.
- Large ($\geq$10): this is recommended by the ROS documentation, but it's only a "fingers crossed" backstop to prevent your program from crashing due to high memory usage.  If you ever hit this queue size, something is going horribly wrong anyway and your robot's performance is likely to have drastically declined by then. 
Note that the queue size limit strategy silently discards messages, and because your program isn't crashing, it can give you a false sense of confidence.  In effect, it makes fast-sender bugs hard to find, and just delays correcting the behavior.  (I would suggest monitoring the queue size in the application, but ROS does not make this easy.)
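
The drop-oldest policy is easy to sketch with Python's `collections.deque`, which evicts the oldest entry when a `maxlen` is set.  The `BoundedReadQueue` class below is a hypothetical illustration, not the ROS implementation; it also counts how many messages were discarded so that the application can notice when it is falling behind.

```python
from collections import deque

class BoundedReadQueue:
    """Read queue that drops the oldest message once it is full."""

    def __init__(self, max_size):
        self.messages = deque(maxlen=max_size)
        self.num_dropped = 0

    def push(self, msg):
        """Called when a message arrives from the network."""
        if len(self.messages) == self.messages.maxlen:
            self.num_dropped += 1     # the deque will evict the oldest entry
        self.messages.append(msg)

    def pop(self):
        """Return the oldest pending message, or None if the queue is empty."""
        return self.messages.popleft() if self.messages else None

queue = BoundedReadQueue(max_size=3)
for seq in range(10):     # the sender delivers 10 messages before we read any
    queue.push(seq)

pending = list(queue.messages)
print(pending, queue.num_dropped)
```

With a maximum size of 3, only the three most recent messages (7, 8, and 9) remain, and the counter reports that 7 messages were discarded.  Logging or plotting such a counter is one way to make fast-sender bugs visible rather than silent.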

More advanced communication systems, such as streaming video services, adopt more sophisticated strategies for handling the fast sender problem and achieve a desired *quality of service* (QoS) of transmitted data regardless of network speeds and processing loads.  Some key strategies for achieving QoS include *health monitoring* and *backpressure*.  In health monitoring, the sender periodically sends a special health check (or heartbeat) message, and the receiver acknowledges it with a small response indicating that it is ready to receive data.  Messages will be withheld until the receiver indicates that it is in a healthy state.  In backpressure, the receiver will send messages back to the sender if it is having trouble processing the data at the current rate. If the sender gets such a report, then it will lower the rate of message sending or degrade data quality (e.g., send a low-resolution or highly-compressed image). A QoS strategy will define for the sending application how the data should be degraded to meet the needs of the receiver.

TODO: backpressure figure


#### Latency, throughput, and packet loss

Let's delve into the network stack to understand a bit more about communication network performance.  The three key metrics that we should be concerned about are *latency*, *throughput*, and *packet loss*.  Latency measures how long each message takes to travel from the sender to the receiver.  Throughput measures the quantity of data (in terms of messages or bytes) transmitted in a given unit of time (usually seconds).  The maximum possible throughput of a network is known as *bandwidth*, and this may differ from the actual throughput due to network congestion, unreliable delivery, and other factors.  Finally, packet loss measures what fraction of message units (packets) are dropped.  Packet loss will cause dropped or incomplete messages, or delays while packets are re-delivered. Fully reliable networks will have no packet loss.

(Note that we differentiate between full messages, which are defined at the application level, and packets, which are message units defined at the network level.  Small messages can be delivered in single packets, whereas large messages may need to be split into multiple packets.)

#### Delivery paradigm

In unreliable networks, packets will not necessarily be delivered between network nodes successfully or in exactly the order they were sent. This can be extremely frustrating to remedy at the application level, so a network layer may implement alternative delivery paradigms.  In particular, TCP (Transmission Control Protocol) guarantees *in-order* and *reliable* delivery.  In-order delivery is the property that each packet (and hence every message) processed by the receiver was sent after the previously processed packet.  Reliable delivery is the property that every packet (and hence message) sent is processed by the receiver.  To ensure these properties, TCP requires that the receiving network node acknowledges (ACKs) the delivery of each packet.  If an ACK is not received within a timeout, the sender will resend the packet until it receives an ACK.

In contrast to TCP, UDP (User Datagram Protocol) does not implement either of these paradigms.  The sender does not request or expect any ACK of its sent packets.  Messages may be received partially (if they were split across several packets), out of order, or not at all.  This would appear to be a disadvantage, but one advantage of UDP over TCP is its speed, as the sender does not need to wait for the receiver to acknowledge message receipt and will never retry sending old messages.

It is also worth mentioning a counterintuitive "optimization" implemented in TCP.  If TCP is asked to perform many small writes, the strategy of immediately sending each write in its own packet would waste bandwidth because each packet incurs header overhead (at least 40 bytes for TCP over IPv4), which could be significantly more than the actual message size.  Instead, [Nagle's algorithm](https://en.wikipedia.org/wiki/Nagle%27s_algorithm) implements a buffer that fills up over subsequent small writes until either enough bytes are accumulated to fill a packet or all previously sent data has been acknowledged.  This can inhibit real-time low-latency communication, and can also interact badly with the "delayed ACK" strategy implemented in TCP.  With delayed ACK, the receiver holds back an ACK in the hope of piggybacking it on outgoing data of its own, and if it has nothing to send, the ACK goes out only after a delay timer expires (typically tens to a few hundred ms).  With Nagle's algorithm and certain communication patterns, such as several small writes followed by a read, every exchange can stall for the full delayed-ACK timeout.  A typical approach in robotics to prioritize low latency is to disable Nagle's algorithm on TCP sockets as soon as they are created, using the `TCP_NODELAY` socket option. 
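
In Python, disabling Nagle's algorithm is a one-line call on the socket object.  The sketch below sets the option on a freshly created (not yet connected) TCP socket and reads it back to confirm that it took effect; in a real program you would go on to call `connect` or `bind` / `listen` on the same socket.

```python
import socket

# Create a TCP socket and disable Nagle's algorithm before connecting.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

# Read the option back; a nonzero value means small writes are sent immediately.
nodelay_enabled = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0
print("TCP_NODELAY enabled:", nodelay_enabled)

sock.close()
```

Keep in mind that the option applies per socket, so on a server it should also be set on each connection returned by `accept` unless you have verified that your platform inherits it from the listening socket.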

#### Disconnections

Orderly (closed by other endpoint) vs disorderly (crash)

Keepalive

TODO




### Serialization

Getting further into the details of the network stack, the transport layer is responsible for converting application-level messages into network-level signals that will be sent over the communication interface.  A key step in this layer is *serialization*, which is the process of encoding data structures in our desired programming language into a sequence of bytes.  This is also the process used to save objects in our language to disk.  The reverse process, which is performed in a receiver or when loading objects from disk, is called *deserialization*.  (Usually when we speak of serialization we refer to deserialization as well.)

TODO: figure

Although objects are stored in our computer's RAM as bytes, it is not always straightforward to reconstruct the object from the same bytes. Complex data structures will contain pointers that store memory addresses that are only meaningful in the currently-running program.  High-level languages like Python will also store additional data in memory, like IDs and reference counts, that are also not meaningful and can even cause errors if transferred directly to another program.  Finally, different programming languages or different CPU architectures can store objects with different memory-level encodings, so serialization is also tasked with the job of achieving cross-language and cross-architecture translation.

The major top-level design choice in a serialization system is whether to use *typed* or *untyped* serialization.  An untyped system will be able to serialize / deserialize a wide diversity of objects, whereas a typed serialization system will only be able to serialize objects of certain defined types.  Untyped serialization methods include Python's `pickle` module, JSON, XML, and YAML, which can represent a large range of objects including nested data structures.  The key advantage of untyped serialization methods is that they are very convenient for sending arbitrary and unstructured objects.  However, they do have some drawbacks.  First, they can be *verbose*, which means that there is an excessive overhead of bytes needed to encode objects so they can be properly decoded.  XML is particularly guilty of verbosity.  Second, they are only able to perform limited error checking.  If the sender sends an improper data structure, e.g., sending a string rather than an integer to an RPC call, then the receiver will need to perform its own checking at the application level.  

Typed serialization systems overcome these limitations, allowing compact byte-level encoding and enabling type checks for error handling.  Such systems provide a mechanism for specifying an object type, usually an *Interface Definition Language (IDL)* file.  The IDL approach, notably used by ROS, LCM, Google Protocol Buffers, and Apache Thrift, defines each type in an auxiliary file, using notation that resembles a C++ or Java class but usually with IDL-specific keywords.  An *IDL preprocessor* will then compile the IDL file into a language-specific class that can be imported into the application program.  Objects of this class will then have serialization methods and can be passed directly to the provided communication system.  A key feature of most packages is cross-language compilation so that a single IDL file can be converted to Python, C++, etc. such that serialized objects are byte-compatible.  Type checking is handled on the client side in that any improperly typed message will be caught in compilation or as a run-time error.
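
To get a feel for the difference in compactness and type checking, the sketch below encodes the same hypothetical pose message two ways: as JSON, and with a fixed byte layout defined using Python's `struct` module.  The hand-written `serialize_pose` / `deserialize_pose` functions are only a stand-in for what an IDL compiler would generate automatically; the message fields and layout are assumptions made for illustration.

```python
import json
import struct

# Hypothetical message: a timestamped 3D position.
message = {"stamp": 1700000000.25, "x": 1.5, "y": -2.0, "z": 0.75}

# Untyped: JSON carries field names and text-encoded numbers in every message.
json_bytes = json.dumps(message).encode("utf-8")

# Typed: sender and receiver agree ahead of time on the byte layout.
# "<dfff" = little-endian, one float64 followed by three float32 values.
POSE_FORMAT = struct.Struct("<dfff")

def serialize_pose(msg):
    """Pack the message fields in a fixed order; raises struct.error on bad types."""
    return POSE_FORMAT.pack(msg["stamp"], msg["x"], msg["y"], msg["z"])

def deserialize_pose(data):
    """Unpack bytes produced by serialize_pose back into a dictionary."""
    stamp, x, y, z = POSE_FORMAT.unpack(data)
    return {"stamp": stamp, "x": x, "y": y, "z": z}

typed_bytes = serialize_pose(message)
round_trip = deserialize_pose(typed_bytes)
print(len(json_bytes), "bytes as JSON vs", len(typed_bytes), "bytes typed")
```

The typed encoding takes 20 bytes (8 for the timestamp plus 4 for each coordinate), which is considerably smaller than the JSON text.  Passing a string where a number is expected makes `serialize_pose` raise `struct.error` on the sender's side, which illustrates the client-side type checking described above.  The price is that the layout is rigid: both ends must agree on `POSE_FORMAT`, and the `float32` coordinates lose precision for values that are not exactly representable.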

TODO: IDL example figure

Three disadvantages of typed serialization include
1) Complexity in learning a new IDL syntax, 
2) Distributing changes in IDLs across all nodes in a system and recompiling is challenging, requiring complex multi-step build procedures and mechanisms for keeping track of versioning, and
3) The need to convert between language-native objects and IDL compiled objects, which is typically easier than manual serialization but can be just as tedious. 
As an example of the second disadvantage, ROS has an "easy" 8-step build system for defining new message classes.  As an example of the third disadvantage, ROS developers frantically search the internet to find how to convert ROS message types into familiar forms.  For example, how do we get a point cloud as a [Numpy array](https://answers.ros.org/question/230680/extracting-the-xyz-coordinates-from-pointcloud2-data-in-python/)?  Should you use that solution or the [`numpy_msg` alternative](https://wiki.ros.org/rospy_tutorials/Tutorials/numpy)?  There's no authoritative guide for best practices.... Be prepared to write lots of wrappers!

It should be noted that the error handling afforded by typed serialization is limited to finding data typing errors. It does not determine whether values are in appropriate ranges, arrays are of an appropriate size, etc.  An application must detect these errors and respond appropriately, e.g., by sending an error message.  One approach for semi-automatic error checking, most commonly used with the XML and JSON formats that are prevalent in web programming, is to define an auxiliary *schema* with which an object can be automatically checked (*schema validation*).  Schemas are data structures that specify conditions for valid object values, and are often written in the same format as the objects they describe (XML Schema and JSON Schema).  Schema validation is also often used to check human inputs to web forms.   Note that validation incurs an overhead, cannot check complex validation conditions that relate the values of several attributes, and requires learning the schema definition structure.  
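
The idea of schema validation can be sketched in a few lines of Python.  The schema format below (a dictionary mapping each field to an expected type and an allowed range) is a deliberately simplified, hypothetical stand-in and is not the JSON Schema standard, but it shows how a schema is itself a data structure that a generic `validate` function interprets.

```python
# Hypothetical schema: each field maps to (expected type, min value, max value).
JOINT_COMMAND_SCHEMA = {
    "joint_index": (int, 0, 5),
    "angle_rad": (float, -3.14, 3.14),
}

def validate(obj, schema):
    """Return a list of human-readable errors; an empty list means valid."""
    errors = []
    for field, (expected_type, lo, hi) in schema.items():
        if field not in obj:
            errors.append(f"missing field '{field}'")
            continue
        value = obj[field]
        if not isinstance(value, expected_type):
            errors.append(f"'{field}' should be {expected_type.__name__}")
        elif not (lo <= value <= hi):
            errors.append(f"'{field}'={value} outside [{lo}, {hi}]")
    return errors

good = validate({"joint_index": 2, "angle_rad": 0.5}, JOINT_COMMAND_SCHEMA)
bad = validate({"joint_index": 9, "angle_rad": "0.5"}, JOINT_COMMAND_SCHEMA)
print(good)
print(bad)
```

The first object passes, while the second produces two errors: an out-of-range joint index and a string where a float was expected.  A receiver could send such an error list back to the sender rather than acting on an invalid command.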


### Framing

The last issue that we will discuss is fairly low-level in the transport stack, and you will not need to use this unless you are implementing your own byte-level protocol (like a serial protocol for an Arduino).  The issue of *framing* is that the channel sends and receives an "infinite" byte string that is not explicitly split into messages, so some mechanism is needed to tell a reader where one message ends and the next begins.  Three framing strategies are common:
- *Delimiter*. Each byte string is terminated with a special delimiter character, such as an endline. Note that the message representation must not contain the delimiter.  If a message might contain the delimiter character, then messages must be *escaped* with special encodings.  You are likely to have seen this before, such as the endline character being written as `\n` and all backslashes being written as `\\` in Python strings. 
- *Length prefix*.  The sender prepends a length to the message's byte representation.  The receiver first parses the length and then reads the specified number of bytes as the message. The message byte string length must not exceed the maximum integer that can be encoded in the length prefix, e.g., 255 for a 1 byte prefix, 65535 for a 2 byte prefix, etc.
- *Streaming serialization*. Some serializers can automatically determine the end of an object as bytes come in via streaming, such as a JSON opening brace `{` needing to be partnered with a closing brace `}`. Note that built-in JSON parsers in most languages do not support streaming mode.
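
As an illustration of the length prefix strategy, here is a minimal sketch that uses a 2-byte big-endian prefix.  The function names `frame` and `extract_frames` are hypothetical.  The important detail is that a stream may deliver bytes in arbitrary chunks, so the receiver must buffer incoming bytes and only emit a message once all of its bytes have arrived.

```python
import struct

LENGTH_PREFIX = struct.Struct(">H")   # 2-byte big-endian unsigned length

def frame(payload):
    """Prepend a 2-byte length to a message's bytes."""
    if len(payload) > 0xFFFF:
        raise ValueError("payload too long for a 2-byte length prefix")
    return LENGTH_PREFIX.pack(len(payload)) + payload

def extract_frames(buffer):
    """Remove all complete messages from the front of a bytearray.

    Returns the list of complete payloads.  Any trailing partial message
    is left in the buffer until more bytes arrive.
    """
    messages = []
    while len(buffer) >= LENGTH_PREFIX.size:
        (length,) = LENGTH_PREFIX.unpack_from(buffer, 0)
        end = LENGTH_PREFIX.size + length
        if len(buffer) < end:
            break                     # message not fully received yet
        messages.append(bytes(buffer[LENGTH_PREFIX.size:end]))
        del buffer[:end]
    return messages

# Simulate a byte stream that arrives 4 bytes at a time.
stream = frame(b"hello") + frame(b"line1\nline2") + frame(b"")
buffer = bytearray()
received = []
for i in range(0, len(stream), 4):
    buffer.extend(stream[i:i + 4])
    received.extend(extract_frames(buffer))
print(received)
```

Unlike the delimiter strategy, the payload may contain any byte values (including endlines) without escaping, at the cost of a hard limit of 65535 bytes per message for this prefix size.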


### Setting up networks for robots

TODO

## System development tools

System integration middleware can offer much more functionality than just facilitating inter-process communication: it can also offer many tools to help with robot system development.  We survey many of these here.

### Communication inspection

When many processes are running and communicating simultaneously, especially when you are working with external packages, it can be very challenging to figure out what channels are being used to send or receive data by what processes.  The *computation graph* is a helpful concept to understand and visualize how a system works. We will discuss a bit more about these [in the next chapter](SystemsEngineering.ipynb#computation-graphs) but here we describe commonly used tools.

The most well-known computation graph tool in robotics is [`rqt_graph`](http://wiki.ros.org/rqt_graph), which shows the processes (ROS nodes) and communication channels (ROS topics) currently running on your system. This tool helps debug misspelled names, node and topic naming conflicts, and crashed nodes.

Other communication packages also provide [statistical analysis tools](https://www.rabbitmq.com/docs/prometheus) that help you understand the health of the computation graph.  These tend to be used more for cloud workflows where the system is meant to handle a large volume of user queries that place varying load on the system.  In ROS, you typically run `rostopic hz`, `rostopic bw`, and/or `rostopic info` to inspect how well topics are running.  `rqt_graph` can also be [launched in a manner](http://wiki.ros.org/rqt_graph#Topic_statistics) that includes some basic statistics in the visualization.

### Parameters

We're ashamed to admit it, but we all hard-code things from time to time.  This is definitely not best practice for system integration, because it forces the person debugging your system performance — whether this is another person on your team, a user of your software libraries, or even yourself several months in the future — to hunt deep into your code to find the parameters that affect performance.  So, it is generally a good idea to keep any parameters that affect system-wide performance *in places that are easily accessible* system-wide.  There are a few non-horrendous options for doing this:

- *At the top of code files*, e.g., `SOME_PARAMETER=True`.  Embedding the parameter in the same file in which it is used makes it clear what is being modified with this parameter, and keeps the overhead of organization low.  However, you will need to clearly document what this parameter does with a descriptive name and comment, as well as making users aware of it in a README file.  This is also disadvantageous for compiled programs, since any change to a parameter will need to trigger a recompilation step.  Similarly, reinstallation is needed for build systems that require package installation (such as ROS packages).
- *Configuration code files*, e.g., `mypackage/config.py` or `mypackage/include/settings.h`.  These advertise to your package's users where the settings are, if they were to browse your source code.  As with the previous option, this is disadvantageous for compiled programs, since any change to a parameter will need to trigger a recompilation step. Similarly, reinstallation is needed for build systems that require package installation (such as ROS packages).
- *Configuration files*, e.g., `myprogram_config.yaml` or `myprogram_config.xml`. Putting settings in separate files allows for parameter modification without triggering recompilation.  However, your program needs to be pointed to the correct location of the file, and there is a bit of coding and runtime overhead in parsing settings from the file.
- *Launch files*, e.g., [`roslaunch`](http://wiki.ros.org/roslaunch/XML/param).  Like configuration files, this allows for parameter modification without triggering recompilation.  This automatically chooses the right settings file for your program.  Also, when using `roslaunch`, parameters set in a launch file are compatible with the `rosparam` parameter server. A potential weakness of this design is that the settings in a launch file may be hard for humans to find, parse, and understand without a package maintainer providing good documentation. 
- *Parameter servers*, e.g., `rosparam`.  These provide tools to set settings to configure the behavior of programs and an API to retrieve settings in the program.  Note that a program should have good documentation to identify which parameters it uses. 
- *Dynamic parameter servers*, e.g., the defunct [`dynamic_reconfigure`](http://wiki.ros.org/dynamic_reconfigure/Tutorials) package in ROS. In this solution the parameters are available through a server interface that returns settings to processes, and allows the user to configure settings as a process is running.  This does incur some overhead in having the process repeatedly query settings from the server.
- *Unified parameter APIs*, e.g., [ROS 2's approach](http://design.ros2.org/articles/ros_parameters.html) for parameters.  Each process has an interface in which it can respond to parameter listing, getting, and setting queries.  The downside of this approach is that there is no unified collection of system parameters, and a process must launch before its parameters can be reconfigured.
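
As a small illustration of the configuration file option, the sketch below merges settings from a file over a set of defaults.  It uses JSON rather than YAML only so that it runs with the Python standard library; the parameter names and the file name `myprogram_config.json` are hypothetical.

```python
import json
import os
import tempfile

# Defaults live in one place; the parameter names here are hypothetical.
DEFAULTS = {"control_rate_hz": 100.0, "max_speed": 0.5, "use_sim": False}

def load_config(path, defaults=DEFAULTS):
    """Merge settings from a JSON file over the defaults.

    Unknown keys are rejected so that a typo in the file does not
    silently fall back to a default value.
    """
    with open(path) as f:
        overrides = json.load(f)
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"unknown parameters in {path}: {sorted(unknown)}")
    return {**defaults, **overrides}

# Usage: write a small configuration file and load it.
with tempfile.TemporaryDirectory() as tmp:
    config_path = os.path.join(tmp, "myprogram_config.json")
    with open(config_path, "w") as f:
        json.dump({"max_speed": 1.2}, f)
    config = load_config(config_path)

print(config)
```

Only `max_speed` is overridden, and the other parameters keep their default values.  Rejecting unknown keys is a design choice rather than a requirement, but it catches misspelled parameter names early instead of letting them go unnoticed.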

When projects grow large, [parameter setting becomes quite complex](http://wiki.ros.org/roslaunch/Tutorials/Roslaunch%20tips%20for%20larger%20projects). As an example, it is a common practice in ROS systems to dig through packages to find and modify their configuration and launch files in-place, but this does not play nicely with version control systems like Git that (by default) will prompt users to commit changes to the package repository.  This is not usually what we want to do.  So there are a plethora of options here: creating a new launch file, creating a parent launch file that overrides the parameter value in the child launch file, or using command-line arguments to override parameters.  Sadly, the wide variation in implementation practices between projects and sub-projects makes it extremely confusing for users to anticipate how to best configure a system. 


### Logging

You may have noticed that programming real-time systems can be difficult due to the inability to step through a program's execution, such as with a debugger, while ensuring that it interacts with other programs in the same way.  It's also hard to use printing: if you encounter an error on one time step and print an error message, then that message can quickly disappear off the console as more mundane messages are printed, and you won't have a chance to read it minutes later. Also, as projects get more complex, you become likely to encounter the issue of cascading failures which propagate through multiple programs on a system.

Logging is a common approach to helping debug such systems.  We will talk about three different forms of logging: 1) event logging, 2) message recording and replay, and 3) state logging.

*Event logging* is the standard form of logging built into Python, Java, ROS, and other systems in which a program saves messages to a file. Each message consists of a timestamp, log level, optional file and line of code, and a string describing the event.  The *log level* (or severity level) is logger-dependent but usually ranges from "debug" through "info", "warn", and "error" up to "fatal", and you can configure a program to only log messages at or above a certain level.  You can also configure log readers to filter log messages by level.  It is generally useful to have event logging to diagnose anomalous conditions, but the effective use of event logging requires implementers to be conscious of the verbosity of their logs and the computational and storage overhead of logging too much.
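
Here is a minimal sketch of event logging with Python's built-in `logging` module.  The logger name `gripper_driver` and the messages are hypothetical, and the log is written to an in-memory stream so that the example is self-contained; in practice you would use a `logging.FileHandler` to write to a file instead.

```python
import io
import logging

log_stream = io.StringIO()          # stands in for a log file
handler = logging.StreamHandler(log_stream)
# Timestamp, level, file and line of code, then the event description.
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s"))

logger = logging.getLogger("gripper_driver")   # hypothetical component name
logger.addHandler(handler)
logger.setLevel(logging.WARNING)    # discard anything below the "warn" level
logger.propagate = False            # keep these messages out of the root logger

logger.debug("raw encoder reading: %d", 1023)            # filtered out
logger.info("gripper opened")                            # filtered out
logger.warning("grasp force near limit: %.1f N", 38.5)   # logged
logger.error("lost connection to motor controller")      # logged

logged_lines = log_stream.getvalue().splitlines()
print("\n".join(logged_lines))
```

Only the warning and the error are recorded.  Lowering the threshold to `logging.DEBUG` during a debugging session would capture all four messages without changing any of the logging calls.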

*Message recording and replay* is used most notably in the [`rosbag`](http://wiki.ros.org/ROS/Tutorials/Recording%20and%20playing%20back%20data) program, and is most suitable for use in pub-sub systems.  Recording is used to save all messages that are published to one or more topics to a file (a .bag file, in the case of `rosbag`), and then replay will publish those messages back to those topics.  By doing so, you can capture the behavior of the inputs to a program during operation, debug and modify the program, and then replicate the input in a debugging run.  You can also use visualization tools like `rviz` or `rqt_graph` to visually inspect the values of these logs and identify problematic conditions.

A major disadvantage of message recording is that log files become enormous, especially when recording large messages like images over long periods of time.  Moreover, in order to debug a problem that occurred 10 minutes into running, you may need to replay all 10 minutes of messages before that problem can be debugged so that you can replicate your program's state.  To address this issue, it becomes necessary for a system middleware to be more explicit about how it stores program and system state.  A *state logging* functionality would periodically capture a snapshot of all meaningful internal variables of a program / system to disk.  Upon restoration of a state from disk, the program / system can resume exactly where it was left off or at a designated point in time.  There are no widely used open-source system middleware packages that perform state logging, but this is an active topic of research and experimental development. [Database-driven communication](https://github.com/krishauser/reem), such as a [blackboard model](https://en.wikipedia.org/wiki/Blackboard_system), is particularly well-suited for this because the entire database can be dumped at periodic intervals. 

As system development becomes more mature and you perform repeated testing of your system, you will quickly become overwhelmed by the number of logs saved. *Log management* describes the set of practices that a larger team will use to determine what to log, how to store and organize logs, and how to browse logs for events or behaviors of interest.  More sophisticated log management software may include specialized hardware, compression algorithms, and cloud servers to help teams maintain logs with high computational and storage performance, as well as web dashboards, event managers, and visualization tools to help identify errors as they occur and diagnose causes of failures after the fact.  (Notably, ROS does not provide any log management system, which is a small part of why large enterprises tend to mature beyond ROS.) 


### Visualization

TODO

2D or 3D visualizations

Live plotting


### Simulation

TODO

### Program analysis and tuning

Profiling

Learning dashboards (Tensorboard, WandB)

Auto-tuning

### Containerization

TODO

## Summary

### Key takeaways

TODO

### Glossary

Real-time programming
- [Execution loop](#execution-loop): The main loop of a real-time program that repeatedly performs some work until a termination criterion is met.
- [Blocking](#Blocking-requests): When a function call stops the execution of our application's code until an interaction (with another process or device) completes.
- [Polling loop](#Polling-loops): An execution loop that avoids blocking by repeatedly checking whether events (such as the arrival of a message from another process) are present before executing corresponding program logic.  Event checks are performed at a fixed rate. 
- [Event loop](#Event-loops): An execution loop that responds to events via callbacks.  Callbacks implement the program logic, and are triggered by the event system, which frequently takes control over the execution loop.
- [Callback](#Event-loops): A function that is called by an event system when an event happens.  Callbacks are defined by the application to implement the program logic.
- [State machine](#State-machines): A common paradigm for programming real-time systems that change functionality at discrete points in time in response to events.  Each state implements a functionality, and events correspond to transitions between states.
- [Process](#processes): Code that is run by an OS scheduler and is provided its own memory space.
- [Thread](#threading): A code execution unit in which the OS scheduler executes instructions in sequence.  A process can have one or more threads.
- [Launcher](#Launching-concurrent-processes): A single entry point to running multiple concurrent programs.  Usually a launch script or a settings file provided to a launch program (e.g., `roslaunch`).

Communication
- Inter-process communication (IPC)
- Stream
- Publish-subscribe
- Remote procedure call
- Queuing
- Fast sender problem / slow receiver problem
- Latency
- Throughput
- Bandwidth
- Packet loss
- Client
- Server
- Broker
- Transmission Control Protocol (TCP)
- User Datagram Protocol (UDP)
- Reliable delivery
- In-order delivery
- Keepalive
- Typed serialization
- Untyped serialization
- Schema

System development
- Configuration file
- Parameter server
- Dynamic parameter server
- Logging
- Log level / severity level
- Physics simulation
- Hardware-in-the-loop simulation
- Anomaly injection


## Exercises

1. When using publish-subscribe in ROS, are you implementing a polling, event-based, or mixed execution loop?  Is this using blocking or non-blocking calls?  Which of the [communication characteristics and error handling](#communication-characteristics-and-error-handling) items do you need to worry about?  Repeat the same exercise but for the ROS services communication API.  Research the ROS documentation and online resources to help you determine these answers.
2. In Python, implement serialization and framing for a TCP/IP stream protocol in which JSON objects are sent from the client to the server, and delimited by endlines (`'\n'` character).  Bonus: make sure your serialization and framing work with strings that contain the endline character!
3. ROS messages are not the most convenient objects to work with in native languages and other software libraries, so it is common for ROS to provide conversion utilities.  For example, [cv_bridge](http://wiki.ros.org/cv_bridge) is provided to convert ROS images to OpenCV images.  Your team always uses OpenCV, so you are considering how to structure your infrastructure.  You are considering three options:
   1. Your team continues to use `cv_bridge` to send and receive images in ROS format.
   2. You write an interface class around all of your system's communication that accepts native OpenCV images as input and produces them as output.
   3. You write new wrappers `ImageSubscriber` / `ImagePublisher` for ROS subscriber / publisher classes that accept native OpenCV images in `publish` calls and output native OpenCV images in `subscribe` callbacks.  
   
   What are your opinions about each of these three options?  Which is the easiest for you?  Which is the easiest for the team?  Are there any performance considerations between these options?
4. 