Skip to content

Commit

Permalink
Use tokio for client session's main processing loop
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Aug 1, 2021
1 parent 7a243b8 commit 6d4dd1e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 38 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
- Support `Aes256-Sha256-RsaPss` security policy
- Support `rsa-oaep-sha2-256` encryption for identity tokens
- Compliance improvements
- Tokio 1.8
- Upgrade from Tokio 0.1 to 1.8.x long term support

### Planned

Expand Down
91 changes: 54 additions & 37 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ use std::{
convert::TryFrom,
result::Result,
str::FromStr,
sync::{mpsc, Arc, Mutex, RwLock},
sync::{Arc, Mutex, RwLock},
thread,
};

use tokio::time::{interval_at, Duration, Instant};
use tokio::{
sync::oneshot,
time::{interval_at, Duration, Instant},
};

use opcua_core::{
comms::{
Expand Down Expand Up @@ -119,6 +122,7 @@ pub enum HistoryUpdateDetails {
}

/// A `Session` runs in a loop, which can be terminated by sending it a `SessionCommand`.
#[derive(Debug)]
pub enum SessionCommand {
/// Stop running as soon as possible
Stop,
Expand Down Expand Up @@ -608,25 +612,16 @@ impl Session {
/// connection errors. The run command will break if the session is disconnected
/// and cannot be reestablished.
///
/// The `run()` function returns a `Sender` that can be used to send a message to the session
/// to cause it to terminate.
///
/// # Arguments
///
/// * `session` - the session to run ynchronously
///
/// # Returns
///
/// * `mpsc::Sender<ClientCommand>` - A sender that allows the caller to send a message to the
/// run loop to cause it to stop.
///
pub fn run(session: Arc<RwLock<Session>>) -> mpsc::Sender<SessionCommand> {
let (tx, rx) = mpsc::channel();
pub fn run(session: Arc<RwLock<Session>>) {
let (tx, rx) = oneshot::channel();
Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx);
tx
}

/// Asynchronously runs a polling loop over the supplied session. The run command performs
/// Runs the session in a new thread and returns asynchronously. The run command performs
/// periodic actions such as receiving messages, processing subscriptions, and recovering from
/// connection errors. The run command will break if the session is disconnected
/// and cannot be reestablished.
Expand All @@ -642,35 +637,30 @@ impl Session {
///
/// # Returns
///
/// * `mpsc::Sender<ClientCommand>` - A sender that allows the caller to send a message to the
/// run loop to cause it to stop.
/// * `oneshot::Sender<ClientCommand>` - A sender that allows the caller to send a message to the
/// run loop to cause it to stop. Note that dropping the sender, i.e. not binding it to
/// a variable will also cause the loop to stop.
///
pub fn run_async(session: Arc<RwLock<Session>>) -> mpsc::Sender<SessionCommand> {
let (tx, rx) = mpsc::channel();
pub fn run_async(session: Arc<RwLock<Session>>) -> oneshot::Sender<SessionCommand> {
let (tx, rx) = oneshot::channel();
thread::spawn(move || Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx));
tx
}

/// The main running loop for a session. This is used by `run()` and `run_async()` to run
/// continuously until a signal is received to terminate.
fn run_loop(
/// The asynchronous main session loop. This is the function that processes responses and
/// keeps the session alive. Note that while the client normally calls `run()` or `run_loop()`
/// to invoke this, there may be situations where the client wishes to directly use this
/// function, for example if the client has its own Tokio runtime and prefers to spawn the task
/// with that.
pub async fn session_task(
session: Arc<RwLock<Session>>,
sleep_interval: u64,
rx: mpsc::Receiver<SessionCommand>,
rx: oneshot::Receiver<SessionCommand>,
) {
loop {
match rx.try_recv() {
Ok(command) => match command {
SessionCommand::Stop => {
info!("Run session was terminated by a message");
break;
}
},
Err(mpsc::TryRecvError::Disconnected) => {
info!("Run session terminated because the channel disconnected");
break;
}
Err(mpsc::TryRecvError::Empty) => {
tokio::select! {
_ = async {
let mut timer = interval_at(Instant::now(), Duration::from_millis(sleep_interval));
loop {
// Poll the session.
let poll_result = {
let mut session = session.write().unwrap();
Expand All @@ -680,20 +670,47 @@ impl Session {
Ok(did_something) => {
// If the session did nothing, then sleep for a moment to save some CPU
if !did_something {
thread::sleep(Duration::from_millis(sleep_interval))
timer.tick().await;
}
}
Err(_) => {
// Break the loop if connection goes down
info!("Connection to server broke, so terminating");
info!("Run session connection to server broke, so terminating");
break;
}
}
}
} => {}
message = rx => {
if let Ok(message) = message {
// Only message is a Quit command so no point even testing what it is.
info!("Run session was terminated by a message {:?}", message);
}
else {
debug!("Run session receiver is terminated, presumably by caller dropping oneshot");
}
}
}
}

/// The main running loop for a session. This is used by `run()` and `run_async()` to run
/// continuously until a signal is received to terminate.
pub fn run_loop(
session: Arc<RwLock<Session>>,
sleep_interval: u64,
rx: oneshot::Receiver<SessionCommand>,
) {
// The running loop is a single threaded tokio executor. It invokes session_task() to
// handle responses, send publish requests and handle session commands, i.e. to stop.
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async move {
Self::session_task(session, sleep_interval, rx).await;
});
}

/// Polls on the session which basically dispatches any pending
/// async responses, attempts to reconnect if the client is disconnected from the client and
/// sleeps a little bit if nothing needed to be done.
Expand Down

0 comments on commit 6d4dd1e

Please sign in to comment.