From 89507cf4cccdb7644a8ecf09cf25d579ff9cac0a Mon Sep 17 00:00:00 2001 From: Daniil Timizhev Date: Fri, 14 Nov 2025 17:30:59 +0300 Subject: [PATCH 1/3] init --- ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 9385a2011000..10aa74cfd81d 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -2764,7 +2764,7 @@ TMaybe TKqpTasksGraph::BuildScanTasksFromSource(TStageInfo& stageInfo, b bool isSequentialInFlight = source.GetSequentialInFlightShards() > 0 && partitions.size() > source.GetSequentialInFlightShards(); - if (partitions.size() > 0 && (isSequentialInFlight || singlePartitionedStage)) { + if (partitions.size() > 0 && (isSequentialInFlight || singlePartitionedStage) && !GetMeta().MaxBatchSize) { auto [startShard, shardInfo] = PartitionPruner->MakeVirtualTablePartition(source, stageInfo); if (stats) { @@ -2970,7 +2970,7 @@ size_t TKqpTasksGraph::BuildAllTasks(std::optional llvmSettings, } } - if (stage.GetIsSinglePartition()) { + if (stage.GetIsSinglePartition() && !GetMeta().MaxBatchSize) { YQL_ENSURE(stageInfo.Tasks.size() <= 1, "Unexpected multiple tasks in single-partition stage"); } From f308dc9db134cc100a9516bb99e24083eb8b6927 Mon Sep 17 00:00:00 2001 From: Daniil Timizhev Date: Fri, 14 Nov 2025 18:14:45 +0300 Subject: [PATCH 2/3] Revert "init" This reverts commit 89507cf4cccdb7644a8ecf09cf25d579ff9cac0a. --- ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 10aa74cfd81d..9385a2011000 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -2764,7 +2764,7 @@ TMaybe TKqpTasksGraph::BuildScanTasksFromSource(TStageInfo& stageInfo, b bool isSequentialInFlight = source.GetSequentialInFlightShards() > 0 && partitions.size() > source.GetSequentialInFlightShards(); - if (partitions.size() > 0 && (isSequentialInFlight || singlePartitionedStage) && !GetMeta().MaxBatchSize) { + if (partitions.size() > 0 && (isSequentialInFlight || singlePartitionedStage)) { auto [startShard, shardInfo] = PartitionPruner->MakeVirtualTablePartition(source, stageInfo); if (stats) { @@ -2970,7 +2970,7 @@ size_t TKqpTasksGraph::BuildAllTasks(std::optional llvmSettings, } } - if (stage.GetIsSinglePartition() && !GetMeta().MaxBatchSize) { + if (stage.GetIsSinglePartition()) { YQL_ENSURE(stageInfo.Tasks.size() <= 1, "Unexpected multiple tasks in single-partition stage"); } From 061cda0865cfd5428636af3365ab327b069ce929 Mon Sep 17 00:00:00 2001 From: Daniil Timizhev Date: Fri, 14 Nov 2025 20:07:16 +0300 Subject: [PATCH 3/3] init 2 --- ydb/core/kqp/runtime/kqp_read_actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 94015cff1bbf..e02a8bcdaca8 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -463,7 +463,7 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq } bool StartShards() { - const ui32 maxAllowedInFlight = Settings->GetSorted() ? 1 : MaxInFlight; + const ui32 maxAllowedInFlight = Settings->GetSorted() || Settings->GetIsBatch() ? 1 : MaxInFlight; CA_LOG_D("effective maxinflight " << maxAllowedInFlight << " sorted " << Settings->GetSorted()); bool isFirst = true; while (!PendingShards.Empty() && RunningReads() + 1 <= maxAllowedInFlight) { @@ -1073,7 +1073,7 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq ui64 seqNo = record.GetSeqNo(); Reads[id].RegisterMessage(msg); - if (Settings->GetIsBatch() && msg.GetRowsCount() > 0) { + if (Settings->GetIsBatch() && msg.GetRowsCount() > 0 && BatchOperationMaxRow.GetCells().empty()) { auto cells = msg.GetCells(msg.GetRowsCount() - 1); BatchOperationMaxRow = TSerializedCellVec{cells}; }