diff --git a/README.md b/README.md index e838605..1a76513 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,9 @@ Oracle R2DBC implements the R2DBC Service Provider Interface (SPI) as specified ### Learn More About R2DBC: [R2DBC Project Home Page](https://r2dbc.io) -[R2DBC Javadocs v0.9.0.M1](https://r2dbc.io/spec/0.9.0.M1/api/) +[R2DBC Javadocs v0.9.0.M2](https://r2dbc.io/spec/0.9.0.M2/api/) -[R2DBC Specification v0.9.0.M1](https://r2dbc.io/spec/0.9.0.M1/spec/html/) +[R2DBC Specification v0.9.0.M2](https://r2dbc.io/spec/0.9.0.M2/spec/html/) ### Learn More About Reactive Streams: [Reactive Streams Project Home Page](http://www.reactive-streams.org) @@ -19,14 +19,26 @@ Oracle R2DBC implements the R2DBC Service Provider Interface (SPI) as specified [Reactive Streams Specification v1.0.3](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md) # About This Version -Oracle R2DBC 0.2.0 updates the implemented SPI version to 0.9.0.M1. With the - 0.9.0.M1 SPI update, Oracle R2DBC 0.2.0 introduces support for procedural - calls (PL/SQL), the ```Statement.bind(...)``` methods are enhanced to accept - ```io.r2dbc.spi.Parameter``` objects, and the - ```Connection.beginTransaction(TransactionDefintion)``` method is - implemented to support named and read-only/read-write transactions. - -# Performance Goals +The 0.3.0 release Oracle R2DBC implements version 0.9.0.M2 of the R2DBC SPI. +The 0.9.0.M2 SPI update introduces support for consuming a `Result` as a + stream of `Segment` objects, configuring statement execution timeouts, and + managing pooled `Connection`s with the `LifeCycle` interface. + +The 0.3.0 release updates the Oracle JDBC dependency to 21.3. The 21.3 release + introduces several improvements for the Reactive Extensions: + - Substantial reduction in object allocation costs. + - Row mapping functions will no longer contend with other threads for + access to the JDBC connection. + - Blocking database calls are no longer required for PreparedStatements + returning values generated by DML. + +### Spring Integration +Use the 0.1.0 version of Oracle R2DBC if you are programming with Spring. +The later versions of Oracle R2DBC implement the 0.9.x versions of the R2DBC + SPI. Currently, Spring only supports drivers that implement the 0.8.x versions + of the SPI. + +### Performance Goals The primary goal of these early releases of Oracle R2DBC is to support the R2DBC SPI on Oracle Database. The only performance goal is to enable concurrent database calls to be executed by a single thread. @@ -48,18 +60,20 @@ Artifacts can also be found on Maven Central. com.oracle.database.r2dbc oracle-r2dbc - ${version} + 0.3.0 ``` Oracle R2DBC is compatible with JDK 11 (or newer), and has the following runtime dependencies: -- R2DBC SPI 0.9.0.M1 +- R2DBC SPI 0.9.0.M2 - Reactive Streams 1.0.3 -- Project Reactor 3.0.0 -- Oracle JDBC 21.1.0.0 for JDK 11 (ojdbc11.jar) - - Oracle R2DBC relies on the Oracle JDBC Driver's [Reactive Extensions](https://docs.oracle.com/en/database/oracle/oracle-database/21/jjdbc/jdbc-reactive-extensions.html#GUID-1C40C43B-3823-4848-8B5A-D2F97A82F79B) APIs. These APIs were introduced in the 21.1 release of Oracle JDBC, and are only available with the JDK 11 build (ojdbc11). +- Project Reactor 3.3.0.RELEASE +- Oracle JDBC 21.3.0.0 for JDK 11 (ojdbc11.jar) + - Oracle R2DBC relies on the Oracle JDBC Driver's [Reactive Extensions + ](https://docs.oracle.com/en/database/oracle/oracle-database/21/jjdbc/jdbc-reactive-extensions.html#GUID-1C40C43B-3823-4848-8B5A-D2F97A82F79B) APIs. -The Oracle R2DBC Driver has been verified with Oracle Database versions 19c and 21c. +The Oracle R2DBC Driver has been verified with Oracle Database versions 18, 19, + and 21. # Code Examples @@ -74,7 +88,7 @@ Mono.from(connectionFactory.create()) "SELECT 'Hello, Oracle' FROM sys.dual") .execute()) .flatMap(result -> - result.map((row, metadata) -> row.get(0, String.class))) + result.map(row -> row.get(0, String.class))) .doOnNext(System.out::println) .thenMany(connection.close())) .subscribe(); @@ -90,7 +104,7 @@ Mono.from(connectionFactory.create()) .bind("locale_name", "France") .execute()) .flatMap(result -> - result.map((row, metadata) -> + result.map(row -> String.format("%s, Oracle", row.get("greeting", String.class)))) .doOnNext(System.out::println) .thenMany(connection.close())) @@ -128,8 +142,8 @@ This document specifies the behavior of the R2DBC SPI implemented for the Oracle Database. This SPI implementation is referred to as the "Oracle R2DBC Driver" or "Oracle R2DBC" throughout the remainder of this document. -The Oracle R2DBC Driver implements behavior specified by the R2DBC 0.9.0.M1 -[Specification](https://r2dbc.io/spec/0.9.0.M1/spec/html/) +The Oracle R2DBC Driver implements behavior specified by the R2DBC 0.9.0.M2 +[Specification](https://r2dbc.io/spec/0.9.0.M2/spec/html/) and [Javadoc](https://r2dbc.io/spec/0.8.3.RELEASE/api/) Publisher objects created by Oracle R2DBC implement behavior specified by @@ -146,8 +160,8 @@ implements a ConnectionFactoryProvider located by an R2DBC URL identifing "oracle" as a driver, or by a DRIVER ConnectionFactoryOption with the value of "oracle". - The following well-known ConnectionFactory Options are supported: -DRIVER, USER, PASSWORD, HOST, PORT, DATABASE, SSL, and -CONNECT_TIMEOUT. +DRIVER, USER, PASSWORD, HOST, PORT, DATABASE, SSL, +CONNECT_TIMEOUT, STATEMENT_TIMEOUT. - The DATABASE ConnectionFactoryOption is interpreted as the [service name](https://docs.oracle.com/en/database/oracle/oracle-database/21/netag/identifying-and-accessing-database.html#GUID-153861C1-16AD-41EC-A179-074146B722E6) of an Oracle Database instance. System Identifiers (SID) are not recognized. @@ -175,7 +189,7 @@ Options. For Options having any of the following names, a CharSequence value may - [oracle.jdbc.implicitStatementCacheSize](https://docs.oracle.com/en/database/oracle/oracle-database/21/jajdb/oracle/jdbc/OracleConnection.html?is-external=true#CONNECTION_PROPERTY_IMPLICIT_STATEMENT_CACHE_SIZE) - [oracle.jdbc.defaultLobPrefetchSize](https://docs.oracle.com/en/database/oracle/oracle-database/21/jajdb/oracle/jdbc/OracleConnection.html?is-external=true#CONNECTION_PROPERTY_DEFAULT_LOB_PREFETCH_SIZE) - [oracle.net.disableOob](https://docs.oracle.com/en/database/oracle/oracle-database/21/jajdb/oracle/jdbc/OracleConnection.html?is-external=true#CONNECTION_PROPERTY_THIN_NET_DISABLE_OUT_OF_BAND_BREAK) - - Out of band (oob) breaks effect statement timeouts. Set this to "true" + - Out of band (OOB) breaks effect statement timeouts. Set this to "true" if statement timeouts are not working correctly. - Oracle Net Descriptors of the form ```(DESCRIPTION=...)``` may be specified as an io.r2dbc.spi.Option having the name `oracleNetDescriptor`. - If `oracleNetDescriptor` is specified, then it is invalid to specify any other options that might conflict with information in the descriptor, such as: `HOST`, `PORT`, `DATABASE`, and `SSL`. @@ -210,8 +224,9 @@ or Oracle JDBC Driver error message](https://docs.oracle.com/en/database/oracle/ - READ COMMITTED is the default transaction isolation level, and is the only level supported in this release. - Transaction savepoints are not supported in this release. -- TransactionDefinition.LOCK_WAIT_TIMEOUT is not supported in this release. - - Oracle Database does not support a lock wait timeout that applies to all statements within a transaction. +- Oracle Database does not support a lock wait timeout that is configurable + within the scope of a transaction or session. SPI methods that configure a + lock wait timeout throw ```UnsupportedOperationException``` ### Statements - Batch execution is only supported for DML type SQL commands (INSERT/UPDATE/DELETE). @@ -229,12 +244,6 @@ of each row affected by an INSERT or UPDATE. - The ROWID of a row may change. - After a row is deleted, its ROWID may be reassigned to a new row. - Further Reading: https://asktom.oracle.com/pls/apex/asktom.search?tag=is-it-safe-to-use-rowid-to-locate-a-row -- A **blocking database call** is executed by a Statement returning generated -values for a non-empty set of column names. - - The **blocking database call** is a known limitation that will be resolved - with a non-blocking implementation of - java.sql.Connection.prepareStatement(String, String[]) in the Oracle JDBC Driver. - The Oracle JDBC Team is aware of this problem and is working on a fix. - Returning generated values is only supported for INSERT and UPDATE commands when a RETURNING INTO clause can be appended to the end of that command. (This limitation may be resolved in a later release) - Example: `INSERT INTO my_table(val) VALUES (:val)` is supported because a RETURNING INTO clause may be appended to this command. - Example: `INSERT INTO my_table(val) SELECT 1 FROM sys.dual` is not supported because a RETURNING INTO clause may not be appended to this command. @@ -260,9 +269,9 @@ values for a non-empty set of column names. ```io.r2dbc.spi.Parameter.Out``` and ```io.r2dbc.spi.Parameter.In``` marker interfaces. - Consume out parameters by invoking - ```Result.map(BiFunction)```: + ```Result.map(Function)```: ```java - result.map((row,metadata) -> row.get("greeting_out", String.class)) + result.map(outParameters -> outParameters.get("greeting_out", String.class)) ``` - ```Statement.execute()``` returns a ```Publisher``` that emits one ```Result``` for each cursor returned by ```DBMS_SQL.RETURN_RESULT``` diff --git a/pom.xml b/pom.xml index c2b11cc..d81d899 100755 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ com.oracle.database.r2dbc oracle-r2dbc - 0.2.0 + 0.3.0 oracle-r2dbc Oracle R2DBC Driver implementing version 0.9.0.M2 of the R2DBC SPI for Oracle Database. @@ -65,7 +65,7 @@ 11 - 21.1.0.0 + 21.3.0.0 0.9.0.M2 3.3.0.RELEASE 1.0.3 diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index b5954da..796c99e 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -30,7 +30,7 @@ with oracle.r2dbc.impl.OracleConnectionFactoryProviderImpl; requires java.sql; - requires ojdbc11; + requires com.oracle.database.jdbc; requires reactor.core; requires transitive org.reactivestreams; requires transitive r2dbc.spi; diff --git a/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java b/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java index c3cd269..8450932 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java @@ -175,12 +175,23 @@ final class OracleConnectionFactoryImpl implements ConnectionFactory { OracleR2dbcExceptions.requireNonNull(options, "options is null."); adapter = ReactiveJdbcAdapter.getOracleAdapter(); dataSource = adapter.createDataSource(options); + + // Handle any Options that Oracle JDBC doesn't + if (options.hasOption(ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT)) { + throw new UnsupportedOperationException( + "Unsupported Option: " + + ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT.name() + + ". Oracle Database does not support a lock wait timeout session " + + "parameter."); + } + statementTimeout = Optional.ofNullable( options.getValue(ConnectionFactoryOptions.STATEMENT_TIMEOUT)) .map(timeout -> (timeout instanceof Duration) ? (Duration)timeout : Duration.parse(timeout.toString())) .orElse(Duration.ZERO); + } /** diff --git a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java index 1338d35..5f58c3a 100644 --- a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java @@ -356,6 +356,16 @@ static OracleResultImpl createUpdateCountResult(long updateCount) { return new UpdateCountResult(updateCount); } + /** + * Creates a {@code Result} that publishes a batch of {@code updateCounts} + * as {@link UpdateCount} segments + * @return A {@code Result} for a batch DML update + * @param updateCounts Update counts to publish + */ + static OracleResultImpl createBatchUpdateResult(long[] updateCounts) { + return new BatchUpdateResult(updateCounts); + } + /** * Creates a {@code Result} that publishes update counts of a * {@code batchUpdateException} as {@link UpdateCount} segments, followed a @@ -444,18 +454,17 @@ Publisher publishSegments(Function mappingFunction) { private static final class ResultSetResult extends OracleResultImpl { private final ResultSet resultSet; + private final RowMetadataImpl metadata; private final ReactiveJdbcAdapter adapter; private ResultSetResult(ResultSet resultSet, ReactiveJdbcAdapter adapter) { this.resultSet = resultSet; + this.metadata = createRowMetadata(fromJdbc(resultSet::getMetaData)); this.adapter = adapter; } @Override Publisher publishSegments(Function mappingFunction) { - RowMetadataImpl metadata = - createRowMetadata(fromJdbc(resultSet::getMetaData)); - return adapter.publishRows(resultSet, jdbcReadable -> mappingFunction.apply( new RowSegmentImpl(createRow(jdbcReadable, metadata, adapter)))); @@ -595,7 +604,9 @@ Publisher publishSegments(Function mappingFunction) { // Invoke publishSegments(Class, Function) rather than // publishSegments(Function) to update the state of the result; Namely, // the state that has the onConsumed Publisher emit a terminal signal. - .concatWith(result.publishSegments(Segment.class, mappingFunction)); + .concatWith(result != null + ? result.publishSegments(Segment.class,mappingFunction) + : Mono.empty()); } } diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java index ba036d2..74e2cd4 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java @@ -28,7 +28,6 @@ import io.r2dbc.spi.Result; import io.r2dbc.spi.Statement; import io.r2dbc.spi.Type; -import oracle.r2dbc.impl.OracleR2dbcExceptions.ThrowingRunnable; import oracle.r2dbc.impl.OracleR2dbcExceptions.ThrowingSupplier; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -42,13 +41,14 @@ import java.sql.SQLType; import java.sql.SQLWarning; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.IntStream; @@ -59,10 +59,10 @@ import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull; import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireOpenConnection; import static oracle.r2dbc.impl.OracleR2dbcExceptions.runJdbc; -import static oracle.r2dbc.impl.SqlTypeMap.toJdbcType; import static oracle.r2dbc.impl.OracleResultImpl.createCallResult; import static oracle.r2dbc.impl.OracleResultImpl.createGeneratedValuesResult; import static oracle.r2dbc.impl.OracleResultImpl.createUpdateCountResult; +import static oracle.r2dbc.impl.SqlTypeMap.toJdbcType; /** *

