Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement intra process communications for Pub/Sub #73

Merged
merged 6 commits into from
Aug 21, 2015
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8.3)
project(rclcpp)

find_package(ament_cmake REQUIRED)
find_package(rcl_interfaces REQUIRED)

ament_export_dependencies(rmw)
ament_export_dependencies(rcl_interfaces)
Expand All @@ -12,6 +13,16 @@ ament_export_include_directories(include)
if(AMENT_ENABLE_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()

if(NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra")
endif()

include_directories(include)

ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be wrapped in a condition:

if(TARGET test_intra_process_manager)

endif()

ament_package(
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/any_executable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct AnyExecutable
{}
// Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;
Expand Down
39 changes: 37 additions & 2 deletions rclcpp/include/rclcpp/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,61 @@
#ifndef RCLCPP_RCLCPP_CONTEXT_HPP_
#define RCLCPP_RCLCPP_CONTEXT_HPP_

#include <rclcpp/macros.hpp>

#include <iostream>

#include <memory>
#include <mutex>
#include <typeinfo>
#include <typeindex>
#include <unordered_map>

#include <rclcpp/macros.hpp>
#include <rmw/rmw.h>

namespace rclcpp
{

namespace context
{

/* ROS Context */
class Context
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(Context);

Context() {}

template<typename SubContext, typename ... Args>
std::shared_ptr<SubContext>
get_sub_context(Args && ... args)
{
std::lock_guard<std::mutex> lock(mutex_);

std::type_index type_i(typeid(SubContext));
std::shared_ptr<SubContext> sub_context;
auto it = sub_contexts_.find(type_i);
if (it == sub_contexts_.end()) {
// It doesn't exist yet, make it
sub_context = std::shared_ptr<SubContext>(
new SubContext(std::forward<Args>(args) ...),
[] (SubContext * sub_context_ptr) {
delete sub_context_ptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the custom deleter here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it stores it as a void * and I believe it is necessary to ensure that the destructor is called even if the last reference is a shared_ptr of type void *, but I don't know that for sure.

});
sub_contexts_[type_i] = sub_context;
} else {
// It exists, get it out and cast it.
sub_context = std::static_pointer_cast<SubContext>(it->second);
}
return sub_context;
}

private:
RCLCPP_DISABLE_COPY(Context);

std::unordered_map<std::type_index, std::shared_ptr<void>> sub_contexts_;
std::mutex mutex_;

};

} /* namespace context */
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/include/rclcpp/contexts/default_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class DefaultContext : public rclcpp::context::Context

};

DefaultContext::SharedPtr
get_global_default_context()
{
static DefaultContext::SharedPtr default_context = DefaultContext::make_shared();
return default_context;
}

} // namespace default_context
} // namespace contexts
} // namespace rclcpp
Expand Down
68 changes: 53 additions & 15 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
#ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_
#define RCLCPP_RCLCPP_EXECUTOR_HPP_

#include <iostream>

#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <iostream>
#include <list>
#include <memory>
#include <vector>

#include <rcl_interfaces/msg/intra_process_message.hpp>

#include <rclcpp/any_executable.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/memory_strategy.hpp>
Expand Down Expand Up @@ -159,6 +160,9 @@ class Executor
if (any_exec->subscription) {
execute_subscription(any_exec->subscription);
}
if (any_exec->subscription_intra_process) {
execute_intra_process_subscription(any_exec->subscription_intra_process);
}
if (any_exec->service) {
execute_service(any_exec->service);
}
Expand Down Expand Up @@ -194,6 +198,24 @@ class Executor
subscription->return_message(message);
}

static void
execute_intra_process_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr & subscription)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for & here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's ok though, and the other functions do it this way. I'd prefer to have a different pull request fix all the functions if this is really better.

{
rcl_interfaces::msg::IntraProcessMessage ipm;
bool taken = false;
rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
if (status == RMW_RET_OK) {
if (taken) {
subscription->handle_intra_process_message(ipm);
}
} else {
fprintf(stderr,
"[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
}
}

static void
execute_timer(
rclcpp::timer::TimerBase::SharedPtr & timer)
Expand Down Expand Up @@ -293,22 +315,24 @@ class Executor
}));
}
// Use the number of subscriptions to allocate memory in the handles
size_t number_of_subscriptions = subs.size();
size_t max_number_of_subscriptions = subs.size() * 2; // Times two for intra-process.
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count = number_of_subscriptions;
subscriber_handles.subscriber_count = 0;
// TODO(wjwwood): Avoid redundant malloc's
subscriber_handles.subscribers =
memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions);
subscriber_handles.subscribers = memory_strategy_->borrow_handles(
HandleType::subscription_handle, max_number_of_subscriptions);
if (subscriber_handles.subscribers == NULL) {
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for subscriber pointers.");
}
// Then fill the SubscriberHandles with ready subscriptions
size_t subscriber_handle_index = 0;
for (auto & subscription : subs) {
subscriber_handles.subscribers[subscriber_handle_index] = \
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->subscription_handle_->data;
subscriber_handle_index += 1;
if (subscription->intra_process_subscription_handle_) {
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->intra_process_subscription_handle_->data;
}
}

// Use the number of services to allocate memory in the handles
Expand Down Expand Up @@ -414,7 +438,7 @@ class Executor
}
// Add the new work to the class's list of things waiting to be executed
// Starting with the subscribers
for (size_t i = 0; i < number_of_subscriptions; ++i) {
for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) {
void * handle = subscriber_handles.subscribers[i];
if (handle) {
subscriber_handles_.push_back(handle);
Expand Down Expand Up @@ -463,13 +487,18 @@ class Executor
}
for (auto & weak_subscription : group->subscription_ptrs_) {
auto subscription = weak_subscription.lock();
if (subscription && subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
if (subscription) {
if (subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
}
if (subscription->intra_process_subscription_handle_->data == subscriber_handle) {
return subscription;
}
}
}
}
}
return rclcpp::subscription::SubscriptionBase::SharedPtr();
return nullptr;
}

rclcpp::service::ServiceBase::SharedPtr
Expand Down Expand Up @@ -653,6 +682,11 @@ class Executor
while (it != subscriber_handles_.end()) {
auto subscription = get_subscription_by_handle(*it);
if (subscription) {
// Figure out if this is for intra-process or not.
bool is_intra_process = false;
if (subscription->intra_process_subscription_handle_) {
is_intra_process = subscription->intra_process_subscription_handle_->data == *it;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription);
if (!group) {
Expand All @@ -668,7 +702,11 @@ class Executor
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->subscription = subscription;
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
subscriber_handles_.erase(it++);
Expand Down Expand Up @@ -804,7 +842,7 @@ class Executor
}
// Check the subscriptions to see if there are any that are ready
get_next_subscription(any_exec);
if (any_exec->subscription) {
if (any_exec->subscription || any_exec->subscription_intra_process) {
return any_exec;
}
// Check the services to see if there are any that are ready
Expand Down
Loading