Skip to content

Commit

Permalink
Merge pull request #53 from stepfunc/bugfix/bindings
Browse files Browse the repository at this point in the history
Fix blocking callbacks and fix tracing::Span leak
  • Loading branch information
emgre committed Nov 1, 2021
2 parents 0b3fb9a + a8619c8 commit 5537537
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
### Next ###
* Client callbacks are now not blocking.
See [#53](https://github.com/stepfunc/rodbus/pull/53).
* Fix leak of `tracing::Span` in bindings.
See [#53](https://github.com/stepfunc/rodbus/pull/53).
* Add Linux AArch64 support in Java and .NET.
See [#51](https://github.com/stepfunc/rodbus/pull/51).

Expand Down
2 changes: 1 addition & 1 deletion ffi/rodbus-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ repository = "https://github.com/stepfunc/rodbus"
readme = "../README.md"

[dependencies]
ci-script = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.3" }
ci-script = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.4" }
rodbus-schema = { path = "../rodbus-schema" }
rodbus-ffi = { path = "../rodbus-ffi" }
3 changes: 2 additions & 1 deletion ffi/rodbus-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ crate-type = ["rlib", "staticlib", "cdylib"]

[dependencies]
tracing = "0.1"
tracing-core = "0.1"
tracing-subscriber = "0.2"
rodbus = { path = "../../rodbus" }
tokio = { version = "1.5", features = ["rt-multi-thread"]}
num_cpus = "1"

[build-dependencies]
rodbus-schema = { path = "../rodbus-schema" }
rust-oo-bindgen = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.3" }
rust-oo-bindgen = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.4" }
22 changes: 12 additions & 10 deletions ffi/rodbus-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(crate) unsafe fn channel_read_coils(
let mut session = param.build_session(channel);
channel
.runtime
.block_on(session.read_coils(range, callback))?;
.spawn(async move { session.read_coils(range, callback).await })?;

Ok(())
}
Expand All @@ -69,7 +69,7 @@ pub(crate) unsafe fn channel_read_discrete_inputs(
let mut session = param.build_session(channel);
channel
.runtime
.block_on(session.read_discrete_inputs(range, callback))?;
.spawn(async move { session.read_discrete_inputs(range, callback).await })?;

Ok(())
}
Expand All @@ -87,7 +87,7 @@ pub(crate) unsafe fn channel_read_holding_registers(
let mut session = param.build_session(channel);
channel
.runtime
.block_on(session.read_holding_registers(range, callback))?;
.spawn(async move { session.read_holding_registers(range, callback).await })?;

Ok(())
}
Expand All @@ -105,7 +105,7 @@ pub(crate) unsafe fn channel_read_input_registers(
let mut session = param.build_session(channel);
channel
.runtime
.block_on(session.read_input_registers(range, callback))?;
.spawn(async move { session.read_input_registers(range, callback).await })?;

Ok(())
}
Expand All @@ -122,7 +122,7 @@ pub(crate) unsafe fn channel_write_single_coil(
let mut session = param.build_session(channel);
channel
.runtime
.block_on(session.write_single_coil(bit.into(), callback))?;
.spawn(async move { session.write_single_coil(bit.into(), callback).await })?;

Ok(())
}
Expand All @@ -137,9 +137,11 @@ pub(crate) unsafe fn channel_write_single_register(
let callback = callback.convert_to_fn_once();

let mut session = param.build_session(channel);
channel
.runtime
.block_on(session.write_single_register(register.into(), callback))?;
channel.runtime.spawn(async move {
session
.write_single_register(register.into(), callback)
.await
})?;

Ok(())
}
Expand All @@ -159,7 +161,7 @@ pub(crate) unsafe fn channel_write_multiple_coils(
let mut session = param.build_session(channel);
channel
.runtime
.block_on(session.write_multiple_coils(args, callback))?;
.spawn(async move { session.write_multiple_coils(args, callback).await })?;

Ok(())
}
Expand All @@ -179,7 +181,7 @@ pub(crate) unsafe fn channel_write_multiple_registers(
let mut session = param.build_session(channel);
channel
.runtime
.block_on(session.write_multiple_registers(args, callback))?;
.spawn(async move { session.write_multiple_registers(args, callback).await })?;

Ok(())
}
Expand Down
12 changes: 12 additions & 0 deletions ffi/rodbus-ffi/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ impl tracing::Subscriber for Adapter {
fn exit(&self, span: &Id) {
self.inner.exit(span)
}

fn clone_span(&self, span: &Id) -> Id {
self.inner.clone_span(span)
}

fn try_close(&self, span: Id) -> bool {
self.inner.try_close(span)
}

fn current_span(&self) -> tracing_core::span::Current {
self.inner.current_span()
}
}

impl From<tracing::Level> for ffi::LogLevel {
Expand Down
9 changes: 4 additions & 5 deletions ffi/rodbus-ffi/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::future::Future;

use crate::ffi;
use tokio::runtime::Handle;

pub struct Runtime {
pub(crate) inner: std::sync::Arc<tokio::runtime::Runtime>,
Expand All @@ -27,7 +26,7 @@ pub(crate) struct RuntimeHandle {
}

impl RuntimeHandle {
pub(crate) fn block_on<F: Future>(&self, future: F) -> Result<F::Output, ffi::ParamError> {
/*pub(crate) fn block_on<F: Future>(&self, future: F) -> Result<F::Output, ffi::ParamError> {
let inner = self
.inner
.upgrade()
Expand All @@ -36,9 +35,9 @@ impl RuntimeHandle {
return Err(ffi::ParamError::RuntimeCannotBlockWithinAsync);
}
Ok(inner.block_on(future))
}
}*/

/*pub(crate) fn spawn<F>(&self, future: F) -> Result<(), ffi::ParamError>
pub(crate) fn spawn<F>(&self, future: F) -> Result<(), ffi::ParamError>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand All @@ -49,7 +48,7 @@ impl RuntimeHandle {
.ok_or(ffi::ParamError::RuntimeDestroyed)?;
inner.spawn(future);
Ok(())
}*/
}
}

fn build_runtime<F>(f: F) -> std::result::Result<tokio::runtime::Runtime, std::io::Error>
Expand Down
4 changes: 2 additions & 2 deletions ffi/rodbus-ffi/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ pub(crate) unsafe fn create_tcp_server(
decode_level.into(),
))
.map_err(|_| ffi::ParamError::ServerBindError)?;
let join_handle = runtime.inner.spawn(task);
runtime.inner.spawn(task);

let server_handle = Server {
_server: ServerHandle::new(tx, join_handle),
_server: ServerHandle::new(tx),
map: handler_map,
};

Expand Down
2 changes: 1 addition & 1 deletion ffi/rodbus-schema/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ repository = "https://github.com/stepfunc/rodbus"
readme = "../README.md"

[dependencies]
oo-bindgen = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.3" }
oo-bindgen = { git = "https://github.com/stepfunc/oo_bindgen.git", tag = "0.1.4" }
rodbus = { path = "../../rodbus" }
3 changes: 1 addition & 2 deletions guide/docs/api/client/tcp_client.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ values={[

## Maximum Queued Requests

Each channel sends one request at a time and has a fixed-length buffer of requests to send. If the queue is full when demanding
a request, this call **will block** until the queue has enough space.
Each channel sends one request at a time and has a fixed-length buffer of requests to send.

## Endpoint Configuration

Expand Down
2 changes: 1 addition & 1 deletion rodbus/src/client/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
tracing::warn!("error occurred making request: {}", e);
}

result.as_ref().err().and_then(|e| SessionError::from(e))
result.as_ref().err().and_then(SessionError::from)
}

async fn execute_request(
Expand Down
11 changes: 5 additions & 6 deletions rodbus/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ pub use types::*;
/// A handle to the server async task. The task is shutdown when the handle is dropped.
#[derive(Debug)]
pub struct ServerHandle {
tx: tokio::sync::mpsc::Sender<()>,
handle: tokio::task::JoinHandle<()>,
_tx: tokio::sync::mpsc::Sender<()>,
}

impl ServerHandle {
/// Construct a [ServerHandle] from its fields
///
/// This function is only required for the C bindings
pub fn new(tx: tokio::sync::mpsc::Sender<()>, handle: tokio::task::JoinHandle<()>) -> Self {
ServerHandle { tx, handle }
pub fn new(tx: tokio::sync::mpsc::Sender<()>) -> Self {
ServerHandle { _tx: tx }
}
}

Expand All @@ -52,7 +51,7 @@ pub async fn spawn_tcp_server_task<T: RequestHandler>(
let listener = crate::tokio::net::TcpListener::bind(addr).await?;

let (tx, rx) = tokio::sync::mpsc::channel(1);
let handle = tokio::spawn(create_tcp_server_task_impl(
tokio::spawn(create_tcp_server_task_impl(
rx,
max_sessions,
addr,
Expand All @@ -61,7 +60,7 @@ pub async fn spawn_tcp_server_task<T: RequestHandler>(
decode,
));

Ok(ServerHandle::new(tx, handle))
Ok(ServerHandle::new(tx))
}

/// Creates a TCP server task that can then be spawned onto the runtime manually.
Expand Down

0 comments on commit 5537537

Please sign in to comment.