Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions ydb/core/kqp/common/kqp_batch_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,20 @@ TSerializedTableRange MakePartitionRange(TMaybe<TKeyDesc::TPartitionRangeInfo> b
TVector<TCell> tableBegin;
TVector<TCell> tableEnd;

bool inclusiveTableBegin = (begin) ? begin->IsInclusive : false;
bool inclusiveTableEnd = (end) ? end->IsInclusive : false;
bool inclusiveTableBegin = !begin || begin->IsInclusive;
bool inclusiveTableEnd = !end || end->IsInclusive;

if (!begin || !begin->EndKeyPrefix) {
inclusiveTableBegin = true;
tableBegin.resize(keySize, TCell());
} else {
if (begin && begin->EndKeyPrefix) {
const auto& cells = begin->EndKeyPrefix.GetCells();
tableBegin.assign(cells.begin(), cells.end());
} else {
tableBegin.resize(keySize, TCell()); // -inf
}

if (!end || !end->EndKeyPrefix) {
inclusiveTableEnd = true;
} else {
if (end && end->EndKeyPrefix) {
const auto& cells = end->EndKeyPrefix.GetCells();
tableEnd.assign(cells.begin(), cells.end());
}
} // else empty vector is +inf

return TSerializedTableRange{tableBegin, inclusiveTableBegin, tableEnd, inclusiveTableEnd};
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
bool isParallelPointRead = EnableParallelPointReadConsolidation && !isSequentialInFlight && !source.GetSorted() && IsParallelPointReadPossible(partitions);

if (partitions.size() > 0 && (isSequentialInFlight || isParallelPointRead || singlePartitionedStage)) {
auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv());
auto [startShard, shardInfo] = PartitionPruner.MakeVirtualTablePartition(source, stageInfo);

YQL_ENSURE(Stats);

Expand Down
65 changes: 30 additions & 35 deletions ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,21 +343,6 @@ TVector<TSerializedPointOrRange> FillReadRangesInternal(const TVector<NScheme::T
return BuildFullRange(keyColumnTypes);
}

TVector<TPartitionWithRange> GetKeyRangesIntersectionPartitions(const TVector<TTableRange>& ranges,
const TVector<NScheme::TTypeInfo>& keyColumnTypes, const TVector<TKeyDesc::TPartitionInfo>& partitions)
{
if (ranges.empty()) {
return {};
}

TTableRange intersection = ranges.front();
for (size_t i = 1; i < ranges.size(); ++i) {
intersection = Intersect(keyColumnTypes, intersection, ranges[i]);
}

return GetKeyRangePartitions(intersection, partitions, keyColumnTypes);
}

} // anonymous namespace

TVector<TSerializedPointOrRange> FillReadRanges(const TVector<NScheme::TTypeInfo>& keyColumnTypes,
Expand Down Expand Up @@ -465,17 +450,14 @@ THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRange&
isFullScan = IsFullRange(keyColumnTypes, range);

TTableRange tableRange = range.ToTableRange();
TVector<TPartitionWithRange> readPartitions;

if (prunerConfig.BatchOperationRange) {
isFullScan = false;
readPartitions = GetKeyRangesIntersectionPartitions({tableRange, prunerConfig.BatchOperationRange->ToTableRange()},
keyColumnTypes,stageInfo.Meta.ShardKey->GetPartitions());
} else {
readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(),
keyColumnTypes);
tableRange = Intersect(keyColumnTypes, tableRange, prunerConfig.BatchOperationRange->ToTableRange());
}

auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), keyColumnTypes);

THashMap<ui64, TShardInfo> shardInfoMap;
for (TPartitionWithRange& partitionWithRange : readPartitions) {
auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId];
Expand Down Expand Up @@ -511,17 +493,14 @@ THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRanges&
TTableRange tableRange = std::holds_alternative<TSerializedCellVec>(range)
? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true)
: TTableRange(std::get<TSerializedTableRange>(range).ToTableRange());
TVector<TPartitionWithRange> readPartitions;

if (prunerConfig.BatchOperationRange) {
isFullScan = false;
readPartitions = GetKeyRangesIntersectionPartitions({tableRange, prunerConfig.BatchOperationRange->ToTableRange()},
keyColumnTypes, stageInfo.Meta.ShardKey->GetPartitions());
} else {
readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(),
keyColumnTypes);
tableRange = Intersect(keyColumnTypes, tableRange, prunerConfig.BatchOperationRange->ToTableRange());
}

auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), keyColumnTypes);

for (TPartitionWithRange& partitionWithRange : readPartitions) {
auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId];

Expand Down Expand Up @@ -570,7 +549,7 @@ TVector<TSerializedPointOrRange> ExtractRanges(const NKqpProto::TKqpReadRangesSo
}

std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, const TPartitionPruner::TConfig& prunerConfig)
{
auto guard = typeEnv.BindAllocator();
const auto& tableInfo = stageInfo.Meta.TableConstInfo;
Expand All @@ -582,9 +561,14 @@ std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const NKqpProto::TKqpReadR
if (!ranges.empty()) {
auto& range = source.GetReverse() ? ranges.back() : ranges[0];
TTableRange tableRange = std::holds_alternative<TSerializedCellVec>(range)
? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true)
? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true,
std::get<TSerializedCellVec>(range).GetCells(), true, true)
: TTableRange(std::get<TSerializedTableRange>(range).ToTableRange());

if (prunerConfig.BatchOperationRange) {
tableRange = Intersect(keyColumnTypes, tableRange, prunerConfig.BatchOperationRange->ToTableRange());
}

auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(),
keyColumnTypes);

Expand All @@ -600,6 +584,16 @@ std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const NKqpProto::TKqpReadR
result.KeyReadRanges.ConstructInPlace();
}

