From 0736b63c88c32c88dff5dfa8094d7bc4e0fef272 Mon Sep 17 00:00:00 2001 From: Daniil Timizhev Date: Fri, 27 Jun 2025 15:22:43 +0300 Subject: [PATCH] Add IsIndexImplTable to TKqpTableSinkSettings expr and proto --- ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 3 ++- ydb/core/kqp/opt/kqp_opt_effects.cpp | 20 +++++++++++++++---- .../kqp/query_compiler/kqp_query_compiler.cpp | 4 ++++ ydb/core/protos/kqp.proto | 1 + 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 71a8a8ff33ee..a4a10d956fb9 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -539,7 +539,8 @@ {"Index": 3, "Name": "Priority", "Type": "TCoAtom"}, {"Index": 4, "Name": "StreamWrite", "Type": "TCoAtom"}, {"Index": 5, "Name": "IsBatch", "Type": "TCoAtom"}, - {"Index": 6, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + {"Index": 6, "Name": "IsIndexImplTable", "Type": "TCoAtom"}, + {"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} ] }, { diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp index 5323d252bbad..9a80c0d1ffb8 100644 --- a/ydb/core/kqp/opt/kqp_opt_effects.cpp +++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp @@ -29,7 +29,7 @@ TCoAtomList BuildKeyColumnsList(const TKikimrTableDescription& table, TPositionH TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const bool allowInconsistentWrites, const bool enableStreamWrite, bool isBatch, - const TStringBuf mode, const TVector& settings, const i64 order, TExprContext& ctx) { + const TStringBuf mode, const bool isIndexImplTable, const TVector& settings, const i64 order, TExprContext& ctx) { Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr)); auto settingsNode = Build(ctx, expr.Pos()) @@ -65,6 +65,9 @@ TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, .IsBatch(isBatch ? ctx.NewAtom(expr.Pos(), "true") : ctx.NewAtom(expr.Pos(), "false")) + .IsIndexImplTable(isIndexImplTable + ? ctx.NewAtom(expr.Pos(), "true") + : ctx.NewAtom(expr.Pos(), "false")) .Settings(settingsNode) .Build() .Build() @@ -127,7 +130,7 @@ bool BuildFillTableEffect(const TKqlFillTable& node, TExprContext& ctx, stageInput = RebuildPureStageWithSink( node.Input(), table, /* allowInconsistentWrites */ true, /* useStreamWrite */ true, - /* isBatch */ false, "fill_table", settings, + /* isBatch */ false, "fill_table", /* isIndexImplTable */ false, settings, priority, ctx); effect = Build(ctx, node.Pos()) .Stage(stageInput.Cast().Ptr()) @@ -162,6 +165,7 @@ bool BuildFillTableEffect(const TKqlFillTable& node, TExprContext& ctx, .Mode(ctx.NewAtom(node.Pos(), "fill_table")) .Priority(ctx.NewAtom(node.Pos(), ToString(priority))) .IsBatch(ctx.NewAtom(node.Pos(), "false")) + .IsIndexImplTable(ctx.NewAtom(node.Pos(), "false")) .Settings(settingsNode) .Build() .Done(); @@ -212,6 +216,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const const bool useStreamWriteForConsistentSink = CanEnableStreamWrite(table, kqpCtx) && (!HasReadTable(node.Table().PathId().Value(), node.Input().Ptr()) || settings.IsConditionalUpdate); const bool useStreamWrite = sinkEffect && (settings.AllowInconsistentWrites || useStreamWriteForConsistentSink); + const bool isIndexImplTable = table.Metadata->IsIndexImplTable; const bool isOlap = (table.Metadata->Kind == EKikimrTableKind::Olap); const i64 priority = isOlap ? 0 : order; @@ -221,7 +226,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const stageInput = RebuildPureStageWithSink( node.Input(), node.Table(), settings.AllowInconsistentWrites, useStreamWrite, - node.IsBatch() == "true", settings.Mode, {}, priority, ctx); + node.IsBatch() == "true", settings.Mode, isIndexImplTable, {}, priority, ctx); effect = Build(ctx, node.Pos()) .Stage(stageInput.Cast().Ptr()) .SinkIndex().Build("0") @@ -267,6 +272,9 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const .Mode(ctx.NewAtom(node.Pos(), settings.Mode)) .Priority(ctx.NewAtom(node.Pos(), ToString(priority))) .IsBatch(node.IsBatch()) + .IsIndexImplTable(isIndexImplTable + ? ctx.NewAtom(node.Pos(), "true") + : ctx.NewAtom(node.Pos(), "false")) .Settings() .Build() .Build() @@ -354,6 +362,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const const bool useStreamWriteForConsistentSink = CanEnableStreamWrite(table, kqpCtx) && (!HasReadTable(node.Table().PathId().Value(), node.Input().Ptr()) || settings.IsConditionalDelete); const bool useStreamWrite = sinkEffect && useStreamWriteForConsistentSink; + const bool isIndexImplTable = table.Metadata->IsIndexImplTable; const bool isOlap = (table.Metadata->Kind == EKikimrTableKind::Olap); const i64 priority = isOlap ? 0 : order; @@ -364,7 +373,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const stageInput = RebuildPureStageWithSink( node.Input(), node.Table(), false, useStreamWrite, node.IsBatch() == "true", - "delete", {}, priority, ctx); + "delete", isIndexImplTable, {}, priority, ctx); effect = Build(ctx, node.Pos()) .Stage(stageInput.Cast().Ptr()) .SinkIndex().Build("0") @@ -406,6 +415,9 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const .Mode(ctx.NewAtom(node.Pos(), "delete")) .Priority(ctx.NewAtom(node.Pos(), ToString(priority))) .IsBatch(node.IsBatch()) + .IsIndexImplTable(isIndexImplTable + ? ctx.NewAtom(node.Pos(), "true") + : ctx.NewAtom(node.Pos(), "false")) .Settings() .Build() .Build() diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index ccdf306fcb70..01f57c995c33 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1390,6 +1390,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { settingsProto.SetIsBatch(true); } + if (const auto isIndexImplTable = settings.IsIndexImplTable().Cast(); isIndexImplTable.StringValue() == "true") { + settingsProto.SetIsIndexImplTable(true); + } + internalSinkProto.MutableSettings()->PackFrom(settingsProto); } else { YQL_ENSURE(false, "Unsupported sink type"); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index b2ffbd40a300..ee160c8d5c70 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -823,6 +823,7 @@ message TKqpTableSinkSettings { repeated string InputColumns = 18; // Only for MODE_FILL repeated TKqpTableSinkIndexSettings Indexes = 19; repeated TKqpColumnMetadataProto LookupColumns = 20; + optional bool IsIndexImplTable = 21; } message TKqpStreamLookupSettings {