Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Joining UPSERTs with query_one and owned parameters #927

Closed
elpiel opened this issue Aug 2, 2022 · 6 comments
Closed

Joining UPSERTs with query_one and owned parameters #927

elpiel opened this issue Aug 2, 2022 · 6 comments

Comments

@elpiel
Copy link
Sponsor Contributor

elpiel commented Aug 2, 2022

Hello,
I'm doing multiple UPSERT queries with RETURNING statement that I want to join in single future. This can be achieve only with query_raw since query_one takes a &[...] and the parameters don't live long enough, while we can use:
vec![Box::new(param1) as Box<dyn ToSql + Sync + Send>,] with query_raw.

However, this has some drawbacks, mainly I have to manually:

  • handle RowStream the same way query_one does
  • create own error and duplicate the Error::RowCount error and wrap the tokio_postgres::Error

A solution to fix this would need to either:

  • Add additional methods (or a trait maybe) for IntoIterator parameters of query_one and query_opt
  • Change the API and introduce a breaking change - not really sure how to introduce this changed API though

While it's good that it takes reference to the parameters, this has caused me multiple problems in the past too.
It would be very beneficial, in my opinion, to have such an improvement for owned types as I still think it's a rough edge that needs some polishing.

What do you think about that?

@sfackler
Copy link
Owner

sfackler commented Aug 2, 2022

You can use an async { ... } block to close over the query_one and the parameters slice.

let future1 = async {
    let params = vec![...];
    client.query_one(query_string, &params).await
};

@elpiel
Copy link
Sponsor Contributor Author

elpiel commented Aug 2, 2022

I'm generating the futures from an iterator:

let spenders_futures = delta_balances
        .spenders
        .into_iter()
        .map(|(spender, amount)| async {
            let params = vec![
                Box::new(channel_id) as Box<dyn ToSql + Sync>,
                Box::new(Side::Spender),
                Box::new(spender),
                Box::new(amount),
                Box::new(updated),
                Box::new(now),
            ];

            client
                .query_one(
                    &statement,
                    params.as_slice(),
                )
        });

This still causes issues:

mismatched types
expected reference `&[&dyn ToSql + Sync]`
   found reference `&[Box<dyn ToSql + Sync>]

@sfackler
Copy link
Owner

sfackler commented Aug 2, 2022

You shouldn't need the vec:

async {
    client.query_one(
        &statement,
        &[
            &channel_id,
            &Side::Spender,
            // ...
        ]
    ).await
}

@elpiel
Copy link
Sponsor Contributor Author

elpiel commented Aug 2, 2022

It's still causing problems and it suggests to use async move (because of the borrowed &statement and other borrowed values) but when I use it, then Client is also borrowed and it can't be cloned.

PS: I forced the client to be a &client before using it in the iterators (I'm actually using deadpool Pool but it works with it too)

I still don't think this is very pretty but it does work.
For reference:

pub async fn spend_amount(
    pool: DbPool,
    channel_id: ChannelId,
    delta_balances: Balances<CheckedState>,
) -> Result<(Vec<Accounting>, Vec<Accounting>), PoolError> {
    let client = &pool.get().await?;

    let statement = client.prepare_cached(UPDATE_ACCOUNTING_STATEMENT).await?;

    // Earners
    let earners_futures = delta_balances.earners.into_iter().map(|(earner, amount)| {
        let statement = statement.clone();

        async move {
            client
                .query_one(&statement, &[&channel_id, &Side::Earner, &earner, &amount])
                .await
                .map(|row| Accounting::from(&row))
        }
    });

    // Spenders
    let spenders_futures = delta_balances
        .spenders
        .into_iter()
        .map(|(spender, amount)| {
            let statement = statement.clone();

            async move {
                client
                    .query_one(
                        &statement,
                        &[&channel_id, &Side::Spender, &spender, &amount],
                    )
                    .await
                    .map(|row| Accounting::from(&row))
            }
        });

    let earners = join_all(earners_futures);
    let spenders = join_all(spenders_futures);

    // collect all the Accounting updates into Vectors
    let (earners, spenders) = join(earners, spenders).await;

    // Return an error if any of the Accounting updates failed
    Ok((
        earners.into_iter().collect::<Result<_, _>>()?,
        spenders.into_iter().collect::<Result<_, _>>()?,
    ))
}

@sfackler
Copy link
Owner

sfackler commented Aug 2, 2022

Ah yeah you sometimes need to use async move and manually take references as necessary. That looks like what I'd do though.

@elpiel
Copy link
Sponsor Contributor Author

elpiel commented Aug 2, 2022

Ok, thanks for the help, as always 😋

@elpiel elpiel closed this as completed Aug 2, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants