Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rmw_fastrtps] Improve handling of dynamic discovery #653

Merged
merged 46 commits into from
Apr 8, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
765933f
Support specification of discovery range and static peers
gbiggs Aug 7, 2022
acc9541
Apply suggestions from eProsima
gbiggs Oct 4, 2022
b2a6a24
Use participant ignoring
gbiggs Nov 10, 2022
24bdb5a
Improve handling of aliases for hosts
gbiggs Nov 26, 2022
7c38fef
Adds support for using IP addresses to specify peers
arjo129 Jan 31, 2023
05daea6
Remove excessive logging
arjo129 Jan 31, 2023
05d3836
Add name lookup and clean up implementation.
arjo129 Feb 15, 2023
5bb9754
Two more scenarios fixed.
arjo129 Feb 23, 2023
4ab38d3
Revert rmw changes
arjo129 Feb 27, 2023
fdf6e83
Add support for dynamic allocations
arjo129 Mar 2, 2023
30c7ee1
Support new requirements
arjo129 Mar 2, 2023
8f1ac6b
Update to latest rmw API
mxgrey Mar 9, 2023
d320029
Merge branch 'rolling' into gbiggs/discovery-peers-specification
sloretz Mar 24, 2023
033af73
Update to use API on Fast-DDS master
sloretz Mar 27, 2023
6ff095d
Merge branch 'rolling' into gbiggs/discovery-peers-specification
sloretz Mar 27, 2023
0fbce8e
Update with rmw_discovery_options_t changes
sloretz Mar 28, 2023
65de1fb
Minimize diff with rolling in custom_participant_info.hpp
sloretz Mar 28, 2023
f055b23
Minimize diff with rolling in participant.cpp
sloretz Mar 28, 2023
a2577c5
Collapse lines
sloretz Mar 28, 2023
dfd7dd2
conditional on one line
sloretz Mar 28, 2023
fe77f29
return instead of setting variable
sloretz Mar 28, 2023
5b29937
Make log messages more informative
sloretz Mar 28, 2023
2db3042
Works without ignore participant change!
sloretz Mar 29, 2023
1c1cbdf
Remove unused code
sloretz Mar 29, 2023
9740a00
Remove more unused code
sloretz Mar 29, 2023
cbaf1fb
Minimize diff with rolling
sloretz Mar 29, 2023
ce60f9f
NOT_SET and SYSTEM_DEFAULT values
sloretz Mar 29, 2023
02208da
OFF implementation that doesn't crash
sloretz Mar 30, 2023
e1d6787
Set discovery range in test
sloretz Mar 30, 2023
930a833
Lint
sloretz Mar 30, 2023
2bf0af2
Call rmw_discovery_options_init()
sloretz Mar 30, 2023
426253c
Workaround deadlock with rclcpp global logging mutex
sloretz Mar 30, 2023
7e80152
Add shared memory transport for LOCALHOST traffic
sloretz Mar 31, 2023
969c2b4
Configure max initial peers range on udp transport
sloretz Mar 31, 2023
d1e94da
Disable built-in transports and fix lint
sloretz Mar 31, 2023
1f56fb7
Error when range is an invalid value
sloretz Mar 31, 2023
60ce225
undo unnecessary test change
sloretz Apr 3, 2023
a5dae76
Document Setting range to SYSTEM_DEFAULT
sloretz Apr 4, 2023
581ab86
Limit participants to 1 when discover is OFF
sloretz Apr 4, 2023
cf9bf83
With SUBNET and initial peers, add default multicast address as a mul…
sloretz Apr 4, 2023
9cec2d6
Add multicast address to initial peer list when there are other stati…
sloretz Apr 4, 2023
47facb9
Bump required Fast-DDS version to 2.10
sloretz Apr 4, 2023
b68b332
Set maxInitialPeersRange to 32
sloretz Apr 4, 2023
5faeb3e
Grammar
sloretz Apr 4, 2023
df6139e
Shorten sentence
sloretz Apr 4, 2023
9824979
Merge branch 'rolling' into gbiggs/discovery-peers-specification
sloretz Apr 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ init_context_impl(
eprosima_fastrtps_identifier,
context->actual_domain_id,
&context->options.security_options,
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
&context->options.discovery_options,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info)
Expand Down
8 changes: 6 additions & 2 deletions rmw_fastrtps_cpp/test/test_get_native_entities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class TestNativeEntities : public ::testing::Test
});
options.enclave = rcutils_strdup("/", rcutils_get_default_allocator());
ASSERT_STREQ("/", options.enclave);
options.discovery_options.automatic_discovery_range = RMW_AUTOMATIC_DISCOVERY_RANGE_OFF;
ret = rmw_init(&options, &context);
ASSERT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;
constexpr char node_name[] = "my_node";
Expand All @@ -56,8 +57,11 @@ class TestNativeEntities : public ::testing::Test

