Skip to content

Commit

Permalink
Made double_take_data test work without intrusive behavior
Browse files Browse the repository at this point in the history
Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Jan 24, 2024
1 parent 8c68c71 commit b5eb517
Showing 1 changed file with 201 additions and 86 deletions.
287 changes: 201 additions & 86 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,19 +410,36 @@ class TestWaitable : public rclcpp::Waitable
add_to_wait_set(rcl_wait_set_t * wait_set) override
{
rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_);
if (retrigger_guard_condition && num_unprocessed_triggers > 0) {
gc_.trigger();
}
}

void trigger()
{
num_unprocessed_triggers++;
gc_.trigger();
}

void trigger_and_hold_execute()
{
hold_execute = true;

trigger();
}

void release_execute()
{
hold_execute = false;
cv.notify_one();
}

bool
is_ready(rcl_wait_set_t * wait_set) override
{
is_ready_called_before_take_data = true;
for (size_t i = 0; i < wait_set->size_of_guard_conditions; ++i) {
if (&gc_.get_rcl_guard_condition() == wait_set->guard_conditions[i]) {
is_ready_called_before_take_data = true;
return true;
}
}
Expand All @@ -438,14 +455,17 @@ class TestWaitable : public rclcpp::Waitable
}

is_ready_called_before_take_data = false;

num_unprocessed_triggers--;

return nullptr;
}

std::shared_ptr<void>
take_data_by_entity_id(size_t id) override
{
(void) id;
return nullptr;
return take_data();
}

void
Expand All @@ -462,6 +482,12 @@ class TestWaitable : public rclcpp::Waitable
throw;
}
}

if(hold_execute)
{
std::unique_lock<std::mutex> lk(cv_m);
cv.wait(lk);
}
}

void
Expand Down Expand Up @@ -496,11 +522,22 @@ class TestWaitable : public rclcpp::Waitable
return execute_promise_.get_future();
}

void enable_retriggering(bool enabled)
{
retrigger_guard_condition = enabled;
}

private:
bool is_ready_called_before_take_data = false;
bool retrigger_guard_condition = true;
std::promise<void> execute_promise_;
std::mutex execute_promise_mutex_;
std::atomic<uint> num_unprocessed_triggers = 0;
std::atomic<bool> hold_execute = false;
size_t count_ = 0;
std::condition_variable cv;
std::mutex cv_m;

rclcpp::GuardCondition gc_;
};

Expand Down Expand Up @@ -547,136 +584,209 @@ TYPED_TEST(TestExecutors, spinAll)
spinner.join();
}

TEST(TestExecutorsOnlyNode, double_take_data)
TEST(TestExecutors, double_take_data)
{
rclcpp::init(0, nullptr);

rclcpp::executors::MultiThreadedExecutor executor;

const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
std::stringstream test_name;
test_name << test_info->test_case_name() << "_" << test_info->name();
rclcpp::Node::SharedPtr node = std::make_shared<rclcpp::Node>("node", test_name.str());

class MyExecutor : public rclcpp::executors::SingleThreadedExecutor
{
public:
/**
* This is a copy of Executor::get_next_executable with a callback, to test
* for a special race condition
*/
bool get_next_executable_with_callback(
rclcpp::AnyExecutable & any_executable,
std::chrono::nanoseconds timeout,
std::function<void(void)> inbetween)
{
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
success = get_next_ready_executable(any_executable);
// If there are none
if (!success) {

inbetween();

// Wait for subscriptions or timers to work on
wait_for_work(timeout);
if (!spinning.load()) {
return false;
}
// Try again
success = get_next_ready_executable(any_executable);
}
return success;
}
auto waitable_interfaces = node->get_node_waitables_interface();

void spin_once_with_callback(
std::chrono::nanoseconds timeout,
std::function<void(void)> inbetween)
{
rclcpp::AnyExecutable any_exec;
if (get_next_executable_with_callback(any_exec, timeout, inbetween)) {
execute_any_executable(any_exec);
}
}
auto first_cbg = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
true);

};
auto third_cbg = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
true);

MyExecutor executor;
// these waitable have one job, to make the MemoryStrategy::collect_enties method take
// a long time, in order to force our race condition
std::vector<std::shared_ptr<TestWaitable>> stuffing_waitables;
std::vector<std::shared_ptr<rclcpp::CallbackGroup>> stuffing_cbgs;

for (size_t i = 0; i < 50; i++) {
auto cbg = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
true);

stuffing_cbgs.push_back(cbg);

for (int j = 0; j < 200; j++) {
auto waitable = std::make_shared<TestWaitable>();
stuffing_waitables.push_back(waitable);
waitable_interfaces->add_waitable(waitable, cbg);
}
}

// this is the callback group were wo introduce the double take
auto callback_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
true);


std::vector<std::shared_ptr<TestWaitable>> waitables;

auto waitable_interfaces = node->get_node_waitables_interface();
auto w3 = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(w3, third_cbg);

// First group of waitables, that gets processed.
// We use the shared count of these waitables and the callback group,
// to estimate when MemoryStrategy::collect_enties is called in the spinner thread
std::vector<std::shared_ptr<TestWaitable>> first_waitables;
auto non_triggered_in_first_cbg = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(non_triggered_in_first_cbg, first_cbg);

auto non_triggered_in_first_cbg2 = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(non_triggered_in_first_cbg2, first_cbg);


