diff --git a/ydb/core/kqp/common/kqp_batch_operations.cpp b/ydb/core/kqp/common/kqp_batch_operations.cpp index f8d37b94a0d2..2b7e8e593da8 100644 --- a/ydb/core/kqp/common/kqp_batch_operations.cpp +++ b/ydb/core/kqp/common/kqp_batch_operations.cpp @@ -6,23 +6,20 @@ TSerializedTableRange MakePartitionRange(TMaybe b TVector tableBegin; TVector tableEnd; - bool inclusiveTableBegin = (begin) ? begin->IsInclusive : false; - bool inclusiveTableEnd = (end) ? end->IsInclusive : false; + bool inclusiveTableBegin = !begin || begin->IsInclusive; + bool inclusiveTableEnd = !end || end->IsInclusive; - if (!begin || !begin->EndKeyPrefix) { - inclusiveTableBegin = true; - tableBegin.resize(keySize, TCell()); - } else { + if (begin && begin->EndKeyPrefix) { const auto& cells = begin->EndKeyPrefix.GetCells(); tableBegin.assign(cells.begin(), cells.end()); + } else { + tableBegin.resize(keySize, TCell()); // -inf } - if (!end || !end->EndKeyPrefix) { - inclusiveTableEnd = true; - } else { + if (end && end->EndKeyPrefix) { const auto& cells = end->EndKeyPrefix.GetCells(); tableEnd.assign(cells.begin(), cells.end()); - } + } // else empty vector is +inf return TSerializedTableRange{tableBegin, inclusiveTableBegin, tableEnd, inclusiveTableEnd}; } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 98d8380850e1..188def7a7beb 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1555,7 +1555,7 @@ class TKqpExecuterBase : public TActor { bool isParallelPointRead = EnableParallelPointReadConsolidation && !isSequentialInFlight && !source.GetSorted() && IsParallelPointReadPossible(partitions); if (partitions.size() > 0 && (isSequentialInFlight || isParallelPointRead || singlePartitionedStage)) { - auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv()); + auto [startShard, shardInfo] = PartitionPruner.MakeVirtualTablePartition(source, stageInfo); YQL_ENSURE(Stats); diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index 4b5f005b5ed6..78c86f6f389f 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -343,21 +343,6 @@ TVector FillReadRangesInternal(const TVector GetKeyRangesIntersectionPartitions(const TVector& ranges, - const TVector& keyColumnTypes, const TVector& partitions) -{ - if (ranges.empty()) { - return {}; - } - - TTableRange intersection = ranges.front(); - for (size_t i = 1; i < ranges.size(); ++i) { - intersection = Intersect(keyColumnTypes, intersection, ranges[i]); - } - - return GetKeyRangePartitions(intersection, partitions, keyColumnTypes); -} - } // anonymous namespace TVector FillReadRanges(const TVector& keyColumnTypes, @@ -465,17 +450,14 @@ THashMap PrunePartitions(const NKqpProto::TKqpPhyOpReadRange& isFullScan = IsFullRange(keyColumnTypes, range); TTableRange tableRange = range.ToTableRange(); - TVector readPartitions; if (prunerConfig.BatchOperationRange) { isFullScan = false; - readPartitions = GetKeyRangesIntersectionPartitions({tableRange, prunerConfig.BatchOperationRange->ToTableRange()}, - keyColumnTypes,stageInfo.Meta.ShardKey->GetPartitions()); - } else { - readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), - keyColumnTypes); + tableRange = Intersect(keyColumnTypes, tableRange, prunerConfig.BatchOperationRange->ToTableRange()); } + auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), keyColumnTypes); + THashMap shardInfoMap; for (TPartitionWithRange& partitionWithRange : readPartitions) { auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId]; @@ -511,17 +493,14 @@ THashMap PrunePartitions(const NKqpProto::TKqpPhyOpReadRanges& TTableRange tableRange = std::holds_alternative(range) ? TTableRange(std::get(range).GetCells(), true, std::get(range).GetCells(), true, true) : TTableRange(std::get(range).ToTableRange()); - TVector readPartitions; if (prunerConfig.BatchOperationRange) { isFullScan = false; - readPartitions = GetKeyRangesIntersectionPartitions({tableRange, prunerConfig.BatchOperationRange->ToTableRange()}, - keyColumnTypes, stageInfo.Meta.ShardKey->GetPartitions()); - } else { - readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), - keyColumnTypes); + tableRange = Intersect(keyColumnTypes, tableRange, prunerConfig.BatchOperationRange->ToTableRange()); } + auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), keyColumnTypes); + for (TPartitionWithRange& partitionWithRange : readPartitions) { auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId]; @@ -570,7 +549,7 @@ TVector ExtractRanges(const NKqpProto::TKqpReadRangesSo } std::pair MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, - const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) + const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, const TPartitionPruner::TConfig& prunerConfig) { auto guard = typeEnv.BindAllocator(); const auto& tableInfo = stageInfo.Meta.TableConstInfo; @@ -582,9 +561,14 @@ std::pair MakeVirtualTablePartition(const NKqpProto::TKqpReadR if (!ranges.empty()) { auto& range = source.GetReverse() ? ranges.back() : ranges[0]; TTableRange tableRange = std::holds_alternative(range) - ? TTableRange(std::get(range).GetCells(), true, std::get(range).GetCells(), true, true) + ? TTableRange(std::get(range).GetCells(), true, + std::get(range).GetCells(), true, true) : TTableRange(std::get(range).ToTableRange()); + if (prunerConfig.BatchOperationRange) { + tableRange = Intersect(keyColumnTypes, tableRange, prunerConfig.BatchOperationRange->ToTableRange()); + } + auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), keyColumnTypes); @@ -600,6 +584,16 @@ std::pair MakeVirtualTablePartition(const NKqpProto::TKqpReadR result.KeyReadRanges.ConstructInPlace(); } + if (prunerConfig.BatchOperationRange) { + TTableRange tableRange = std::holds_alternative(range) + ? TTableRange(std::get(range).GetCells(), true, + std::get(range).GetCells(), true, true) + : TTableRange(std::get(range).ToTableRange()); + + range = TSerializedTableRange(Intersect(keyColumnTypes, + tableRange, prunerConfig.BatchOperationRange->ToTableRange())); + } + result.KeyReadRanges->Add(std::move(range)); } return {shard, result}; @@ -623,17 +617,14 @@ THashMap PrunePartitions(const NKqpProto::TKqpReadRangesSource TTableRange tableRange = std::holds_alternative(range) ? TTableRange(std::get(range).GetCells(), true, std::get(range).GetCells(), true, true) : TTableRange(std::get(range).ToTableRange()); - TVector readPartitions; if (prunerConfig.BatchOperationRange) { isFullScan = false; - readPartitions = GetKeyRangesIntersectionPartitions({tableRange, prunerConfig.BatchOperationRange->ToTableRange()}, - keyColumnTypes, stageInfo.Meta.ShardKey->GetPartitions()); - } else { - readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), - keyColumnTypes); + tableRange = Intersect(keyColumnTypes, tableRange, prunerConfig.BatchOperationRange->ToTableRange()); } + auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), keyColumnTypes); + for (TPartitionWithRange& partitionWithRange : readPartitions) { auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId]; @@ -903,4 +894,8 @@ THashMap TPartitionPruner::PruneEffect(const NKqpProto::TKqpPh return PruneEffectPartitions(operation, stageInfo, *HolderFactory, *TypeEnv, Config); } +std::pair TPartitionPruner::MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo) { + return ::NKikimr::NKqp::MakeVirtualTablePartition(source, stageInfo, *HolderFactory, *TypeEnv, Config); +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h index 148007438338..1547261b7040 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h @@ -51,6 +51,8 @@ class TPartitionPruner { THashMap PruneEffect(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo); + std::pair MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo); + private: const NMiniKQL::THolderFactory* HolderFactory; const NMiniKQL::TTypeEnvironment* TypeEnv; diff --git a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp index 52eccf9edc56..d640f3d62b6e 100644 --- a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp @@ -142,17 +142,24 @@ class TKqpPartitionedExecuter : public TActorBootstrappedSender); if (it != ExecuterToPartition.end()) { PE_LOG_D("Got TEvKqp::EvAbortExecution from ActorId = " << ev->Sender - << " , status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) - << ", message: " << issues.ToOneLineString() << ", abort child executers"); - - ReturnIssues.AddIssues(issues); - ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder() - << "while preparing/executing by KqpPartitionedExecuterActor")); + << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) + << ", message: " << issues.ToOneLineString() << ", abort child executers"); auto [_, partInfo] = *it; AbortBuffer(partInfo->ExecuterId); ForgetExecuterAndBuffer(partInfo); ForgetPartition(partInfo); + } else { + PE_LOG_D("Got TEvKqp::TEvAbortExecution from unknown actor with Id = " << ev->Sender + << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) + << ", message: " << issues.ToOneLineString() << ", ignore"); + } + + if (ReturnStatus == Ydb::StatusIds::SUCCESS) { + ReturnStatus = Ydb::StatusIds::ABORTED; + ReturnIssues.AddIssues(issues); + ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder() + << "aborting by KqpPartitionedExecuterActor")); } Abort(); @@ -557,6 +564,15 @@ class TKqpPartitionedExecuter : public TActorBootstrappedBatchOperationMaxKeys), std::move(ev->BatchOperationKeyIds)); if (minKey) { + if (!IsKeyInPartition(minKey.GetCells(), partInfo)) { + ReturnStatus = Ydb::StatusIds::PRECONDITION_FAILED; + ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder() + << "The next key from KqpReadActor does not belong to the partition with PartitionIndex = " + << partInfo->PartitionIndex)); + ForgetPartition(partInfo); + return Abort(); + } + partInfo->BeginRange = TKeyDesc::TPartitionRangeInfo(minKey, /* IsInclusive */ false, /* IsPoint */ false); return RetryPartExecution(partInfo); } @@ -578,6 +594,15 @@ class TKqpPartitionedExecuter : public TActorBootstrapped& key, const TBatchPartitionInfo::TPtr& partInfo) { + bool isGEThanBegin = !partInfo->BeginRange || CompareBorders(key, + partInfo->BeginRange->EndKeyPrefix.GetCells(), true, true, KeyColumnTypes) >= 0; + bool isLEThanEnd = !partInfo->EndRange || CompareBorders(key, + partInfo->EndRange->EndKeyPrefix.GetCells(), true, true, KeyColumnTypes) <= 0; + + return isGEThanBegin && isLEThanEnd; + } + void RetryPartExecution(const TBatchPartitionInfo::TPtr& partInfo) { PE_LOG_D("Retry query execution for PartitionIndex = " << partInfo->PartitionIndex << ", RetryDelayMs = " << partInfo->RetryDelayMs);