Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve CS-resharding #4775

Merged
merged 3 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/grpc_services/rpc_list_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
return "[ListIndexBuilds]";
case TOperationId::SCRIPT_EXECUTION:
return "[ListScriptExecutions]";
case TOperationId::SS_BG_TASKS:
return "[SchemeShardTasks]";
default:
return "[Untagged]";
}
Expand Down Expand Up @@ -165,6 +167,7 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
case TOperationId::EXPORT:
case TOperationId::IMPORT:
case TOperationId::BUILD_INDEX:
case TOperationId::SS_BG_TASKS:
break;
case TOperationId::SCRIPT_EXECUTION:
SendListScriptExecutions();
Expand All @@ -181,6 +184,7 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
switch (ev->GetTypeRewrite()) {
hFunc(TEvExport::TEvListExportsResponse, Handle);
hFunc(TEvImport::TEvListImportsResponse, Handle);
hFunc(NSchemeShard::NBackground::TEvListResponse, Handle);
hFunc(TEvIndexBuilder::TEvListResponse, Handle);
hFunc(NKqp::TEvListScriptExecutionOperationsResponse, Handle);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ TConclusionStatus TAlterShardingOperation::DoDeserialize(NYql::TObjectSettingsIm
if (*modification == "SPLIT") {
Increase = true;
} else if (*modification == "MERGE") {
return TConclusionStatus::Fail("modification is impossible yet");
Increase = false;
} else {
return TConclusionStatus::Fail("undefined modification: \"" + *modification + "\"");
}
Expand All @@ -20,9 +20,10 @@ TConclusionStatus TAlterShardingOperation::DoDeserialize(NYql::TObjectSettingsIm

void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TModifyScheme& scheme, const bool isStandalone) const {
AFL_VERIFY(!isStandalone);
AFL_VERIFY(!!Increase);
scheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
scheme.MutableAlterColumnTable()->SetName(GetStoreName());
*scheme.MutableAlterColumnTable()->MutableReshardColumnTable() = NKikimrSchemeOp::TReshardColumnTable();
scheme.MutableAlterColumnTable()->MutableReshardColumnTable()->SetIncrease(*Increase);
}

}
149 changes: 149 additions & 0 deletions ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "helpers/typed_local.h"
#include "helpers/local.h"
#include "helpers/writer.h"
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
Expand All @@ -7,6 +9,8 @@
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
#include <ydb/core/tx/columnshard/data_sharing/destination/events/control.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_ss_tasks/task.h>

namespace NKikimr::NKqp {

Expand Down Expand Up @@ -261,6 +265,151 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
tester.Execute(0, {1, 2}, false, NOlap::TSnapshot(TInstant::Now().MilliSeconds(), 1232123), {0});
tester.WaitNormalization();
}

class TReshardingTest {
private:
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");

void WaitResharding() {
const TInstant start = TInstant::Now();
bool clean = false;
while (TInstant::Now() - start < TDuration::Seconds(200)) {
NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver());
auto result = operationClient.List<NYdb::NSchemeShard::TBackgroundProcessesResponse>().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
if (result.GetList().size() == 0) {
Cerr << "RESHARDING_FINISHED" << Endl;
clean = true;
break;
}
UNIT_ASSERT_VALUES_EQUAL(result.GetList().size(), 1);
Sleep(TDuration::Seconds(1));
Cerr << "WAIT_FINISHED..." << Endl;
}
AFL_VERIFY(clean);
}

void CheckCount(const ui32 expectation) {
auto it = Kikimr.GetTableClient().StreamExecuteScanQuery(R"(
--!syntax_v1

SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
)").GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cerr << result << Endl;
CompareYson(result, "[[" + ::ToString(expectation) + "u;]]");
}

TKikimrRunner Kikimr;
public:

TReshardingTest()
: Kikimr(TKikimrSettings().SetWithSampleTables(false))
{

}

void Execute() {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 16, 4);
auto tableClient = Kikimr.GetTableClient();

Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

std::vector<TString> uids;
std::vector<TString> resourceIds;
std::vector<ui32> levels;

{
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);

const auto filler = [&](const ui32 startRes, const ui32 startUid, const ui32 count) {
for (ui32 i = 0; i < count; ++i) {
uids.emplace_back("uid_" + ::ToString(startUid + i));
resourceIds.emplace_back(::ToString(startRes + i));
levels.emplace_back(i % 5);
}
};

filler(1000000, 300000000, 10000);
filler(1100000, 300100000, 10000);
filler(1200000, 300200000, 10000);
filler(1300000, 300300000, 10000);
filler(1400000, 300400000, 10000);
filler(2000000, 200000000, 70000);
filler(3000000, 100000000, 110000);

}

CheckCount(230000);
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

WaitResharding();
AFL_VERIFY(csController->GetShardingFiltersCount().Val() == 0);
CheckCount(230000);

