Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add intra process tracepoint #30

Merged
merged 1 commit into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 112 additions & 4 deletions doc/design_ros_2.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ Design document for the general ROS 2 instrumentation, tracing, and analysis eff
1. [Node/component creation](#nodecomponent-creation)
1. [Publisher creation](#publisher-creation)
1. [Subscription creation](#subscription-creation)
1. [IntraProcessBuffer creation](#intraprocessbuffer-creation)
1. [Executors](#executors)
1. [Subscription callbacks](#subscription-callbacks)
1. [Intra-process callback](#intra-process-callback)
1. [Message publishing](#message-publishing)
1. [Intra-process message publishing](#intra-process-message-publishing)
1. [Service creation](#service-creation)
1. [Service callbacks](#service-callbacks)
1. [Client creation](#client-creation)
Expand Down Expand Up @@ -116,13 +119,20 @@ The following table summarizes the instrumentation and links to the correspondin
| `rclcpp` | `rclcpp_subscription_init` | [*Subscription creation*](#subscription-creation) |
| | `rclcpp_subscription_callback_added` | [*Subscription creation*](#subscription-creation) |
| | `rclcpp_publish` | [*Message publishing*](#message-publishing) |
| | `rclcpp_intra_publish` | [*Intra-process message publishing*](#intra-process-message-publishing) |
| | `rclcpp_take` | [*Subscription callbacks*](#subscription-callbacks) |
| | `rclcpp_service_callback_added` | [*Service creation*](#service-creation) |
| | `rclcpp_timer_callback_added` | [*Timer creation*](#timer-creation) |
| | `rclcpp_timer_link_node` | [*Timer creation*](#timer-creation) |
| | `rclcpp_callback_register` | [*Subscription creation*](#subscription-creation), [*Service creation*](#service-creation), [*Timer creation*](#timer-creation) |
| | `callback_start` | [*Subscription callbacks*](#subscription-callbacks), [*Service callbacks*](#service-callbacks), [*Client request/response*](#client-requestresponse), [*Timer callbacks*](#timer-callbacks) |
| | `callback_end` | [*Subscription callbacks*](#subscription-callbacks), [*Service callbacks*](#service-callbacks), [*Client request/response*](#client-requestresponse), [*Timer callbacks*](#timer-callbacks) |
| | `rclcpp_construct_ring_buffer` | [*IntraProcessBuffer creation*](#intraprocessbuffer-creation) |
| | `rclcpp_buffer_to_ipb` | [*IntraProcessBuffer creation*](#intraprocessbuffer-creation) |
| | `rclcpp_ipb_to_subscription` | [*IntraProcessBuffer creation*](#intraprocessbuffer-creation) |
| | `rclcpp_ring_buffer_enqueue` | [*Intra-process message publishing*](#intra-process-message-publishing) |
| | `rclcpp_ring_buffer_dequeue` | [*Intra-process callback*](#intra-process-callback) |
| | `rclcpp_ring_buffer_clear` | [*IntraProcessBuffer creation*](#intraprocessbuffer-creation) |
| | `rclcpp_executor_get_next_ready` | [*Executors*](#executors) |
| | `rclcpp_executor_wait_for_work` | [*Executors*](#executors) |
| | `rclcpp_executor_execute` | [*Executors*](#executors), [*Timer callbacks*](#timer-callbacks), [*Subscription callbacks*](#subscription-callbacks) |
Expand Down Expand Up @@ -293,6 +303,7 @@ It creates an `rmw_subscription_t` handle by calling `rmw_create_subscription()`
`rclcpp::Subscription` creates an `rclcpp::AnySubscriptionCallback` object and associates it with itself.

If intra-process is enabled, `rclcpp::Subscription` also creates a `rclcpp::SubscriptionIntraProcess` object, which has its own `rclcpp::AnySubscriptionCallback` object.
See the [IntraProcessBuffer creation section](#intraprocessbuffer-creation).

**Important information**:
* Link between `rcl_subscription_t` and `rmw_subscription_t` handles and `rclcpp::Subscription` object
Expand Down Expand Up @@ -337,6 +348,42 @@ sequenceDiagram
rclcpp-->>tracetools: TP(rclcpp_callback_register, rclcpp::AnySubscriptionCallback *, symbol)
```

#### IntraProcessBuffer creation

The initialization process for the buffer used in intra-process communication is performed as part of the [initialization process for the subscription](#subscription-creation).
First, the `rclcpp::SubscriptionIntraProcess` that is responsible for managing intra-process communication is created.
It then creates an instance of `rclcpp::experimental::SubscriptionIntraProcessBuffer` using the `rclcpp::experimental::create_intra_process_buffer()` function.
This function creates a `rclcpp::experimental::buffers::TypedIntraProcessBuffer` object with a `rclcpp::experimental::buffers::RingBufferImplementation`.
The initialization of intra-process communication is completed by registering the settings related to the intra-process communication generated above using `rclcpp::SubscriptionBase::setup_intra_process()`.

```mermaid
sequenceDiagram
participant Subscription
participant SubscriptionIntraProcess
participant SubscriptionIntraProcessBuffer
participant RingBufferImplementation
participant TypedIntraProcessBuffer
participant IntraProcessManager
participant tracetools

Note over Subscription: construct subscription
Subscription ->> SubscriptionIntraProcess: construct
SubscriptionIntraProcess ->> SubscriptionIntraProcessBuffer: construct
Note over SubscriptionIntraProcessBuffer: rclcpp::exmerimental::create_intra_process_buffer()
SubscriptionIntraProcessBuffer ->> RingBufferImplementation: construct
RingBufferImplementation -->> tracetools: TP(rclcpp_construct_ring_buffer, buffer *, capacity)
SubscriptionIntraProcessBuffer ->> TypedIntraProcessBuffer: construct
TypedIntraProcessBuffer -->> tracetools: TP(rclcpp_buffer_to_ipb, buffer *, ipb *)
SubscriptionIntraProcessBuffer -->> tracetools: TP(rclcpp_ipb_to_subscription, ipb *, subscription *)
SubscriptionIntraProcess --> tracetools: TP(rclcpp_subscription_callback_added, rclcpp::SubscriptionIntraProcess *, rclcpp::Waitable *)
Subscription ->> IntraProcessManager: construct
Subscription ->> IntraProcessManager: add_subscription(subscription_intra_process)
Note over Subscription: setup_intra_process(intra_process_subscription_id, ipm)
Subscription -->> tracetools: TP(rclcpp_subscription_init, rcl_subscription_t *, rclcpp::SubscriptionIntraProcess *)
Subscription --> tracetools: TP(rclcpp_subscription_init, rcl_subscription_t *, rclcpp::Subscription *)
Subscription --> tracetools: TP(rclcpp_subscription_callback_added, rclcpp::Subscription *, rclcpp::AnySubscriptionCallback *)
```

#### Executors

An `rclcpp::Executor` object is created for a given process.
Expand All @@ -347,7 +394,7 @@ Nodes are instanciated, usually as a `shared_ptr` through `std::make_shared<Node
After all the nodes have been added, `rclcpp::Executor::spin()` is called (there are other spinning varations, but this is the main one).
`rclcpp::executors::SingleThreadedExecutor::spin()` simply loops forever until the process' context isn't valid anymore.
It fetches the next `rclcpp::AnyExecutable` (e.g., subscription, timer, service, client), possibly waiting a bit, and calls `rclcpp::Executor::execute_any_executable()` with it.
This then calls the relevant `execute*()` method (e.g., [`execute_timer()`](#timer-callbacks), [`execute_subscription()`](#subscription-callbacks), `execute_service()`, `execute_client()`).
This then calls the relevant `execute*()` method (e.g., [`execute_timer()`](#timer-callbacks), [`execute_subscription()`](#subscription-callbacks), `execute_service()`, `execute_client()`, `Waitable::execute()`).

**Important information**:
* Timestamps of executor phases
Expand Down Expand Up @@ -426,15 +473,47 @@ sequenceDiagram
Executor->>Subscription: return_message(msg)
```

#### Intra-process callback

Intra-process subscriptions are handled in the `rclcpp` layer.
Callback functions are wrapped by an `rclcpp::Waitable` object (i.e., the `rclcpp::SubscriptionIntraProcess` object), which is registered when creating the `rclcpp::Subscription` object.

In `rclcpp::Executor::get_next_ready_executable_from_map()`, the `rclcpp::Executor` checks for new intra-process messages.
If there is a new message, it calls `rclcpp::SubscriptionIntraProcess::take_data()`, which calls `rclcpp::IntraProcessBuffer::consume_*()` (e.g., `consume_share()`, `consume_unique()`), which in turn calls `rclcpp::BufferImplementationBase::dequeue()` to get the message from the intra-process buffer.
Then, in `rclcpp::Executor::get_next_ready_executable(any_exec)`, the executor calls `rclcpp::Waitable::execute()` with the message, which is actually `rclcpp::SubscriptionIntraProcess::execute()`.
This checks message is `shared_ptr` or `unique_ptr`, and then it calls `rclcpp::SubscriptionIntraProcess::dispatch_intra_process()`, which then calls the callback `std::function`.
Finally, the callback group (`any_exec.callback_group`) is reset.

```mermaid
sequenceDiagram

participant Executor
participant SubscriptionIntraProcess
participant IntraProcessBuffer
participant RingBufferImplementation
participant AnySubscriptionCallback
participant tracetools

Note over Executor: get_next_ready_executable(any_exec)
Executor ->> SubscriptionIntraProcess: take_data()
SubscriptionIntraProcess ->> IntraProcessBuffer: consume_*()
IntraProcessBuffer ->> RingBufferImplementation: dequeue()
RingBufferImplementation -->> tracetools: TP(rclcpp_ring_buffer_dequeue, buffer *, index)
Note over Executor: execute_any_executable(any_exec)
Executor ->> SubscriptionIntraProcess: execute(any_exec.data)
SubscriptionIntraProcess ->> AnySubscriptionCallback: dispatch_intra_process()
AnySubscriptionCallback -->> tracetools: TP(callback_start, callback, is_intra_process)
Note over AnySubscriptionCallback: std::function(...)
AnySubscriptionCallback -->> tracetools: TP(callback_end, callback)
Note over Executor: reset any_exec.callback_group
```

#### Message publishing

To publish a message, a message object is first allocated (or loaned) and then populated at the user level (e.g., in a node).
The message is then published through one of the `rclcpp::Publisher::publish()` methods.
For normal inter-process publishing, this then passes that on to `rcl`, which itself passes it to `rmw`, which passes it on to the underlying middleware.

TODO add inter- vs. intra-process execution flow
TODO talk about IntraProcessManager stuff?

**Important information**:
* Link to publisher handle(s)
* Message being published, with timestamp
Expand Down Expand Up @@ -462,6 +541,35 @@ sequenceDiagram
Note over node: keeps, returns, or destroys msg
```

#### Intra-process message publishing

To publishing a message in intra-process, a message object is first allocated (or loaned) and then populated at the user level (e.g., in a node).
The message is then published through one of the `rclcpp::Publisher::publish()` methods.

For normal intra-process publishing, this then passes that on to `rclcpp::IntraProcessManager`, which itself passes it to `rclcpp::TypedIntraProcessBuffer`, which passes it on to the `rclcpp::RingBufferImplementation`.
In `rclcpp::RingBufferImplementation`, published data is stored by its `enqueue()` method.

```mermaid
sequenceDiagram
participant node
participant Publisher
participant IntraProcessManager
participant SubscriptionIntraProcess
participant TypedIntraProcessBuffer
participant RingBufferImplementation
participant tracetools

Note over node: spin
Note over node: creates a msg
node ->> Publisher: publish(message)
Publisher -->> tracetools : TP(rclcpp_intra_publish, publisher_handle, message *)
Publisher ->> IntraProcessManager: do_intra_process_publish()/do_intra_process_publish_and_return_shared()
IntraProcessManager ->> SubscriptionIntraProcess: provide_intra_process_data()
SubscriptionIntraProcess ->> TypedIntraProcessBuffer: add_shared(message *)/add_unique(message *)
TypedIntraProcessBuffer ->> RingBufferImplementation: enqueue()
RingBufferImplementation -->> tracetools : TP(rclcpp_ring_buffer_enqueue, buffer *, index, size, overwritten)
```

#### Service creation

Service server creation is similar to subscription creation.
Expand Down
2 changes: 2 additions & 0 deletions test_tracetools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ if(BUILD_TESTING)
if(NOT TRACETOOLS_TRACEPOINTS_EXCLUDED)
find_package(ament_cmake_pytest REQUIRED)
set(_test_tracetools_pytest_tests
test/test_buffer.py
test/test_executor.py
test/test_intra.py
test/test_intra_pub_sub.py
test/test_lifecycle_node.py
test/test_node.py
test/test_pub_sub.py
Expand Down
95 changes: 95 additions & 0 deletions test_tracetools/test/test_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright 2023 Research Institute of Systems Planning, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from tracetools_test.case import TraceTestCase
from tracetools_trace.tools import tracepoints as tp
from tracetools_trace.tools.lttng import is_lttng_installed


@unittest.skipIf(not is_lttng_installed(minimum_version='2.9.0'), 'LTTng is required')
class TestBuffer(TraceTestCase):

def __init__(self, *args) -> None:
super().__init__(
*args,
session_name_prefix='session-test-buffer-creation',
events_ros=[
tp.rclcpp_construct_ring_buffer,
tp.rclcpp_ipb_to_subscription,
tp.rclcpp_buffer_to_ipb,
],
package='test_tracetools',
nodes=['test_intra'],
)

def test_all(self):
# Check events as set
self.assertEventsSet(self._events_ros)

# Check fields
construct_buffer_events = self.get_events_with_name(tp.rclcpp_construct_ring_buffer)
for event in construct_buffer_events:
self.assertValidPointer(event, 'buffer')
self.assertFieldType(event, 'capacity', int)

ipb_to_subscription_events = self.get_events_with_name(tp.rclcpp_ipb_to_subscription)
for event in ipb_to_subscription_events:
self.assertValidPointer(event, ['ipb', 'subscription'])

buffer_to_ipb_events = self.get_events_with_name(tp.rclcpp_buffer_to_ipb)
for event in buffer_to_ipb_events:
self.assertValidPointer(event, ['buffer', 'ipb'])

# Check corresponding events for construct_buffer_event
for construct_event in construct_buffer_events:
target_buffer = self.get_field(construct_event, 'buffer')
target_buffer_to_ipb_event = self.get_events_with_field_value(
'buffer',
target_buffer,
buffer_to_ipb_events)
# Only 1 for our given buffer
self.assertNumEventsEqual(
target_buffer_to_ipb_event,
1,
'none or more than 1 buffer_to_ipb event for the buffer',
)

target_ipb = self.get_field(target_buffer_to_ipb_event[0], 'ipb')
target_ipb_to_subscription_event = self.get_events_with_field_value(
'ipb',
target_ipb,
ipb_to_subscription_events)

# Only 1 for our given ipb
self.assertNumEventsEqual(
target_ipb_to_subscription_event,
1,
'none or more than 1 ipb_to_subscription event for the ipb',
)

# Check subscription init order
# * rclcpp_construct_ring_buffer
# * rclcpp_buffer_to_ipb
# * rclcpp_ipb_to_subscription
self.assertEventOrder([
construct_event,
target_buffer_to_ipb_event[0],
target_ipb_to_subscription_event[0]
])


if __name__ == '__main__':
unittest.main()
Loading