Skip to content

Commit

Permalink
mcu-util: explicitly join on task handles (#125)
Browse files Browse the repository at this point in the history
* mcu-interface: fix blocking, add kill for can_rx task, fix ack

* mcu-interface: use unbounded channel

* mcu-interface: introduce explicit task join handle

* fix typo

* switch info to debug

* mcu-util: kill tasks before joining

* improve task joining
  • Loading branch information
TheButlah committed Jun 12, 2024
1 parent 852886e commit 95dd9db
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 130 deletions.
11 changes: 7 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions mcu-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ rust-version.workspace = true
async-trait = "0.1.77"
can-rs = { path = "../can", features = ["isotp"] }
color-eyre.workspace = true
futures.workspace = true
orb-messages.workspace = true
prost = "0.12.3"
tokio.workspace = true
pin-project = "1.1.5"
thiserror.workspace = true
tokio-serial = "5.4.1"
tokio.workspace = true
tracing.workspace = true
futures.workspace = true

[package.metadata.orb]
unsupported_targets = [
Expand Down
16 changes: 11 additions & 5 deletions mcu-interface/examples/log_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ async fn main() -> Result<()> {
.init();

let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel();
let _iface = CanRawMessaging::new(String::from("can0"), Device::Security, msg_tx)
.wrap_err("failed to create messaging interface")?;
let (iface, task_handle) =
CanRawMessaging::new(String::from("can0"), Device::Security, msg_tx)
.wrap_err("failed to create messaging interface")?;

let recv_fut = async {
while let Some(msg) = msg_rx.recv().await {
Expand All @@ -29,8 +30,13 @@ async fn main() -> Result<()> {
};

tokio::select! {
() = recv_fut => Ok(()),
result = tokio::signal::ctrl_c() => { println!("ctrl-c detected"); result.wrap_err("failed to listen for ctrl-c")}

() = recv_fut => (),
result = tokio::signal::ctrl_c() => {
println!("ctrl-c detected");
result.wrap_err("failed to listen for ctrl-c")?;
}
}

drop(iface);
task_handle.await.wrap_err("can task terminated uncleanly")
}
53 changes: 41 additions & 12 deletions mcu-interface/src/can/canfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use color_eyre::eyre::{eyre, Context, Result};
use futures::FutureExt as _;
use orb_messages::CommonAckError;
use prost::Message;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -18,7 +19,7 @@ use crate::{
MessagingInterface,
};

use super::ACK_RX_TIMEOUT;
use super::{CanTaskHandle, CanTaskJoinError, CanTaskPanic, ACK_RX_TIMEOUT};

pub struct CanRawMessaging {
stream: FrameStream<CANFD_DATA_LEN>,
Expand All @@ -31,12 +32,15 @@ pub struct CanRawMessaging {

impl CanRawMessaging {
/// CanRawMessaging opens a CAN stream filtering messages addressed only to the Jetson
/// and start listening for incoming messages in a new blocking thread
/// and start listening for incoming messages in a new blocking thread.
///
/// Returns a handle to join on the blocking thread and retrieve any errors it
/// produces.
pub fn new(
bus: String,
can_node: Device,
new_message_queue: mpsc::UnboundedSender<McuPayload>,
) -> Result<Self> {
) -> Result<(Self, CanTaskHandle)> {
// open socket
let stream = FrameStream::<CANFD_DATA_LEN>::build()
.nonblocking(false)
Expand All @@ -55,18 +59,42 @@ impl CanRawMessaging {

let (ack_tx, ack_rx) = mpsc::unbounded_channel();
let (kill_tx, kill_rx) = oneshot::channel();
let (task_join_tx, task_join_rx) = oneshot::channel();
let task_join_rx = CanTaskHandle(task_join_rx);
let stream_copy = stream.try_clone()?;
tokio::task::spawn_blocking(move || {
can_rx(stream_copy, can_node, ack_tx, new_message_queue, kill_rx)
// We directly spawn a thread instead of tokio::task::spawn_blocking,
// for two reaasons:
//
// 1. Under normal conditions, this closure runs forever, and tokio
// advises only using spawn_blocking for operations that "eventually
// finish on their own"
// 2. tokio::main will not return until all tasks are completed. And
// unlike regular async tasks, blocking tasks cannot be cancelled.
// `kill_tx` partially solves this, but I think its just better to
// decouple tokio from this task.
std::thread::spawn(move || {
let result: Result<(), CanTaskJoinError> =
match std::panic::catch_unwind(AssertUnwindSafe(|| {
can_rx(stream_copy, can_node, ack_tx, new_message_queue, kill_rx)
})) {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(CanTaskJoinError::Err(err)),
Err(panic) => Err(CanTaskPanic::new(panic).into()),
};
debug!(result=?result, "raw can_rx task terminated");
task_join_tx.send(result)
});

Ok(Self {
stream,
ack_num_lsb: AtomicU16::new(0),
ack_queue: ack_rx,
can_node,
_kill_tx: kill_tx,
})
Ok((
Self {
stream,
ack_num_lsb: AtomicU16::new(0),
ack_queue: ack_rx,
can_node,
_kill_tx: kill_tx,
},
task_join_rx,
))
}

async fn wait_ack(&mut self, expected_ack_number: u32) -> Result<CommonAckError> {
Expand Down Expand Up @@ -129,6 +157,7 @@ fn can_rx(
Err(oneshot::error::TryRecvError::Empty) => (),
}

trace!("reading from raw");
match stream.recv(&mut frame, 0) {
Ok(_nbytes) => {
break;
Expand Down
51 changes: 40 additions & 11 deletions mcu-interface/src/can/isotp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::FutureExt as _;
use orb_messages::CommonAckError;
use prost::Message;
use std::io::{Read, Write};
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicU16, Ordering};
use tokio::sync::{mpsc, oneshot};
use tokio::time::timeout;
Expand All @@ -13,12 +14,13 @@ use can_rs::isotp::addr::CanIsotpAddr;
use can_rs::isotp::stream::IsotpStream;
use can_rs::{Id, CAN_DATA_LEN};

use crate::can::CanTaskPanic;
use crate::{
create_ack, handle_main_mcu_message, handle_sec_mcu_message, McuPayload,
MessagingInterface,
};

use super::ACK_RX_TIMEOUT;
use super::{CanTaskHandle, CanTaskJoinError, ACK_RX_TIMEOUT};

/// ISO-TP addressing scheme
/// 11-bit standard ID
Expand Down Expand Up @@ -92,6 +94,9 @@ impl CanIsoTpMessaging {
/// pairs of addresses, one for transmission of ISO-TP messages and one for reception.
/// A blocking thread is created for listening to new incoming messages.
///
/// Returns a handle to join on the blocking thread and retrieve any errors it
/// produces.
///
/// One pair of addresses _should_ be uniquely used on the bus to prevent misinterpretation of
/// transmitted messages.
/// If a pair of addresses is used by several programs, they must ensure one, and only one,
Expand All @@ -101,7 +106,7 @@ impl CanIsoTpMessaging {
local: IsoTpNodeIdentifier,
remote: IsoTpNodeIdentifier,
new_message_queue: mpsc::UnboundedSender<McuPayload>,
) -> Result<CanIsoTpMessaging> {
) -> Result<(Self, CanTaskHandle)> {
let (tx_stdid_src, tx_stdid_dst) = create_pair(local, remote)?;
debug!("Sending on 0x{:x}->0x{:x}", tx_stdid_src, tx_stdid_dst);

Expand All @@ -119,17 +124,40 @@ impl CanIsoTpMessaging {

let (ack_tx, ack_rx) = mpsc::unbounded_channel();
let (kill_tx, kill_rx) = oneshot::channel();
// spawn CAN receiver
tokio::task::spawn_blocking(move || {
can_rx(bus, remote, local, ack_tx, new_message_queue, kill_rx)
let (task_join_tx, task_join_rx) = oneshot::channel();
let task_join_rx = CanTaskHandle(task_join_rx);
// We directly spawn a thread instead of tokio::task::spawn_blocking,
// for two reaasons:
//
// 1. Under normal conditions, this closure runs forever, and tokio
// advises only using spawn_blocking for operations that "eventually
// finish on their own"
// 2. tokio::main will not return until all tasks are completed. And
// unlike regular async tasks, blocking tasks cannot be cancelled.
// `kill_tx` partially solves this, but I think its just better to
// decouple tokio from this task.
std::thread::spawn(move || {
let result: Result<(), CanTaskJoinError> =
match std::panic::catch_unwind(AssertUnwindSafe(|| {
can_rx(bus, remote, local, ack_tx, new_message_queue, kill_rx)
})) {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(CanTaskJoinError::Err(err)),
Err(panic) => Err(CanTaskPanic::new(panic).into()),
};
debug!(result=?result, "isotp can_rx task terminated");
task_join_tx.send(result)
});

Ok(CanIsoTpMessaging {
stream: tx_isotp_stream,
ack_num_lsb: AtomicU16::new(0),
ack_queue: ack_rx,
_kill_tx: kill_tx,
})
Ok((
CanIsoTpMessaging {
stream: tx_isotp_stream,
ack_num_lsb: AtomicU16::new(0),
ack_queue: ack_rx,
_kill_tx: kill_tx,
},
task_join_rx,
))
}

async fn wait_ack(&mut self, expected_ack_number: u32) -> Result<CommonAckError> {
Expand Down Expand Up @@ -206,6 +234,7 @@ fn can_rx(
Err(oneshot::error::TryRecvError::Empty) => (),
}

trace!("reading from isotp");
let buffer = match rx_isotp_stream.read(&mut buffer) {
Ok(_) => buffer,
Err(e) => {
Expand Down
76 changes: 75 additions & 1 deletion mcu-interface/src/can/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,80 @@
use std::time::Duration;
use std::{any::Any, task::Poll, time::Duration};

use pin_project::pin_project;
use tokio::sync::oneshot;

pub mod canfd;
pub mod isotp;

const ACK_RX_TIMEOUT: Duration = Duration::from_millis(1500);

pub type CanTaskResult = Result<(), CanTaskJoinError>;

/// Handle that can be used to detect errors in the can receive task.
///
/// Note that dropping this handle doesn't kill the task.
#[pin_project]
#[derive(Debug)]
pub struct CanTaskHandle(#[pin] oneshot::Receiver<CanTaskResult>);

impl std::future::Future for CanTaskHandle {
type Output = CanTaskResult;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
let rx = self.project().0;
rx.poll(cx).map(|recv| match recv {
Ok(Ok(())) | Err(oneshot::error::RecvError { .. }) => Ok(()),
Ok(Err(err)) => Err(err),
})
}
}

impl CanTaskHandle {
/// Blocks until the task is complete.
///
/// It is recommended to simply .await instead, since `CanTaskHandle` implements
/// `Future`.
pub fn join(self) -> CanTaskResult {
match self.0.blocking_recv() {
Ok(Ok(())) | Err(oneshot::error::RecvError { .. }) => Ok(()),
Ok(Err(err)) => Err(err),
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum CanTaskJoinError {
#[error(transparent)]
Panic(#[from] CanTaskPanic),
#[error(transparent)]
Err(#[from] color_eyre::Report),
}

#[derive(thiserror::Error)]
#[error("panic in thread used to receive from canbus")]
// Mutex is there to make it implement Sync without using `unsafe`
pub struct CanTaskPanic(std::sync::Mutex<Box<dyn Any + Send + 'static>>);

impl CanTaskPanic {
fn new(err: Box<dyn Any + Send + 'static>) -> Self {
Self(std::sync::Mutex::new(err))
}

/// Returns the object with which the task panicked.
///
/// You can pass this into [`std::panic::resume_unwind()`] to propagate the
/// panic.
pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
self.0.into_inner().expect("infallible")
}
}

impl std::fmt::Debug for CanTaskPanic {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple(std::any::type_name::<CanTaskPanic>())
.finish()
}
}
1 change: 1 addition & 0 deletions mcu-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ orb-mcu-interface.path = "../mcu-interface"
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
futures.workspace = true

[package.metadata.orb]
unsupported_targets = [
Expand Down
Loading

0 comments on commit 95dd9db

Please sign in to comment.