i64 count = csController->GetShardingFiltersCount().Val();
AFL_VERIFY(count == 16)("count", count);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
csController->WaitIndexation(TDuration::Seconds(5));
csController->WaitCompactions(TDuration::Seconds(5));

csController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);

CheckCount(230000);

AFL_VERIFY(count == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
const ui32 portionsCount = 8;
for (ui32 i = 0; i < 3; ++i) {
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
WaitResharding();
csController->WaitCleaning(TDuration::Seconds(5));

CheckCount(230000);
AFL_VERIFY(count + portionsCount == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
count += portionsCount;
}
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
}
};

Y_UNIT_TEST(TableReshardingConsistency64) {
TReshardingTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
}

Y_UNIT_TEST(TableReshardingModuloN) {
TReshardingTest().SetShardingType("HASH_FUNCTION_MODULO_N").Execute();
}

}

}
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ PEERDIR(
ydb/core/tx/columnshard
ydb/core/kqp/ut/olap/helpers
ydb/core/tx/datashard/ut_common
ydb/public/sdk/cpp/client/ydb_operation
)

YQL_LAST_ABI_VERSION()
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,7 @@ message TShardingModification {
message TShardingTransfer {
optional uint64 DestinationTabletId = 1;
repeated uint64 SourceTabletIds = 2;
optional bool Moving = 3 [default = false];
}

message TShardingTransfers {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ struct TEvColumnShard {
ui64 txId, TString txBody, const ui32 flags = 0)
: TEvProposeTransaction(txKind, source, txId, std::move(txBody), flags)
{
Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA);
// Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA);
Record.SetSchemeShardId(ssId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(THashMap<ui64, NEve
auto it = PathIds.find(i.first);
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
for (auto&& portion : i.second.DetachPortions()) {
portion.ResetShardingVersion();
portion.SetPathId(it->second);
index.UpsertPortion(std::move(portion));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std
++p;
} else {
i.second.MutablePortions()[p] = std::move(i.second.MutablePortions().back());
i.second.MutablePortions()[p].ResetShardingVersion();
i.second.MutablePortions().pop_back();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ class TBlobSharing {
if (i != selfTabletId) {
TStorageTabletTask task(StorageId, i);
task.AddRemapOwner(BlobId, selfTabletId, toTabletId);
AFL_VERIFY(result.emplace(i, std::move(task)).second);
auto info = result.emplace(i, task);
if (!info.second) {
info.first->second.Merge(task);
}
}

{
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ class IColumnEngine {
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const;

virtual bool HasDataInPathId(const ui64 pathId) const = 0;
virtual bool ErasePathId(const ui64 pathId) = 0;
virtual bool Load(IDbWrapper& db) = 0;
void RegisterTable(const ui64 pathId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RegisterTable")("path_id", pathId);
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ std::shared_ptr<TCleanupTablesColumnEngineChanges> TColumnEngineForLogs::StartCl

ui64 txSize = 0;
const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4;
THashSet<ui64> pathsToRemove;
for (ui64 pathId : pathsToDrop) {
if (!HasDataInPathId(pathId)) {
changes->TablesToDrop.emplace(pathId);
Expand All @@ -308,9 +307,6 @@ std::shared_ptr<TCleanupTablesColumnEngineChanges> TColumnEngineForLogs::StartCl
break;
}
}
for (auto&& i : pathsToRemove) {
pathsToDrop.erase(i);
}
if (changes->TablesToDrop.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -452,10 +448,6 @@ bool TColumnEngineForLogs::ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr
return true;
}

void TColumnEngineForLogs::EraseTable(const ui64 pathId) {
GranulesStorage->EraseTable(pathId);
}

void TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo) {
if (exInfo) {
UpdatePortionStats(portionInfo, EStatsUpdateType::DEFAULT, exInfo);
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ class TColumnEngineForLogs : public IColumnEngine {
return !!GranulesStorage->GetPortionOptional(pathId, portionId);
}

virtual bool ErasePathId(const ui64 pathId) override {
if (HasDataInPathId(pathId)) {
return false;
}
return GranulesStorage->EraseTable(pathId);
}


virtual bool HasDataInPathId(const ui64 pathId) const override {
auto g = GetGranuleOptional(pathId);
return g && g->GetPortions().size();
Expand Down Expand Up @@ -187,8 +195,6 @@ class TColumnEngineForLogs : public IColumnEngine {
bool LoadShardingInfo(IDbWrapper& db);
bool LoadCounters(IDbWrapper& db);

void EraseTable(const ui64 pathId);

void UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo = nullptr);
bool ErasePortion(const TPortionInfo& portionInfo, bool updateStats = true);
void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT, const TPortionInfo* exPortionInfo = nullptr);
Expand Down
Loading
Loading