Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@
{"Index": 3, "Name": "Priority", "Type": "TCoAtom"},
{"Index": 4, "Name": "StreamWrite", "Type": "TCoAtom"},
{"Index": 5, "Name": "IsBatch", "Type": "TCoAtom"},
{"Index": 6, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
{"Index": 6, "Name": "IsIndexImplTable", "Type": "TCoAtom"},
{"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
]
},
{
Expand Down
20 changes: 16 additions & 4 deletions ydb/core/kqp/opt/kqp_opt_effects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ TCoAtomList BuildKeyColumnsList(const TKikimrTableDescription& table, TPositionH

TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table,
const bool allowInconsistentWrites, const bool enableStreamWrite, bool isBatch,
const TStringBuf mode, const TVector<TCoNameValueTuple>& settings, const i64 order, TExprContext& ctx) {
const TStringBuf mode, const bool isIndexImplTable, const TVector<TCoNameValueTuple>& settings, const i64 order, TExprContext& ctx) {
Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr));

auto settingsNode = Build<TCoNameValueTupleList>(ctx, expr.Pos())
Expand Down Expand Up @@ -65,6 +65,9 @@ TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table,
.IsBatch(isBatch
? ctx.NewAtom(expr.Pos(), "true")
: ctx.NewAtom(expr.Pos(), "false"))
.IsIndexImplTable(isIndexImplTable
? ctx.NewAtom(expr.Pos(), "true")
: ctx.NewAtom(expr.Pos(), "false"))
.Settings(settingsNode)
.Build()
.Build()
Expand Down Expand Up @@ -127,7 +130,7 @@ bool BuildFillTableEffect(const TKqlFillTable& node, TExprContext& ctx,
stageInput = RebuildPureStageWithSink(
node.Input(), table,
/* allowInconsistentWrites */ true, /* useStreamWrite */ true,
/* isBatch */ false, "fill_table", settings,
/* isBatch */ false, "fill_table", /* isIndexImplTable */ false, settings,
priority, ctx);
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
.Stage(stageInput.Cast().Ptr())
Expand Down Expand Up @@ -162,6 +165,7 @@ bool BuildFillTableEffect(const TKqlFillTable& node, TExprContext& ctx,
.Mode(ctx.NewAtom(node.Pos(), "fill_table"))
.Priority(ctx.NewAtom(node.Pos(), ToString(priority)))
.IsBatch(ctx.NewAtom(node.Pos(), "false"))
.IsIndexImplTable(ctx.NewAtom(node.Pos(), "false"))
.Settings(settingsNode)
.Build()
.Done();
Expand Down Expand Up @@ -212,6 +216,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
const bool useStreamWriteForConsistentSink = CanEnableStreamWrite(table, kqpCtx)
&& (!HasReadTable(node.Table().PathId().Value(), node.Input().Ptr()) || settings.IsConditionalUpdate);
const bool useStreamWrite = sinkEffect && (settings.AllowInconsistentWrites || useStreamWriteForConsistentSink);
const bool isIndexImplTable = table.Metadata->IsIndexImplTable;

const bool isOlap = (table.Metadata->Kind == EKikimrTableKind::Olap);
const i64 priority = isOlap ? 0 : order;
Expand All @@ -221,7 +226,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
stageInput = RebuildPureStageWithSink(
node.Input(), node.Table(),
settings.AllowInconsistentWrites, useStreamWrite,
node.IsBatch() == "true", settings.Mode, {}, priority, ctx);
node.IsBatch() == "true", settings.Mode, isIndexImplTable, {}, priority, ctx);
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
.Stage(stageInput.Cast().Ptr())
.SinkIndex().Build("0")
Expand Down Expand Up @@ -267,6 +272,9 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
.Mode(ctx.NewAtom(node.Pos(), settings.Mode))
.Priority(ctx.NewAtom(node.Pos(), ToString(priority)))
.IsBatch(node.IsBatch())
.IsIndexImplTable(isIndexImplTable
? ctx.NewAtom(node.Pos(), "true")
: ctx.NewAtom(node.Pos(), "false"))
.Settings()
.Build()
.Build()
Expand Down Expand Up @@ -354,6 +362,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
const bool useStreamWriteForConsistentSink = CanEnableStreamWrite(table, kqpCtx)
&& (!HasReadTable(node.Table().PathId().Value(), node.Input().Ptr()) || settings.IsConditionalDelete);
const bool useStreamWrite = sinkEffect && useStreamWriteForConsistentSink;
const bool isIndexImplTable = table.Metadata->IsIndexImplTable;

const bool isOlap = (table.Metadata->Kind == EKikimrTableKind::Olap);
const i64 priority = isOlap ? 0 : order;
Expand All @@ -364,7 +373,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
stageInput = RebuildPureStageWithSink(
node.Input(), node.Table(),
false, useStreamWrite, node.IsBatch() == "true",
"delete", {}, priority, ctx);
"delete", isIndexImplTable, {}, priority, ctx);
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
.Stage(stageInput.Cast().Ptr())
.SinkIndex().Build("0")
Expand Down Expand Up @@ -406,6 +415,9 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
.Mode(ctx.NewAtom(node.Pos(), "delete"))
.Priority(ctx.NewAtom(node.Pos(), ToString(priority)))
.IsBatch(node.IsBatch())
.IsIndexImplTable(isIndexImplTable
? ctx.NewAtom(node.Pos(), "true")
: ctx.NewAtom(node.Pos(), "false"))
.Settings()
.Build()
.Build()
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
settingsProto.SetIsBatch(true);
}

if (const auto isIndexImplTable = settings.IsIndexImplTable().Cast(); isIndexImplTable.StringValue() == "true") {
settingsProto.SetIsIndexImplTable(true);
}

internalSinkProto.MutableSettings()->PackFrom(settingsProto);
} else {
YQL_ENSURE(false, "Unsupported sink type");
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ message TKqpTableSinkSettings {
repeated string InputColumns = 18; // Only for MODE_FILL
repeated TKqpTableSinkIndexSettings Indexes = 19;
repeated TKqpColumnMetadataProto LookupColumns = 20;
optional bool IsIndexImplTable = 21;
}

message TKqpStreamLookupSettings {
Expand Down
Loading