
MultiThreadedExecutor(MutuallyExclusive) sometimes misses to execute Waitable #2012

Closed
fujitatomoya opened this issue Sep 6, 2022 · 12 comments
Assignees
Labels
bug Something isn't working

Comments

@fujitatomoya
Collaborator

fujitatomoya commented Sep 6, 2022

Bug report

Required Info:

  • Operating System:
    • Ubuntu 22.04
  • Installation type:
    • Source code
  • Version or commit hash:
  • DDS implementation:
    • rmw_fastrtps
  • Client library (if applicable):
    • rclcpp

Steps to reproduce issue

(terminal-1) $ ros2 run prover_rclcpp intraprocess_pub_sub
(terminal-2) $ ros2 topic pub /goal std_msgs/msg/Empty {} -1
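A hedged sketch of what the reproducer presumably looks like (the real node lives in the external prover_rclcpp package and is not shown in this issue; the topic and member names below follow the analysis later in this thread: goal_callback publishes twice, and the feedback/local subscriptions share the node's default MutuallyExclusive callback group, with intra-process communication enabled):

```cpp
#include <memory>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/empty.hpp>
#include <std_msgs/msg/string.hpp>

// Illustrative reconstruction, not the actual prover_rclcpp source.
class ProverNode : public rclcpp::Node
{
public:
  ProverNode()
  : rclcpp::Node(
      "intraprocess_pub_sub",
      rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    feedback_pub_ = create_publisher<std_msgs::msg::String>("feedback", 10);
    local_pub_ = create_publisher<std_msgs::msg::String>("local_topic", 10);

    goal_sub_ = create_subscription<std_msgs::msg::Empty>(
      "goal", 10,
      [this](std_msgs::msg::Empty::ConstSharedPtr) {
        // Publishing twice from one callback produces two waitable
        // triggers at (almost) the same time.
        feedback_pub_->publish(std_msgs::msg::String());
        local_pub_->publish(std_msgs::msg::String());
      });

    // Both subscriptions use the node's default (MutuallyExclusive) group.
    feedback_sub_ = create_subscription<std_msgs::msg::String>(
      "feedback", 10,
      [this](std_msgs::msg::String::ConstSharedPtr) {
        RCLCPP_INFO(get_logger(), "feedback received");
      });
    local_sub_ = create_subscription<std_msgs::msg::String>(
      "local_topic", 10,
      [this](std_msgs::msg::String::ConstSharedPtr) {
        RCLCPP_INFO(get_logger(), "local_topic received");
      });
  }

private:
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr feedback_pub_, local_pub_;
  rclcpp::Subscription<std_msgs::msg::Empty>::SharedPtr goal_sub_;
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr feedback_sub_, local_sub_;
};

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  rclcpp::executors::MultiThreadedExecutor executor;
  auto node = std::make_shared<ProverNode>();
  executor.add_node(node);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}
```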

Expected behavior


All subscriptions should receive the message for each publication.

Actual behavior

Sometimes a subscription does not call the user callback when a message arrives.

Additional information

we can avoid this problem by either:

  • using the SingleThreadedExecutor
  • disabling intra-process communication
@fujitatomoya fujitatomoya added the bug Something isn't working label Sep 7, 2022
@fujitatomoya
Collaborator Author

CC: @Barry-Xu-2018 @iuhilnehc-ynos

@Barry-Xu-2018
Collaborator

Barry-Xu-2018 commented Sep 7, 2022

Our analysis is attached below.
The root cause:
With a CallbackGroupType::MutuallyExclusive callback group under the MultiThreadedExecutor, while one thread is handling a waitable handle, another thread cannot find an executable handle (because the group is mutually exclusive) and calls memory_strategy_->clear_handles(), which loses the waitable handle. (In rmw_wait, a subscription entity can skip the wait if its data has not been taken; a waitable entity does not skip, so it has to wait for the next trigger.)

Detailed analysis below:

void
MultiThreadedExecutor::run(size_t this_thread_number)
{
  (void)this_thread_number;
  while (rclcpp::ok(this->context_) && spinning.load()) {
    rclcpp::AnyExecutable any_exec;
    {
      std::lock_guard wait_lock{wait_mutex_};
      if (!rclcpp::ok(this->context_) || !spinning.load()) {
        return;
      }
      if (!get_next_executable(any_exec, next_exec_timeout_)) {
        continue;
      }
    } // <== A lock ensures only one thread at a time executes this section
    if (yield_before_execute_) {
      std::this_thread::yield();
    }

    execute_any_executable(any_exec);

    // Clear the callback_group to prevent the AnyExecutable destructor from
    // resetting the callback group `can_be_taken_from`
    any_exec.callback_group.reset();
  }
}

Multiple threads can only run execute_any_executable() in parallel.

In goal_callback(), the node publishes twice.
After Executor::wait_for_work(), there are 2 waitable handles in waitable_handles_.
One thread calls get_next_ready_executable(), then memory_strategy_->get_next_waitable(), to obtain an any_executable for one waitable handle; the callback group is then 'locked', since the default callback group type is MutuallyExclusive.

bool
Executor::get_next_ready_executable_from_map(...)
{
  ...
  if (success) {
    // If it is valid, check to see if the group is mutually exclusive or
    // not, then mark it accordingly
    if (any_executable.callback_group && any_executable.callback_group->type() == \
      CallbackGroupType::MutuallyExclusive)
    {
      // It should not have been taken otherwise
      assert(any_executable.callback_group->can_be_taken_from().load());
      // Set to false to indicate something is being run from this group
      // This is reset to true either when the any_executable is executed or when the
      // any_executable is destructed
      any_executable.callback_group->can_be_taken_from().store(false);
    }
  }
}

Note that feedback_subscription and local_subscription use the default callback group (that is, they share the same one).
While this thread (thread A) leaves the locked section and calls execute_any_executable(any_exec), another thread (thread B) runs the locked section (calling get_next_executable() => get_next_ready_executable() => get_next_ready_executable_from_map() => get_next_waitable()).
In AllocatorMemoryStrategy::get_next_waitable(),

void
get_next_waitable(...)
{
   ...
    if (!group->can_be_taken_from().load()) {
       // Group is mutually exclusive and is being used, so skip it for now
       // Leave it to be checked next time, but continue searching
       ++it;
       continue;
    }
   ... 
}

The other waitable handle cannot be handled, since the callback group is 'locked' by thread A.
If it does not find an executable task, thread B calls wait_for_work().

bool
Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{
  bool success = false;
  // Check to see if there are any subscriptions or timers needing service
  // TODO(wjwwood): improve run to run efficiency of this function
  success = get_next_ready_executable(any_executable);
  // If there are none
  if (!success) {
    // Wait for subscriptions or timers to work on
    wait_for_work(timeout);
    if (!spinning.load()) {
      return false;
    }
    // Try again
    success = get_next_ready_executable(any_executable);
  }
  return success;
}

At the beginning of wait_for_work(), all handles are cleared (including waitable_handles_).
An intra-process subscription uses a waitable entity (a normal subscription uses a subscription entity). In rmw_wait, a subscription entity can skip the wait if its data has not been taken; a waitable entity does not skip, so it has to wait for the next trigger.
This leads to trigger loss.

void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
    ...
    memory_strategy_->clear_handles();
...
}

There is also a scenario in which the messages published on both feedback and local_topic are received:
Thread A calls wait_for_work(), which reports only one waitable. Thread A 'locks' the callback group and calls execute_any_executable(). Thread B calls get_next_ready_executable(), finds nothing executable, and calls wait_for_work(), which reports the other waitable; but since the callback group is 'locked', nothing can be done. Then thread X calls get_next_ready_executable() (by this time, thread A has finished execution and the callback group is 'freed'), finds the executable waitable handle, and calls execute_any_executable() to receive the message.

@Barry-Xu-2018
Collaborator

We have one solution, but it is not a good one.
@ivanpauno we would like to hear your opinion on how to fix it.

@fujitatomoya
Collaborator Author

CC: @alsora @mauropasse

@alsora
Collaborator

alsora commented Sep 7, 2022

This does not seem specific to intra-process communication: the same problem could happen with any other waitable object (although intra-process communication is likely the heaviest user of this class).

It looks like there are multiple, all related, issues:

  • the wait_for_work function clears all handles even if some of them still had data
  • the get_next_waitable ignores callback groups that are locked assuming that they will be serviced later
  • the __rmw_wait ignores whether guard conditions have been serviced or not.

The first thing I want to point out is that all the involved code is "removed" if using the events executor.
Implementing a multi-threaded events executor is possible, and it would solve these problems (while also being far more CPU efficient).

This problem only happens with a mutually exclusive callback group, so what is the expected behavior here?
I can see two options:

  1. the waitable object shouldn't be cleared if it wasn't serviced.
  2. the thread calling get_next_waitable should remain blocked until it can access the callback group.

These two options are very different and essentially translate to either a lock-free or locked implementation.
With option 1, the executor threads are never blocked. If an entity can't be accessed it goes back to the "cache of pending entities" and it will be checked again at the next iteration.
With option 2, executor threads will be blocked until the previous consumer is done using the callback group.

I have to admit that I never used the ROS multi-threaded executor in a real application (I used it only for some benchmarking and found it less efficient than multiple single-threaded executors), so I don't know much about its design.
However, I don't see the advantage for the multi-threaded executor threads to be "lock-free": the scenario where a mutually exclusive callback group has 2 active entities at the same time will result in a thread "busy-waiting" until it can finally access the group, by continuously trying to get the entity. This seems terribly inefficient.
Considering that the threads involved here are not "application threads" but rather internal threads of the executor, I think users don't care whether they block each other, as long as entities are serviced in the fastest and most efficient way.

Having said that, I can see many different ways to "fix" this problem:

  • use a different executor that does not rely on waitset
  • rework the multi threaded executor to not busy-wait on entities, but rather block until a group can eventually be accessed.
  • rework rmw_wait to skip waiting if guard conditions haven't been serviced
  • rework waitables at the rclcpp/rcl layer to store information about whether they have been serviced or not.

@Barry-Xu-2018
Collaborator

This does not seem specific to intra-process communication: the same problem could happen with any other waitable object (although intra-process communication is likely the heaviest user of this class).

Yes.
Thanks for your summary.

Having said that, I can see many different ways to "fix" this problem:
...

  • rework rmw_wait to skip waiting if guard conditions haven't been serviced

This would require each DDS implementation to change:
the DDS layer would need to ask the upper layer whether this condition has been handled.
So it is a bit unreasonable to ask DDS to handle this.

@fujitatomoya
Collaborator Author

@alsora thanks for the quick response and information.

This does not seem specific to intra-process communication: the same problem could happen with any other waitable object (although intra-process communication is likely the heaviest user of this class).

Agreed, I will change the title accordingly.

@fujitatomoya fujitatomoya changed the title MultiThreadedExecutor does not work with intra-process communication MultiThreadedExecutor(MutuallyExclusive) sometimes misses to execute Waitable Sep 7, 2022
@fujitatomoya
Collaborator Author

the thread calling get_next_waitable should remain blocked until it can access the callback group.

This blocks everything else until this callback group completes its callback, which might take a while.

rework rmw_wait to skip waiting if guard conditions haven't been serviced

I think this sounds reasonable.

DDS need to ask upper layer if this condition is handled.

(Correction: it is the rmw implementation, not DDS.)

(Just an idea) What about rclcpp letting the rmw implementation skip rmw_wait for a Waitable, so that we can keep the rest of the current code as-is? This would require a busy loop, but that is the same behavior as the other entities that can skip rmw_wait, unless we have a way to notify the callback group when a task is completed and it is ready to take the next one.

This problem only happens with a mutually exclusive callback group

As a work-around, we can use a Reentrant callback group to avoid this particular problem.
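A sketch of that work-around, assuming a node that already exists (topic name, message type, and the helper function are illustrative):

```cpp
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/empty.hpp>

// Assign the affected subscription to a Reentrant callback group, so the
// group is never 'locked' and get_next_waitable() never skips a ready
// waitable handle.
rclcpp::SubscriptionBase::SharedPtr
make_reentrant_subscription(const rclcpp::Node::SharedPtr & node)
{
  auto group = node->create_callback_group(
    rclcpp::CallbackGroupType::Reentrant);

  rclcpp::SubscriptionOptions options;
  options.callback_group = group;

  return node->create_subscription<std_msgs::msg::Empty>(
    "goal", 10,
    [](std_msgs::msg::Empty::ConstSharedPtr) {
      // With a Reentrant group this callback may run concurrently with
      // other callbacks of the same group, so it must be thread-safe.
    },
    options);
}
```

Note that with a Reentrant group the user gives up the mutual-exclusion guarantee, so callbacks sharing state need their own synchronization (and depending on the rclcpp version, the group's shared_ptr may need to be kept alive by the caller).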

@roncapat

Hello, unfortunately I have noticed that some callbacks do not fire under heavy load when using a multi-threaded container and a full intra-process setup with a bunch of components. Are there any planned actions to solve this problem? Any updates?

@fujitatomoya
Collaborator Author

@roncapat can you try #2109? That would not be the final fix, but it should work against this problem.

@roncapat

@fujitatomoya, I just did a quick test and it seems to work as expected 👍👍

@fujitatomoya
Collaborator Author

Addressed by #2109; closing.

No branches or pull requests

5 participants