Skip to content

Commit

Permalink
Simplify and align the RTU client with the TCP client
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde authored and flosse committed Mar 9, 2023
1 parent bd6e76e commit d4f07b3
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 79 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
- TCP Client: Allow to attach a generic transport layer, e.g. for TLS support
- RTU Server: Remove `NewService` trait and pass instance directly

### Breaking Changes

- RTU: Replaced the async and fallible `connect()` and `connect_slave()`
functions with synchronous and infallible variants named `attach()` and
`attach_slave()` in correspondence to their TCP counterparts.

## v0.7.1 (2023-02-28)

- Fix (sync): Panic when using client-side timeouts [#155](https://github.com/slowtec/tokio-modbus/issues/155)
Expand Down
2 changes: 1 addition & 1 deletion examples/rtu-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let builder = tokio_serial::new(tty_path, 19200);
let port = SerialStream::open(&builder).unwrap();

let mut ctx = rtu::connect_slave(port, slave).await?;
let mut ctx = rtu::attach_slave(port, slave);
println!("Reading a sensor value");
let rsp = ctx.read_holding_registers(0x082B, 2).await?;
println!("Sensor value is: {rsp:?}");
Expand Down
2 changes: 1 addition & 1 deletion examples/rtu-server-address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("Connecting client...");
let client_serial = tokio_serial::SerialStream::open(&builder).unwrap();
let mut ctx = rtu::connect_slave(client_serial, slave).await?;
let mut ctx = rtu::attach_slave(client_serial, slave);
println!("Reading input registers...");
let rsp = ctx.read_input_registers(0x00, 7).await?;
println!("The result is '{rsp:#x?}'"); // The result is '[0x0,0x0,0x77,0x0,0x0,0x0,0x0,]'
Expand Down
2 changes: 1 addition & 1 deletion examples/rtu-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("Connecting client...");
let client_serial = tokio_serial::SerialStream::open(&builder).unwrap();
let mut ctx = rtu::connect(client_serial).await?;
let mut ctx = rtu::attach(client_serial);
println!("Reading input registers...");
let rsp = ctx.read_input_registers(0x00, 7).await?;
println!("The result is '{rsp:#x?}'"); // The result is '[0x0,0x0,0x77,0x0,0x0,0x0,0x0,]'
Expand Down
20 changes: 8 additions & 12 deletions src/client/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,26 @@

//! RTU client connections

use super::*;

use crate::service;

use std::io::Error;
use tokio::io::{AsyncRead, AsyncWrite};

use super::*;

/// Connect to no particular Modbus slave device for sending
/// broadcast messages.
pub async fn connect<T>(transport: T) -> Result<Context, Error>
pub fn attach<T>(transport: T) -> Context
where
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
{
connect_slave(transport, Slave::broadcast()).await
attach_slave(transport, Slave::broadcast())
}

/// Connect to any kind of Modbus slave device.
pub async fn connect_slave<T>(transport: T, slave: Slave) -> Result<Context, Error>
pub fn attach_slave<T>(transport: T, slave: Slave) -> Context
where
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
{
let client = service::rtu::connect_slave(transport, slave).await?;

Ok(Context {
let client = crate::service::rtu::Client::new(transport, slave);
Context {
client: Box::new(client),
})
}
}
117 changes: 53 additions & 64 deletions src/service/rtu.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,38 @@
// SPDX-FileCopyrightText: Copyright (c) 2017-2023 slowtec GmbH <post@slowtec.de>
// SPDX-License-Identifier: MIT OR Apache-2.0

use crate::{
client::Client,
codec,
frame::{rtu::*, *},
slave::*,
};

use futures_util::{future, sink::SinkExt as _, stream::StreamExt as _};
use std::{
fmt::Debug,
future::Future,
fmt,
io::{Error, ErrorKind},
};

use futures_util::{sink::SinkExt as _, stream::StreamExt as _};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

pub(crate) fn connect_slave<T>(
transport: T,
slave: Slave,
) -> impl Future<Output = Result<Context<T>, Error>>
where
T: AsyncRead + AsyncWrite + Debug + Unpin + 'static,
{
let framed = Framed::new(transport, codec::rtu::ClientCodec::default());

let slave_id = slave.into();
future::ok(Context {
service: framed,
slave_id,
})
}
use crate::{
codec,
frame::{rtu::*, *},
slave::*,
};

/// Modbus RTU client
#[derive(Debug)]
pub(crate) struct Context<T: AsyncRead + AsyncWrite + Debug + Unpin + 'static> {
service: Framed<T, codec::rtu::ClientCodec>,
pub(crate) struct Client<T> {
framed: Framed<T, codec::rtu::ClientCodec>,
slave_id: SlaveId,
}

impl<T: AsyncRead + AsyncWrite + Unpin + Debug + 'static> Context<T> {
impl<T> Client<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub(crate) fn new(transport: T, slave: Slave) -> Self {
let framed = Framed::new(transport, codec::rtu::ClientCodec::default());
let slave_id = slave.into();
Self { framed, slave_id }
}

fn next_request_adu<R>(&self, req: R, disconnect: bool) -> RequestAdu
where
R: Into<RequestPdu>,
Expand All @@ -60,9 +52,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Debug + 'static> Context<T> {
let req_adu = self.next_request_adu(req, disconnect);
let req_hdr = req_adu.hdr;

self.service.send(req_adu).await?;
self.framed.send(req_adu).await?;
let res_adu = self
.service
.framed
.next()
.await
.unwrap_or_else(|| Err(Error::from(ErrorKind::BrokenPipe)))?;
Expand All @@ -86,14 +78,17 @@ fn verify_response_header(req_hdr: Header, rsp_hdr: Header) -> Result<(), Error>
Ok(())
}

impl<T: AsyncRead + AsyncWrite + Debug + Unpin + 'static> SlaveContext for Context<T> {
impl<T> SlaveContext for Client<T> {
fn set_slave(&mut self, slave: Slave) {
self.slave_id = slave.into();
}
}

#[async_trait::async_trait]
impl<T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static> Client for Context<T> {
impl<T> crate::client::Client for Client<T>
where
T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin,
{
async fn call(&mut self, req: Request) -> Result<Response, Error> {
self.call(req).await
}
Expand All @@ -113,42 +108,36 @@ mod tests {

impl Unpin for MockTransport {}

#[tokio::test]
async fn handle_broken_pipe() {
impl AsyncRead for MockTransport {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
impl AsyncRead for MockTransport {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsyncWrite for MockTransport {
fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll<Result<usize>> {
Poll::Ready(Ok(2))
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}

impl AsyncWrite for MockTransport {
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &[u8],
) -> Poll<Result<usize>> {
Poll::Ready(Ok(2))
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
unimplemented!()
}
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
unimplemented!()
}
}

let transport = MockTransport {};
let mut ctx =
crate::service::rtu::connect_slave(transport, crate::service::rtu::Slave::broadcast())
.await
.unwrap();
let res = ctx
#[tokio::test]
async fn handle_broken_pipe() {
let transport = MockTransport;
let mut client =
crate::service::rtu::Client::new(transport, crate::service::rtu::Slave::broadcast());
let res = client
.call(crate::service::rtu::Request::ReadCoils(0x00, 5))
.await;
assert!(res.is_err());
Expand Down

0 comments on commit d4f07b3

Please sign in to comment.