Skip to content

Commit

Permalink
Refactor to remove and
Browse files Browse the repository at this point in the history
  • Loading branch information
nnmm committed Jun 23, 2022
1 parent e0dad8b commit 776a0bc
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 74 deletions.
5 changes: 3 additions & 2 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsErro
};
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;

for live_subscription in &live_subscriptions {
wait_set.add_subscription(live_subscription.clone())?;
for live_subscription in live_subscriptions {
// SAFETY: TODO in issue #
unsafe { live_subscription.add_to_wait_set(&mut wait_set)? };
}

let ready_entities = wait_set.wait(timeout)?;
Expand Down
8 changes: 4 additions & 4 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use self::publisher::*;
pub use self::subscription::*;

use crate::rcl_bindings::*;
use crate::{Context, ParameterOverrideMap, QoSProfile, RclrsError, ToResult};
use crate::{Context, ParameterOverrideMap, QoSProfile, RclrsError, ToResult, Waitable};

use std::cmp::PartialEq;
use std::ffi::CStr;
Expand Down Expand Up @@ -68,7 +68,7 @@ unsafe impl Send for rcl_node_t {}
pub struct Node {
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
pub(crate) subscriptions: Vec<Weak<dyn Waitable>>,
_parameter_map: ParameterOverrideMap,
}

Expand Down Expand Up @@ -205,12 +205,12 @@ impl Node {
{
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
self.subscriptions
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
.push(Arc::downgrade(&subscription) as Weak<dyn Waitable>);
Ok(subscription)
}

/// Returns the subscriptions that have not been dropped yet.
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn Waitable>> {
self.subscriptions
.iter()
.filter_map(Weak::upgrade)
Expand Down
57 changes: 22 additions & 35 deletions rclrs/src/node/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::error::{RclReturnCode, ToResult};
use crate::qos::QoSProfile;
use crate::Node;
use crate::{rcl_bindings::*, RclrsError};
use crate::{Node, QoSProfile, WaitSet, Waitable};

use std::boxed::Box;
use std::ffi::CStr;
Expand All @@ -11,25 +10,13 @@ use std::sync::Arc;

use rosidl_runtime_rs::{Message, RmwMessage};

use parking_lot::{Mutex, MutexGuard};
use parking_lot::Mutex;

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_subscription_t {}

/// Internal struct used by subscriptions.
pub struct SubscriptionHandle {
rcl_subscription_mtx: Mutex<rcl_subscription_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
}

impl SubscriptionHandle {
pub(crate) fn lock(&self) -> MutexGuard<rcl_subscription_t> {
self.rcl_subscription_mtx.lock()
}
}

impl Drop for SubscriptionHandle {
impl<T: Message> Drop for Subscription<T> {
fn drop(&mut self) {
let rcl_subscription = self.rcl_subscription_mtx.get_mut();
let rcl_node = &mut *self.rcl_node_mtx.lock();
Expand All @@ -40,14 +27,6 @@ impl Drop for SubscriptionHandle {
}
}

/// Trait to be implemented by concrete [`Subscription`]s.
pub trait SubscriptionBase: Send + Sync {
/// Internal function to get a reference to the `rcl` handle.
fn handle(&self) -> &SubscriptionHandle;
/// Tries to take a new message and run the callback with it.
fn execute(&self) -> Result<(), RclrsError>;
}

/// Struct for receiving messages of type `T`.
///
/// There can be multiple subscriptions for the same topic, in different nodes or the same node.
Expand All @@ -63,7 +42,8 @@ pub struct Subscription<T>
where
T: Message,
{
pub(crate) handle: Arc<SubscriptionHandle>,
rcl_subscription_mtx: Mutex<rcl_subscription_t>,
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
/// The callback function that runs when a message was received.
pub callback: Mutex<Box<dyn FnMut(T) + 'static + Send>>,
message: PhantomData<T>,
Expand Down Expand Up @@ -113,13 +93,9 @@ where
.ok()?;
}

let handle = Arc::new(SubscriptionHandle {
Ok(Self {
rcl_subscription_mtx: Mutex::new(rcl_subscription),
rcl_node_mtx: node.rcl_node_mtx.clone(),
});

Ok(Self {
handle,
callback: Mutex::new(Box::new(callback)),
message: PhantomData,
})
Expand All @@ -133,7 +109,8 @@ where
// SAFETY: No preconditions for the function used
// The unsafe variables get converted to safe types before being returned
unsafe {
let raw_topic_pointer = rcl_subscription_get_topic_name(&*self.handle.lock());
let raw_topic_pointer =
rcl_subscription_get_topic_name(&*self.rcl_subscription_mtx.lock());
CStr::from_ptr(raw_topic_pointer)
.to_string_lossy()
.into_owned()
Expand Down Expand Up @@ -164,7 +141,7 @@ where
// ```
pub fn take(&self) -> Result<T, RclrsError> {
let mut rmw_message = <T as Message>::RmwMsg::default();
let rcl_subscription = &mut *self.handle.lock();
let rcl_subscription = &mut *self.rcl_subscription_mtx.lock();
let ret = unsafe {
// SAFETY: The first two pointers are valid/initialized, and do not need to be valid
// beyond the function call.
Expand All @@ -181,12 +158,22 @@ where
}
}

impl<T> SubscriptionBase for Subscription<T>
impl<T> Waitable for Subscription<T>
where
T: Message,
{
fn handle(&self) -> &SubscriptionHandle {
&self.handle
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError> {
// SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
// for as long as the wait set exists, because it's stored in self.subscriptions.
// Passing in a null pointer for the third argument is explicitly allowed.
rcl_wait_set_add_subscription(
&mut wait_set.rcl_wait_set,
&*self.rcl_subscription_mtx.lock(),
std::ptr::null_mut(),
)
.ok()?;
wait_set.subscriptions.push(self);
Ok(())
}

fn execute(&self) -> Result<(), RclrsError> {
Expand Down
64 changes: 31 additions & 33 deletions rclrs/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,55 @@

use crate::error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult};
use crate::rcl_bindings::*;
use crate::{Context, SubscriptionBase};
use crate::Context;

use std::sync::Arc;
use std::time::Duration;
use std::vec::Vec;

use parking_lot::Mutex;

/// Trait to be implemented by entities that can be waited on, like a [`Subscription`][1].
///
/// [1]: crate::Subscription
pub trait Waitable: Send + Sync {
/// Adds itself to the given wait set.
///
/// This will return an error if the number of waitables of this kind in the wait set is larger
/// than the capacity set in [`WaitSet::new`].
///
/// # Safety
///
/// The same waitable must not be added to multiple wait sets, because that would make it
/// unsafe to simultaneously wait on those wait sets. Quoting from the rcl docs:
/// "This function is thread-safe for unique wait sets with unique contents.
/// This function cannot operate on the same wait set in multiple threads, and
/// the wait sets may not share content.
/// For example, calling `rcl_wait()` in two threads on two different wait sets
/// that both contain a single, shared guard condition is undefined behavior."
///
/// This function is unsafe because the implementation must ensure that the item is not yet in a
/// different wait set.
unsafe fn add_to_wait_set(self: Arc<Self>, wait_set: &mut WaitSet) -> Result<(), RclrsError>;
/// Tries to take a new message and run the callback with it.
fn execute(&self) -> Result<(), RclrsError>;
}

/// A struct for waiting on subscriptions and other waitable entities to become ready.
pub struct WaitSet {
rcl_wait_set: rcl_wait_set_t,
pub(crate) rcl_wait_set: rcl_wait_set_t,
// Used to ensure the context is alive while the wait set is alive.
_rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
// The subscriptions that are currently registered in the wait set.
// This correspondence is an invariant that must be maintained by all functions,
// even in the error case.
subscriptions: Vec<Arc<dyn SubscriptionBase>>,
pub(crate) subscriptions: Vec<Arc<dyn Waitable>>,
}

/// A list of entities that are ready, returned by [`WaitSet::wait`].
pub struct ReadyEntities {
/// A list of subscriptions that have potentially received messages.
pub subscriptions: Vec<Arc<dyn SubscriptionBase>>,
pub subscriptions: Vec<Arc<dyn Waitable>>,
}

impl Drop for rcl_wait_set_t {
Expand All @@ -56,7 +82,7 @@ impl WaitSet {
/// Creates a new wait set.
///
/// The given number of subscriptions is a capacity, corresponding to how often
/// [`WaitSet::add_subscription`] may be called.
/// [`Waitable::add_to_wait_set`] may be called.
pub fn new(number_of_subscriptions: usize, context: &Context) -> Result<Self, RclrsError> {
let rcl_wait_set = unsafe {
// SAFETY: Getting a zero-initialized value is always safe
Expand Down Expand Up @@ -98,34 +124,6 @@ impl WaitSet {
debug_assert_eq!(ret, 0);
}

/// Adds a subscription to the wait set.
///
/// It is possible, but not useful, to add the same subscription twice.
///
/// This will return an error if the number of subscriptions in the wait set is larger than the
/// capacity set in [`WaitSet::new`].
///
/// The same subscription must not be added to multiple wait sets, because that would make it
/// unsafe to simultaneously wait on those wait sets.
pub fn add_subscription(
&mut self,
subscription: Arc<dyn SubscriptionBase>,
) -> Result<(), RclrsError> {
unsafe {
// SAFETY: I'm not sure if it's required, but the subscription pointer will remain valid
// for as long as the wait set exists, because it's stored in self.subscriptions.
// Passing in a null pointer for the third argument is explicitly allowed.
rcl_wait_set_add_subscription(
&mut self.rcl_wait_set,
&*subscription.handle().lock(),
std::ptr::null_mut(),
)
}
.ok()?;
self.subscriptions.push(subscription);
Ok(())
}

/// Blocks until the wait set is ready, or until the timeout has been exceeded.
///
/// If the timeout is `None` then this function will block indefinitely until
Expand Down

0 comments on commit 776a0bc

Please sign in to comment.