Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow server to handle slave address #141

Merged
merged 9 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ name = "rtu-server"
path = "examples/rtu-server.rs"
required-features = ["rtu", "server"]

[[example]]
name = "rtu-server-address"
path = "examples/rtu-server-address.rs"
required-features = ["rtu", "server"]

[[example]]
name = "tcp-client-custom-fn"
path = "examples/tcp-client-custom-fn.rs"
Expand Down
63 changes: 63 additions & 0 deletions examples/rtu-server-address.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-FileCopyrightText: Copyright (c) 2017-2022 slowtec GmbH <post@slowtec.de>
// SPDX-License-Identifier: MIT OR Apache-2.0

//! RTU server example

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use futures::future;
use std::{thread, time::Duration};

use tokio_modbus::prelude::*;
use tokio_modbus::server::{self, Service};

struct MbServer {
slave: Slave,
}

impl Service for MbServer {
type Request = SlaveRequest;
type Response = Option<Response>;
type Error = std::io::Error;
type Future = future::Ready<Result<Self::Response, Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
if req.slave != self.slave.into() {
return future::ready(Ok(None));
}
match req.request {
Request::ReadInputRegisters(_addr, cnt) => {
let mut registers = vec![0; cnt.into()];
registers[2] = 0x77;
future::ready(Ok(Some(Response::ReadInputRegisters(registers))))
}
_ => unimplemented!(),
}
}
}

let slave = Slave(12);
let builder = tokio_serial::new("/dev/ttyS10", 19200);
let server_serial = tokio_serial::SerialStream::open(&builder).unwrap();

println!("Starting up server...");
let _server = thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let server = server::rtu::Server::new(server_serial);
rt.block_on(async {
server.serve_forever(move || Ok(MbServer { slave })).await;
});
});

// Give the server some time for stating up
thread::sleep(Duration::from_secs(1));

println!("Connecting client...");
let client_serial = tokio_serial::SerialStream::open(&builder).unwrap();
let mut ctx = rtu::connect_slave(client_serial, slave).await?;
println!("Reading input registers...");
let rsp = ctx.read_input_registers(0x00, 7).await?;
println!("The result is '{rsp:#x?}'"); // The result is '[0x0,0x0,0x77,0x0,0x0,0x0,0x0,]'

Ok(())
}
13 changes: 6 additions & 7 deletions src/codec/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{frame::rtu::*, slave::SlaveId};

