Skip to content

Commit

Permalink
Get network flows of publishers and subscriptions
Browse files Browse the repository at this point in the history
Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>
  • Loading branch information
Ananya Muddukrishna committed Dec 14, 2020
1 parent fc6c364 commit 56798e9
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 0 deletions.
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/memory_strategies.cpp
src/rclcpp/memory_strategy.cpp
src/rclcpp/message_info.cpp
src/rclcpp/network_flow.cpp
src/rclcpp/node.cpp
src/rclcpp/node_options.cpp
src/rclcpp/node_interfaces/node_base.cpp
Expand Down
98 changes: 98 additions & 0 deletions rclcpp/include/rclcpp/network_flow.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2020 Ericsson AB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__NETWORK_FLOW_HPP_
#define RCLCPP__NETWORK_FLOW_HPP_

#include <cstdint>
#include <string>
#include <iostream>

#include "rcl/network_flow.h"

#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

/**
* Class describes endpoints of network flow such as transport protocol,
* internet protocol, transport port, flow label, and internet address
*/
class NetworkFlow
{
public:
/// Construct from rcl_network_flow_t
RCLCPP_PUBLIC
explicit NetworkFlow(const rcl_network_flow_t & network_flow)
: transport_protocol_(
rcl_network_flow_get_transport_protocol_string(network_flow.transport_protocol)),
internet_protocol_(
rcl_network_flow_get_internet_protocol_string(network_flow.internet_protocol)),
transport_port_(network_flow.transport_port),
flow_label_(network_flow.flow_label),
internet_address_(network_flow.internet_address)
{
}

/// Get transport protocol
RCLCPP_PUBLIC
const std::string & transport_protocol() const;

/// Get internet protocol
RCLCPP_PUBLIC
const std::string & internet_protocol() const;

/// Get transport port
RCLCPP_PUBLIC
uint16_t transport_port() const;

/// Get flow label
RCLCPP_PUBLIC
uint32_t flow_label() const;

/// Get internet address
RCLCPP_PUBLIC
const std::string & internet_address() const;

/// Compare two NetworkFlow instances
friend bool operator==(const NetworkFlow & left, const NetworkFlow & right);
friend bool operator!=(const NetworkFlow & left, const NetworkFlow & right);

/// Streaming helper
friend std::ostream & operator<<(std::ostream & os, const NetworkFlow & network_flow);

private:
std::string transport_protocol_;
std::string internet_protocol_;
uint16_t transport_port_;
uint32_t flow_label_;
std::string internet_address_;
};

/// Check if two NetworkFlow instances are equal
RCLCPP_PUBLIC
bool operator==(const NetworkFlow & left, const NetworkFlow & right);

/// Check if two NetworkFlow instances are not equal
RCLCPP_PUBLIC
bool operator!=(const NetworkFlow & left, const NetworkFlow & right);

/// Streaming helper
RCLCPP_PUBLIC
std::ostream & operator<<(std::ostream & os, const NetworkFlow & network_flow);

} // namespace rclcpp

#endif // RCLCPP__NETWORK_FLOW_HPP_
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rcl/publisher.h"

#include "rclcpp/macros.hpp"
#include "rclcpp/network_flow.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/type_support_decl.hpp"
Expand Down Expand Up @@ -193,6 +194,15 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
uint64_t intra_process_publisher_id,
IntraProcessManagerSharedPtr ipm);

/// Get network flow
/**
* Describes network flows that this publisher is sending messages out on
* \return vector of NetworkFlow
*/
RCLCPP_PUBLIC
std::vector<rclcpp::NetworkFlow>
get_network_flow() const;

protected:
template<typename EventCallbackT>
void
Expand Down
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_info.hpp"
#include "rclcpp/network_flow.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/serialized_message.hpp"
Expand Down Expand Up @@ -263,6 +264,15 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
bool
exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state);

/// Get network flow
/**
* Describes network flows that this subscription is receiving messages in on
* \return vector of NetworkFlow
*/
RCLCPP_PUBLIC
std::vector<rclcpp::NetworkFlow>
get_network_flow() const;

protected:
template<typename EventCallbackT>
void
Expand Down
76 changes: 76 additions & 0 deletions rclcpp/src/rclcpp/network_flow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2020 Ericsson AB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string>

#include "rclcpp/network_flow.hpp"

namespace rclcpp
{

const std::string &
NetworkFlow::transport_protocol() const
{
return transport_protocol_;
}

const std::string &
NetworkFlow::internet_protocol() const
{
return internet_protocol_;
}

uint16_t NetworkFlow::transport_port() const
{
return transport_port_;
}

uint32_t NetworkFlow::flow_label() const
{
return flow_label_;
}

const std::string &
NetworkFlow::internet_address() const
{
return internet_address_;
}

bool operator==(const NetworkFlow & left, const NetworkFlow & right)
{
return left.transport_protocol_ == right.transport_protocol_ &&
left.internet_protocol_ == right.internet_protocol_ &&
left.transport_port_ == right.transport_port_ &&
left.flow_label_ == right.flow_label_ &&
left.internet_address_ == right.internet_address_;
}

bool operator!=(const NetworkFlow & left, const NetworkFlow & right)
{
return !(left == right);
}

std::ostream & operator<<(std::ostream & os, const NetworkFlow & network_flow)
{
os << '{' <<
"\"transportProtocol\": " << network_flow.transport_protocol_ << ", " <<
"\"transportPort\": " << network_flow.transport_port_ << ", " <<
"\"internetProtocol\": " << network_flow.internet_protocol_ << ", " <<
"\"internetAddress\": " << network_flow.internet_address_ << ", " <<
"\"flowLabel\": " << network_flow.flow_label_ << ", " <<
'}';
return os;
}

} // namespace rclcpp
32 changes: 32 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/network_flow.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/qos_event.hpp"

