From 03ef2688dc7adb1fc0a977cd00f5e517408058b3 Mon Sep 17 00:00:00 2001 From: Pavel Velikhov Date: Fri, 14 Nov 2025 14:06:33 +0000 Subject: [PATCH 1/6] X[NEW RBO] Add YqlSelect support --- ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp | 52 ++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp b/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp index f2d862615361..e8c4eb8fa96b 100644 --- a/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp +++ b/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp @@ -154,6 +154,58 @@ Y_UNIT_TEST_SUITE(KqpRbo) { } } + Y_UNIT_TEST(FilterYql) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo` ( + id Int64 NOT NULL, + name String, + primary key(id) + ); + )").GetValueSync(); + + NYdb::TValueBuilder rows; + rows.BeginList(); + for (size_t i = 0; i < 10; ++i) { + rows.AddListItem() + .BeginStruct() + .AddMember("id").Int64(i) + .AddMember("name").String(std::to_string(i) + "_name") + .EndStruct(); + } + rows.EndList(); + + auto resultUpsert = db.BulkUpsert("/Root/foo", rows.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + std::vector queries = { + R"( + PRAGMA TablePathPrefix='/Root'; + PRAGMA YqlSelect = 'force'; + SELECT id as id2 FROM foo WHERE name != '3_name'; + )" + }; + + std::vector results = { + R"([["0"];["1"];["2"];["4"];["5"];["6"];["7"];["8"];["9"]])", + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + Y_UNIT_TEST(ConstantFolding) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); From 28db4cd228943cdb57450107003497579b30fdf9 Mon Sep 17 00:00:00 2001 From: Pavel Velikhov Date: Tue, 18 Nov 2025 09:42:50 +0000 Subject: [PATCH 2/6] Intermediate commit --- ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 7 +- ydb/core/kqp/host/kqp_host.cpp | 1 + ydb/core/kqp/host/kqp_runner.cpp | 2 +- ydb/core/kqp/opt/kqp_type_ann.cpp | 13 +- ydb/core/kqp/opt/rbo/kqp_operator.cpp | 21 +- ydb/core/kqp/opt/rbo/kqp_operator.h | 3 + .../kqp/opt/rbo/kqp_plan_conversion_utils.cpp | 5 +- ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp | 21 +- ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp | 1018 +---------------- ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h | 8 +- ydb/core/kqp/opt/rbo/kqp_rbo_type_ann.cpp | 4 +- ydb/core/kqp/opt/rbo/kqp_rewrite_select.cpp | 873 ++++++++++++++ ydb/core/kqp/opt/rbo/ya.make | 1 + ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp | 5 +- 14 files changed, 954 insertions(+), 1028 deletions(-) create mode 100644 ydb/core/kqp/opt/rbo/kqp_rewrite_select.cpp diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index afea75f78d09..db14a4b487d6 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -900,7 +900,8 @@ "Match" : {"Type": "Callable", "Name": "KqpOpRoot"}, "Children": [ {"Index": 0, "Name": "Input", "Type": "TExprBase"}, - {"Index": 1, "Name": "ColumnOrder", "Type": "TCoAtomList"} + {"Index": 1, "Name": "ColumnOrder", "Type": "TCoAtomList"}, + {"Index": 2, "Name": "PgSyntax", "Type": "TCoAtom"} ] }, { @@ -930,9 +931,9 @@ ] }, { - "Name": "TKqpPgExprSublink", + "Name": "TKqpExprSublink", "Base": "TExprBase", - "Match" : {"Type": "Callable", "Name": "KqpPgExprSublink"}, + "Match" : {"Type": "Callable", "Name": "KqpExprSublink"}, "Children": [ {"Index": 0, "Name": "Expr", "Type": "TExprBase"} ] diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index a066b65d4220..cc711f1846f7 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1989,6 +1989,7 @@ class TKqpHost : public IKqpHost { const TGatewaysConfig* gatewaysConfig = nullptr; // TODO: can we get real gatewaysConfig here? auto allowSettings = [](TStringBuf settingName) { return settingName == "OrderedColumns" + || settingName == "DeriveColumnOrder" || settingName == "DisableOrderedColumns" || settingName == "Warning" || settingName == "UseBlocks" diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index b0b51fb14809..33de950b32f5 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -372,7 +372,7 @@ class TKqpRunner : public IKqpRunner { .AddPostTypeAnnotation(/* forSubgraph */ true) //.AddCommonOptimization() - .Add(CreateKqpPgRewriteTransformer(OptimizeCtx, *typesCtx), "RewritePgSelect") + .Add(CreateKqpRewriteSelectTransformer(OptimizeCtx, *typesCtx), "RewriteSelect") .Add(CreateKqpNewRBOTransformer(OptimizeCtx, *typesCtx, rboKqpTypeAnnTransformer, kqpTypeAnnTransformer, newRBOPhysicalPeepholeTransformer, funcRegistry), "NewRBOTransformer") .Add(CreateKqpRBOCleanupTransformer(*typesCtx), "RBOCleanupTransformer") diff --git a/ydb/core/kqp/opt/kqp_type_ann.cpp b/ydb/core/kqp/opt/kqp_type_ann.cpp index dc02c559b23c..b6d5baf52944 100644 --- a/ydb/core/kqp/opt/kqp_type_ann.cpp +++ b/ydb/core/kqp/opt/kqp_type_ann.cpp @@ -2251,8 +2251,8 @@ TStatus AnnotateTableSinkSettings(const TExprNode::TPtr& input, TExprContext& ct return TStatus::Ok; } -TStatus AnnotatePgExprSublink(const TExprNode::TPtr& node, TExprContext& ctx) { - auto expr = node->Child(TKqpPgExprSublink::idx_Expr); +TStatus AnnotateExprSublink(const TExprNode::TPtr& node, TExprContext& ctx) { + auto expr = node->Child(TKqpExprSublink::idx_Expr); auto itemType = expr->GetTypeAnn()->Cast()->GetItemType()->Cast(); auto valueType = itemType->GetItems()[0]->GetItemType(); if (!valueType->IsOptionalOrNull()) { @@ -2290,7 +2290,10 @@ TStatus AnnotateOpRead(const TExprNode::TPtr& node, TExprContext& ctx, const TSt TVector structItemTypes = rowType->Cast()->GetItems(); TVector newItemTypes; for (auto t : structItemTypes ) { - newItemTypes.push_back(ctx.MakeType("_alias_" + TString(alias->Content()) + "." + t->GetName(), t->GetItemType())); + TString aliasName = TString(alias->Content()); + TString columnName = TString(t->GetName()); + TString fullName = aliasName != "" ? ( "_alias_" + aliasName + "." + columnName ) : columnName; + newItemTypes.push_back(ctx.MakeType(fullName, t->GetItemType())); } YQL_CLOG(TRACE, CoreDq) << "Row type:" << *rowType; @@ -2710,8 +2713,8 @@ TAutoPtr CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateTableSinkSettings(input, ctx); } - if (TKqpPgExprSublink::Match(input.Get())) { - return AnnotatePgExprSublink(input, ctx); + if (TKqpExprSublink::Match(input.Get())) { + return AnnotateExprSublink(input, ctx); } if (TKqpOpRead::Match(input.Get())) { diff --git a/ydb/core/kqp/opt/rbo/kqp_operator.cpp b/ydb/core/kqp/opt/rbo/kqp_operator.cpp index a63792af9a43..0d5acd58da27 100644 --- a/ydb/core/kqp/opt/rbo/kqp_operator.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_operator.cpp @@ -26,7 +26,7 @@ TExprNode::TPtr AddRenames(TExprNode::TPtr input, TExprContext &ctx, TVector(ctx, input->Pos()) - .Name().Build("_alias_" + iu.Alias + "." + iu.ColumnName) + .Name().Build(iu.GetFullName()) .Value() .Struct(arg) .Name().Build(iu.ColumnName) @@ -668,6 +668,19 @@ TVector TOpFilter::GetScalarSubplanIUs(TPlanProps& props) { return res; } +bool TestAndExtractEqualityPredicate(TExprNode::TPtr pred, TExprNode::TPtr& leftArg, TExprNode::TPtr& rightArg) { + if (pred->IsCallable("PgResolvedOp") && pred->Child(0)->Content() == "=") { + leftArg = pred->Child(2); + rightArg = pred->Child(3); + return true; + } else if (pred->IsCallable("==")) { + leftArg = pred->Child(0); + rightArg = pred->Child(1); + return true; + } + return false; +} + TConjunctInfo TOpFilter::GetConjunctInfo(TPlanProps& props) const { TConjunctInfo res; @@ -686,9 +699,9 @@ TConjunctInfo TOpFilter::GetConjunctInfo(TPlanProps& props) const { fromPg = true; } - if (conjObj->IsCallable("PgResolvedOp") && conjObj->Child(0)->Content() == "=") { - auto leftArg = conjObj->Child(2); - auto rightArg = conjObj->Child(3); + TExprNode::TPtr leftArg; + TExprNode::TPtr rightArg; + if (TestAndExtractEqualityPredicate(conjObj, leftArg, rightArg)) { TVector conjIUs; GetAllMembers(conj, conjIUs, props); diff --git a/ydb/core/kqp/opt/rbo/kqp_operator.h b/ydb/core/kqp/opt/rbo/kqp_operator.h index a8b55cec3733..5d7ef8c6faa8 100644 --- a/ydb/core/kqp/opt/rbo/kqp_operator.h +++ b/ydb/core/kqp/opt/rbo/kqp_operator.h @@ -234,6 +234,7 @@ struct TPlanProps { TStageGraph StageGraph; int InternalVarIdx = 1; TScalarSubplans ScalarSubplans; + bool PgSyntax = false; }; @@ -411,6 +412,8 @@ class TOpFilter : public IUnaryOperator { TExprNode::TPtr FilterLambda; }; +bool TestAndExtractEqualityPredicate(TExprNode::TPtr pred, TExprNode::TPtr& leftArg, TExprNode::TPtr& rightArg); + class TOpJoin : public IBinaryOperator { public: TOpJoin(std::shared_ptr leftArg, std::shared_ptr rightArg, TPositionHandle pos, TString joinKind, diff --git a/ydb/core/kqp/opt/rbo/kqp_plan_conversion_utils.cpp b/ydb/core/kqp/opt/rbo/kqp_plan_conversion_utils.cpp index 7427fda61a0d..e0ef03febd1e 100644 --- a/ydb/core/kqp/opt/rbo/kqp_plan_conversion_utils.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_plan_conversion_utils.cpp @@ -12,7 +12,7 @@ TExprNode::TPtr PlanConverter::RemoveScalarSubplans(TExprNode::TPtr node) { auto lambda = TCoLambda(node); auto lambdaBody = lambda.Body().Ptr(); - auto exprSublinks = FindNodes(lambdaBody, [](const TExprNode::TPtr& n){return n->IsCallable("KqpPgExprSublink");}); + auto exprSublinks = FindNodes(lambdaBody, [](const TExprNode::TPtr& n){return n->IsCallable("KqpExprSublink");}); if (exprSublinks.empty()) { return node; } @@ -26,7 +26,7 @@ TExprNode::TPtr PlanConverter::RemoveScalarSubplans(TExprNode::TPtr node) { .Name().Value(sublinkVar.GetFullName()).Build() .Done().Ptr(); replaceMap[link.Get()] = member; - auto subplan = ExprNodeToOperator(TKqpPgExprSublink(link).Expr().Ptr()); + auto subplan = ExprNodeToOperator(TKqpExprSublink(link).Expr().Ptr()); PlanProps.ScalarSubplans.Add(sublinkVar, subplan); } @@ -53,6 +53,7 @@ TOpRoot PlanConverter::ConvertRoot(TExprNode::TPtr node) { auto res = TOpRoot(rootInput, node->Pos(), columnOrder); res.Node = node; res.PlanProps = PlanProps; + res.PlanProps.PgSyntax = std::stoi(opRoot.PgSyntax().StringValue()); return res; } diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp index b829fe414726..a3fe02a34e87 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp @@ -67,14 +67,14 @@ TExprNode::TPtr FindMemberArg(TExprNode::TPtr input) { return TExprNode::TPtr(); } -TExprNode::TPtr BuildFilterLambdaFromConjuncts(TPositionHandle pos, TVector conjuncts, TExprContext &ctx) { +TExprNode::TPtr BuildFilterLambdaFromConjuncts(TPositionHandle pos, TVector conjuncts, TExprContext &ctx, bool pgSyntax) { auto arg = Build(ctx, pos).Name("lambda_arg").Done(); TExprNode::TPtr lambda; if (conjuncts.size() == 1) { auto filterInfo = conjuncts[0]; auto body = ReplaceArg(filterInfo.FilterBody, arg.Ptr(), ctx); - if (!filterInfo.FromPg) { + if (pgSyntax && !filterInfo.FromPg) { body = ctx.Builder(body->Pos()).Callable("FromPg").Add(0, body).Seal().Build(); } @@ -89,7 +89,7 @@ TExprNode::TPtr BuildFilterLambdaFromConjuncts(TPositionHandle pos, TVectorPos()).Callable("FromPg").Add(0, body).Seal().Build(); } newConjuncts.push_back(ReplaceArg(body, arg.Ptr(), ctx)); @@ -176,9 +176,10 @@ bool TExtractJoinExpressionsRule::TestAndApply(std::shared_ptr &input predicate = predicate->Child(0); } - if (predicate->IsCallable("PgResolvedOp") && predicate->Child(0)->Content() == "=") { - auto leftSide = predicate->Child(2); - auto rightSide = predicate->Child(3); + TExprNode::TPtr leftSide; + TExprNode::TPtr rightSide; + + if (TestAndExtractEqualityPredicate(predicate, leftSide, rightSide)) { if (leftSide->IsCallable("Member") && rightSide->IsCallable("Member")) { continue; @@ -489,7 +490,7 @@ std::shared_ptr TPushFilterRule::SimpleTestAndApply(const std::shared auto rightInput = join->GetRightInput(); if (pushLeft.size()) { - auto leftLambda = BuildFilterLambdaFromConjuncts(leftInput->Pos, pushLeft, ctx.ExprCtx); + auto leftLambda = BuildFilterLambdaFromConjuncts(leftInput->Pos, pushLeft, ctx.ExprCtx, props.PgSyntax); leftInput = std::make_shared(leftInput, input->Pos, leftLambda); } @@ -504,14 +505,14 @@ std::shared_ptr TPushFilterRule::SimpleTestAndApply(const std::shared } } if (predicatesForRightSide.size()) { - auto rightLambda = BuildFilterLambdaFromConjuncts(rightInput->Pos, pushRight, ctx.ExprCtx); + auto rightLambda = BuildFilterLambdaFromConjuncts(rightInput->Pos, pushRight, ctx.ExprCtx, props.PgSyntax); rightInput = std::make_shared(rightInput, input->Pos, rightLambda); join->JoinKind = "Inner"; } else { return input; } } else { - auto rightLambda = BuildFilterLambdaFromConjuncts(rightInput->Pos, pushRight, ctx.ExprCtx); + auto rightLambda = BuildFilterLambdaFromConjuncts(rightInput->Pos, pushRight, ctx.ExprCtx, props.PgSyntax); rightInput = std::make_shared(rightInput, input->Pos, rightLambda); } } @@ -524,7 +525,7 @@ std::shared_ptr TPushFilterRule::SimpleTestAndApply(const std::shared join->Children[1] = rightInput; if (topLevelPreds.size()) { - auto topFilterLambda = BuildFilterLambdaFromConjuncts(join->Pos, topLevelPreds, ctx.ExprCtx); + auto topFilterLambda = BuildFilterLambdaFromConjuncts(join->Pos, topLevelPreds, ctx.ExprCtx, props.PgSyntax); output = std::make_shared(join, input->Pos, topFilterLambda); } else { output = join; diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp index 8e932f37ad97..e6874965b5c7 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp @@ -10,990 +10,6 @@ using namespace NKikimr::NKqp; using namespace NYql::NDq; namespace { -struct TJoinTableAliases { - THashSet LeftSideAliases; - THashSet RightSideAliases; -}; - -struct TAggregationTraits { - TVector AggTraitsList; - TVector KeyColumns; -}; - -THashSet SupportedAggregationFunctions{"sum", "min", "max", "count"}; -ui64 KqpUniqueAggColumnId{0}; - -TJoinTableAliases GatherJoinAliasesLeftSideMultiInputs(const TVector &joinKeys, const THashSet &processedInputs) { - TJoinTableAliases joinAliases; - for (const auto &joinKey : joinKeys) { - if (processedInputs.count(joinKey.Alias)) { - joinAliases.LeftSideAliases.insert(joinKey.Alias); - } else { - joinAliases.RightSideAliases.insert(joinKey.Alias); - } - } - Y_ENSURE(joinAliases.LeftSideAliases.size(), "Left side of the join inputs are empty"); - Y_ENSURE(joinAliases.RightSideAliases.size() == 1, "Right side of the join should have only one input"); - return joinAliases; -} - -TJoinTableAliases GatherJoinAliasesTwoInputs(const TVector &joinKeys) { - TJoinTableAliases joinAliases; - for (ui32 i = 0; i < joinKeys.size(); i += 2) { - joinAliases.LeftSideAliases.insert(joinKeys[i].Alias); - joinAliases.RightSideAliases.insert(joinKeys[i + 1].Alias); - } - - Y_ENSURE(joinAliases.LeftSideAliases.size() == 1, "Left side of the join should have only one input"); - Y_ENSURE(joinAliases.RightSideAliases.size() == 1, "Right side of the join should have only one input"); - return joinAliases; -} - -TExprNode::TPtr BuildJoinKeys(const TVector &joinKeys, const TJoinTableAliases &joinAliases, THashSet &processedInputs, - TExprContext &ctx, TPositionHandle pos) { - Y_ENSURE(joinKeys.size() >= 2 && !(joinKeys.size() & 1), "Invalid join key size"); - TVector keys; - for (ui32 i = 0; i < joinKeys.size(); i += 2) { - auto leftSideKey = joinKeys[i]; - auto rightSideKey = joinKeys[i + 1]; - if (joinAliases.LeftSideAliases.count(rightSideKey.Alias)) { - std::swap(leftSideKey, rightSideKey); - } - // clang-format off - keys.push_back(Build(ctx, pos) - .LeftLabel() - .Value(leftSideKey.Alias) - .Build() - .LeftColumn() - .Value(leftSideKey.ColumnName) - .Build() - .RightLabel() - .Value(rightSideKey.Alias) - .Build() - .RightColumn() - .Value(rightSideKey.ColumnName) - .Build() - .Done()); - // clang-format on - processedInputs.insert(leftSideKey.Alias); - processedInputs.insert(rightSideKey.Alias); - } - return Build(ctx, pos).Add(keys).Done().Ptr(); -} - -TExprNode::TPtr BuildAggregationTraits(const TString& originalColName, const TString& resultColName, const TString& aggFunction, - const TTypeAnnotationNode* resultType, TExprContext &ctx, TPositionHandle pos) { - // clang-format off - return Build(ctx, pos) - .OriginalColName() - .Value(originalColName) - .Build() - .AggregationFunction() - .Value(aggFunction) - .Build() - .ResultColName() - .Value(resultColName) - .Build() - .AggregationFunctionResultType(ExpandType(pos, *resultType, ctx)) - .Done().Ptr(); - // clang-format on -} - -TExprNode::TPtr BuildAggregate(TExprNode::TPtr resultExpr, const TVector& keys, const TVector& aggTraitsList, - bool distinctAll, - TExprContext& ctx, TPositionHandle pos) { - TVector keyColumns; - for (const auto& column : keys) { - // clang-format off - auto keyColumn = Build(ctx, pos) - .Value(column.GetFullName()) - .Done(); - // clang-format on - keyColumns.push_back(keyColumn); - } - - // clang-format off - return Build(ctx, pos) - .Input(resultExpr) - .AggregationTraitsList() - .Add(aggTraitsList) - .Build() - .KeyColumns() - .Add(keyColumns) - .Build() - .DistinctAll() - .Value(distinctAll ? "True" : "False") - .Build() - .Done().Ptr(); - // clang-format on -} - -void BuildSimpleMapElementLambda(TExprNode::TPtr resultExpr, const TVector>& renamesMap, TVector& mapElements, - TExprContext& ctx, TPositionHandle pos) { - for (const auto& [colName, rename] : renamesMap) { - // clang-format off - // Just wrap by member - auto lambda = ctx.Builder(pos) - .Lambda() - .Param("arg") - .Callable("Member") - .Arg(0, "arg") - .Atom(1, colName.GetFullName()) - .Seal() - .Seal().Build(); - - mapElements.push_back(Build(ctx, pos) - .Input(resultExpr) - .Variable() - .Value(colName.GetFullName()) - .Build() - .Lambda(lambda) - .Done().Ptr()); - // clang-format on - } -} - -TExprNode::TPtr BuildExpressionMap(TExprNode::TPtr resultExpr, - const TVector>& aggFieldsExpressionsMap, - const TVector>& aggFieldsRenamesMap, - const TVector>& groupByKeysRenamesMap, - const THashMap>& groupByKeysExpressionsMap, - TExprContext& ctx, TPositionHandle pos) { - // Add expressions - TVector mapElements; - for (const auto& [colName, expr] : aggFieldsExpressionsMap) { - // clang-format off - mapElements.push_back(Build(ctx, pos) - .Input(resultExpr) - .Variable() - .Value(colName.GetFullName()) - .Build() - .Lambda(expr) - .Done().Ptr()); - // clang-format on - } - - // Add other columns and wrap them with member. - BuildSimpleMapElementLambda(resultExpr, aggFieldsRenamesMap, mapElements, ctx, pos); - BuildSimpleMapElementLambda(resultExpr, groupByKeysRenamesMap, mapElements, ctx, pos); - - // Add expressions for group by keys. - for (const auto& [_, pair] : groupByKeysExpressionsMap) { - // clang-format off - mapElements.push_back(Build(ctx, pos) - .Input(resultExpr) - .Variable() - .Value(pair.first.GetFullName()) - .Build() - .Lambda(pair.second) - .Done().Ptr()); - // clang-format on - } - - // clang-format off - return Build(ctx, pos) - .Input(resultExpr) - .MapElements() - .Add(mapElements) - .Build() - .Project() - .Value("true") - .Build() - .Done().Ptr(); - // clang-format on -} - -void BuildMapElementRename(TExprNode::TPtr resultExpr, const TVector>& renamesMap, - TVector& mapElements, TExprContext& ctx, TPositionHandle pos) { - for (const auto& [colName, newColName] : renamesMap) { - // clang-format off - mapElements.push_back(Build(ctx, pos) - .Input(resultExpr) - .Variable() - .Value(newColName.GetFullName()) - .Build() - .From() - .Value(colName.GetFullName()) - .Build() - .Done().Ptr()); - // clang-format on - } -} - -TExprNode::TPtr BuildRenameMap(TExprNode::TPtr resultExpr, const TVector>& aggFieldsRenamesMap, - const TVector>& groupByKeysRenamesMap, TExprContext& ctx, TPositionHandle pos) { - TVector mapElements; - BuildMapElementRename(resultExpr, aggFieldsRenamesMap, mapElements, ctx, pos); - BuildMapElementRename(resultExpr, groupByKeysRenamesMap, mapElements, ctx, pos); - - // clang-format off - return Build(ctx, pos) - .Input(resultExpr) - .MapElements() - .Add(mapElements) - .Build() - .Project() - .Value("false") - .Build() - .Done().Ptr(); - // clang-format on -} - -TString GenerateUniqueColumnName(const TString &colName) { - TStringBuilder strBuilder; - strBuilder << "_kqp_agg_input_"; - strBuilder << colName; - strBuilder << "_"; - strBuilder << ToString(KqpUniqueAggColumnId++); - return strBuilder; -} - -void ToCamelCase(std::string& s) { - char previous = ' '; - auto f = [&](char current) { - char result = (std::isblank(previous) && std::isalpha(current)) ? std::toupper(current) : std::tolower(current); - previous = current; - return result; - }; - std::transform(s.begin(), s.end(), s.begin(), f); -} - -TExprNode::TPtr GetPgCallable(TExprNode::TPtr input, const TString& callableName) { - auto isPgCallable = [&](const TExprNode::TPtr& node) -> bool { - if (node->IsCallable(callableName)) { - return true; - } - return false; - }; - - return FindNode(input, isPgCallable); -} - -TExprNode::TPtr GetAtom(TExprNode::TPtr input, const TString& atomName) { - auto isPgAtom = [&](const TExprNode::TPtr& node) -> bool { - if (node->IsAtom(atomName)) { - return true; - } - return false; - }; - - return FindNode(input, isPgAtom); -} - -TExprNode::TPtr ReplacePgOps(TExprNode::TPtr input, TExprContext &ctx) { - if (input->IsLambda()) { - auto lambda = TCoLambda(input); - - // clang-format off - return Build(ctx, input->Pos()) - .Args(lambda.Args()) - .Body(ReplacePgOps(lambda.Body().Ptr(), ctx)) - .Done().Ptr(); - // clang-format on - } else if (input->IsCallable("PgAnd")) { - // clang-format off - return ctx.Builder(input->Pos()) - .Callable("ToPg") - .Callable(0, "And") - .Callable(0, "FromPg") - .Add(0, ReplacePgOps(input->ChildPtr(0), ctx)) - .Seal() - .Callable(1, "FromPg") - .Add(0, ReplacePgOps(input->ChildPtr(1), ctx)) - .Seal() - .Seal() - .Seal() - .Build(); - // clang-format on - - } else if (input->IsCallable("PgOr")) { - // clang-format off - return ctx.Builder(input->Pos()) - .Callable("ToPg") - .Callable(0, "Or") - .Callable(0, "FromPg") - .Add(0, ReplacePgOps(input->ChildPtr(0), ctx)) - .Seal() - .Callable(1, "FromPg") - .Add(0, ReplacePgOps(input->ChildPtr(1), ctx)) - .Seal() - .Seal() - .Seal() - .Build(); - // clnag-format on - } else if (input->IsCallable()){ - TVector newChildren; - for (auto c : input->Children()) { - newChildren.push_back(ReplacePgOps(c, ctx)); - } - // clang-format off - return ctx.Builder(input->Pos()) - .Callable(input->Content()) - .Add(std::move(newChildren)) - .Seal() - .Build(); - // clang-format on - } else if (input->IsList()) { - TVector newChildren; - for (auto c : input->Children()) { - newChildren.push_back(ReplacePgOps(c, ctx)); - } - // clang-format off - return ctx.Builder(input->Pos()) - .List() - .Add(std::move(newChildren)) - .Seal() - .Build(); - // clang-format on - } else { - return input; - } -} - -TExprNode::TPtr BuildSort(TExprNode::TPtr input, TExprNode::TPtr sort, TExprContext &ctx) { - TVector sortElements; - - for (auto sortItem : sort->Child(1)->Children()) { - auto sortLambda = sortItem->Child(1); - auto direction = sortItem->Child(2); - auto nullsFirst = sortItem->Child(3); - - // clang-format off - sortElements.push_back(Build(ctx, input->Pos()) - .Input(input) - .Direction(direction) - .NullsFirst(nullsFirst) - .Lambda(sortLambda) - .Done().Ptr()); - // clang-format on - } - - // clang-format off - return Build(ctx, input->Pos()) - .Input(input) - .SortExpressions().Add(sortElements).Build() - .Done().Ptr(); - // clang-format off -} - -TExprNode::TPtr RewritePgSelect(const TExprNode::TPtr &node, TExprContext &ctx, const TTypeAnnotationContext &typeCtx) { - Y_UNUSED(typeCtx); - - TVector finalColumnOrder; - - auto setItems = GetSetting(node->Head(), "set_items")->TailPtr(); - TVector setItemsResults; - for (ui32 i = 0; i < setItems->ChildrenSize(); ++i) { - auto setItem = setItems->ChildPtr(i); - - TVector resultElements; - // In pg syntax duplicate attributes are allowed in the results, but we need to rename them - // We use the counters for this purpose - THashMap resultElementCounters; - - TExprNode::TPtr joinExpr; - TExprNode::TPtr filterExpr; - TExprNode::TPtr lastAlias; - - auto from = GetSetting(setItem->Tail(), "from"); - THashMap aliasToInputMap; - TVector inputsInOrder; - - if (from) { - for (auto fromItem : from->Child(1)->Children()) { - // From item can be a table read with an alias or a subquery with an alias - // In case of a subquery, we have already translated PgSelect of the nested subquery - // so we just need to remove TKqpOpRoot and plug in the translated subquery - - auto childExpr = fromItem->ChildPtr(0); - auto alias = fromItem->Child(1); - TExprNode::TPtr fromExpr; - - if (TKqpOpRoot::Match(childExpr.Get())) { - auto opRoot = TKqpOpRoot(childExpr); - - TVector subqueryElements; - - // We need to rename all the IUs in the subquery to reflect the new alias - auto subqueryType = childExpr->GetTypeAnn()->Cast()->GetItemType()->Cast(); - for (auto item : subqueryType->GetItems()) { - auto orig = TString(item->GetName()); - auto unit = TInfoUnit(orig); - auto renamedUnit = TInfoUnit(TString(alias->Content()), unit.ColumnName); - - // clang-format off - subqueryElements.push_back(Build(ctx, node->Pos()) - .Input(opRoot.Input()) - .Variable().Value(renamedUnit.GetFullName()).Build() - .From().Value(unit.GetFullName()).Build() - .Done().Ptr()); - // clang-format on - } - - // clang-format off - fromExpr = Build(ctx, node->Pos()) - .Input(opRoot.Input()) - .MapElements().Add(subqueryElements).Build() - .Project().Value("true").Build() - .Done().Ptr(); - // clang-format on - } - - else { - auto readExpr = TKqlReadTableRanges(childExpr); - - // clang-format off - fromExpr = Build(ctx, node->Pos()) - .Table(readExpr.Table()) - .Alias(alias) - .Columns(readExpr.Columns()) - .Done().Ptr(); - // clang-format on - } - - aliasToInputMap.insert({TString(alias->Content()), fromExpr}); - inputsInOrder.push_back(fromExpr); - lastAlias = alias; - } - } - - THashSet processedInputs; - auto joinOps = GetSetting(setItem->Tail(), "join_ops"); - if (joinOps) { - for (ui32 i = 0; i < joinOps->Tail().ChildrenSize(); ++i) { - ui32 tableInputsCount = 0; - auto tuple = joinOps->Tail().Child(i); - for (ui32 j = 0; j < tuple->ChildrenSize(); ++j) { - auto join = tuple->Child(j); - auto joinType = join->Child(0)->Content(); - if (joinType == "push") { - ++tableInputsCount; - continue; - } - - Y_ENSURE(join->ChildrenSize() > 1 && join->Child(1)->ChildrenSize() > 1); - auto pgResolvedOps = FindNodes(join->Child(1)->Child(1)->TailPtr(), [](const TExprNode::TPtr &node) { - if (node->IsCallable("PgResolvedOp")) { - return true; - } else { - return false; - } - }); - - // FIXME: join on clause may include expressions, we need to handle this case - TVector joinKeys; - for (const auto &pgResolvedOp : pgResolvedOps) { - TVector keys; - GetAllMembers(pgResolvedOp, keys); - joinKeys.insert(joinKeys.end(), keys.begin(), keys.end()); - } - - TJoinTableAliases joinAliases; - TExprNode::TPtr leftInput; - TExprNode::TPtr rightInput; - - if (tableInputsCount == 2) { - joinAliases = GatherJoinAliasesTwoInputs(joinKeys); - const auto leftSideAlias = *joinAliases.LeftSideAliases.begin(); - const auto rightSideAlias = *joinAliases.RightSideAliases.begin(); - Y_ENSURE(aliasToInputMap.count(leftSideAlias), "Left side alias is not present in input tables"); - Y_ENSURE(aliasToInputMap.count(rightSideAlias), "Right sided alias is not present input tables"); - leftInput = aliasToInputMap[leftSideAlias]; - rightInput = aliasToInputMap[rightSideAlias]; - } else if (tableInputsCount == 1) { - joinAliases = GatherJoinAliasesLeftSideMultiInputs(joinKeys, processedInputs); - const auto rightSideAlias = *joinAliases.RightSideAliases.begin(); - Y_ENSURE(aliasToInputMap.contains(rightSideAlias), "Right side alias is not present in input tables"); - leftInput = joinExpr; - rightInput = aliasToInputMap[rightSideAlias]; - } - - auto joinKind = TString(joinType); - ToCamelCase(joinKind.MutRef()); - - // clang-format off - joinExpr = Build(ctx, node->Pos()) - .LeftInput(leftInput) - .RightInput(rightInput) - .JoinKind() - .Value(joinKind) - .Build() - .JoinKeys(BuildJoinKeys(joinKeys, joinAliases, processedInputs, ctx, node->Pos())) - .Done().Ptr(); - // clang-format on - tableInputsCount = 0; - } - } - - // Build in order - if (!joinExpr) { - ui32 inputIndex = 0; - if (inputsInOrder.size() > 1) { - while (inputIndex < inputsInOrder.size()) { - auto leftTableInput = inputIndex == 0 ? inputsInOrder[inputIndex] : joinExpr; - auto rightTableInput = inputIndex == 0 ? inputsInOrder[inputIndex + 1] : inputsInOrder[inputIndex]; - auto joinKeys = Build(ctx, node->Pos()).Done(); - // clang-format off - joinExpr = Build(ctx, node->Pos()) - .LeftInput(leftTableInput) - .RightInput(rightTableInput) - .JoinKind() - .Value("Cross") - .Build() - .JoinKeys(joinKeys) - .Done().Ptr(); - // clang-format on - inputIndex += (inputIndex == 0 ? 2 : 1); - } - } else { - joinExpr = inputsInOrder.front(); - } - } - } - - filterExpr = joinExpr; - - auto where = GetSetting(setItem->Tail(), "where"); - - if (where) { - TExprNode::TPtr lambda = where->Child(1)->Child(1); - lambda = ReplacePgOps(lambda, ctx); - // clang-format off - filterExpr = Build(ctx, node->Pos()) - .Input(filterExpr) - .Lambda(lambda) - .Done().Ptr(); - // clang-format on - } - - if (!filterExpr) { - filterExpr = Build(ctx, node->Pos()).Done().Ptr(); - } - - // Columns to for `PgCast` - THashSet aggregationColumnsRequireCastToPgType; - // Group by fields for renames or expressions. - TVector> groupByKeysRenamesMap; - THashMap> groupByKeysExpressionsMap; - // Aggregate. - TAggregationTraits aggTraits; - // Pre/Post distinct aggregations. - TAggregationTraits distinctAggregationTraitsPreAggregate; - TAggregationTraits distinctAggregationTraitsPostAggregate; - auto groupOps = GetSetting(setItem->Tail(), "group_exprs"); - if (groupOps) { - const auto groupByList = groupOps->TailPtr(); - for (ui32 i = 0; i < groupByList->ChildrenSize(); ++i) { - auto lambda = TCoLambda(ctx.DeepCopyLambda(*(groupByList->ChildPtr(i)->Child(1)))); - auto body = lambda.Body().Ptr(); - auto pgResolvedOp = GetPgCallable(lambda.Body().Ptr(), "PgResolvedOp"); - // Expression for group by keys. - if (pgResolvedOp) { - auto fromPg = ctx.NewCallable(node->Pos(), "FromPg", {pgResolvedOp}); - - // clang-format off - auto groupExprLambda = Build(ctx, node->Pos()) - .Args(lambda.Args()) - .Body(fromPg) - .Done().Ptr(); - // clang-format on - - const auto newColName = TInfoUnit(GenerateUniqueColumnName("_group_expr_")); - groupByKeysExpressionsMap[i] = std::make_pair(newColName, groupExprLambda); - aggTraits.KeyColumns.push_back(newColName); - } else { - TVector keys; - GetAllMembers(body, keys); - Y_ENSURE(keys.size() == 1, "Invalid size of the group keys."); - const auto groupKeyName = keys.front(); - aggTraits.KeyColumns.push_back(groupKeyName); - groupByKeysRenamesMap.push_back({groupKeyName, groupKeyName}); - } - } - } - - const bool distinctAll = !!GetSetting(setItem->Tail(), "distinct_all"); - auto result = GetSetting(setItem->Tail(), "result"); - Y_ENSURE(result); - auto finalType = node->GetTypeAnn()->Cast()->GetItemType()->Cast(); - - // Unique column names. - THashSet uniqueColumnNames; - bool needRenameMap = false; - // Aggregations. - TVector> aggFieldsRenamesMap; - TVector> aggFieldsExpressionsMap; - bool distinctPreAggregate = false; - for (ui32 i = 0; i < result->Child(1)->ChildrenSize(); ++i) { - const auto resultItem = result->Child(1)->ChildPtr(i); - auto lambda = TCoLambda(ctx.DeepCopyLambda(*(resultItem->Child(2)))); - auto resultColName = TString(resultItem->Child(0)->Content()); - const auto *aggFuncResultType = finalType->FindItemType(resultColName); - Y_ENSURE(aggFuncResultType, "Cannot find type for aggregation result."); - - auto pgAgg = GetPgCallable(lambda.Body().Ptr(), "PgAgg"); - if (pgAgg) { - // Collect original column names processing `PgAgg` callable. - TInfoUnit originalColName; - TInfoUnit renamedColName; - - auto lambdaBody = lambda.Body().Ptr(); - auto pgResolvedOp = GetPgCallable(lambda.Body().Ptr(), "PgResolvedOp"); - if (pgResolvedOp && !lambdaBody->IsCallable("PgResolvedOp")) { - // Aggregation on expression f(a x b). - // We pull expression outside a given aggregation and rename result of a given expression with unique name - // to later process result with aggregate function. - // For example: (a x b) as uqique_result_name -> f(unique_result_name) - auto fromPg = ctx.NewCallable(node->Pos(), "FromPg", {pgResolvedOp}); - - // clang-format off - auto exprLambda = Build(ctx, node->Pos()) - .Args(lambda.Args()) - .Body(fromPg) - .Done().Ptr(); - // clang-format on - - // Just any unique name for expression result, physical plan should be AsSturct(`unique_name (expression)). - originalColName = TInfoUnit(GenerateUniqueColumnName("_expr_")); - renamedColName = originalColName; - aggFieldsExpressionsMap.push_back({originalColName, exprLambda}); - } else { - // Either an aggregation f(a) or expression on aggregation f(a) x b. - // Here we want to get just a column name for aggregation. - Y_ENSURE(pgAgg->ChildrenSize() == 3, "Invalid children size for `PgAgg`"); - auto toPg = pgAgg->ChildPtr(2); - Y_ENSURE(toPg->IsCallable("ToPg") && toPg->ChildPtr(0)->IsCallable("Member"), "PgAgg not a member"); - auto member = TCoMember(toPg->ChildPtr(0)); - originalColName = TInfoUnit(member.Name().StringValue()); - renamedColName = originalColName; - - // Aggregation columns should be unique, so we have to add rename map. - // For example f(a), g(a) => map((a -> a), (a -> a_0)) -> f(a), g(a_0). - if (uniqueColumnNames.count(originalColName.GetFullName())) { - renamedColName = TInfoUnit(originalColName.Alias, GenerateUniqueColumnName(originalColName.ColumnName)); - needRenameMap = true; - } - aggFieldsRenamesMap.push_back({originalColName, renamedColName}); - } - uniqueColumnNames.insert(renamedColName.GetFullName()); - - // Distinct for column or expression f(distinct a) => distinct a as result -> f(result). - if (!!GetAtom(pgAgg->ChildPtr(1), "distinct")) { - const auto colName = renamedColName.GetFullName(); - auto distinctAggTraits = - BuildAggregationTraits(colName, colName, "distinct", aggFuncResultType, ctx, node->Pos()); - distinctAggregationTraitsPreAggregate.AggTraitsList.push_back(distinctAggTraits); - distinctAggregationTraitsPreAggregate.KeyColumns.push_back(renamedColName); - distinctPreAggregate = true; - } - - // Aggregation on pg columns requires type cast from yql type to pg type. - aggregationColumnsRequireCastToPgType.insert(resultColName); - const TString aggFuncName = TString(pgAgg->ChildPtr(0)->Content()); - // Build an aggregation traits. - auto aggregationTraits = BuildAggregationTraits(renamedColName.GetFullName(), resultColName, aggFuncName, - aggFuncResultType, ctx, node->Pos()); - aggTraits.AggTraitsList.push_back(aggregationTraits); - - // Case for distinct after aggregation. - if (distinctAll) { - auto distinctAggTraits = - BuildAggregationTraits(resultColName, resultColName, "distinct", aggFuncResultType, ctx, node->Pos()); - distinctAggregationTraitsPostAggregate.AggTraitsList.push_back(distinctAggTraits); - distinctAggregationTraitsPostAggregate.KeyColumns.push_back(TInfoUnit(resultColName)); - } - // This case covers distinct all on just columns without aggregation functions. - } else if (!pgAgg && distinctAll) { - auto pgGroupRef = GetPgCallable(lambda.Body().Ptr(), "PgGroupRef"); - TInfoUnit colName; - if (pgGroupRef) { - if (pgGroupRef->ChildrenSize() == 4) { - colName = TInfoUnit(TString(pgGroupRef->ChildPtr(3)->Content())); - } else { - Y_ENSURE(false, "Invalid column size"); - } - } else { - TVector originalColNames; - GetAllMembers(resultItem->ChildPtr(2), originalColNames); - Y_ENSURE(originalColNames.size() == 1, "Invalid column size for aggregation columns."); - colName = originalColNames.front(); - } - aggregationColumnsRequireCastToPgType.insert(colName.ColumnName); - - auto distinctAggTraits = - BuildAggregationTraits(colName.GetFullName(), colName.GetFullName(), "distinct", aggFuncResultType, ctx, node->Pos()); - distinctAggregationTraitsPostAggregate.AggTraitsList.push_back(distinctAggTraits); - distinctAggregationTraitsPostAggregate.KeyColumns.push_back(colName); - } - } - - // Distinct pre aggregate fro group by keys. - if (distinctPreAggregate) { - for (const auto& key : aggTraits.KeyColumns) { - const auto colName = key.GetFullName(); - const auto* aggFuncResultType = finalType->FindItemType(key.ColumnName); - Y_ENSURE(aggFuncResultType, "Cannot find type for aggregation result"); - auto distinctAggTraits = BuildAggregationTraits(colName, colName, "distinct", aggFuncResultType, ctx, node->Pos()); - distinctAggregationTraitsPreAggregate.AggTraitsList.push_back(distinctAggTraits); - distinctAggregationTraitsPreAggregate.KeyColumns.push_back(colName); - aggregationColumnsRequireCastToPgType.insert(key.ColumnName); - } - } - - // Distinct post aggregate for group by keys. - if (distinctAll) { - for (const auto& key : aggTraits.KeyColumns) { - const auto colName = key.GetFullName(); - const auto* aggFuncResultType = finalType->FindItemType(key.ColumnName); - // agg key in result set. - if (aggFuncResultType) { - auto distinctAggTraits = BuildAggregationTraits(colName, colName, "distinct", aggFuncResultType, ctx, node->Pos()); - distinctAggregationTraitsPostAggregate.AggTraitsList.push_back(distinctAggTraits); - distinctAggregationTraitsPostAggregate.KeyColumns.push_back(colName); - aggregationColumnsRequireCastToPgType.insert(key.ColumnName); - } - } - } - - TExprNode::TPtr resultExpr = filterExpr; - // In case we have a multiple consumers for the single column we have to map and rename it. - if (needRenameMap) { - resultExpr = BuildRenameMap(resultExpr, aggFieldsRenamesMap, groupByKeysRenamesMap, ctx, node->Pos()); - } - // In case we have an expression for aggregation - f(a + b ..) or group by. - if (!aggFieldsExpressionsMap.empty() || !groupByKeysExpressionsMap.empty()) { - resultExpr = BuildExpressionMap(resultExpr, aggFieldsExpressionsMap, aggFieldsRenamesMap, groupByKeysRenamesMap, - groupByKeysExpressionsMap, ctx, node->Pos()); - } - // Build distinct aggregate pre aggregate. - if (!distinctAggregationTraitsPreAggregate.AggTraitsList.empty()) { - resultExpr = BuildAggregate(resultExpr, distinctAggregationTraitsPreAggregate.KeyColumns, - distinctAggregationTraitsPreAggregate.AggTraitsList, true, ctx, node->Pos()); - } - // Build Aggreegate. - if (!aggTraits.AggTraitsList.empty()) { - resultExpr = BuildAggregate(resultExpr, aggTraits.KeyColumns, aggTraits.AggTraitsList, false, ctx, node->Pos()); - } - // Build distinct aggregate post aggregate. - if (!distinctAggregationTraitsPostAggregate.AggTraitsList.empty()) { - resultExpr = BuildAggregate(resultExpr, distinctAggregationTraitsPostAggregate.KeyColumns, - distinctAggregationTraitsPostAggregate.AggTraitsList, true, ctx, node->Pos()); - } - - finalColumnOrder.clear(); - - for (auto resultItem : result->Child(1)->Children()) { - auto column = resultItem->Child(0); - TString columnName = TString(column->Content()); - - const auto expectedTypeNode = finalType->FindItemType(columnName); - Y_ENSURE(expectedTypeNode); - const auto expectedType = expectedTypeNode->Cast(); - const auto actualTypeNode = resultItem->GetTypeAnn(); - - YQL_CLOG(TRACE, CoreDq) << "Actual type for column: " << columnName << " is: " << *actualTypeNode; - YQL_CLOG(TRACE, CoreDq) << "Expected type for column: " << columnName << " is: " << *expectedTypeNode; - - ui32 actualPgTypeId; - bool convertToPg; - Y_ENSURE(ExtractPgType(actualTypeNode, actualPgTypeId, convertToPg, node->Pos(), ctx)); - - bool needPgCast = (expectedType->GetId() != actualPgTypeId); - auto lambda = TCoLambda(ctx.DeepCopyLambda(*(resultItem->Child(2)))); - bool needPgCastForAgg = aggregationColumnsRequireCastToPgType.count(columnName); - - auto pgAgg = GetPgCallable(lambda.Body().Ptr(), "PgAgg"); - if (pgAgg) { - Y_ENSURE(SupportedAggregationFunctions.count(pgAgg->ChildPtr(0)->Content()), - "Aggregation function " + TString(pgAgg->ChildPtr(0)->Content()) + " is not supported "); - - // clang-format off - auto newBody = Build(ctx, node->Pos()) - .Struct(lambda.Args().Arg(0)) - .Name() - .Value(columnName) - .Build() - .Done().Ptr(); - // clang-format on - - auto lambdaBody = lambda.Body().Ptr(); - // Build a projection lambda, we do not need `PgAgg` inside. - if (lambdaBody->IsCallable("PgResolvedOp")) { - // Replace PgResolvedOp(PgAgg(arg)) -> PgResolvedOp(PgCast(ToPg(Member(arg, columnName)))) - auto toPg = ctx.NewCallable(node->Pos(), "ToPg", {newBody}); - auto pgType = - ctx.NewCallable(node->Pos(), "PgType", {ctx.NewAtom(node->Pos(), NPg::LookupType(expectedType->GetId()).Name)}); - auto pgCast = ctx.NewCallable(node->Pos(), "PgCast", {toPg, pgType}); - - TNodeOnNodeOwnedMap replaces; - replaces[pgAgg.Get()] = pgCast; - newBody = ctx.ReplaceNodes(lambda.Body().Ptr(), replaces); - } - - // clang-format off - lambda = Build(ctx, node->Pos()) - .Args(lambda.Args()) - .Body(newBody) - .Done(); - // clang-format on - } - - // Eliminate `PgGroupRef` from projection lambda. - auto pgGroupRef = GetPgCallable(lambda.Body().Ptr(), "PgGroupRef"); - if (pgGroupRef) { - TString columnName; - if (pgGroupRef->ChildrenSize() == 4) { - columnName = TString(pgGroupRef->ChildPtr(3)->Content()); - } else if (pgGroupRef->ChildrenSize() == 3) { - // In this case we can get a column name from group expr map - const auto groupByKeyExprId = FromString(TString(pgGroupRef->ChildPtr(2)->Content())); - auto it = groupByKeysExpressionsMap.find(groupByKeyExprId); - Y_ENSURE(it != groupByKeysExpressionsMap.end(), "Group by key expression has invalid content."); - columnName = it->second.first.GetFullName(); - // Always need a pg cast for expressions. - needPgCast = true; - } else { - Y_ENSURE(false, "Invalid children size for `pgGroupRef`"); - } - - // clang-format off - lambda = Build(ctx, node->Pos()) - .Args(lambda.Args()) - .Body() - .Struct(lambda.Args().Arg(0)) - .Name() - .Value(columnName) - .Build() - .Build() - .Done(); - // clang-format on - } - - if (convertToPg && !needPgCastForAgg) { - Y_ENSURE(!needPgCast, - TStringBuilder() << "Conversion to PG type is different at typization (" << expectedType->GetId() - << ") and optimization (" << actualPgTypeId << ") stages."); - - TExprNode::TPtr lambdaBody = lambda.Body().Ptr(); - lambdaBody = ReplacePgOps(lambdaBody, ctx); - auto toPg = ctx.NewCallable(node->Pos(), "ToPg", {lambdaBody}); - - // clang-format off - lambda = Build(ctx, node->Pos()) - .Args(lambda.Args()) - .Body(toPg) - .Done(); - // clang-format on - } else if ((needPgCast || needPgCastForAgg)) { - - auto pgType = - ctx.NewCallable(node->Pos(), "PgType", {ctx.NewAtom(node->Pos(), NPg::LookupType(expectedType->GetId()).Name)}); - TExprNode::TPtr lambdaBody = lambda.Body().Ptr(); - lambdaBody = ReplacePgOps(lambdaBody, ctx); - auto pgCast = ctx.NewCallable(node->Pos(), "PgCast", {lambdaBody, pgType}); - - // clang-format off - lambda = Build(ctx, node->Pos()) - .Args(lambda.Args()) - .Body(pgCast) - .Done(); - // clang-format on - } - - if (resultElementCounters.contains(columnName)) { - resultElementCounters[columnName] += 1; - columnName = columnName + "_generated_" + std::to_string(resultElementCounters.at(columnName)); - } else { - resultElementCounters[columnName] = 1; - } - - finalColumnOrder.push_back(columnName); - auto variable = Build(ctx, node->Pos()).Value(columnName).Done(); - - // clang-format off - resultElements.push_back(Build(ctx, node->Pos()) - .Input(resultExpr) - .Variable(variable) - .Lambda(lambda) - .Done().Ptr()); - // clang-format on - } - - // clang-format off - auto setItemPtr = Build(ctx, node->Pos()) - .Input(resultExpr) - .MapElements() - .Add(resultElements) - .Build() - .Project() - .Value("true") - .Build() - .Done().Ptr(); - // clang-format onto - - auto sort = GetSetting(setItem->Tail(), "sort"); - if (sort) { - setItemPtr = BuildSort(setItemPtr, sort, ctx); - } - - setItemsResults.push_back(setItemPtr); - } - - auto setOps = GetSetting(node->Head(), "set_ops"); - Y_ENSURE(setOps && setItemsResults.size()); - - auto setOpsList = setOps->TailPtr(); - TExprNode::TPtr opResult = setItemsResults.front(); - for (ui32 i = 0, end = setOpsList->ChildrenSize(), setItemsIndex = 0, opsInputCount = 0; i < end; ++i) { - if (setOpsList->ChildPtr(i)->Content() == "push") { - ++opsInputCount; - continue; - } - Y_ENSURE(setOpsList->ChildPtr(i)->Content() == "union_all"); - Y_ENSURE(opsInputCount <= 2); - - TExprNode::TPtr leftInput; - TExprNode::TPtr rightInput; - if (opsInputCount == 2) { - Y_ENSURE(setItemsIndex + 1 < end); - leftInput = setItemsResults[setItemsIndex++]; - rightInput = setItemsResults[setItemsIndex++]; - } else { - Y_ENSURE(setItemsIndex < end); - leftInput = opResult; - rightInput = setItemsResults[setItemsIndex++]; - } - - // clang-format off - opResult = Build(ctx, node->Pos()) - .LeftInput(leftInput) - .RightInput(rightInput) - .Done().Ptr(); - // clang-format on - - // Count again. - opsInputCount = 0; - } - - auto sort = GetSetting(node->Head(), "sort"); - if (sort) { - opResult = BuildSort(opResult, sort, ctx); - } - - TVector columnAtomList; - for (auto c : finalColumnOrder) { - columnAtomList.push_back(Build(ctx, node->Pos()).Value(c).Done()); - } - auto columnOrder = Build(ctx, node->Pos()).Add(columnAtomList).Done().Ptr(); - - // clang-format off - return Build(ctx, node->Pos()) - .Input(opResult) - .ColumnOrder(columnOrder) - .Done().Ptr(); - // clang-format on - -} TExprNode::TPtr PushTakeIntoPlan(const TExprNode::TPtr &node, TExprContext &ctx, const TTypeAnnotationContext &typeCtx) { Y_UNUSED(typeCtx); @@ -1006,6 +22,7 @@ TExprNode::TPtr PushTakeIntoPlan(const TExprNode::TPtr &node, TExprContext &ctx, .Count(take.Count()) .Build() .ColumnOrder(root.Cast().ColumnOrder()) + .PgSyntax(root.Cast().PgSyntax()) .Done().Ptr(); // clang-format on } else { @@ -1013,23 +30,23 @@ TExprNode::TPtr PushTakeIntoPlan(const TExprNode::TPtr &node, TExprContext &ctx, } } -TExprNode::TPtr RewritePgSublink(const TExprNode::TPtr &node, TExprContext &ctx) { +TExprNode::TPtr RewriteSublink(const TExprNode::TPtr &node, TExprContext &ctx) { if (node->Child(0)->Content() != "expr") { return node; } // clang-format off - return Build(ctx, node->Pos()) + return Build(ctx, node->Pos()) .Expr(node->Child(4)) .Done().Ptr(); // clang-format on } TExprNode::TPtr RemoveRootFromSublink(const TExprNode::TPtr &node, TExprContext &ctx) { - auto sublink = TKqpPgExprSublink(node); + auto sublink = TKqpExprSublink(node); if (auto root = sublink.Expr().Maybe()) { // clang-format off - return Build(ctx, node->Pos()) + return Build(ctx, node->Pos()) .Expr(root.Cast().Input()) .Done().Ptr(); // clang-format on @@ -1041,7 +58,7 @@ TExprNode::TPtr RemoveRootFromSublink(const TExprNode::TPtr &node, TExprContext namespace NKikimr { namespace NKqp { -IGraphTransformer::TStatus TKqpPgRewriteTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr &output, TExprContext &ctx) { +IGraphTransformer::TStatus TKqpRewriteSelectTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr &output, TExprContext &ctx) { output = input; TOptimizeExprSettings settings(&TypeCtx); @@ -1049,7 +66,9 @@ IGraphTransformer::TStatus TKqpPgRewriteTransformer::DoTransform(TExprNode::TPtr output, output, [](const TExprNode::TPtr &node, TExprContext &ctx) -> TExprNode::TPtr { if (node->IsCallable("PgSubLink")) { - return RewritePgSublink(node, ctx); + return RewriteSublink(node, ctx); + } else if (node->IsCallable("YqlSubLink")) { + return RewriteSublink(node, ctx); } else { return node; } @@ -1063,11 +82,18 @@ IGraphTransformer::TStatus TKqpPgRewriteTransformer::DoTransform(TExprNode::TPtr status = OptimizeExpr( output, output, [this](const TExprNode::TPtr &node, TExprContext &ctx) -> TExprNode::TPtr { + + // PostgreSQL AST rewrtiting if (TCoPgSelect::Match(node.Get())) { - return RewritePgSelect(node, ctx, TypeCtx); - } else if (TKqpPgExprSublink::Match(node.Get())) { + return RewriteSelect(node, ctx, TypeCtx, true); + } + + // YQL AST rewriting + else if (TCoYqlSelect::Match(node.Get())) { + return RewriteSelect(node, ctx, TypeCtx, false); + } else if (TKqpExprSublink::Match(node.Get())) { return RemoveRootFromSublink(node, ctx); - } else if (TCoTake::Match(node.Get())) { + } else if (TCoTake::Match(node.Get())) { return PushTakeIntoPlan(node, ctx, TypeCtx); } else { return node; @@ -1078,7 +104,7 @@ IGraphTransformer::TStatus TKqpPgRewriteTransformer::DoTransform(TExprNode::TPtr return status; } -void TKqpPgRewriteTransformer::Rewind() {} +void TKqpRewriteSelectTransformer::Rewind() {} IGraphTransformer::TStatus TKqpNewRBOTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr &output, TExprContext &ctx) { output = input; @@ -1145,9 +171,9 @@ IGraphTransformer::TStatus TKqpRBOCleanupTransformer::DoTransform(TExprNode::TPt void TKqpRBOCleanupTransformer::Rewind() {} -TAutoPtr CreateKqpPgRewriteTransformer(const TIntrusivePtr &kqpCtx, +TAutoPtr CreateKqpRewriteSelectTransformer(const TIntrusivePtr &kqpCtx, TTypeAnnotationContext &typeCtx) { - return new TKqpPgRewriteTransformer(kqpCtx, typeCtx); + return new TKqpRewriteSelectTransformer(kqpCtx, typeCtx); } TAutoPtr CreateKqpNewRBOTransformer(const TIntrusivePtr &kqpCtx, TTypeAnnotationContext &typeCtx, diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h index 3bb563600d66..836fc6224aea 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h @@ -17,9 +17,9 @@ using namespace NYql; using namespace NYql::NNodes; using namespace NOpt; -class TKqpPgRewriteTransformer : public TSyncTransformerBase { +class TKqpRewriteSelectTransformer : public TSyncTransformerBase { public: - TKqpPgRewriteTransformer(const TIntrusivePtr &kqpCtx, TTypeAnnotationContext &typeCtx) + TKqpRewriteSelectTransformer(const TIntrusivePtr &kqpCtx, TTypeAnnotationContext &typeCtx) : TypeCtx(typeCtx), KqpCtx(*kqpCtx) {} // Main method of the transformer @@ -31,7 +31,7 @@ class TKqpPgRewriteTransformer : public TSyncTransformerBase { const TKqpOptimizeContext &KqpCtx; }; -TAutoPtr CreateKqpPgRewriteTransformer(const TIntrusivePtr &kqpCtx, +TAutoPtr CreateKqpRewriteSelectTransformer(const TIntrusivePtr &kqpCtx, TTypeAnnotationContext &typeCtx); class TKqpNewRBOTransformer : public TSyncTransformerBase { @@ -81,5 +81,7 @@ class TKqpRBOCleanupTransformer : public TSyncTransformerBase { TAutoPtr CreateKqpRBOCleanupTransformer(TTypeAnnotationContext &typeCtx); +TExprNode::TPtr RewriteSelect(const TExprNode::TPtr &node, TExprContext &ctx, const TTypeAnnotationContext &typeCtx, bool pgSyntax=false); + } // namespace NKqp } // namespace NKikimr \ No newline at end of file diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_type_ann.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo_type_ann.cpp index 3b3763500bbe..e13d19f26208 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_type_ann.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_type_ann.cpp @@ -60,7 +60,9 @@ TStatus ComputeTypes(std::shared_ptr read, TRBOContext & ctx) { TVector structItemTypes = rowType->Cast()->GetItems(); TVector newItemTypes; for (auto t : structItemTypes) { - newItemTypes.push_back(ctx.ExprCtx.MakeType("_alias_" + read->Alias + "." + t->GetName(), t->GetItemType())); + TString columnName = TString(t->GetName()); + TString fullName = read->Alias != "" ? ( "_alias_" + read->Alias + "." + columnName ) : columnName; + newItemTypes.push_back(ctx.ExprCtx.MakeType(fullName, t->GetItemType())); } auto newStructType = ctx.ExprCtx.MakeType(newItemTypes); diff --git a/ydb/core/kqp/opt/rbo/kqp_rewrite_select.cpp b/ydb/core/kqp/opt/rbo/kqp_rewrite_select.cpp new file mode 100644 index 000000000000..147bd108f022 --- /dev/null +++ b/ydb/core/kqp/opt/rbo/kqp_rewrite_select.cpp @@ -0,0 +1,873 @@ +#include "kqp_rbo_transformer.h" +#include "kqp_operator.h" +#include "kqp_plan_conversion_utils.h" + +#include + + +using namespace NYql; +using namespace NYql::NNodes; +using namespace NKikimr::NKqp; +using namespace NYql::NDq; + +namespace { + +struct TJoinTableAliases { + THashSet LeftSideAliases; + THashSet RightSideAliases; +}; + +THashSet SupportedAggregationFunctions{"sum", "min", "max", "count"}; +ui64 KqpUniqueAggColumnId{0}; + +TJoinTableAliases GatherJoinAliasesLeftSideMultiInputs(const TVector &joinKeys, const THashSet &processedInputs) { + TJoinTableAliases joinAliases; + for (const auto &joinKey : joinKeys) { + if (processedInputs.count(joinKey.Alias)) { + joinAliases.LeftSideAliases.insert(joinKey.Alias); + } else { + joinAliases.RightSideAliases.insert(joinKey.Alias); + } + } + Y_ENSURE(joinAliases.LeftSideAliases.size(), "Left side of the join inputs are empty"); + Y_ENSURE(joinAliases.RightSideAliases.size() == 1, "Right side of the join should have only one input"); + return joinAliases; +} + +TJoinTableAliases GatherJoinAliasesTwoInputs(const TVector &joinKeys) { + TJoinTableAliases joinAliases; + for (ui32 i = 0; i < joinKeys.size(); i += 2) { + joinAliases.LeftSideAliases.insert(joinKeys[i].Alias); + joinAliases.RightSideAliases.insert(joinKeys[i + 1].Alias); + } + + Y_ENSURE(joinAliases.LeftSideAliases.size() == 1, "Left side of the join should have only one input"); + Y_ENSURE(joinAliases.RightSideAliases.size() == 1, "Right side of the join should have only one input"); + return joinAliases; +} + +TExprNode::TPtr BuildJoinKeys(const TVector &joinKeys, const TJoinTableAliases &joinAliases, THashSet &processedInputs, + TExprContext &ctx, TPositionHandle pos) { + Y_ENSURE(joinKeys.size() >= 2 && !(joinKeys.size() & 1), "Invalid join key size"); + TVector keys; + for (ui32 i = 0; i < joinKeys.size(); i += 2) { + auto leftSideKey = joinKeys[i]; + auto rightSideKey = joinKeys[i + 1]; + if (joinAliases.LeftSideAliases.count(rightSideKey.Alias)) { + std::swap(leftSideKey, rightSideKey); + } + // clang-format off + keys.push_back(Build(ctx, pos) + .LeftLabel() + .Value(leftSideKey.Alias) + .Build() + .LeftColumn() + .Value(leftSideKey.ColumnName) + .Build() + .RightLabel() + .Value(rightSideKey.Alias) + .Build() + .RightColumn() + .Value(rightSideKey.ColumnName) + .Build() + .Done()); + // clang-format on + processedInputs.insert(leftSideKey.Alias); + processedInputs.insert(rightSideKey.Alias); + } + return Build(ctx, pos).Add(keys).Done().Ptr(); +} + +TExprNode::TPtr BuildAggregationTraits(const TString& originalColName, const TString& resultColName, const TString& aggFunction, + const TTypeAnnotationNode* resultType, TExprContext &ctx, TPositionHandle pos) { + // clang-format off + return Build(ctx, pos) + .OriginalColName() + .Value(originalColName) + .Build() + .AggregationFunction() + .Value(aggFunction) + .Build() + .ResultColName() + .Value(resultColName) + .Build() + .AggregationFunctionResultType(ExpandType(pos, *resultType, ctx)) + .Done().Ptr(); + // clang-format on +} + +TExprNode::TPtr BuildAggregate(TExprNode::TPtr resultExpr, const TVector& keys, const TVector& aggTraitsList, + bool distinctAll, + TExprContext& ctx, TPositionHandle pos) { + TVector keyColumns; + for (const auto& column : keys) { + // clang-format off + auto keyColumn = Build(ctx, pos) + .Value(column.GetFullName()) + .Done(); + // clang-format on + keyColumns.push_back(keyColumn); + } + + // clang-format off + return Build(ctx, pos) + .Input(resultExpr) + .AggregationTraitsList() + .Add(aggTraitsList) + .Build() + .KeyColumns() + .Add(keyColumns) + .Build() + .DistinctAll() + .Value(distinctAll ? "True" : "False") + .Build() + .Done().Ptr(); + // clang-format on +} + +void BuildSimpleMapElementLambda(TExprNode::TPtr resultExpr, const TVector>& renamesMap, TVector& mapElements, + TExprContext& ctx, TPositionHandle pos) { + for (const auto& [colName, rename] : renamesMap) { + // clang-format off + // Just wrap by member + auto lambda = ctx.Builder(pos) + .Lambda() + .Param("arg") + .Callable("Member") + .Arg(0, "arg") + .Atom(1, colName.GetFullName()) + .Seal() + .Seal().Build(); + + mapElements.push_back(Build(ctx, pos) + .Input(resultExpr) + .Variable() + .Value(colName.GetFullName()) + .Build() + .Lambda(lambda) + .Done().Ptr()); + // clang-format on + } +} + +TExprNode::TPtr BuildExpressionMap(TExprNode::TPtr resultExpr, + const TVector>& aggFieldsExpressionsMap, + const TVector>& aggFieldsRenamesMap, + const TVector>& groupByKeysRenamesMap, TExprContext& ctx, + TPositionHandle pos) { + // Add expressions + TVector mapElements; + for (const auto& [colName, expr] : aggFieldsExpressionsMap) { + // clang-format off + mapElements.push_back(Build(ctx, pos) + .Input(resultExpr) + .Variable() + .Value(colName.GetFullName()) + .Build() + .Lambda(expr) + .Done().Ptr()); + // clang-format on + } + + // Add other columns and wrap them with member. + BuildSimpleMapElementLambda(resultExpr, aggFieldsRenamesMap, mapElements, ctx, pos); + BuildSimpleMapElementLambda(resultExpr, groupByKeysRenamesMap, mapElements, ctx, pos); + + // clang-format off + return Build(ctx, pos) + .Input(resultExpr) + .MapElements() + .Add(mapElements) + .Build() + .Project() + .Value("true") + .Build() + .Done().Ptr(); + // clang-format on +} + +void BuildMapElementRename(TExprNode::TPtr resultExpr, const TVector>& renamesMap, TVector& mapElements, + TExprContext& ctx, TPositionHandle pos) { + for (const auto& [colName, newColName] : renamesMap) { + // clang-format off + mapElements.push_back(Build(ctx, pos) + .Input(resultExpr) + .Variable() + .Value(newColName.GetFullName()) + .Build() + .From() + .Value(colName.GetFullName()) + .Build() + .Done().Ptr()); + // clang-format on + } +} + +TExprNode::TPtr BuildRenameMap(TExprNode::TPtr resultExpr, const TVector>& aggFieldsRenamesMap, + const TVector>& groupByKeysRenamesMap, TExprContext& ctx, TPositionHandle pos) { + TVector mapElements; + BuildMapElementRename(resultExpr, aggFieldsRenamesMap, mapElements, ctx, pos); + BuildMapElementRename(resultExpr, groupByKeysRenamesMap, mapElements, ctx, pos); + + // clang-format off + return Build(ctx, pos) + .Input(resultExpr) + .MapElements() + .Add(mapElements) + .Build() + .Project() + .Value("false") + .Build() + .Done().Ptr(); + // clang-format on +} + +TString GenerateUniqueColumnName(const TString &colName) { + TStringBuilder strBuilder; + strBuilder << "_kqp_agg_input_"; + strBuilder << colName; + strBuilder << "_"; + strBuilder << ToString(KqpUniqueAggColumnId++); + return strBuilder; +} + +void ToCamelCase(std::string& s) { + char previous = ' '; + auto f = [&](char current) { + char result = (std::isblank(previous) && std::isalpha(current)) ? std::toupper(current) : std::tolower(current); + previous = current; + return result; + }; + std::transform(s.begin(), s.end(), s.begin(), f); +} + +TExprNode::TPtr GetPgCallable(TExprNode::TPtr input, const TString& callableName) { + auto isPgCallable = [&](const TExprNode::TPtr& node) -> bool { + if (node->IsCallable(callableName)) { + return true; + } + return false; + }; + + return FindNode(input, isPgCallable); +} + +TExprNode::TPtr GetAtom(TExprNode::TPtr input, const TString& atomName) { + auto isPgAtom = [&](const TExprNode::TPtr& node) -> bool { + if (node->IsAtom(atomName)) { + return true; + } + return false; + }; + + return FindNode(input, isPgAtom); +} + +TExprNode::TPtr ReplacePgOps(TExprNode::TPtr input, TExprContext &ctx) { + if (input->IsLambda()) { + auto lambda = TCoLambda(input); + + // clang-format off + return Build(ctx, input->Pos()) + .Args(lambda.Args()) + .Body(ReplacePgOps(lambda.Body().Ptr(), ctx)) + .Done().Ptr(); + // clang-format on + } else if (input->IsCallable("PgAnd")) { + // clang-format off + return ctx.Builder(input->Pos()) + .Callable("ToPg") + .Callable(0, "And") + .Callable(0, "FromPg") + .Add(0, ReplacePgOps(input->ChildPtr(0), ctx)) + .Seal() + .Callable(1, "FromPg") + .Add(0, ReplacePgOps(input->ChildPtr(1), ctx)) + .Seal() + .Seal() + .Seal() + .Build(); + // clang-format on + + } else if (input->IsCallable("PgOr")) { + // clang-format off + return ctx.Builder(input->Pos()) + .Callable("ToPg") + .Callable(0, "Or") + .Callable(0, "FromPg") + .Add(0, ReplacePgOps(input->ChildPtr(0), ctx)) + .Seal() + .Callable(1, "FromPg") + .Add(0, ReplacePgOps(input->ChildPtr(1), ctx)) + .Seal() + .Seal() + .Seal() + .Build(); + // clnag-format on + } + else if (input->IsCallable()){ + TVector newChildren; + for (auto c : input->Children()) { + newChildren.push_back(ReplacePgOps(c, ctx)); + } + // clang-format off + return ctx.Builder(input->Pos()) + .Callable(input->Content()) + .Add(std::move(newChildren)) + .Seal() + .Build(); + // clang-format on + } else if (input->IsList()) { + TVector newChildren; + for (auto c : input->Children()) { + newChildren.push_back(ReplacePgOps(c, ctx)); + } + // clang-format off + return ctx.Builder(input->Pos()) + .List() + .Add(std::move(newChildren)) + .Seal() + .Build(); + // clang-format on + } else { + return input; + } +} + +TExprNode::TPtr BuildSort(TExprNode::TPtr input, TExprNode::TPtr sort, TExprContext &ctx) { + TVector sortElements; + + for (auto sortItem : sort->Child(1)->Children()) { + auto sortLambda = sortItem->Child(1); + auto direction = sortItem->Child(2); + auto nullsFirst = sortItem->Child(3); + + // clang-format off + sortElements.push_back(Build(ctx, input->Pos()) + .Input(input) + .Direction(direction) + .NullsFirst(nullsFirst) + .Lambda(sortLambda) + .Done().Ptr()); + // clang-format on + } + + // clang-format off + return Build(ctx, input->Pos()) + .Input(input) + .SortExpressions().Add(sortElements).Build() + .Done().Ptr(); + // clang-format off +} + +} // namespace + +namespace NKikimr { +namespace NKqp { + +TExprNode::TPtr RewriteSelect(const TExprNode::TPtr &node, TExprContext &ctx, const TTypeAnnotationContext &typeCtx, bool pgSyntax) { + Y_UNUSED(typeCtx); + Y_UNUSED(pgSyntax); + + TVector finalColumnOrder; + + auto setItems = GetSetting(node->Head(), "set_items")->TailPtr(); + TVector setItemsResults; + for (ui32 i = 0; i < setItems->ChildrenSize(); ++i) { + auto setItem = setItems->ChildPtr(i); + + TVector resultElements; + // In pg syntax duplicate attributes are allowed in the results, but we need to rename them + // We use the counters for this purpose + THashMap resultElementCounters; + + TExprNode::TPtr joinExpr; + TExprNode::TPtr filterExpr; + TExprNode::TPtr lastAlias; + + auto from = GetSetting(setItem->Tail(), "from"); + THashMap aliasToInputMap; + TVector inputsInOrder; + + if (from) { + for (auto fromItem : from->Child(1)->Children()) { + // From item can be a table read with an alias or a subquery with an alias + // In case of a subquery, we have already translated PgSelect of the nested subquery + // so we just need to remove TKqpOpRoot and plug in the translated subquery + + auto childExpr = fromItem->ChildPtr(0); + auto alias = fromItem->Child(1); + TExprNode::TPtr fromExpr; + + if (TKqpOpRoot::Match(childExpr.Get())) { + auto opRoot = TKqpOpRoot(childExpr); + + TVector subqueryElements; + + // We need to rename all the IUs in the subquery to reflect the new alias + auto subqueryType = childExpr->GetTypeAnn()->Cast()->GetItemType()->Cast(); + for (auto item : subqueryType->GetItems()) { + auto orig = TString(item->GetName()); + auto unit = TInfoUnit(orig); + auto renamedUnit = TInfoUnit(TString(alias->Content()), unit.ColumnName); + + // clang-format off + subqueryElements.push_back(Build(ctx, node->Pos()) + .Input(opRoot.Input()) + .Variable().Value(renamedUnit.GetFullName()).Build() + .From().Value(unit.GetFullName()).Build() + .Done().Ptr()); + // clang-format on + } + + // clang-format off + fromExpr = Build(ctx, node->Pos()) + .Input(opRoot.Input()) + .MapElements().Add(subqueryElements).Build() + .Project().Value("true").Build() + .Done().Ptr(); + // clang-format on + } + + else { + auto readExpr = TKqlReadTableRanges(childExpr); + + // clang-format off + fromExpr = Build(ctx, node->Pos()) + .Table(readExpr.Table()) + .Alias(alias) + .Columns(readExpr.Columns()) + .Done().Ptr(); + // clang-format on + } + + aliasToInputMap.insert({TString(alias->Content()), fromExpr}); + inputsInOrder.push_back(fromExpr); + lastAlias = alias; + } + } + + THashSet processedInputs; + auto joinOps = GetSetting(setItem->Tail(), "join_ops"); + if (joinOps) { + for (ui32 i = 0; i < joinOps->Tail().ChildrenSize(); ++i) { + ui32 tableInputsCount = 0; + auto tuple = joinOps->Tail().Child(i); + for (ui32 j = 0; j < tuple->ChildrenSize(); ++j) { + auto join = tuple->Child(j); + auto joinType = join->Child(0)->Content(); + if (joinType == "push") { + ++tableInputsCount; + continue; + } + + Y_ENSURE(join->ChildrenSize() > 1 && join->Child(1)->ChildrenSize() > 1); + auto pgResolvedOps = FindNodes(join->Child(1)->Child(1)->TailPtr(), [](const TExprNode::TPtr &node) { + if (node->IsCallable("PgResolvedOp")) { + return true; + } else { + return false; + } + }); + + // FIXME: join on clause may include expressions, we need to handle this case + TVector joinKeys; + for (const auto &pgResolvedOp : pgResolvedOps) { + TVector keys; + GetAllMembers(pgResolvedOp, keys); + joinKeys.insert(joinKeys.end(), keys.begin(), keys.end()); + } + + TJoinTableAliases joinAliases; + TExprNode::TPtr leftInput; + TExprNode::TPtr rightInput; + + if (tableInputsCount == 2) { + joinAliases = GatherJoinAliasesTwoInputs(joinKeys); + const auto leftSideAlias = *joinAliases.LeftSideAliases.begin(); + const auto rightSideAlias = *joinAliases.RightSideAliases.begin(); + Y_ENSURE(aliasToInputMap.count(leftSideAlias), "Left side alias is not present in input tables"); + Y_ENSURE(aliasToInputMap.count(rightSideAlias), "Right sided alias is not present input tables"); + leftInput = aliasToInputMap[leftSideAlias]; + rightInput = aliasToInputMap[rightSideAlias]; + } else if (tableInputsCount == 1) { + joinAliases = GatherJoinAliasesLeftSideMultiInputs(joinKeys, processedInputs); + const auto rightSideAlias = *joinAliases.RightSideAliases.begin(); + Y_ENSURE(aliasToInputMap.contains(rightSideAlias), "Right side alias is not present in input tables"); + leftInput = joinExpr; + rightInput = aliasToInputMap[rightSideAlias]; + } + + auto joinKind = TString(joinType); + ToCamelCase(joinKind.MutRef()); + + // clang-format off + joinExpr = Build(ctx, node->Pos()) + .LeftInput(leftInput) + .RightInput(rightInput) + .JoinKind() + .Value(joinKind) + .Build() + .JoinKeys(BuildJoinKeys(joinKeys, joinAliases, processedInputs, ctx, node->Pos())) + .Done().Ptr(); + // clang-format on + tableInputsCount = 0; + } + } + + // Build in order + if (!joinExpr) { + ui32 inputIndex = 0; + if (inputsInOrder.size() > 1) { + while (inputIndex < inputsInOrder.size()) { + auto leftTableInput = inputIndex == 0 ? inputsInOrder[inputIndex] : joinExpr; + auto rightTableInput = inputIndex == 0 ? inputsInOrder[inputIndex + 1] : inputsInOrder[inputIndex]; + auto joinKeys = Build(ctx, node->Pos()).Done(); + // clang-format off + joinExpr = Build(ctx, node->Pos()) + .LeftInput(leftTableInput) + .RightInput(rightTableInput) + .JoinKind() + .Value("Cross") + .Build() + .JoinKeys(joinKeys) + .Done().Ptr(); + // clang-format on + inputIndex += (inputIndex == 0 ? 2 : 1); + } + } else { + joinExpr = inputsInOrder.front(); + } + } + } + + filterExpr = joinExpr; + + auto where = GetSetting(setItem->Tail(), "where"); + + if (where) { + TExprNode::TPtr lambda = where->Child(1)->Child(1); + lambda = ReplacePgOps(lambda, ctx); + // clang-format off + filterExpr = Build(ctx, node->Pos()) + .Input(filterExpr) + .Lambda(lambda) + .Done().Ptr(); + // clang-format on + } + + if (!filterExpr) { + filterExpr = Build(ctx, node->Pos()).Done().Ptr(); + } + + // FIXME: Group by key can be an expression, we need to handle this case + TVector> groupByKeysRenamesMap; + TVector groupByKeys; + auto groupOps = GetSetting(setItem->Tail(), "group_exprs"); + if (groupOps) { + const auto groupByList = groupOps->TailPtr(); + for (ui32 i = 0; i < groupByList->ChildrenSize(); ++i) { + auto lambda = TCoLambda(ctx.DeepCopyLambda(*(groupByList->ChildPtr(i)->Child(1)))); + auto body = lambda.Body().Ptr(); + TVector keys; + GetAllMembers(body, keys); + groupByKeys.insert(groupByKeys.end(), keys.begin(), keys.end()); + for (const auto &infoUnit : keys) { + groupByKeysRenamesMap.push_back({infoUnit, infoUnit}); + } + } + } + + const bool distinctAll = !!GetSetting(setItem->Tail(), "distinct_all"); + auto result = GetSetting(setItem->Tail(), "result"); + Y_ENSURE(result); + auto finalType = node->GetTypeAnn()->Cast()->GetItemType()->Cast(); + + // This is a hack to enable convertion for aggregation columns. + THashSet aggregationColumns; + THashSet columnNames; + // Collect PgAgg for each result item at first pass. + TVector aggTraitsList; + TVector distinctAggTraitsList; + TVector distinctAggKeyColumns; + TVector> aggFieldsRenamesMap; + TVector> aggFieldsExpressionsMap; + bool needRenameMap = false; + for (ui32 i = 0; i < result->Child(1)->ChildrenSize(); ++i) { + const auto resultItem = result->Child(1)->ChildPtr(i); + auto lambda = TCoLambda(ctx.DeepCopyLambda(*(resultItem->Child(2)))); + auto resultColName = TString(resultItem->Child(0)->Content()); + const auto *aggFuncResultType = finalType->FindItemType(resultColName); + Y_ENSURE(aggFuncResultType, "Cannot find type for aggregation result."); + + auto pgAgg = GetPgCallable(lambda.Body().Ptr(), "PgAgg"); + if (pgAgg) { + // Collect original column names processing `PgAgg` callable. + TVector originalColNames; + GetAllMembers(pgAgg, originalColNames); + auto pgResolvedOp = GetPgCallable(lambda.Body().Ptr(), "PgResolvedOp"); + //Y_ENSURE(originalColNames.size() > 1 && pgResolvedOp, "Invalid column size for aggregation columns."); + + auto originalColName = originalColNames.front(); + auto renamedColName = originalColName; + + if (pgResolvedOp) { + auto fromPg = ctx.NewCallable(node->Pos(), "FromPg", {pgResolvedOp}); + auto exprLambda = Build(ctx, node->Pos()).Args(lambda.Args()).Body(fromPg).Done().Ptr(); + + // Just any unique name for expression result, physical plan should be AsSturct(`unique_name (expression)) + originalColName = TInfoUnit(GenerateUniqueColumnName("_expr_")); + renamedColName = originalColName; + aggFieldsExpressionsMap.push_back({originalColName, exprLambda}); + } else { + // Rename agg column we will add a map to map same column to different renames. + if (columnNames.count(originalColName.GetFullName())) { + renamedColName = TInfoUnit(originalColName.Alias, GenerateUniqueColumnName(originalColName.ColumnName)); + needRenameMap = true; + } + aggFieldsRenamesMap.push_back({originalColName, renamedColName}); + } + + columnNames.insert(renamedColName.GetFullName()); + Y_ENSURE(!GetAtom(pgAgg->ChildPtr(1), "distinct"), "Aggregation on distinct is not supported"); + + aggregationColumns.insert(resultColName); + const TString aggFuncName = TString(pgAgg->ChildPtr(0)->Content()); + auto aggregationTraits = BuildAggregationTraits(renamedColName.GetFullName(), resultColName, aggFuncName, + aggFuncResultType, ctx, node->Pos()); + aggTraitsList.push_back(aggregationTraits); + + if (distinctAll) { + auto distinctAggTraits = + BuildAggregationTraits(resultColName, resultColName, "distinct_all", aggFuncResultType, ctx, node->Pos()); + distinctAggTraitsList.push_back(distinctAggTraits); + distinctAggKeyColumns.push_back(TInfoUnit(resultColName)); + } + // This case covers distinct all on just columns without aggregation functions. + } else if (!pgAgg && distinctAll) { + aggregationColumns.insert(resultColName); + Y_ENSURE(aggFuncResultType, "Cannot find type for aggregation result."); + TVector originalColNames; + GetAllMembers(resultItem->ChildPtr(2), originalColNames); + Y_ENSURE(originalColNames.size() == 1, "Invalid column size for aggregation columns."); + + const auto originalColName = originalColNames.front().GetFullName(); + auto distinctAggTraits = + BuildAggregationTraits(originalColName, originalColName, "distinct_all", aggFuncResultType, ctx, node->Pos()); + distinctAggTraitsList.push_back(distinctAggTraits); + distinctAggKeyColumns.push_back(originalColNames.front()); + } + } + + TExprNode::TPtr resultExpr = filterExpr; + // Build aggregate + if (!aggTraitsList.empty()) { + // In case we have a multiple consumers for the single column we have to map and rename it. + if (needRenameMap) { + resultExpr = BuildRenameMap(resultExpr, aggFieldsRenamesMap, groupByKeysRenamesMap, ctx, node->Pos()); + } + // In case we have an expression for aggregation - f(a + b ..) + if (!aggFieldsExpressionsMap.empty()) { + resultExpr = BuildExpressionMap(resultExpr, aggFieldsExpressionsMap, aggFieldsRenamesMap, groupByKeysRenamesMap, ctx, node->Pos()); + } + resultExpr = BuildAggregate(resultExpr, groupByKeys, aggTraitsList, distinctAll, ctx, node->Pos()); + } + + // Build distinct aggregate + if (!distinctAggTraitsList.empty()) { + resultExpr = BuildAggregate(resultExpr, distinctAggKeyColumns, distinctAggTraitsList, distinctAll, ctx, node->Pos()); + } + + finalColumnOrder.clear(); + THashMap aggProjectionMap; + + for (auto resultItem : result->Child(1)->Children()) { + auto column = resultItem->Child(0); + TString columnName = TString(column->Content()); + + const auto expectedTypeNode = finalType->FindItemType(columnName); + Y_ENSURE(expectedTypeNode); + const auto actualTypeNode = resultItem->GetTypeAnn(); + + auto lambda = TCoLambda(ctx.DeepCopyLambda(*(resultItem->Child(2)))); + + YQL_CLOG(TRACE, CoreDq) << "Actual type for column: " << columnName << " is: " << *actualTypeNode; + YQL_CLOG(TRACE, CoreDq) << "Expected type for column: " << columnName << " is: " << *expectedTypeNode; + + bool needPgCast = false; + bool convertToPg = false; + bool needPgCastForAgg = false; + const TPgExprType* expectedPgType = pgSyntax ? expectedTypeNode->Cast() : nullptr; + ui32 actualPgTypeId = 0; + + if (pgSyntax) { + Y_ENSURE(ExtractPgType(actualTypeNode, actualPgTypeId, convertToPg, node->Pos(), ctx)); + + needPgCast = (expectedPgType->GetId() != actualPgTypeId); + needPgCastForAgg = aggregationColumns.count(columnName); + } + + auto pgAgg = GetPgCallable(lambda.Body().Ptr(), "PgAgg"); + if (pgAgg) { + Y_ENSURE(SupportedAggregationFunctions.count(pgAgg->ChildPtr(0)->Content()), + "Aggregation function " + TString(pgAgg->ChildPtr(0)->Content()) + " is not supported "); + + // Build a projection lambda, we do not need `PgAgg` inside. + // clang-format off + lambda = Build(ctx, node->Pos()) + .Args(lambda.Args()) + .Body() + .Struct(lambda.Args().Arg(0)) + .Name() + .Value(columnName) + .Build() + .Build() + .Done(); + // clang-format on + } + + // Eliminate `PgGroupRef` from projection lambda. + auto pgGroupRef = GetPgCallable(lambda.Body().Ptr(), "PgGroupRef"); + if (pgGroupRef) { + Y_ENSURE(pgGroupRef->ChildrenSize() == 4); + // clang-format off + lambda = Build(ctx, node->Pos()) + .Args(lambda.Args()) + .Body() + .Struct(lambda.Args().Arg(0)) + .Name() + .Value(pgGroupRef->ChildPtr(3)->Content()) + .Build() + .Build() + .Done(); + // clang-format on + } + + if (convertToPg && !needPgCastForAgg) { + Y_ENSURE(!needPgCast, + TStringBuilder() << "Conversion to PG type is different at typization (" << expectedPgType->GetId() + << ") and optimization (" << actualPgTypeId << ") stages."); + + TExprNode::TPtr lambdaBody = lambda.Body().Ptr(); + lambdaBody = ReplacePgOps(lambdaBody, ctx); + auto toPg = ctx.NewCallable(node->Pos(), "ToPg", {lambdaBody}); + + // clang-format off + lambda = Build(ctx, node->Pos()) + .Args(lambda.Args()) + .Body(toPg) + .Done(); + // clang-format on + } else if (needPgCast || needPgCastForAgg) { + + auto pgType = + ctx.NewCallable(node->Pos(), "PgType", {ctx.NewAtom(node->Pos(), ::NPg::LookupType(expectedPgType->GetId()).Name)}); + TExprNode::TPtr lambdaBody = lambda.Body().Ptr(); + lambdaBody = ReplacePgOps(lambdaBody, ctx); + auto pgCast = ctx.NewCallable(node->Pos(), "PgCast", {lambdaBody, pgType}); + + // clang-format off + lambda = Build(ctx, node->Pos()) + .Args(lambda.Args()) + .Body(pgCast) + .Done(); + // clang-format on + } + + if (resultElementCounters.contains(columnName)) { + resultElementCounters[columnName] += 1; + columnName = columnName + "_generated_" + std::to_string(resultElementCounters.at(columnName)); + } else { + resultElementCounters[columnName] = 1; + } + + finalColumnOrder.push_back(columnName); + auto variable = Build(ctx, node->Pos()).Value(columnName).Done(); + + // clang-format off + resultElements.push_back(Build(ctx, node->Pos()) + .Input(resultExpr) + .Variable(variable) + .Lambda(lambda) + .Done().Ptr()); + // clang-format on + } + + // clang-format off + auto setItemPtr = Build(ctx, node->Pos()) + .Input(resultExpr) + .MapElements() + .Add(resultElements) + .Build() + .Project() + .Value("true") + .Build() + .Done().Ptr(); + // clang-format onto + + auto sort = GetSetting(setItem->Tail(), "sort"); + if (sort) { + setItemPtr = BuildSort(setItemPtr, sort, ctx); + } + + setItemsResults.push_back(setItemPtr); + } + + auto setOps = GetSetting(node->Head(), "set_ops"); + Y_ENSURE(setOps && setItemsResults.size()); + + auto setOpsList = setOps->TailPtr(); + TExprNode::TPtr opResult = setItemsResults.front(); + for (ui32 i = 0, end = setOpsList->ChildrenSize(), setItemsIndex = 0, opsInputCount = 0; i < end; ++i) { + if (setOpsList->ChildPtr(i)->Content() == "push") { + ++opsInputCount; + continue; + } + Y_ENSURE(setOpsList->ChildPtr(i)->Content() == "union_all"); + Y_ENSURE(opsInputCount <= 2); + + TExprNode::TPtr leftInput; + TExprNode::TPtr rightInput; + if (opsInputCount == 2) { + Y_ENSURE(setItemsIndex + 1 < end); + leftInput = setItemsResults[setItemsIndex++]; + rightInput = setItemsResults[setItemsIndex++]; + } else { + Y_ENSURE(setItemsIndex < end); + leftInput = opResult; + rightInput = setItemsResults[setItemsIndex++]; + } + + // clang-format off + opResult = Build(ctx, node->Pos()) + .LeftInput(leftInput) + .RightInput(rightInput) + .Done().Ptr(); + // clang-format on + + // Count again. + opsInputCount = 0; + } + + auto sort = GetSetting(node->Head(), "sort"); + if (sort) { + opResult = BuildSort(opResult, sort, ctx); + } + + TVector columnAtomList; + for (auto c : finalColumnOrder) { + columnAtomList.push_back(Build(ctx, node->Pos()).Value(c).Done()); + } + auto columnOrder = Build(ctx, node->Pos()).Add(columnAtomList).Done().Ptr(); + + // clang-format off + return Build(ctx, node->Pos()) + .Input(opResult) + .ColumnOrder(columnOrder) + .PgSyntax().Value(pgSyntax).Build() + .Done().Ptr(); + // clang-format on +} + +} +} \ No newline at end of file diff --git a/ydb/core/kqp/opt/rbo/ya.make b/ydb/core/kqp/opt/rbo/ya.make index 25a41e68bfd5..90cac668f2e2 100644 --- a/ydb/core/kqp/opt/rbo/ya.make +++ b/ydb/core/kqp/opt/rbo/ya.make @@ -10,6 +10,7 @@ SRCS( kqp_rbo_type_ann.cpp kqp_rename_unused_stage.cpp kqp_constant_folding_stage.cpp + kqp_rewrite_select.cpp ) PEERDIR( diff --git a/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp b/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp index e8c4eb8fa96b..472bc0d6fa92 100644 --- a/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp +++ b/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp @@ -188,14 +188,13 @@ Y_UNIT_TEST_SUITE(KqpRbo) { std::vector queries = { R"( - PRAGMA TablePathPrefix='/Root'; PRAGMA YqlSelect = 'force'; - SELECT id as id2 FROM foo WHERE name != '3_name'; + SELECT id as id2 FROM `/Root/foo` WHERE name != '3_name'; )" }; std::vector results = { - R"([["0"];["1"];["2"];["4"];["5"];["6"];["7"];["8"];["9"]])", + R"([[0];[1];[2];[4];[5];[6];[7];[8];[9]])", }; for (ui32 i = 0; i < queries.size(); ++i) { From 8abcdd8e52460466292b2f9860cc4f910895dbcb Mon Sep 17 00:00:00 2001 From: Pavel Velikhov Date: Wed, 19 Nov 2025 12:12:54 +0000 Subject: [PATCH 3/6] Working commit, waiting for YQL changes --- .../rbo/{kqp_rbo_ut.cpp => kqp_rbo_pg_ut.cpp} | 53 +- ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp | 1111 +++++++++++++++++ ydb/core/kqp/ut/rbo/ya.make | 3 +- 3 files changed, 1114 insertions(+), 53 deletions(-) rename ydb/core/kqp/ut/rbo/{kqp_rbo_ut.cpp => kqp_rbo_pg_ut.cpp} (95%) create mode 100644 ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp diff --git a/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp b/ydb/core/kqp/ut/rbo/kqp_rbo_pg_ut.cpp similarity index 95% rename from ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp rename to ydb/core/kqp/ut/rbo/kqp_rbo_pg_ut.cpp index 472bc0d6fa92..ebc25c2cf7f8 100644 --- a/ydb/core/kqp/ut/rbo/kqp_rbo_ut.cpp +++ b/ydb/core/kqp/ut/rbo/kqp_rbo_pg_ut.cpp @@ -79,7 +79,7 @@ double TimeQuery(TString schema, TString query, int nIterations) { return elapsed_time / nIterations; } -Y_UNIT_TEST_SUITE(KqpRbo) { +Y_UNIT_TEST_SUITE(KqpRboPg) { Y_UNIT_TEST(Select) { NKikimrConfig::TAppConfig appConfig; @@ -154,57 +154,6 @@ Y_UNIT_TEST_SUITE(KqpRbo) { } } - Y_UNIT_TEST(FilterYql) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); - TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - session.ExecuteSchemeQuery(R"( - CREATE TABLE `/Root/foo` ( - id Int64 NOT NULL, - name String, - primary key(id) - ); - )").GetValueSync(); - - NYdb::TValueBuilder rows; - rows.BeginList(); - for (size_t i = 0; i < 10; ++i) { - rows.AddListItem() - .BeginStruct() - .AddMember("id").Int64(i) - .AddMember("name").String(std::to_string(i) + "_name") - .EndStruct(); - } - rows.EndList(); - - auto resultUpsert = db.BulkUpsert("/Root/foo", rows.Build()).GetValueSync(); - UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); - - db = kikimr.GetTableClient(); - auto session2 = db.CreateSession().GetValueSync().GetSession(); - - std::vector queries = { - R"( - PRAGMA YqlSelect = 'force'; - SELECT id as id2 FROM `/Root/foo` WHERE name != '3_name'; - )" - }; - - std::vector results = { - R"([[0];[1];[2];[4];[5];[6];[7];[8];[9]])", - }; - - for (ui32 i = 0; i < queries.size(); ++i) { - const auto &query = queries[i]; - auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); - } - } - Y_UNIT_TEST(ConstantFolding) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); diff --git a/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp b/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp new file mode 100644 index 000000000000..087186e407c5 --- /dev/null +++ b/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp @@ -0,0 +1,1111 @@ +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include + +namespace { + +using namespace NKikimr; +using namespace NKikimr::NKqp; + +double TimeQuery(NKikimr::NKqp::TKikimrRunner& kikimr, TString query, int nIterations) { + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + clock_t the_time; + double elapsed_time; + the_time = clock(); + + for (int i=0; iSetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + session.ExecuteSchemeQuery(schema).GetValueSync(); + + clock_t the_time; + double elapsed_time; + the_time = clock(); + + for (int i=0; iSetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + PRAGMA YqlSelect = 'force'; + SELECT 1 as a, 2 as b; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + Y_UNIT_TEST(Filter) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo` ( + id Int64 NOT NULL, + name String, + primary key(id) + ); + )").GetValueSync(); + + NYdb::TValueBuilder rows; + rows.BeginList(); + for (size_t i = 0; i < 10; ++i) { + rows.AddListItem() + .BeginStruct() + .AddMember("id").Int64(i) + .AddMember("name").String(std::to_string(i) + "_name") + .EndStruct(); + } + rows.EndList(); + + auto resultUpsert = db.BulkUpsert("/Root/foo", rows.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + std::vector queries = { + R"( + PRAGMA YqlSelect = 'force'; + SELECT id as id2 FROM `/Root/foo` WHERE name != '3_name'; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT id as id2 FROM `/Root/foo` WHERE name = '3_name'; + )", + }; + + std::vector results = { + R"([[0];[1];[2];[4];[5];[6];[7];[8];[9]])", + R"([[3]])" + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + Y_UNIT_TEST(ConstantFolding) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo` ( + id Int64 NOT NULL, + name String, + primary key(id) + ); + )").GetValueSync(); + + NYdb::TValueBuilder rows; + rows.BeginList(); + for (size_t i = 0; i < 10; ++i) { + rows.AddListItem() + .BeginStruct() + .AddMember("id").Int64(i) + .AddMember("name").String(std::to_string(i) + "_name") + .EndStruct(); + } + rows.EndList(); + + auto resultUpsert = db.BulkUpsert("/Root/foo", rows.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + std::vector queries = { + R"( + PRAGMA YqlSelect = 'force'; + SELECT id as id2 FROM `/Root/foo` WHERE id = 15 - 14 and 18-17 = 1; + )" + }; + + std::vector results = { + R"([[1]])", + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + /* + Y_UNIT_TEST(ScalarSubquery) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo` ( + id Int64 NOT NULL, + name String, + primary key(id) + ); + + CREATE TABLE `/Root/bar` ( + id Int64 NOT NULL, + lastname String, + primary key(id) + ); + )").GetValueSync(); + + NYdb::TValueBuilder rowsTableFoo; + rowsTableFoo.BeginList(); + for (size_t i = 0; i < 1; ++i) { + rowsTableFoo.AddListItem() + .BeginStruct() + .AddMember("id").Int64(i) + .AddMember("name").String(std::to_string(i) + "_name") + .EndStruct(); + } + rowsTableFoo.EndList(); + + auto resultUpsert = db.BulkUpsert("/Root/foo", rowsTableFoo.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + NYdb::TValueBuilder rowsTableBar; + rowsTableBar.BeginList(); + for (size_t i = 0; i < 4; ++i) { + rowsTableBar.AddListItem() + .BeginStruct() + .AddMember("id").Int64(i) + .AddMember("lastname").String(std::to_string(i) + "_name") + .EndStruct(); + } + rowsTableBar.EndList(); + + resultUpsert = db.BulkUpsert("/Root/bar", rowsTableBar.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + std::vector queries = { + R"( + SELECT bar.id FROM `/Root/bar` where bar.id = (SELECT id FROM `/Root/foo`); + )" + }; + + // TODO: The order of result is not defined, we need order by to add more interesting tests. + std::vector results = { + R"([[0]])", + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + Y_UNIT_TEST(CrossInnerJoin) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo` ( + id Int64 NOT NULL, + name String, + primary key(id) + ); + + CREATE TABLE `/Root/bar` ( + id Int64 NOT NULL, + lastname String, + primary key(id) + ); + )").GetValueSync(); + + NYdb::TValueBuilder rowsTableFoo; + rowsTableFoo.BeginList(); + for (size_t i = 0; i < 1; ++i) { + rowsTableFoo.AddListItem() + .BeginStruct() + .AddMember("id").Int64(i) + .AddMember("name").String(std::to_string(i) + "_name") + .EndStruct(); + } + rowsTableFoo.EndList(); + + auto resultUpsert = db.BulkUpsert("/Root/foo", rowsTableFoo.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + NYdb::TValueBuilder rowsTableBar; + rowsTableBar.BeginList(); + for (size_t i = 0; i < 4; ++i) { + rowsTableBar.AddListItem() + .BeginStruct() + .AddMember("id").Int64(i) + .AddMember("lastname").String(std::to_string(i) + "_name") + .EndStruct(); + } + rowsTableBar.EndList(); + + resultUpsert = db.BulkUpsert("/Root/bar", rowsTableBar.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + std::vector queries = { + R"( + PRAGMA YqlSelect = 'force'; + SELECT foo.id FROM `/Root/foo` inner join `/Root/bar` on foo.id = bar.id; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT foo.id FROM `/Root/foo` inner join `/Root/bar` on foo.id = bar.id WHERE name = '1_name'; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT f.id as id2 FROM `/Root/foo` AS f, `/Root/bar` WHERE f.id = bar.id and name = '0_name'; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT foo.id FROM `/Root/foo`, `/Root/bar`; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT foo.id, bar.id FROM `/Root/foo`, `/Root/bar`; + )", + }; + + // TODO: The order of result is not defined, we need order by to add more interesting tests. + std::vector results = { + R"([[0]])", + R"([])", + R"([[0]])", + R"([[0];[0];[0];[0]])", + R"([[0;0];[0;1];[0;2];[0;3]])", + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + Y_UNIT_TEST(PredicatePushdownLeftJoin) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/t1` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t2` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + )").GetValueSync(); + + NYdb::TValueBuilder rowsTableT1; + rowsTableT1.BeginList(); + for (size_t i = 0; i < 2; ++i) { + rowsTableT1.AddListItem() + .BeginStruct() + .AddMember("a").Int64(i) + .AddMember("b").String(std::to_string(i) + "_b") + .AddMember("c").Int64(i + 1) + .EndStruct(); + } + rowsTableT1.EndList(); + + auto resultUpsert = db.BulkUpsert("/Root/t1", rowsTableT1.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + NYdb::TValueBuilder rowsTableT2; + rowsTableT2.BeginList(); + for (size_t i = 0; i < 1; ++i) { + rowsTableT2.AddListItem() + .BeginStruct() + .AddMember("a").Int64(i) + .AddMember("b").String(std::to_string(i) + "_b") + .AddMember("c").Int64(i + 1) + .EndStruct(); + } + rowsTableT2.EndList(); + + resultUpsert = db.BulkUpsert("/Root/t2", rowsTableT2.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + std::vector queries = { + R"( + PRAGMA YqlSelect = 'force'; + SELECT t1.a, t2.a FROM `/Root/t1` left join `/Root/t2` on t1.a = t2.a where t1.a = 0; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT t1.a FROM `/Root/t1` left join `/Root/t2` on t1.a = t2.a where t2.b = 'some_string'; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT t1.a FROM `/Root/t1` left join `/Root/t2` on t1.a = t2.a where t2.b IS NULL; + )", + }; + + std::vector results = { + R"([[0;0]])", + R"([])", + R"([[1]])" + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + void InsertIntoSchema0(NYdb::NTable::TTableClient &db, std::string tableName, int numRows) { + NYdb::TValueBuilder rows; + rows.BeginList(); + for (size_t i = 0; i < numRows; ++i) { + rows.AddListItem() + .BeginStruct() + .AddMember("a").Int64(i) + .AddMember("b").String(std::to_string(i) + "_b") + .AddMember("c").Int64(i + 1) + .EndStruct(); + } + rows.EndList(); + auto resultUpsert = db.BulkUpsert(tableName, rows.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + } + + Y_UNIT_TEST(Aggregation) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/t1` ( + a Int64 NOT NULL, + b Int64, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t2` ( + a Int64 NOT NULL, + b Int64, + c Int64, + primary key(a) + ); + )").GetValueSync(); + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + NYdb::TValueBuilder rowsTableT1; + rowsTableT1.BeginList(); + for (size_t i = 0; i < 5; ++i) { + rowsTableT1.AddListItem() + .BeginStruct() + .AddMember("a").Int64(i) + .AddMember("b").Int64(i & 1 ? 1 : 2) + .AddMember("c").Int64(2) + .EndStruct(); + } + rowsTableT1.EndList(); + + auto resultUpsert = db.BulkUpsert("/Root/t1", rowsTableT1.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + NYdb::TValueBuilder rowsTableT2; + rowsTableT2.BeginList(); + for (size_t i = 0; i < 5; ++i) { + rowsTableT2.AddListItem() + .BeginStruct() + .AddMember("a").Int64(i) + .AddMember("b").Int64(i & 1 ? 1 : 2) + .AddMember("c").Int64(2) + .EndStruct(); + } + rowsTableT2.EndList(); + + resultUpsert = db.BulkUpsert("/Root/t2", rowsTableT2.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + + std::vector queries = { + R"( + PRAGMA YqlSelect = 'force'; + select t1.b, sum(t1.c) from t1 group by t1.b order by t1.b; + )", + R"( + PRAGMA YqlSelect = 'force'; + select t1.b, sum(t1.c) from t1 inner join t2 on t1.a = t2.a group by t1.b order by t1.b; + )", + R"( + PRAGMA YqlSelect = 'force'; + select sum(t1.c) from t1 group by t1.b + union all + select sum(t1.b) from t1; + )", + R"( + PRAGMA YqlSelect = 'force'; + select t1.b, min(t1.a) from t1 group by t1.b order by t1.b; + )", + R"( + PRAGMA YqlSelect = 'force'; + select t1.b, max(t1.a) from t1 group by t1.b order by t1.b; + )", + R"( + PRAGMA YqlSelect = 'force'; + select t1.b, count(t1.a) from t1 group by t1.b order by t1.b; + )", + R"( + PRAGMA YqlSelect = 'force'; + select max(t1.b), min(t1.a) from t1; + )", + R"( + PRAGMA YqlSelect = 'force'; + select sum(t1.a) from t1 group by t1.b, t1.c; + )", + R"( + PRAGMA YqlSelect = 'force'; + select sum(t1.c), t1.b from t1 group by t1.b order by t1.b; + )", + R"( + PRAGMA YqlSelect = 'force'; + select max(t1.a), min(t1.a), min(t1.b) as min_b from t1; + )", + R"( + PRAGMA YqlSelect = 'force'; + select distinct t1.a, t1.b from t1 order by t1.a; + )", + R"( + PRAGMA YqlSelect = 'force'; + select distinct sum(t1.c) as sum_c, sum(t1.a) as sum_b from t1 group by t1.b order by sum_c; + )", + R"( + PRAGMA YqlSelect = 'force'; + select distinct min(t1.a) as min_a, max(t1.a) as max_a from t1 group by t1.b order by min_a; + )", + R"( + PRAGMA YqlSelect = 'force'; + select sum(t1.a + 1 + t1.c) as sumExpr0, sum(t1.c + 2) as sumExpr1 from t1 group by t1.b order by sumExpr0; + )", + }; + + std::vector results = { + R"([[1;4];[2;6]])", + R"([[1;4];[2;6]])", + R"([[6];[4];[8]])", + R"([[1;1];[2;0]])", + R"([[1;3];[2;4]])", + R"([[1;2];[2;3]])", + R"([[2;0]])", + R"([[6];[4]])", + R"([[4;1];[6;2]])", + R"([[4;0;1]])", + R"([[0;2];[1;1];[2;2];[3;1];[4;2]])", + R"([[4;4];[6;6]])", + R"([[0;4];[1;3]])", + R"([[10;8];[15;12]])" + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + //Cout << "OUTPUT_RESULT " << FormatResultSetYson(result.GetResultSet(0)) << Endl; + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + Y_UNIT_TEST(UnionAll) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/t1` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t2` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t3` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t4` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + )").GetValueSync(); + + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + std::vector> tables{{"/Root/t1", 4}, {"/Root/t2", 3}, {"/Root/t3", 2}, {"/Root/t4", 1}}; + for (const auto &[table, rowsNum] : tables) { + InsertIntoSchema0(db, table, rowsNum); + } + + std::vector queries = { + R"( + PRAGMA YqlSelect = 'force'; + SELECT t1.a FROM `/Root/t1` + UNION ALL + SELECT t2.a FROM `/Root/t2`; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT t1.a FROM `/Root/t1` + UNION ALL + SELECT t2.a FROM `/Root/t2` + UNION ALL + SELECT t3.a FROM `/Root/t3`; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT t1.a FROM `/Root/t1` + UNION ALL + SELECT t2.a FROM `/Root/t2` + UNION ALL + SELECT t3.a FROM `/Root/t3` + UNION ALL + SELECT t4.a FROM `/Root/t4`; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT t1.a FROM `/Root/t1` inner join `/Root/t2` on t1.a = t2.a where t1.a > 1 + UNION ALL + SELECT t3.a FROM `/Root/t3` where t3.a = 1; + )", + }; + + std::vector results = { + R"([[0];[1];[2];[3];[0];[1];[2]])", + R"([[0];[1];[2];[3];[0];[1];[2];[0];[1]])", + R"([[0];[1];[2];[3];[0];[1];[2];[0];[1];[0]])", + R"([[2];[1]])" + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + Y_UNIT_TEST(OrderBy) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/t1` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t2` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + )").GetValueSync(); + + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + std::vector> tables{{"/Root/t1", 4}, {"/Root/t2", 3}}; + for (const auto &[table, rowsNum] : tables) { + InsertIntoSchema0(db, table, rowsNum); + } + + std::vector queries = { + R"( + PRAGMA YqlSelect = 'force'; + SELECT a FROM `/Root/t1` + ORDER BY a DESC; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT a,c FROM `/Root/t1` + ORDER BY a DESC, c ASC; + )", + R"( + PRAGMA YqlSelect = 'force'; + SELECT a FROM `/Root/t1` + UNION ALL + SELECT a FROM `/Root/t2` + ORDER BY a DESC; + )" + }; + + std::vector results = { + R"([[3];[2];[1];[0]])", + R"([[3;4];[2;3];[1;2];[0;1]])", + R"([[3];[2];[2];[1];[1];[0];[0]])" + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + Y_UNIT_TEST(LeftJoinToKqpOpJoin) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/t1` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t2` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t3` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + + CREATE TABLE `/Root/t4` ( + a Int64 NOT NULL, + b String, + c Int64, + primary key(a) + ); + )").GetValueSync(); + + + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + std::vector> tables{{"/Root/t1", 4}, {"/Root/t2", 3}, {"/Root/t3", 2}, {"/Root/t4", 1}}; + for (const auto &[table, rowsNum] : tables) { + InsertIntoSchema0(db, table, rowsNum); + } + + std::vector queries = { + R"( + --!syntax_pg + SET TablePathPrefix = "/Root/"; + SELECT t1.a FROM t1 left join t2 on t1.a = t2.a where t2.b = '0_b'; + )", + R"( + --!syntax_pg + SET TablePathPrefix = "/Root/"; + SELECT t1.a FROM t1 left join t2 on t1.a = t2.a left join t3 on t2.a = t3.a where t3.b = '0_b'; + )", + R"( + --!syntax_pg + SET TablePathPrefix = "/Root/"; + SELECT t1.a FROM t1 left join t2 on t1.a = t2.a left join t3 on t2.a = t3.a left join t4 on t3.a = t4.a and t4.c = t2.c and t1.c = t4.c where t4.b = '0_b'; + )", + }; + + std::vector results = { + R"([["0"]])", + R"([["0"]])", + R"([["0"]])" + }; + + for (ui32 i = 0; i < queries.size(); ++i) { + const auto &query = queries[i]; + auto result = session2.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); + } + } + + void InsertIntoAliasesRenames(NYdb::NTable::TTableClient &db, std::string tableName, int numRows) { + NYdb::TValueBuilder rows; + rows.BeginList(); + for (size_t i = 0; i < numRows; ++i) { + rows.AddListItem() + .BeginStruct() + .AddMember("id").Int64(i) + .AddMember("join_id").Int64(i + 1) + .AddMember("c").Int64(i + 2) + .EndStruct(); + } + rows.EndList(); + auto resultUpsert = db.BulkUpsert(tableName, rows.Build()).GetValueSync(); + UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); + } + + void AliasesRenamesTest(bool newRbo) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(newRbo); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo_0` ( + id Int64 NOT NULL, + join_id Int64 NOT NULL, + c Int64, + primary key(id) + ); + + CREATE TABLE `/Root/foo_1` ( + id Int64 NOT NULL, + join_id Int64 NOT NULL, + c Int64, + primary key(id) + ); + + CREATE TABLE `/Root/foo_2` ( + id Int64 NOT NULL, + join_id Int64 NOT NULL, + c Int64, + primary key(id) + ); + + )").GetValueSync(); + + std::vector> tables{{"/Root/foo_0", 4}, {"/Root/foo_1", 3}, {"/Root/foo_2", 2}}; + for (const auto &[table, rowsNum] : tables) { + InsertIntoAliasesRenames(db, table, rowsNum); + } + db = kikimr.GetTableClient(); + auto session2 = db.CreateSession().GetValueSync().GetSession(); + + auto result = session2.ExecuteDataQuery(R"( + --!syntax_pg + SET TablePathPrefix = "/Root/"; + + WITH cte as ( + SELECT a1.id2, join_id FROM (SELECT id as "id2", join_id FROM foo_0) as a1) + + SELECT X1.id2, X2.id2 + FROM + (SELECT id2 + FROM foo_1, cte + WHERE foo_1.join_id = cte.join_id) as X1, + + (SELECT id2 + FROM foo_2, cte + WHERE foo_2.join_id = cte.join_id) as X2; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), R"([["0";"0"];["0";"1"];["1";"0"];["1";"1"];["2";"0"];["2";"1"]])"); + } + + Y_UNIT_TEST(AliasesRenames) { + AliasesRenamesTest(true); + AliasesRenamesTest(false); + } + + Y_UNIT_TEST(Bench_Select) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + + auto time = TimeQuery(kikimr, R"( + --!syntax_pg + SELECT 1 as "a", 2 as "b"; + )", 10); + + Cout << "Time per query: " << time; + + //UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + Y_UNIT_TEST(Bench_Filter) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo` ( + id Int64 NOT NULL, + name String, + primary key(id) + ); + )").GetValueSync(); + + auto time = TimeQuery(kikimr, R"( + --!syntax_pg + SET TablePathPrefix = "/Root/"; + SELECT id as "id2" FROM foo WHERE name = 'some_name'; + )",10); + + Cout << "Time per query: " << time; + } + + Y_UNIT_TEST(Bench_CrossFilter) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo` ( + id Int64 NOT NULL, + name String, + primary key(id) + ); + + CREATE TABLE `/Root/bar` ( + id Int64 NOT NULL, + lastname String, + primary key(id) + ); + )").GetValueSync(); + + auto time = TimeQuery(kikimr, R"( + --!syntax_pg + SET TablePathPrefix = "/Root/"; + SELECT f.id as "id2" FROM foo AS f, bar WHERE name = 'some_name'; + )", 10); + + Cout << "Time per query: " << time; + } + + Y_UNIT_TEST(Bench_JoinFilter) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/foo` ( + id Int64 NOT NULL, + name String, + primary key(id) + ); + + CREATE TABLE `/Root/bar` ( + id Int64 NOT NULL, + lastname String, + primary key(id) + ); + )").GetValueSync(); + + auto time = TimeQuery(kikimr, R"( + --!syntax_pg + SET TablePathPrefix = "/Root/"; + SELECT f.id as "id2" FROM foo AS f, bar WHERE f.id = bar.id and name = 'some_name'; + )", 10); + + Cout << "Time per query: " << time; + } + + Y_UNIT_TEST(Bench_10Joins) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto schema = R"( +CREATE TABLE `/Root/foo_0` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_1` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_2` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_3` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_4` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_5` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_6` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_7` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_8` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + + + CREATE TABLE `/Root/foo_9` ( + id Int64 NOT NULL, + join_id Int64, + primary key(id) + ); + )"; + + auto query = R"( + --!syntax_pg + SET TablePathPrefix = "/Root/"; + + SELECT foo_0.id as "id2" + FROM foo_0, foo_1, foo_2, foo_3, foo_4, foo_5, foo_6, foo_7, foo_8, foo_9 + WHERE foo_0.join_id = foo_1.id AND foo_0.join_id = foo_2.id AND foo_0.join_id = foo_3.id AND foo_0.join_id = foo_4.id AND foo_0.join_id = foo_5.id AND foo_0.join_id = foo_6.id AND foo_0.join_id = foo_7.id AND foo_0.join_id = foo_8.id AND foo_0.join_id = foo_9.id; + + )"; + + auto time = TimeQuery(schema, query, 10); + + Cout << "Time per query: " << time; + } + + */ +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/rbo/ya.make b/ydb/core/kqp/ut/rbo/ya.make index e2366f69ec79..0fa102bcb2cd 100644 --- a/ydb/core/kqp/ut/rbo/ya.make +++ b/ydb/core/kqp/ut/rbo/ya.make @@ -5,7 +5,8 @@ FORK_SUBTESTS() SIZE(MEDIUM) SRCS( - kqp_rbo_ut.cpp + kqp_rbo_pg_ut.cpp + kqp_rbo_yql_ut.cpp ) PEERDIR( From c119de6fb675ca02c731fc18f85e3c51dc51d91b Mon Sep 17 00:00:00 2001 From: Pavel Velikhov Date: Wed, 19 Nov 2025 17:55:09 +0000 Subject: [PATCH 4/6] Update before rebased --- ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 9 +++++++++ ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp | 3 +++ ydb/core/protos/table_service_config.proto | 7 +++++++ 3 files changed, 19 insertions(+) diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 7d76e1f69062..b38285d864c8 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -757,6 +757,15 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.DefaultHashShuffleFuncType = NYql::NDq::EHashShuffleFuncType::HashV2; break; } + + switch(serviceConfig.GetBackportMode()) { + case NKikimrConfig::TTableServiceConfig_EBackportMode_Released: + kqpConfig.BackportMode = NYql::EBackportCompatibleFeaturesMode::Released; + break; + case NKikimrConfig::TTableServiceConfig_EBackportMode_All: + kqpConfig.BackportMode = NYql::EBackportCompatibleFeaturesMode::All; + break; + } } IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, diff --git a/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp b/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp index 087186e407c5..b4a42bb5ad24 100644 --- a/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp +++ b/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp @@ -69,6 +69,7 @@ Y_UNIT_TEST_SUITE(KqpRboYql) { Y_UNIT_TEST(Select) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + appConfig.MutableTableServiceConfig()->SetBackportMode(NKikimrConfig::TTableServiceConfig_EBackportMode_All); TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -84,6 +85,7 @@ Y_UNIT_TEST_SUITE(KqpRboYql) { Y_UNIT_TEST(Filter) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + appConfig.MutableTableServiceConfig()->SetBackportMode(NKikimrConfig::TTableServiceConfig_EBackportMode_All); TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -140,6 +142,7 @@ Y_UNIT_TEST_SUITE(KqpRboYql) { Y_UNIT_TEST(ConstantFolding) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); + appConfig.MutableTableServiceConfig()->SetBackportMode(NKikimrConfig::TTableServiceConfig_EBackportMode_All); TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index df2201fe7de6..78c340671885 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -439,4 +439,11 @@ message TTableServiceConfig { optional bool EnableBuildAggregationResultStages = 106 [default = true, (InvalidateCompileCache) = true]; optional bool EnforceSqlVersionV1 = 107 [default = true, (InvalidateCompileCache) = true]; + + enum EBackportMode { + Released = 0; + All = 1; + } + + optional EBackportMode BackportMode = 108 [ default = Released, (InvalidateCompileCache) = true]; }; From d1a3a03c924e52d41e3fedbdaf09dfc44ab2d481 Mon Sep 17 00:00:00 2001 From: Pavel Velikhov Date: Wed, 19 Nov 2025 19:20:27 +0000 Subject: [PATCH 5/6] Got it all working --- ydb/core/kqp/host/kqp_translate.cpp | 1 + ydb/core/kqp/host/kqp_translate.h | 5 +++++ ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp | 7 ++++++- ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp | 2 ++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/host/kqp_translate.cpp b/ydb/core/kqp/host/kqp_translate.cpp index 42e4acd490bc..9f008cc82b66 100644 --- a/ydb/core/kqp/host/kqp_translate.cpp +++ b/ydb/core/kqp/host/kqp_translate.cpp @@ -187,6 +187,7 @@ TKqpTranslationSettingsBuilder& TKqpTranslationSettingsBuilder::SetFromConfig(co // only options that should be specified for all types of queries // including views and etc.. SetLangVer(config.LangVer); + SetBackportMode(config.BackportMode); SetIsAmbiguityError(config.Antlr4ParserIsAmbiguityError); return *this; } diff --git a/ydb/core/kqp/host/kqp_translate.h b/ydb/core/kqp/host/kqp_translate.h index f8d348e997f9..014661f4da7d 100644 --- a/ydb/core/kqp/host/kqp_translate.h +++ b/ydb/core/kqp/host/kqp_translate.h @@ -151,6 +151,11 @@ class TKqpTranslationSettingsBuilder { return *this; } + TKqpTranslationSettingsBuilder& SetBackportMode(NYql::EBackportCompatibleFeaturesMode backportMode) { + BackportMode = backportMode; + return *this; + } + TKqpTranslationSettingsBuilder& SetIsAmbiguityError(bool isAmbiguityError) { IsAmbiguityError = isAmbiguityError; return *this; diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp index e6874965b5c7..017d833f1d71 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp @@ -14,7 +14,12 @@ namespace { TExprNode::TPtr PushTakeIntoPlan(const TExprNode::TPtr &node, TExprContext &ctx, const TTypeAnnotationContext &typeCtx) { Y_UNUSED(typeCtx); auto take = TCoTake(node); - if (auto root = take.Input().Maybe()) { + auto takeInput = take.Input(); + if (takeInput.Maybe()) { + takeInput = takeInput.Cast().Input(); + } + + if (auto root = takeInput.Maybe()) { // clang-format off return Build(ctx, node->Pos()) .Input() diff --git a/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp b/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp index b4a42bb5ad24..7ca3d8b7d455 100644 --- a/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp +++ b/ydb/core/kqp/ut/rbo/kqp_rbo_yql_ut.cpp @@ -86,6 +86,8 @@ Y_UNIT_TEST_SUITE(KqpRboYql) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); appConfig.MutableTableServiceConfig()->SetBackportMode(NKikimrConfig::TTableServiceConfig_EBackportMode_All); + appConfig.MutableTableServiceConfig()->SetDefaultLangVer(NYql::GetMaxLangVersion()); + TKikimrRunner kikimr(NKqp::TKikimrSettings(appConfig).SetWithSampleTables(false)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); From 93b2d804a7f29e8d7898d202b5867b4277db56f7 Mon Sep 17 00:00:00 2001 From: Pavel Velikhov Date: Wed, 19 Nov 2025 19:39:56 +0000 Subject: [PATCH 6/6] Commented out aggregation ut --- ydb/core/kqp/ut/rbo/kqp_rbo_pg_ut.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/core/kqp/ut/rbo/kqp_rbo_pg_ut.cpp b/ydb/core/kqp/ut/rbo/kqp_rbo_pg_ut.cpp index ebc25c2cf7f8..0920282dde9f 100644 --- a/ydb/core/kqp/ut/rbo/kqp_rbo_pg_ut.cpp +++ b/ydb/core/kqp/ut/rbo/kqp_rbo_pg_ut.cpp @@ -480,6 +480,7 @@ Y_UNIT_TEST_SUITE(KqpRboPg) { UNIT_ASSERT_C(resultUpsert.IsSuccess(), resultUpsert.GetIssues().ToString()); } + /* Y_UNIT_TEST(Aggregation) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableNewRBO(true); @@ -666,6 +667,7 @@ Y_UNIT_TEST_SUITE(KqpRboPg) { UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), results[i]); } } + */ Y_UNIT_TEST(UnionAll) { NKikimrConfig::TAppConfig appConfig;