for (int i = 0; i < 3; i++) {
auto cbg_start = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(cbg_start, callback_group);

// These waitables will get triggered while cbg_start is beeing executed
for (int i = 0; i < 20; i++) {
auto waitable = std::make_shared<TestWaitable>();
waitables.push_back(waitable);
waitable_interfaces->add_waitable(waitable, callback_group);
}

// used to detect if all triggers were processed
auto cbg_end = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(cbg_end, callback_group);

executor.add_node(node);

for (auto & waitable : waitables) {
waitable->trigger();
// ref count if not reference by the internals of the executor
auto min_ref_cnt = non_triggered_in_first_cbg.use_count();
auto cbg_min_ref_cnt = first_cbg.use_count();

for (auto & w : waitables) {
EXPECT_EQ(w->get_count(), 0);
}

// a node has some default subscribers, that need to get executed first, therefore the loop
for (int i = 0; i < 10; i++) {
executor.spin_once(std::chrono::milliseconds(10));
if (waitables.front()->get_count() > 0) {
// stop execution, after the first waitable has been executed
break;
std::atomic_bool exception = false;

std::thread t([&executor, &exception]() {
try {
executor.spin();
} catch (const std::exception & e) {
exception = true;
} catch (...) {
exception = true;
}
});

size_t start_count = cbg_start->get_count();
cbg_start->trigger_and_hold_execute();

//wait until the first waitable is executed and blocks the callback_group
while (cbg_start->get_count() == start_count) {
std::this_thread::sleep_for(1ms);
}

for (auto & w : waitables) {
w->trigger();
}

// trigger w3 to make sure, the MemoryStrategy clears its internal list of ready entities
{
auto cnt = w3->get_count();
w3->trigger();
while (w3->get_count() == cnt) {
std::this_thread::sleep_for(1ms);
}
}

EXPECT_EQ(waitables.front()->get_count(), 1);
// observe the use counts of non_triggered_in_first_cbg, non_triggered_in_first_cbg2 and first_cbg
// in order to fugure out if MemoryStrategy::collect_enties is beeing calles
while (true) {
w3->trigger();
bool restart = false;

// block the callback group, this is something that may happen during multi threaded execution
// This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work
callback_group->can_be_taken_from().exchange(false);
// There should be no reference to our waitiable
while (min_ref_cnt != non_triggered_in_first_cbg.use_count() ||
min_ref_cnt != non_triggered_in_first_cbg2.use_count())
{
}

bool no_ready_executable = false;
// wait for the callback group to be taken
while (true) {
// node and callback group ptrs are referenced
if (cbg_min_ref_cnt != first_cbg.use_count()) {
break;
}

//now there should be no ready events now,
executor.spin_once_with_callback(
std::chrono::milliseconds(10), [&]() {
no_ready_executable = true;
});
// is we got more references to the waitable, while the group pointer is not referenced, this is the wrong spot
if (min_ref_cnt != non_triggered_in_first_cbg.use_count() ||
min_ref_cnt != non_triggered_in_first_cbg2.use_count())
{
restart = true;
break;
}
}

EXPECT_TRUE(no_ready_executable);
if (restart) {
continue;
}

//rearm, so that rmw_wait will push a second entry into the queue
for (auto & waitable : waitables) {
waitable->trigger();
}
// callback group pointer is referenced
while (true) {
// trigger criteria, both pointers were collected
if (min_ref_cnt != non_triggered_in_first_cbg.use_count() &&
min_ref_cnt != non_triggered_in_first_cbg2.use_count())
{
break;
}

no_ready_executable = false;
// invalid, second pointer is referenced, but not first one
if (min_ref_cnt == non_triggered_in_first_cbg.use_count() &&
min_ref_cnt != non_triggered_in_first_cbg2.use_count())
{
restart = true;
break;
}

while (!no_ready_executable) {
executor.spin_once_with_callback(
std::chrono::milliseconds(10), [&]() {
//unblock the callback group
callback_group->can_be_taken_from().exchange(true);
// group or node pointer was released, while waitable poitner were not taken.
if (cbg_min_ref_cnt == first_cbg.use_count()) {
restart = true;
break;
}

no_ready_executable = true;
}
if (restart) {
continue;
}

});
break;
}
EXPECT_TRUE(no_ready_executable);

// now we process all events from get_next_ready_executable
EXPECT_NO_THROW(
for (int i = 0; i < 10; i++) {
executor.spin_once(std::chrono::milliseconds(1));
//we unblock the callback_group now, this should force the race condition
cbg_start->release_execute();

std::this_thread::yield();
std::this_thread::sleep_for(10ms);

size_t end_count = cbg_end->get_count();
cbg_end->trigger();

// wait for all triggers to be executed, or for an exception to occur
while (end_count == cbg_end->get_count() && !exception) {
std::this_thread::sleep_for(10ms);
}
);

node.reset();
EXPECT_FALSE(exception);

node.reset();
rclcpp::shutdown();
}

t.join();
}

TYPED_TEST(TestExecutorsOnlyNode, missing_event)
{
Expand Down Expand Up @@ -732,6 +842,11 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event)
executor.spin_until_future_complete(my_waitable2_execute_future, max_spin_duration);
}

EXPECT_EQ(1u, my_waitable->get_count());
EXPECT_EQ(1u, my_waitable2->get_count());
//now the second waitable should get processed
executor.spin_once(std::chrono::milliseconds(10));

EXPECT_EQ(1u, my_waitable->get_count());
EXPECT_EQ(1u, my_waitable2->get_count());
}
Expand Down

0 comments on commit b5eb517

Please sign in to comment.