Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yet another Tokio v1 PR #78

Merged
merged 3 commits into from Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -13,3 +13,6 @@ Cargo.lock

# vi files
*.swp

# VS Code config files
/.vscode
49 changes: 42 additions & 7 deletions Cargo.toml
Expand Up @@ -11,23 +11,23 @@ repository = "https://github.com/slowtec/tokio-modbus"
edition = "2018"

[dependencies]
bytes = "0.5"
bytes = "1"
byteorder = "1"
futures = { version = "0.3", optional = true }
futures-util = { version = "0.3", default-features = false }
log = "0.4"
net2 = { version = "0.2", optional = true, default-features = false }
smallvec = { version = "1", default-features = false }
# rt-core should be enabled only with "server" feature. Waiting for https://github.com/rust-lang/cargo/issues/5954
tokio = { version = "0.2", features = ["rt-core"] }
tokio-util = { version = "0.2", features = ["codec"] }
# rt should be enabled only with "server" feature. Waiting for https://github.com/rust-lang/cargo/issues/5954
tokio = { version = "1", features = ["rt", "rt-multi-thread"] }
tokio-util = { version = "0.6", features = ["codec"] }
# Disable default-features to exclude unused dependency on libudev
tokio-serial = { version = "4.3.3", optional = true, default-features = false }
tokio-serial = { version = "5.4.0-beta1", optional = true, default-features = false }

[dev-dependencies]
env_logger = "0.7"
env_logger = "0.9"
futures = "0.3"
tokio = { version = "0.2", features = ["tcp", "macros", "io-util"] }
tokio = { version = "1", features = ["net", "macros", "io-util"] }

[features]
default = ["tcp", "rtu", "sync"]
Expand All @@ -41,3 +41,38 @@ tcp-server-unstable = ["server"]
travis-ci = { repository = "slowtec/tokio-modbus" }
coveralls = { repository = "slowtec/tokio-modbus", branch = "master", service = "github" }
maintenance = { status = "actively-developed" }

[[example]]
name = "rtu-client-shared-context"
path = "examples/rtu-client-shared-context.rs"
required-features = ["rtu"]

[[example]]
name = "rtu-client-sync"
path = "examples/rtu-client-sync.rs"
required-features = ["rtu", "sync"]

[[example]]
name = "rtu-client"
path = "examples/rtu-client.rs"
required-features = ["rtu"]

[[example]]
name = "tcp-client-custom-fn"
path = "examples/tcp-client-custom-fn.rs"
required-features = ["tcp"]

[[example]]
name = "tcp-client-sync"
path = "examples/tcp-client-sync.rs"
required-features = ["tcp", "sync"]

[[example]]
name = "tcp-client"
path = "examples/tcp-client.rs"
required-features = ["tcp"]

[[example]]
name = "tcp-server"
path = "examples/tcp-server.rs"
required-features = ["tcp", "server", "tcp-server-unstable"]
@@ -1,5 +1,4 @@
#[cfg(feature = "rtu")]
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
use std::{cell::RefCell, future::Future, io::Error, pin::Pin, rc::Rc};

Expand All @@ -9,20 +8,19 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
Context,
};
use tokio_modbus::prelude::*;
use tokio_serial::{Serial, SerialPortSettings};
use tokio_serial::{SerialPortBuilder, SerialStream};

const SLAVE_1: Slave = Slave(0x01);
const SLAVE_2: Slave = Slave(0x02);

#[derive(Debug)]
struct SerialConfig {
path: String,
settings: SerialPortSettings,
builder: SerialPortBuilder,
}

