Add COPY FROM support #500

Closed
wants to merge 3 commits into from

ArjanSchouten
Contributor

Add COPY IN support to the R2DBC PostgreSQL driver. PostgreSQL supports COPY statements for fast bulk inserts: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY

The implementation is inspired by CopyManager and QueryExecutor from https://github.com/pgjdbc/pgjdbc
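
For readers unfamiliar with the COPY sub-protocol, here is a minimal sketch of how the three frontend messages used during COPY FROM STDIN are framed on the wire, following the PostgreSQL message-format documentation. `CopyFrames` and its method names are hypothetical and only illustrate the layout; they are not the driver's classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative framing of the frontend messages used during COPY FROM STDIN.
// Class and method names are hypothetical and not taken from the driver.
final class CopyFrames {

    // CopyData: 'd', Int32 length (counts itself plus the payload), then the payload bytes.
    static ByteBuffer copyData(byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(1 + 4 + payload.length);
        frame.put((byte) 'd').putInt(4 + payload.length).put(payload);
        frame.flip();
        return frame;
    }

    // CopyDone: 'c', Int32 length of 4 (the message has no body).
    static ByteBuffer copyDone() {
        ByteBuffer frame = ByteBuffer.allocate(5);
        frame.put((byte) 'c').putInt(4);
        frame.flip();
        return frame;
    }

    // CopyFail: 'f', Int32 length, then a null-terminated error message.
    static ByteBuffer copyFail(String reason) {
        byte[] text = reason.getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(1 + 4 + text.length + 1);
        frame.put((byte) 'f').putInt(4 + text.length + 1).put(text).put((byte) 0);
        frame.flip();
        return frame;
    }
}
```

`ByteBuffer` writes integers in big-endian order by default, which matches the network byte order the protocol expects.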

Issue description

This PR contains the code for issue #183.

New Public APIs

    /**
     * Copy bulk data from client into a PostgreSQL table very fast.
     *
     * @param sql the COPY sql statement
     * @param stdin the ByteBuffer publisher
     * @return a {@link Mono} with the number of rows inserted
     */
    Mono<Long> copyIn(String sql, Publisher<ByteBuffer> stdin);
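
As a rough illustration of what a caller might feed into `stdin`, the sketch below turns in-memory rows into CSV-encoded `ByteBuffer` chunks. `CsvChunks` is a hypothetical helper, not part of this PR; it assumes a UTF-8 client encoding and does not handle NULL values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: converts rows into CSV text chunks suitable for a Publisher<ByteBuffer>.
final class CsvChunks {

    // Quote a field when it contains a delimiter, a quote, or a line break.
    static String escape(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n") || field.contains("\r")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }

    // Join the escaped fields with commas and terminate the row with a newline.
    static String toCsvLine(List<String> row) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < row.size(); i++) {
            if (i > 0) {
                line.append(',');
            }
            line.append(escape(row.get(i)));
        }
        return line.append('\n').toString();
    }

    // Group rows so that each buffer carries up to rowsPerChunk complete rows.
    static List<ByteBuffer> toChunks(List<List<String>> rows, int rowsPerChunk) {
        if (rowsPerChunk < 1) {
            throw new IllegalArgumentException("rowsPerChunk must be >= 1");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        StringBuilder pending = new StringBuilder();
        int count = 0;
        for (List<String> row : rows) {
            pending.append(toCsvLine(row));
            if (++count == rowsPerChunk) {
                chunks.add(ByteBuffer.wrap(pending.toString().getBytes(StandardCharsets.UTF_8)));
                pending.setLength(0);
                count = 0;
            }
        }
        if (count > 0) {
            // Flush the final, partially filled chunk.
            chunks.add(ByteBuffer.wrap(pending.toString().getBytes(StandardCharsets.UTF_8)));
        }
        return chunks;
    }
}
```

With Reactor on the classpath, the resulting list could be wrapped with `Flux.fromIterable(chunks)` and passed as `stdin` together with a statement such as `COPY my_table (id, name) FROM STDIN WITH (FORMAT csv)`, where the table and column names are placeholders. The exact shape of the merged API may differ from the signature proposed here.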

Additional context

Note: we added a benchmark test. It turned out that reading the CSV files accounts for much of the difference we measured.

* feat: add copy in support for r2dbc postgresql driver.
@mp911de mp911de self-assigned this Apr 4, 2022
@mp911de mp911de added the type: enhancement A general enhancement label Apr 4, 2022
@mp911de mp911de linked an issue Apr 4, 2022 that may be closed by this pull request
Collaborator

@mp911de mp911de left a comment

Thanks a lot for your contribution. The API looks pretty decent. I added a few comments. Care to have a look?


    Flux<BackendMessage> backendMessages = copyDataMessages
        .doOnNext(client::send)
        .doOnError(e -> !(e instanceof IllegalArgumentException), (e) -> sendCopyFail(e.getMessage()))
Collaborator

What's the reason for filtering IllegalArgumentException? Generally, when a failure happens, the upstream subscription is canceled and the copy state remains partially active without being closed.

Contributor Author

Removed. It was meant for the IllegalArgumentException thrown in startCopy, but I have moved the doOnError onto backendMessages only. Thanks.
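
The concern in this thread can be illustrated with a simplified, synchronous model: whatever goes wrong while reading the source, the copy should end with CopyFail so the server leaves copy-in mode, and only a clean run ends with CopyDone. `CopyTermination` is a hypothetical name; the actual driver code is reactive and is structured differently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, synchronous model of the termination rule discussed in this thread.
final class CopyTermination {

    // Drains the source and records what would be sent to the server.
    // Any failure while reading ends the copy with CopyFail instead of leaving it open.
    static List<String> run(Iterator<String> source) {
        List<String> sent = new ArrayList<>();
        try {
            while (source.hasNext()) {
                sent.add("CopyData(" + source.next() + ")");
            }
            sent.add("CopyDone");
        } catch (RuntimeException e) {
            // No exception type is filtered out: every error terminates the copy.
            sent.add("CopyFail(" + e.getMessage() + ")");
        }
        return sent;
    }
}
```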

        .doOnNext(client::send)
        .doOnError(e -> !(e instanceof IllegalArgumentException), (e) -> sendCopyFail(e.getMessage()))
        .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
        .thenMany(client.exchange(Mono.just(CopyDone.INSTANCE)));
Collaborator

For the client exchange, it makes sense to start with a many-sink (Sinks.Many.asFlux()) so that we keep a single conversation/publisher with the server open. Otherwise, we call exchange multiple times, which generates a bit of overhead and may interfere with other connection activity, causing protocol resynchronization.

I can apply this change during the merge as it bears some complexity.

Contributor Author

I made an attempt, but it is a little complex since we have to wait for the CopyInResponse before sending the data frames.
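
The ordering constraint mentioned here can be sketched as a small gate that holds data frames back until the server has answered the COPY query with CopyInResponse. `CopyInGate` is a hypothetical, non-reactive illustration under that assumption; it is not how the driver implements it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical gate: data frames are held back until CopyInResponse has arrived.
final class CopyInGate {

    private final Queue<String> buffered = new ArrayDeque<>();
    private final List<String> wire = new ArrayList<>();
    private boolean ready;

    // Called for each chunk the application produces.
    void offer(String chunk) {
        if (ready) {
            wire.add(chunk);
        } else {
            buffered.add(chunk);
        }
    }

    // Called once the server has answered the COPY query with CopyInResponse.
    void onCopyInResponse() {
        ready = true;
        while (!buffered.isEmpty()) {
            wire.add(buffered.remove());
        }
    }

    // Everything that has been released to the connection so far, in order.
    List<String> sent() {
        return wire;
    }
}
```

This version buffers without bound. A reactive implementation would more likely avoid requesting data from the upstream publisher until the response arrives, so that backpressure is preserved.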

@harmvanderwal

Are there any updates on when this'll be merged and released?

@mp911de
Collaborator

mp911de commented May 19, 2022

Not yet, but I'd like to include this API for the 1.0 release. @harmvanderwal did you get a chance to build the PR locally so you can verify the API is suitable for your use case?

@mp911de mp911de added this to the 1.0.0.RELEASE milestone May 19, 2022
@harmvanderwal

I did, and it works perfectly for my specific use case.

@mp911de mp911de changed the title Add copy in support for r2dbc postgresql driver Add COPY FROM support May 27, 2022
mp911de pushed a commit that referenced this pull request May 27, 2022
The driver now provides an API to support COPY FROM STDIN.

[#500][resolves #183]
@mp911de mp911de closed this in 6d2c1a8 May 27, 2022
@mp911de
Collaborator

mp911de commented May 27, 2022

Thank you for your contribution. That's merged and polished now.
