diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java index 965d31d..39ea95b 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java @@ -7,7 +7,6 @@ import java.util.concurrent.atomic.AtomicReference; import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbQueryResult; @@ -15,7 +14,6 @@ import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; import tech.ydb.jdbc.common.YdbTypes; -import tech.ydb.jdbc.exception.YdbRetryableException; import tech.ydb.jdbc.impl.YdbQueryResultReader; import tech.ydb.jdbc.impl.YdbQueryResultStatic; import tech.ydb.jdbc.impl.YdbResultSetMemory; @@ -89,24 +87,6 @@ public void ensureOpened() throws SQLException { } } - @Override - public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params) - throws SQLException { - boolean insideTx = isInsideTransaction(); - while (true) { - try { - return executeQueryImpl(statement, query, preparedYql, params); - } catch (YdbRetryableException ex) { - if (insideTx || ex.getStatus().getCode() != StatusCode.BAD_SESSION) { - throw ex; - } - } - } - } - - protected abstract YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, - Params params) throws SQLException; - @Override public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException { ensureOpened(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index b113f4c..1a52ae5 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -13,7 +13,6 @@ import tech.ydb.core.Status; import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbQueryResult; -import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; import tech.ydb.jdbc.impl.YdbQueryResultExplain; @@ -225,12 +224,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException } @Override - protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params) + public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params) throws SQLException { ensureOpened(); YdbValidator validator = statement.getValidator(); - int timeout = statement.getQueryTimeout(); ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder(); if (timeout > 0) { @@ -251,41 +249,6 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query String yql = prefixPragma + preparedYql; - if (useStreamResultSet) { - tracer.trace("--> stream query"); - tracer.query(yql); - String msg = "STREAM_QUERY >>\n" + yql; - - final YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) { - @Override - public void onClose(Status status, Throwable th) { - if (th != null) { - tracer.trace("<-- " + th.getMessage()); - } - if (status != null) { - validator.addStatusIssues(status); - tracer.trace("<-- " + status.toString()); - } - - if (localTx.isActive()) { - tracer.setId(localTx.getId()); - } else { - if (tx.compareAndSet(localTx, null)) { - localTx.getSession().close(); - } - tracer.close(); - } - - super.onClose(status, th); - } - }; - - settings = settings.withGrpcFlowControl(reader); - QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build()); - validator.execute(msg, tracer, () -> reader.load(validator, stream)); - return updateCurrentResult(reader); - } - try { tracer.trace("--> data query"); tracer.query(yql); @@ -296,11 +259,12 @@ public void onClose(Status status, Throwable th) { ); validator.addStatusIssues(result.getIssueList()); - YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()]; + YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()]; for (int idx = 0; idx < readers.length; idx++) { readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx)); } - return updateCurrentResult(new YdbQueryResultStatic(query, readers)); + + return readers; } finally { if (!localTx.isActive()) { if (tx.compareAndSet(localTx, null)) { @@ -316,6 +280,71 @@ public void onClose(Status status, Throwable th) { } } + @Override + public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params) + throws SQLException { + ensureOpened(); + + if (!useStreamResultSet) { + YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params); + return updateCurrentResult(new YdbQueryResultStatic(query, readers)); + } + + YdbValidator validator = statement.getValidator(); + int timeout = statement.getQueryTimeout(); + ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder(); + if (timeout > 0) { + settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS); + } + + QueryTransaction nextTx = tx.get(); + while (nextTx == null) { + nextTx = createNewQuerySession(validator).createNewTransaction(txMode); + if (!tx.compareAndSet(null, nextTx)) { + nextTx.getSession().close(); + nextTx = tx.get(); + } + } + + final QueryTransaction localTx = nextTx; + YdbTracer tracer = statement.getConnection().getCtx().getTracer(); + + String yql = prefixPragma + preparedYql; + + tracer.trace("--> stream query"); + tracer.query(yql); + String msg = "STREAM_QUERY >>\n" + yql; + + YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) { + @Override + public void onClose(Status status, Throwable th) { + if (th != null) { + tracer.trace("<-- " + th.getMessage()); + } + if (status != null) { + validator.addStatusIssues(status); + tracer.trace("<-- " + status.toString()); + } + + if (localTx.isActive()) { + tracer.setId(localTx.getId()); + } else { + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } + tracer.close(); + } + + super.onClose(status, th); + } + }; + + settings = settings.withGrpcFlowControl(reader); + QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build()); + validator.execute(msg, tracer, () -> reader.load(validator, stream)); + return updateCurrentResult(reader); + } + @Override public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException { ensureOpened(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java index 28fe73a..8d65857 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java @@ -7,7 +7,6 @@ import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbQueryResult; -import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; import tech.ydb.jdbc.impl.YdbQueryResultExplain; @@ -201,40 +200,53 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query } } - @Override - protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params) - throws SQLException { - ensureOpened(); - - YdbValidator validator = statement.getValidator(); + private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer tracer, String yql, + ExecuteDataQuerySettings settings, Params prms) throws SQLException { Session session = tx.getSession(validator); - String yql = prefixPragma + preparedYql; - YdbTracer tracer = statement.getConnection().getCtx().getTracer(); - tracer.trace("--> data query"); - tracer.query(yql); - try { + tracer.trace("--> data query"); + tracer.query(yql); + DataQueryResult result = validator.call( - QueryType.DATA_QUERY + " >>\n" + yql, tracer, - () -> session.executeDataQuery(yql, tx.txControl(), params, dataQuerySettings(statement)) + QueryType.DATA_QUERY + " >>\n" + yql, + tracer, + () -> session.executeDataQuery(yql, tx.txControl(), prms, settings) ); updateState(tx.withDataQuery(session, result.getTxId())); - YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()]; - for (int idx = 0; idx < result.getResultSetCount(); idx += 1) { - ResultSetReader rs = result.getResultSet(idx); - if (failOnTruncatedResult && rs.isTruncated()) { - String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount()); - throw new SQLException(msg); + if (failOnTruncatedResult) { + for (int idx = 0; idx < result.getResultSetCount(); idx += 1) { + ResultSetReader rs = result.getResultSet(idx); + if (rs.isTruncated()) { + String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount()); + throw new SQLException(msg); + } } - - readers[idx] = new YdbResultSetMemory(types, statement, rs); } - return updateCurrentResult(new YdbQueryResultStatic(query, readers)); + return result; } catch (SQLException | RuntimeException ex) { updateState(tx.withRollback(session)); throw ex; + } + } + + @Override + public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params) + throws SQLException { + ensureOpened(); + + YdbValidator validator = statement.getValidator(); + String yql = prefixPragma + preparedYql; + YdbTracer tracer = statement.getConnection().getCtx().getTracer(); + + try { + DataQueryResult result = executeTableQuery(validator, tracer, yql, dataQuerySettings(statement), params); + YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()]; + for (int idx = 0; idx < result.getResultSetCount(); idx += 1) { + readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx)); + } + return readers; } finally { if (tx.isInsideTransaction()) { tracer.setId(tx.txID()); @@ -244,6 +256,13 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query } } + @Override + public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params) + throws SQLException { + YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params); + return updateCurrentResult(new YdbQueryResultStatic(query, readers)); + } + @Override public boolean isValid(YdbValidator validator, int timeout) throws SQLException { ensureOpened(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java index 50ec095..8532c91 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java @@ -4,6 +4,7 @@ import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.impl.YdbResultSetMemory; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.table.query.Params; import tech.ydb.table.values.ListValue; @@ -34,6 +35,7 @@ public interface YdbExecutor { YdbQueryResult executeExplainQuery(YdbStatement st, YdbQuery query) throws SQLException; YdbQueryResult executeScanQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException; YdbQueryResult executeDataQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException; + YdbResultSetMemory[] executeInMemoryQuery(YdbStatement st, String yql, Params prms) throws SQLException; void commit(YdbContext ctx, YdbValidator validator) throws SQLException; void rollback(YdbContext ctx, YdbValidator validator) throws SQLException; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java index ef456f6..429843f 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java @@ -22,6 +22,7 @@ import java.sql.Types; import java.util.Arrays; import java.util.Calendar; +import java.util.List; import java.util.Objects; import java.util.logging.Logger; @@ -97,9 +98,14 @@ public int[] executeBatch() throws SQLException { YdbQueryResult newState = executeBulkUpsert(query, bulk.getTablePath(), bulk.getBatchedBulk()); updateState(newState); } else { - for (Params prm: prepared.getBatchParams()) { + List prms = prepared.getBatchParams(); + if (prms.size() == 1) { + Params prm = prms.get(0); YdbQueryResult newState = executeDataQuery(query, prepared.getBatchText(prm), prm); updateState(newState); + } else { + YdbQueryResult newState = executeBatchQuery(query, prepared::getBatchText, prms); + updateState(newState); } } } finally { diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java index 122285d..a8d813a 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java @@ -34,6 +34,10 @@ public YdbResultSetMemory(YdbTypes types, YdbStatement statement, ResultSetReade this.totalCount = total; } + public ResultSetReader[] getResultSets() { + return rs; + } + @Override protected ValueReader getValue(int columnIndex) throws SQLException { if (!isRowIndexValid()) { diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java index af961a0..47dc843 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java @@ -5,17 +5,26 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLWarning; import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Objects; +import java.util.function.Function; import java.util.logging.Logger; +import tech.ydb.core.Issue; +import tech.ydb.core.StatusCode; import tech.ydb.jdbc.YdbConnection; import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbResultSet; import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.common.YdbTypes; import tech.ydb.jdbc.context.QueryStat; import tech.ydb.jdbc.context.YdbContext; +import tech.ydb.jdbc.context.YdbExecutor; import tech.ydb.jdbc.context.YdbValidator; +import tech.ydb.jdbc.exception.YdbRetryableException; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.jdbc.settings.FakeTxMode; import tech.ydb.jdbc.settings.YdbOperationProperties; @@ -190,6 +199,8 @@ protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params par prepareNewExecution(); YdbContext ctx = connection.getCtx(); + YdbExecutor executor = connection.getExecutor(); + if (ctx.isFullScanDetectorEnabled()) { if (QueryStat.isPrint(yql)) { ResultSetReader rsr = QueryStat.toResultSetReader(ctx.getFullScanDetectorStats()); @@ -201,9 +212,65 @@ protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params par return null; } } - ctx.traceQueryByFullScanDetector(query, yql); - return connection.getExecutor().executeDataQuery(this, query, yql, params); + + boolean isInsideTx = executor.isInsideTransaction(); + while (true) { + try { + return executor.executeDataQuery(this, query, yql, params); + } catch (YdbRetryableException ex) { + if (isInsideTx || ex.getStatus().getCode() != StatusCode.BAD_SESSION) { + throw ex; + } + // TODO: Move this logic to YdbValidator + Issue warning = Issue.of("Operation retried because of of BAD_SESSION", Issue.Severity.INFO); + validator.addStatusIssues(Arrays.asList(warning)); + } + } + } + + protected YdbQueryResult executeBatchQuery(YdbQuery query, Function queryFunc, List params) + throws SQLException { + prepareNewExecution(); + + if (params.isEmpty()) { + return new YdbQueryResultEmpty(); + } + + YdbExecutor executor = connection.getExecutor(); + YdbTypes types = connection.getCtx().getTypes(); + List batchResults = new ArrayList<>(); + int count = 0; + + boolean autoCommit = executor.isAutoCommit(); + try { + if (autoCommit) { + executor.setAutoCommit(false); + } + for (Params prm: params) { + YdbResultSetMemory[] res = executor.executeInMemoryQuery(this, queryFunc.apply(prm), prm); + count = Math.max(count, res.length); + batchResults.add(res); + } + if (autoCommit) { + executor.commit(connection.getCtx(), validator); + } + } finally { + executor.setAutoCommit(autoCommit); + } + + YdbResultSetMemory[] merged = new YdbResultSetMemory[count]; + for (int idx = 0; idx < count; idx += 1) { + List expressionResults = new ArrayList<>(); + for (YdbResultSetMemory[] res: batchResults) { + if (idx < res.length) { + expressionResults.addAll(Arrays.asList(res[idx].getResultSets())); + } + } + merged[idx] = new YdbResultSetMemory(types, this, expressionResults.toArray(new ResultSetReader[0])); + } + + return new YdbQueryResultStatic(query, merged); } protected YdbQueryResult executeSchemeQuery(YdbQuery query) throws SQLException { diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbPreparedQuery.java b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbPreparedQuery.java index 7779962..83b280b 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbPreparedQuery.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbPreparedQuery.java @@ -14,7 +14,7 @@ */ public interface YdbPreparedQuery { String getQueryText(Params prms) throws SQLException; - String getBatchText(Params prms) throws SQLException; + String getBatchText(Params prms); void clearParameters(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/params/InMemoryQuery.java b/jdbc/src/main/java/tech/ydb/jdbc/query/params/InMemoryQuery.java index b4356df..bcfb376 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/params/InMemoryQuery.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/params/InMemoryQuery.java @@ -43,7 +43,7 @@ public InMemoryQuery(YdbQuery query, boolean isAutoDeclare) { } @Override - public String getQueryText(Params prms) throws SQLException { + public String getQueryText(Params prms) { if (!isAutoDeclare) { return yql; } @@ -62,7 +62,7 @@ public String getQueryText(Params prms) throws SQLException { } @Override - public String getBatchText(Params prms) throws SQLException { + public String getBatchText(Params prms) { return getQueryText(prms); } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/GeneratedKeysTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/GeneratedKeysTest.java index 8ffe5c5..71f3378 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/GeneratedKeysTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/GeneratedKeysTest.java @@ -11,7 +11,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -64,7 +63,7 @@ public void inMemoryQueriesTest() throws SQLException { ps.setString(1, "first value"); Assertions.assertEquals(1, ps.executeUpdate()); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); tracer.assertLastQueryContains("RETURNING *"); try (ResultSet rs = ps.getGeneratedKeys()) { @@ -82,7 +81,7 @@ public void inMemoryQueriesTest() throws SQLException { ps.setString(1, "first value"); Assertions.assertEquals(1, ps.executeUpdate()); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); tracer.assertLastQueryContains("RETURNING `id`"); try (ResultSet rs = ps.getGeneratedKeys()) { @@ -101,7 +100,7 @@ public void inMemoryQueriesTest() throws SQLException { ps.setString(2, "first value"); Assertions.assertEquals(1, ps.executeUpdate()); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); tracer.assertLastQueryContains("RETURNING `id`, `value`"); try (ResultSet rs = ps.getGeneratedKeys()) { @@ -123,7 +122,7 @@ public void inMemoryQueriesTest() throws SQLException { "$i=select 1; DELETE FROM serial_test", Statement.RETURN_GENERATED_KEYS)) { Assertions.assertEquals(1, ps.executeUpdate()); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); tracer.assertLastQueryContains("RETURNING *"); try (ResultSet rs = ps.getGeneratedKeys()) { @@ -144,7 +143,6 @@ public void inMemoryQueriesTest() throws SQLException { } @Test - @Disabled public void inMemoryBatchQueriesTest() throws SQLException { try (Connection connection = DriverManager.getConnection(jdbcURL.build())) { TestTxTracer tracer = YdbTracerImpl.use(new TestTxTracer()); @@ -159,7 +157,7 @@ public void inMemoryBatchQueriesTest() throws SQLException { Assertions.assertEquals(2, ps.executeBatch().length); - tracer.assertQueriesCount(2); + tracer.assertQueriesCount(2, true); tracer.assertLastQueryContains("RETURNING *"); try (ResultSet rs = ps.getGeneratedKeys()) { @@ -193,7 +191,7 @@ public void standardQueriesTest() throws SQLException { ps.addBatch(); Assertions.assertEquals(3, ps.executeBatch().length); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); tracer.assertLastQueryContains("FROM AS_TABLE($batch)"); tracer.assertLastQueryContains("RETURNING *"); @@ -219,7 +217,7 @@ public void standardQueriesTest() throws SQLException { ps.setString(2, "first value"); Assertions.assertEquals(1, ps.executeUpdate()); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); tracer.assertLastQueryContains("RETURNING `id`, `value`"); try (ResultSet rs = ps.getGeneratedKeys()) { @@ -238,7 +236,7 @@ public void standardQueriesTest() throws SQLException { ps.setString(1, "second value"); Assertions.assertEquals(1, ps.executeUpdate()); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); tracer.assertLastQueryContains("RETURNING *"); try (ResultSet rs = ps.getGeneratedKeys()) { diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbPreparedStatementTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbPreparedStatementTest.java index 6471c67..0d5cc82 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbPreparedStatementTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbPreparedStatementTest.java @@ -277,7 +277,7 @@ public void batchUpsertTest(SqlQueries.JdbcQuery query) throws SQLException { statement.addBatch(); statement.executeBatch(); - tracer.assertQueriesCount(batched ? 1 : 2); + tracer.assertQueriesCount(batched ? 1 : 2, !batched); // ----- executeBatch without addBatch ----- statement.setInt(1, 3); @@ -288,7 +288,7 @@ public void batchUpsertTest(SqlQueries.JdbcQuery query) throws SQLException { statement.setString(2, "value-4"); statement.executeBatch(); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); // ----- execute instead of executeBatch ----- statement.setInt(1, 5); @@ -299,7 +299,7 @@ public void batchUpsertTest(SqlQueries.JdbcQuery query) throws SQLException { statement.setString(2, "value-6"); statement.execute(); - tracer.assertQueriesCount(1); + tracer.assertQueriesCount(1, false); } String select = TEST_TABLE.selectColumn("c_Text"); @@ -313,6 +313,153 @@ public void batchUpsertTest(SqlQueries.JdbcQuery query) throws SQLException { } } + @ParameterizedTest(name = "with {0}") + @EnumSource(SqlQueries.JdbcQuery.class) + public void batchInsertTest(SqlQueries.JdbcQuery query) throws SQLException { + String insert = TEST_TABLE.insertOne(query, "c_Text", "Text"); + boolean batched = query != SqlQueries.JdbcQuery.TYPED && query != SqlQueries.JdbcQuery.IN_MEMORY; + try (PreparedStatement statement = jdbc.connection().prepareStatement(insert)) { + TestTxTracer tracer = YdbTracerImpl.use(new TestTxTracer()); + + // ----- base usage ----- + statement.setInt(1, 1); + statement.setString(2, "value-1"); + statement.addBatch(); + + statement.setInt(1, 2); + statement.setString(2, "value-2"); + statement.addBatch(); + + statement.executeBatch(); + tracer.assertQueriesCount(batched ? 1 : 2, !batched); + + // ----- confict error----- + statement.setInt(1, 3); + statement.setString(2, "value-3"); // ok, but must be rollbacked + statement.addBatch(); + + statement.setInt(1, 2); // conflict with existing key + statement.setString(2, "value-4"); + statement.addBatch(); + + ExceptionAssert.ydbException("Conflict with existing key.", statement::executeBatch); + tracer.assertQueriesCount(batched ? 1 : 2, false); + + // repeat with correct keys + statement.setInt(1, 5); + statement.setString(2, "value-5"); + statement.addBatch(); + + statement.setInt(1, 6); + statement.setString(2, "value-6"); + statement.addBatch(); + + statement.executeBatch(); + tracer.assertQueriesCount(batched ? 1 : 2, !batched); + } + + String select = TEST_TABLE.selectColumn("c_Text"); + try (Statement statement = jdbc.connection().createStatement()) { + TextSelectAssert.of(statement.executeQuery(select), "c_Text", "Text") + .nextRow(1, "value-1") + .nextRow(2, "value-2") + .nextRow(5, "value-5") + .nextRow(6, "value-6") + .noNextRows(); + } + } + + @ParameterizedTest(name = "with {0}") + @EnumSource(SqlQueries.JdbcQuery.class) + public void batchTransactionTest(SqlQueries.JdbcQuery query) throws SQLException { + String insert = TEST_TABLE.insertOne(query, "c_Text", "Text"); + String select = TEST_TABLE.selectColumn("c_Text"); + + boolean batched = query != SqlQueries.JdbcQuery.TYPED && query != SqlQueries.JdbcQuery.IN_MEMORY; + jdbc.connection().setAutoCommit(false); + + try (PreparedStatement statement = jdbc.connection().prepareStatement(insert)) { + TestTxTracer tracer = YdbTracerImpl.use(new TestTxTracer()); + + // ----- base usage ----- + statement.setInt(1, 1); + statement.setString(2, "value-1"); + statement.addBatch(); + + statement.setInt(1, 2); + statement.setString(2, "value-2"); + statement.addBatch(); + + statement.executeBatch(); + tracer.assertQueriesCount(batched ? 1 : 2, false); + + // ----- confict error----- + statement.setInt(1, 3); + statement.setString(2, "value-3"); // ok, but must be rollbacked + statement.addBatch(); + + statement.setInt(1, 2); // conflict with existing key + statement.setString(2, "value-4"); + statement.addBatch(); + + statement.setInt(1, 5); // ignored + statement.setString(2, "value-5"); + statement.addBatch(); + + ExceptionAssert.ydbException("Conflict with existing key.", statement::executeBatch); + tracer.assertQueriesCount(batched ? 1 : 2, false); + + jdbc.connection().commit(); // ignored + } + + try (Statement statement = jdbc.connection().createStatement()) { + TextSelectAssert.of(statement.executeQuery(select), "c_Text", "Text").noNextRows(); + } + + try (PreparedStatement statement = jdbc.connection().prepareStatement(insert)) { + TestTxTracer tracer = YdbTracerImpl.use(new TestTxTracer()); + + // ----- base usage ----- + statement.setInt(1, 1); + statement.setString(2, "value-1"); + statement.addBatch(); + + statement.setInt(1, 2); + statement.setString(2, "value-2"); + statement.addBatch(); + + statement.executeBatch(); + tracer.assertQueriesCount(batched ? 1 : 2, false); + + statement.setInt(1, 3); + statement.setString(2, "value-3"); // ok, but must be rollbacked + statement.addBatch(); + + statement.setInt(1, 4); + statement.setString(2, "value-4"); + statement.addBatch(); + + statement.setInt(1, 5); // ignored + statement.setString(2, "value-5"); + statement.addBatch(); + + statement.executeBatch(); + tracer.assertQueriesCount(batched ? 1 : 3, false); + + jdbc.connection().commit(); + } + + try (Statement statement = jdbc.connection().createStatement()) { + TextSelectAssert.of(statement.executeQuery(select), "c_Text", "Text") + .nextRow(1, "value-1") + .nextRow(2, "value-2") + .nextRow(3, "value-3") + .nextRow(4, "value-4") + .nextRow(5, "value-5") + .noNextRows(); + } + } + private int ydbType(PrimitiveType type) { return YdbConst.SQL_KIND_PRIMITIVE + type.ordinal(); } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/SqlQueries.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/SqlQueries.java index 09c48dc..8adf347 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/SqlQueries.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/SqlQueries.java @@ -55,7 +55,7 @@ public enum YqlQuery { JdbcQuery.IN_MEMORY, "" + "$ignored = select 1;\n" + - "upsert into #tableName (key, #column) values (?, ?); select 1;", + "upsert into #tableName (key, #column) values (?, ?);", JdbcQuery.TYPED, "" + "declare $p1 as Int32;\n" + @@ -68,6 +68,28 @@ public enum YqlQuery { "upsert into #tableName select * from as_table(ListMap($values, $mapper));" ); + private static final Map JDBC_INSERT_ONE = ImmutableMap.of( + JdbcQuery.STANDARD, "" + + "insert into #tableName (key, #column) values (?, ?)", + + JdbcQuery.BULK, "" + // Bulk insert is not supported + "insert into #tableName (key, #column) values (?, ?)", + + JdbcQuery.IN_MEMORY, "" + + "$ignored = select 1;\n" + + "insert into #tableName (key, #column) values (?, ?);", + + JdbcQuery.TYPED, "" + + "declare $p1 as Int32;\n" + + "declare $p2 as #type;\n" + + "insert into #tableName (key, #column) values ($p1, $p2)", + + JdbcQuery.BATCHED, "" + + "declare $values as List>;\n" + + "$mapper = ($row) -> (AsStruct($row.p1 as key, $row.p2 as #column));\n" + + "insert into #tableName select * from as_table(ListMap($values, $mapper));" + ); + private static final Map YQL_UPSERT_ONE = ImmutableMap.of( YqlQuery.SIMPLE, "" + "declare $key as Int32;\n" + @@ -186,6 +208,13 @@ public String upsertOne(JdbcQuery query, String column, String type) { .replaceAll("#tableName", tableName); } + public String insertOne(JdbcQuery query, String column, String type) { + return JDBC_INSERT_ONE.get(query) + .replaceAll("#column", column) + .replaceAll("#type", type) + .replaceAll("#tableName", tableName); + } + public String upsertOne(YqlQuery query, String column, String type) { return YQL_UPSERT_ONE.get(query) .replaceAll("#column", column) diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/TestTxTracer.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/TestTxTracer.java index 35f4ab4..0a2d533 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/TestTxTracer.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/TestTxTracer.java @@ -12,21 +12,28 @@ */ public class TestTxTracer extends YdbTracerNone { private volatile String lastQuery; + private volatile boolean wasCommit = false; private final AtomicInteger counter = new AtomicInteger(0); @Override public void query(String queryText) { - counter.incrementAndGet(); - lastQuery = queryText; + if (queryText != null) { + counter.incrementAndGet(); + lastQuery = queryText; + } else { + wasCommit = true; + } } public String getQueryText() { return lastQuery; } - public void assertQueriesCount(int count) { + public void assertQueriesCount(int count, boolean withCommit) { Assertions.assertEquals(count, counter.get(), "Incorrect count of queries"); + Assertions.assertEquals(withCommit, wasCommit, "Incorrect transaction status"); counter.set(0); + wasCommit = false; } public void assertLastQueryContains(String query) {