Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbQueryResult;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.common.YdbTypes;
import tech.ydb.jdbc.exception.YdbRetryableException;
import tech.ydb.jdbc.impl.YdbQueryResultReader;
import tech.ydb.jdbc.impl.YdbQueryResultStatic;
import tech.ydb.jdbc.impl.YdbResultSetMemory;
Expand Down Expand Up @@ -89,24 +87,6 @@ public void ensureOpened() throws SQLException {
}
}

@Override
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
throws SQLException {
boolean insideTx = isInsideTransaction();
while (true) {
try {
return executeQueryImpl(statement, query, preparedYql, params);
} catch (YdbRetryableException ex) {
if (insideTx || ex.getStatus().getCode() != StatusCode.BAD_SESSION) {
throw ex;
}
}
}
}

protected abstract YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql,
Params params) throws SQLException;

@Override
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
ensureOpened();
Expand Down
109 changes: 69 additions & 40 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import tech.ydb.core.Status;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbQueryResult;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.impl.YdbQueryResultExplain;
Expand Down Expand Up @@ -225,12 +224,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
}

@Override
protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params)
throws SQLException {
ensureOpened();

YdbValidator validator = statement.getValidator();

int timeout = statement.getQueryTimeout();
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
if (timeout > 0) {
Expand All @@ -251,41 +249,6 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query

String yql = prefixPragma + preparedYql;

if (useStreamResultSet) {
tracer.trace("--> stream query");
tracer.query(yql);
String msg = "STREAM_QUERY >>\n" + yql;

final YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
@Override
public void onClose(Status status, Throwable th) {
if (th != null) {
tracer.trace("<-- " + th.getMessage());
}
if (status != null) {
validator.addStatusIssues(status);
tracer.trace("<-- " + status.toString());
}

if (localTx.isActive()) {
tracer.setId(localTx.getId());
} else {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
tracer.close();
}

super.onClose(status, th);
}
};

settings = settings.withGrpcFlowControl(reader);
QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build());
validator.execute(msg, tracer, () -> reader.load(validator, stream));
return updateCurrentResult(reader);
}

try {
tracer.trace("--> data query");
tracer.query(yql);
Expand All @@ -296,11 +259,12 @@ public void onClose(Status status, Throwable th) {
);
validator.addStatusIssues(result.getIssueList());

YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()];
YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()];
for (int idx = 0; idx < readers.length; idx++) {
readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx));
}
return updateCurrentResult(new YdbQueryResultStatic(query, readers));

return readers;
} finally {
if (!localTx.isActive()) {
if (tx.compareAndSet(localTx, null)) {
Expand All @@ -316,6 +280,71 @@ public void onClose(Status status, Throwable th) {
}
}

@Override
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
throws SQLException {
ensureOpened();

if (!useStreamResultSet) {
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params);
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
}

YdbValidator validator = statement.getValidator();
int timeout = statement.getQueryTimeout();
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
if (timeout > 0) {
settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS);
}

QueryTransaction nextTx = tx.get();
while (nextTx == null) {
nextTx = createNewQuerySession(validator).createNewTransaction(txMode);
if (!tx.compareAndSet(null, nextTx)) {
nextTx.getSession().close();
nextTx = tx.get();
}
}

final QueryTransaction localTx = nextTx;
YdbTracer tracer = statement.getConnection().getCtx().getTracer();

String yql = prefixPragma + preparedYql;

tracer.trace("--> stream query");
tracer.query(yql);
String msg = "STREAM_QUERY >>\n" + yql;

YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
@Override
public void onClose(Status status, Throwable th) {
if (th != null) {
tracer.trace("<-- " + th.getMessage());
}
if (status != null) {
validator.addStatusIssues(status);
tracer.trace("<-- " + status.toString());
}

if (localTx.isActive()) {
tracer.setId(localTx.getId());
} else {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
tracer.close();
}

super.onClose(status, th);
}
};

settings = settings.withGrpcFlowControl(reader);
QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build());
validator.execute(msg, tracer, () -> reader.load(validator, stream));
return updateCurrentResult(reader);
}

@Override
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
ensureOpened();
Expand Down
65 changes: 42 additions & 23 deletions jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbQueryResult;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.impl.YdbQueryResultExplain;
Expand Down Expand Up @@ -201,40 +200,53 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
}
}

