From dc6942dabd2fdbb0a3db4f2e7bdcd07ce433d1a8 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Thu, 20 Nov 2025 12:12:49 +0300 Subject: [PATCH] YQ-4893 Fix precompute (#29150) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ydb/core/fq/libs/actors/run_actor.cpp | 26 ++++----- ydb/tests/fq/s3/test_s3_0.py | 76 +++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 12 deletions(-) diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index b54c80a9b147..ba61d1a2b592 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -323,6 +323,7 @@ struct TEvaluationGraphInfo { NActors::TActorId ResultId; NThreading::TPromise Result; ui64 Index = 0; + TMaybe ResultFormatSettings; }; class TRunActor : public NActors::TActorBootstrapped { @@ -1279,7 +1280,7 @@ class TRunActor : public NActors::TActorBootstrapped { LOG_D("Query evaluation " << NYql::NDqProto::StatusIds_StatusCode_Name(QueryEvalStatusCode) << ". " << it->second.Index << " response. Issues count: " << result.IssuesSize() - << ". Rows count: " << result.GetRowsCount()); + << ". Rows count: " << result.GetRowsCount() << ", Sample count: " << result.SampleSize() << ", Truncated: " << result.GetTruncated()); TVector rows; for (const auto& s : result.GetSample()) { @@ -1288,11 +1289,12 @@ class TRunActor : public NActors::TActorBootstrapped { rows.emplace_back(std::move(batch)); } - TProtoBuilder protoBuilder(ResultFormatSettings->ResultType, ResultFormatSettings->Columns); + const auto& resultFormatSettings = it->second.ResultFormatSettings; + TProtoBuilder protoBuilder(resultFormatSettings->ResultType, resultFormatSettings->Columns); bool ysonTruncated = false; - queryResult.Data = protoBuilder.BuildYson(std::move(rows), ResultFormatSettings->SizeLimit.GetOrElse(Max()), - ResultFormatSettings->RowsLimit.GetOrElse(Max()), &ysonTruncated); + queryResult.Data = protoBuilder.BuildYson(std::move(rows), resultFormatSettings->SizeLimit.GetOrElse(Max()), + resultFormatSettings->RowsLimit.GetOrElse(Max()), &ysonTruncated); queryResult.RowsCount = result.GetRowsCount(); queryResult.Truncated = result.GetTruncated() || ysonTruncated; @@ -1546,7 +1548,7 @@ class TRunActor : public NActors::TActorBootstrapped { *request.MutableSettings() = dqGraphParams.GetSettings(); *request.MutableSecureParams() = dqGraphParams.GetSecureParams(); *request.MutableColumns() = dqGraphParams.GetColumns(); - PrepareResultFormatSettings(dqGraphParams, *dqConfiguration); + PrepareResultFormatSettings(info.ResultFormatSettings, dqGraphParams, *dqConfiguration); NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram()); Send(info.ExecuterId, new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId)); LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId); @@ -1585,7 +1587,7 @@ class TRunActor : public NActors::TActorBootstrapped { ExecuterId, dqGraphParams.GetResultType(), writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ResultBytesLimit)); - PrepareResultFormatSettings(dqGraphParams, *dqConfiguration); + PrepareResultFormatSettings(ResultFormatSettings, dqGraphParams, *dqConfiguration); } else { LOG_D("ResultWriter was NOT CREATED since ResultType is empty"); resultId = ExecuterId; @@ -1652,15 +1654,15 @@ class TRunActor : public NActors::TActorBootstrapped { LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId); } - void PrepareResultFormatSettings(NFq::NProto::TGraphParams& dqGraphParams, const TDqConfiguration& dqConfiguration) { - ResultFormatSettings.ConstructInPlace(); + void PrepareResultFormatSettings(TMaybe& resultFormatSettings, NFq::NProto::TGraphParams& dqGraphParams, const TDqConfiguration& dqConfiguration) { + resultFormatSettings.ConstructInPlace(); for (const auto& c : dqGraphParams.GetColumns()) { - ResultFormatSettings->Columns.push_back(c); + resultFormatSettings->Columns.push_back(c); } - ResultFormatSettings->ResultType = dqGraphParams.GetResultType(); - ResultFormatSettings->SizeLimit = dqConfiguration._AllResultsBytesLimit.Get(); - ResultFormatSettings->RowsLimit = dqConfiguration._RowsLimitPerWrite.Get(); + resultFormatSettings->ResultType = dqGraphParams.GetResultType(); + resultFormatSettings->SizeLimit = dqConfiguration._AllResultsBytesLimit.Get(); + resultFormatSettings->RowsLimit = dqConfiguration._RowsLimitPerWrite.Get(); } void ClearResultFormatSettings() { diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py index 1e527706db24..159cbb180e13 100644 --- a/ydb/tests/fq/s3/test_s3_0.py +++ b/ydb/tests/fq/s3/test_s3_0.py @@ -1167,3 +1167,79 @@ def test_double_optional_types_validation(self, kikimr, s3, client, unique_prefi issues = str(client.describe_query(query_id).result.query.issue) assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues + + @yq_v1 + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_precompute_with_different_result_types(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = '''f1,f2,f3 +Banana,3,100''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='file1.csv', ContentType='text/plain') + fruits = '''f3,f4 +Banana,3''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='file2.csv', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "fruitbucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + $input1 = + SELECT AGGREGATE_LIST( AsStruct(f1 AS dns_mining_pool)) + FROM `{storage_connection_name}`.`file1.csv` + WITH (format=csv_with_names, SCHEMA ( + f1 String NOT NULL, + f2 Int NOT NULL, + f3 Int NOT NULL + )); + + $input2 = + SELECT AGGREGATE_LIST( AsStruct(f3 AS dns_f_query, f4 AS dns_query1wewqwer) ) + FROM `{storage_connection_name}`.`file2.csv` + WITH (format=csv_with_names, SCHEMA ( + f3 String NOT NULL, + f4 Int NOT NULL + )); + + $f1 = () -> {{ + RETURN ListHead(ListMap( + $input1, + ($r) -> ( + AsStruct("1" AS dns_mining_pool) + ) + )) + }}; + + $f2 = () -> {{ + RETURN ListHead(ListMap( + $input2, + ($r) -> ( + AsStruct("2" AS dns_f_query) + ) + )) + }}; + + $parsed = SELECT $f1() AS f1, $f2() AS f2; + + $parsed = + SELECT + p.f1.dns_mining_pool AS f1, p.f2.dns_f_query AS f2 + FROM $parsed AS p; + + SELECT SOME("MinersPoolsViaDNS") AS event_class FROM $parsed + WHERE (f1 == f2 ) + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)