Skip to content

Commit

Permalink
Less thread work
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Aug 4, 2021
1 parent c44a35f commit 1ed8caf
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 34 deletions.
31 changes: 11 additions & 20 deletions client/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::{
io::{self, ReadHalf, WriteHalf},
net::TcpStream,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
time::{interval_at, Duration, Instant},
time::{interval, Duration},
};
use tokio_util::codec::FramedRead;

Expand Down Expand Up @@ -300,39 +300,30 @@ impl TcpTransport {
// has also terminated.
self.spawn_connection_task(runtime, addr, endpoint_url);

let result = Arc::new(RwLock::new(Ok(())));

let result_task = result.clone();
runtime.block_on(async move {
debug!("Spawning task that waits on the connection state to change");
let result = runtime.block_on(async move {
// Poll for the state to indicate connect is ready
debug!("Waiting for a connect (or failure to connect)");
let mut timer = interval_at(
Instant::now(),
Duration::from_millis(Self::WAIT_POLLING_TIMEOUT),
);
let mut timer = interval(Duration::from_millis(Self::WAIT_POLLING_TIMEOUT));
loop {
timer.tick().await;
match self.connection_state.state() {
ConnectionState::Processing => {
debug!("Connected");
break;
return Ok(());
}
ConnectionState::Finished(status_code) => {
error!("Connected failed with status {}", status_code);
let mut result = trace_write_lock_unwrap!(result_task);
*result = Err(StatusCode::BadConnectionClosed);
break;
return Err(StatusCode::BadConnectionClosed);
}
_ => {
// Still waiting for something to happen
}
}
timer.tick().await;
}
});

// Getting result is a bit of a pain
let result = trace_read_lock_unwrap!(result);
result.clone()
result
}

/// Disconnects the stream from the server (if it is connected)
Expand Down Expand Up @@ -446,7 +437,7 @@ impl TcpTransport {
}
};
// Wait for connection state to be closed
let mut timer = interval_at(Instant::now(), Duration::from_millis(10));
let mut timer = interval(Duration::from_millis(10));
loop {
timer.tick().await;
{
Expand Down Expand Up @@ -513,8 +504,9 @@ impl TcpTransport {
tokio::spawn(async move {
let id = format!("finished-monitor-task, {}", id);
register_runtime_component!(&id);
let mut timer = interval_at(Instant::now(), Duration::from_millis(200));
let mut timer = interval(Duration::from_millis(200));
loop {
timer.tick().await;
if connection_state.is_finished() {
// Set the flag
let mut finished_flag = trace_write_lock_unwrap!(finished_flag);
Expand All @@ -524,7 +516,6 @@ impl TcpTransport {
*finished_flag = true;
break;
}
timer.tick().await;
}
info!("Timer for finished is finished");
deregister_runtime_component!(&id);
Expand Down
24 changes: 10 additions & 14 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{

use tokio::{
sync::oneshot,
time::{interval_at, Duration, Instant},
time::{interval, Duration, Instant},
};

use opcua_core::{
Expand Down Expand Up @@ -631,7 +631,7 @@ impl Session {
///
pub fn run(session: Arc<RwLock<Session>>) {
let (_tx, rx) = oneshot::channel();
Self::run_loop(true, session, Self::POLL_SLEEP_INTERVAL, rx);
Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx);
}

/// Runs the session asynchronously on a new thread. The function returns immediately
Expand All @@ -658,7 +658,7 @@ impl Session {
///
pub fn run_async(session: Arc<RwLock<Session>>) -> oneshot::Sender<SessionCommand> {
let (tx, rx) = oneshot::channel();
thread::spawn(move || Self::run_loop(true, session, Self::POLL_SLEEP_INTERVAL, rx));
thread::spawn(move || Self::run_loop(session, Self::POLL_SLEEP_INTERVAL, rx));
tx
}

Expand All @@ -674,7 +674,7 @@ impl Session {
) {
tokio::select! {
_ = async {
let mut timer = interval_at(Instant::now(), Duration::from_millis(sleep_interval));
let mut timer = interval(Duration::from_millis(sleep_interval));
loop {
// Poll the session.
let poll_result = {
Expand Down Expand Up @@ -722,7 +722,6 @@ impl Session {
/// * `rx` - A receiver that the task uses to receive a quit command directly from the caller.
///
pub fn run_loop(
block_on: bool,
session: Arc<RwLock<Session>>,
sleep_interval: u64,
rx: oneshot::Receiver<SessionCommand>,
Expand All @@ -733,14 +732,11 @@ impl Session {
Self::session_task(session, sleep_interval, rx).await;
}
};

// Spawn the task on the alloted runtime
let session = trace_read_lock_unwrap!(session);
if block_on {
thread::spawn(move || {
let session = trace_read_lock_unwrap!(session);
session.runtime.block_on(task);
} else {
session.runtime.spawn(task);
}
});
}

/// Polls on the session which basically dispatches any pending
Expand Down Expand Up @@ -1128,17 +1124,17 @@ impl Session {
// The timer runs at a higher frequency timer loop to terminate as soon after the session
// state has terminated. Each time it runs it will test if the interval has elapsed or not.
let session_activity_interval = Duration::from_millis(session_activity);
let mut timer = interval_at(Instant::now(), Duration::from_millis(MIN_SESSION_ACTIVITY_MS));
let mut timer = interval(Duration::from_millis(MIN_SESSION_ACTIVITY_MS));
let mut last_timeout = Instant::now();

loop {
timer.tick().await;

if connection_state.is_finished() {
info!("Session activity timer is terminating");
break;
}

timer.tick().await;

// Get the time now
let now = Instant::now();

Expand Down

0 comments on commit 1ed8caf

Please sign in to comment.