Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't run an async function that operate MySQL via Connection Pool. #8

Closed
rts-gordon opened this issue Dec 31, 2021 · 8 comments
Closed

Comments

@rts-gordon
Copy link

Hi @mvniekerk,
Thank you for offering the wonderful project "tokio-cron-scheduler".
I use tokio-cron-scheduler run an async function to write MySQL, but there are some errors, would you like to have a look at those, thank you very much.

Code:

#[tokio::main]
async fn main()  {
    let db_pool = connect("mysql://root:123456@localhost:3306", 10, 30).await.unwrap();

    let mut sched = JobScheduler::new();
    let job_async = Job::new_async("0 */1 * * * * *", |_uuid, _l| Box::pin(async move {
            cron_sync_ohlc_to_db(&db_pool).await.unwrap_or(());
        })).unwrap();
    sched.add(job_async).unwrap();
    tokio::spawn(sched.start());

    let server = TcpListener::bind("127.0.0.1:3000").await.unwrap();
    println!("Sync listening on: 127.0.0.1:3000");

    loop {
            server.accept().await.unwrap();
    }
}

pub async fn connect(url: &str, min_conn: u32, max_conn: u32) -> Result<MySqlPool, sqlx::Error> {
    println!("Connecting to MySql...");

    let pool = MySqlPoolOptions::new()
    .min_connections(min_conn)
    .max_connections(max_conn)
    .connect(url).await?;

    Ok(pool)
}

pub async fn cron_sync_ohlc_to_db(
    db_pool: &MySqlPool,
) -> anyhow::Result<()> {
    println!("Begin to cron sync ohlc data to databse...");

    let mut tx = db_pool.begin().await.unwrap();
    let sql_string = "INSERT INTO test1 (u,a,b,c,d) VALUES (1613347800, 1, 2, 3, 4)";
    let sql_query = sqlx::query(&sql_string)
        .execute(&mut tx)
        .await?
        .last_insert_id();

    tx.commit().await.unwrap();

    Ok(())
}

Compile errors:

 `dyn Future<Output = Result<bool, sqlx::Error>> + Send` cannot be shared between threads safely

More details:

error[E0277]: `dyn Future<Output = Result<bool, sqlx::Error>> + Send` cannot be shared between threads safely
  --> src/main.rs:24:9
   |
24 | /         Box::pin(async move {
25 | |             cron_sync_ohlc_to_db(&db_pool).await.unwrap_or(());
26 | |         })
   | |__________^ `dyn Future<Output = Result<bool, sqlx::Error>> + Send` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `dyn Future<Output = Result<bool, sqlx::Error>> + Send`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dyn Future<Output = Result<bool, sqlx::Error>> + Send>`
   = note: required because it appears within the type `Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>`
   = note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>>`      
   = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13> {ResumeTy, pool::connection::Floating<'r, pool::connection::Idle<MySql>>, &'s PoolOptions<MySql>, impl Future, (), &'t2 mut pool::connection::Floating<'t3, pool::connection::Idle<MySql>>, impl Future, Result<(), sqlx::Error>, sqlx::Error, PoolOptions<MySql>, std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't7)>>, &'t8 std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't9)>>, &'t10 Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't11)>, pool::connection::Idle<MySql>, pool::connection::Live<MySql>, MySqlConnection, &'t12 mut MySqlConnection, Pin<Box<(dyn Future<Output = Result<bool, sqlx::Error>> + Send + 't13)>>, Result<bool, sqlx::Error>, bool}`
   = note: required because it appears within the type `[static generator@pool::inner::check_conn<'_, '<empty>, MySql>::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13> {ResumeTy, pool::connection::Floating<'r, pool::connection::Idle<MySql>>, &'s PoolOptions<MySql>, impl Future, (), &'t2 mut pool::connection::Floating<'t3, pool::connection::Idle<MySql>>, impl Future, Result<(), sqlx::Error>, sqlx::Error, PoolOptions<MySql>, std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't7)>>, &'t8 std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't9)>>, &'t10 Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't11)>, pool::connection::Idle<MySql>, pool::connection::Live<MySql>, MySqlConnection, &'t12 mut MySqlConnection, Pin<Box<(dyn Future<Output = Result<bool, sqlx::Error>> + Send + 't13)>>, Result<bool, sqlx::Error>, bool}]`
   = note: required because it appears within the type `from_generator::GenFuture<[static generator@pool::inner::check_conn<'_, '<empty>, MySql>::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13> {ResumeTy, pool::connection::Floating<'r, pool::connection::Idle<MySql>>, &'s PoolOptions<MySql>, impl Future, (), &'t2 mut pool::connection::Floating<'t3, pool::connection::Idle<MySql>>, impl Future, Result<(), sqlx::Error>, sqlx::Error, PoolOptions<MySql>, std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> 
+ Send>> + Send + Sync + 't7)>>, &'t8 std::option::Option<Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't9)>>, &'t10 Box<(dyn for<'t14> Fn(&'t14 mut MySqlConnection) -> Pin<Box<dyn Future<Output = Result<bool, sqlx::Error>> + Send>> + Send + Sync + 't11)>, pool::connection::Idle<MySql>, pool::connection::Live<MySql>, MySqlConnection, &'t12 mut MySqlConnection, Pin<Box<(dyn Future<Output = Result<bool, sqlx::Error>> + Send + 't13)>>, Result<bool, sqlx::Error>, bool}]>`
@mvniekerk
Copy link
Owner

Hi @Chcp
Thanks for the bug report. Can you perhaps provide an example repository that replicates your issue, that I can maybe look into it?

@rts-gordon
Copy link
Author

@mvniekerk
Thanks for your reply.

The source code is simple, there is the repository, it can't be compiled because of errors.

scheduler-test

@mvniekerk
Copy link
Owner

I had a quick look. Seems like the linked list implementation used somewhere in the sqlx crate was not marked as Sync / Send... This is bad news for your implementation.
I also need to have something similar (cron job + sql commands). Will let you know on this .

@rts-gordon
Copy link
Author

Hi @mvniekerk
May I know the progress of this issue? Thanks for you help.

@saabye-io
Copy link

Hi
I have the same issue, I think. It's just with the tokio_modbus Client:

future cannot be shared between threads safely
the trait `Sync` is not implemented for `(dyn tokio_modbus::prelude::Client + 'static)`

Any work around is appreciated.

@saabye-io
Copy link

An easy workaround is to send a message in the job, using mpsc:

    sched.add(Job::new_async("1/10 * * * * *", move |_uuid, _l| {
        let tx = tx.clone();
        Box::pin(async move {
            tx.send(1).await.unwrap();
        }
    })}).unwrap()).unwrap();

And then do the action in another thread:

    let (tx, mut rx) =  mpsc::channel::<u8>(1);
    tokio::spawn(async move {
        loop {
            for _ in rx.recv().await {
                do_whatever_wasnt_possible_before().await;
            }
        }
    });

This works for me at least.

@rts-gordon
Copy link
Author

It is a possible way, I will try this. Thanks for sharing code.

@mvniekerk
Copy link
Owner

Thank you @saabye-io - that's about the only solution right now is if you "escape" out of sync+send land into non-sync+send land using mpsc.
I'm going to close this ticket @Chcp because there's not much I can do. Thank you for the bug report - I'm sure this is a gotcha that others will also encounter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants