From 6cc21d03b7eea529d998012f21a08c58281b85d6 Mon Sep 17 00:00:00 2001 From: Bardin Petr Date: Thu, 13 Nov 2025 17:05:31 +0300 Subject: [PATCH 1/3] feature: query execution interceptor --- .../main/java/tech/ydb/jdbc/YdbStatement.java | 5 + .../jdbc/context/QueryServiceExecutor.java | 7 +- .../java/tech/ydb/jdbc/context/QueryStat.java | 2 +- .../jdbc/context/TableServiceExecutor.java | 6 +- .../tech/ydb/jdbc/context/YdbContext.java | 16 ++ .../ydb/jdbc/impl/YdbQueryResultStatic.java | 18 ++ .../tech/ydb/jdbc/impl/YdbStatementBase.java | 28 ++- .../ydb/jdbc/query/UnifiedQueryStats.java | 230 ++++++++++++++++++ .../jdbc/spi/YDBQueryExtensionService.java | 55 +++++ 9 files changed, 363 insertions(+), 4 deletions(-) create mode 100644 jdbc/src/main/java/tech/ydb/jdbc/query/UnifiedQueryStats.java create mode 100644 jdbc/src/main/java/tech/ydb/jdbc/spi/YDBQueryExtensionService.java diff --git a/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java b/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java index ed130559..4d6e73a2 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java @@ -4,6 +4,7 @@ import java.sql.Statement; import tech.ydb.jdbc.context.YdbValidator; +import tech.ydb.table.query.stats.QueryStatsCollectionMode; public interface YdbStatement extends Statement { /** @@ -58,4 +59,8 @@ public interface YdbStatement extends Statement { @Override int getMaxRows(); + + void setStatsCollectionMode(QueryStatsCollectionMode mode); + + QueryStatsCollectionMode getStatsCollectionMode(); } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index b113f4c2..31d089b3 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -32,6 +32,7 @@ import tech.ydb.query.settings.CommitTransactionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.settings.QueryStatsMode; import tech.ydb.query.settings.RollbackTransactionSettings; import tech.ydb.query.tools.QueryReader; import tech.ydb.table.query.Params; @@ -233,6 +234,7 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query int timeout = statement.getQueryTimeout(); ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder(); + settings = settings.withStatsMode(QueryStatsMode.valueOf(statement.getStatsCollectionMode().name())); if (timeout > 0) { settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS); } @@ -300,7 +302,10 @@ public void onClose(Status status, Throwable th) { for (int idx = 0; idx < readers.length; idx++) { readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx)); } - return updateCurrentResult(new YdbQueryResultStatic(query, readers)); + + YdbQueryResultStatic queryResult = new YdbQueryResultStatic(query, readers); + queryResult.setQueryStats(result.getQueryInfo().getStats()); + return updateCurrentResult(queryResult); } finally { if (!localTx.isActive()) { if (tx.compareAndSet(localTx, null)) { diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryStat.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryStat.java index ccdc15eb..8736a539 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryStat.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryStat.java @@ -66,7 +66,7 @@ public String getPreparedYQL() { return preparedYQL; } - public String getAat() { + public String getAst() { return ast; } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java index 28fe73af..e90ee9b7 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java @@ -173,6 +173,8 @@ private ExecuteDataQuerySettings dataQuerySettings(YdbStatement statement) { settings = settings.disableQueryCache(); } + settings = settings.setCollectStats(statement.getStatsCollectionMode()); + return settings; } @@ -231,7 +233,9 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query readers[idx] = new YdbResultSetMemory(types, statement, rs); } - return updateCurrentResult(new YdbQueryResultStatic(query, readers)); + YdbQueryResultStatic queryResult = new YdbQueryResultStatic(query, readers); + queryResult.setQueryStats(result.getQueryStats()); + return updateCurrentResult(queryResult); } catch (SQLException | RuntimeException ex) { updateState(tx.withRollback(session)); throw ex; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java index 7fe48859..764830b1 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java @@ -3,6 +3,8 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Collection; +import java.util.Iterator; +import java.util.ServiceLoader; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -23,6 +25,7 @@ import tech.ydb.jdbc.settings.YdbConnectionProperties; import tech.ydb.jdbc.settings.YdbOperationProperties; import tech.ydb.jdbc.settings.YdbQueryProperties; +import tech.ydb.jdbc.spi.YDBQueryExtensionService; import tech.ydb.query.QueryClient; import tech.ydb.query.impl.QueryClientImpl; import tech.ydb.scheme.SchemeClient; @@ -58,6 +61,8 @@ public class YdbContext implements AutoCloseable { private final boolean autoResizeSessionPool; private final AtomicInteger connectionsCount = new AtomicInteger(); + private YDBQueryExtensionService queryExtensionService = null; + private YdbContext( YdbConfig config, YdbOperationProperties operationProperties, @@ -96,6 +101,13 @@ private YdbContext( this.cache = new YdbCache(this, queryProperties, config.getPreparedStatementsCachecSize(), config.isFullScanDetectorEnabled()); } + + Iterator extLoaderIterator = ServiceLoader + .load(YDBQueryExtensionService.class) + .iterator(); + if (extLoaderIterator.hasNext()) { + queryExtensionService = extLoaderIterator.next(); + } } public YdbTypes getTypes() { @@ -312,4 +324,8 @@ public YdbQuery parseYdbQuery(QueryKey key) throws SQLException { public YdbPreparedQuery prepareYdbQuery(YdbQuery query, YdbPrepareMode mode) throws SQLException { return cache.prepareYdbQuery(query, mode); } + + public YDBQueryExtensionService getQueryExtensionService() { + return queryExtensionService; + } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java index f95a7f8e..20cadf0f 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java @@ -4,6 +4,7 @@ import java.sql.SQLException; import tech.ydb.jdbc.YdbResultSet; +import tech.ydb.jdbc.query.UnifiedQueryStats; import tech.ydb.jdbc.query.YdbQuery; /** @@ -12,6 +13,7 @@ */ public class YdbQueryResultStatic extends YdbQueryResultBase { private final YdbResultSet[] rs; + private UnifiedQueryStats queryStats; public YdbQueryResultStatic(YdbQuery query, YdbResultSet... rs) { super(query, rs != null ? rs.length : 0); @@ -33,4 +35,20 @@ protected void closeResultSet(int index) throws SQLException { } rs[index].close(); } + + public UnifiedQueryStats getQueryStats() { + return queryStats; + } + + public void setQueryStats(tech.ydb.table.query.stats.QueryStats src) { + if (src != null) { + queryStats = new UnifiedQueryStats(src); + } + } + + public void setQueryStats(tech.ydb.query.result.QueryStats src) { + if (src != null) { + queryStats = new UnifiedQueryStats(src); + } + } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java index af961a08..79650487 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java @@ -19,7 +19,9 @@ import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.jdbc.settings.FakeTxMode; import tech.ydb.jdbc.settings.YdbOperationProperties; +import tech.ydb.jdbc.spi.YDBQueryExtensionService; import tech.ydb.table.query.Params; +import tech.ydb.table.query.stats.QueryStatsCollectionMode; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.values.ListValue; @@ -41,6 +43,7 @@ public abstract class YdbStatementBase implements YdbStatement { private int queryTimeout; private boolean isPoolable; private boolean isClosed = false; + private QueryStatsCollectionMode statsMode = QueryStatsCollectionMode.NONE; /** @see Statement#getMaxRows() */ private int maxRows = 0; // no limit @@ -203,7 +206,20 @@ protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params par } ctx.traceQueryByFullScanDetector(query, yql); - return connection.getExecutor().executeDataQuery(this, query, yql, params); + + YDBQueryExtensionService ext = ctx.getQueryExtensionService(); + if (ext != null) { + ext.dataQueryPreExecute(ctx, this, query, yql, params); + } + + YdbQueryResult result; + result = connection.getExecutor().executeDataQuery(this, query, yql, params); + + if (ext != null) { + ext.dataQueryPostExecute(ctx, this, query, yql, params, result); + } + + return result; } protected YdbQueryResult executeSchemeQuery(YdbQuery query) throws SQLException { @@ -319,4 +335,14 @@ public int getFetchSize() { public int getResultSetConcurrency() { return ResultSet.CONCUR_READ_ONLY; } + + @Override + public QueryStatsCollectionMode getStatsCollectionMode() { + return statsMode; + } + + @Override + public void setStatsCollectionMode(QueryStatsCollectionMode mode) { + this.statsMode = mode == null ? QueryStatsCollectionMode.NONE : mode; + } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/UnifiedQueryStats.java b/jdbc/src/main/java/tech/ydb/jdbc/query/UnifiedQueryStats.java new file mode 100644 index 00000000..780e4acd --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/UnifiedQueryStats.java @@ -0,0 +1,230 @@ +package tech.ydb.jdbc.query; + +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * Data class to wrap QueryStats of Table and Query services + */ +public class UnifiedQueryStats { + private final String queryPlan; + private final String queryAst; + private final long totalDurationUs; + private final long totalCpuTimeUs; + private final long processCpuTimeUs; + private final long compilationTimeUs; + private final long compilationCpuTimeUs; + private final boolean compilationIsCached; + private final List queryPhases; + + public UnifiedQueryStats(tech.ydb.query.result.QueryStats src) { + this.queryPlan = src.getQueryPlan(); + this.queryAst = src.getQueryAst(); + this.totalDurationUs = src.getTotalDurationUs(); + this.totalCpuTimeUs = src.getTotalCpuTimeUs(); + this.processCpuTimeUs = src.getProcessCpuTimeUs(); + this.compilationTimeUs = src.getCompilationStats().getDurationUs(); + this.compilationCpuTimeUs = src.getCompilationStats().getCpuTimeUs(); + this.compilationIsCached = src.getCompilationStats().isFromCache(); + this.queryPhases = src.getPhases().stream().map(PhaseStats::new).collect(toList()); + } + + public UnifiedQueryStats(tech.ydb.table.query.stats.QueryStats src) { + this.queryPlan = src.getQueryPlan(); + this.queryAst = src.getQueryAst(); + this.totalDurationUs = src.getTotalDurationUs(); + this.totalCpuTimeUs = src.getTotalCpuTimeUs(); + this.processCpuTimeUs = src.getProcessCpuTimeUs(); + this.compilationTimeUs = src.getCompilation().getDurationUs(); + this.compilationCpuTimeUs = src.getCompilation().getCpuTimeUs(); + this.compilationIsCached = src.getCompilation().getFromCache(); + this.queryPhases = src.getQueryPhasesList().stream().map(PhaseStats::new).collect(toList()); + } + + public String getQueryPlan() { + return queryPlan; + } + + public String getQueryAst() { + return queryAst; + } + + public long getTotalDurationUs() { + return totalDurationUs; + } + + public long getTotalCpuTimeUs() { + return totalCpuTimeUs; + } + + public long getProcessCpuTimeUs() { + return processCpuTimeUs; + } + + public long getCompilationTimeUs() { + return compilationTimeUs; + } + + public long getCompilationCpuTimeUs() { + return compilationCpuTimeUs; + } + + public boolean isCompilationIsCached() { + return compilationIsCached; + } + + public List getQueryPhases() { + return queryPhases; + } + + @Override + public String toString() { + return "UnifiedQueryStats{" + + "queryPlan='" + queryPlan + '\'' + + ", queryAst='" + queryAst + '\'' + + ", totalDurationUs=" + totalDurationUs + + ", totalCpuTimeUs=" + totalCpuTimeUs + + ", processCpuTimeUs=" + processCpuTimeUs + + ", compilationTimeUs=" + compilationTimeUs + + ", compilationCpuTimeUs=" + compilationCpuTimeUs + + ", compilationIsCached=" + compilationIsCached + + ", queryPhases=" + queryPhases + + '}'; + } + + public static class PhaseStats { + private final long durationUs; + private final long cpuTimeUs; + private final long affectedShards; + private final boolean isLiteralPhase; + private final List tableAccesses; + + public PhaseStats(tech.ydb.query.result.QueryStats.QueryPhase src) { + this.durationUs = src.getDurationUs(); + this.cpuTimeUs = src.getCpuTimeUs(); + this.affectedShards = src.getAffectedShards(); + this.isLiteralPhase = src.isLiteralPhase(); + this.tableAccesses = src.getTableAccesses().stream().map(TableAccess::new).collect(toList()); + } + + public PhaseStats(tech.ydb.table.query.stats.QueryPhaseStats src) { + this.durationUs = src.getDurationUs(); + this.cpuTimeUs = src.getCpuTimeUs(); + this.affectedShards = src.getAffectedShards(); + this.isLiteralPhase = src.getLiteralPhase(); + this.tableAccesses = src.getTableAccessList().stream().map(TableAccess::new).collect(toList()); + } + + public long getDurationUs() { + return durationUs; + } + + public long getCpuTimeUs() { + return cpuTimeUs; + } + + public long getAffectedShards() { + return affectedShards; + } + + public boolean isLiteralPhase() { + return isLiteralPhase; + } + + public List getTableAccesses() { + return tableAccesses; + } + + @Override + public String toString() { + return "PhaseStats{" + + "durationUs=" + durationUs + + ", cpuTimeUs=" + cpuTimeUs + + ", affectedShards=" + affectedShards + + ", isLiteralPhase=" + isLiteralPhase + + ", tableAccesses=" + tableAccesses + + '}'; + } + + public static class TableAccess { + private final String name; + private final long partitionsCount; + private final TableOperation reads; + private final TableOperation updates; + private final TableOperation deletes; + + public TableAccess(tech.ydb.query.result.QueryStats.TableAccess src) { + this.name = src.getTableName(); + this.partitionsCount = src.getPartitionsCount(); + this.reads = new TableOperation(src.getReads().getRows(), src.getReads().getBytes()); + this.updates = new TableOperation(src.getUpdates().getRows(), src.getUpdates().getBytes()); + this.deletes = new TableOperation(src.getDeletes().getRows(), src.getDeletes().getBytes()); + } + + public TableAccess(tech.ydb.table.query.stats.TableAccessStats src) { + this.name = src.getName(); + this.partitionsCount = src.getPartitionsCount(); + this.reads = new TableOperation(src.getReads().getRows(), src.getReads().getBytes()); + this.updates = new TableOperation(src.getUpdates().getRows(), src.getUpdates().getBytes()); + this.deletes = new TableOperation(src.getDeletes().getRows(), src.getDeletes().getBytes()); + } + + public String getName() { + return name; + } + + public long getPartitionsCount() { + return partitionsCount; + } + + public TableOperation getReads() { + return reads; + } + + public TableOperation getUpdates() { + return updates; + } + + public TableOperation getDeletes() { + return deletes; + } + + @Override + public String toString() { + return "TableAccess{" + + "name='" + name + '\'' + + ", partitionsCount=" + partitionsCount + + ", reads=" + reads + + ", updates=" + updates + + ", deletes=" + deletes + + '}'; + } + + public static class TableOperation { + private final long rows; + private final long bytes; + + public TableOperation(long rows, long bytes) { + this.rows = rows; + this.bytes = bytes; + } + + public long getRows() { + return rows; + } + + public long getBytes() { + return bytes; + } + + @Override + public String toString() { + return " data query"); tracer.query(yql); @@ -266,8 +271,19 @@ public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx)); } -// queryResult.setQueryStats(result.getQueryInfo().getStats()); + if (result.getQueryInfo().hasStats()) { + spi.onQueryStats(result.getQueryInfo().getStats()); + } + + spi.onQueryResult(Status.SUCCESS, null); return readers; + } catch (SQLException | RuntimeException ex) { + if (ex instanceof YdbStatusable) { + spi.onQueryResult(((YdbStatusable) ex).getStatus(), null); + } else { + spi.onQueryResult(null, ex); + } + throw ex; } finally { if (!localTx.isActive()) { if (tx.compareAndSet(localTx, null)) { @@ -289,16 +305,20 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S ensureOpened(); if (!useStreamResultSet) { - YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params); + YdbResultSetMemory[] readers = executeInMemoryQuery(statement, query, preparedYql, params); return updateCurrentResult(new YdbQueryResultStatic(query, readers)); } YdbValidator validator = statement.getValidator(); + String yql = prefixPragma + preparedYql; + YdbQueryExtentionService.QueryCall spi = querySpi.newDataQuery(statement, query, yql); + int timeout = statement.getQueryTimeout(); ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder(); if (timeout > 0) { settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS); } + settings = spi.prepareQuerySettings(settings); QueryTransaction nextTx = tx.get(); while (nextTx == null) { @@ -309,11 +329,8 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S } } - final QueryTransaction localTx = nextTx; + QueryTransaction localTx = nextTx; YdbTracer tracer = statement.getConnection().getCtx().getTracer(); - - String yql = prefixPragma + preparedYql; - tracer.trace("--> stream query"); tracer.query(yql); String msg = "STREAM_QUERY >>\n" + yql; @@ -321,6 +338,7 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) { @Override public void onClose(Status status, Throwable th) { + spi.onQueryResult(status, th); if (th != null) { tracer.trace("<-- " + th.getMessage()); } @@ -344,7 +362,7 @@ public void onClose(Status status, Throwable th) { settings = settings.withGrpcFlowControl(reader); QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build()); - validator.execute(msg, tracer, () -> reader.load(validator, stream)); + validator.execute(msg, tracer, () -> reader.load(validator, stream, spi::onQueryStats)); return updateCurrentResult(reader); } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java index ab891376..e4a568ad 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java @@ -5,16 +5,20 @@ import java.sql.SQLFeatureNotSupportedException; import java.time.Duration; +import tech.ydb.core.Status; import tech.ydb.jdbc.YdbConst; import tech.ydb.jdbc.YdbQueryResult; import tech.ydb.jdbc.YdbStatement; import tech.ydb.jdbc.YdbTracer; +import tech.ydb.jdbc.exception.YdbStatusable; import tech.ydb.jdbc.impl.YdbQueryResultExplain; import tech.ydb.jdbc.impl.YdbQueryResultStatic; import tech.ydb.jdbc.impl.YdbResultSetMemory; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.jdbc.settings.YdbOperationProperties; +import tech.ydb.jdbc.spi.YdbQueryExtentionService; +import tech.ydb.query.result.QueryStats; import tech.ydb.table.Session; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.ExplainDataQueryResult; @@ -33,6 +37,7 @@ */ public class TableServiceExecutor extends BaseYdbExecutor { private final boolean failOnTruncatedResult; + private final YdbQueryExtentionService querySpi; private volatile TxState tx; public TableServiceExecutor(YdbContext ctx) throws SQLException { @@ -40,6 +45,7 @@ public TableServiceExecutor(YdbContext ctx) throws SQLException { YdbOperationProperties options = ctx.getOperationProperties(); this.tx = createTx(options.getTransactionLevel(), options.isAutoCommit()); this.failOnTruncatedResult = options.isFailOnTruncatedResult(); + this.querySpi = ctx.getQuerySpi(); } @Override @@ -172,8 +178,6 @@ private ExecuteDataQuerySettings dataQuerySettings(YdbStatement statement) { settings = settings.disableQueryCache(); } - settings = settings.setCollectStats(statement.getStatsCollectionMode()); - return settings; } @@ -202,8 +206,18 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query } } - private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer tracer, String yql, - ExecuteDataQuerySettings settings, Params prms) throws SQLException { + @Override + public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, YdbQuery query, String preparedYql, + Params params) throws SQLException { + ensureOpened(); + + YdbValidator validator = statement.getValidator(); + String yql = prefixPragma + preparedYql; + YdbQueryExtentionService.QueryCall spi = querySpi.newDataQuery(statement, query, yql); + + YdbTracer tracer = statement.getConnection().getCtx().getTracer(); + ExecuteDataQuerySettings settings = spi.prepareDataQuerySettings(dataQuerySettings(statement)); + Session session = tx.getSession(validator); try { tracer.trace("--> data query"); @@ -212,10 +226,14 @@ private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer trac DataQueryResult result = validator.call( QueryType.DATA_QUERY + " >>\n" + yql, tracer, - () -> session.executeDataQuery(yql, tx.txControl(), prms, settings) + () -> session.executeDataQuery(yql, tx.txControl(), params, settings) ); updateState(tx.withDataQuery(session, result.getTxId())); + if (result.hasQueryStats()) { + spi.onQueryStats(new QueryStats(result.getRawQueryStats())); + } + if (failOnTruncatedResult) { for (int idx = 0; idx < result.getResultSetCount(); idx += 1) { ResultSetReader rs = result.getResultSet(idx); @@ -226,30 +244,22 @@ private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer trac } } - return result; - } catch (SQLException | RuntimeException ex) { - updateState(tx.withRollback(session)); - throw ex; - } - } - - @Override - public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params) - throws SQLException { - ensureOpened(); - - YdbValidator validator = statement.getValidator(); - String yql = prefixPragma + preparedYql; - YdbTracer tracer = statement.getConnection().getCtx().getTracer(); - - try { - DataQueryResult result = executeTableQuery(validator, tracer, yql, dataQuerySettings(statement), params); YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()]; for (int idx = 0; idx < result.getResultSetCount(); idx += 1) { readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx)); } -// queryResult.setQueryStats(result.getQueryStats()); + + spi.onQueryResult(Status.SUCCESS, null); return readers; + } catch (SQLException | RuntimeException ex) { + if (ex instanceof YdbStatusable) { + spi.onQueryResult(((YdbStatusable) ex).getStatus(), null); + } else { + spi.onQueryResult(null, ex); + } + + updateState(tx.withRollback(session)); + throw ex; } finally { if (tx.isInsideTransaction()) { tracer.setId(tx.txID()); @@ -262,7 +272,7 @@ public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String @Override public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params) throws SQLException { - YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params); + YdbResultSetMemory[] readers = executeInMemoryQuery(statement, query, preparedYql, params); return updateCurrentResult(new YdbQueryResultStatic(query, readers)); } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java index 2712088b..f2b1b1f0 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java @@ -3,8 +3,6 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Collection; -import java.util.Iterator; -import java.util.ServiceLoader; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -26,7 +24,7 @@ import tech.ydb.jdbc.settings.YdbConnectionProperties; import tech.ydb.jdbc.settings.YdbOperationProperties; import tech.ydb.jdbc.settings.YdbQueryProperties; -import tech.ydb.jdbc.spi.YDBQueryExtensionService; +import tech.ydb.jdbc.spi.YdbQueryExtentionService; import tech.ydb.query.QueryClient; import tech.ydb.query.impl.QueryClientImpl; import tech.ydb.scheme.SchemeClient; @@ -62,7 +60,7 @@ public class YdbContext implements AutoCloseable { private final boolean autoResizeSessionPool; private final AtomicInteger connectionsCount = new AtomicInteger(); - private YDBQueryExtensionService queryExtensionService = null; + private final YdbQueryExtentionService querySpi; private YdbContext( YdbConfig config, @@ -103,12 +101,7 @@ private YdbContext( queryProperties, config.getPreparedStatementsCachecSize(), config.isFullScanDetectorEnabled()); } - Iterator extLoaderIterator = ServiceLoader - .load(YDBQueryExtensionService.class) - .iterator(); - if (extLoaderIterator.hasNext()) { - queryExtensionService = extLoaderIterator.next(); - } + this.querySpi = YdbServiceLoader.loadQuerySpi(); } public YdbTypes getTypes() { @@ -127,6 +120,10 @@ public YdbTracer getTracer() { return config.isTxTracedEnabled() ? YdbTracer.current() : YdbTracerNone.DISABLED; } + public YdbQueryExtentionService getQuerySpi() { + return querySpi; + } + static String joined(String path1, String path2) { return path1.endsWith("/") || path2.startsWith("/") ? path1 + path2 : path1 + "/" + path2; } @@ -325,8 +322,4 @@ public YdbQuery parseYdbQuery(QueryKey key) throws SQLException { public YdbPreparedQuery prepareYdbQuery(YdbQuery query, YdbPrepareMode mode) throws SQLException { return cache.prepareYdbQuery(query, mode); } - - public YDBQueryExtensionService getQueryExtensionService() { - return queryExtensionService; - } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java index 8532c91e..7815f453 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java @@ -35,7 +35,8 @@ public interface YdbExecutor { YdbQueryResult executeExplainQuery(YdbStatement st, YdbQuery query) throws SQLException; YdbQueryResult executeScanQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException; YdbQueryResult executeDataQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException; - YdbResultSetMemory[] executeInMemoryQuery(YdbStatement st, String yql, Params prms) throws SQLException; + YdbResultSetMemory[] executeInMemoryQuery(YdbStatement st, YdbQuery query, String yql, Params prms) + throws SQLException; void commit(YdbContext ctx, YdbValidator validator) throws SQLException; void rollback(YdbContext ctx, YdbValidator validator) throws SQLException; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbServiceLoader.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbServiceLoader.java new file mode 100644 index 00000000..9f456217 --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbServiceLoader.java @@ -0,0 +1,88 @@ +package tech.ydb.jdbc.context; + +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +import tech.ydb.core.Status; +import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.jdbc.spi.YdbQueryExtentionService; +import tech.ydb.query.result.QueryStats; +import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.table.settings.ExecuteDataQuerySettings; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbServiceLoader { + private static final YdbQueryExtentionService.QueryCall DUMP = new YdbQueryExtentionService.QueryCall() { }; + private static final YdbQueryExtentionService EMPTY = (statement, query, yql) -> DUMP; + + private YdbServiceLoader() { } + + public static YdbQueryExtentionService loadQuerySpi() { + List spis = new ArrayList<>(); + ServiceLoader.load(YdbQueryExtentionService.class).forEach(spis::add); + + if (spis.isEmpty()) { + return EMPTY; + } + + if (spis.size() == 1) { + return spis.get(0); + } + + return new ProxySpi(spis); + } + + private static class ProxySpi implements YdbQueryExtentionService { + private final List spis; + + ProxySpi(List spis) { + this.spis = spis; + } + + @Override + public QueryCall newDataQuery(YdbStatement statement, YdbQuery query, String yql) { + List proxed = spis.stream().map(spi -> newDataQuery(statement, query, yql)) + .collect(Collectors.toList()); + + return new QueryCall() { + @Override + public ExecuteQuerySettings.Builder prepareQuerySettings(ExecuteQuerySettings.Builder builder) { + ExecuteQuerySettings.Builder local = builder; + for (QueryCall proxy: proxed) { + local = proxy.prepareQuerySettings(local); + } + return local; + } + + @Override + public ExecuteDataQuerySettings prepareDataQuerySettings(ExecuteDataQuerySettings settings) { + ExecuteDataQuerySettings local = settings; + for (QueryCall proxy: proxed) { + local = proxy.prepareDataQuerySettings(local); + } + return local; + } + + @Override + public void onQueryResult(Status status, Throwable th) { + for (QueryCall proxy: proxed) { + proxy.onQueryResult(status, th); + } + } + + @Override + public void onQueryStats(QueryStats stats) { + for (QueryCall proxy: proxed) { + proxy.onQueryStats(stats); + } + } + }; + } + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultReader.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultReader.java index cfd68ad0..2ee5dd73 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultReader.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultReader.java @@ -11,6 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.IntConsumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -28,6 +29,7 @@ import tech.ydb.jdbc.query.YdbQuery; import tech.ydb.query.QueryStream; import tech.ydb.query.result.QueryResultPart; +import tech.ydb.query.result.QueryStats; import tech.ydb.table.result.ResultSetReader; /** @@ -180,7 +182,7 @@ public CompletableFuture load(GrpcReadStream stream) { } - public CompletableFuture load(YdbValidator validator, QueryStream stream) { + public CompletableFuture load(YdbValidator validator, QueryStream stream, Consumer stats) { CompletableFuture resultIsReady = new CompletableFuture<>(); canceller = stream::cancel; @@ -197,7 +199,13 @@ public void onNextPart(QueryResultPart part) { } } }).whenComplete((result, th) -> { - Status status = result != null ? result.getStatus() : null; + Status status = null; + if (result != null) { + status = result.getStatus(); + if (result.isSuccess() && result.getValue().hasStats()) { + stats.accept(result.getValue().getStats()); + } + } onClose(status, th); if (status != null) { diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java index 20cadf0f..f95a7f8e 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbQueryResultStatic.java @@ -4,7 +4,6 @@ import java.sql.SQLException; import tech.ydb.jdbc.YdbResultSet; -import tech.ydb.jdbc.query.UnifiedQueryStats; import tech.ydb.jdbc.query.YdbQuery; /** @@ -13,7 +12,6 @@ */ public class YdbQueryResultStatic extends YdbQueryResultBase { private final YdbResultSet[] rs; - private UnifiedQueryStats queryStats; public YdbQueryResultStatic(YdbQuery query, YdbResultSet... rs) { super(query, rs != null ? rs.length : 0); @@ -35,20 +33,4 @@ protected void closeResultSet(int index) throws SQLException { } rs[index].close(); } - - public UnifiedQueryStats getQueryStats() { - return queryStats; - } - - public void setQueryStats(tech.ydb.table.query.stats.QueryStats src) { - if (src != null) { - queryStats = new UnifiedQueryStats(src); - } - } - - public void setQueryStats(tech.ydb.query.result.QueryStats src) { - if (src != null) { - queryStats = new UnifiedQueryStats(src); - } - } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java index 90103b3c..02250f64 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementBase.java @@ -29,7 +29,6 @@ import tech.ydb.jdbc.settings.FakeTxMode; import tech.ydb.jdbc.settings.YdbOperationProperties; import tech.ydb.table.query.Params; -import tech.ydb.table.query.stats.QueryStatsCollectionMode; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.values.ListValue; @@ -51,7 +50,6 @@ public abstract class YdbStatementBase implements YdbStatement { private int queryTimeout; private boolean isPoolable; private boolean isClosed = false; - private QueryStatsCollectionMode statsMode = QueryStatsCollectionMode.NONE; /** @see Statement#getMaxRows() */ private int maxRows = 0; // no limit @@ -216,21 +214,6 @@ protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params par } ctx.traceQueryByFullScanDetector(query, yql); -//<<<<<<< HEAD -// YDBQueryExtensionService ext = ctx.getQueryExtensionService(); -// if (ext != null) { -// ext.dataQueryPreExecute(ctx, this, query, yql, params); -// } -// -// YdbQueryResult result; -// result = connection.getExecutor().executeDataQuery(this, query, yql, params); -// -// if (ext != null) { -// ext.dataQueryPostExecute(ctx, this, query, yql, params, result); -// } -// -// return result; -//======= boolean isInsideTx = executor.isInsideTransaction(); while (true) { try { @@ -265,7 +248,7 @@ protected YdbQueryResult executeBatchQuery(YdbQuery query, Function queryPhases; - - public UnifiedQueryStats(tech.ydb.query.result.QueryStats src) { - this.queryPlan = src.getQueryPlan(); - this.queryAst = src.getQueryAst(); - this.totalDurationUs = src.getTotalDurationUs(); - this.totalCpuTimeUs = src.getTotalCpuTimeUs(); - this.processCpuTimeUs = src.getProcessCpuTimeUs(); - this.compilationTimeUs = src.getCompilationStats().getDurationUs(); - this.compilationCpuTimeUs = src.getCompilationStats().getCpuTimeUs(); - this.compilationIsCached = src.getCompilationStats().isFromCache(); - this.queryPhases = src.getPhases().stream().map(PhaseStats::new).collect(toList()); - } - - public UnifiedQueryStats(tech.ydb.table.query.stats.QueryStats src) { - this.queryPlan = src.getQueryPlan(); - this.queryAst = src.getQueryAst(); - this.totalDurationUs = src.getTotalDurationUs(); - this.totalCpuTimeUs = src.getTotalCpuTimeUs(); - this.processCpuTimeUs = src.getProcessCpuTimeUs(); - this.compilationTimeUs = src.getCompilation().getDurationUs(); - this.compilationCpuTimeUs = src.getCompilation().getCpuTimeUs(); - this.compilationIsCached = src.getCompilation().getFromCache(); - this.queryPhases = src.getQueryPhasesList().stream().map(PhaseStats::new).collect(toList()); - } - - public String getQueryPlan() { - return queryPlan; - } - - public String getQueryAst() { - return queryAst; - } - - public long getTotalDurationUs() { - return totalDurationUs; - } - - public long getTotalCpuTimeUs() { - return totalCpuTimeUs; - } - - public long getProcessCpuTimeUs() { - return processCpuTimeUs; - } - - public long getCompilationTimeUs() { - return compilationTimeUs; - } - - public long getCompilationCpuTimeUs() { - return compilationCpuTimeUs; - } - - public boolean isCompilationIsCached() { - return compilationIsCached; - } - - public List getQueryPhases() { - return queryPhases; - } - - @Override - public String toString() { - return "UnifiedQueryStats{" + - "queryPlan='" + queryPlan + '\'' + - ", queryAst='" + queryAst + '\'' + - ", totalDurationUs=" + totalDurationUs + - ", totalCpuTimeUs=" + totalCpuTimeUs + - ", processCpuTimeUs=" + processCpuTimeUs + - ", compilationTimeUs=" + compilationTimeUs + - ", compilationCpuTimeUs=" + compilationCpuTimeUs + - ", compilationIsCached=" + compilationIsCached + - ", queryPhases=" + queryPhases + - '}'; - } - - public static class PhaseStats { - private final long durationUs; - private final long cpuTimeUs; - private final long affectedShards; - private final boolean isLiteralPhase; - private final List tableAccesses; - - public PhaseStats(tech.ydb.query.result.QueryStats.QueryPhase src) { - this.durationUs = src.getDurationUs(); - this.cpuTimeUs = src.getCpuTimeUs(); - this.affectedShards = src.getAffectedShards(); - this.isLiteralPhase = src.isLiteralPhase(); - this.tableAccesses = src.getTableAccesses().stream().map(TableAccess::new).collect(toList()); - } - - public PhaseStats(tech.ydb.table.query.stats.QueryPhaseStats src) { - this.durationUs = src.getDurationUs(); - this.cpuTimeUs = src.getCpuTimeUs(); - this.affectedShards = src.getAffectedShards(); - this.isLiteralPhase = src.getLiteralPhase(); - this.tableAccesses = src.getTableAccessList().stream().map(TableAccess::new).collect(toList()); - } - - public long getDurationUs() { - return durationUs; - } - - public long getCpuTimeUs() { - return cpuTimeUs; - } - - public long getAffectedShards() { - return affectedShards; - } - - public boolean isLiteralPhase() { - return isLiteralPhase; - } - - public List getTableAccesses() { - return tableAccesses; - } - - @Override - public String toString() { - return "PhaseStats{" + - "durationUs=" + durationUs + - ", cpuTimeUs=" + cpuTimeUs + - ", affectedShards=" + affectedShards + - ", isLiteralPhase=" + isLiteralPhase + - ", tableAccesses=" + tableAccesses + - '}'; - } - - public static class TableAccess { - private final String name; - private final long partitionsCount; - private final TableOperation reads; - private final TableOperation updates; - private final TableOperation deletes; - - public TableAccess(tech.ydb.query.result.QueryStats.TableAccess src) { - this.name = src.getTableName(); - this.partitionsCount = src.getPartitionsCount(); - this.reads = new TableOperation(src.getReads().getRows(), src.getReads().getBytes()); - this.updates = new TableOperation(src.getUpdates().getRows(), src.getUpdates().getBytes()); - this.deletes = new TableOperation(src.getDeletes().getRows(), src.getDeletes().getBytes()); - } - - public TableAccess(tech.ydb.table.query.stats.TableAccessStats src) { - this.name = src.getName(); - this.partitionsCount = src.getPartitionsCount(); - this.reads = new TableOperation(src.getReads().getRows(), src.getReads().getBytes()); - this.updates = new TableOperation(src.getUpdates().getRows(), src.getUpdates().getBytes()); - this.deletes = new TableOperation(src.getDeletes().getRows(), src.getDeletes().getBytes()); - } - - public String getName() { - return name; - } - - public long getPartitionsCount() { - return partitionsCount; - } - - public TableOperation getReads() { - return reads; - } - - public TableOperation getUpdates() { - return updates; - } - - public TableOperation getDeletes() { - return deletes; - } - - @Override - public String toString() { - return "TableAccess{" + - "name='" + name + '\'' + - ", partitionsCount=" + partitionsCount + - ", reads=" + reads + - ", updates=" + updates + - ", deletes=" + deletes + - '}'; - } - - public static class TableOperation { - private final long rows; - private final long bytes; - - public TableOperation(long rows, long bytes) { - this.rows = rows; - this.bytes = bytes; - } - - public long getRows() { - return rows; - } - - public long getBytes() { - return bytes; - } - - @Override - public String toString() { - return "() { + boolean hasNext = true; + + @Override + public boolean hasMoreElements() { + return hasNext; + } + + @Override + public URL nextElement() { + hasNext = false; + return url; + } + }; + } + return super.getResources(name); + } +} \ No newline at end of file diff --git a/jdbc/src/test/java/tech/ydb/jdbc/context/YdbDriverQuerySpiTest.java b/jdbc/src/test/java/tech/ydb/jdbc/context/YdbDriverQuerySpiTest.java new file mode 100644 index 00000000..36befaad --- /dev/null +++ b/jdbc/src/test/java/tech/ydb/jdbc/context/YdbDriverQuerySpiTest.java @@ -0,0 +1,260 @@ +package tech.ydb.jdbc.context; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.impl.helper.ExceptionAssert; +import tech.ydb.jdbc.impl.helper.JdbcUrlHelper; +import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.jdbc.spi.YdbQueryExtentionService; +import tech.ydb.query.result.QueryStats; +import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.query.settings.QueryStatsMode; +import tech.ydb.table.query.stats.QueryStatsCollectionMode; +import tech.ydb.table.settings.ExecuteDataQuerySettings; +import tech.ydb.test.junit5.YdbHelperExtension; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbDriverQuerySpiTest { + + @RegisterExtension + private static final YdbHelperExtension ydb = new YdbHelperExtension(); + + private static final JdbcUrlHelper jdbcURL = new JdbcUrlHelper(ydb) + .withArg("cacheConnectionsInDriver", "false"); + + @BeforeEach + public void clean() { + EmptiSpi.COUNT.set(0); + FullStatsSpi.QUEUE.clear(); + } + + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + public void defaultUsageTest(String useQS) throws SQLException { + ClassLoader prev = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(new QuerySpiTestLoader(prev, EmptiSpi.class)); + + try (Connection conn = DriverManager.getConnection(jdbcURL.withArg("useQueryService", useQS).build())) { + Assertions.assertEquals(0, EmptiSpi.COUNT.get()); + + try (ResultSet rs = conn.createStatement().executeQuery("SELECT 1 + 2")) { + Assertions.assertTrue(rs.next()); + Assertions.assertFalse(rs.next()); + } + + Assertions.assertEquals(1, EmptiSpi.COUNT.get()); + + try (PreparedStatement ps = conn.prepareStatement("SELECT ? + ?")) { + ps.setInt(1, 1); + ps.setInt(2, 2); + Assertions.assertTrue(ps.execute()); + + Assertions.assertEquals(2, EmptiSpi.COUNT.get()); + + ps.setInt(1, 2); + ps.setInt(2, 3); + ps.addBatch(); + ps.setLong(1, 2); + ps.setLong(2, 3); + ps.addBatch(); + + Assertions.assertEquals(2, ps.executeBatch().length); + Assertions.assertEquals(4, EmptiSpi.COUNT.get()); + } + + try (Statement st = conn.createStatement()) { + ExceptionAssert.ydbException("code = GENERIC_ERROR", () -> st.executeQuery("SELECT 1 + 'test'u")); + Assertions.assertEquals(5, EmptiSpi.COUNT.get()); + } + } finally { + Thread.currentThread().setContextClassLoader(prev); + } + } + + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + public void enableStatsModeTest(String useQS) throws SQLException { + ClassLoader prev = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(new QuerySpiTestLoader(prev, FullStatsSpi.class)); + + try (Connection conn = DriverManager.getConnection(jdbcURL.withArg("useQueryService", useQS).build())) { + Assertions.assertTrue(FullStatsSpi.QUEUE.isEmpty()); + + try (ResultSet rs = conn.createStatement().executeQuery("SELECT 1 + 2")) { + Assertions.assertTrue(rs.next()); + Assertions.assertFalse(rs.next()); + } + + Assertions.assertFalse(FullStatsSpi.QUEUE.isEmpty()); + Assertions.assertNotNull(FullStatsSpi.QUEUE.poll().stats); + Assertions.assertTrue(FullStatsSpi.QUEUE.isEmpty()); + + try (PreparedStatement ps = conn.prepareStatement("SELECT ? + ?")) { + ps.setInt(1, 1); + ps.setInt(2, 2); + Assertions.assertTrue(ps.execute()); + + Assertions.assertEquals(1, FullStatsSpi.QUEUE.size()); + FullStatsSpi.Record record = FullStatsSpi.QUEUE.poll(); + Assertions.assertNotNull(record.stats); + Assertions.assertEquals(Status.SUCCESS, record.status); + Assertions.assertNull(record.th); + + ps.setInt(1, 2); + ps.setInt(2, 3); + ps.addBatch(); + ps.setLong(1, 2); + ps.setLong(2, 3); + ps.addBatch(); + + Assertions.assertEquals(2, ps.executeBatch().length); + + Assertions.assertEquals(2, FullStatsSpi.QUEUE.size()); + + record = FullStatsSpi.QUEUE.poll(); + Assertions.assertNotNull(record.stats); + Assertions.assertEquals(Status.SUCCESS, record.status); + Assertions.assertNull(record.th); + + record = FullStatsSpi.QUEUE.poll(); + Assertions.assertNotNull(record.stats); + Assertions.assertEquals(Status.SUCCESS, record.status); + Assertions.assertNull(record.th); + } + + try (Statement st = conn.createStatement()) { + ExceptionAssert.ydbException("code = GENERIC_ERROR", () -> st.executeQuery("SELECT 1 + 'test'u")); + + Assertions.assertEquals(1, FullStatsSpi.QUEUE.size()); + FullStatsSpi.Record record = FullStatsSpi.QUEUE.poll(); + Assertions.assertNull(record.stats); + Assertions.assertEquals(StatusCode.GENERIC_ERROR, record.status.getCode()); + Assertions.assertNull(record.th); + } + } finally { + Thread.currentThread().setContextClassLoader(prev); + } + } + + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + public void validateQueryTest(String useQS) throws SQLException { + ClassLoader prev = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(new QuerySpiTestLoader(prev, FullStatsSpi.class, ValidateSpi.class, + EmptiSpi.class)); + + try (Connection conn = DriverManager.getConnection(jdbcURL.withArg("useQueryService", useQS).build())) { + Assertions.assertTrue(FullStatsSpi.QUEUE.isEmpty()); + Assertions.assertEquals(0, EmptiSpi.COUNT.get()); + + try (ResultSet rs = conn.createStatement().executeQuery("SELECT 1 + 2")) { + Assertions.assertTrue(rs.next()); + Assertions.assertFalse(rs.next()); + } + + Assertions.assertEquals(1, FullStatsSpi.QUEUE.size()); + FullStatsSpi.Record record = FullStatsSpi.QUEUE.poll(); + Assertions.assertNotNull(record.stats); + Assertions.assertEquals(Status.SUCCESS, record.status); + Assertions.assertNull(record.th); + Assertions.assertEquals(1, EmptiSpi.COUNT.get()); + + try (Statement st = conn.createStatement()) { + ExceptionAssert.sqlException("INVALID QUERY", () -> st.executeQuery("SELECT 2 + 3")); + } + + Assertions.assertEquals(1, FullStatsSpi.QUEUE.size()); + record = FullStatsSpi.QUEUE.poll(); + Assertions.assertNull(record.stats); + Assertions.assertNull(record.status); + Assertions.assertNotNull(record.th); + Assertions.assertEquals("INVALID QUERY", record.th.getMessage()); + + Assertions.assertEquals(1, EmptiSpi.COUNT.get()); + } finally { + Thread.currentThread().setContextClassLoader(prev); + } + } + + public static class EmptiSpi implements YdbQueryExtentionService { + private static final AtomicLong COUNT = new AtomicLong(0); + + @Override + public QueryCall newDataQuery(YdbStatement statement, YdbQuery query, String yql) { + COUNT.incrementAndGet(); + return new YdbQueryExtentionService.QueryCall() { + }; + } + } + + public static class ValidateSpi implements YdbQueryExtentionService { + + @Override + public QueryCall newDataQuery(YdbStatement statement, YdbQuery query, String yql) throws SQLException { + if ("SELECT 2 + 3".equals(query.getOriginQuery())) { + throw new SQLException("INVALID QUERY"); + } + + return new YdbQueryExtentionService.QueryCall() { + }; + } + } + + public static class FullStatsSpi implements YdbQueryExtentionService { + private static final ConcurrentLinkedQueue QUEUE = new ConcurrentLinkedQueue<>(); + + @Override + public QueryCall newDataQuery(YdbStatement statement, YdbQuery query, String yql) { + Record r = new Record(); + QUEUE.add(r); + return r; + } + + static class Record implements YdbQueryExtentionService.QueryCall { + + private QueryStats stats = null; + private Status status = null; + private Throwable th = null; + + @Override + public ExecuteQuerySettings.Builder prepareQuerySettings(ExecuteQuerySettings.Builder builder) { + return builder.withStatsMode(QueryStatsMode.FULL); + } + + @Override + public ExecuteDataQuerySettings prepareDataQuerySettings(ExecuteDataQuerySettings settings) { + return settings.setCollectStats(QueryStatsCollectionMode.FULL); + } + + @Override + public void onQueryStats(QueryStats stats) { + this.stats = stats; + } + + @Override + public void onQueryResult(Status status, Throwable th) { + this.status = status; + this.th = th; + } + } + } +} diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/ExceptionAssert.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/ExceptionAssert.java index 37aa3698..f05824d3 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/ExceptionAssert.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/helper/ExceptionAssert.java @@ -22,7 +22,7 @@ public static void ydbException(String message, Executable exec) { "Invalid statement must throw YdbSQLException" ); Assertions.assertTrue(ex.getMessage().contains(message), - "YdbNonRetryableException '" + ex.getMessage() + "' doesn't contain message '" + message + "'"); + "YdbSQLException '" + ex.getMessage() + "' doesn't contain message '" + message + "'"); } public static void sqlDataException(String message, Executable exec) {