Expand Down Expand Up @@ -268,3 +269,34 @@ PublisherBase::default_incompatible_qos_callback(
get_topic_name(),
policy_name.c_str());
}

std::vector<rclcpp::NetworkFlow> PublisherBase::get_network_flow() const
{
rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcl_network_flow_array_t network_flow_array = rcl_get_zero_initialized_network_flow_array();
rcl_ret_t ret = rcl_publisher_get_network_flow(
publisher_handle_.get(), &allocator, &network_flow_array);
if (RCL_RET_OK != ret) {
auto error_msg = std::string("error obtaining network flows of publisher: ") +
rcl_get_error_string().str;
rcl_reset_error();
if (RCL_RET_OK != rcl_network_flow_array_fini(&network_flow_array, &allocator)) {
error_msg += std::string(", also error cleaning up network flow array: ") +
rcl_get_error_string().str;
rcl_reset_error();
}
rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
}

std::vector<rclcpp::NetworkFlow> network_flow_vector;
for (size_t i = 0; i < network_flow_array.size; ++i) {
network_flow_vector.push_back(rclcpp::NetworkFlow(network_flow_array.network_flow[i]));
}

ret = rcl_network_flow_array_fini(&network_flow_array, &allocator);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
}

return network_flow_vector;
}
31 changes: 31 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,34 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
}
throw std::runtime_error("given pointer_to_subscription_part does not match any part");
}

std::vector<rclcpp::NetworkFlow> SubscriptionBase::get_network_flow() const
{
rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcl_network_flow_array_t network_flow_array = rcl_get_zero_initialized_network_flow_array();
rcl_ret_t ret = rcl_subscription_get_network_flow(
subscription_handle_.get(), &allocator, &network_flow_array);
if (RCL_RET_OK != ret) {
auto error_msg = std::string("Error obtaining network flows of subscription: ") +
rcl_get_error_string().str;
rcl_reset_error();
if (RCL_RET_OK != rcl_network_flow_array_fini(&network_flow_array, &allocator)) {
error_msg += std::string(". Also error cleaning up network flow array: ") +
rcl_get_error_string().str;
rcl_reset_error();
}
rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
}

std::vector<rclcpp::NetworkFlow> network_flow_vector;
for (size_t i = 0; i < network_flow_array.size; ++i) {
network_flow_vector.push_back(rclcpp::NetworkFlow(network_flow_array.network_flow[i]));
}

ret = rcl_network_flow_array_fini(&network_flow_array, &allocator);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
}

return network_flow_vector;
}
30 changes: 30 additions & 0 deletions rclcpp/test/rclcpp/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,3 +509,33 @@ TEST_F(TestPublisher, run_event_handlers) {
EXPECT_THROW(handler->execute(data), std::runtime_error);
}
}

TEST_F(TestPublisher, get_network_flow_errors) {
initialize();
const rclcpp::QoS publisher_qos(1);
auto publisher = node->create_publisher<test_msgs::msg::Empty>("topic", publisher_qos);

{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_publisher_get_network_flow, RCL_RET_ERROR);
auto mock_network_flow_array_fini = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_network_flow_array_fini, RCL_RET_ERROR);
EXPECT_THROW(
publisher->get_network_flow(),
rclcpp::exceptions::RCLError);
}
{
auto mock_network_flow_array_fini = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_network_flow_array_fini, RCL_RET_ERROR);
EXPECT_THROW(
publisher->get_network_flow(),
rclcpp::exceptions::RCLError);
}
{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_publisher_get_network_flow, RCL_RET_OK);
auto mock_network_flow_array_fini = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_network_flow_array_fini, RCL_RET_OK);
EXPECT_NO_THROW(publisher->get_network_flow());
}
}
34 changes: 34 additions & 0 deletions rclcpp/test/rclcpp/test_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,3 +510,37 @@ INSTANTIATE_TEST_SUITE_P(
TestSubscriptionThrows, TestSubscriptionInvalidIntraprocessQos,
::testing::ValuesIn(invalid_qos_profiles()),
::testing::PrintToStringParamName());

TEST_F(TestSubscription, get_network_flow_errors) {
initialize();
const rclcpp::QoS subscription_qos(1);
auto subscription_callback = [](const test_msgs::msg::Empty::SharedPtr msg) {
(void)msg;
};
auto subscription = node->create_subscription<test_msgs::msg::Empty>(
"topic", subscription_qos, subscription_callback);

{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_get_network_flow, RCL_RET_ERROR);
auto mock_network_flow_array_fini = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_network_flow_array_fini, RCL_RET_ERROR);
EXPECT_THROW(
subscription->get_network_flow(),
rclcpp::exceptions::RCLError);
}
{
auto mock_network_flow_array_fini = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_network_flow_array_fini, RCL_RET_ERROR);
EXPECT_THROW(
subscription->get_network_flow(),
rclcpp::exceptions::RCLError);
}
{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_get_network_flow, RCL_RET_OK);
auto mock_network_flow_array_fini = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_network_flow_array_fini, RCL_RET_OK);
EXPECT_NO_THROW(subscription->get_network_flow());
}
}

0 comments on commit 56798e9

Please sign in to comment.