diff --git a/query/src/main/java/tech/ydb/query/script/ScriptClient.java b/query/src/main/java/tech/ydb/query/script/ScriptClient.java new file mode 100644 index 00000000..19aaaa85 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/ScriptClient.java @@ -0,0 +1,92 @@ +package tech.ydb.query.script; + +import java.util.concurrent.CompletableFuture; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; +import tech.ydb.query.script.result.ScriptResultPart; +import tech.ydb.query.script.settings.ExecuteScriptSettings; +import tech.ydb.query.script.settings.FetchScriptSettings; +import tech.ydb.query.script.settings.FindScriptSettings; +import tech.ydb.table.query.Params; + +/** + * High-level API for executing YQL scripts and retrieving their results. + *

+ * Provides convenience methods for starting script execution, tracking operation status, + * and fetching result sets with pagination support. + *

+ * How to use + *

+ *

Example with fetch + *

{@code
+ *      Operation operation = scriptClient.startQueryScript("select...",Params.of(...), executeScriptSettings).join())
+ *      Status status = scriptClient.fetchQueryScriptStatus(operation, 1).join()
+ *      Result< ScriptResultPart> resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings).join()
+ *      ResultSetReader reader = scriptResultPart.getResultSetReader()
+ *      reader.next()
+ * }
+ *

Example without fetch + *

{@code
+ * Status status = scriptClient.startQueryScript("select...",Params.of(...), executeScriptSettings)
+ *                             .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1))
+ *                             .join()
+ * }
+ *

Author: Evgeny Kuvardin + */ +public interface ScriptClient { + + /** + * Returns operation metadata for a previously started script execution. + * + * @param operationId operation identifier + * @param settings request settings + * @return future resolving to operation status + */ + CompletableFuture> findQueryScript(String operationId, FindScriptSettings settings); + + /** + * Starts execution of the given YQL script with optional parameters. + * + * @param query YQL script text + * @param params query parameters + * @param settings execution settings (TTL, resource pool, exec mode) + * @return future resolving to a long-running operation + */ + CompletableFuture> startQueryScript(String query, + Params params, + ExecuteScriptSettings settings); + + /** + * Wait for script execution and return status + * + * @param operation operation object returned when script started + * @param fetchRateSeconds How often should we check if the operation has finished + * @return future with result of script execution + */ + default CompletableFuture fetchQueryScriptStatus(Operation operation, int fetchRateSeconds) { + return OperationTray.fetchOperation(operation, fetchRateSeconds); + } + + /** + * Fetches script results incrementally. + * + * @param operation operation object returned when script started + * @param previous previous result part, or {@code null} if fetching from start + * @param settings fetch configuration + * @return future resolving to result part containing a result set fragment + */ + CompletableFuture> fetchQueryScriptResult(@Nonnull Operation operation, + @Nullable ScriptResultPart previous, + FetchScriptSettings settings); + +} diff --git a/query/src/main/java/tech/ydb/query/script/ScriptRpc.java b/query/src/main/java/tech/ydb/query/script/ScriptRpc.java new file mode 100644 index 00000000..3773c5c9 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/ScriptRpc.java @@ -0,0 +1,48 @@ +package tech.ydb.query.script; + +import java.util.concurrent.CompletableFuture; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.operation.Operation; +import tech.ydb.proto.query.YdbQuery; + +/** + * Low-level RPC interface for executing YQL scripts and fetching their results using gRPC. + *

+ * Provides direct bindings to the YDB QueryService API + * Used internally by {@link tech.ydb.query.script.ScriptClient} implementations. + * + *

Author: Evgeny Kuvardin + */ +public interface ScriptRpc { + + /** + * Retrieves a previously created operation by its ID. + * + * @param operationId ID of the operation to fetch + * @return future resolving to the operation metadata and status + */ + CompletableFuture> getOperation(String operationId); + + /** + * Executes a script as a long-running operation. + * + * @param request execution request describing the script and execution mode {@link YdbQuery.ExecuteScriptRequest} + * @param settings RPC request settings including timeout, trace ID, etc. + * @return future resolving to an {@link Operation} representing the script execution + */ + CompletableFuture> executeScript( + YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings); + + /** + * Fetches partial results for a previously executed script. + * + * @param request fetch request including token, result set index, etc. {@link YdbQuery.FetchScriptResultsRequest} + * @param settings RPC settings for this request + * @return future resolving to the result fetch response {@link Result} of {@link YdbQuery.FetchScriptResultsResponse} + */ + CompletableFuture> fetchScriptResults( + YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings); +} diff --git a/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java b/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java new file mode 100644 index 00000000..cea06a66 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/impl/ScriptClientImpl.java @@ -0,0 +1,150 @@ +package tech.ydb.query.script.impl; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.WillNotClose; + +import com.google.protobuf.Duration; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.settings.BaseRequestSettings; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.query.script.ScriptClient; +import tech.ydb.query.script.ScriptRpc; +import tech.ydb.query.script.result.ScriptResultPart; +import tech.ydb.query.script.settings.ExecuteScriptSettings; +import tech.ydb.query.script.settings.FetchScriptSettings; +import tech.ydb.query.script.settings.FindScriptSettings; +import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.settings.QueryStatsMode; +import tech.ydb.table.query.Params; + +/** + * Default implementation of {@link ScriptClient} using {@link ScriptRpc} for RPC calls. + *

+ * Handles script execution lifecycle: starting scripts, polling their status, + * and retrieving result sets in streaming fashion. + * + *

Author: Evgeny Kuvardin + */ +public class ScriptClientImpl implements ScriptClient { + + private final ScriptRpc scriptRpc; + + ScriptClientImpl(ScriptRpc scriptRpc) { + this.scriptRpc = scriptRpc; + } + + public static ScriptClient newClient(@WillNotClose GrpcTransport transport) { + return new ScriptClientImpl(ScriptRpcImpl.useTransport(transport)); + } + + @Override + public CompletableFuture> findQueryScript(String operationId, FindScriptSettings settings) { + return scriptRpc.getOperation(operationId); + } + + @Override + public CompletableFuture> startQueryScript(String query, + Params params, + ExecuteScriptSettings settings) { + YdbQuery.ExecuteScriptRequest.Builder request = YdbQuery.ExecuteScriptRequest.newBuilder() + .setExecMode(mapExecMode(settings.getExecMode())) + .setStatsMode(mapStatsMode(settings.getStatsMode())) + .setScriptContent(YdbQuery.QueryContent.newBuilder() + .setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1) + .setText(query) + .build()); + + java.time.Duration ttl = settings.getTtl(); + if (ttl != null) { + request.setResultsTtl(Duration.newBuilder().setNanos(settings.getTtl().getNano())); + } + + String resourcePool = settings.getResourcePool(); + if (resourcePool != null && !resourcePool.isEmpty()) { + request.setPoolId(resourcePool); + } + + request.putAllParameters(params.toPb()); + + GrpcRequestSettings options = makeGrpcRequestSettings(settings); + + return scriptRpc.executeScript(request.build(), options); + } + + @Override + public CompletableFuture> fetchQueryScriptResult(@Nonnull Operation operation, + @Nullable ScriptResultPart previous, + FetchScriptSettings settings) { + YdbQuery.FetchScriptResultsRequest.Builder requestBuilder = YdbQuery.FetchScriptResultsRequest.newBuilder(); + + if (previous != null && previous.getNextFetchToken() != null) { + requestBuilder.setFetchToken(previous.getNextFetchToken()); + } + + if (settings.getRowsLimit() > 0) { + requestBuilder.setRowsLimit(settings.getRowsLimit()); + } + + requestBuilder.setOperationId(operation.getId()); + + if (settings.getSetResultSetIndex() >= 0) { + requestBuilder.setResultSetIndex(settings.getSetResultSetIndex()); + } + + GrpcRequestSettings options = makeGrpcRequestSettings(settings); + + return scriptRpc.fetchScriptResults(requestBuilder.build(), options) + .thenApply(p -> p.map(ScriptResultPart::new)); + } + + private GrpcRequestSettings makeGrpcRequestSettings(BaseRequestSettings settings) { + String traceId = settings.getTraceId() == null ? UUID.randomUUID().toString() : settings.getTraceId(); + return GrpcRequestSettings.newBuilder() + .withDeadline(settings.getRequestTimeout()) + .withTraceId(traceId) + .build(); + } + + private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { + switch (mode) { + case EXECUTE: + return YdbQuery.ExecMode.EXEC_MODE_EXECUTE; + case EXPLAIN: + return YdbQuery.ExecMode.EXEC_MODE_EXPLAIN; + case PARSE: + return YdbQuery.ExecMode.EXEC_MODE_PARSE; + case VALIDATE: + return YdbQuery.ExecMode.EXEC_MODE_VALIDATE; + + case UNSPECIFIED: + default: + return YdbQuery.ExecMode.EXEC_MODE_UNSPECIFIED; + } + } + + private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) { + switch (mode) { + case NONE: + return YdbQuery.StatsMode.STATS_MODE_NONE; + case BASIC: + return YdbQuery.StatsMode.STATS_MODE_BASIC; + case FULL: + return YdbQuery.StatsMode.STATS_MODE_FULL; + case PROFILE: + return YdbQuery.StatsMode.STATS_MODE_PROFILE; + + case UNSPECIFIED: + default: + return YdbQuery.StatsMode.STATS_MODE_UNSPECIFIED; + } + } +} diff --git a/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java b/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java new file mode 100644 index 00000000..e7028d4e --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/impl/ScriptRpcImpl.java @@ -0,0 +1,79 @@ +package tech.ydb.query.script.impl; + +import java.util.concurrent.CompletableFuture; + +import javax.annotation.WillNotClose; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationBinder; +import tech.ydb.proto.OperationProtos; +import tech.ydb.proto.operation.v1.OperationServiceGrpc; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.proto.query.v1.QueryServiceGrpc; +import tech.ydb.query.script.ScriptRpc; + +/** + * Default gRPC-based implementation of {@link ScriptRpc}. + *

+ * Uses {@link GrpcTransport} to communicate with YDB QueryService and OperationService. + * Provides async unary calls for executing scripts and retrieving results or operation metadata. + * + *

Author: Evgeny Kuvardin + */ +public class ScriptRpcImpl implements ScriptRpc { + + private final GrpcTransport transport; + + private ScriptRpcImpl(GrpcTransport grpcTransport) { + this.transport = grpcTransport; + } + + /** + * Creates a new RPC instance bound to the given gRPC transport. + * + * @param grpcTransport transport instance (not closed by this class) + * @return new {@link ScriptRpcImpl} instance + */ + public static ScriptRpcImpl useTransport(@WillNotClose GrpcTransport grpcTransport) { + return new ScriptRpcImpl(grpcTransport); + } + + @Override + public CompletableFuture> getOperation(String operationId) { + OperationProtos.GetOperationRequest request = OperationProtos.GetOperationRequest.newBuilder() + .setId(operationId) + .build(); + + GrpcRequestSettings settings = GrpcRequestSettings.newBuilder().build(); + + return transport + .unaryCall(OperationServiceGrpc.getGetOperationMethod(), settings, request) + .thenApply( + OperationBinder.bindAsync(transport, + OperationProtos.GetOperationResponse::getOperation + )); + } + + @Override + public CompletableFuture> executeScript( + YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings) { + + return transport.unaryCall(QueryServiceGrpc.getExecuteScriptMethod(), settings, request) + .thenApply( + OperationBinder.bindAsync(transport, + op -> op + )); + } + + @Override + public CompletableFuture> fetchScriptResults( + YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings) { + + return transport + .unaryCall(QueryServiceGrpc.getFetchScriptResultsMethod(), settings, request); + } +} diff --git a/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java b/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java new file mode 100644 index 00000000..87adb44f --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/result/ScriptResultPart.java @@ -0,0 +1,51 @@ +package tech.ydb.query.script.result; + +import tech.ydb.core.Issue; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.impl.ProtoValueReaders; + +/** + * Represents a single portion of script execution results. + *

+ * Contains: + *

+ */ +public class ScriptResultPart { + private final ResultSetReader resultSetReader; + private final long resultSetIndex; + private final String nextFetchToken; + private final Issue[] issues; + + public ScriptResultPart(YdbQuery.FetchScriptResultsResponse value) { + this.resultSetReader = ProtoValueReaders.forResultSet(value.getResultSet()); + this.resultSetIndex = value.getResultSetIndex(); + this.nextFetchToken = value.getNextFetchToken(); + this.issues = Issue.fromPb(value.getIssuesList()); + } + + public ResultSetReader getResultSetReader() { + return resultSetReader; + } + + public String getNextFetchToken() { + return nextFetchToken; + } + + public long getResultSetIndex() { + return resultSetIndex; + } + + public boolean hasErrors() { + return issues.length > 0; + } + + public Issue[] getIssues() { + return issues; + } +} diff --git a/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java new file mode 100644 index 00000000..a328b0e6 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/settings/ExecuteScriptSettings.java @@ -0,0 +1,156 @@ +package tech.ydb.query.script.settings; + +import java.time.Duration; + +import tech.ydb.core.settings.BaseRequestSettings; +import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.settings.QueryStatsMode; + +/** + * Settings controlling execution of a YQL script. + *

+ * Used to specify execution mode, statistics collection level, + * result TTL, and resource pool assignment. + * + *

Author: Evgeny Kuvardin + */ +public class ExecuteScriptSettings extends BaseRequestSettings { + private final QueryExecMode execMode; + private final QueryStatsMode statsMode; + private final String resourcePool; + private final Duration ttl; + + private ExecuteScriptSettings(Builder builder) { + super(builder); + this.execMode = builder.execMode; + this.statsMode = builder.statsMode; + this.ttl = builder.ttl; + this.resourcePool = builder.resourcePool; + } + + /** + * Returns the execution mode for the script. + * + *

Defines how the script should be processed, e.g. executed, explained, validated, or parsed.

+ * + * @return the {@link QueryExecMode} used for execution + */ + public QueryExecMode getExecMode() { + return this.execMode; + } + + /** + * Returns the time-to-live (TTL) duration for the script results. + * + *

Specifies how long results of the executed script will be kept available + * before automatic cleanup on the server.

+ * + * @return the TTL value, or {@code null} if not set + */ + public Duration getTtl() { + return ttl; + } + + /** + * Returns the statistics collection mode for script execution. + * + *

Determines how detailed execution statistics should be gathered + * (none, basic, full, or profiling level).

+ * + * @return the {@link QueryStatsMode} used for statistics collection + */ + public QueryStatsMode getStatsMode() { + return this.statsMode; + } + + /** + * Returns the name of the resource pool assigned to the script execution. + * + *

Resource pools define isolated resource groups for workload management. + * If not specified, the default pool is used.

+ * + * @return the resource pool name, or {@code null} if not set + */ + public String getResourcePool() { + return this.resourcePool; + } + + + /** + * Creates a new {@link Builder} instance for constructing {@link ExecuteScriptSettings}. + * + * @return a new builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for creating immutable {@link ExecuteScriptSettings} instances. + *

+ * Provides fluent configuration for script execution settings + */ + public static class Builder extends BaseBuilder { + private QueryExecMode execMode = QueryExecMode.EXECUTE; + private QueryStatsMode statsMode = QueryStatsMode.NONE; + private String resourcePool = null; + private Duration ttl = null; + + /** + * Sets the execution mode for the script. + * + * @param mode the desired execution mode + * @return this builder instance for chaining + * @see QueryExecMode + */ + public Builder withExecMode(QueryExecMode mode) { + this.execMode = mode; + return this; + } + + /** + * Sets the statistics collection mode for the script execution. + * + * @param mode the desired statistics mode + * @return this builder instance for chaining + * @see QueryStatsMode + */ + public Builder withStatsMode(QueryStatsMode mode) { + this.statsMode = mode; + return this; + } + + /** + * Sets the time-to-live (TTL) duration for script results. + * + *

After this duration expires, stored script results may be deleted + * from the server automatically.

+ * + * @param value the TTL duration + * @return this builder instance for chaining + */ + public Builder withTtl(Duration value) { + this.ttl = value; + return this; + } + + /** + * Specifies the resource pool to use for query execution. + *

+ * If no pool is specified, or the ID is empty, or equal to {@code "default"}, + * the unremovable resource pool "default" will be used. + * + * @param poolId resource pool identifier + * @return this builder instance for chaining + */ + public Builder withResourcePool(String poolId) { + this.resourcePool = poolId; + return this; + } + + @Override + public ExecuteScriptSettings build() { + return new ExecuteScriptSettings(this); + } + } +} diff --git a/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java new file mode 100644 index 00000000..0b530b2a --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/settings/FetchScriptSettings.java @@ -0,0 +1,76 @@ +package tech.ydb.query.script.settings; + +import tech.ydb.core.settings.BaseRequestSettings; + +/** + * Settings for configuring the fetch phase of a previously executed YQL script. + * Take a note that script should be executed successfully before fetch result + *

+ * These settings define which operation results to fetch, pagination options, + * row limits, and which result set index to retrieve. + * Used with {@code QuerySession.fetchScriptResults(...)} and similar APIs. + * + *

Author: Evgeny Kuvardin + */ +public class FetchScriptSettings extends BaseRequestSettings { + private final int rowsLimit; + private final long setResultSetIndex; + + private FetchScriptSettings(Builder builder) { + super(builder); + this.rowsLimit = builder.rowsLimit; + this.setResultSetIndex = builder.setResultSetIndex; + } + + /** + * Returns the maximum number of rows to retrieve in this fetch request. + * + *

If not set , the server will use its default limit.

+ * + * @return the maximum number of rows to fetch + */ + public int getRowsLimit() { + return rowsLimit; + } + + /** + * Returns the index of the result set to fetch from the executed script. + * + *

When the executed script produces multiple result sets, + * this value specifies which one to retrieve (starting from 0).

+ * + * @return the result set index + */ + public long getSetResultSetIndex() { + return setResultSetIndex; + } + + /** + * Creates a new builder configured for asynchronous operation requests. + */ + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends BaseBuilder { + + private int rowsLimit = 0; + private long setResultSetIndex = 0; + + @Override + public FetchScriptSettings build() { + return new FetchScriptSettings(this); + } + + public Builder withRowsLimit(int rowsLimit) { + this.rowsLimit = rowsLimit; + return this; + } + + public Builder withSetResultSetIndex(long setResultSetIndex) { + this.setResultSetIndex = setResultSetIndex; + return this; + } + + } +} diff --git a/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java b/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java new file mode 100644 index 00000000..0dc78404 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/script/settings/FindScriptSettings.java @@ -0,0 +1,28 @@ +package tech.ydb.query.script.settings; + +import tech.ydb.core.settings.OperationSettings; + +/** + * Settings for retrieving metadata of a previously started script operation. + *

+ * Extends {@link OperationSettings} and enables async fetching by default. + * + *

Author: Evgeny Kuvardin + */ +public class FindScriptSettings extends OperationSettings { + + private FindScriptSettings(Builder builder) { + super(builder); + } + + public static Builder newBuilder() { + return new Builder().withAsyncMode(true); + } + + public static class Builder extends OperationSettings.OperationBuilder { + @Override + public FindScriptSettings build() { + return new FindScriptSettings(this); + } + } +} diff --git a/query/src/test/java/tech/ydb/query/TestExampleData.java b/query/src/test/java/tech/ydb/query/TestExampleData.java index 3c7684d5..14a3d6de 100644 --- a/query/src/test/java/tech/ydb/query/TestExampleData.java +++ b/query/src/test/java/tech/ydb/query/TestExampleData.java @@ -4,7 +4,7 @@ import java.util.Arrays; import java.util.List; -final class TestExampleData { +public final class TestExampleData { public static class Series { private final long seriesID; private final String title; diff --git a/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java new file mode 100644 index 00000000..86235447 --- /dev/null +++ b/query/src/test/java/tech/ydb/query/impl/ScriptExampleTest.java @@ -0,0 +1,482 @@ +package tech.ydb.query.impl; + +import java.time.Duration; +import java.util.Arrays; +import java.util.stream.Collectors; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.operation.Operation; +import tech.ydb.query.QueryClient; +import tech.ydb.query.TestExampleData; +import tech.ydb.query.script.ScriptClient; +import tech.ydb.query.script.impl.ScriptClientImpl; +import tech.ydb.query.script.result.ScriptResultPart; +import tech.ydb.query.script.settings.ExecuteScriptSettings; +import tech.ydb.query.script.settings.FetchScriptSettings; +import tech.ydb.query.script.settings.FindScriptSettings; +import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.tools.QueryReader; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.table.query.Params; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.values.ListType; +import tech.ydb.table.values.ListValue; +import tech.ydb.table.values.PrimitiveType; +import tech.ydb.table.values.PrimitiveValue; +import tech.ydb.table.values.StructType; +import tech.ydb.test.junit4.GrpcTransportRule; + + +/** + * Integration tests that validate the execution of YQL scripts + * using the YDB Query API and scripting features. + * + *

Author: Evgeny Kuvardin + */ +public class ScriptExampleTest { + + @ClassRule + public final static GrpcTransportRule ydbRule = new GrpcTransportRule(); + + private static QueryClient client; + private static SessionRetryContext retryCtx; + private static ScriptClient scriptClient; + + // Create type for struct of series + StructType seriesType = StructType.of( + "series_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "release_date", PrimitiveType.Date, + "series_info", PrimitiveType.Text + ); + // Create and fill list of series + ListValue seriesData = ListType.of(seriesType).newValue( + TestExampleData.SERIES.stream().map(series -> seriesType.newValue( + "series_id", PrimitiveValue.newUint64(series.seriesID()), + "title", PrimitiveValue.newText(series.title()), + "series_info", PrimitiveValue.newText(series.seriesInfo()), + "release_date", PrimitiveValue.newDate(series.releaseDate()) + )).collect(Collectors.toList()) + ); + + // Create type for struct of season + StructType seasonType = StructType.of( + "series_id", PrimitiveType.Uint64, + "season_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "first_aired", PrimitiveType.Date, + "last_aired", PrimitiveType.Date + ); + // Create and fill list of seasons + ListValue seasonsData = ListType.of(seasonType).newValue( + TestExampleData.SEASONS.stream().map(season -> seasonType.newValue( + "series_id", PrimitiveValue.newUint64(season.seriesID()), + "season_id", PrimitiveValue.newUint64(season.seasonID()), + "title", PrimitiveValue.newText(season.title()), + "first_aired", PrimitiveValue.newDate(season.firstAired()), + "last_aired", PrimitiveValue.newDate(season.lastAired()) + )).collect(Collectors.toList()) + ); + + @BeforeClass + public static void init() { + client = QueryClient.newClient(ydbRule) + .sessionPoolMaxSize(5) + .build(); + retryCtx = SessionRetryContext.create(client).build(); + + scriptClient = ScriptClientImpl.newClient(ydbRule); + + Assert.assertNotNull(client.getScheduler()); + + retryCtx.supplyResult(session -> session.createQuery("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ")", TxMode.NONE).execute() + ).join().getStatus().expectSuccess("Can't create table series"); + + retryCtx.supplyResult(session -> session.createQuery("" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")", TxMode.NONE).execute() + ).join().getStatus().expectSuccess("Can't create table seasons"); + } + + @After + public void clean() { + retryCtx.supplyResult(session -> session.createQuery("delete from series;", TxMode.NONE).execute()) + .join(); + retryCtx.supplyResult(session -> session.createQuery("delete from seasons;", TxMode.NONE).execute()) + .join(); + } + + @AfterClass + public static void cleanAll() { + retryCtx.supplyResult(session -> session.createQuery("drop table series;", TxMode.NONE).execute()) + .join(); + retryCtx.supplyResult(session -> session.createQuery("drop table seasons;", TxMode.NONE).execute()) + .join(); + + client.close(); + } + + /** + * Validates that executing a malformed YQL script results in a failure. + *

+ * Test steps: + *

    + *
  1. Attempts to execute an invalid script containing a syntax error
  2. + *
  3. Verifies that the script operation returns a failed {@link Status}
  4. + *
  5. Performs a follow-up SELECT query on a table that should not exist
  6. + *
  7. Confirms that the query fails, as expected
  8. + *
+ * The test ensures that invalid scripts do not produce side effects. + */ + @Test + public void createScriptShouldFail() { + Status statusOperation = scriptClient.startQueryScript("" + + "CREATE TABLE series2 " + + "( series_id UInt64, " + + " title Text, " + + " series_info Text, " + + " release_date Date, " + + " PRIMARY KEY(series_id));" + + "ZCREATE TABLE seasons2 (" + + " series_id UInt64, " + + " season_id UInt64, " + + " title Text, " + + " first_aired Date, " + + " last_aired Date, " + + " PRIMARY KEY(series_id, season_id))", + Params.empty(), ExecuteScriptSettings.newBuilder().build()) + .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1)) + .join(); + + Assert.assertFalse(statusOperation.isSuccess()); + + String query + = "SELECT series_id, title, release_date " + + "FROM series2 WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + // Check that table exists and contains no data + Assert.assertFalse(result.isSuccess()); + } + + /** + * Verifies end-to-end execution of a script that performs UPSERT operations. + *

+ * Test steps: + *

    + *
  1. Configures an execution script
  2. + *
  3. Runs a script that inserts sample data into multiple tables
  4. + *
  5. Waits for script completion
  6. + *
  7. Executes a SELECT query to confirm that data was inserted
  8. + *
+ */ + @Test + public void createInsertQueryScript() { + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .withTtl(Duration.ofSeconds(10)) + .build(); + + Status status = scriptClient.startQueryScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings) + .thenCompose(p -> scriptClient.fetchQueryScriptStatus(p, 1)) + .join(); + + Assert.assertNotNull(status); + Assert.assertTrue(status.isSuccess()); + + String query + = "SELECT series_id " + + "FROM seasons WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + ResultSetReader rs = result.getValue().getResultSet(0); + + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); + } + + + /** + * Validates the ability to find script + *

+ * Test steps: + *

    + *
  1. Starts a script that performs UPSERTs and a SELECT query
  2. + *
  3. Fetches the operation using {@code findQueryScript}
  4. + *
  5. Ensures the returned operation ID matches the original
  6. + *
  7. Waits for operation completion and validates success
  8. + *
+ * Confirms that {@link ScriptClient#findQueryScript} correctly locates + * running or completed script operations. + */ + @Test + public void findAndStartScript() { + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + Operation operation = scriptClient.startQueryScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_id = 1 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings).join(); + + + Operation operation1 = scriptClient.findQueryScript(operation.getId(), FindScriptSettings.newBuilder().build()).join(); + + + Assert.assertEquals(operation.getId(), operation1.getId()); + + Status status = scriptClient.fetchQueryScriptStatus(operation1, 1).join(); + Assert.assertTrue(status.isSuccess()); + } + + /** + * Tests fetching results from an executed script using {@link FetchScriptSettings}. + * + *

Scenario: + *

    + *
  1. Create tables
  2. + *
  3. Insert sample data via parameterized script
  4. + *
  5. Fetch the result set from the executed operation
  6. + *
+ */ + @Test + public void fetchScript() { + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + Operation operation = scriptClient.startQueryScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_id = 1 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings).join(); + + scriptClient.fetchQueryScriptStatus(operation, 1).join(); + + FetchScriptSettings fetchScriptSettings1 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(0) + .build(); + + Result resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings1) + .join(); + + checkFetch(resultPartResult, 1); + + FetchScriptSettings fetchScriptSettings2 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(resultPartResult.getValue().getResultSetIndex()) + .build(); + + Result resultPartResult1 = scriptClient.fetchQueryScriptResult(operation, resultPartResult.getValue(), fetchScriptSettings2) + .join(); + + checkFetch(resultPartResult1, 2); + } + + /** + * Verifies fetching multiple result sets generated by a single script. + *

+ * Scenario: + *

    + *
  1. Executes a script that produces two independent SELECT result sets
  2. + *
  3. Fetches each result set separately using {@code resultSetIndex}
  4. + *
  5. Validates row counts for each result set
  6. + *
+ */ + @Test + public void fetchScriptWithManyResultSet() { + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + Operation operation = scriptClient.startQueryScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_id = 1 order by series_id;" + + "SELECT season_id FROM seasons where series_id = 2 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings).join(); + + scriptClient.fetchQueryScriptStatus(operation, 1).join(); + + FetchScriptSettings fetchScriptSettings1 = FetchScriptSettings.newBuilder() + .withRowsLimit(10) + .withSetResultSetIndex(0) + .build(); + + Result resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings1) + .join(); + + ScriptResultPart part = resultPartResult.getValue(); + + ResultSetReader reader = part.getResultSetReader(); + + Assert.assertEquals(4, reader.getRowCount()); + + FetchScriptSettings fetchScriptSettings2 = FetchScriptSettings.newBuilder() + .withRowsLimit(10) + .withSetResultSetIndex(1) + .build(); + + Result resultPartResult1 = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings2) + .join(); + + ScriptResultPart part1 = resultPartResult1.getValue(); + + ResultSetReader reader2 = part1.getResultSetReader(); + + Assert.assertEquals(5, reader2.getRowCount()); + } + + /** + * Ensures that script execution surfaces query errors in result fetch. + *

+ * Test steps: + *

    + *
  1. Executes a script containing an incorrect column reference
  2. + *
  3. Waits for script execution to complete
  4. + *
  5. Fetches the corresponding result set
  6. + *
  7. Validates that the result contains error issues
  8. + *
  9. Checks that the reported issue matches the incorrect column name
  10. + *
+ */ + + @Test + public void fetchScriptWithError() { + ExecuteScriptSettings executeScriptSettings = ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .build(); + + Operation operation = scriptClient.startQueryScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_ids = 1 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings).join(); + + Status status = scriptClient.fetchQueryScriptStatus(operation, 1).join(); + + FetchScriptSettings fetchScriptSettings1 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(0) + .build(); + + Result resultPartResult = scriptClient.fetchQueryScriptResult(operation, null, fetchScriptSettings1) + .join(); + + Assert.assertTrue(resultPartResult.getValue().hasErrors()); + + Assert.assertTrue( + Arrays.stream(resultPartResult.getValue().getIssues()).anyMatch( + issue -> issue.toString().contains("not found: series_ids."))); + } + + private void checkFetch(Result resultPartResult, int value) { + ScriptResultPart scriptResultPart = resultPartResult.getValue(); + + ResultSetReader reader = scriptResultPart.getResultSetReader(); + Assert.assertEquals(1, reader.getRowCount()); + + reader.next(); + Assert.assertEquals(value, reader.getColumn(0).getUint64()); + } +}