Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,15 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.DefaultHashShuffleFuncType = NYql::NDq::EHashShuffleFuncType::HashV2;
break;
}

switch(serviceConfig.GetBackportMode()) {
case NKikimrConfig::TTableServiceConfig_EBackportMode_Released:
kqpConfig.BackportMode = NYql::EBackportCompatibleFeaturesMode::Released;
break;
case NKikimrConfig::TTableServiceConfig_EBackportMode_All:
kqpConfig.BackportMode = NYql::EBackportCompatibleFeaturesMode::All;
break;
}
}

IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,8 @@
"Match" : {"Type": "Callable", "Name": "KqpOpRoot"},
"Children": [
{"Index": 0, "Name": "Input", "Type": "TExprBase"},
{"Index": 1, "Name": "ColumnOrder", "Type": "TCoAtomList"}
{"Index": 1, "Name": "ColumnOrder", "Type": "TCoAtomList"},
{"Index": 2, "Name": "PgSyntax", "Type": "TCoAtom"}
]
},
{
Expand Down Expand Up @@ -930,9 +931,9 @@
]
},
{
"Name": "TKqpPgExprSublink",
"Name": "TKqpExprSublink",
"Base": "TExprBase",
"Match" : {"Type": "Callable", "Name": "KqpPgExprSublink"},
"Match" : {"Type": "Callable", "Name": "KqpExprSublink"},
"Children": [
{"Index": 0, "Name": "Expr", "Type": "TExprBase"}
]
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,7 @@ class TKqpHost : public IKqpHost {
const TGatewaysConfig* gatewaysConfig = nullptr; // TODO: can we get real gatewaysConfig here?
auto allowSettings = [](TStringBuf settingName) {
return settingName == "OrderedColumns"
|| settingName == "DeriveColumnOrder"
|| settingName == "DisableOrderedColumns"
|| settingName == "Warning"
|| settingName == "UseBlocks"
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ class TKqpRunner : public IKqpRunner {
.AddPostTypeAnnotation(/* forSubgraph */ true)
//.AddCommonOptimization()

.Add(CreateKqpPgRewriteTransformer(OptimizeCtx, *typesCtx), "RewritePgSelect")
.Add(CreateKqpRewriteSelectTransformer(OptimizeCtx, *typesCtx), "RewriteSelect")
.Add(CreateKqpNewRBOTransformer(OptimizeCtx, *typesCtx, rboKqpTypeAnnTransformer, kqpTypeAnnTransformer, newRBOPhysicalPeepholeTransformer, funcRegistry), "NewRBOTransformer")
.Add(CreateKqpRBOCleanupTransformer(*typesCtx), "RBOCleanupTransformer")

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_translate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ TKqpTranslationSettingsBuilder& TKqpTranslationSettingsBuilder::SetFromConfig(co
// only options that should be specified for all types of queries
// including views and etc..
SetLangVer(config.LangVer);
SetBackportMode(config.BackportMode);
SetIsAmbiguityError(config.Antlr4ParserIsAmbiguityError);
return *this;
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/host/kqp_translate.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ class TKqpTranslationSettingsBuilder {
return *this;
}

TKqpTranslationSettingsBuilder& SetBackportMode(NYql::EBackportCompatibleFeaturesMode backportMode) {
BackportMode = backportMode;
return *this;
}

TKqpTranslationSettingsBuilder& SetIsAmbiguityError(bool isAmbiguityError) {
IsAmbiguityError = isAmbiguityError;
return *this;
Expand Down
13 changes: 8 additions & 5 deletions ydb/core/kqp/opt/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2251,8 +2251,8 @@ TStatus AnnotateTableSinkSettings(const TExprNode::TPtr& input, TExprContext& ct
return TStatus::Ok;
}

TStatus AnnotatePgExprSublink(const TExprNode::TPtr& node, TExprContext& ctx) {
auto expr = node->Child(TKqpPgExprSublink::idx_Expr);
TStatus AnnotateExprSublink(const TExprNode::TPtr& node, TExprContext& ctx) {
auto expr = node->Child(TKqpExprSublink::idx_Expr);
auto itemType = expr->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
auto valueType = itemType->GetItems()[0]->GetItemType();
if (!valueType->IsOptionalOrNull()) {
Expand Down Expand Up @@ -2290,7 +2290,10 @@ TStatus AnnotateOpRead(const TExprNode::TPtr& node, TExprContext& ctx, const TSt
TVector<const TItemExprType*> structItemTypes = rowType->Cast<TStructExprType>()->GetItems();
TVector<const TItemExprType*> newItemTypes;
for (auto t : structItemTypes ) {
newItemTypes.push_back(ctx.MakeType<TItemExprType>("_alias_" + TString(alias->Content()) + "." + t->GetName(), t->GetItemType()));
TString aliasName = TString(alias->Content());
TString columnName = TString(t->GetName());
TString fullName = aliasName != "" ? ( "_alias_" + aliasName + "." + columnName ) : columnName;
newItemTypes.push_back(ctx.MakeType<TItemExprType>(fullName, t->GetItemType()));
}

YQL_CLOG(TRACE, CoreDq) << "Row type:" << *rowType;
Expand Down Expand Up @@ -2710,8 +2713,8 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateTableSinkSettings(input, ctx);
}

if (TKqpPgExprSublink::Match(input.Get())) {
return AnnotatePgExprSublink(input, ctx);
if (TKqpExprSublink::Match(input.Get())) {
return AnnotateExprSublink(input, ctx);
}

if (TKqpOpRead::Match(input.Get())) {
Expand Down
21 changes: 17 additions & 4 deletions ydb/core/kqp/opt/rbo/kqp_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ TExprNode::TPtr AddRenames(TExprNode::TPtr input, TExprContext &ctx, TVector<TIn
for (auto iu : renames) {
// clang-format off
auto tuple = Build<TCoNameValueTuple>(ctx, input->Pos())
.Name().Build("_alias_" + iu.Alias + "." + iu.ColumnName)
.Name().Build(iu.GetFullName())
.Value<TCoMember>()
.Struct(arg)
.Name().Build(iu.ColumnName)
Expand Down Expand Up @@ -668,6 +668,19 @@ TVector<TInfoUnit> TOpFilter::GetScalarSubplanIUs(TPlanProps& props) {
return res;
}

bool TestAndExtractEqualityPredicate(TExprNode::TPtr pred, TExprNode::TPtr& leftArg, TExprNode::TPtr& rightArg) {
if (pred->IsCallable("PgResolvedOp") && pred->Child(0)->Content() == "=") {
leftArg = pred->Child(2);
rightArg = pred->Child(3);
return true;
} else if (pred->IsCallable("==")) {
leftArg = pred->Child(0);
rightArg = pred->Child(1);
return true;
}
return false;
}

TConjunctInfo TOpFilter::GetConjunctInfo(TPlanProps& props) const {
TConjunctInfo res;

Expand All @@ -686,9 +699,9 @@ TConjunctInfo TOpFilter::GetConjunctInfo(TPlanProps& props) const {
fromPg = true;
}

if (conjObj->IsCallable("PgResolvedOp") && conjObj->Child(0)->Content() == "=") {
auto leftArg = conjObj->Child(2);
auto rightArg = conjObj->Child(3);
TExprNode::TPtr leftArg;
TExprNode::TPtr rightArg;
if (TestAndExtractEqualityPredicate(conjObj, leftArg, rightArg)) {
TVector<TInfoUnit> conjIUs;
GetAllMembers(conj, conjIUs, props);

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/opt/rbo/kqp_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ struct TPlanProps {
TStageGraph StageGraph;
int InternalVarIdx = 1;
TScalarSubplans ScalarSubplans;
bool PgSyntax = false;
};


Expand Down Expand Up @@ -411,6 +412,8 @@ class TOpFilter : public IUnaryOperator {
TExprNode::TPtr FilterLambda;
};

bool TestAndExtractEqualityPredicate(TExprNode::TPtr pred, TExprNode::TPtr& leftArg, TExprNode::TPtr& rightArg);

class TOpJoin : public IBinaryOperator {
public:
TOpJoin(std::shared_ptr<IOperator> leftArg, std::shared_ptr<IOperator> rightArg, TPositionHandle pos, TString joinKind,
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/opt/rbo/kqp_plan_conversion_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ TExprNode::TPtr PlanConverter::RemoveScalarSubplans(TExprNode::TPtr node) {
auto lambda = TCoLambda(node);
auto lambdaBody = lambda.Body().Ptr();

auto exprSublinks = FindNodes(lambdaBody, [](const TExprNode::TPtr& n){return n->IsCallable("KqpPgExprSublink");});
auto exprSublinks = FindNodes(lambdaBody, [](const TExprNode::TPtr& n){return n->IsCallable("KqpExprSublink");});
if (exprSublinks.empty()) {
return node;
}
Expand All @@ -26,7 +26,7 @@ TExprNode::TPtr PlanConverter::RemoveScalarSubplans(TExprNode::TPtr node) {
.Name<TCoAtom>().Value(sublinkVar.GetFullName()).Build()
.Done().Ptr();
replaceMap[link.Get()] = member;
auto subplan = ExprNodeToOperator(TKqpPgExprSublink(link).Expr().Ptr());
auto subplan = ExprNodeToOperator(TKqpExprSublink(link).Expr().Ptr());
PlanProps.ScalarSubplans.Add(sublinkVar, subplan);
}

Expand All @@ -53,6 +53,7 @@ TOpRoot PlanConverter::ConvertRoot(TExprNode::TPtr node) {
auto res = TOpRoot(rootInput, node->Pos(), columnOrder);
res.Node = node;
res.PlanProps = PlanProps;
res.PlanProps.PgSyntax = std::stoi(opRoot.PgSyntax().StringValue());
return res;
}

Expand Down
21 changes: 11 additions & 10 deletions ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ TExprNode::TPtr FindMemberArg(TExprNode::TPtr input) {
return TExprNode::TPtr();
}

TExprNode::TPtr BuildFilterLambdaFromConjuncts(TPositionHandle pos, TVector<TFilterInfo> conjuncts, TExprContext &ctx) {
TExprNode::TPtr BuildFilterLambdaFromConjuncts(TPositionHandle pos, TVector<TFilterInfo> conjuncts, TExprContext &ctx, bool pgSyntax) {
auto arg = Build<TCoArgument>(ctx, pos).Name("lambda_arg").Done();
TExprNode::TPtr lambda;

if (conjuncts.size() == 1) {
auto filterInfo = conjuncts[0];
auto body = ReplaceArg(filterInfo.FilterBody, arg.Ptr(), ctx);
if (!filterInfo.FromPg) {
if (pgSyntax && !filterInfo.FromPg) {
body = ctx.Builder(body->Pos()).Callable("FromPg").Add(0, body).Seal().Build();
}

Expand All @@ -89,7 +89,7 @@ TExprNode::TPtr BuildFilterLambdaFromConjuncts(TPositionHandle pos, TVector<TFil

for (auto c : conjuncts) {
auto body = ReplaceArg(c.FilterBody, arg.Ptr(), ctx);
if (!c.FromPg) {
if (pgSyntax && !c.FromPg) {
body = ctx.Builder(body->Pos()).Callable("FromPg").Add(0, body).Seal().Build();
}
newConjuncts.push_back(ReplaceArg(body, arg.Ptr(), ctx));
Expand Down Expand Up @@ -176,9 +176,10 @@ bool TExtractJoinExpressionsRule::TestAndApply(std::shared_ptr<IOperator> &input
predicate = predicate->Child(0);
}

if (predicate->IsCallable("PgResolvedOp") && predicate->Child(0)->Content() == "=") {
auto leftSide = predicate->Child(2);
auto rightSide = predicate->Child(3);
TExprNode::TPtr leftSide;
TExprNode::TPtr rightSide;

if (TestAndExtractEqualityPredicate(predicate, leftSide, rightSide)) {

if (leftSide->IsCallable("Member") && rightSide->IsCallable("Member")) {
continue;
Expand Down Expand Up @@ -489,7 +490,7 @@ std::shared_ptr<IOperator> TPushFilterRule::SimpleTestAndApply(const std::shared
auto rightInput = join->GetRightInput();

if (pushLeft.size()) {
auto leftLambda = BuildFilterLambdaFromConjuncts(leftInput->Pos, pushLeft, ctx.ExprCtx);
auto leftLambda = BuildFilterLambdaFromConjuncts(leftInput->Pos, pushLeft, ctx.ExprCtx, props.PgSyntax);
leftInput = std::make_shared<TOpFilter>(leftInput, input->Pos, leftLambda);
}

Expand All @@ -504,14 +505,14 @@ std::shared_ptr<IOperator> TPushFilterRule::SimpleTestAndApply(const std::shared
}
}
if (predicatesForRightSide.size()) {
auto rightLambda = BuildFilterLambdaFromConjuncts(rightInput->Pos, pushRight, ctx.ExprCtx);
auto rightLambda = BuildFilterLambdaFromConjuncts(rightInput->Pos, pushRight, ctx.ExprCtx, props.PgSyntax);
rightInput = std::make_shared<TOpFilter>(rightInput, input->Pos, rightLambda);
join->JoinKind = "Inner";
} else {
return input;
}
} else {
auto rightLambda = BuildFilterLambdaFromConjuncts(rightInput->Pos, pushRight, ctx.ExprCtx);
auto rightLambda = BuildFilterLambdaFromConjuncts(rightInput->Pos, pushRight, ctx.ExprCtx, props.PgSyntax);
rightInput = std::make_shared<TOpFilter>(rightInput, input->Pos, rightLambda);
}
}
Expand All @@ -524,7 +525,7 @@ std::shared_ptr<IOperator> TPushFilterRule::SimpleTestAndApply(const std::shared
join->Children[1] = rightInput;

if (topLevelPreds.size()) {
auto topFilterLambda = BuildFilterLambdaFromConjuncts(join->Pos, topLevelPreds, ctx.ExprCtx);
auto topFilterLambda = BuildFilterLambdaFromConjuncts(join->Pos, topLevelPreds, ctx.ExprCtx, props.PgSyntax);
output = std::make_shared<TOpFilter>(join, input->Pos, topFilterLambda);
} else {
output = join;
Expand Down
Loading
Loading