Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
import tech.ydb.jdbc.YdbQueryResult;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.exception.YdbStatusable;
import tech.ydb.jdbc.impl.YdbQueryResultExplain;
import tech.ydb.jdbc.impl.YdbQueryResultReader;
import tech.ydb.jdbc.impl.YdbQueryResultStatic;
import tech.ydb.jdbc.impl.YdbResultSetMemory;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.jdbc.settings.YdbOperationProperties;
import tech.ydb.jdbc.spi.YdbQueryExtentionService;
import tech.ydb.query.QueryClient;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
Expand All @@ -43,6 +45,7 @@ public class QueryServiceExecutor extends BaseYdbExecutor {
private final Duration sessionTimeout;
private final QueryClient queryClient;
private final boolean useStreamResultSet;
private final YdbQueryExtentionService querySpi;

private int transactionLevel;
private boolean isReadOnly;
Expand All @@ -58,6 +61,7 @@ public QueryServiceExecutor(YdbContext ctx) throws SQLException {
this.sessionTimeout = options.getSessionTimeout();
this.queryClient = ctx.getQueryClient();
this.useStreamResultSet = options.getUseStreamResultSets();
this.querySpi = ctx.getQuerySpi();

this.transactionLevel = options.getTransactionLevel();
this.isAutoCommit = options.isAutoCommit();
Expand Down Expand Up @@ -224,16 +228,21 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
}

@Override
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params)
throws SQLException {
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, YdbQuery query, String preparedYql,
Params params) throws SQLException {
ensureOpened();

YdbValidator validator = statement.getValidator();

String yql = prefixPragma + preparedYql;
YdbQueryExtentionService.QueryCall spi = querySpi.newDataQuery(statement, query, yql);

int timeout = statement.getQueryTimeout();
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
if (timeout > 0) {
settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS);
}
settings = spi.prepareQuerySettings(settings);

QueryTransaction nextTx = tx.get();
while (nextTx == null) {
Expand All @@ -244,11 +253,9 @@ public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String
}
}

final QueryTransaction localTx = nextTx;
QueryTransaction localTx = nextTx;
YdbTracer tracer = statement.getConnection().getCtx().getTracer();

String yql = prefixPragma + preparedYql;

try {
tracer.trace("--> data query");
tracer.query(yql);
Expand All @@ -264,7 +271,19 @@ public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String
readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx));
}

if (result.getQueryInfo().hasStats()) {
spi.onQueryStats(result.getQueryInfo().getStats());
}

spi.onQueryResult(Status.SUCCESS, null);
return readers;
} catch (SQLException | RuntimeException ex) {
if (ex instanceof YdbStatusable) {
spi.onQueryResult(((YdbStatusable) ex).getStatus(), null);
} else {
spi.onQueryResult(null, ex);
}
throw ex;
} finally {
if (!localTx.isActive()) {
if (tx.compareAndSet(localTx, null)) {
Expand All @@ -286,16 +305,20 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S
ensureOpened();

if (!useStreamResultSet) {
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params);
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, query, preparedYql, params);
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
}

YdbValidator validator = statement.getValidator();
String yql = prefixPragma + preparedYql;
YdbQueryExtentionService.QueryCall spi = querySpi.newDataQuery(statement, query, yql);

int timeout = statement.getQueryTimeout();
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
if (timeout > 0) {
settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS);
}
settings = spi.prepareQuerySettings(settings);

QueryTransaction nextTx = tx.get();
while (nextTx == null) {
Expand All @@ -306,18 +329,16 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S
}
}

final QueryTransaction localTx = nextTx;
QueryTransaction localTx = nextTx;
YdbTracer tracer = statement.getConnection().getCtx().getTracer();

String yql = prefixPragma + preparedYql;

tracer.trace("--> stream query");
tracer.query(yql);
String msg = "STREAM_QUERY >>\n" + yql;

YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
@Override
public void onClose(Status status, Throwable th) {
spi.onQueryResult(status, th);
if (th != null) {
tracer.trace("<-- " + th.getMessage());
}
Expand All @@ -341,7 +362,7 @@ public void onClose(Status status, Throwable th) {

settings = settings.withGrpcFlowControl(reader);
QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build());
validator.execute(msg, tracer, () -> reader.load(validator, stream));
validator.execute(msg, tracer, () -> reader.load(validator, stream, spi::onQueryStats));
return updateCurrentResult(reader);
}

Expand Down
2 changes: 1 addition & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/context/QueryStat.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public String getPreparedYQL() {
return preparedYQL;
}

public String getAat() {
public String getAst() {
return ast;
}

Expand Down
57 changes: 35 additions & 22 deletions jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
import java.sql.SQLFeatureNotSupportedException;
import java.time.Duration;

import tech.ydb.core.Status;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbQueryResult;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.exception.YdbStatusable;
import tech.ydb.jdbc.impl.YdbQueryResultExplain;
import tech.ydb.jdbc.impl.YdbQueryResultStatic;
import tech.ydb.jdbc.impl.YdbResultSetMemory;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.jdbc.settings.YdbOperationProperties;
import tech.ydb.jdbc.spi.YdbQueryExtentionService;
import tech.ydb.query.result.QueryStats;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.ExplainDataQueryResult;
Expand All @@ -33,13 +37,15 @@
*/
public class TableServiceExecutor extends BaseYdbExecutor {
private final boolean failOnTruncatedResult;
private final YdbQueryExtentionService querySpi;
private volatile TxState tx;

public TableServiceExecutor(YdbContext ctx) throws SQLException {
super(ctx);
YdbOperationProperties options = ctx.getOperationProperties();
this.tx = createTx(options.getTransactionLevel(), options.isAutoCommit());
this.failOnTruncatedResult = options.isFailOnTruncatedResult();
this.querySpi = ctx.getQuerySpi();
}

@Override
Expand Down Expand Up @@ -200,8 +206,18 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
}
}