if (prunerConfig.BatchOperationRange) {
TTableRange tableRange = std::holds_alternative<TSerializedCellVec>(range)
? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true,
std::get<TSerializedCellVec>(range).GetCells(), true, true)
: TTableRange(std::get<TSerializedTableRange>(range).ToTableRange());

range = TSerializedTableRange(Intersect(keyColumnTypes,
tableRange, prunerConfig.BatchOperationRange->ToTableRange()));
}

result.KeyReadRanges->Add(std::move(range));
}
return {shard, result};
Expand All @@ -623,17 +617,14 @@ THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpReadRangesSource
TTableRange tableRange = std::holds_alternative<TSerializedCellVec>(range)
? TTableRange(std::get<TSerializedCellVec>(range).GetCells(), true, std::get<TSerializedCellVec>(range).GetCells(), true, true)
: TTableRange(std::get<TSerializedTableRange>(range).ToTableRange());
TVector<TPartitionWithRange> readPartitions;

if (prunerConfig.BatchOperationRange) {
isFullScan = false;
readPartitions = GetKeyRangesIntersectionPartitions({tableRange, prunerConfig.BatchOperationRange->ToTableRange()},
keyColumnTypes, stageInfo.Meta.ShardKey->GetPartitions());
} else {
readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(),
keyColumnTypes);
tableRange = Intersect(keyColumnTypes, tableRange, prunerConfig.BatchOperationRange->ToTableRange());
}

auto readPartitions = GetKeyRangePartitions(tableRange, stageInfo.Meta.ShardKey->GetPartitions(), keyColumnTypes);

for (TPartitionWithRange& partitionWithRange : readPartitions) {
auto& shardInfo = shardInfoMap[partitionWithRange.PartitionInfo->ShardId];

Expand Down Expand Up @@ -903,4 +894,8 @@ THashMap<ui64, TShardInfo> TPartitionPruner::PruneEffect(const NKqpProto::TKqpPh
return PruneEffectPartitions(operation, stageInfo, *HolderFactory, *TypeEnv, Config);
}

std::pair<ui64, TShardInfo> TPartitionPruner::MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo) {
return ::NKikimr::NKqp::MakeVirtualTablePartition(source, stageInfo, *HolderFactory, *TypeEnv, Config);
}

} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_partition_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class TPartitionPruner {

THashMap<ui64, TShardInfo> PruneEffect(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo);

std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo);

private:
const NMiniKQL::THolderFactory* HolderFactory;
const NMiniKQL::TTypeEnvironment* TypeEnv;
Expand Down
37 changes: 31 additions & 6 deletions ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,24 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
auto it = ExecuterToPartition.find(ev->Sender);
if (it != ExecuterToPartition.end()) {
PE_LOG_D("Got TEvKqp::EvAbortExecution from ActorId = " << ev->Sender
<< " , status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())
<< ", message: " << issues.ToOneLineString() << ", abort child executers");

ReturnIssues.AddIssues(issues);
ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder()
<< "while preparing/executing by KqpPartitionedExecuterActor"));
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())
<< ", message: " << issues.ToOneLineString() << ", abort child executers");

auto [_, partInfo] = *it;
AbortBuffer(partInfo->ExecuterId);
ForgetExecuterAndBuffer(partInfo);
ForgetPartition(partInfo);
} else {
PE_LOG_D("Got TEvKqp::TEvAbortExecution from unknown actor with Id = " << ev->Sender
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode())
<< ", message: " << issues.ToOneLineString() << ", ignore");
}

if (ReturnStatus == Ydb::StatusIds::SUCCESS) {
ReturnStatus = Ydb::StatusIds::ABORTED;
ReturnIssues.AddIssues(issues);
ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder()
<< "aborting by KqpPartitionedExecuterActor"));
}

Abort();
Expand Down Expand Up @@ -557,6 +564,15 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
void OnSuccessResponse(TBatchPartitionInfo::TPtr& partInfo, TEvKqpExecuter::TEvTxResponse* ev) {
TSerializedCellVec minKey = GetMinCellVecKey(std::move(ev->BatchOperationMaxKeys), std::move(ev->BatchOperationKeyIds));
if (minKey) {
if (!IsKeyInPartition(minKey.GetCells(), partInfo)) {
ReturnStatus = Ydb::StatusIds::PRECONDITION_FAILED;
ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder()
<< "The next key from KqpReadActor does not belong to the partition with PartitionIndex = "
<< partInfo->PartitionIndex));
ForgetPartition(partInfo);
return Abort();
}

partInfo->BeginRange = TKeyDesc::TPartitionRangeInfo(minKey, /* IsInclusive */ false, /* IsPoint */ false);
return RetryPartExecution(partInfo);
}
Expand All @@ -578,6 +594,15 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
}
}

bool IsKeyInPartition(const TConstArrayRef<TCell>& key, const TBatchPartitionInfo::TPtr& partInfo) {
bool isGEThanBegin = !partInfo->BeginRange || CompareBorders<true, true>(key,
partInfo->BeginRange->EndKeyPrefix.GetCells(), true, true, KeyColumnTypes) >= 0;
bool isLEThanEnd = !partInfo->EndRange || CompareBorders<true, true>(key,
partInfo->EndRange->EndKeyPrefix.GetCells(), true, true, KeyColumnTypes) <= 0;

return isGEThanBegin && isLEThanEnd;
}

void RetryPartExecution(const TBatchPartitionInfo::TPtr& partInfo) {
PE_LOG_D("Retry query execution for PartitionIndex = " << partInfo->PartitionIndex
<< ", RetryDelayMs = " << partInfo->RetryDelayMs);
Expand Down
Loading