void TearDown() override
{
rmw_ret_t ret = rmw_destroy_node(node);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;
rmw_ret_t ret;
if (nullptr != node) {
ret = rmw_destroy_node(node);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;
}
sloretz marked this conversation as resolved.
Show resolved Hide resolved
ret = rmw_shutdown(&context);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;
ret = rmw_context_fini(&context);
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ init_context_impl(
eprosima_fastrtps_identifier,
context->actual_domain_id,
&context->options.security_options,
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
&context->options.discovery_options,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class TestNativeEntities : public ::testing::Test
});
options.enclave = rcutils_strdup("/", rcutils_get_default_allocator());
ASSERT_STREQ("/", options.enclave);
options.discovery_options.automatic_discovery_range = RMW_AUTOMATIC_DISCOVERY_RANGE_OFF;
ret = rmw_init(&options, &context);
ASSERT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;
constexpr char node_name[] = "my_node";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ class ParticipantListener : public eprosima::fastdds::dds::DomainParticipantList

void on_participant_discovery(
eprosima::fastdds::dds::DomainParticipant *,
eprosima::fastrtps::rtps::ParticipantDiscoveryInfo && info) override
eprosima::fastrtps::rtps::ParticipantDiscoveryInfo && info,
bool & should_be_ignored) override
{
should_be_ignored = false;
sloretz marked this conversation as resolved.
Show resolved Hide resolved
switch (info.status) {
case eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT:
{
Expand Down Expand Up @@ -211,13 +213,20 @@ class ParticipantListener : public eprosima::fastdds::dds::DomainParticipantList
if (RMW_RET_OK != rmw_dds_common::parse_type_hash_from_user_data(
userDataValue.data(), userDataValue.size(), type_hash))
{
// Avoid deadlock trying to acquire rclcpp's global logging mutex
// by using eProsima's logging mechanism.
// TODO(sloretz) revisit when this is fixed: https://github.com/ros2/rclcpp/issues/2147
EPROSIMA_LOG_WARNING(
"rmw_fastrtps_shared_cpp", "Failed to parse a type hash for a topic");
/*
RCUTILS_LOG_WARN_NAMED(
"rmw_fastrtps_shared_cpp",
"Failed to parse type hash for topic '%s' with type '%s' from USER_DATA '%*s'.",
proxyData.topicName().c_str(),
proxyData.typeName().c_str(),
static_cast<int>(userDataValue.size()),
reinterpret_cast<const char *>(userDataValue.data()));
*/
type_hash = rosidl_get_zero_initialized_type_hash();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ create_participant(
const char * identifier,
size_t domain_id,
const rmw_security_options_t * security_options,
bool localhost_only,
const rmw_discovery_options_t * discovery_options,
const char * enclave,
rmw_dds_common::Context * common_context);

Expand Down
80 changes: 67 additions & 13 deletions rmw_fastrtps_shared_cpp/src/participant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

Expand All @@ -28,9 +28,11 @@
#include "fastdds/dds/subscriber/Subscriber.hpp"
#include "fastdds/dds/subscriber/qos/SubscriberQos.hpp"
#include "fastdds/rtps/attributes/PropertyPolicy.h"
#include "fastdds/rtps/common/Locator.h"
#include "fastdds/rtps/common/Property.h"
#include "fastdds/rtps/transport/UDPv4TransportDescriptor.h"
#include "fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h"
#include "fastrtps/utils/IPLocator.h"

#include "rcpputils/scope_exit.hpp"
#include "rcutils/env.h"
Expand Down Expand Up @@ -145,7 +147,7 @@ rmw_fastrtps_shared_cpp::create_participant(
const char * identifier,
size_t domain_id,
const rmw_security_options_t * security_options,
bool localhost_only,
const rmw_discovery_options_t * discovery_options,
const char * enclave,
rmw_dds_common::Context * common_context)
{
Expand All @@ -161,19 +163,71 @@ rmw_fastrtps_shared_cpp::create_participant(
eprosima::fastdds::dds::DomainParticipantQos domainParticipantQos =
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->get_default_participant_qos();

// Configure discovery
switch (discovery_options->automatic_discovery_range) {
sloretz marked this conversation as resolved.
Show resolved Hide resolved
case RMW_AUTOMATIC_DISCOVERY_RANGE_NOT_SET:
RMW_SET_ERROR_MSG("automatic discovery range must be set");
return nullptr;
break;
case RMW_AUTOMATIC_DISCOVERY_RANGE_OFF:
domainParticipantQos.wire_protocol().builtin.discovery_config.discoveryProtocol =
eprosima::fastrtps::rtps::DiscoveryProtocol_t::NONE;
sloretz marked this conversation as resolved.
Show resolved Hide resolved
break;
case RMW_AUTOMATIC_DISCOVERY_RANGE_LOCALHOST: {
// Clear the list of multicast listening locators
domainParticipantQos.wire_protocol().builtin.metatrafficMulticastLocatorList.clear();
// Add a unicast locator to prevent creation of default multicast locator
eprosima::fastrtps::rtps::Locator_t default_unicast_locator;
domainParticipantQos.wire_protocol()
.builtin.metatrafficUnicastLocatorList.push_back(default_unicast_locator);
// Add a shared memory transport
sloretz marked this conversation as resolved.
Show resolved Hide resolved
auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
domainParticipantQos.transport().user_transports.push_back(shm_transport);
// Add UDP transport with max initial peers set to maximum possible value
auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
udp_transport->maxInitialPeersRange = 119;
domainParticipantQos.transport().user_transports.push_back(udp_transport);
break;
}
case RMW_AUTOMATIC_DISCOVERY_RANGE_SUBNET:
// Nothing to do; use the default FastDDS behaviour
break;
case RMW_AUTOMATIC_DISCOVERY_RANGE_SYSTEM_DEFAULT:
// Nothing to do; use the default FastDDS behaviour
break;
}

if (localhost_only) {
// In order to use the interface white list, we need to disable the default transport config
domainParticipantQos.transport().use_builtin_transports = false;

// Add a UDPv4 transport with only localhost enabled
auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
udp_transport->interfaceWhiteList.emplace_back("127.0.0.1");
domainParticipantQos.transport().user_transports.push_back(udp_transport);
// Add initial peers if LOCALHOST or SUBNET are used
if (
RMW_AUTOMATIC_DISCOVERY_RANGE_LOCALHOST == discovery_options->automatic_discovery_range ||
RMW_AUTOMATIC_DISCOVERY_RANGE_SUBNET == discovery_options->automatic_discovery_range)
sloretz marked this conversation as resolved.
Show resolved Hide resolved
{
for (size_t ii = 0; ii < discovery_options->static_peers_count; ++ii) {
eprosima::fastrtps::rtps::Locator_t peer;
auto response = eprosima::fastrtps::rtps::IPLocator::resolveNameDNS(
discovery_options->static_peers[ii].peer_address);
// Get the first returned IPv4
if (response.first.size() > 0) {
eprosima::fastrtps::rtps::IPLocator::setIPv4(peer, response.first.begin()->data());
} else {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"Unable to resolve peer %s\n",
discovery_options->static_peers[ii].peer_address);
return nullptr;
}
// Not specifying the port of the peer means FastDDS will try all
// possible participant ports according to the port calculation equation
// in the RTPS spec section 9.6.1.1, up to the number of peers specified
// in maxInitialPeersRange.
domainParticipantQos.wire_protocol().builtin.initialPeersList.push_back(peer);
}
}

// Add SHM transport if available
auto shm_transport = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
domainParticipantQos.transport().user_transports.push_back(shm_transport);
if (RMW_AUTOMATIC_DISCOVERY_RANGE_LOCALHOST == discovery_options->automatic_discovery_range) {
// Add localhost as a static peer
eprosima::fastrtps::rtps::Locator_t peer;
eprosima::fastrtps::rtps::IPLocator::setIPv4(peer, "127.0.0.1");
domainParticipantQos.wire_protocol().builtin.initialPeersList.push_back(peer);
Comment on lines +247 to +251
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the case of RMW_AUTOMATIC_DISCOVERY_RANGE_SUBNET, if initialPeersList is not empty, we should add the default multicast address to it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To the initial peer list, or as a multicast locator? I assumed the latter in cf9bf83

}

size_t length = snprintf(nullptr, 0, "enclave=%s;", enclave) + 1;
Expand Down
24 changes: 17 additions & 7 deletions rmw_fastrtps_shared_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ rmw_init_options_init(
init_options->domain_id = RMW_DEFAULT_DOMAIN_ID;
init_options->security_options = rmw_get_default_security_options();
init_options->localhost_only = RMW_LOCALHOST_ONLY_DEFAULT;
return RMW_RET_OK;
init_options->discovery_options = rmw_get_zero_initialized_discovery_options();
return rmw_discovery_options_init(&(init_options->discovery_options), 0, &allocator);
}

rmw_ret_t
Expand All @@ -67,21 +68,25 @@ rmw_init_options_copy(
RMW_SET_ERROR_MSG("expected zero-initialized dst");
return RMW_RET_INVALID_ARGUMENT;
}
const rcutils_allocator_t * allocator = &src->allocator;
RCUTILS_CHECK_ALLOCATOR(allocator, return RMW_RET_INVALID_ARGUMENT);

rcutils_allocator_t allocator = src->allocator;
RCUTILS_CHECK_ALLOCATOR(&allocator, return RMW_RET_INVALID_ARGUMENT);
rmw_init_options_t tmp = *src;
tmp.enclave = rcutils_strdup(tmp.enclave, *allocator);
tmp.enclave = rcutils_strdup(tmp.enclave, allocator);
if (NULL != src->enclave && NULL == tmp.enclave) {
return RMW_RET_BAD_ALLOC;
}
tmp.security_options = rmw_get_zero_initialized_security_options();
rmw_ret_t ret =
rmw_security_options_copy(&src->security_options, allocator, &tmp.security_options);
rmw_security_options_copy(&src->security_options, &allocator, &tmp.security_options);
if (RMW_RET_OK != ret) {
allocator->deallocate(tmp.enclave, allocator->state);
allocator.deallocate(tmp.enclave, allocator.state);
return ret;
}
tmp.discovery_options = rmw_get_zero_initialized_discovery_options();
ret = rmw_discovery_options_copy(
&src->discovery_options,
&allocator,
&tmp.discovery_options);
*dst = tmp;
return RMW_RET_OK;
}
Expand All @@ -105,6 +110,11 @@ rmw_init_options_fini(const char * identifier, rmw_init_options_t * init_options

allocator->deallocate(init_options->enclave, allocator->state);
rmw_ret_t ret = rmw_security_options_fini(&init_options->security_options, allocator);
if (ret != RMW_RET_OK) {
return ret;
}

ret = rmw_discovery_options_fini(&init_options->discovery_options);
*init_options = rmw_get_zero_initialized_init_options();
return ret;
}
Expand Down