impl NewContext for SerialConfig {
fn new_context(&self) -> Pin<Box<dyn Future<Output = Result<Context, Error>>>> {
let serial = Serial::from_path(&self.path, &self.settings);
let serial = SerialStream::open(&self.builder);
Box::pin(async {
let port = serial?;
rtu::connect(port).await
Expand All @@ -31,11 +29,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

let serial_config = SerialConfig {
path: "/dev/ttyUSB0".into(),
settings: SerialPortSettings {
baud_rate: 19200,
..Default::default()
},
builder: tokio_serial::new("/dev/ttyUSB0", 19200),
};
println!("Configuration: {:?}", serial_config);

Expand Down Expand Up @@ -68,9 +62,3 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

#[cfg(not(feature = "rtu"))]
pub fn main() {
println!("feature `rtu` is required to run this example");
std::process::exit(1);
}
15 changes: 15 additions & 0 deletions examples/rtu-client-sync.rs
@@ -0,0 +1,15 @@
pub fn main() -> Result<(), Box<dyn std::error::Error>> {
use tokio_modbus::prelude::*;

let tty_path = "/dev/ttyUSB0";
let slave = Slave(0x17);

let builder = tokio_serial::new(tty_path, 19200);

let mut ctx = sync::rtu::connect_slave(&builder, slave)?;
println!("Reading a sensor value");
let rsp = ctx.read_holding_registers(0x082B, 2)?;
println!("Sensor value is: {:?}", rsp);

Ok(())
}
18 changes: 5 additions & 13 deletions examples/rtu-client.rs
@@ -1,16 +1,14 @@
#[cfg(feature = "rtu")]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use tokio_serial::{Serial, SerialPortSettings};
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
use tokio_serial::SerialStream;

use tokio_modbus::prelude::*;

let tty_path = "/dev/ttyUSB0";
let slave = Slave(0x17);

let mut settings = SerialPortSettings::default();
settings.baud_rate = 19200;
let port = Serial::from_path(tty_path, &settings).unwrap();
let builder = tokio_serial::new(tty_path, 19200);
let port = SerialStream::open(&builder).unwrap();

let mut ctx = rtu::connect_slave(port, slave).await?;
println!("Reading a sensor value");
Expand All @@ -19,9 +17,3 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

#[cfg(not(feature = "rtu"))]
pub fn main() {
println!("feature `rtu` is required to run this example");
std::process::exit(1);
}
11 changes: 2 additions & 9 deletions examples/tcp-client-custom-fn.rs
@@ -1,5 +1,4 @@
#[cfg(feature = "tcp")]
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
use tokio_modbus::prelude::*;

Expand All @@ -15,15 +14,9 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Result for function {} is '{:?}'", f, rsp);
}
_ => {
panic!("unexpeted result");
panic!("unexpected result");
}
}

Ok(())
}

#[cfg(not(feature = "tcp"))]
pub fn main() {
println!("feature `tcp` is required to run this example");
std::process::exit(1);
}
9 changes: 1 addition & 8 deletions examples/tcp-client.rs
@@ -1,5 +1,4 @@
#[cfg(feature = "tcp")]
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
use tokio_modbus::prelude::*;

Expand All @@ -20,9 +19,3 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

#[cfg(not(feature = "tcp"))]
pub fn main() {
println!("feature `tcp` is required to run this example");
std::process::exit(1);
}
9 changes: 1 addition & 8 deletions examples/tcp-server.rs
@@ -1,5 +1,4 @@
#[cfg(all(feature = "tcp", feature = "server"))]
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
use futures::future;
use std::{thread, time::Duration};
Expand Down Expand Up @@ -44,9 +43,3 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

#[cfg(not(all(feature = "tcp", feature = "server")))]
pub fn main() {
println!("both `tcp` and `server` features is required to run this example");
std::process::exit(1);
}
18 changes: 8 additions & 10 deletions src/client/sync/rtu.rs
Expand Up @@ -3,22 +3,20 @@ use super::{Context, Result};
use crate::client::rtu::connect_slave as async_connect_slave;
use crate::slave::Slave;

use tokio_serial::{Serial, SerialPortSettings};
use tokio_serial::{SerialPortBuilder, SerialStream};

/// Connect to no particular Modbus slave device for sending
/// broadcast messages.
pub fn connect(tty_path: &str, settings: &SerialPortSettings) -> Result<Context> {
connect_slave(tty_path, settings, Slave::broadcast())
pub fn connect(builder: &SerialPortBuilder) -> Result<Context> {
connect_slave(builder, Slave::broadcast())
}

/// Connect to any kind of Modbus slave device.
pub fn connect_slave(
tty_path: &str,
settings: &SerialPortSettings,
slave: Slave,
) -> Result<Context> {
let mut rt = tokio::runtime::Runtime::new()?;
let serial = Serial::from_path(tty_path, settings)?;
pub fn connect_slave(builder: &SerialPortBuilder, slave: Slave) -> Result<Context> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()?;
let serial = SerialStream::open(builder).unwrap();
let async_ctx = rt.block_on(async_connect_slave(serial, slave))?;
let sync_ctx = Context {
core: rt,
Expand Down
4 changes: 3 additions & 1 deletion src/client/sync/tcp.rs
Expand Up @@ -13,7 +13,9 @@ pub fn connect(socket_addr: SocketAddr) -> Result<Context> {
/// gateway that is forwarding messages to/from the corresponding unit identified
/// by the slave parameter.
pub fn connect_slave(socket_addr: SocketAddr, slave: Slave) -> Result<Context> {
let mut rt = tokio::runtime::Runtime::new()?;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()?;
let async_ctx = rt.block_on(async_connect_slave(socket_addr, slave))?;
let sync_ctx = Context {
core: rt,
Expand Down
4 changes: 2 additions & 2 deletions src/codec/mod.rs
Expand Up @@ -462,8 +462,8 @@ mod tests {

#[test]
fn convert_coil_to_bool() {
assert_eq!(coil_to_bool(0xFF00).unwrap(), true);
assert_eq!(coil_to_bool(0x0000).unwrap(), false);
assert!(coil_to_bool(0xFF00).unwrap());
assert!(!coil_to_bool(0x0000).unwrap());
}

#[test]
Expand Down
9 changes: 5 additions & 4 deletions src/codec/rtu.rs
Expand Up @@ -191,6 +191,9 @@ fn calc_crc(data: &[u8]) -> u16 {
for x in data {
crc ^= u16::from(*x);
for _ in 0..8 {
// if we followed clippy's suggestion to move out the crc >>= 1, the condition may not be met any more
// the recommended action therefore makes no sense and it is better to allow this lint
#[allow(clippy::branches_sharing_code)]
if (crc & 0x0001) != 0 {
crc >>= 1;
crc ^= 0xA001;
Expand Down Expand Up @@ -343,8 +346,7 @@ impl Decoder for ServerCodec {
}
}

impl Encoder for ClientCodec {
type Item = RequestAdu;
impl Encoder<RequestAdu> for ClientCodec {
type Error = Error;

fn encode(&mut self, adu: RequestAdu, buf: &mut BytesMut) -> Result<()> {
Expand All @@ -369,8 +371,7 @@ impl Encoder for ClientCodec {
}
}

impl Encoder for ServerCodec {
type Item = ResponseAdu;
impl Encoder<ResponseAdu> for ServerCodec {
type Error = Error;

fn encode(&mut self, adu: ResponseAdu, buf: &mut BytesMut) -> Result<()> {
Expand Down
6 changes: 2 additions & 4 deletions src/codec/tcp.rs
Expand Up @@ -128,8 +128,7 @@ impl Decoder for ServerCodec {
}
}

impl Encoder for ClientCodec {
type Item = RequestAdu;
impl Encoder<RequestAdu> for ClientCodec {
type Error = Error;

fn encode(&mut self, adu: RequestAdu, buf: &mut BytesMut) -> Result<()> {
Expand All @@ -154,8 +153,7 @@ impl Encoder for ClientCodec {
}
}

impl Encoder for ServerCodec {
type Item = ResponseAdu;
impl Encoder<ResponseAdu> for ServerCodec {
type Error = Error;

fn encode(&mut self, adu: ResponseAdu, buf: &mut BytesMut) -> Result<()> {
Expand Down