Skip to content

Commit

Permalink
[#13669] More Connection::close calls fixed
Browse files Browse the repository at this point in the history
- In batches
- When there are rendering problems in jOOQ
- This includes: [#13343] R2DBC implementation may hang when
there's an exception in the rendering logic
  • Loading branch information
lukaseder committed Jun 29, 2022
1 parent 1ef9b89 commit bbe029c
Showing 1 changed file with 36 additions and 10 deletions.
46 changes: 36 additions & 10 deletions jOOQ/src/main/java/org/jooq/impl/R2DBC.java
Expand Up @@ -75,6 +75,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.jooq.BindingGetResultSetContext;
import org.jooq.Configuration;
Expand Down Expand Up @@ -114,6 +115,7 @@
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactoryOptions.Builder;
import io.r2dbc.spi.Result.RowSegment;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
Expand Down Expand Up @@ -247,18 +249,23 @@ public final void onComplete() {

private final void complete(boolean cancelled) {
resultSubscriber.downstream.forwarders.remove(forwarderIndex);
resultSubscriber.complete(cancelled);

// [#13343] [#13669] Prevent premature completion
if (cancelled || resultSubscriber.downstream.forwarders.isEmpty() && resultSubscriber.completionRequested.get())
resultSubscriber.complete(cancelled);
}
}

static abstract class AbstractResultSubscriber<T> implements Subscriber<Result> {

final AbstractNonBlockingSubscription<? super T> downstream;
final AtomicBoolean completed;
final AtomicBoolean completionRequested;

AbstractResultSubscriber(AbstractNonBlockingSubscription<? super T> downstream) {
this.downstream = downstream;
this.completed = new AtomicBoolean();
this.completionRequested = new AtomicBoolean();
}

@Override
Expand All @@ -269,22 +276,26 @@ public final void onSubscribe(Subscription s) {
@Override
public final void onError(Throwable t) {
downstream.subscriber.onError(translate(downstream.sql(), t));
complete(true);
}

@Override
public final void onComplete() {
completed.set(true);
complete(false);
}

final void complete(boolean cancelled) {
if (completed.get() && downstream.forwarders.isEmpty())
completionRequested.set(true);

// [#13343] [#13669] Delay completion of the downstream in case this
// completion happens before each forwarder's
// completion.
if (downstream.forwarders.isEmpty() && !completed.getAndSet(true))
downstream.complete(cancelled);
}
}

static final class RowCountSubscriber extends AbstractResultSubscriber<Integer> {

RowCountSubscriber(AbstractNonBlockingSubscription<? super Integer> downstream) {
super(downstream);
}
Expand Down Expand Up @@ -339,8 +350,6 @@ public final void onNext(Result r) {
return record;
});
}

// TODO: More specific error handling
catch (Throwable t) {
onError(t);
return null;
Expand Down Expand Up @@ -432,8 +441,9 @@ final void onNext0(Connection c) {
stmt.execute().subscribe(resultSubscriber.apply(query, downstream));
}

// TODO: More specific error handling
// [#13343] Cancel the downstream in case of a rendering bug in jOOQ
catch (Throwable t) {
downstream.cancel();
onError(t);
}
}
Expand Down Expand Up @@ -463,8 +473,9 @@ final void onNext0(Connection c) {
b.execute().subscribe(new RowCountSubscriber(downstream));
}

// TODO: More specific error handling
// [#13343] Cancel the downstream in case of a rendering bug in jOOQ
catch (Throwable t) {
downstream.cancel();
onError(t);
}
}
Expand Down Expand Up @@ -514,7 +525,10 @@ final void onNext0(Connection c) {

stmt.execute().subscribe(new RowCountSubscriber(downstream));
}

// [#13343] Cancel the downstream in case of a rendering bug in jOOQ
catch (Throwable t) {
downstream.cancel();
onError(t);
}
}
Expand Down Expand Up @@ -642,7 +656,7 @@ final QueryExecutionSubscriber<T, Q> delegate() {
@Override
final String sql() {
String result = queryExecutionSubscriber.sql;
return result != null ? result : "" + queryExecutionSubscriber.query;
return result != null ? result : sql0(() -> "" + queryExecutionSubscriber.query);
}
}

Expand All @@ -669,7 +683,7 @@ final ConnectionSubscriber<Integer> delegate() {

@Override
final String sql() {
return batch.toString();
return sql0(() -> batch.toString());
}
}

Expand Down Expand Up @@ -1510,4 +1524,16 @@ final void request0() {
static final boolean isR2dbc(java.sql.Statement statement) {
return statement instanceof R2DBCPreparedStatement;
}

/**
* [#13343] Prevent debug rendering errors from influencing control flow.
*/
static final String sql0(Supplier<String> supplier) {
try {
return supplier.get();
}
catch (Throwable t) {
return "Error while rendering SQL: " + t.getMessage();
}
}
}

0 comments on commit bbe029c

Please sign in to comment.