diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index e9da15b8b2..14f260ed2d 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -75,6 +75,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import org.jooq.BindingGetResultSetContext; import org.jooq.Configuration; @@ -114,6 +115,7 @@ import io.r2dbc.spi.ConnectionFactories; import io.r2dbc.spi.ConnectionFactoryOptions; import io.r2dbc.spi.ConnectionFactoryOptions.Builder; +import io.r2dbc.spi.Result.RowSegment; import io.r2dbc.spi.Option; import io.r2dbc.spi.Result; import io.r2dbc.spi.Row; @@ -247,7 +249,10 @@ public final void onComplete() { private final void complete(boolean cancelled) { resultSubscriber.downstream.forwarders.remove(forwarderIndex); - resultSubscriber.complete(cancelled); + + // [#13343] [#13669] Prevent premature completion + if (cancelled || resultSubscriber.downstream.forwarders.isEmpty() && resultSubscriber.completionRequested.get()) + resultSubscriber.complete(cancelled); } } @@ -255,10 +260,12 @@ static abstract class AbstractResultSubscriber implements Subscriber final AbstractNonBlockingSubscription downstream; final AtomicBoolean completed; + final AtomicBoolean completionRequested; AbstractResultSubscriber(AbstractNonBlockingSubscription downstream) { this.downstream = downstream; this.completed = new AtomicBoolean(); + this.completionRequested = new AtomicBoolean(); } @Override @@ -269,22 +276,26 @@ public final void onSubscribe(Subscription s) { @Override public final void onError(Throwable t) { downstream.subscriber.onError(translate(downstream.sql(), t)); + complete(true); } @Override public final void onComplete() { - completed.set(true); complete(false); } final void complete(boolean cancelled) { - if (completed.get() && downstream.forwarders.isEmpty()) + completionRequested.set(true); + + // [#13343] [#13669] Delay completion of the downstream in case this + // completion happens before each forwarder's + // completion. + if (downstream.forwarders.isEmpty() && !completed.getAndSet(true)) downstream.complete(cancelled); } } static final class RowCountSubscriber extends AbstractResultSubscriber { - RowCountSubscriber(AbstractNonBlockingSubscription downstream) { super(downstream); } @@ -339,8 +350,6 @@ public final void onNext(Result r) { return record; }); } - - // TODO: More specific error handling catch (Throwable t) { onError(t); return null; @@ -432,8 +441,9 @@ final void onNext0(Connection c) { stmt.execute().subscribe(resultSubscriber.apply(query, downstream)); } - // TODO: More specific error handling + // [#13343] Cancel the downstream in case of a rendering bug in jOOQ catch (Throwable t) { + downstream.cancel(); onError(t); } } @@ -463,8 +473,9 @@ final void onNext0(Connection c) { b.execute().subscribe(new RowCountSubscriber(downstream)); } - // TODO: More specific error handling + // [#13343] Cancel the downstream in case of a rendering bug in jOOQ catch (Throwable t) { + downstream.cancel(); onError(t); } } @@ -514,7 +525,10 @@ final void onNext0(Connection c) { stmt.execute().subscribe(new RowCountSubscriber(downstream)); } + + // [#13343] Cancel the downstream in case of a rendering bug in jOOQ catch (Throwable t) { + downstream.cancel(); onError(t); } } @@ -642,7 +656,7 @@ final QueryExecutionSubscriber delegate() { @Override final String sql() { String result = queryExecutionSubscriber.sql; - return result != null ? result : "" + queryExecutionSubscriber.query; + return result != null ? result : sql0(() -> "" + queryExecutionSubscriber.query); } } @@ -669,7 +683,7 @@ final ConnectionSubscriber delegate() { @Override final String sql() { - return batch.toString(); + return sql0(() -> batch.toString()); } } @@ -1510,4 +1524,16 @@ final void request0() { static final boolean isR2dbc(java.sql.Statement statement) { return statement instanceof R2DBCPreparedStatement; } + + /** + * [#13343] Prevent debug rendering errors from influencing control flow. + */ + static final String sql0(Supplier supplier) { + try { + return supplier.get(); + } + catch (Throwable t) { + return "Error while rendering SQL: " + t.getMessage(); + } + } }