use byteorder::BigEndian;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::{debug, error, warn};
use smallvec::SmallVec;
use std::io::{Cursor, Error, ErrorKind, Result};
use tokio_util::codec::{Decoder, Encoder};
Expand Down Expand Up @@ -46,7 +45,7 @@ impl FrameDecoder {
Ok(crc) => match check_crc(&adu_buf, crc) {
Ok(()) => {
if !self.dropped_bytes.is_empty() {
warn!(
log::warn!(
"Successfully decoded frame after dropping {} byte(s): {:X?}",
self.dropped_bytes.len(),
self.dropped_bytes
Expand Down Expand Up @@ -82,9 +81,9 @@ impl FrameDecoder {
// Skip and record the first byte of the buffer
{
let first = buf.first().unwrap();
debug!("Dropped first byte: {:X?}", first);
log::debug!("Dropped first byte: {:X?}", first);
if self.dropped_bytes.len() >= MAX_FRAME_LEN {
error!(
log::error!(
"Giving up to decode frame after dropping {} byte(s): {:X?}",
self.dropped_bytes.len(),
self.dropped_bytes
Expand Down Expand Up @@ -252,7 +251,7 @@ where
}
})
.or_else(|err| {
warn!("Failed to decode {} frame: {}", pdu_type, err);
log::warn!("Failed to decode {} frame: {}", pdu_type, err);
frame_decoder.recover_on_error(buf);
retry = true;
Ok(None)
Expand Down Expand Up @@ -280,7 +279,7 @@ impl Decoder for ClientCodec {
.map(|pdu| Some(ResponseAdu { hdr, pdu }))
.map_err(|err| {
// Unrecoverable error
error!("Failed to decode response PDU: {}", err);
log::error!("Failed to decode response PDU: {}", err);
err
})
} else {
Expand Down Expand Up @@ -318,7 +317,7 @@ impl Decoder for ServerCodec {
})
.map_err(|err| {
// Unrecoverable error
error!("Failed to decode request PDU: {}", err);
log::error!("Failed to decode request PDU: {}", err);
err
})
} else {
Expand Down
34 changes: 34 additions & 0 deletions src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ pub enum Request {
Disconnect,
}

/// A Modbus request with slave included
#[cfg(feature = "server")]
#[derive(Debug, Clone, PartialEq, Eq)]
uklotzde marked this conversation as resolved.
Show resolved Hide resolved
pub struct SlaveRequest {
/// Slave Id from the request
pub slave: crate::slave::SlaveId,
/// A `Request` enum
pub request: Request,
}

/// The data of a successful request.
///
/// ReadCoils/ReadDiscreteInputs: The length of the result Vec is always a
Expand Down Expand Up @@ -251,6 +261,30 @@ impl From<Result<Response, ExceptionResponse>> for ResponsePdu {
}
}

#[cfg(feature = "server")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OptionalResponsePdu(pub(crate) Option<ResponsePdu>);

#[cfg(feature = "server")]
impl<T> From<Option<T>> for OptionalResponsePdu
where
T: Into<ResponsePdu>,
{
fn from(from: Option<T>) -> Self {
Self(from.map(Into::into))
}
}

#[cfg(feature = "server")]
impl<T> From<T> for OptionalResponsePdu
where
T: Into<ResponsePdu>,
{
fn from(from: T) -> Self {
Self(Some(from.into()))
}
}

impl From<ResponsePdu> for Result<Response, ExceptionResponse> {
fn from(from: ResponsePdu) -> Self {
from.0
Expand Down
18 changes: 17 additions & 1 deletion src/frame/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) struct Header {
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct RequestAdu {
pub struct RequestAdu {
pub(crate) hdr: Header,
pub(crate) pdu: RequestPdu,
pub(crate) disconnect: bool,
Expand All @@ -22,3 +22,19 @@ pub(crate) struct ResponseAdu {
pub(crate) hdr: Header,
pub(crate) pdu: ResponsePdu,
}

impl From<RequestAdu> for Request {
fn from(from: RequestAdu) -> Self {
from.pdu.into()
}
}

#[cfg(feature = "server")]
impl From<RequestAdu> for SlaveRequest {
fn from(from: RequestAdu) -> Self {
Self {
slave: from.hdr.slave_id,
request: from.pdu.into(),
}
}
}
18 changes: 17 additions & 1 deletion src/frame/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) struct Header {
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct RequestAdu {
pub struct RequestAdu {
pub(crate) hdr: Header,
pub(crate) pdu: RequestPdu,
pub(crate) disconnect: bool,
Expand All @@ -24,3 +24,19 @@ pub(crate) struct ResponseAdu {
pub(crate) hdr: Header,
pub(crate) pdu: ResponsePdu,
}

impl From<RequestAdu> for Request {
fn from(from: RequestAdu) -> Self {
from.pdu.into()
}
}

#[cfg(feature = "server")]
impl From<RequestAdu> for SlaveRequest {
fn from(from: RequestAdu) -> Self {
Self {
slave: from.hdr.unit_id,
request: from.pdu.into(),
}
}
}
3 changes: 3 additions & 0 deletions src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub mod tcp {
#[cfg(feature = "server")]
pub use crate::server;

#[cfg(feature = "server")]
uklotzde marked this conversation as resolved.
Show resolved Hide resolved
pub use crate::frame::SlaveRequest;

///////////////////////////////////////////////////////////////////
/// Structs
///////////////////////////////////////////////////////////////////
Expand Down
45 changes: 28 additions & 17 deletions src/server/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,27 @@ impl Server {
}

/// serve Modbus RTU requests based on the provided service until it finishes
pub async fn serve_forever<S>(self, new_service: S)
pub async fn serve_forever<S, Req, Res>(self, new_service: S)
where
S: NewService<Request = Request, Response = Response> + Send + Sync + 'static,
S: NewService<Request = Req, Response = Res> + Send + Sync + 'static,
Req: From<rtu::RequestAdu> + Send,
Res: Into<OptionalResponsePdu> + Send,
S::Instance: Send + Sync + 'static,
S::Error: Into<Error>,
S::Instance: 'static + Send + Sync,
{
self.serve_until(new_service, futures::future::pending())
.await;
}

/// serve Modbus RTU requests based on the provided service until it finishes or a shutdown signal is received
pub async fn serve_until<S, Sd>(self, new_service: S, shutdown_signal: Sd)
pub async fn serve_until<S, Req, Res, Sd>(self, new_service: S, shutdown_signal: Sd)
where
S: NewService<Request = Request, Response = Response> + Send + Sync + 'static,
S: NewService<Request = Req, Response = Res> + Send + Sync + 'static,
Sd: Future<Output = ()> + Sync + Send + Unpin + 'static,
S::Request: From<Request>,
S::Response: Into<Response>,
S::Error: Into<Error>,
Req: From<rtu::RequestAdu> + Send,
Res: Into<OptionalResponsePdu> + Send,
S::Instance: Send + Sync + 'static,
S::Error: Into<Error>,
{
let framed = Framed::new(self.serial, codec::rtu::ServerCodec::default());
let service = new_service.new_service().unwrap();
Expand All @@ -74,12 +76,14 @@ impl Server {
}

/// frame wrapper around the underlying service's responses to forwarded requests
async fn process<S>(
async fn process<S, Req, Res>(
mut framed: Framed<SerialStream, codec::rtu::ServerCodec>,
service: S,
) -> Result<(), Error>
where
S: Service<Request = Request, Response = Response> + Send + Sync + 'static,
S: Service<Request = Req, Response = Res> + Send + Sync + 'static,
S::Request: From<rtu::RequestAdu> + Send,
S::Response: Into<OptionalResponsePdu> + Send,
S::Error: Into<Error>,
{
loop {
Expand All @@ -90,13 +94,20 @@ where
}?;

let hdr = request.hdr;
let response = service.call(request.pdu.0).await.map_err(Into::into)?;
framed
.send(rtu::ResponseAdu {
hdr,
pdu: response.into(),
})
.await?;
let response: OptionalResponsePdu = service
.call(request.into())
.await
.map_err(Into::into)?
.into();

match response.0 {
Some(pdu) => {
framed.send(rtu::ResponseAdu { hdr, pdu }).await?;
}
None => {
log::debug!("No response for request {hdr:?}");
}
}
}
Ok(())
}
Loading