diff --git a/ydb/core/blobstorage/bridge/syncer/syncer.cpp b/ydb/core/blobstorage/bridge/syncer/syncer.cpp index cbb5da5b749b..8e87b287c04f 100644 --- a/ydb/core/blobstorage/bridge/syncer/syncer.cpp +++ b/ydb/core/blobstorage/bridge/syncer/syncer.cpp @@ -6,11 +6,14 @@ namespace NKikimr::NBridge { TSyncerActor::TSyncerActor(TIntrusivePtr info, TGroupId sourceGroupId, TGroupId targetGroupId, - std::shared_ptr syncerDataStats) + std::shared_ptr syncerDataStats, TReplQuoter::TPtr syncRateQuoter, + TBlobStorageGroupType sourceGroupType) : Info(std::move(info)) , SourceGroupId(sourceGroupId) , TargetGroupId(targetGroupId) , SyncerDataStats(std::move(syncerDataStats)) + , SyncRateQuoter(std::move(syncRateQuoter)) + , SourceGroupType(sourceGroupType) { Y_ABORT_UNLESS(Info); Y_ABORT_UNLESS(Info->IsBridged()); @@ -368,7 +371,8 @@ namespace NKikimr::NBridge { return false; } - void TSyncerActor::IssueQuery(bool toTargetGroup, std::unique_ptr ev, TQueryPayload payload) { + void TSyncerActor::IssueQuery(bool toTargetGroup, std::unique_ptr ev, TQueryPayload payload, + ui64 quoterBytes) { switch (ev->Type()) { #define MSG(TYPE) \ case TEvBlobStorage::TYPE: { \ @@ -399,11 +403,21 @@ namespace NKikimr::NBridge { std::unique_ptr handle(CreateEventForBSProxy(SelfId(), toTargetGroup ? TargetGroupId : SourceGroupId, ev.release(), cookie)); + const TMonotonic now = TActivationContext::Monotonic(); + const TDuration timeout = SyncRateQuoter && quoterBytes + ? SyncRateQuoter->Take(now, quoterBytes) + : TDuration::Zero(); + const TMonotonic timestamp = now + timeout; + if (QueriesInFlight < MaxQueriesInFlight) { - TActivationContext::Send(handle.release()); + if (now < timestamp) { + TActivationContext::Schedule(timestamp, handle.release()); + } else { + TActivationContext::Send(handle.release()); + } ++QueriesInFlight; } else { - PendingQueries.push_back(std::move(handle)); + PendingQueries.emplace_back(std::move(handle), timestamp); } } @@ -479,8 +493,9 @@ namespace NKikimr::NBridge { ++SyncerDataStats->BlobsDone; } else if (r.Status == NKikimrProto::NODATA) { // we have to query this blob and do full rewrite -- there was no data for it + const ui64 quoterBytes = r.Id.BlobSize() * SourceGroupType.TotalPartCount() / SourceGroupType.DataParts(); IssueQuery(false, std::make_unique(r.Id, 0, 0, TInstant::Max(), - NKikimrBlobStorage::FastRead)); + NKikimrBlobStorage::FastRead), {}, quoterBytes); } else if (r.Status == NKikimrProto::ERROR) { SyncerDataStats->BytesError += r.Id.BlobSize(); ++SyncerDataStats->BlobsError; @@ -512,7 +527,13 @@ namespace NKikimr::NBridge { --QueriesInFlight; } else { Y_ABORT_UNLESS(QueriesInFlight == MaxQueriesInFlight); - TActivationContext::Send(PendingQueries.front().release()); + TMonotonic now = TActivationContext::Monotonic(); + auto& [handle, timestamp] = PendingQueries.front(); + if (now < timestamp) { + TActivationContext::Schedule(timestamp, handle.release()); + } else { + TActivationContext::Send(handle.release()); + } PendingQueries.pop_front(); } @@ -601,8 +622,10 @@ namespace NKikimr::NBridge { ) IActor *CreateSyncerActor(TIntrusivePtr info, TGroupId sourceGroupId, TGroupId targetGroupId, - std::shared_ptr syncerDataStats) { - return new TSyncerActor(std::move(info), sourceGroupId, targetGroupId, std::move(syncerDataStats)); + std::shared_ptr syncerDataStats, TReplQuoter::TPtr syncRateQuoter, + TBlobStorageGroupType sourceGroupType) { + return new TSyncerActor(std::move(info), sourceGroupId, targetGroupId, std::move(syncerDataStats), + std::move(syncRateQuoter), sourceGroupType); } } // NKikimr::NBridge diff --git a/ydb/core/blobstorage/bridge/syncer/syncer.h b/ydb/core/blobstorage/bridge/syncer/syncer.h index e5fbd25892aa..9bcc9988f92e 100644 --- a/ydb/core/blobstorage/bridge/syncer/syncer.h +++ b/ydb/core/blobstorage/bridge/syncer/syncer.h @@ -3,6 +3,7 @@ #include "defs.h" #include +#include namespace NKikimr::NBridge { @@ -16,6 +17,7 @@ namespace NKikimr::NBridge { }; IActor *CreateSyncerActor(TIntrusivePtr info, TGroupId sourceGroupId, TGroupId targetGroupId, - std::shared_ptr syncerDataStats); + std::shared_ptr syncerDataStats, TReplQuoter::TPtr syncRateQuoter, + TBlobStorageGroupType sourceGroupType); } // NKikimr::NBridge diff --git a/ydb/core/blobstorage/bridge/syncer/syncer_impl.h b/ydb/core/blobstorage/bridge/syncer/syncer_impl.h index bd179ff3586f..4533f8b561a6 100644 --- a/ydb/core/blobstorage/bridge/syncer/syncer_impl.h +++ b/ydb/core/blobstorage/bridge/syncer/syncer_impl.h @@ -24,10 +24,13 @@ namespace NKikimr::NBridge { bool Finished = false; ui32 Step = 0; std::deque RestoreQueue; + TReplQuoter::TPtr SyncRateQuoter; + const TBlobStorageGroupType SourceGroupType; public: TSyncerActor(TIntrusivePtr info, TGroupId sourceGroupId, TGroupId targetGroupId, - std::shared_ptr syncerDataStats); + std::shared_ptr syncerDataStats, TReplQuoter::TPtr syncRateQuoter, + TBlobStorageGroupType sourceGroupType); void Bootstrap(); void BeginNextStep(); @@ -50,7 +53,7 @@ namespace NKikimr::NBridge { const ui32 MaxQueriesInFlight = 16; ui32 QueriesInFlight = 0; THashMap Payloads; - std::deque> PendingQueries; + std::deque, TMonotonic>> PendingQueries; ui64 NextCookie = 1; bool Errors = false; @@ -63,7 +66,8 @@ namespace NKikimr::NBridge { bool DoMergeEntities(std::deque& source, std::deque& target, bool sourceFinished, bool targetFinished, TCallback&& merge, std::optional& lastMerged); - void IssueQuery(bool toTargetGroup, std::unique_ptr ev, TQueryPayload queryPayload = {}); + void IssueQuery(bool toTargetGroup, std::unique_ptr ev, TQueryPayload queryPayload = {}, + ui64 quoterBytes = 0); void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev); void Handle(TEvBlobStorage::TEvPutResult::TPtr ev); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp index ed49b5d60945..94355010fad2 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp @@ -187,7 +187,7 @@ namespace NKikimr { } TString details = TStringBuilder() << " GroupId# " << GroupId - << "UnconfiguredStateTs# " << UnconfiguredStateTs + << " UnconfiguredStateTs# " << UnconfiguredStateTs << " UnconfiguredStateReason# " << UnconfiguredStateReasonStr(UnconfiguredStateReason); LOG_ERROR_S(*TlsActivationContext, NKikimrServices::BS_PROXY, @@ -217,7 +217,7 @@ namespace NKikimr { return; } TString details = TStringBuilder() << " GroupId# " << GroupId - << "EstablishingSessionsStateTs# " << EstablishingSessionsStateTs + << " EstablishingSessionsStateTs# " << EstablishingSessionsStateTs << " NumUnconnectedDisks# " << NumUnconnectedDisks; LOG_ERROR_S(*TlsActivationContext, NKikimrServices::BS_PROXY, "StateEstablishingSessions Wakeup TIMEOUT Marker# DSP12 " << details); diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp index 94fd9b0c888f..f46aa341b516 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_storage_config.cpp @@ -476,6 +476,7 @@ namespace NKikimr::NStorage { case EControllerOp::OTHER: record.SetOperation(NKikimrBlobStorage::TEvControllerDistconfRequest::ValidateConfig); + record.MutableStorageConfig()->PackFrom(ProposedStorageConfig); break; case EControllerOp::UNSET: diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp index 0580279a4737..2df313e8c20f 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp @@ -114,6 +114,16 @@ namespace NKikimr::NStorage { const auto [it, _] = Groups.try_emplace(groupId); TGroupRecord& group = it->second; group.MaxKnownGeneration = Max(group.MaxKnownGeneration, generation); + if (newGroup) { + if (newGroup->HasErasureSpecies()) { + const auto erasure = static_cast(newGroup->GetErasureSpecies()); + Y_DEBUG_ABORT_UNLESS(!group.GType || group.GType->GetErasure() == erasure); + group.GType.emplace(erasure); + } else { + Y_ABORT_UNLESS(newGroup->RingsSize() == 0); // ensure no VDisks in group + group.GType.emplace(TBlobStorageGroupType::ErasureNone); + } + } // forget pending queries if (fromController) { @@ -380,8 +390,16 @@ namespace NKikimr::NStorage { if (startNew) { syncer.BridgeProxyGroupGeneration = syncer.PendingBridgeProxyGroupGeneration; syncer.SyncerDataStats = std::make_unique(); + + TBlobStorageGroupType sourceGroupType(TBlobStorageGroupType::ErasureNone); + if (const auto it = Groups.find(syncer.SourceGroupId.GetRawId()); it != Groups.end() && it->second.GType) { + sourceGroupType = *it->second.GType; + } else { + Y_DEBUG_ABORT("can't obtain source group type"); + } + syncer.ActorId = Register(NBridge::CreateSyncerActor(group.Info, syncer.SourceGroupId, syncer.TargetGroupId, - syncer.SyncerDataStats)); + syncer.SyncerDataStats, SyncRateQuoter, sourceGroupType)); syncer.Finished = false; syncer.ErrorReason.reset(); ++syncer.NumStart; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index bce2318ce56d..0686a591b0e3 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -456,6 +456,9 @@ void TNodeWarden::Bootstrap() { actorSystem->RegisterLocalService(MakeBlobStorageSyncBrokerID(), Register( CreateSyncBrokerActor(MaxInProgressSyncCount))); + // create bridge syncer rate quoter + SyncRateQuoter = std::make_shared(Cfg->BlobStorageConfig.GetBridgeSyncRateBytesPerSecond()); + // determine if we are running in 'mock' mode EnableProxyMock = Cfg->BlobStorageConfig.GetServiceSet().GetEnableProxyMock(); @@ -1403,6 +1406,10 @@ bool NKikimr::NStorage::DeriveStorageConfig(const NKikimrConfig::TAppConfig& app const auto& bsFrom = appConfig.GetBlobStorageConfig(); auto *bsTo = config->MutableBlobStorageConfig(); + const auto hasStaticGroupInfo = [](const NKikimrBlobStorage::TNodeWardenServiceSet& ss) { + return ss.PDisksSize() && ss.VDisksSize() && ss.GroupsSize(); + }; + if (bsFrom.HasServiceSet()) { const auto& ssFrom = bsFrom.GetServiceSet(); auto *ssTo = bsTo->MutableServiceSet(); @@ -1419,10 +1426,6 @@ bool NKikimr::NStorage::DeriveStorageConfig(const NKikimrConfig::TAppConfig& app ssTo->ClearReplBrokerConfig(); } - const auto hasStaticGroupInfo = [](const NKikimrBlobStorage::TNodeWardenServiceSet& ss) { - return ss.PDisksSize() && ss.VDisksSize() && ss.GroupsSize(); - }; - // update static group information unless distconf is enabled if (!hasStaticGroupInfo(ssFrom) && config->GetSelfManagementConfig().GetEnabled()) { // distconf enabled, keep it as is @@ -1556,6 +1559,25 @@ bool NKikimr::NStorage::DeriveStorageConfig(const NKikimrConfig::TAppConfig& app bsTo->ClearBscSettings(); } + // copy PDiskConfig from DefineHostConfig/DefineBox if this section is managed automatically + if (!hasStaticGroupInfo(bsFrom.GetServiceSet()) && config->GetSelfManagementConfig().GetEnabled()) { + THashMap, NKikimrBlobStorage::TPDiskConfig> pdiskConfigs; + auto callback = [&](const auto& node, const auto& drive) { + if (drive.HasPDiskConfig()) { + pdiskConfigs.emplace(std::make_tuple(node.GetNodeId(), drive.GetPath()), drive.GetPDiskConfig()); + } + }; + EnumerateConfigDrives(*config, 0, callback, nullptr, true); + for (auto& pdisk : *bsTo->MutableServiceSet()->MutablePDisks()) { + const auto key = std::make_tuple(pdisk.GetNodeID(), pdisk.GetPath()); + if (const auto it = pdiskConfigs.find(key); it != pdiskConfigs.end()) { + pdisk.MutablePDiskConfig()->CopyFrom(it->second); + } else { + pdisk.ClearPDiskConfig(); + } + } + } + // copy nameservice-related things if (!appConfig.HasNameserviceConfig()) { *errorReason = "origin config missing mandatory NameserviceConfig section"; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 3e31f097b992..fd1463e5a4ee 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -529,6 +529,7 @@ namespace NKikimr::NStorage { struct TGroupRecord { TIntrusivePtr Info; // current group info + std::optional GType; ui32 MaxKnownGeneration = 0; // maximum seen generation std::optional Group; // group info as a protobuf NKikimrBlobStorage::TGroupInfo EncryptionParams; // latest encryption parameters; set only when encryption enabled; overlay in respect to Group @@ -680,6 +681,8 @@ namespace NKikimr::NStorage { std::set WorkingSyncers; + TReplQuoter::TPtr SyncRateQuoter; + void ApplyWorkingSyncers(const NKikimrBlobStorage::TEvControllerNodeServiceSetUpdate& update); void StartSyncerIfNeeded(TWorkingSyncer& syncer); void Handle(TAutoPtr> ev); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp index 8cac4d609ad7..7fa97a664c2f 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_resource.cpp @@ -96,7 +96,8 @@ void TNodeWarden::Handle(TEvNodeWardenStorageConfig::TPtr ev) { BridgeInfo = std::move(msg->BridgeInfo); if (StorageConfig->HasBlobStorageConfig()) { - if (const auto& bsConfig = StorageConfig->GetBlobStorageConfig(); bsConfig.HasServiceSet()) { + const auto& bsConfig = StorageConfig->GetBlobStorageConfig(); + if (bsConfig.HasServiceSet()) { const NKikimrBlobStorage::TNodeWardenServiceSet *proposed = nullptr; if (const auto& proposedConfig = ev->Get()->ProposedConfig) { Y_VERIFY_S(StorageConfig->GetGeneration() < proposedConfig->GetGeneration(), @@ -109,6 +110,7 @@ void TNodeWarden::Handle(TEvNodeWardenStorageConfig::TPtr ev) { } ApplyStorageConfig(bsConfig.GetServiceSet(), proposed); } + SyncRateQuoter->UpdateBytesPerSecond(bsConfig.GetBridgeSyncRateBytesPerSecond()); } if (StorageConfig->HasStateStorageConfig() && StorageConfig->HasStateStorageBoardConfig() && StorageConfig->HasSchemeBoardConfig()) { diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/activity.h b/ydb/core/blobstorage/ut_blobstorage/lib/activity.h index 9e4c0cb70172..3db8f0417610 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/activity.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/activity.h @@ -166,7 +166,7 @@ class TActivityActor : public TActorBootstrapped { TString buffer = GenerateBuffer(); const TLogoBlobID id(TabletId, Generation, Step++, 0, buffer.size(), 0); LOG_DEBUG_S(*TlsActivationContext, NActorsServices::TEST, Prefix << "sending TEvPut Id# " << id); - SendToProxy(Quoter.Take(TActivationContext::Now(), 1), new TEvBlobStorage::TEvPut(id, buffer, TInstant::Max())); + SendToProxy(Quoter.Take(TActivationContext::Monotonic(), 1), new TEvBlobStorage::TEvPut(id, buffer, TInstant::Max())); Inflight.emplace(id, std::move(buffer)); } if (!NumWritesRemaining && Inflight.empty()) { diff --git a/ydb/core/blobstorage/vdisk/query/query_extr.cpp b/ydb/core/blobstorage/vdisk/query/query_extr.cpp index 6f9829a81dc6..15f95e19660b 100644 --- a/ydb/core/blobstorage/vdisk/query/query_extr.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_extr.cpp @@ -315,7 +315,7 @@ namespace NKikimr { quoter = QueryCtx->HullCtx->VCtx->ReplNodeResponseQuoter; } const TDuration duration = quoter - ? quoter->Take(TActivationContext::Now(), Result->CalculateSerializedSizeCached()) + ? quoter->Take(TActivationContext::Monotonic(), Result->CalculateSerializedSizeCached()) : TDuration::Zero(); if (duration != TDuration::Zero()) { Schedule(duration, new TEvents::TEvWakeup); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp index 6864a1b2e240..0be6cd91d963 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp @@ -275,7 +275,7 @@ namespace NKikimr { // limit bandwidth const auto& quoter = ReplCtx->VCtx->ReplNodeRequestQuoter; const TDuration duration = quoter - ? quoter->Take(TActivationContext::Now(), actualBytes) + ? quoter->Take(TActivationContext::Monotonic(), actualBytes) : TDuration::Zero(); std::unique_ptr event(ev->Release().Release()); if (duration != TDuration::Zero()) { diff --git a/ydb/core/blobstorage/vdisk/repl/repl_quoter.h b/ydb/core/blobstorage/vdisk/repl/repl_quoter.h index 00a895477670..32fe3c88ab6b 100644 --- a/ydb/core/blobstorage/vdisk/repl/repl_quoter.h +++ b/ydb/core/blobstorage/vdisk/repl/repl_quoter.h @@ -7,36 +7,40 @@ namespace NKikimr { using TPtr = std::shared_ptr; private: - using TAtomicInstant = std::atomic; - static_assert(TAtomicInstant::is_always_lock_free); + using TAtomicMonotonic = std::atomic; + static_assert(TAtomicMonotonic::is_always_lock_free); - TAtomicInstant NextQueueItemTimestamp; - const ui64 BytesPerSecond; + TAtomicMonotonic NextQueueItemTimestamp; + std::atomic_uint64_t BytesPerSecond; public: TReplQuoter(ui64 bytesPerSecond) : BytesPerSecond(bytesPerSecond) { - NextQueueItemTimestamp = TInstant::Zero(); + NextQueueItemTimestamp = TMonotonic::Zero(); } - TDuration Take(TInstant now, ui64 bytes) { - TDuration duration = TDuration::MicroSeconds(bytes * 1000000 / BytesPerSecond); + TDuration Take(TMonotonic now, ui64 bytes) { + TDuration duration = TDuration::MicroSeconds(bytes * 1000000 / BytesPerSecond.load()); for (;;) { - TInstant current = NextQueueItemTimestamp; - const TInstant notBefore = now - GetCapacity(); - const TInstant base = Max(current, notBefore); + TMonotonic current = NextQueueItemTimestamp; + const TMonotonic notBefore = now - GetCapacity(); + const TMonotonic base = Max(current, notBefore); const TDuration res = base - now; // time to wait until submitting desired query - const TInstant next = base + duration; + const TMonotonic next = base + duration; if (NextQueueItemTimestamp.compare_exchange_weak(current, next)) { return res; } } } + void UpdateBytesPerSecond(ui64 bytesPerSecond) { + BytesPerSecond.store(bytesPerSecond); + } + static void QuoteMessage(const TPtr& quoter, std::unique_ptr ev, ui64 bytes) { const TDuration timeout = quoter - ? quoter->Take(TActivationContext::Now(), bytes) + ? quoter->Take(TActivationContext::Monotonic(), bytes) : TDuration::Zero(); if (timeout != TDuration::Zero()) { TActivationContext::Schedule(timeout, ev.release()); @@ -46,7 +50,7 @@ namespace NKikimr { } ui64 GetMaxPacketSize() const { - return BytesPerSecond * GetCapacity().MicroSeconds() / 1000000; + return BytesPerSecond.load() * GetCapacity().MicroSeconds() / 1000000; } static constexpr TDuration GetCapacity() { diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp index 6ebc9233a40f..fd24652a3688 100644 --- a/ydb/core/mind/bscontroller/bsc.cpp +++ b/ydb/core/mind/bscontroller/bsc.cpp @@ -599,6 +599,8 @@ std::unique_ptr TBlobStorageControll request->SetRollback(true); } + request->MutableStorageConfig()->PackFrom(storageConfig); + if (request->CommandSize()) { return ev; } @@ -761,21 +763,24 @@ void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerDistconfRequest break; } - const TString& effectiveConfig = storageYaml ? *storageYaml : *mainYaml; NKikimrBlobStorage::TStorageConfig storageConfig; - - try { - NKikimrConfig::TAppConfig appConfig = NYaml::Parse(effectiveConfig); - TString errorReason; - if (!NKikimr::NStorage::DeriveStorageConfig(appConfig, &storageConfig, &errorReason)) { + if (record.HasStorageConfig()) { + record.GetStorageConfig().UnpackTo(&storageConfig); + } else { + const TString& effectiveConfig = storageYaml ? *storageYaml : *mainYaml; + try { + NKikimrConfig::TAppConfig appConfig = NYaml::Parse(effectiveConfig); + TString errorReason; + if (!NKikimr::NStorage::DeriveStorageConfig(appConfig, &storageConfig, &errorReason)) { + rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error); + rr.SetErrorReason("failed to derive storage config: " + errorReason); + break; + } + } catch (const std::exception& ex) { rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error); - rr.SetErrorReason("failed to derive storage config: " + errorReason); + rr.SetErrorReason(TStringBuilder() << "failed to parse YAML: " << ex.what()); break; } - } catch (const std::exception& ex) { - rr.SetStatus(NKikimrBlobStorage::TEvControllerDistconfResponse::Error); - rr.SetErrorReason(TStringBuilder() << "failed to parse YAML: " << ex.what()); - break; } const ui64 cookie = NextValidationCookie++; diff --git a/ydb/core/mind/bscontroller/config.h b/ydb/core/mind/bscontroller/config.h index 1fc5343a27ed..c579216e2ec0 100644 --- a/ydb/core/mind/bscontroller/config.h +++ b/ydb/core/mind/bscontroller/config.h @@ -128,6 +128,9 @@ namespace NKikimr { const ui32 DefaultMaxSlots; // static pdisk/vdisk states + std::map NewStaticVSlots; + std::map NewStaticPDisks; + std::map NewStaticGroups; std::map& StaticVSlots; std::map& StaticPDisks; std::map& StaticGroups; @@ -147,7 +150,7 @@ namespace NKikimr { public: TConfigState(TBlobStorageController &controller, const THostRecordMap &hostRecords, TInstant timestamp, - TMonotonic mono) + TMonotonic mono, const NKikimrBlobStorage::TStorageConfig *storageConfig = nullptr) : Self(controller) , HostConfigs(&controller.HostConfigs) , Boxes(&controller.Boxes) @@ -168,14 +171,33 @@ namespace NKikimr { , Mono(mono) , DonorMode(controller.DonorMode) , DefaultMaxSlots(controller.DefaultMaxSlots) - , StaticVSlots(controller.StaticVSlots) - , StaticPDisks(controller.StaticPDisks) - , StaticGroups(controller.StaticGroups) + , StaticVSlots(storageConfig && storageConfig->HasBlobStorageConfig() ? NewStaticVSlots : controller.StaticVSlots) + , StaticPDisks(storageConfig && storageConfig->HasBlobStorageConfig() ? NewStaticPDisks : controller.StaticPDisks) + , StaticGroups(storageConfig && storageConfig->HasBlobStorageConfig() ? NewStaticGroups : controller.StaticGroups) , SerialManagementStage(&controller.SerialManagementStage) , StoragePoolStat(*controller.StoragePoolStat) , BridgeInfo(controller.BridgeInfo) { Y_ABORT_UNLESS(HostRecords); + if (storageConfig && storageConfig->HasBlobStorageConfig()) { + const auto& bsConfig = storageConfig->GetBlobStorageConfig(); + const auto& ss = bsConfig.GetServiceSet(); + for (const auto& pdisk : ss.GetPDisks()) { + const TPDiskId pdiskId(pdisk.GetNodeID(), pdisk.GetPDiskID()); + NewStaticPDisks.try_emplace(pdiskId, pdisk, controller.StaticPDisks); + } + for (const auto& vslot : ss.GetVDisks()) { + const auto& location = vslot.GetVDiskLocation(); + const TPDiskId pdiskId(location.GetNodeID(), location.GetPDiskID()); + const TVSlotId vslotId(pdiskId, location.GetVDiskSlotID()); + NewStaticVSlots.try_emplace(vslotId, vslot, controller.StaticVSlots, Mono); + ++StaticPDisks.at(pdiskId).StaticSlotUsage; + } + for (const auto& group : ss.GetGroups()) { + const auto groupId = TGroupId::FromProto(&group, &NKikimrBlobStorage::TGroupInfo::GetGroupID); + NewStaticGroups.try_emplace(groupId, group, controller.StaticGroups); + } + } } void Commit() { diff --git a/ydb/core/mind/bscontroller/config_cmd.cpp b/ydb/core/mind/bscontroller/config_cmd.cpp index dcaca16ae97b..3fa20538d5e5 100644 --- a/ydb/core/mind/bscontroller/config_cmd.cpp +++ b/ydb/core/mind/bscontroller/config_cmd.cpp @@ -185,7 +185,12 @@ namespace NKikimr::NBsController { } const auto& hostRecords = EnforceHostRecords ? *EnforceHostRecords : Self->HostRecords; - State.emplace(*Self, hostRecords, TActivationContext::Now(), TActivationContext::Monotonic()); + std::optional storageConfig; + if (Cmd.HasStorageConfig() && Self->SelfManagementEnabled) { + Cmd.GetStorageConfig().UnpackTo(&storageConfig.emplace()); + } + State.emplace(*Self, hostRecords, TActivationContext::Now(), TActivationContext::Monotonic(), + storageConfig ? &storageConfig.value() : nullptr); State->CheckConsistency(); TString m; diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h index 698c102d59fd..2ac5214099aa 100644 --- a/ydb/core/mind/bscontroller/impl.h +++ b/ydb/core/mind/bscontroller/impl.h @@ -2426,7 +2426,7 @@ class TBlobStorageController : public TActor, public TTa TGroupInfo::TGroupStatus Status; bool LayoutCorrect = true; - TStaticGroupInfo(const NKikimrBlobStorage::TGroupInfo& group, std::map& prev) { + TStaticGroupInfo(const NKikimrBlobStorage::TGroupInfo& group, const std::map& prev) { TStringStream err; Info = TBlobStorageGroupInfo::Parse(group, nullptr, &err); Y_VERIFY_DEBUG_S(Info, "failed to parse static group, error# " << err.Str()); @@ -2434,7 +2434,7 @@ class TBlobStorageController : public TActor, public TTa return; } if (const auto it = prev.find(Info->GroupID); it != prev.end()) { - TStaticGroupInfo& item = it->second; + const TStaticGroupInfo& item = it->second; Status = item.Status; LayoutCorrect = item.LayoutCorrect; } @@ -2462,15 +2462,15 @@ class TBlobStorageController : public TActor, public TTa bool MetricsDirty = false; TStaticVSlotInfo(const NKikimrBlobStorage::TNodeWardenServiceSet::TVDisk& vdisk, - std::map& prev, TMonotonic mono) + const std::map& prev, TMonotonic mono) : VDiskId(VDiskIDFromVDiskID(vdisk.GetVDiskID())) , VDiskKind(vdisk.GetVDiskKind()) { const auto& loc = vdisk.GetVDiskLocation(); const TVSlotId vslotId(loc.GetNodeID(), loc.GetPDiskID(), loc.GetVDiskSlotID()); if (const auto it = prev.find(vslotId); it != prev.end()) { - TStaticVSlotInfo& item = it->second; - VDiskMetrics = std::move(item.VDiskMetrics); + const TStaticVSlotInfo& item = it->second; + VDiskMetrics = item.VDiskMetrics; VDiskStatus = item.VDiskStatus; VDiskStatusTimestamp = item.VDiskStatusTimestamp; ReadySince = item.ReadySince; @@ -2499,7 +2499,7 @@ class TBlobStorageController : public TActor, public TTa std::optional PDiskMetrics; TStaticPDiskInfo(const NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk& pdisk, - std::map& prev) + const std::map& prev) : NodeId(pdisk.GetNodeID()) , PDiskId(pdisk.GetPDiskID()) , Path(pdisk.GetPath()) @@ -2517,8 +2517,8 @@ class TBlobStorageController : public TActor, public TTa const TPDiskId pdiskId(NodeId, PDiskId); if (const auto it = prev.find(pdiskId); it != prev.end()) { - TStaticPDiskInfo& item = it->second; - PDiskMetrics = std::move(item.PDiskMetrics); + const TStaticPDiskInfo& item = it->second; + PDiskMetrics = item.PDiskMetrics; } } diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index b5e0de82e781..d5ee5d0928a4 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -8,6 +8,7 @@ import "ydb/core/protos/blobstorage_config.proto"; import "ydb/core/protos/blobstorage_disk.proto"; import "ydb/core/protos/blobstorage_disk_color.proto"; import "ydb/core/protos/node_whiteboard.proto"; +import "google/protobuf/any.proto"; package NKikimrBlobStorage; option java_package = "ru.yandex.kikimr.proto"; @@ -1563,6 +1564,7 @@ message TEvControllerDistconfRequest { optional uint64 ExpectedStorageConfigVersion = 5; optional string PeerName = 6; optional bytes UserToken = 7; + optional google.protobuf.Any StorageConfig = 8; // TStorageConfig with this request that is going to be applied after operation } message TEvControllerDistconfResponse { diff --git a/ydb/core/protos/blobstorage_config.proto b/ydb/core/protos/blobstorage_config.proto index b5cde6822172..030651ec3d7d 100644 --- a/ydb/core/protos/blobstorage_config.proto +++ b/ydb/core/protos/blobstorage_config.proto @@ -7,6 +7,7 @@ import "ydb/core/protos/blobstorage_disk_color.proto"; import "ydb/core/protos/blobstorage_pdisk_config.proto"; import "ydb/core/protos/blob_depot_config.proto"; import "ydb/library/actors/protos/interconnect.proto"; +import "google/protobuf/any.proto"; package NKikimrBlobStorage; @@ -686,6 +687,9 @@ message TConfigRequest { // requesting user identifier string UserSID = 13; + + // current TStorageConfig against which we are processing this request + google.protobuf.Any StorageConfig = 14; } enum ETriStateBool { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 245764cef25e..88396c5f640b 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -355,6 +355,8 @@ message TBlobStorageConfig { optional NKikimrBlobStorage.TDefineBox DefineBox = 11; optional NKikimrBlobStorage.TBscConfig BscSettings = 12; + + optional uint64 BridgeSyncRateBytesPerSecond = 13 [default = 500000000]; // default 500 MB/s per node } message TSelfManagementConfig {