Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AsyncRead/AsyncWrite traits for generic TCP implementation #164

Merged
merged 3 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@

- TCP Server: Replace `NewService` trait with closures and never panic
- TCP Server: Stabilize "tcp-server" feature
- TCP Client: Allow to attach a generic transport layer, e.g. for TLS support
- RTU Server: Remove `NewService` trait and pass instance directly
- Fix (sync): No timeout while establishing serial RTU connections

### Breaking Changes

- RTU: Replaced the async and fallible `connect()` and `connect_slave()`
functions with synchronous and infallible variants named `attach()` and
`attach_slave()` in correspondence to their TCP counterparts.

## v0.7.1 (2023-02-28)

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ all-features = true
# <https://users.rust-lang.org/t/psa-please-specify-precise-dependency-versions-in-cargo-toml/71277>

[dependencies]
async-trait = "0.1.65"
async-trait = "0.1.66"
byteorder = "1.4.3"
bytes = "1.4.0"
futures = { version = "0.3.26", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion examples/rtu-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let builder = tokio_serial::new(tty_path, 19200);
let port = SerialStream::open(&builder).unwrap();

let mut ctx = rtu::connect_slave(port, slave).await?;
let mut ctx = rtu::attach_slave(port, slave);
println!("Reading a sensor value");
let rsp = ctx.read_holding_registers(0x082B, 2).await?;
println!("Sensor value is: {rsp:?}");
Expand Down
2 changes: 1 addition & 1 deletion examples/rtu-server-address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("Connecting client...");
let client_serial = tokio_serial::SerialStream::open(&builder).unwrap();
let mut ctx = rtu::connect_slave(client_serial, slave).await?;
let mut ctx = rtu::attach_slave(client_serial, slave);
println!("Reading input registers...");
let rsp = ctx.read_input_registers(0x00, 7).await?;
println!("The result is '{rsp:#x?}'"); // The result is '[0x0,0x0,0x77,0x0,0x0,0x0,0x0,]'
Expand Down
2 changes: 1 addition & 1 deletion examples/rtu-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("Connecting client...");
let client_serial = tokio_serial::SerialStream::open(&builder).unwrap();
let mut ctx = rtu::connect(client_serial).await?;
let mut ctx = rtu::attach(client_serial);
println!("Reading input registers...");
let rsp = ctx.read_input_registers(0x00, 7).await?;
println!("The result is '{rsp:#x?}'"); // The result is '[0x0,0x0,0x77,0x0,0x0,0x0,0x0,]'
Expand Down
20 changes: 8 additions & 12 deletions src/client/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,26 @@

//! RTU client connections

use super::*;

use crate::service;

use std::io::Error;
use tokio::io::{AsyncRead, AsyncWrite};

use super::*;

/// Connect to no particular Modbus slave device for sending
/// broadcast messages.
pub async fn connect<T>(transport: T) -> Result<Context, Error>
pub fn attach<T>(transport: T) -> Context
where
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
{
connect_slave(transport, Slave::broadcast()).await
attach_slave(transport, Slave::broadcast())
}

/// Connect to any kind of Modbus slave device.
pub async fn connect_slave<T>(transport: T, slave: Slave) -> Result<Context, Error>
pub fn attach_slave<T>(transport: T, slave: Slave) -> Context
where
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
{
let client = service::rtu::connect_slave(transport, slave).await?;

Ok(Context {
let client = crate::service::rtu::Client::new(transport, slave);
Context {
client: Box::new(client),
})
}
}
7 changes: 4 additions & 3 deletions src/client/sync/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use super::{block_on_with_timeout, Context};

use tokio_serial::{SerialPortBuilder, SerialStream};

use crate::client::rtu::connect_slave as async_connect_slave;
use crate::slave::Slave;

/// Connect to no particular Modbus slave device for sending
Expand Down Expand Up @@ -41,8 +40,10 @@ pub fn connect_slave_with_timeout(
.enable_time()
.build()?;
// SerialStream::open requires a runtime at least on cfg(unix).
let serial = runtime.block_on(async { SerialStream::open(builder) })?;
let async_ctx = block_on_with_timeout(&runtime, timeout, async_connect_slave(serial, slave))?;
let serial = block_on_with_timeout(&runtime, timeout, async {
SerialStream::open(builder).map_err(Into::into)
})?;
let async_ctx = crate::client::rtu::attach_slave(serial, slave);
let sync_ctx = Context {
runtime,
async_ctx,
Expand Down
51 changes: 34 additions & 17 deletions src/client/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,48 @@

//! TCP client connections

use super::*;
use std::{fmt, io::Error, net::SocketAddr};

use crate::service;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
};

use std::{future::Future, io::Error, net::SocketAddr};
use super::*;

/// Establish a direct connection to a Modbus TCP coupler.
pub fn connect(socket_addr: SocketAddr) -> impl Future<Output = Result<Context, Error>> {
connect_slave(socket_addr, Slave::tcp_device())
pub async fn connect(socket_addr: SocketAddr) -> Result<Context, Error> {
connect_slave(socket_addr, Slave::tcp_device()).await
}

/// Connect to a physical, broadcast, or custom Modbus device,
/// probably through a Modbus TCP gateway that is forwarding
/// messages to/from the corresponding slave device.
pub fn connect_slave(
socket_addr: SocketAddr,
slave: Slave,
) -> impl Future<Output = Result<Context, Error>> {
let context_future = service::tcp::connect_slave(socket_addr, slave);

async {
let context = context_future.await?;

Ok(Context {
client: Box::new(context),
})
pub async fn connect_slave(socket_addr: SocketAddr, slave: Slave) -> Result<Context, Error> {
let transport = TcpStream::connect(socket_addr).await?;
let context = attach_slave(transport, slave);
Ok(context)
}

/// Attach a new client context to a direct transport connection.
///
/// The connection could either be an ordinary [`TcpStream`] or a TLS connection.
pub fn attach<T>(transport: T) -> Context
where
T: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug + 'static,
{
attach_slave(transport, Slave::tcp_device())
}

/// Attach a new client context to a transport connection.
///
/// The connection could either be an ordinary [`TcpStream`] or a TLS connection.
pub fn attach_slave<T>(transport: T, slave: Slave) -> Context
where
T: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug + 'static,
{
let client = crate::service::tcp::Client::new(transport, slave);
Context {
client: Box::new(client),
}
}
117 changes: 53 additions & 64 deletions src/service/rtu.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,38 @@
// SPDX-FileCopyrightText: Copyright (c) 2017-2023 slowtec GmbH <post@slowtec.de>
// SPDX-License-Identifier: MIT OR Apache-2.0

use crate::{
client::Client,
codec,
frame::{rtu::*, *},
slave::*,
};

use futures_util::{future, sink::SinkExt as _, stream::StreamExt as _};
use std::{
fmt::Debug,
future::Future,
fmt,
io::{Error, ErrorKind},
};

use futures_util::{sink::SinkExt as _, stream::StreamExt as _};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

pub(crate) fn connect_slave<T>(
transport: T,
slave: Slave,
) -> impl Future<Output = Result<Context<T>, Error>>
where
T: AsyncRead + AsyncWrite + Debug + Unpin + 'static,
{
let framed = Framed::new(transport, codec::rtu::ClientCodec::default());

let slave_id = slave.into();
future::ok(Context {
service: framed,
slave_id,
})
}
use crate::{
codec,
frame::{rtu::*, *},
slave::*,
};

/// Modbus RTU client
#[derive(Debug)]
pub(crate) struct Context<T: AsyncRead + AsyncWrite + Debug + Unpin + 'static> {
service: Framed<T, codec::rtu::ClientCodec>,
pub(crate) struct Client<T> {
framed: Framed<T, codec::rtu::ClientCodec>,
slave_id: SlaveId,
}

impl<T: AsyncRead + AsyncWrite + Unpin + Debug + 'static> Context<T> {
impl<T> Client<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) fn new(transport: T, slave: Slave) -> Self {
let framed = Framed::new(transport, codec::rtu::ClientCodec::default());
let slave_id = slave.into();
Self { framed, slave_id }
}

fn next_request_adu<R>(&self, req: R, disconnect: bool) -> RequestAdu
where
R: Into<RequestPdu>,
Expand All @@ -60,9 +52,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Debug + 'static> Context<T> {
let req_adu = self.next_request_adu(req, disconnect);
let req_hdr = req_adu.hdr;

self.service.send(req_adu).await?;
self.framed.send(req_adu).await?;
let res_adu = self
.service
.framed
.next()
.await
.unwrap_or_else(|| Err(Error::from(ErrorKind::BrokenPipe)))?;
Expand All @@ -86,14 +78,17 @@ fn verify_response_header(req_hdr: Header, rsp_hdr: Header) -> Result<(), Error>
Ok(())
}

impl<T: AsyncRead + AsyncWrite + Debug + Unpin + 'static> SlaveContext for Context<T> {
impl<T> SlaveContext for Client<T> {
fn set_slave(&mut self, slave: Slave) {
self.slave_id = slave.into();
}
}

#[async_trait::async_trait]
impl<T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static> Client for Context<T> {
impl<T> crate::client::Client for Client<T>
where
T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin,
{
async fn call(&mut self, req: Request) -> Result<Response, Error> {
self.call(req).await
}
Expand All @@ -113,42 +108,36 @@ mod tests {

impl Unpin for MockTransport {}

#[tokio::test]
async fn handle_broken_pipe() {
impl AsyncRead for MockTransport {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
impl AsyncRead for MockTransport {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsyncWrite for MockTransport {
fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll<Result<usize>> {
Poll::Ready(Ok(2))
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}

impl AsyncWrite for MockTransport {
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &[u8],
) -> Poll<Result<usize>> {
Poll::Ready(Ok(2))
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
unimplemented!()
}
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
unimplemented!()
}
}

let transport = MockTransport {};
let mut ctx =
crate::service::rtu::connect_slave(transport, crate::service::rtu::Slave::broadcast())
.await
.unwrap();
let res = ctx
#[tokio::test]
async fn handle_broken_pipe() {
let transport = MockTransport;
let mut client =
crate::service::rtu::Client::new(transport, crate::service::rtu::Slave::broadcast());
let res = client
.call(crate::service::rtu::Request::ReadCoils(0x00, 5))
.await;
assert!(res.is_err());
Expand Down
Loading