private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer tracer, String yql,
ExecuteDataQuerySettings settings, Params prms) throws SQLException {
@Override
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, YdbQuery query, String preparedYql,
Params params) throws SQLException {
ensureOpened();

YdbValidator validator = statement.getValidator();
String yql = prefixPragma + preparedYql;
YdbQueryExtentionService.QueryCall spi = querySpi.newDataQuery(statement, query, yql);

YdbTracer tracer = statement.getConnection().getCtx().getTracer();
ExecuteDataQuerySettings settings = spi.prepareDataQuerySettings(dataQuerySettings(statement));

Session session = tx.getSession(validator);
try {
tracer.trace("--> data query");
Expand All @@ -210,10 +226,14 @@ private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer trac
DataQueryResult result = validator.call(
QueryType.DATA_QUERY + " >>\n" + yql,
tracer,
() -> session.executeDataQuery(yql, tx.txControl(), prms, settings)
() -> session.executeDataQuery(yql, tx.txControl(), params, settings)
);
updateState(tx.withDataQuery(session, result.getTxId()));

if (result.hasQueryStats()) {
spi.onQueryStats(new QueryStats(result.getRawQueryStats()));
}

if (failOnTruncatedResult) {
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
ResultSetReader rs = result.getResultSet(idx);
Expand All @@ -224,29 +244,22 @@ private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer trac
}
}

return result;
} catch (SQLException | RuntimeException ex) {
updateState(tx.withRollback(session));
throw ex;
}
}

@Override
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params)
throws SQLException {
ensureOpened();

YdbValidator validator = statement.getValidator();
String yql = prefixPragma + preparedYql;
YdbTracer tracer = statement.getConnection().getCtx().getTracer();

try {
DataQueryResult result = executeTableQuery(validator, tracer, yql, dataQuerySettings(statement), params);
YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()];
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx));
}

spi.onQueryResult(Status.SUCCESS, null);
return readers;
} catch (SQLException | RuntimeException ex) {
if (ex instanceof YdbStatusable) {
spi.onQueryResult(((YdbStatusable) ex).getStatus(), null);
} else {
spi.onQueryResult(null, ex);
}

updateState(tx.withRollback(session));
throw ex;
} finally {
if (tx.isInsideTransaction()) {
tracer.setId(tx.txID());
Expand All @@ -259,7 +272,7 @@ public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String
@Override
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
throws SQLException {
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params);
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, query, preparedYql, params);
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
}

Expand Down
9 changes: 9 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tech.ydb.jdbc.settings.YdbConnectionProperties;
import tech.ydb.jdbc.settings.YdbOperationProperties;
import tech.ydb.jdbc.settings.YdbQueryProperties;
import tech.ydb.jdbc.spi.YdbQueryExtentionService;
import tech.ydb.query.QueryClient;
import tech.ydb.query.impl.QueryClientImpl;
import tech.ydb.scheme.SchemeClient;
Expand Down Expand Up @@ -59,6 +60,8 @@ public class YdbContext implements AutoCloseable {
private final boolean autoResizeSessionPool;
private final AtomicInteger connectionsCount = new AtomicInteger();

private final YdbQueryExtentionService querySpi;

private YdbContext(
YdbConfig config,
YdbOperationProperties operationProperties,
Expand Down Expand Up @@ -97,6 +100,8 @@ private YdbContext(
this.cache = new YdbCache(this,
queryProperties, config.getPreparedStatementsCachecSize(), config.isFullScanDetectorEnabled());
}

this.querySpi = YdbServiceLoader.loadQuerySpi();
}

public YdbTypes getTypes() {
Expand All @@ -115,6 +120,10 @@ public YdbTracer getTracer() {
return config.isTxTracedEnabled() ? YdbTracer.current() : YdbTracerNone.DISABLED;
}

public YdbQueryExtentionService getQuerySpi() {
return querySpi;
}

static String joined(String path1, String path2) {
return path1.endsWith("/") || path2.startsWith("/") ? path1 + path2 : path1 + "/" + path2;
}
Expand Down
3 changes: 2 additions & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public interface YdbExecutor {
YdbQueryResult executeExplainQuery(YdbStatement st, YdbQuery query) throws SQLException;
YdbQueryResult executeScanQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException;
YdbQueryResult executeDataQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException;
YdbResultSetMemory[] executeInMemoryQuery(YdbStatement st, String yql, Params prms) throws SQLException;
YdbResultSetMemory[] executeInMemoryQuery(YdbStatement st, YdbQuery query, String yql, Params prms)
throws SQLException;

void commit(YdbContext ctx, YdbValidator validator) throws SQLException;
void rollback(YdbContext ctx, YdbValidator validator) throws SQLException;
Expand Down
Loading