Skip to content

Commit

Permalink
Add rmw_publisher_wait_for_all_acked support (#519)
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Jun 2, 2021
1 parent 604da87 commit b19cdcd
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 18 deletions.
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)
eprosima_fastrtps_identifier, publisher);
}

rmw_ret_t
rmw_publisher_wait_for_all_acked(const rmw_publisher_t * publisher, rmw_time_t wait_timeout)
{
return rmw_fastrtps_shared_cpp::__rmw_publisher_wait_for_all_acked(
eprosima_fastrtps_identifier, publisher, wait_timeout);
}

rmw_ret_t
rmw_publisher_get_actual_qos(
const rmw_publisher_t * publisher,
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)
eprosima_fastrtps_identifier, publisher);
}

rmw_ret_t
rmw_publisher_wait_for_all_acked(const rmw_publisher_t * publisher, rmw_time_t wait_timeout)
{
return rmw_fastrtps_shared_cpp::__rmw_publisher_wait_for_all_acked(
eprosima_fastrtps_identifier, publisher, wait_timeout);
}

rmw_ret_t
rmw_publisher_get_actual_qos(
const rmw_publisher_t * publisher,
Expand Down
1 change: 1 addition & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ add_library(rmw_fastrtps_shared_cpp
src/rmw_wait.cpp
src/rmw_wait_set.cpp
src/subscription.cpp
src/time_utils.cpp
src/TypeSupport_impl.cpp
src/utils.cpp
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ __rmw_publisher_assert_liveliness(
const char * identifier,
const rmw_publisher_t * publisher);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_publisher_wait_for_all_acked(
const char * identifier,
const rmw_publisher_t * publisher,
rmw_time_t wait_timeout);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_destroy_publisher(
Expand Down
24 changes: 6 additions & 18 deletions rmw_fastrtps_shared_cpp/src/qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,10 @@
#include "fastdds/dds/publisher/qos/DataWriterQos.hpp"
#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
#include "fastdds/dds/topic/qos/TopicQos.hpp"
#include "fastdds/rtps/common/Time_t.h"

#include "rmw/error_handling.h"
#include "rmw_dds_common/time_utils.hpp"

static
eprosima::fastrtps::Duration_t
rmw_time_to_fastrtps(const rmw_time_t & time)
{
if (rmw_time_equal(time, RMW_DURATION_INFINITE)) {
return eprosima::fastrtps::rtps::c_RTPSTimeInfinite.to_duration_t();
}

rmw_time_t clamped_time = rmw_dds_common::clamp_rmw_time_to_dds_time(time);
return eprosima::fastrtps::Duration_t(
static_cast<int32_t>(clamped_time.sec),
static_cast<uint32_t>(clamped_time.nsec));
}
#include "time_utils.hpp"

static
bool
Expand Down Expand Up @@ -119,11 +105,13 @@ bool fill_entity_qos_from_profile(
}

if (!is_rmw_duration_unspecified(qos_policies.lifespan)) {
entity_qos.lifespan().duration = rmw_time_to_fastrtps(qos_policies.lifespan);
entity_qos.lifespan().duration =
rmw_fastrtps_shared_cpp::rmw_time_to_fastrtps(qos_policies.lifespan);
}

if (!is_rmw_duration_unspecified(qos_policies.deadline)) {
entity_qos.deadline().period = rmw_time_to_fastrtps(qos_policies.deadline);
entity_qos.deadline().period =
rmw_fastrtps_shared_cpp::rmw_time_to_fastrtps(qos_policies.deadline);
}

switch (qos_policies.liveliness) {
Expand All @@ -141,7 +129,7 @@ bool fill_entity_qos_from_profile(
}
if (!is_rmw_duration_unspecified(qos_policies.liveliness_lease_duration)) {
entity_qos.liveliness().lease_duration =
rmw_time_to_fastrtps(qos_policies.liveliness_lease_duration);
rmw_fastrtps_shared_cpp::rmw_time_to_fastrtps(qos_policies.liveliness_lease_duration);

// Docs suggest setting no higher than 0.7 * lease_duration, choosing 2/3 to give safe buffer.
// See doc at https://github.com/eProsima/Fast-RTPS/blob/
Expand Down
27 changes: 27 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include "rmw_fastrtps_shared_cpp/rmw_context_impl.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

#include "time_utils.hpp"

namespace rmw_fastrtps_shared_cpp
{
rmw_ret_t
Expand Down Expand Up @@ -119,6 +121,31 @@ __rmw_publisher_assert_liveliness(
return RMW_RET_OK;
}

rmw_ret_t
__rmw_publisher_wait_for_all_acked(
const char * identifier,
const rmw_publisher_t * publisher,
rmw_time_t wait_timeout)
{
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

auto info = static_cast<CustomPublisherInfo *>(publisher->data);

eprosima::fastrtps::Duration_t timeout = rmw_time_to_fastrtps(wait_timeout);

ReturnCode_t ret = info->data_writer_->wait_for_acknowledgments(timeout);
if (ReturnCode_t::RETCODE_OK == ret) {
return RMW_RET_OK;
}

return RMW_RET_TIMEOUT;
}

rmw_ret_t
__rmw_publisher_get_actual_qos(
const rmw_publisher_t * publisher,
Expand Down
35 changes: 35 additions & 0 deletions rmw_fastrtps_shared_cpp/src/time_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw_dds_common/time_utils.hpp"

#include "time_utils.hpp"

namespace rmw_fastrtps_shared_cpp
{

eprosima::fastrtps::Duration_t
rmw_time_to_fastrtps(const rmw_time_t & time)
{
if (rmw_time_equal(time, RMW_DURATION_INFINITE)) {
return eprosima::fastrtps::rtps::c_RTPSTimeInfinite.to_duration_t();
}

rmw_time_t clamped_time = rmw_dds_common::clamp_rmw_time_to_dds_time(time);
return eprosima::fastrtps::Duration_t(
static_cast<int32_t>(clamped_time.sec),
static_cast<uint32_t>(clamped_time.nsec));
}

} // namespace rmw_fastrtps_shared_cpp
27 changes: 27 additions & 0 deletions rmw_fastrtps_shared_cpp/src/time_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef TIME_UTILS_HPP_
#define TIME_UTILS_HPP_

#include "fastdds/rtps/common/Time_t.h"

namespace rmw_fastrtps_shared_cpp
{

eprosima::fastrtps::Duration_t rmw_time_to_fastrtps(const rmw_time_t & time);

} // namespace rmw_fastrtps_shared_cpp

#endif // TIME_UTILS_HPP_

0 comments on commit b19cdcd

Please sign in to comment.