Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,9 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,

const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
TVector<TShardIdx> shardsToMerge;
TString mergeReason;
if ((!index || index->State == NKikimrSchemeOp::EIndexStateReady)
&& table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex)
&& table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, mainTableForIndex, mergeReason)
) {
TTxId txId = Self->GetCachedTxId(ctx);

Expand All @@ -439,6 +440,10 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,

auto request = MergeRequest(Self, txId, Self->ShardInfos[shardIdx].PathId, shardsToMerge);

LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Propose merge request : " << request->Record.ShortDebugString()
<< ", reason: " << mergeReason);

TMemoryChanges memChanges;
TStorageChanges dbChanges;
TOperationContext context{Self, txc, ctx, MergeOpSideEffects, memChanges, dbChanges};
Expand All @@ -455,12 +460,13 @@ bool TTxStoreTableStats::PersistSingleStats(const TPathId& pathId,
}

bool collectKeySample = false;
if (table->ShouldSplitBySize(dataSize, forceShardSplitSettings)) {
TString reason;
if (table->ShouldSplitBySize(dataSize, forceShardSplitSettings, reason)) {
// We would like to split by size and do this no matter how many partitions there are
} else if (table->GetPartitions().size() >= table->GetMaxPartitionsCount()) {
// We cannot split as there are max partitions already
return true;
} else if (table->CheckSplitByLoad(Self->SplitSettings, shardIdx, dataSize, rowCount, mainTableForIndex)) {
} else if (table->CheckSplitByLoad(Self->SplitSettings, shardIdx, dataSize, rowCount, mainTableForIndex, reason)) {
collectKeySample = true;
} else {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,12 @@ bool TTxPartitionHistogram::Execute(TTransactionContext& txc, const TActorContex
const TTableInfo* mainTableForIndex = Self->GetMainTableForIndex(tableId);

ESplitReason splitReason = ESplitReason::NO_SPLIT;
if (table->ShouldSplitBySize(dataSize, forceShardSplitSettings)) {
TString splitReasonMsg;
if (table->ShouldSplitBySize(dataSize, forceShardSplitSettings, splitReasonMsg)) {
splitReason = ESplitReason::SPLIT_BY_SIZE;
}

if (splitReason == ESplitReason::NO_SPLIT && table->CheckSplitByLoad(Self->SplitSettings, shardIdx, dataSize, rowCount, mainTableForIndex)) {
if (splitReason == ESplitReason::NO_SPLIT && table->CheckSplitByLoad(Self->SplitSettings, shardIdx, dataSize, rowCount, mainTableForIndex, splitReasonMsg)) {
splitReason = ESplitReason::SPLIT_BY_LOAD;
}

Expand Down Expand Up @@ -427,6 +428,10 @@ bool TTxPartitionHistogram::Execute(TTransactionContext& txc, const TActorContex

auto request = SplitRequest(Self, txId, tableId, datashardId, splitKey.GetBuffer());

LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Propose split request : " << request->Record.ShortDebugString()
<< ", reason: " << splitReasonMsg);

TMemoryChanges memChanges;
TStorageChanges dbChanges;
TOperationContext context{Self, txc, ctx, SplitOpSideEffects, memChanges, dbChanges};
Expand Down
36 changes: 29 additions & 7 deletions ydb/core/tx/schemeshard/schemeshard_info_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
const TForceShardSplitSettings& forceShardSplitSettings,
TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
THashSet<TTabletId>& partOwners, ui64& totalSize, float& totalLoad,
const TTableInfo* mainTableForIndex) const
const TTableInfo* mainTableForIndex, TString& reason) const
{
if (ExpectedPartitionCount + 1 - shardsToMerge.size() <= GetMinPartitionsCount()) {
return false;
Expand Down Expand Up @@ -1904,14 +1904,20 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
bool canMerge = false;

// Check if we can try merging by size
if (IsMergeBySizeEnabled(forceShardSplitSettings) && stats->DataSize + totalSize <= GetSizeToMerge(forceShardSplitSettings)) {
const auto sizeToMerge = GetSizeToMerge(forceShardSplitSettings);
if (IsMergeBySizeEnabled(forceShardSplitSettings) && stats->DataSize + totalSize <= sizeToMerge) {
reason = TStringBuilder() << "merge by size ("
<< "dataSize: " << stats->DataSize << ", "
<< "totalSize: " << stats->DataSize + totalSize << ", "
<< "sizeToMerge: " << sizeToMerge << ")";
canMerge = true;
}

// Check if we can try merging by load
TInstant now = AppData()->TimeProvider->Now();
TDuration minUptime = TDuration::Seconds(splitSettings.MergeByLoadMinUptimeSec);
if (!canMerge && IsMergeByLoadEnabled(mainTableForIndex) && stats->StartTime && stats->StartTime + minUptime < now) {
reason = "merge by load";
canMerge = true;
}

Expand All @@ -1936,6 +1942,11 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,

if (shardLoad + totalLoad > cpuUsageThreshold *0.7)
return false;

reason = TStringBuilder() << "merge by load ("
<< "shardLoad: " << shardLoad << ", "
<< "totalLoad: " << shardLoad + totalLoad << ", "
<< "loadThreshold: " << cpuUsageThreshold * 0.7 << ")";
}

// Merged shards must not have borrowed parts from the same original tablet
Expand All @@ -1956,7 +1967,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings,
const TForceShardSplitSettings& forceShardSplitSettings,
TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
const TTableInfo* mainTableForIndex) const
const TTableInfo* mainTableForIndex, TString& reason) const
{
// Don't split/merge backup tables
if (IsBackup) {
Expand Down Expand Up @@ -1989,20 +2000,21 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings,
THashSet<TTabletId> partOwners;

// Make sure we can actually merge current shard first
if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, partOwners, totalSize, totalLoad, mainTableForIndex)) {
if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, shardIdx, shardsToMerge, partOwners, totalSize, totalLoad, mainTableForIndex, reason)) {
return false;
}

TString mergeReason;
for (i64 pi = partitionIdx - 1; pi >= 0; --pi) {
if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad, mainTableForIndex)) {
if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad, mainTableForIndex, mergeReason)) {
break;
}
}
// make shardsToMerge ordered by partition index
Reverse(shardsToMerge.begin(), shardsToMerge.end());

