Skip to content

Commit

Permalink
Add intra-process tracepoints (#30)
Browse files Browse the repository at this point in the history
Signed-off-by: Kodai Yamasaki <114902604+ymski@users.noreply.github.com>
  • Loading branch information
ymski committed Apr 13, 2023
1 parent 020c8b6 commit 3a1a0ae
Show file tree
Hide file tree
Showing 9 changed files with 651 additions and 4 deletions.
116 changes: 112 additions & 4 deletions doc/design_ros_2.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ Design document for the general ROS 2 instrumentation, tracing, and analysis eff
1. [Node/component creation](#nodecomponent-creation)
1. [Publisher creation](#publisher-creation)
1. [Subscription creation](#subscription-creation)
1. [IntraProcessBuffer creation](#intraprocessbuffer-creation)
1. [Executors](#executors)
1. [Subscription callbacks](#subscription-callbacks)
1. [Intra-process callback](#intra-process-callback)
1. [Message publishing](#message-publishing)
1. [Intra-process message publishing](#intra-process-message-publishing)
1. [Service creation](#service-creation)
1. [Service callbacks](#service-callbacks)
1. [Client creation](#client-creation)
Expand Down Expand Up @@ -116,13 +119,20 @@ The following table summarizes the instrumentation and links to the correspondin
| `rclcpp` | `rclcpp_subscription_init` | [*Subscription creation*](#subscription-creation) |
| | `rclcpp_subscription_callback_added` | [*Subscription creation*](#subscription-creation) |
| | `rclcpp_publish` | [*Message publishing*](#message-publishing) |
| | `rclcpp_intra_publish` | [*Intra-process message publishing*](#intra-process-message-publishing) |
| | `rclcpp_take` | [*Subscription callbacks*](#subscription-callbacks) |
| | `rclcpp_service_callback_added` | [*Service creation*](#service-creation) |
| | `rclcpp_timer_callback_added` | [*Timer creation*](#timer-creation) |
| | `rclcpp_timer_link_node` | [*Timer creation*](#timer-creation) |
| | `rclcpp_callback_register` | [*Subscription creation*](#subscription-creation), [*Service creation*](#service-creation), [*Timer creation*](#timer-creation) |
| | `callback_start` | [*Subscription callbacks*](#subscription-callbacks), [*Service callbacks*](#service-callbacks), [*Client request/response*](#client-requestresponse), [*Timer callbacks*](#timer-callbacks) |
| | `callback_end` | [*Subscription callbacks*](#subscription-callbacks), [*Service callbacks*](#service-callbacks), [*Client request/response*](#client-requestresponse), [*Timer callbacks*](#timer-callbacks) |
| | `rclcpp_construct_ring_buffer` | [*IntraProcessBuffer creation*](#intraprocessbuffer-creation) |
| | `rclcpp_buffer_to_ipb` | [*IntraProcessBuffer creation*](#intraprocessbuffer-creation) |
| | `rclcpp_ipb_to_subscription` | [*IntraProcessBuffer creation*](#intraprocessbuffer-creation) |
| | `rclcpp_ring_buffer_enqueue` | [*Intra-process message publishing*](#intra-process-message-publishing) |
| | `rclcpp_ring_buffer_dequeue` | [*Intra-process callback*](#intra-process-callback) |
| | `rclcpp_ring_buffer_clear` | [*IntraProcessBuffer creation*](#intraprocessbuffer-creation) |
| | `rclcpp_executor_get_next_ready` | [*Executors*](#executors) |
| | `rclcpp_executor_wait_for_work` | [*Executors*](#executors) |
| | `rclcpp_executor_execute` | [*Executors*](#executors), [*Timer callbacks*](#timer-callbacks), [*Subscription callbacks*](#subscription-callbacks) |
Expand Down Expand Up @@ -293,6 +303,7 @@ It creates an `rmw_subscription_t` handle by calling `rmw_create_subscription()`
`rclcpp::Subscription` creates an `rclcpp::AnySubscriptionCallback` object and associates it with itself.

If intra-process is enabled, `rclcpp::Subscription` also creates a `rclcpp::SubscriptionIntraProcess` object, which has its own `rclcpp::AnySubscriptionCallback` object.
See the [IntraProcessBuffer creation section](#intraprocessbuffer-creation).

**Important information**:
* Link between `rcl_subscription_t` and `rmw_subscription_t` handles and `rclcpp::Subscription` object
Expand Down Expand Up @@ -337,6 +348,42 @@ sequenceDiagram
rclcpp-->>tracetools: TP(rclcpp_callback_register, rclcpp::AnySubscriptionCallback *, symbol)
```

#### IntraProcessBuffer creation

The initialization process for the buffer used in intra-process communication is performed as part of the [initialization process for the subscription](#subscription-creation).
First, the `rclcpp::SubscriptionIntraProcess` that is responsible for managing intra-process communication is created.
It then creates an instance of `rclcpp::experimental::SubscriptionIntraProcessBuffer` using the `rclcpp::experimental::create_intra_process_buffer()` function.
This function creates a `rclcpp::experimental::buffers::TypedIntraProcessBuffer` object with a `rclcpp::experimental::buffers::RingBufferImplementation`.
The initialization of intra-process communication is completed by registering the settings related to the intra-process communication generated above using `rclcpp::SubscriptionBase::setup_intra_process()`.

```mermaid
sequenceDiagram
participant Subscription
participant SubscriptionIntraProcess
participant SubscriptionIntraProcessBuffer
participant RingBufferImplementation
participant TypedIntraProcessBuffer
participant IntraProcessManager
participant tracetools
Note over Subscription: construct subscription
Subscription ->> SubscriptionIntraProcess: construct
SubscriptionIntraProcess ->> SubscriptionIntraProcessBuffer: construct
Note over SubscriptionIntraProcessBuffer: rclcpp::exmerimental::create_intra_process_buffer()
SubscriptionIntraProcessBuffer ->> RingBufferImplementation: construct
RingBufferImplementation -->> tracetools: TP(rclcpp_construct_ring_buffer, buffer *, capacity)
SubscriptionIntraProcessBuffer ->> TypedIntraProcessBuffer: construct
TypedIntraProcessBuffer -->> tracetools: TP(rclcpp_buffer_to_ipb, buffer *, ipb *)
SubscriptionIntraProcessBuffer -->> tracetools: TP(rclcpp_ipb_to_subscription, ipb *, subscription *)
SubscriptionIntraProcess --> tracetools: TP(rclcpp_subscription_callback_added, rclcpp::SubscriptionIntraProcess *, rclcpp::Waitable *)
Subscription ->> IntraProcessManager: construct
Subscription ->> IntraProcessManager: add_subscription(subscription_intra_process)
Note over Subscription: setup_intra_process(intra_process_subscription_id, ipm)
Subscription -->> tracetools: TP(rclcpp_subscription_init, rcl_subscription_t *, rclcpp::SubscriptionIntraProcess *)
Subscription --> tracetools: TP(rclcpp_subscription_init, rcl_subscription_t *, rclcpp::Subscription *)
Subscription --> tracetools: TP(rclcpp_subscription_callback_added, rclcpp::Subscription *, rclcpp::AnySubscriptionCallback *)
```

#### Executors

An `rclcpp::Executor` object is created for a given process.
Expand All @@ -347,7 +394,7 @@ Nodes are instanciated, usually as a `shared_ptr` through `std::make_shared<Node
After all the nodes have been added, `rclcpp::Executor::spin()` is called (there are other spinning varations, but this is the main one).
`rclcpp::executors::SingleThreadedExecutor::spin()` simply loops forever until the process' context isn't valid anymore.
It fetches the next `rclcpp::AnyExecutable` (e.g., subscription, timer, service, client), possibly waiting a bit, and calls `rclcpp::Executor::execute_any_executable()` with it.
This then calls the relevant `execute*()` method (e.g., [`execute_timer()`](#timer-callbacks), [`execute_subscription()`](#subscription-callbacks), `execute_service()`, `execute_client()`).
This then calls the relevant `execute*()` method (e.g., [`execute_timer()`](#timer-callbacks), [`execute_subscription()`](#subscription-callbacks), `execute_service()`, `execute_client()`, `Waitable::execute()`).

**Important information**:
* Timestamps of executor phases
Expand Down Expand Up @@ -426,15 +473,47 @@ sequenceDiagram
Executor->>Subscription: return_message(msg)
```

#### Intra-process callback

Intra-process subscriptions are handled in the `rclcpp` layer.
Callback functions are wrapped by an `rclcpp::Waitable` object (i.e., the `rclcpp::SubscriptionIntraProcess` object), which is registered when creating the `rclcpp::Subscription` object.

In `rclcpp::Executor::get_next_ready_executable_from_map()`, the `rclcpp::Executor` checks for new intra-process messages.
If there is a new message, it calls `rclcpp::SubscriptionIntraProcess::take_data()`, which calls `rclcpp::IntraProcessBuffer::consume_*()` (e.g., `consume_share()`, `consume_unique()`), which in turn calls `rclcpp::BufferImplementationBase::dequeue()` to get the message from the intra-process buffer.
Then, in `rclcpp::Executor::get_next_ready_executable(any_exec)`, the executor calls `rclcpp::Waitable::execute()` with the message, which is actually `rclcpp::SubscriptionIntraProcess::execute()`.
This checks message is `shared_ptr` or `unique_ptr`, and then it calls `rclcpp::SubscriptionIntraProcess::dispatch_intra_process()`, which then calls the callback `std::function`.
Finally, the callback group (`any_exec.callback_group`) is reset.

```mermaid
sequenceDiagram
participant Executor
participant SubscriptionIntraProcess
participant IntraProcessBuffer
participant RingBufferImplementation
participant AnySubscriptionCallback
participant tracetools
Note over Executor: get_next_ready_executable(any_exec)
Executor ->> SubscriptionIntraProcess: take_data()
SubscriptionIntraProcess ->> IntraProcessBuffer: consume_*()
IntraProcessBuffer ->> RingBufferImplementation: dequeue()
RingBufferImplementation -->> tracetools: TP(rclcpp_ring_buffer_dequeue, buffer *, index)
Note over Executor: execute_any_executable(any_exec)
Executor ->> SubscriptionIntraProcess: execute(any_exec.data)
SubscriptionIntraProcess ->> AnySubscriptionCallback: dispatch_intra_process()
AnySubscriptionCallback -->> tracetools: TP(callback_start, callback, is_intra_process)
Note over AnySubscriptionCallback: std::function(...)
AnySubscriptionCallback -->> tracetools: TP(callback_end, callback)
Note over Executor: reset any_exec.callback_group
```

#### Message publishing

To publish a message, a message object is first allocated (or loaned) and then populated at the user level (e.g., in a node).
The message is then published through one of the `rclcpp::Publisher::publish()` methods.
For normal inter-process publishing, this then passes that on to `rcl`, which itself passes it to `rmw`, which passes it on to the underlying middleware.

TODO add inter- vs. intra-process execution flow
TODO talk about IntraProcessManager stuff?

**Important information**:
* Link to publisher handle(s)
* Message being published, with timestamp
Expand Down Expand Up @@ -462,6 +541,35 @@ sequenceDiagram
Note over node: keeps, returns, or destroys msg
```

#### Intra-process message publishing

To publishing a message in intra-process, a message object is first allocated (or loaned) and then populated at the user level (e.g., in a node).
The message is then published through one of the `rclcpp::Publisher::publish()` methods.

For normal intra-process publishing, this then passes that on to `rclcpp::IntraProcessManager`, which itself passes it to `rclcpp::TypedIntraProcessBuffer`, which passes it on to the `rclcpp::RingBufferImplementation`.
In `rclcpp::RingBufferImplementation`, published data is stored by its `enqueue()` method.

```mermaid
sequenceDiagram
participant node
participant Publisher
participant IntraProcessManager
participant SubscriptionIntraProcess
participant TypedIntraProcessBuffer
participant RingBufferImplementation
participant tracetools
Note over node: spin
Note over node: creates a msg
node ->> Publisher: publish(message)
Publisher -->> tracetools : TP(rclcpp_intra_publish, publisher_handle, message *)
Publisher ->> IntraProcessManager: do_intra_process_publish()/do_intra_process_publish_and_return_shared()
IntraProcessManager ->> SubscriptionIntraProcess: provide_intra_process_data()
SubscriptionIntraProcess ->> TypedIntraProcessBuffer: add_shared(message *)/add_unique(message *)
TypedIntraProcessBuffer ->> RingBufferImplementation: enqueue()
RingBufferImplementation -->> tracetools : TP(rclcpp_ring_buffer_enqueue, buffer *, index, size, overwritten)
```

#### Service creation

Service server creation is similar to subscription creation.
Expand Down
2 changes: 2 additions & 0 deletions test_tracetools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ if(BUILD_TESTING)
if(NOT TRACETOOLS_TRACEPOINTS_EXCLUDED)
find_package(ament_cmake_pytest REQUIRED)
set(_test_tracetools_pytest_tests
test/test_buffer.py
test/test_executor.py
test/test_intra.py
test/test_intra_pub_sub.py
test/test_lifecycle_node.py
test/test_node.py
test/test_pub_sub.py
Expand Down
95 changes: 95 additions & 0 deletions test_tracetools/test/test_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright 2023 Research Institute of Systems Planning, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from tracetools_test.case import TraceTestCase
from tracetools_trace.tools import tracepoints as tp
from tracetools_trace.tools.lttng import is_lttng_installed


@unittest.skipIf(not is_lttng_installed(minimum_version='2.9.0'), 'LTTng is required')
class TestBuffer(TraceTestCase):

def __init__(self, *args) -> None:
super().__init__(
*args,
session_name_prefix='session-test-buffer-creation',
events_ros=[
tp.rclcpp_construct_ring_buffer,
tp.rclcpp_ipb_to_subscription,
tp.rclcpp_buffer_to_ipb,
],
package='test_tracetools',
nodes=['test_intra'],
)

def test_all(self):
# Check events as set
self.assertEventsSet(self._events_ros)

# Check fields
construct_buffer_events = self.get_events_with_name(tp.rclcpp_construct_ring_buffer)
for event in construct_buffer_events:
self.assertValidPointer(event, 'buffer')
self.assertFieldType(event, 'capacity', int)

ipb_to_subscription_events = self.get_events_with_name(tp.rclcpp_ipb_to_subscription)
for event in ipb_to_subscription_events:
self.assertValidPointer(event, ['ipb', 'subscription'])

buffer_to_ipb_events = self.get_events_with_name(tp.rclcpp_buffer_to_ipb)
for event in buffer_to_ipb_events:
self.assertValidPointer(event, ['buffer', 'ipb'])

# Check corresponding events for construct_buffer_event
for construct_event in construct_buffer_events:
target_buffer = self.get_field(construct_event, 'buffer')
target_buffer_to_ipb_event = self.get_events_with_field_value(
'buffer',
target_buffer,
buffer_to_ipb_events)
# Only 1 for our given buffer
self.assertNumEventsEqual(
target_buffer_to_ipb_event,
1,
'none or more than 1 buffer_to_ipb event for the buffer',
)

target_ipb = self.get_field(target_buffer_to_ipb_event[0], 'ipb')
target_ipb_to_subscription_event = self.get_events_with_field_value(
'ipb',
target_ipb,
ipb_to_subscription_events)

# Only 1 for our given ipb
self.assertNumEventsEqual(
target_ipb_to_subscription_event,
1,
'none or more than 1 ipb_to_subscription event for the ipb',
)

# Check subscription init order
# * rclcpp_construct_ring_buffer
# * rclcpp_buffer_to_ipb
# * rclcpp_ipb_to_subscription
self.assertEventOrder([
construct_event,
target_buffer_to_ipb_event[0],
target_ipb_to_subscription_event[0]
])


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit 3a1a0ae

Please sign in to comment.