@Override
protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
throws SQLException {
ensureOpened();

YdbValidator validator = statement.getValidator();
private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer tracer, String yql,
ExecuteDataQuerySettings settings, Params prms) throws SQLException {
Session session = tx.getSession(validator);
String yql = prefixPragma + preparedYql;
YdbTracer tracer = statement.getConnection().getCtx().getTracer();
tracer.trace("--> data query");
tracer.query(yql);

try {
tracer.trace("--> data query");
tracer.query(yql);

DataQueryResult result = validator.call(
QueryType.DATA_QUERY + " >>\n" + yql, tracer,
() -> session.executeDataQuery(yql, tx.txControl(), params, dataQuerySettings(statement))
QueryType.DATA_QUERY + " >>\n" + yql,
tracer,
() -> session.executeDataQuery(yql, tx.txControl(), prms, settings)
);
updateState(tx.withDataQuery(session, result.getTxId()));

YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()];
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
ResultSetReader rs = result.getResultSet(idx);
if (failOnTruncatedResult && rs.isTruncated()) {
String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount());
throw new SQLException(msg);
if (failOnTruncatedResult) {
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
ResultSetReader rs = result.getResultSet(idx);
if (rs.isTruncated()) {
String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount());
throw new SQLException(msg);
}
}

readers[idx] = new YdbResultSetMemory(types, statement, rs);
}

return updateCurrentResult(new YdbQueryResultStatic(query, readers));
return result;
} catch (SQLException | RuntimeException ex) {
updateState(tx.withRollback(session));
throw ex;
}
}

@Override
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params)
throws SQLException {
ensureOpened();

YdbValidator validator = statement.getValidator();
String yql = prefixPragma + preparedYql;
YdbTracer tracer = statement.getConnection().getCtx().getTracer();

try {
DataQueryResult result = executeTableQuery(validator, tracer, yql, dataQuerySettings(statement), params);
YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()];
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx));
}
return readers;
} finally {
if (tx.isInsideTransaction()) {
tracer.setId(tx.txID());
Expand All @@ -244,6 +256,13 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query
}
}

@Override
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
throws SQLException {
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params);
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
}

@Override
public boolean isValid(YdbValidator validator, int timeout) throws SQLException {
ensureOpened();
Expand Down
2 changes: 2 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import tech.ydb.jdbc.YdbQueryResult;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.impl.YdbResultSetMemory;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.query.Params;
import tech.ydb.table.values.ListValue;
Expand Down Expand Up @@ -34,6 +35,7 @@ public interface YdbExecutor {
YdbQueryResult executeExplainQuery(YdbStatement st, YdbQuery query) throws SQLException;
YdbQueryResult executeScanQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException;
YdbQueryResult executeDataQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException;
YdbResultSetMemory[] executeInMemoryQuery(YdbStatement st, String yql, Params prms) throws SQLException;

void commit(YdbContext ctx, YdbValidator validator) throws SQLException;
void rollback(YdbContext ctx, YdbValidator validator) throws SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.Types;
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;
import java.util.Objects;
import java.util.logging.Logger;

Expand Down Expand Up @@ -97,9 +98,14 @@ public int[] executeBatch() throws SQLException {
YdbQueryResult newState = executeBulkUpsert(query, bulk.getTablePath(), bulk.getBatchedBulk());
updateState(newState);
} else {
for (Params prm: prepared.getBatchParams()) {
List<Params> prms = prepared.getBatchParams();
if (prms.size() == 1) {
Params prm = prms.get(0);
YdbQueryResult newState = executeDataQuery(query, prepared.getBatchText(prm), prm);
updateState(newState);
} else {
YdbQueryResult newState = executeBatchQuery(query, prepared::getBatchText, prms);
updateState(newState);
}
}
} finally {
Expand Down
4 changes: 4 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public YdbResultSetMemory(YdbTypes types, YdbStatement statement, ResultSetReade
this.totalCount = total;
}

public ResultSetReader[] getResultSets() {
return rs;
}

@Override
protected ValueReader getValue(int columnIndex) throws SQLException {
if (!isRowIndexValid()) {
Expand Down
Loading