@@ -717,7 +717,7 @@ private Publisher executeSql() { (preparedStatement, discardQueue) -> setBindValues(preparedStatement, currentBindValues, discardQueue), preparedStatement -> - publishSqlResult(preparedStatement, currentFetchSize)); + publishSqlResult(preparedStatement, currentFetchSize, true)); } /** @@ -793,48 +793,105 @@ private void requireAllParametersSet() { * returned {@code Publisher} emits 0 {@code Result}s if the {@code * preparedStatement} returns no update count, row data, or implicit results. *

+ * @param isCursorClosable {@code true} if the cursor can be closed if no + * result is a {@code ResultSet} * @return A {@code Publisher} that emits the {@code Result}s of executing a * {@code preparedStatement}. */ private Publisher publishSqlResult( - PreparedStatement preparedStatement, int fetchSize) { + PreparedStatement preparedStatement, int fetchSize, + boolean isCursorClosable) { return Mono.from(publishSqlExecution(preparedStatement, fetchSize)) .flatMapMany(isResultSet -> { + // Retain Publishers that complete when an implicit ResultSet is + // consumed. + List> implicitResultConsumptions = new ArrayList<>(0); + + // Collect all Results into a List before any are emitted to user + // code. Ideally, no JDBC API calls should occur after the first + // Result is emitted; Once a Result has been emitted, user code may + // initiate a new Statement execution, and the JDBC connection + // becomes locked. If a JDBC call occurs while the connection is + // locked, it will block the calling thread. This can potentially + // cause a deadlock where all threads are blocked until the JDBC + // connection is unlocked, and the JDBC connection can not become + // unlocked until a thread is available. + List results = new ArrayList<>(1); OracleResultImpl firstResult = getSqlResult(adapter, preparedStatement, isResultSet); - if (firstResult != null) { - return Mono.just(firstResult) - .concatWith(Mono.from(firstResult.onConsumed()) - .thenMany(publishMoreResults(adapter, preparedStatement))); + if (firstResult != null) + results.add(firstResult); + + do { + // Move the statement to the next result, if any + boolean isNextResultSet = fromJdbc(() -> + preparedStatement.getMoreResults( + PreparedStatement.KEEP_CURRENT_RESULT)); + + // Get the next result, if any + OracleResultImpl nextResult = + getSqlResult(adapter, preparedStatement, isNextResultSet); + + // Break out of this loop if there is no next result + if (nextResult == null) + break; + + // If the result is an implicit ResultSet, then retain it's + // consumption publisher + if (isNextResultSet) + implicitResultConsumptions.add(nextResult.onConsumed()); + + // Add the next result to the list of all results + results.add(nextResult); + } while (true); + + Publisher resultPublisher = + Flux.fromIterable(results); + + if (!isCursorClosable) { + // Don't attempt to close the cursor if the caller provided + // isCursorClosable as false + return resultPublisher; + } + else if (implicitResultConsumptions.isEmpty()) { + // If no result is a ResultSet, then the cursor can be closed now. + // Otherwise, PreparedStatement.closeOnCompletion() will close the + // cursor after the ResultSet emits the last row + if (!isResultSet) + runJdbc(preparedStatement::close); + else + runJdbc(preparedStatement::closeOnCompletion); + + return resultPublisher; } else { - return publishMoreResults(adapter, preparedStatement); + // If at least one Result is an implicit ResultSet, then + // PreparedStatement.closeOnCompletion() + return Flux.from(resultPublisher) + .concatWith(Flux.merge(implicitResultConsumptions) + .doFinally(signalType -> runJdbc(preparedStatement::close)) + .cast(OracleResultImpl.class)); } }); } + /** + * Publish the result of executing a {@code preparedStatement}. This method + * will configure the execution to use the specified {@code fetchSize} and + * {@link #timeout} specified to the constructor. + * @param preparedStatement Statement to execute + * @param fetchSize Fetch size to configure + * @return A {@code Publisher} that emits {@code true} if the + * first result is a ResultSet, otherwise {@code false}. + */ private Publisher publishSqlExecution( PreparedStatement preparedStatement, int fetchSize) { runJdbc(() -> preparedStatement.setFetchSize(fetchSize)); setQueryTimeout(preparedStatement); - return Mono.from(adapter.publishSQLExecution(preparedStatement)) - // Work around a bug in 21.1 Oracle JDBC that has the - // OraclePreparedStatement.executeAsyncOracle Publisher emit onError - // with a SQLException when the database returns a warning. In 21.3 it's - // fixed so that the Publisher emits onComplete and the SQLWarning is - // obtained from the usual call to PreparedStatement.getWarnings() - // TODO: Remove this when 21.1 Oracle JDBC is no longer supported - .onErrorResume(error -> - // ORA-17110 is the error code for warnings. If the R2dbcException has - // this code, then ignore it and return the normal boolean value - // indicating if the result is a ResultSet or not - error instanceof R2dbcException - && ((R2dbcException) error).getErrorCode() == 17110 - ? Mono.just(null != fromJdbc(preparedStatement::getResultSet)) - : Mono.error(error)); + return Mono.from(adapter.publishSQLExecution(preparedStatement)); } private void setQueryTimeout(PreparedStatement preparedStatement) { @@ -846,44 +903,6 @@ private void setQueryTimeout(PreparedStatement preparedStatement) { timeout.toSeconds() + (timeout.getNano() == 0 ? 0 : 1)))); } - /** - *

- * Publishes implicit {@code Result}s of update counts or row data - * indicated by {@link PreparedStatement#getMoreResults()}. - *

- * The returned {@code Publisher} terminates with {@code onComplete} after - * {@code getMoreResults} and {@link PreparedStatement#getUpdateCount()} - * indicate that all results have been published. The returned - * {@code Publisher} may emit 0 {@code Results}. - *

- * The returned {@code Publisher} does not emit the next {@code Result} - * until a previous {@code Result} has been fully consumed. The - * {@link java.sql.ResultSet} of a previous {@code Result} is closed when - * it has been fully consumed. - *

- * @param adapter Adapts JDBC calls into reactive streams. - * @param preparedStatement JDBC statement - * @return {@code Publisher} of implicit results. - */ - static Publisher publishMoreResults( - ReactiveJdbcAdapter adapter, PreparedStatement preparedStatement) { - - return Flux.defer(() -> { - OracleResultImpl next = - getSqlResult(adapter, preparedStatement, - fromJdbc(preparedStatement::getMoreResults)); - - if (next == null) { - return Mono.empty(); - } - else { - return Mono.just(next) - .concatWith(Mono.from(next.onConsumed()) - .thenMany(publishMoreResults(adapter,preparedStatement))); - } - }); - } - /** * Returns the current {@code Result} of a {@code preparedStatement}, which * is row data if {@code isResultSet} is {@code true}, or an update count if @@ -899,7 +918,8 @@ static Publisher publishMoreResults( private static OracleResultImpl getSqlResult( ReactiveJdbcAdapter adapter, PreparedStatement preparedStatement, boolean isResultSet) { - return fromJdbc(() -> { + + return getWarnings(preparedStatement, fromJdbc(() -> { if (isResultSet) { return OracleResultImpl.createQueryResult( preparedStatement.getResultSet(), adapter); @@ -913,7 +933,7 @@ private static OracleResultImpl getSqlResult( return null; } } - }); + })); } /** @@ -926,9 +946,19 @@ private static OracleResultImpl getSqlResult( private Publisher publishCallResult( CallableStatement callableStatement, Object[] bindValues, int fetchSize) { - return Flux.from(publishSqlResult(callableStatement, fetchSize)) - .concatWith(Mono.just(createCallResult( - createOutParameterRow(callableStatement, bindValues)))); + + // Create a Result of OutParameters that are read from the + // CallableStatement. + OracleResultImpl callResult = + createCallResult(createOutParameterRow(callableStatement, bindValues)); + + return Flux.concat( + publishSqlResult(callableStatement, fetchSize, false), + Mono.just(callResult) + // Close the CallableStatement after the Result is consumed. + .concatWith(Mono.from(callResult.onConsumed()) + .doOnTerminate(() -> runJdbc(callableStatement::close)) + .cast(OracleResultImpl.class))); } /** @@ -1049,6 +1079,7 @@ private Publisher executeGeneratingValues() { " has executed a query that returns row data"); } else { + runJdbc(preparedStatement::closeOnCompletion); return createGeneratedValuesResult( fromJdbc(preparedStatement::getUpdateCount), fromJdbc(preparedStatement::getGeneratedKeys), @@ -1097,8 +1128,11 @@ Publisher executeBatch() { addImplicit(); Queue currentBatch = batch; + int batchSize = batch.size(); batch = new LinkedList<>(); + // Index incremented with each update count + AtomicInteger index = new AtomicInteger(0); return execute( () -> jdbcConnection.prepareStatement(sql), (preparedStatement, discardQueue) -> @@ -1106,12 +1140,37 @@ Publisher executeBatch() { preparedStatement -> { setQueryTimeout(preparedStatement); return Flux.from(adapter.publishBatchUpdate(preparedStatement)) - .map(OracleResultImpl::createUpdateCountResult) - .onErrorResume(error -> - error.getCause() instanceof BatchUpdateException - ? Mono.just(OracleResultImpl.createBatchUpdateErrorResult( - (BatchUpdateException) error.getCause())) - : Mono.error(error)); + // All update counts are collected into a single long[] + .collect( + () -> new long[batchSize], + (updateCounts, updateCount) -> + updateCounts[index.getAndIncrement()] = updateCount) + .map(updateCounts -> { + // Map the long[] to a batch update count Result + OracleResultImpl result = getWarnings( + preparedStatement, + OracleResultImpl.createBatchUpdateResult(updateCounts)); + + // Close the cursor before emitting the Result + runJdbc(preparedStatement::close); + return result; + }) + .onErrorResume(error -> { + final Mono resultPublisher; + + if (error.getCause() instanceof BatchUpdateException) { + resultPublisher = Mono.just( + OracleResultImpl.createBatchUpdateErrorResult( + (BatchUpdateException) error.getCause())); + } + else { + resultPublisher = Mono.error(error); + } + + // Close the cursor before emitting the Result + runJdbc(preparedStatement::close); + return resultPublisher; + }); }); } @@ -1431,32 +1490,27 @@ private static void registerOutParameters( BiFunction>, Publisher> bindFunction, Function> resultFunction) { - // Reference to the last emitted result - AtomicReference lastResultRef = - new AtomicReference<>(null); - - return Flux.usingWhen(Mono.fromSupplier(statementSupplier), - - preparedStatement -> - Flux.usingWhen( - Mono.just(new LinkedList>()), - discardQueue -> - Flux.from(bindFunction.apply(preparedStatement, discardQueue)) - .thenMany(resultFunction.apply(preparedStatement)) - .onErrorResume(R2dbcException.class, r2dbcException -> - Mono.just(OracleResultImpl.createErrorResult(r2dbcException))) - .defaultIfEmpty(createUpdateCountResult(-1L)) - .map(result -> getWarnings(preparedStatement, result)) - .doOnNext(lastResultRef::set), - discardQueue -> - Flux.fromIterable(discardQueue) - .concatMapDelayError(Function.identity())), - - preparedStatement -> - Mono.justOrEmpty(lastResultRef.get()) - .flatMap(result -> Mono.from(result.onConsumed())) - .doOnTerminate((ThrowingRunnable)preparedStatement::close) - .doOnCancel((ThrowingRunnable)preparedStatement::close)); + T preparedStatement = statementSupplier.get(); + AtomicBoolean isResultEmitted = new AtomicBoolean(false); + return Flux.usingWhen( + Mono.just(new LinkedList>()), + discardQueue -> + Flux.from(bindFunction.apply(preparedStatement, discardQueue)) + .thenMany(resultFunction.apply(preparedStatement)), + discardQueue -> + Flux.concatDelayError(Flux.fromIterable(discardQueue))) + .doOnNext(result -> isResultEmitted.set(true)) + .onErrorResume(R2dbcException.class, r2dbcException -> + Mono.just(OracleResultImpl.createErrorResult(r2dbcException))) + .defaultIfEmpty(createUpdateCountResult(-1L)) + .doFinally(signalType -> { + // Close the cursor if the publisher is cancelled or emits an error + // before a Result is emitted. Otherwise, the resultFunction should + // arrange for the cursor to be closed as it may need to remain open + // until the Result is consumed + if (! isResultEmitted.get()) + runJdbc(preparedStatement::close); + }); } /** @@ -1473,6 +1527,7 @@ private static void registerOutParameters( private static OracleResultImpl getWarnings( PreparedStatement preparedStatement, OracleResultImpl result) { SQLWarning warning = fromJdbc(preparedStatement::getWarnings); + runJdbc(preparedStatement::clearWarnings); return warning == null ? result : OracleResultImpl.createWarningResult(warning, result); diff --git a/src/test/java/oracle/r2dbc/impl/OracleReableImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleReadableImplTest.java similarity index 96% rename from src/test/java/oracle/r2dbc/impl/OracleReableImplTest.java rename to src/test/java/oracle/r2dbc/impl/OracleReadableImplTest.java index b0d8300..49f9b44 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleReableImplTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleReadableImplTest.java @@ -41,7 +41,7 @@ * {@link OracleReadableImpl} implements behavior that is specified in it's class * and method level javadocs. */ -public class OracleReableImplTest { +public class OracleReadableImplTest { /** * Verifies the implementation of @@ -66,7 +66,7 @@ public void testGetByIndex() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetByIndex") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(-1)))); // Expect IllegalArgumentException for an index greater than or equal @@ -75,7 +75,7 @@ public void testGetByIndex() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetByIndex") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(2)))); // Expect valid indexes to return the INSERTed values @@ -134,7 +134,7 @@ public void testGetByName() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(null)))); // Expect IllegalArgumentException for unmatched names @@ -142,37 +142,37 @@ public void testGetByName() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("z")))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("xx")))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("x ")))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(" x")))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(" ")))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("")))); // Expect valid names to return the INSERTed values @@ -290,7 +290,7 @@ public void testGetByIndexAndType() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetByIndexAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(0, null)))); // Expect IllegalArgumentException for an unsupported type @@ -299,7 +299,7 @@ class Unsupported {} Flux.from(connection.createStatement( "SELECT x, y FROM testGetByIndexAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(0, Unsupported.class)))); @@ -308,7 +308,7 @@ class Unsupported {} Flux.from(connection.createStatement( "SELECT x, y FROM testGetByIndexAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(-1, Integer.class)))); // Expect IllegalArgumentException for an index greater than or equal @@ -317,7 +317,7 @@ class Unsupported {} Flux.from(connection.createStatement( "SELECT x, y FROM testGetByIndexAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(2, Integer.class)))); // Expect valid indexes to return the INSERTed values @@ -379,7 +379,7 @@ public void testGetByNameAndType() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("x", null)))); // Expect IllegalArgumentException for an unsupported type @@ -388,7 +388,7 @@ class Unsupported {} Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("x", Unsupported.class)))); @@ -397,7 +397,7 @@ class Unsupported {} Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(null, Integer.class)))); // Expect IllegalArgumentException for unmatched names @@ -405,37 +405,37 @@ class Unsupported {} Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("z", Integer.class)))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("xx", Integer.class)))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("x ", Integer.class)))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(" x", Integer.class)))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get(" ", Integer.class)))); awaitError(IllegalArgumentException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetByNameAndType") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> row.get("", Integer.class)))); // Expect valid names to return the INSERTed values diff --git a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java index 0a83636..13428b9 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java @@ -22,7 +22,6 @@ package oracle.r2dbc.impl; import io.r2dbc.spi.Connection; -import io.r2dbc.spi.R2dbcException; import io.r2dbc.spi.Result; import io.r2dbc.spi.Result.Message; import io.r2dbc.spi.Result.RowSegment; @@ -37,7 +36,6 @@ import java.util.Iterator; import java.util.List; -import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -56,7 +54,10 @@ import static oracle.r2dbc.util.Awaits.consumeOne; import static oracle.r2dbc.util.Awaits.tryAwaitExecution; import static oracle.r2dbc.util.Awaits.tryAwaitNone; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Verifies that @@ -264,22 +265,23 @@ public void testMap() { }); // Expect no rows from SELECT of zero rows - consumeOne(connection.createStatement( + awaitNone(Flux.from(connection.createStatement( "SELECT x, y FROM testMap WHERE x = 99") - .execute(), - noRowsResult -> { + .execute()) + .flatMap(noRowsResult -> { Publisher noRowsPublisher = noRowsResult.map((row, metadata) -> row.get(0)); - awaitNone(noRowsPublisher); // Expect IllegalStateException from multiple Result consumptions. assertThrows(IllegalStateException.class, () -> noRowsResult.map((row, metadata) -> "unexpected")); assertThrows(IllegalStateException.class, noRowsResult::getRowsUpdated); - // Expect row data publisher to reject multiple subscribers - awaitError(IllegalStateException.class, noRowsPublisher); - }); + return Flux.from(noRowsPublisher) + .doOnTerminate(() -> + // Expect row data publisher to reject multiple subscribers + awaitError(IllegalStateException.class, noRowsPublisher)); + })); // Expect 2 rows from SELECT of 2 rows awaitMany(asList(asList(0, 1), asList(0, 2)), @@ -303,7 +305,7 @@ public void testMap() { assertThrows(IllegalStateException.class, selectResult::getRowsUpdated); return Flux.from(selectRowPublisher) - .doFinally(signalType -> + .doOnTerminate(() -> // Expect row data publisher to reject multiple subscribers awaitError(IllegalStateException.class, selectRowPublisher)); })); diff --git a/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java index a58c329..33c2beb 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java @@ -84,7 +84,7 @@ public void testGetColumnMetadataByIndex() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByIndex") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata(-1).getPrecision()))); @@ -94,7 +94,7 @@ public void testGetColumnMetadataByIndex() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByIndex") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata(2).getPrecision()))); @@ -165,7 +165,7 @@ public void testGetColumnMetadataByName() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata(null)) )); @@ -174,42 +174,42 @@ public void testGetColumnMetadataByName() { Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata("z")) )); awaitError(NoSuchElementException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata("xx")) )); awaitError(NoSuchElementException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata("x ")) )); awaitError(NoSuchElementException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata(" x")) )); awaitError(NoSuchElementException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata(" ")) )); awaitError(NoSuchElementException.class, Flux.from(connection.createStatement( "SELECT x, y FROM testGetColumnMetadataByName") .execute()) - .flatMap(result -> + .concatMap(result -> result.map((row, metadata) -> metadata.getColumnMetadata("")) )); diff --git a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java index 74139dd..d945f75 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java @@ -948,7 +948,10 @@ public void testReturnGeneratedValues() { // Expect a failure with invalid column name "eye-d" assertEquals(statement, statement.returnGeneratedValues("x", "eye-d")); - awaitError(R2dbcException.class, statement.bind(0, "test").execute()); + awaitError(R2dbcException.class, + Flux.from(statement.bind(0, "test").execute()) + .flatMap(result -> + result.map(generatedValues -> fail("Unexpected row")))); // Expect a ROWID value when no column names are specified Statement rowIdQuery = connection.createStatement( @@ -1755,8 +1758,8 @@ public void testNoOutImplicitResult() { IntStream.rangeClosed(0, 100) .forEach(i -> insert.bind(0, i).add()); awaitOne(101, Flux.from(insert.execute()) - .reduce(0, (updateCount, result) -> - updateCount + awaitOne(result.getRowsUpdated()))); + .flatMap(Result::getRowsUpdated) + .reduce(0, (total, updateCount) -> total + updateCount)); // Create a procedure that returns a cursor awaitExecution(connection.createStatement( @@ -1865,8 +1868,8 @@ public void testOutAndImplicitResult() { IntStream.rangeClosed(0, 100) .forEach(i -> insert.bind(0, i).add()); awaitOne(101, Flux.from(insert.execute()) - .reduce(0, (updateCount, result) -> - updateCount + awaitOne(result.getRowsUpdated()))); + .flatMap(Result::getRowsUpdated) + .reduce(0, (total, updateCount) -> total + updateCount)); // Create a procedure that returns a cursor awaitExecution(connection.createStatement( diff --git a/src/test/java/oracle/r2dbc/test/OracleTestKit.java b/src/test/java/oracle/r2dbc/test/OracleTestKit.java index 7cf912a..028fda5 100755 --- a/src/test/java/oracle/r2dbc/test/OracleTestKit.java +++ b/src/test/java/oracle/r2dbc/test/OracleTestKit.java @@ -24,7 +24,13 @@ package oracle.r2dbc.test; -import io.r2dbc.spi.*; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactories; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.ConnectionFactoryOptions; +import io.r2dbc.spi.Result; +import io.r2dbc.spi.Row; +import io.r2dbc.spi.Statement; import io.r2dbc.spi.test.TestKit; import oracle.jdbc.datasource.OracleDataSource; import org.junit.jupiter.api.Disabled; @@ -38,8 +44,8 @@ import java.math.BigDecimal; import java.sql.SQLException; import java.util.Arrays; -import java.util.Collection; import java.util.function.Function; +import java.util.stream.IntStream; import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE; import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER; @@ -195,6 +201,20 @@ private Object extractColumn(String name, Row row) { return value; } + /** + * {@inheritDoc} + *

+ * Override the default implementation to extract multiple update counts + * from a single {@code result} and return a {@code Mono} that emits the + * sum of all update counts. + *

+ */ + @Override + public Mono extractRowsUpdated(Result result) { + return Flux.from(result.getRowsUpdated()) + .reduce(0, (total, updateCount) -> total + updateCount); + } + @Override public String getPlaceholder(int index) { return String.format(":%d", index + 1); @@ -242,6 +262,36 @@ public void duplicateColumnNames() { .verifyComplete(); } + /** + * {@inheritDoc} + *

+ * Overrides the default implementation to expect 10 {@link io.r2dbc.spi.Result.UpdateCount} + * segments from a single {@code Result}. The default implementation expects + * 10 {@code Result}s each with a single {@code UpdateCount}. Batch DML + * execution is a single call to Oracle Database, and so Oracle R2DBC + * returns a signle {@code Result} + *

+ */ + @Override + @Test + public void prepareStatement() { + Flux.usingWhen(getConnectionFactory().create(), + connection -> { + Statement statement = connection.createStatement(expand(TestStatement.INSERT_VALUE_PLACEHOLDER, getPlaceholder(0))); + + IntStream.range(0, 10) + .forEach(i -> TestKit.bind(statement, getIdentifier(0), i).add()); + + return Flux.from(statement + .execute()) + .flatMap(Result::getRowsUpdated); + }, + Connection::close) + .as(StepVerifier::create) + .expectNextCount(10).as("values from insertions") + .verifyComplete(); + } + @Disabled("Compound statements are not supported by Oracle Database") @Test @Override diff --git a/src/test/java/oracle/r2dbc/util/Awaits.java b/src/test/java/oracle/r2dbc/util/Awaits.java index d26958b..b985b07 100644 --- a/src/test/java/oracle/r2dbc/util/Awaits.java +++ b/src/test/java/oracle/r2dbc/util/Awaits.java @@ -22,7 +22,6 @@ package oracle.r2dbc.util; import io.r2dbc.spi.Result; -import io.r2dbc.spi.Row; import io.r2dbc.spi.Statement; import oracle.r2dbc.test.DatabaseConfig; import org.reactivestreams.Publisher;