
Query pipelining #408

Open
farnoy opened this issue Jun 13, 2020 · 10 comments
Labels
enhancement New feature or request low priority This issue exists to acknowledge a proposal but isn't being worked on

Comments

@farnoy

farnoy commented Jun 13, 2020

Currently, query execution functions require either a const borrow of a connection pool, or a mutable borrow of a Transaction. This makes it impossible to prepare multiple queries and execute them in a pipelined fashion within a single transaction. tokio-postgres has this and it's an impressive feature.

@mehcode
Member

mehcode commented Jun 13, 2020

I'm wary of implicit query batching. In tokio-postgres, batching happens automatically if multiple futures from query executions are polled concurrently.

Supporting query pipelining is fairly simple at the protocol level; nearly all major sync and async clients in other languages support it (in Rust, it seems only tokio-postgres does).

It's not the priority currently, as we tend to write API servers that execute at most 2-3 complex queries per request.

With that said, I do want to see this supported in SQLx in an explicit form once we figure out the details of how that form looks. What follows is what I'm currently thinking so it's available for discussion.

// Batch or Pipeline or ?
let b = Batch::new(&mut conn);

let q1 = sqlx::query("SELECT 1").fetch_one(&b);
let q2 = sqlx::query("SELECT 2").fetch_all(&b);
let q3 = sqlx::query("SELECT 3").fetch_optional(&b);

// run all queries in a try_join_all
// this would entirely be optional sugar over using try_join_all on all the queries
b.join().await?;

// these now return immediately
let v1 = q1.await?;
let v2 = q2.await?;
let v3 = q3.now_or_never().unwrap()?;

@mehcode mehcode added enhancement New feature or request low priority This issue exists to acknowledge a proposal but isn't being worked on labels Jun 13, 2020
@farnoy
Author

farnoy commented Jun 14, 2020 via email

@unrealhoang

unrealhoang commented Jun 17, 2020

@mehcode
In your example, if q2.await? is called before q1.await?, you have to either allocate to store the result of q1, or skip/ignore the response of q1 in the response stream and return an error on q1.await?. You also need to be in charge of mapping between requests and responses, which is quite cumbersome.

My suggestion:
Since batching/pipelining only makes sense when sending many queries continuously, you can push all of that responsibility back to the user.
Batch should be a Sink<Query> on the sending side and a Stream<Item = Cursor> on the receiving side. With that we will be able to:

// Batch insert
// futures::stream::StreamExt
let (s, r) = Batch::new(&mut conn).split();

let item_stream = stream::iter(items.into_iter().map(|i| sqlx::query("INSERT ... RETURNING").bind(i)));
let send_all = s.send_all(&mut item_stream);
let result = r.map(|cursor| cursor.next()).collect();

send_all.join(result).await

Or, for an infinite pipeline:

let (s, r) = Batch::new(&mut conn).split();

// forward an infinite stream of input to DB
let stream_in = socket.map(|data| sqlx::query("...").bind(data)).forward(s);
// forward an infinite stream of output to socket
let stream_out = r.map(|cursor| rows_to_data(cursor)).forward(socket);

stream_in.join(stream_out).await

@Cocalus

Cocalus commented Jun 26, 2020

I tend to use futures::stream::StreamExt::for_each_concurrent a lot when I need to batch any async work. Would this Batch type work with that API? tokio-postgres's implicit approach should.

@mehcode
Member

mehcode commented Jun 28, 2020

Instead of a new object Batch, could we make it work with the existing Transaction?

We can make pipeline creation generic similar to how transaction creation is, so you can start a new pipeline with tx.pipeline() or conn.pipeline().


In your example, if q2.await? is called before q1.await?, you have to either allocate to store the result of q1 or skip/ignore the response of q1 in the response stream and return error on q1.await?

In my idea of how this would be implemented, there is no difference in behavior depending on the order you await the queries. The batch ran them all at once and they are all thin wrappers over oneshot channels.


Also you need to be in charge of mapping between request / response which is also quite cumbersome.
Batch should be a Sink for the sending side and a Stream<Item = Cursor> for the receiving side. With that we will be able to:

It may not seem like it, but we actually do need something similar to Sink<impl Execute> and Stream<?>. It's not possible to express the stream side uniformly, as how we execute each query needs to be chosen up front.

With the same API I proposed, this is possible:

// I'm now leaning towards `Pipeline` as a type name over `Batch`
let p = Pipeline::new(&mut conn);
let mut r = Vec::new();

for _ in 0..10 {
  r.push(query("INSERT INTO ...").bind(10).bind(20).fetch_one(&p));
}

p.join().await?;

for cursor in r {
  let row = cursor.await?; // result from fetch_one
  
  // do something with that row
}

Please note that, just like the spawn API, the futures returned from using Pipeline as an executor do not need to be awaited.

let p = Pipeline::new(&mut conn);

for _ in 0..10 {
  query("INSERT INTO ...").bind(10).bind(20).fetch_one(&p);
}

p.join().await?;

The examples I've shown so far are "prepare a ton of queries, then execute them all in one pipeline". There is another use case which is "setup a pipeline and let me shove queries at it and keep them executing".

// let p = conn.pipeline();
let p = Pipeline::new(&mut conn);

// spawns us in the background now so we can execute queries while more are added
p.spawn();

for _ in 0..10 {
  p.execute("INSERT INTO ..");
}

p.join().await?;

@Cocalus We could do something like:

let p = conn.pipeline(None); // no limit
let p = conn.pipeline(5); // max 5 queries at once

@farnoy
Author

farnoy commented Jul 22, 2020

@mehcode What would be the type safety story for SELECT queries? Your example seems to deal with a Vec of homogeneous queries and runs them in a pipelined fashion. What about heterogeneous read only queries?

Could something like this work?

let q1 = sqlx::query("SELECT 1").fetch_one(&b);
let q2 = sqlx::query("SELECT 2").fetch_all(&b);

let (res1, res2) = conn.pipeline((q1, q2)).await?;

I think that to have type safety, this pipeline needs to know the shape (how many queries) and the type of each.

Either way, this feels less ergonomic than tokio-postgres. I don't share your view that using let (res1, res2) = join!(q1, q2); is too implicit.

@lacasaprivata2

+1

@BratSinot

Either way, this feels less ergonomic than tokio-postgres. I don't share your view that using let (res1, res2) = join!(q1, q2); is too implicit.

Imho, when I used tokio-postgres, this behavior was non-obvious.

@DXist
Contributor

DXist commented Sep 1, 2022

I've made an implementation attempt and was invited to this discussion.

The implemented pipeline runs queries in a single transaction. The main motivation was to avoid running explicit transactions against CockroachDB and instead rely on its mechanism of automatic transaction retries, which handles serialization failures of concurrent implicit transactions.

My current need, running several related INSERTs transactionally, is covered by pipeline.execute() alone, but I also provided a fetch_pipeline method.

The raw streaming API is definitely not easy or user-friendly, but it doesn't place any limits on how query results are processed.

@DXist
Contributor

DXist commented Sep 2, 2022

I've extended this discussion to explicit transaction pipelines and to providing an API that doesn't return stale data: #2082.
