Add support for COPY ... FROM STDIN and COPY ... TO STDOUT statements in PostgreSQL #36

Closed
alexwl opened this issue Jan 7, 2020 · 15 comments · Fixed by #1345
Labels
db:postgres Related to PostgreSQL enhancement New feature or request help wanted Extra attention is needed

Comments

@alexwl

alexwl commented Jan 7, 2020

Hi!

COPY is the most efficient way to import data into a PostgreSQL database (it is less flexible than INSERT, but has significantly less overhead).
https://www.postgresql.org/docs/current/sql-copy.html
https://www.postgresql.org/docs/current/populate.html#POPULATE-COPY-FROM

It would be great to have a low-level interface to stream data to/from PostgreSQL using COPY ... FROM STDIN and COPY ... TO STDOUT statements. For example, similar to https://docs.rs/tokio-postgres/0.5.1/tokio_postgres/struct.Client.html#method.copy_in and https://docs.rs/tokio-postgres/0.5.1/tokio_postgres/struct.Client.html#method.copy_out.

@abonander
Collaborator

This could be implemented as an inherent method on PgConnection.

@abonander abonander added the enhancement New feature or request label Jan 7, 2020
@mehcode mehcode added the db:postgres Related to PostgreSQL label Jan 7, 2020
@mehcode
Member

mehcode commented Mar 25, 2020

Marking this as a good contribution for someone looking to contribute to the internals.

The following are my initial thoughts on what a good API would look like.

impl PgCopyOut {
    // Returns a cursor over the rows of the data stream
    fn into_cursor(self) -> PgCursor;

    // Returns a stream of bytes
    fn into_stream(self) -> impl Stream<Item = u8>;
    fn into_async_read(self) -> impl AsyncRead;
}

impl PgCopyIn {
    // Expect a stream of bytes
    fn into_sink(self) -> impl Sink<u8>;
    fn into_async_write(self) -> impl AsyncWrite;
}

pub trait PgExecutorExt: Executor {
    // Expects a COPY ... TO STDOUT statement
    fn copy_out<'q, 'e>(
        &'e mut self,
        statement: &'q str,
    ) -> PgCopyOut<'q, 'e>;

    // Expects a COPY ... FROM STDIN statement
    fn copy_in<'q, 'e>(
        &'e mut self,
        statement: &'q str,
    ) -> PgCopyIn<'q, 'e>;
}

// Copy from one database to another

let rx = con1.copy_out("COPY articles TO STDOUT");
let tx = con2.copy_in("COPY articles FROM STDIN");

rx.into_stream().forward(tx.into_sink()).await?;

@mehcode mehcode added the help wanted Extra attention is needed label Mar 25, 2020
@abonander
Collaborator

abonander commented Mar 27, 2020

@mehcode one hiccup with into_cursor() is that we're never given the type OIDs of columns in the data stream, so we have no way to give informed errors in Row::try_get().

@mehcode
Member

mehcode commented Mar 27, 2020

Good point. We just get a stream of DataRow messages essentially.

@abonander
Collaborator

We can have a similar API but I'm thinking specialized types that clearly document that they're semi-unchecked. PgCopyCursor and PgCopyRow or something.

Maybe on top of that we can have a proc-macro API that looks at the table schema and makes it "safe" or something.
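
To make the "semi-unchecked" point concrete, here is a minimal sketch of decoding one tuple from a binary-format COPY stream (an Int16 field count, then an Int32 length plus bytes per field, with a length of -1 meaning NULL). `CopyRow`, `parse_tuple` and `get_i32` are hypothetical names for illustration, not an existing SQLx API. The stream carries no type OIDs, so a typed getter can only validate the byte length.

```rust
/// Hypothetical untyped row: raw field bytes, `None` for SQL NULL.
struct CopyRow<'a> {
    fields: Vec<Option<&'a [u8]>>,
}

/// Splits `n` bytes off the front of `data`, advancing it.
fn take<'a>(data: &mut &'a [u8], n: usize) -> Result<&'a [u8], String> {
    let slice: &'a [u8] = *data;
    if slice.len() < n {
        return Err("unexpected end of tuple".to_string());
    }
    let (head, tail) = slice.split_at(n);
    *data = tail;
    Ok(head)
}

/// Parses a single binary-format tuple (not the file header or trailer).
fn parse_tuple(mut data: &[u8]) -> Result<CopyRow<'_>, String> {
    let count = take(&mut data, 2)?;
    let count = i16::from_be_bytes([count[0], count[1]]);
    if count < 0 {
        return Err("negative field count (trailer or corrupt data)".to_string());
    }
    let mut fields = Vec::with_capacity(count as usize);
    for _ in 0..count {
        let len = take(&mut data, 4)?;
        let len = i32::from_be_bytes([len[0], len[1], len[2], len[3]]);
        if len < 0 {
            fields.push(None); // -1 marks NULL; no value bytes follow
        } else {
            fields.push(Some(take(&mut data, len as usize)?));
        }
    }
    Ok(CopyRow { fields })
}

impl<'a> CopyRow<'a> {
    /// Only the length can be checked here: nothing in the stream
    /// confirms that the column really is an int4.
    fn get_i32(&self, index: usize) -> Result<Option<i32>, String> {
        match self.fields.get(index) {
            None => Err(format!("no field at index {}", index)),
            Some(None) => Ok(None),
            Some(Some(bytes)) => {
                if bytes.len() != 4 {
                    return Err(format!(
                        "field {} is {} bytes, expected 4",
                        index,
                        bytes.len()
                    ));
                }
                Ok(Some(i32::from_be_bytes([
                    bytes[0], bytes[1], bytes[2], bytes[3],
                ])))
            }
        }
    }
}
```

Any 4-byte value (a float4, say) would pass the length check in `get_i32`, which is the kind of gap the specialized types would need to document.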

@abonander
Collaborator

abonander commented Mar 30, 2020

Additionally, the main API probably shouldn't take raw statements but instead either a table name and list of columns or the data source query.

fn copy_out(&mut self, table: &str, columns: impl IntoIterator<Item = &str>) -> PgCopyOut

/// Accepts just the SELECT/INSERT/UPDATE/DELETE RETURNING portion
fn copy_out_with(&mut self, query: &str) -> PgCopyOut

fn copy_in(&mut self, table: &str, columns: impl IntoIterator<Item = &str>) -> PgCopyIn

I'd be okay with _raw() variants that take the full statement (e.g. to force text format).

into_stream returning a Stream<Item = u8> seems kinda inefficient/pointless. Maybe Box<[u8]> instead, which also lets it work with futures::io even on Tokio. into_sink(self) has dubious utility I think, unless it's generic over T: AsRef<[u8]>.

I'm assuming the types that into_async_read(self) and into_async_write() return depend on the current runtime.
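
As a rough illustration of the "generic over T: AsRef<[u8]>" idea, here is a synchronous sketch using only `std::io::Write` as a stand-in for the async sink. `write_chunks` is a hypothetical helper, not part of SQLx. The point is only that accepting whole chunks lets callers pass `Box<[u8]>`, `Vec<u8>`, `&[u8]` or `&str` without sending one byte at a time.

```rust
use std::io::{self, Write};

/// Writes every chunk to `dest` and returns the total number of bytes written.
/// Accepts any chunk type that can be viewed as a byte slice.
fn write_chunks<W, I, T>(dest: &mut W, chunks: I) -> io::Result<usize>
where
    W: Write,
    I: IntoIterator<Item = T>,
    T: AsRef<[u8]>,
{
    let mut total = 0;
    for chunk in chunks {
        let bytes = chunk.as_ref();
        dest.write_all(bytes)?;
        total += bytes.len();
    }
    Ok(total)
}
```

For example, `write_chunks(&mut out, vec!["1\tfoo\n", "2\tbar\n"])` and a `Vec<Box<[u8]>>` both go through the same function.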

@mehcode
Member

mehcode commented Mar 30, 2020

SQLx is not a query builder (yet). I'm really hesitant to provide an API on Executor that is anything more than "raw SQL here".

It's simple to fail immediately if they don't send the right command, since Postgres responds in a specific way and tells us the details of their COPY command.
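
A minimal sketch of that check, assuming the connection has already split a backend message into its tag byte and body (the body excluding the 4-byte length prefix). Per the PostgreSQL protocol, CopyInResponse is tagged 'G' and CopyOutResponse 'H'; both carry an Int8 overall format (0 = text, 1 = binary), an Int16 column count, and one Int16 format code per column. The type and function names here are made up for illustration.

```rust
/// Overall format announced by the server for a COPY operation.
#[derive(Debug, PartialEq)]
enum CopyFormat {
    Text,
    Binary,
}

/// Direction in which COPY data will flow.
#[derive(Debug, PartialEq)]
enum CopyDirection {
    In,  // CopyInResponse ('G'): the client sends data
    Out, // CopyOutResponse ('H'): the server sends data
}

#[derive(Debug, PartialEq)]
struct CopyResponse {
    direction: CopyDirection,
    format: CopyFormat,
    column_formats: Vec<i16>,
}

/// Parses the tag and body of a backend message, rejecting anything
/// that is not a COPY response (e.g. the user sent a plain SELECT).
fn parse_copy_response(tag: u8, body: &[u8]) -> Result<CopyResponse, String> {
    let direction = match tag {
        b'G' => CopyDirection::In,
        b'H' => CopyDirection::Out,
        other => {
            return Err(format!(
                "expected a COPY response, got message {:?}",
                other as char
            ))
        }
    };
    if body.len() < 3 {
        return Err("COPY response body too short".to_string());
    }
    let format = match body[0] {
        0 => CopyFormat::Text,
        1 => CopyFormat::Binary,
        other => return Err(format!("unknown COPY format {}", other)),
    };
    let columns = i16::from_be_bytes([body[1], body[2]]);
    let rest = &body[3..];
    if columns < 0 || rest.len() != columns as usize * 2 {
        return Err("column count does not match body length".to_string());
    }
    let column_formats = rest
        .chunks_exact(2)
        .map(|c| i16::from_be_bytes([c[0], c[1]]))
        .collect();
    Ok(CopyResponse { direction, format, column_formats })
}
```

A `copy_in` call could then fail early when the direction is `Out` (or the other way around), before any data is streamed.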

@mehcode
Member

mehcode commented Mar 30, 2020

into_stream and into_sink are kind of pointless, yeah. AsyncRead / AsyncWrite are more than fine probably.

@abonander
Collaborator

I mainly suggest not just taking the full statement because COPY uses the text format by default, which isn't going to be very fast since we'd have to decode each row as UTF-8 before actually decoding the values. The user would have to remember to specify binary format every time if they want full performance.
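
For reference, a minimal sketch of what the binary format looks like on the sending side, following the layout in the PostgreSQL COPY documentation: an 11-byte signature, an Int32 flags field, an Int32 header extension length, then one tuple per row and a trailing Int16 of -1. The function names are hypothetical, and the caller is assumed to supply each field already encoded in the column type's binary representation.

```rust
/// Fixed signature that opens every binary COPY stream.
const SIGNATURE: &[u8; 11] = b"PGCOPY\n\xff\r\n\0";

/// Appends the file header: signature, flags (0), header extension length (0).
fn write_header(buf: &mut Vec<u8>) {
    buf.extend_from_slice(SIGNATURE);
    buf.extend_from_slice(&0i32.to_be_bytes());
    buf.extend_from_slice(&0i32.to_be_bytes());
}

/// Appends one tuple: Int16 field count, then Int32 length + bytes per field.
/// `None` is written as length -1 (NULL) with no value bytes.
fn write_row(buf: &mut Vec<u8>, fields: &[Option<&[u8]>]) {
    buf.extend_from_slice(&(fields.len() as i16).to_be_bytes());
    for field in fields {
        match field {
            Some(bytes) => {
                buf.extend_from_slice(&(bytes.len() as i32).to_be_bytes());
                buf.extend_from_slice(bytes);
            }
            None => buf.extend_from_slice(&(-1i32).to_be_bytes()),
        }
    }
}

/// Appends the file trailer: an Int16 field count of -1.
fn write_trailer(buf: &mut Vec<u8>) {
    buf.extend_from_slice(&(-1i16).to_be_bytes());
}
```

A row of `(42, 'hi', NULL)` for columns `(int4, text, text)` would be `write_row(&mut buf, &[Some(&42i32.to_be_bytes()[..]), Some(&b"hi"[..]), None])`. No text parsing or escaping is involved, which is where the speedup over the text format would come from.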

@mehcode
Member

mehcode commented Mar 30, 2020

I don't disagree. My point is that something like query construction is better suited to a DSL if/when we get it:

// Not sure of a good name for the _ method
let cin: PgCopyIn = copy_in("table").columns(&["a", "b"])._(&mut conn).await?;

Macro version perhaps:

let cin: PgCopyIn = copy_in!(table(a, b))._(&mut conn).await?;

@abonander
Collaborator

Maybe we should do a builder here then. There's not really that much to configure if you're using binary format, it's just whether or not you supply a column list or subquery.
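
A rough sketch of what such a builder might generate, assuming it always forces binary format and only varies between a table (with an optional column list) and a subquery. `CopyOutBuilder` and its methods are hypothetical and only produce the statement string; nothing here talks to a connection.

```rust
/// Where the COPY data comes from.
enum Source {
    Table { name: String, columns: Vec<String> },
    Query(String),
}

/// Hypothetical builder for `COPY ... TO STDOUT` statements in binary format.
struct CopyOutBuilder {
    source: Source,
}

/// Quotes an identifier, doubling any embedded double quotes.
fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

impl CopyOutBuilder {
    fn table(name: &str) -> Self {
        Self {
            source: Source::Table { name: name.to_owned(), columns: Vec::new() },
        }
    }

    fn query(sql: &str) -> Self {
        Self { source: Source::Query(sql.to_owned()) }
    }

    /// Sets the column list; ignored when the source is a subquery.
    fn columns(mut self, cols: &[&str]) -> Self {
        if let Source::Table { columns, .. } = &mut self.source {
            *columns = cols.iter().map(|c| c.to_string()).collect();
        }
        self
    }

    /// Renders the full statement, always requesting binary format.
    fn build(&self) -> String {
        let source = match &self.source {
            Source::Table { name, columns } if columns.is_empty() => quote_ident(name),
            Source::Table { name, columns } => {
                let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
                format!("{} ({})", quote_ident(name), cols.join(", "))
            }
            Source::Query(sql) => format!("({})", sql),
        };
        format!("COPY {} TO STDOUT WITH (FORMAT binary)", source)
    }
}
```

With this, `CopyOutBuilder::table("articles").columns(&["id", "title"]).build()` yields `COPY "articles" ("id", "title") TO STDOUT WITH (FORMAT binary)`, and the user never has to remember the format option.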

@mehcode
Member

mehcode commented Mar 30, 2020

I think the Executor trait should still be the low-level operational trait that takes the raw query. I'm not against adding a DSL on top of it to make things simpler.

Just that we should do this task in phases.

  1. Raw COPY ... FROM STDIN and COPY ... TO STDOUT statements, where the "deliverable" is two distinct use cases:

    • Emit a CSV from a table or query
    • Push a table or query into another table (on another connection)

  2. Cursor support to iterate over rows coming from a COPY ... TO STDOUT statement.

  3. Some way to push rows to a COPY ... FROM STDIN statement. I believe we talked about an IntoArguments trait.

  4. A DSL around the above to make things simpler and less error-prone to set up.

@abonander
Collaborator

Yeah, I can get behind that.

@abonander
Collaborator

abonander commented Mar 30, 2020

For the first step, should we still do copy_in_raw() and copy_out_raw() so we have some space to play with naming for the higher-level API? Maybe it could directly return impl AsyncRead or impl AsyncWrite.
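
To sketch what a writer returned by something like copy_in_raw() might do internally, here is a synchronous stand-in built on `std::io::Write` rather than `AsyncWrite`. `CopyInWriter` is a hypothetical type. It wraps each write in a CopyData message (tag 'd', then an Int32 length that counts itself plus the payload) and sends CopyDone (tag 'c', length 4) on `finish`, as described in the PostgreSQL protocol message formats.

```rust
use std::io::{self, Write};

/// Hypothetical writer that frames outgoing bytes as CopyData messages.
struct CopyInWriter<W: Write> {
    inner: W,
}

impl<W: Write> CopyInWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner }
    }

    /// Sends CopyDone, flushes, and hands back the underlying writer.
    fn finish(mut self) -> io::Result<W> {
        self.inner.write_all(&[b'c', 0, 0, 0, 4])?;
        self.inner.flush()?;
        Ok(self.inner)
    }
}

impl<W: Write> Write for CopyInWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // The Int32 length covers its own 4 bytes plus the payload,
        // but not the tag byte.
        if buf.len() > (i32::MAX as usize) - 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "chunk too large for a single CopyData message",
            ));
        }
        let len = (buf.len() + 4) as i32;
        self.inner.write_all(b"d")?;
        self.inner.write_all(&len.to_be_bytes())?;
        self.inner.write_all(buf)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
```

A real implementation would also need to handle CopyFail and read the server's CommandComplete or ErrorResponse after CopyDone; this sketch covers only the framing.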

@shiftrtech

Any news about this? Is there something I can do in the meantime for a massive database ingest? Thanks.

4 participants