transaction statements out of order #2805

Open · pythoneer opened this issue Oct 7, 2023 · 8 comments

pythoneer commented Oct 7, 2023

Bug Description

I'm not really sure what exactly the bug is, but I am seeing strange behavior that makes it look like SQL statements are not executed in the correct order. I have a reproduction here (https://github.com/pythoneer/sqlx_test), but it is not really minimal. It uses axum and some strange wrk orchestration to trigger it, because that is where I originally encountered the behavior. I am not sure whether it can be reduced further or whether it has something to do with how axum operates specifically.

I am just trying to do this in a handler

https://github.com/pythoneer/sqlx_test/blob/c220fda7c5f60fc0ef22f7ba64f3436fac7063e6/src/main2.rs#L41-L59

async fn test(State(pool): State<PgPool>) -> Result<String, (StatusCode, String)> {
    sleep(Duration::from_millis(2)).await;

    let mut transaction = pool.begin().await.map_err(internal_error)?;
    
    // -----------------------8<--------- this is optional ---------------->8----------
    transaction
        .execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
        .await
        .map_err(|err| panic!("Failed to set transaction isolation level: {:#?}", err))?;

    let data: String = sqlx::query_scalar("select 'hello world from pg'")
        .fetch_one(&pool)
        .await
        .map_err(internal_error)?;
    // -----------------------8<--------- this is optional ---------------->8----------

    transaction.commit().await.map_err(internal_error)?;

    Ok(data)
}

The SET TRANSACTION ... and the select 'hello ... statements can also be removed; the problem is already triggered with just

let mut transaction = pool.begin().await.map_err(internal_error)?;
transaction.commit().await.map_err(internal_error)?;

but including them helps to demonstrate the problem.

By running

DATABASE_URL="<your_url>" cargo run --bin binary2

and

./parallel_run.sh

to hit the endpoint, we can sporadically see

2023-10-07T20:07:04.706970Z  WARN sqlx::postgres::notice: there is already a transaction in progress
2023-10-07T20:07:08.272585Z  WARN sqlx::postgres::notice: there is no transaction in progress

which I can't really explain. The only way I can think of is that the statements sent to the database are not in order. Normally we would see BEGIN; COMMIT; pairs sent in parallel on each connection, right? But the warnings make me assume that sometimes BEGIN; BEGIN; COMMIT; or BEGIN; COMMIT; COMMIT; happens.

With the additional SET TRANSACTION ... and select 'hello ... we can also see panics that I create with

.map_err(|err| panic!("Failed to set transaction isolation level: {:#?}", err))?;

which show

        code: "25001",
        message: "SET TRANSACTION ISOLATION LEVEL must be called before any query",

and the same problem here: I can only explain this (from playing around in psql) if the select 'hello ... is executed before SET TRANSACTION ....
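
For reference, the 25001 behavior itself is easy to confirm in isolation. Below is a minimal standalone sketch (not part of the repro; it assumes a reachable database in DATABASE_URL and the same sqlx 0.7 / tokio setup): issuing any query inside a transaction before SET TRANSACTION ISOLATION LEVEL makes Postgres reject the SET with exactly this error, so seeing it in the handler strongly suggests the statements were reordered.

    // Standalone sketch: run any query first, then try to set the isolation level.
    // Postgres rejects the SET with SQLSTATE 25001.
    use sqlx::{Executor, PgPool};

    #[tokio::main]
    async fn main() -> Result<(), sqlx::Error> {
        let pool = PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?;
        let mut tx = pool.begin().await?;

        // A query first ...
        let one: i32 = sqlx::query_scalar("select 1").fetch_one(&mut *tx).await?;
        assert_eq!(one, 1);

        // ... then the isolation level: this fails with
        // "SET TRANSACTION ISOLATION LEVEL must be called before any query" (25001).
        let err = tx
            .execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
            .await
            .unwrap_err();
        println!("{err}");

        tx.rollback().await?;
        Ok(())
    }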

(video attachment: sqlx.mp4)

Also, the values (like the sleep, the wrk parameters, and the connection pool size) might depend on the specific machine; I tuned them to work best on my system, and I think this can change from system to system. It seemed to work best when the endpoint delivered around 2500 req/s. The reason I am doing the "funny" stuff in parallel_run.sh is that I noticed the problem is somehow triggered specifically at the beginning or the end of a wrk run; I am not really sure why or what is happening in detail. Maybe wrk "just" kills connections in the middle and axum reacts strangely to killed connections while a handler is running. But regardless of what axum is doing, I would not expect any of the observed behavior to happen. You can trigger this without parallel_run.sh, but you might need to wait and potentially start and stop the wrk command manually in fast succession; that is basically what parallel_run.sh does.

Minimal Reproduction

https://github.com/pythoneer/sqlx_test

Requires wrk to be installed for parallel_run.sh.

DATABASE_URL="<your_url>" cargo run --bin binary2

and

./parallel_run.sh

Info

  • SQLx version: 0.7.2
  • SQLx features enabled: "postgres", "runtime-tokio-rustls", "macros", "migrate", "chrono", "json", "uuid"
  • Database server and version: PostgreSQL 15.4 on x86_64-pc-linux-musl, compiled by gcc (Alpine 12.2.1_git20220924-r10) 12.2.1 20220924, 64-bit
  • Operating system: arch linux (x86_64 Linux 6.5.5-arch1-1)
  • rustc --version: rustc 1.72.0 (5680fa18f 2023-08-23)
pythoneer added the bug label Oct 7, 2023
abonander (Collaborator) commented Oct 7, 2023

This query executes on a different connection since you pass the Pool as the executor:

    let data: String = sqlx::query_scalar("select 'hello world from pg'")
        .fetch_one(&pool)
        .await
        .map_err(internal_error)?;

I suspect this may be due to cancellation, as when a connection is closed mid-request Axum will cancel the handler future. I'm guessing when you kill the wrk process, it doesn't wait for in-flight requests to complete and just closes the connections right away.

The cancellation could be leaving database connections in weird states.

pythoneer (Author) commented Oct 7, 2023

Sorry, silly mistake in the repro. It should be

    let data: String = sqlx::query_scalar("select 'hello world from pg'")
        .fetch_one(&mut *transaction)
        .await
        .map_err(internal_error)?;

of course. Unfortunately, with that fix the repro does not seem to trigger the problem anymore in its current config. But it is still happening in my "real" code, where I do use the created transaction (and not the pool) for all of the queries; the there is already a transaction in progress warnings etc. are still there, and I think they are all based on the same phenomenon.

pythoneer (Author) commented Oct 9, 2023

I could further reduce the repro to this: https://github.com/pythoneer/sqlx_test/blob/main/src/main1.rs

DATABASE_URL="<your_url>" cargo run --bin binary1

It aborts the future right in the middle, which is what we both suspected is happening with axum and wrk closing connections in the middle of requests.

Looking at the Postgres logs (I enabled statement logging), I see this:

2023-10-09 09:59:32.403 UTC [532] LOG:  statement: BEGIN
2023-10-09 09:59:32.403 UTC [532] LOG:  statement: COMMIT
2023-10-09 09:59:32.403 UTC [532] LOG:  statement: ROLLBACK
2023-10-09 09:59:32.404 UTC [532] WARNING:  there is no transaction in progress
2023-10-09 09:59:32.451 UTC [533] LOG:  statement: BEGIN
2023-10-09 09:59:32.451 UTC [533] LOG:  statement: COMMIT
2023-10-09 09:59:32.451 UTC [533] LOG:  statement: ROLLBACK
2023-10-09 09:59:32.452 UTC [533] WARNING:  there is no transaction in progress

This does not look correct, and it is what I suspected in the beginning. Looking at the code for the Drop implementation of a transaction, it seems to work differently than, e.g., a plain commit. I did not follow it all the way down, but it looks like that rollback is deferred into a queue(?) and executed later.
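
To illustrate that queue-and-flush idea (a minimal, self-contained sketch with hypothetical Conn/Tx types, not sqlx's actual implementation): Drop cannot .await, so an open transaction guard can only enqueue its ROLLBACK, and the server sees it whenever the connection is next flushed, possibly after statements that were already queued or sent, which would produce exactly the BEGIN / COMMIT / ROLLBACK sequence above.

    // Hypothetical sketch of a deferred, queue-based rollback (not sqlx's code).
    struct Conn {
        pending: Vec<&'static str>, // statements waiting to be flushed to the server
    }

    struct Tx<'c> {
        conn: &'c mut Conn,
        // In the suspected scenario the future is cancelled before this flag
        // (or a depth counter) is updated, so Drop still thinks a rollback is needed.
        finished: bool,
    }

    impl Drop for Tx<'_> {
        fn drop(&mut self) {
            if !self.finished {
                // No async allowed in Drop: queue the ROLLBACK instead of awaiting it.
                self.conn.pending.push("ROLLBACK");
            }
        }
    }

    fn main() {
        let mut conn = Conn { pending: vec![] };
        {
            let tx = Tx { conn: &mut conn, finished: false };
            tx.conn.pending.push("BEGIN");
            tx.conn.pending.push("COMMIT"); // COMMIT went out, but the future was
                                            // dropped before `finished` was set...
        } // ...so Drop queues a ROLLBACK on top.
        for stmt in conn.pending.drain(..) {
            println!("send: {stmt}"); // BEGIN, COMMIT, ROLLBACK -- like the log above
        }
    }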

FSMaxB (Contributor) commented Oct 9, 2023

This seems to be an issue with cancellation safety of transactions in sqlx. I found a self-contained way of reproducing the issue.

Cargo.toml:

[package]
name = "sqlx_test"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
rand = "0.8"
clap = { version = "4", features = ["derive"] }
tokio = { version = "1", features = ["macros", "time", "rt-multi-thread"] }
tracing = "0.1"
tracing-subscriber = "0.3.17"
sqlx = {version = "=0.7.2", default-features = false, features = ["postgres", "runtime-tokio-rustls"]}

main.rs:

use clap::Parser;
use rand::{thread_rng, Rng};
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::Instant;

use tracing::{error, info, info_span, Instrument, Level};

#[derive(Parser, Debug)]
struct CommandlineArguments {
    #[arg(long, default_value_t = 100)]
    tasks: usize,
    #[arg(long, default_value_t = 5)]
    runtime_seconds: u64,
    #[arg(long, default_value_t = 100)]
    max_keep_transaction_milliseconds: u64,
    #[arg(long, default_value_t = 100)]
    max_drop_after_milliseconds: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let arguments = CommandlineArguments::parse();

    let runtime = Duration::from_secs(arguments.runtime_seconds);
    let max_keep_transaction = Duration::from_millis(arguments.max_keep_transaction_milliseconds);
    let max_drop_after = Duration::from_millis(arguments.max_drop_after_milliseconds);

    tracing_subscriber::fmt().with_max_level(Level::INFO).init();

    info!(?arguments, "Commandline");

    let pool =
        Arc::new(PgPool::connect("postgres://postgres:postgres@localhost:51337/postgres").await?);

    let mut join_set = JoinSet::new();
    for index in 0..arguments.tasks {
        let span = info_span!("task", index);
        join_set.spawn(
            task(pool.clone(), max_keep_transaction, max_drop_after, runtime).instrument(span),
        );
    }

    while let Some(res) = join_set.join_next().await {
        if let Err(err) = res {
            error!("Task failed: {:?}", err);
        }
    }

    Ok(())
}

async fn task(
    pool: Arc<PgPool>,
    max_keep_transaction: Duration,
    max_drop_after: Duration,
    runtime: Duration,
) -> anyhow::Result<()> {
    let start = Instant::now();
    while start.elapsed() < runtime {
        let operation = Operation::random(pool.clone(), max_keep_transaction, max_drop_after);

        let span = info_span!(
            "operation",
            keep_transaction_for = operation.keep_transaction_for.as_millis(),
            drop_after = operation.drop_after.as_millis()
        );

        operation.run().instrument(span).await?;
    }

    Ok(())
}

struct Operation {
    pool: Arc<PgPool>,
    keep_transaction_for: Duration,
    drop_after: Duration,
}

impl Operation {
    fn random(pool: Arc<PgPool>, max_keep_transaction: Duration, max_drop_after: Duration) -> Self {
        let mut rng = thread_rng();

        Self {
            pool,
            keep_transaction_for: rng.gen_range(Duration::ZERO..=max_keep_transaction),
            drop_after: rng.gen_range(Duration::ZERO..=max_drop_after),
        }
    }

    async fn run(self) -> anyhow::Result<()> {
        tokio::select! {
            result = self.start_transaction_and_commit() => {
                result
            }
            _ = tokio::time::sleep(self.drop_after) => {
                // drops the open transaction from the other branch
                Ok(())
            }
        }
    }

    async fn start_transaction_and_commit(&self) -> anyhow::Result<()> {
        let begin_span = info_span!("begining transaction");
        let transaction = self.pool.begin().instrument(begin_span).await?;

        let inside_transaction_span = info_span!("inside transaction");
        tokio::time::sleep(self.keep_transaction_for)
            .instrument(inside_transaction_span)
            .await;

        let commit_span = info_span!("committing transaction");
        transaction.commit().instrument(commit_span).await?;
        Ok(())
    }
}

docker-compose.yml:

version: "3.2"

services:
  db:
    image: postgres:15-alpine
    container_name: sqlx_repro_db
    restart: unless-stopped
    tty: true
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - "51337:5432"

This is what the output looks like (with cargo run --release):

2023-10-09T12:19:07.402371Z  INFO sqlx_test: Commandline arguments=CommandlineArguments { tasks: 100, runtime_seconds: 5, max_keep_transaction_milliseconds: 100, max_drop_after_milliseconds: 100 }
2023-10-09T12:19:07.493629Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:07.679796Z  WARN task{index=50}:operation{keep_transaction_for=61 drop_after=99}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:07.787577Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:07.869631Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:07.874933Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.130887Z  WARN task{index=78}:operation{keep_transaction_for=76 drop_after=91}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:08.208183Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.370002Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.420780Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.688807Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.790763Z  WARN task{index=90}:operation{keep_transaction_for=96 drop_after=86}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:09.160745Z  WARN task{index=30}:operation{keep_transaction_for=36 drop_after=91}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:09.388145Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:09.621769Z  WARN task{index=59}:operation{keep_transaction_for=10 drop_after=86}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:09.630659Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:09.754287Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:10.040783Z  WARN task{index=38}:operation{keep_transaction_for=80 drop_after=70}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:10.049120Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:10.264293Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:10.283822Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:10.644704Z  WARN task{index=53}:operation{keep_transaction_for=93 drop_after=79}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:10.764836Z  WARN task{index=60}:operation{keep_transaction_for=29 drop_after=78}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.070738Z  WARN task{index=25}:operation{keep_transaction_for=74 drop_after=98}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.074662Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:11.267963Z  WARN task{index=34}:operation{keep_transaction_for=65 drop_after=86}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.270800Z  WARN task{index=59}:operation{keep_transaction_for=58 drop_after=81}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.278312Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:11.431716Z  WARN task{index=0}:operation{keep_transaction_for=82 drop_after=65}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.614500Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:11.865754Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:12.070070Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:12.084736Z  WARN task{index=37}:operation{keep_transaction_for=37 drop_after=93}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:12.137604Z  WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:12.298898Z  WARN task{index=42}:operation{keep_transaction_for=42 drop_after=88}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress

FSMaxB (Contributor) commented Oct 9, 2023

Note that in axum (and everything using hyper, AFAIK), from what I understand every request is spawned as its own task, which is cancelled when the connection drops.

I'm just simulating a dropped connection by using tokio::select! and dropping the future that performs the transaction.

pythoneer (Author) commented

I'm not very familiar with the code base, but it looks like this part (and its sibling functions) does not safely increase or decrease conn.transaction_depth:

fn begin(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
    Box::pin(async move {
        conn.execute(&*begin_ansi_transaction_sql(conn.transaction_depth))
            .await?;
        conn.transaction_depth += 1;
        Ok(())
    })
}

What happens if the BEGIN is sent but conn.transaction_depth has not been incremented yet when the transaction is dropped? We would not ROLLBACK, right?
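
To make that window concrete, here is a self-contained sketch with a hypothetical FakeConn type (this is not sqlx's API, just an illustration of the suspected race): if the future is dropped after the BEGIN has been written but before transaction_depth += 1 runs, the client-side bookkeeping and the server state disagree, so nothing will issue a ROLLBACK before the connection is reused.

    use std::future::ready;

    // Hypothetical stand-in for a connection type; not sqlx's PgConnection.
    struct FakeConn {
        transaction_depth: usize,
    }

    impl FakeConn {
        async fn execute(&mut self, sql: &str) {
            println!("send: {sql}");
            // Every .await is a point where the enclosing future can be dropped.
            tokio::task::yield_now().await;
        }
    }

    async fn begin(conn: &mut FakeConn) {
        conn.execute("BEGIN").await; // <- future dropped while suspended here ...
        conn.transaction_depth += 1; // ... means this line never runs
    }

    #[tokio::main]
    async fn main() {
        let mut conn = FakeConn { transaction_depth: 0 };

        // Simulate a cancelled request handler: poll begin() just far enough to
        // send BEGIN, then let the other (immediately ready) branch win and drop it.
        tokio::select! {
            biased;
            _ = begin(&mut conn) => {}
            _ = ready(()) => {}
        }

        // The server would now be inside a transaction, but the client-side
        // counter still says "none", so no ROLLBACK gets triggered before reuse.
        assert_eq!(conn.transaction_depth, 0);
        println!("transaction_depth = {}", conn.transaction_depth);
    }

If that reading is right, the bookkeeping around these await points would need to be made cancellation-safe (or the pool would need to reconcile the mismatch when it reclaims the connection), but I have not verified that against the actual sqlx internals.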

si14 commented Oct 11, 2023

Seemingly related: #2054

1500256797 commented

2023-11-23T12:04:50.581515Z TRACE sqlx::query: summary: "INSERT INTO credits (twitter_id, …", db.statement: "\n\nINSERT INTO\n credits (twitter_id, activity_id, credits, updated_at)\nVALUES\n ($1, $2, $3, $4) ON CONFLICT (twitter_id, activity_id) DO\nUPDATE\nSET\n credits = $3,\n updated_at = $4\n", rows_affected: 0, rows_returned: 0, elapsed: 2.27525ms at /Users/ouhuang/.cargo/registry/src/index.crates.io-6f17d22bba15001f/sqlx-core-0.7.1/src/logger.rs:117 in tower_http::trace::make_span::request with method: POST, uri: /api/twitter/verify/twitter_name, version: HTTP/1.1
