Trying to understand asynchronous client. #53

Closed
glennpierce opened this issue Feb 21, 2020 · 5 comments

Comments

@glennpierce

Hi, I am having trouble understanding the best way to make many requests concurrently.
Basically, I am trying to simulate talking to a couple of thousand Modbus devices as quickly as possible. As I don't have those devices :) I am running your tcp-server.rs example with added delays of 60-80 ms, which seems like a reasonable simulation.

My trouble is developing the client code.

I want all the requests to be asynchronous. Once they all complete I would store these results into a db.

Would you recommend running code like

let socket_addr = "192.168.0.222:502".parse().unwrap();
let mut ctx = sync::tcp::connect(socket_addr).unwrap();
let buff = ctx.read_input_registers(0x1000, 7).unwrap();

in many threads and having each thread send its result back through a channel? Or is there a better built-in way using tokio async?
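
The threads-plus-channel approach asked about above could look roughly like the following. This is only a minimal sketch using the standard library: `read_device` and `poll_all` are hypothetical names, and `read_device` merely sleeps and returns a made-up value in place of a real blocking Modbus read. A fixed number of workers take device ids from a shared queue and send each result back over an `mpsc` channel.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for one blocking Modbus read (not a tokio-modbus call).
/// It sleeps briefly to mimic device latency and returns a made-up value.
fn read_device(id: u32) -> Result<u16, String> {
    thread::sleep(Duration::from_millis(5));
    Ok((id % 1000) as u16)
}

/// Polls `device_ids` using `workers` threads and collects every result via a channel.
fn poll_all(device_ids: Vec<u32>, workers: usize) -> Vec<(u32, Result<u16, String>)> {
    // Shared job queue: each worker pops the next device id until the queue is empty.
    let jobs = Arc::new(Mutex::new(device_ids));
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let jobs = Arc::clone(&jobs);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Hold the lock only while taking a job, not during the read.
                let next = jobs.lock().unwrap().pop();
                match next {
                    Some(id) => tx.send((id, read_device(id))).unwrap(),
                    None => break,
                }
            })
        })
        .collect();
    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);

    let results: Vec<(u32, Result<u16, String>)> = rx.iter().collect();
    for h in handles {
        h.join().unwrap();
    }
    results
}
```

Calling `poll_all((0..2000).collect(), 50)` would poll 2000 simulated devices with at most 50 requests in flight. Results arrive in completion order, so sort by id if order matters before writing to the db.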

Thanks

@glennpierce
Author

I also believe there is an issue with sync::tcp::connect not closing the socket correctly.

The following code eventually fails with a "too many open files" error.
I have tested the same thing with modbus-rs and it works there.

extern crate chrono;
extern crate crossbeam_channel;

use std::sync::{Arc, Mutex};
use tokio_core::reactor::Core;
use futures::Future;
use tokio_modbus::prelude::*;
use systemd::daemon;

use chrono::{DateTime, NaiveDate, TimeZone, Utc};
use std::thread;

#[derive(Clone, Debug)]
struct MeterResult {
    name: String,
    ip: String,
    result: Option<u32>,
    last_dt: Option<DateTime<Utc>>
}

type MeterValue = (MeterResult, u32);

const NUMBER_OF_THREADS: usize = 10;
const NUMBER_OF_METERS: usize = 1000;

pub fn main() {
    let mut meters: Vec<MeterResult> = vec![];

    for i in 1..NUMBER_OF_METERS {
        let meter: MeterResult = MeterResult {name: format!("bi{}", i),
                                              ip: "127.0.0.1:5502".to_string(),
                                              result: None,
                                              last_dt: None
                                             };

        meters.push(meter);
    }
    let values = Arc::new(Mutex::new(vec![]));
    let chunks = meters.chunks(NUMBER_OF_THREADS);

    for chunk in chunks {

        let mut threads = vec![];

        for meter in chunk {
            threads.push(thread::spawn({
                let clone = Arc::clone(&values);
                let m = meter.clone();
                move || {
                    let socket_addr = "127.0.0.1:5502".parse().unwrap();
                    let mut ctx = sync::tcp::connect(socket_addr).unwrap();
                    //let data = ctx.read_holding_registers(0x16, 2).unwrap();
                    //let value : u32 = ((data[1] as u32) << 16) | (data[0] as u32);
                    let mut v = clone.lock().unwrap();
                    v.push((m.clone(), 2));
                    //ctx.disconnect();
                    //drop(ctx);
                    //drop(socket_addr);
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }
        println!("batch done");
    }
}

@DrSloth

DrSloth commented Jan 31, 2021

For everyone encountering this issue:
I once experienced a similar problem with the sync client. If you use tokio-driven libraries, you are best off using them with async/await. One solution to the first question in this issue is to spawn tasks with tokio::spawn and join them at the end using the futures crate. This lets tokio decide how many threads to use and run multiple tasks concurrently on one thread, for instance when one of the slaves takes a long time to respond to a request.

@aleksuss

aleksuss commented Dec 3, 2021

I can't figure out how to make async requests via one Context. The problem is that the read_* requests expect a mutable reference to Context. I'd like to write something like this:

    let mut context = tokio_modbus::client::tcp::connect("127.0.0.1:502".parse()?).await?;
    let (a, b) = tokio::join!(
        context.read_holding_registers(1, 1),
        context.read_holding_registers(2, 1),
    );
    
    context.disconnect().await?;
    dbg!(a, b);

I've tried creating a context for every request, but the server doesn't like this. I thought about wrapping the context in Arc<RwLock>, but I don't think that's a good idea.

@DrSloth

DrSloth commented Dec 3, 2021

Because pretty much everything uses &mut self, you can just use Arc<Mutex<Context>>; I would recommend the tokio Mutex.

Internal state is mutated when making any networking request; reading from a std::net::TcpStream through the Read trait, for instance, also takes &mut self. It is generally not possible to make concurrent/parallel requests over the same connection.

For concurrent writes to the same server you would need two contexts, but many devices can't handle parallel requests correctly.
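
To illustrate the Arc<Mutex<...>> idea, here is a minimal sketch. It deliberately swaps in std::sync::Mutex and plain threads for the tokio Mutex and async tasks recommended above, so that it runs without any dependencies. `Connection`, `read_register` and `read_shared` are hypothetical names, not part of tokio-modbus; the struct only imitates a client whose request method needs `&mut self`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a client context; not the tokio-modbus `Context` type.
/// Like the real client, its request method needs `&mut self`.
struct Connection {
    requests_sent: u32,
}

impl Connection {
    /// Pretends to read one register and records that a request was made.
    fn read_register(&mut self, addr: u16) -> u16 {
        self.requests_sent += 1;
        addr.wrapping_mul(2) // made-up register value
    }
}

/// Issues one request per address from separate threads over a single shared connection.
fn read_shared(addrs: &[u16]) -> (Vec<u16>, u32) {
    let conn = Arc::new(Mutex::new(Connection { requests_sent: 0 }));

    let handles: Vec<_> = addrs
        .iter()
        .map(|&addr| {
            let conn = Arc::clone(&conn);
            thread::spawn(move || {
                // The lock gives this thread exclusive `&mut` access for one request,
                // so requests are serialized rather than run in parallel.
                let mut guard = conn.lock().unwrap();
                guard.read_register(addr)
            })
        })
        .collect();

    // Join in spawn order so the values line up with `addrs`.
    let values: Vec<u16> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let sent = conn.lock().unwrap().requests_sent;
    (values, sent)
}
```

The point of the sketch is that the mutex makes sharing compile and stay safe, but only one request is ever in flight on the connection, which matches the remark that requests over one connection cannot really run in parallel.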

Hope that helps.

@flosse
Member

flosse commented Sep 12, 2024

I consider the issue to be resolved.
Feel free to open it again if it is not.

@flosse flosse closed this as completed Sep 12, 2024

4 participants