Skip to content

Commit

Permalink
Polishing.
Browse files Browse the repository at this point in the history
Extract builder API to enable flexibility in providing copy data. Add safeguards to terminate copy data in cases of cancellation or errors. Reorder methods.

[resolves #500][#183]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed May 27, 2022
1 parent 1f4c436 commit 6d2c1a8
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 109 deletions.
13 changes: 6 additions & 7 deletions src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.r2dbc.postgresql;

import io.netty.buffer.ByteBuf;
import io.r2dbc.postgresql.api.CopyInBuilder;
import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.api.Notification;
import io.r2dbc.postgresql.api.PostgresTransactionDefinition;
Expand Down Expand Up @@ -50,7 +50,6 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -215,6 +214,11 @@ public Mono<Void> commitTransaction() {
});
}

@Override
public CopyInBuilder copyIn(String sql) {
return new PostgresqlCopyIn.Builder(this.resources, sql);
}

@Override
public PostgresqlBatch createBatch() {
return new PostgresqlBatch(this.resources);
Expand Down Expand Up @@ -408,11 +412,6 @@ public void onComplete() {
});
}

@Override
public Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin) {
return new PostgresqlCopyIn(resources).copy(sql, stdin);
}

private static Function<TransactionStatus, String> getTransactionIsolationLevelQuery(IsolationLevel isolationLevel) {
return transactionStatus -> {
if (transactionStatus == OPEN) {
Expand Down
128 changes: 93 additions & 35 deletions src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@
package io.r2dbc.postgresql;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.api.CopyInBuilder;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.CopyInResponse;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.CopyData;
import io.r2dbc.postgresql.message.frontend.CopyDone;
import io.r2dbc.postgresql.message.frontend.CopyFail;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Query;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

import java.util.concurrent.atomic.AtomicBoolean;

import static io.r2dbc.postgresql.PostgresqlResult.toResult;

Expand All @@ -43,49 +48,70 @@ final class PostgresqlCopyIn {

private final ConnectionResources context;

PostgresqlCopyIn(ConnectionResources context) {
this.context = Assert.requireNonNull(context, "context must not be null");
PostgresqlCopyIn(ConnectionResources resources) {
this.context = Assert.requireNonNull(resources, "resources must not be null");
}

Mono<Long> copy(String sql, Publisher<ByteBuf> stdin) {
Mono<Long> copy(String sql, Publisher<? extends Publisher<ByteBuf>> stdin) {

ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);

return Flux.from(stdin)
.map(CopyData::new)
.as(messages -> copyIn(sql, messages));
}
.<FrontendMessage>concatMap(data -> {

private Mono<Long> copyIn(String sql, Flux<CopyData> copyDataMessages) {
Client client = context.getClient();
CompositeByteBuf composite = this.context.getClient().getByteBufAllocator().compositeBuffer();

Flux<BackendMessage> backendMessages = copyDataMessages
.doOnNext(client::send)
.doOnError((e) -> sendCopyFail(e.getMessage()))
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
.thenMany(client.exchange(Mono.just(CopyDone.INSTANCE)));
return Flux.from(data)
.reduce(composite, (l, r) -> l.addComponent(true, r))
.map(CopyData::new)
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);

return startCopy(sql)
.concatWith(backendMessages)
.doOnCancel(() -> sendCopyFail("Cancelled"))
.as(Operators::discardOnCancel)
.as(messages -> toResult(context, messages, ExceptionFactory.INSTANCE).getRowsUpdated());
}).concatWithValues(CopyDone.INSTANCE).startWith(new Query(sql))
.as(messages -> copyIn(exceptionFactory, messages));
}

private Flux<BackendMessage> startCopy(String sql) {
return context.getClient().exchange(
// ReadyForQuery is returned when an invalid query is provided
backendMessage -> backendMessage instanceof CopyInResponse || backendMessage instanceof ReadyForQuery,
Mono.just(new Query(sql))
)
.doOnNext(message -> {
if (message instanceof CommandComplete) {
throw new IllegalArgumentException("Copy from stdin query expected, sql='" + sql + "', message=" + message);
private Mono<Long> copyIn(ExceptionFactory exceptionFactory, Flux<FrontendMessage> copyDataMessages) {

Client client = this.context.getClient();
AtomicBoolean stop = new AtomicBoolean();
Sinks.Many<FrontendMessage> sink = Sinks.many().unicast().onBackpressureBuffer();
Flux<FrontendMessage> requestMessages = sink.asFlux().mergeWith(copyDataMessages
.doOnComplete(sink::tryEmitComplete)
.filter(it -> !stop.get())
.onErrorResume(e -> {
copyFail(sink, stop, "Copy operation failed: " + e.getMessage());
return Mono.empty();
}));

return client.exchange(backendMessage -> backendMessage instanceof ReadyForQuery, requestMessages)
.doOnNext(it -> {
if (it instanceof ErrorResponse) {
stop.set(true);
sink.tryEmitComplete();
}
});
})
.doOnComplete(() -> {
stop.set(true);
sink.tryEmitComplete();
})
.doOnError((e) -> {
copyFail(sink, stop, "Copy operation failed: " + e.getMessage());
})
.doOnCancel(() -> {
copyFail(sink, stop, "Copy operation failed: Cancelled");
})
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
.as(Operators::discardOnCancel)
.doOnCancel(() -> {
copyFail(sink, stop, "Copy operation failed: Cancelled");
})
.as(messages -> toResult(this.context, messages, exceptionFactory).getRowsUpdated());
}

private void sendCopyFail(String message) {
context.getClient().exchange(Mono.just(new CopyFail("Copy operation failed: " + message)))
.as(Operators::discardOnCancel)
.subscribe();
private void copyFail(Sinks.Many<FrontendMessage> sink, AtomicBoolean stop, String e) {
sink.tryEmitNext(new CopyFail(e));
sink.tryEmitComplete();
stop.set(true);
}

@Override
Expand All @@ -95,4 +121,36 @@ public String toString() {
'}';
}

static final class Builder implements CopyInBuilder {

private final ConnectionResources resources;

private final String sql;

@Nullable
private Publisher<? extends Publisher<ByteBuf>> stdin;

Builder(ConnectionResources resources, String sql) {
this.resources = resources;
this.sql = sql;
}

@Override
public CopyInBuilder fromMany(Publisher<? extends Publisher<ByteBuf>> stdin) {
this.stdin = Assert.requireNonNull(stdin, "stdin must not be null");
return this;
}

@Override
public Mono<Long> build() {

if (this.stdin == null) {
throw new IllegalArgumentException("No stdin configured for COPY IN");
}

return new PostgresqlCopyIn(this.resources).copy(this.sql, this.stdin);
}

}

}
1 change: 1 addition & 0 deletions src/main/java/io/r2dbc/postgresql/PostgresqlResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public Mono<Long> getRowsUpdated() {
sink.next(rowCount);
}
}

}).collectList().handle((list, sink) -> {

if (list.isEmpty()) {
Expand Down
112 changes: 112 additions & 0 deletions src/main/java/io/r2dbc/postgresql/api/CopyInBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.r2dbc.postgresql.api;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.r2dbc.postgresql.message.frontend.CopyData;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;

/**
* Interface specifying a builder contract to configure a {@code COPY FROM STDIN} operation.
*
* @since 1.0
*/
public interface CopyInBuilder {

/**
* Postgres parse limit for large messages {@code 2^30 - 1} bytes.
*/
int MAX_FRAME_SIZE = 0x3fffffff - 1;

/**
* Configure a {@link Publisher} emitting publishers of buffers that to write data to a {@link CopyData} frame per emitted publisher.
* This method allows controlling flush behavior and chunking of buffers. The provided stream must ensure to not exceed size limits ({@link #MAX_FRAME_SIZE}) of the {@link CopyData} frame.
* <p>If a provided publisher terminates with an error signal then the copy operation terminates with a failure and gets cancelled on the server.
*
* @param stdin the bytes to write to a {@link CopyData} frame.
* @return {@code this} {@link CopyInBuilder builder}.
*/
CopyInBuilder fromMany(Publisher<? extends Publisher<ByteBuf>> stdin);

/**
* Configure a {@link Publisher} emitting buffers that are written to a single {@link CopyData} frame.
* If the total amount of data to be written exceeds the copy frame size limitation ({@link #MAX_FRAME_SIZE}), then use {@link #fromMany(Publisher)} to split up the input data to many
* {@link CopyData} frames.
* <p>If the provided publisher terminates with an error signal then the copy operation terminates with a failure and gets cancelled on the server.
*
* @param stdin the bytes to write to a {@link CopyData} frame.
* @return {@code this} {@link CopyInBuilder builder}.
*/
default CopyInBuilder from(Publisher<ByteBuf> stdin) {
return fromMany(Mono.just(stdin));
}

/**
* Configure an input buffer that is written to a single {@link CopyData} frame.
*
* @param stdin the bytes to write to a {@link CopyData} frame.
* @return {@code this} {@link CopyInBuilder builder}.
*/
default CopyInBuilder from(ByteBuf stdin) {
return from(Mono.just(stdin));
}

/**
* Configure an input buffer that is written to a single {@link CopyData} frame.
*
* @param stdin the bytes to write to a {@link CopyData} frame.
* @return {@code this} {@link CopyInBuilder builder}.
*/
default CopyInBuilder from(ByteBuffer stdin) {
return from(Unpooled.wrappedBuffer(stdin));
}

/**
* Configure an input buffer that is written to a single {@link CopyData} frame.
*
* @param stdin the bytes to write to a {@link CopyData} frame.
* @return {@code this} {@link CopyInBuilder builder}.
*/
default CopyInBuilder from(byte[] stdin) {
return from(Unpooled.wrappedBuffer(stdin));
}

/**
* Configure an input buffer along with {@code offset} and {@code length} whose specified chunk is written to a single {@link CopyData} frame.
*
* @param stdin the bytes to write to a {@link CopyData} frame.
* @param offset the start offset in the data.
* @param length the number of bytes to write.
* @return {@code this} {@link CopyInBuilder builder}.
*/
default CopyInBuilder from(byte[] stdin, int offset, int length) {
return from(Unpooled.wrappedBuffer(stdin, offset, length));
}

/**
* Build the final publisher that initiates the {@code COPY} operation. The copy data messages sent to the server are triggered by the provided input buffer.
* Cancelling the copy operation sends a failure frame to the server to terminate the copy operation with an error.
*
* @return the publisher that initiates the {@code COPY} operation upon subscription.
*/
Mono<Long> build();

}
31 changes: 22 additions & 9 deletions src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ public interface PostgresqlConnection extends Connection {
@Override
Mono<Void> commitTransaction();

/**
* Obtain a {@link CopyInBuilder} to configure a {@code COPY FROM STDIN} operation for very fast copying into a database table.
*
* @param sql the COPY … FROM STDIN sql statement
* @return the builder to configure the copy operation.
* @since 1.0
*/
CopyInBuilder copyIn(String sql);

/**
* Use {@code COPY FROM STDIN} for very fast copying into a database table.
*
* @param sql the COPY … FROM STDIN sql statement
* @param stdin the ByteBuf publisher
* @return a {@link Mono} with the amount of rows inserted
* @see CopyInBuilder
* @since 1.0
*/
default Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin) {
return copyIn(sql).from(stdin).build();
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -172,13 +194,4 @@ public interface PostgresqlConnection extends Connection {
@Override
Mono<Boolean> validate(ValidationDepth depth);

/**
* Use COPY FROM STDIN for very fast copying into a database table.
*
* @param sql the COPY … FROM STDIN sql statement
* @param stdin the ByteBuf publisher
* @return a {@link Mono} with the amount of rows inserted
*/
Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin);

}

0 comments on commit 6d2c1a8

Please sign in to comment.