for (ui64 pi = partitionIdx + 1; pi < GetPartitions().size(); ++pi) {
if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad, mainTableForIndex)) {
if (!TryAddShardToMerge(splitSettings, forceShardSplitSettings, GetPartitions()[pi].ShardIdx, shardsToMerge, partOwners, totalSize, totalLoad, mainTableForIndex, mergeReason)) {
break;
}
}
Expand All @@ -2013,7 +2025,7 @@ bool TTableInfo::CheckCanMergePartitions(const TSplitSettings& splitSettings,
bool TTableInfo::CheckSplitByLoad(
const TSplitSettings& splitSettings, TShardIdx shardIdx,
ui64 dataSize, ui64 rowCount,
const TTableInfo* mainTableForIndex) const
const TTableInfo* mainTableForIndex, TString& reason) const
{
// Don't split/merge backup tables
if (IsBackup)
Expand Down Expand Up @@ -2067,6 +2079,16 @@ bool TTableInfo::CheckSplitByLoad(
return false;
}

reason = TStringBuilder() << "split by load ("
<< "rowCount: " << rowCount << ", "
<< "minRowCount: " << MIN_ROWS_FOR_SPLIT_BY_LOAD << ", "
<< "dataSize: " << dataSize << ", "
<< "minDataSize: " << MIN_SIZE_FOR_SPLIT_BY_LOAD << ", "
<< "shardCount: " << Stats.PartitionStats.size() << ", "
<< "maxShardCount: " << maxShards << ", "
<< "cpuUsage: " << stats.GetCurrentRawCpuUsage() << ", "
<< "cpuUsageThreshold: " << cpuUsageThreshold * 1000000 << ")";

return true;
}

Expand Down
24 changes: 19 additions & 5 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -685,17 +685,17 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
const TForceShardSplitSettings& forceShardSplitSettings,
TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
THashSet<TTabletId>& partOwners, ui64& totalSize, float& totalLoad,
const TTableInfo* mainTableForIndex) const;
const TTableInfo* mainTableForIndex, TString& reason) const;

bool CheckCanMergePartitions(const TSplitSettings& splitSettings,
const TForceShardSplitSettings& forceShardSplitSettings,
TShardIdx shardIdx, TVector<TShardIdx>& shardsToMerge,
const TTableInfo* mainTableForIndex) const;
const TTableInfo* mainTableForIndex, TString& reason) const;

bool CheckSplitByLoad(
const TSplitSettings& splitSettings, TShardIdx shardIdx,
ui64 dataSize, ui64 rowCount,
const TTableInfo* mainTableForIndex) const;
const TTableInfo* mainTableForIndex, TString& reason) const;

bool IsSplitBySizeEnabled(const TForceShardSplitSettings& params) const {
// Respect unspecified SizeToSplit when force shard splits are disabled
Expand Down Expand Up @@ -818,16 +818,30 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
return stats.DataSize >= params.ForceShardSplitDataSize;
}

bool ShouldSplitBySize(ui64 dataSize, const TForceShardSplitSettings& params) const {
// Decides whether a shard of this table should be split because of its size.
// On a positive decision, fills `reason` with a human-readable explanation
// (used by the schemeshard when logging the proposed split request).
// Returns true when either the hard force-split limit is exceeded, or the
// regular size threshold is exceeded and another partition may still be added.
bool ShouldSplitBySize(ui64 dataSize, const TForceShardSplitSettings& params, TString& reason) const {
    // Size-based splitting must be enabled for this table at all.
    if (!IsSplitBySizeEnabled(params)) {
        return false;
    }

    // A shard above the hard limit is split even when the table already
    // has the maximum number of partitions.
    const bool forceSplit = dataSize >= params.ForceShardSplitDataSize && !params.DisableForceShardSplit;
    if (forceSplit) {
        reason = TStringBuilder() << "force split by size ("
            << "dataSize: " << dataSize << ", "
            << "maxDataSize: " << params.ForceShardSplitDataSize << ")";

        return true;
    }

    // Regular split: only allowed while one more partition can be added.
    const ui64 splitThreshold = GetShardSizeToSplit(params);
    const bool canAddPartition = Partitions.size() < GetMaxPartitionsCount();
    if (!canAddPartition || dataSize < splitThreshold) {
        return false;
    }

    reason = TStringBuilder() << "split by size ("
        << "partitionCount: " << Partitions.size() << ", "
        << "maxPartitionCount: " << GetMaxPartitionsCount() << ", "
        << "dataSize: " << dataSize << ", "
        << "maxDataSize: " << splitThreshold << ")";

    return true;
}

bool NeedRecreateParts() const {
Expand Down
Loading