Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions ydb/core/blobstorage/bridge/syncer/syncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
namespace NKikimr::NBridge {

TSyncerActor::TSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TGroupId sourceGroupId, TGroupId targetGroupId,
std::shared_ptr<TSyncerDataStats> syncerDataStats)
std::shared_ptr<TSyncerDataStats> syncerDataStats, TReplQuoter::TPtr syncRateQuoter,
TBlobStorageGroupType sourceGroupType)
: Info(std::move(info))
, SourceGroupId(sourceGroupId)
, TargetGroupId(targetGroupId)
, SyncerDataStats(std::move(syncerDataStats))
, SyncRateQuoter(std::move(syncRateQuoter))
, SourceGroupType(sourceGroupType)
{
Y_ABORT_UNLESS(Info);
Y_ABORT_UNLESS(Info->IsBridged());
Expand Down Expand Up @@ -368,7 +371,8 @@ namespace NKikimr::NBridge {
return false;
}

void TSyncerActor::IssueQuery(bool toTargetGroup, std::unique_ptr<IEventBase> ev, TQueryPayload payload) {
void TSyncerActor::IssueQuery(bool toTargetGroup, std::unique_ptr<IEventBase> ev, TQueryPayload payload,
ui64 quoterBytes) {
switch (ev->Type()) {
#define MSG(TYPE) \
case TEvBlobStorage::TYPE: { \
Expand Down Expand Up @@ -399,11 +403,21 @@ namespace NKikimr::NBridge {
std::unique_ptr<IEventHandle> handle(CreateEventForBSProxy(SelfId(),
toTargetGroup ? TargetGroupId : SourceGroupId, ev.release(), cookie));

const TMonotonic now = TActivationContext::Monotonic();
const TDuration timeout = SyncRateQuoter && quoterBytes
? SyncRateQuoter->Take(now, quoterBytes)
: TDuration::Zero();
const TMonotonic timestamp = now + timeout;

if (QueriesInFlight < MaxQueriesInFlight) {
TActivationContext::Send(handle.release());
if (now < timestamp) {
TActivationContext::Schedule(timestamp, handle.release());
} else {
TActivationContext::Send(handle.release());
}
++QueriesInFlight;
} else {
PendingQueries.push_back(std::move(handle));
PendingQueries.emplace_back(std::move(handle), timestamp);
}
}

Expand Down Expand Up @@ -479,8 +493,9 @@ namespace NKikimr::NBridge {
++SyncerDataStats->BlobsDone;
} else if (r.Status == NKikimrProto::NODATA) {
// we have to query this blob and do full rewrite -- there was no data for it
const ui64 quoterBytes = r.Id.BlobSize() * SourceGroupType.TotalPartCount() / SourceGroupType.DataParts();
IssueQuery(false, std::make_unique<TEvBlobStorage::TEvGet>(r.Id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::FastRead));
NKikimrBlobStorage::FastRead), {}, quoterBytes);
} else if (r.Status == NKikimrProto::ERROR) {
SyncerDataStats->BytesError += r.Id.BlobSize();
++SyncerDataStats->BlobsError;
Expand Down Expand Up @@ -512,7 +527,13 @@ namespace NKikimr::NBridge {
--QueriesInFlight;
} else {
Y_ABORT_UNLESS(QueriesInFlight == MaxQueriesInFlight);
TActivationContext::Send(PendingQueries.front().release());
TMonotonic now = TActivationContext::Monotonic();
auto& [handle, timestamp] = PendingQueries.front();
if (now < timestamp) {
TActivationContext::Schedule(timestamp, handle.release());
} else {
TActivationContext::Send(handle.release());
}
PendingQueries.pop_front();
}

Expand Down Expand Up @@ -601,8 +622,10 @@ namespace NKikimr::NBridge {
)

IActor *CreateSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TGroupId sourceGroupId, TGroupId targetGroupId,
std::shared_ptr<TSyncerDataStats> syncerDataStats) {
return new TSyncerActor(std::move(info), sourceGroupId, targetGroupId, std::move(syncerDataStats));
std::shared_ptr<TSyncerDataStats> syncerDataStats, TReplQuoter::TPtr syncRateQuoter,
TBlobStorageGroupType sourceGroupType) {
return new TSyncerActor(std::move(info), sourceGroupId, targetGroupId, std::move(syncerDataStats),
std::move(syncRateQuoter), sourceGroupType);
}

} // NKikimr::NBridge
4 changes: 3 additions & 1 deletion ydb/core/blobstorage/bridge/syncer/syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "defs.h"

#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
#include <ydb/core/blobstorage/vdisk/repl/repl_quoter.h>

namespace NKikimr::NBridge {

Expand All @@ -16,6 +17,7 @@ namespace NKikimr::NBridge {
};

IActor *CreateSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TGroupId sourceGroupId, TGroupId targetGroupId,
std::shared_ptr<TSyncerDataStats> syncerDataStats);
std::shared_ptr<TSyncerDataStats> syncerDataStats, TReplQuoter::TPtr syncRateQuoter,
TBlobStorageGroupType sourceGroupType);

} // NKikimr::NBridge
10 changes: 7 additions & 3 deletions ydb/core/blobstorage/bridge/syncer/syncer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ namespace NKikimr::NBridge {
bool Finished = false;
ui32 Step = 0;
std::deque<TLogoBlobID> RestoreQueue;
TReplQuoter::TPtr SyncRateQuoter;
const TBlobStorageGroupType SourceGroupType;

public:
TSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TGroupId sourceGroupId, TGroupId targetGroupId,
std::shared_ptr<TSyncerDataStats> syncerDataStats);
std::shared_ptr<TSyncerDataStats> syncerDataStats, TReplQuoter::TPtr syncRateQuoter,
TBlobStorageGroupType sourceGroupType);

void Bootstrap();
void BeginNextStep();
Expand All @@ -50,7 +53,7 @@ namespace NKikimr::NBridge {
const ui32 MaxQueriesInFlight = 16;
ui32 QueriesInFlight = 0;
THashMap<ui64, TQueryPayload> Payloads;
std::deque<std::unique_ptr<IEventHandle>> PendingQueries;
std::deque<std::tuple<std::unique_ptr<IEventHandle>, TMonotonic>> PendingQueries;
ui64 NextCookie = 1;

bool Errors = false;
Expand All @@ -63,7 +66,8 @@ namespace NKikimr::NBridge {
bool DoMergeEntities(std::deque<T>& source, std::deque<T>& target, bool sourceFinished, bool targetFinished,
TCallback&& merge, std::optional<TKey>& lastMerged);

void IssueQuery(bool toTargetGroup, std::unique_ptr<IEventBase> ev, TQueryPayload queryPayload = {});
void IssueQuery(bool toTargetGroup, std::unique_ptr<IEventBase> ev, TQueryPayload queryPayload = {},
ui64 quoterBytes = 0);
void Handle(TEvBlobStorage::TEvBlockResult::TPtr ev);
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr ev);
void Handle(TEvBlobStorage::TEvPutResult::TPtr ev);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ namespace NKikimr {
}

TString details = TStringBuilder() << " GroupId# " << GroupId
<< "UnconfiguredStateTs# " << UnconfiguredStateTs
<< " UnconfiguredStateTs# " << UnconfiguredStateTs
<< " UnconfiguredStateReason# " << UnconfiguredStateReasonStr(UnconfiguredStateReason);

LOG_ERROR_S(*TlsActivationContext, NKikimrServices::BS_PROXY,
Expand Down Expand Up @@ -217,7 +217,7 @@ namespace NKikimr {
return;
}
TString details = TStringBuilder() << " GroupId# " << GroupId
<< "EstablishingSessionsStateTs# " << EstablishingSessionsStateTs
<< " EstablishingSessionsStateTs# " << EstablishingSessionsStateTs
<< " NumUnconnectedDisks# " << NumUnconnectedDisks;
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::BS_PROXY,
"StateEstablishingSessions Wakeup TIMEOUT Marker# DSP12 " << details);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ namespace NKikimr::NStorage {

case EControllerOp::OTHER:
record.SetOperation(NKikimrBlobStorage::TEvControllerDistconfRequest::ValidateConfig);
record.MutableStorageConfig()->PackFrom(ProposedStorageConfig);
break;

case EControllerOp::UNSET:
Expand Down
20 changes: 19 additions & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ namespace NKikimr::NStorage {
const auto [it, _] = Groups.try_emplace(groupId);
TGroupRecord& group = it->second;
group.MaxKnownGeneration = Max(group.MaxKnownGeneration, generation);
if (newGroup) {
if (newGroup->HasErasureSpecies()) {
const auto erasure = static_cast<TBlobStorageGroupType::EErasureSpecies>(newGroup->GetErasureSpecies());
Y_DEBUG_ABORT_UNLESS(!group.GType || group.GType->GetErasure() == erasure);
group.GType.emplace(erasure);
} else {
Y_ABORT_UNLESS(newGroup->RingsSize() == 0); // ensure no VDisks in group
group.GType.emplace(TBlobStorageGroupType::ErasureNone);
}
}

// forget pending queries
if (fromController) {
Expand Down Expand Up @@ -380,8 +390,16 @@ namespace NKikimr::NStorage {
if (startNew) {
syncer.BridgeProxyGroupGeneration = syncer.PendingBridgeProxyGroupGeneration;
syncer.SyncerDataStats = std::make_unique<NBridge::TSyncerDataStats>();

TBlobStorageGroupType sourceGroupType(TBlobStorageGroupType::ErasureNone);
if (const auto it = Groups.find(syncer.SourceGroupId.GetRawId()); it != Groups.end() && it->second.GType) {
sourceGroupType = *it->second.GType;
} else {
Y_DEBUG_ABORT("can't obtain source group type");
}

syncer.ActorId = Register(NBridge::CreateSyncerActor(group.Info, syncer.SourceGroupId, syncer.TargetGroupId,
syncer.SyncerDataStats));
syncer.SyncerDataStats, SyncRateQuoter, sourceGroupType));
syncer.Finished = false;
syncer.ErrorReason.reset();
++syncer.NumStart;
Expand Down
30 changes: 26 additions & 4 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ void TNodeWarden::Bootstrap() {
actorSystem->RegisterLocalService(MakeBlobStorageSyncBrokerID(), Register(
CreateSyncBrokerActor(MaxInProgressSyncCount)));

// create bridge syncer rate quoter
SyncRateQuoter = std::make_shared<TReplQuoter>(Cfg->BlobStorageConfig.GetBridgeSyncRateBytesPerSecond());

// determine if we are running in 'mock' mode
EnableProxyMock = Cfg->BlobStorageConfig.GetServiceSet().GetEnableProxyMock();

Expand Down Expand Up @@ -1403,6 +1406,10 @@ bool NKikimr::NStorage::DeriveStorageConfig(const NKikimrConfig::TAppConfig& app
const auto& bsFrom = appConfig.GetBlobStorageConfig();
auto *bsTo = config->MutableBlobStorageConfig();

// Predicate: does the given service set carry static group information?
// It is considered present only when the set lists at least one PDisk, at
// least one VDisk and at least one Group; any empty list yields false.
const auto hasStaticGroupInfo = [](const NKikimrBlobStorage::TNodeWardenServiceSet& serviceSet) {
    const bool hasPDisks = serviceSet.PDisksSize() != 0;
    const bool hasVDisks = serviceSet.VDisksSize() != 0;
    const bool hasGroups = serviceSet.GroupsSize() != 0;
    return hasPDisks && hasVDisks && hasGroups;
};

if (bsFrom.HasServiceSet()) {
const auto& ssFrom = bsFrom.GetServiceSet();
auto *ssTo = bsTo->MutableServiceSet();
Expand All @@ -1419,10 +1426,6 @@ bool NKikimr::NStorage::DeriveStorageConfig(const NKikimrConfig::TAppConfig& app
ssTo->ClearReplBrokerConfig();
}

const auto hasStaticGroupInfo = [](const NKikimrBlobStorage::TNodeWardenServiceSet& ss) {
return ss.PDisksSize() && ss.VDisksSize() && ss.GroupsSize();
};

// update static group information unless distconf is enabled
if (!hasStaticGroupInfo(ssFrom) && config->GetSelfManagementConfig().GetEnabled()) {
// distconf enabled, keep it as is
Expand Down Expand Up @@ -1556,6 +1559,25 @@ bool NKikimr::NStorage::DeriveStorageConfig(const NKikimrConfig::TAppConfig& app
bsTo->ClearBscSettings();
}

// copy PDiskConfig from DefineHostConfig/DefineBox if this section is managed automatically
if (!hasStaticGroupInfo(bsFrom.GetServiceSet()) && config->GetSelfManagementConfig().GetEnabled()) {
THashMap<std::tuple<ui32, TString>, NKikimrBlobStorage::TPDiskConfig> pdiskConfigs;
auto callback = [&](const auto& node, const auto& drive) {
if (drive.HasPDiskConfig()) {
pdiskConfigs.emplace(std::make_tuple(node.GetNodeId(), drive.GetPath()), drive.GetPDiskConfig());
}
};
EnumerateConfigDrives(*config, 0, callback, nullptr, true);
for (auto& pdisk : *bsTo->MutableServiceSet()->MutablePDisks()) {
const auto key = std::make_tuple(pdisk.GetNodeID(), pdisk.GetPath());
if (const auto it = pdiskConfigs.find(key); it != pdiskConfigs.end()) {
pdisk.MutablePDiskConfig()->CopyFrom(it->second);
} else {
pdisk.ClearPDiskConfig();
}
}
}

// copy nameservice-related things
if (!appConfig.HasNameserviceConfig()) {
*errorReason = "origin config missing mandatory NameserviceConfig section";
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ namespace NKikimr::NStorage {

struct TGroupRecord {
TIntrusivePtr<TBlobStorageGroupInfo> Info; // current group info
std::optional<TBlobStorageGroupType> GType;
ui32 MaxKnownGeneration = 0; // maximum seen generation
std::optional<NKikimrBlobStorage::TGroupInfo> Group; // group info as a protobuf
NKikimrBlobStorage::TGroupInfo EncryptionParams; // latest encryption parameters; set only when encryption enabled; overlay in respect to Group
Expand Down Expand Up @@ -680,6 +681,8 @@ namespace NKikimr::NStorage {

std::set<TWorkingSyncer> WorkingSyncers;

TReplQuoter::TPtr SyncRateQuoter;

void ApplyWorkingSyncers(const NKikimrBlobStorage::TEvControllerNodeServiceSetUpdate& update);
void StartSyncerIfNeeded(TWorkingSyncer& syncer);
void Handle(TAutoPtr<TEventHandle<TEvNodeWardenNotifySyncerFinished>> ev);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ void TNodeWarden::Handle(TEvNodeWardenStorageConfig::TPtr ev) {
BridgeInfo = std::move(msg->BridgeInfo);

if (StorageConfig->HasBlobStorageConfig()) {
if (const auto& bsConfig = StorageConfig->GetBlobStorageConfig(); bsConfig.HasServiceSet()) {
const auto& bsConfig = StorageConfig->GetBlobStorageConfig();
if (bsConfig.HasServiceSet()) {
const NKikimrBlobStorage::TNodeWardenServiceSet *proposed = nullptr;
if (const auto& proposedConfig = ev->Get()->ProposedConfig) {
Y_VERIFY_S(StorageConfig->GetGeneration() < proposedConfig->GetGeneration(),
Expand All @@ -109,6 +110,7 @@ void TNodeWarden::Handle(TEvNodeWardenStorageConfig::TPtr ev) {
}
ApplyStorageConfig(bsConfig.GetServiceSet(), proposed);
}
SyncRateQuoter->UpdateBytesPerSecond(bsConfig.GetBridgeSyncRateBytesPerSecond());
}

if (StorageConfig->HasStateStorageConfig() && StorageConfig->HasStateStorageBoardConfig() && StorageConfig->HasSchemeBoardConfig()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/ut_blobstorage/lib/activity.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class TActivityActor : public TActorBootstrapped<TActivityActor> {
TString buffer = GenerateBuffer();
const TLogoBlobID id(TabletId, Generation, Step++, 0, buffer.size(), 0);
LOG_DEBUG_S(*TlsActivationContext, NActorsServices::TEST, Prefix << "sending TEvPut Id# " << id);
SendToProxy(Quoter.Take(TActivationContext::Now(), 1), new TEvBlobStorage::TEvPut(id, buffer, TInstant::Max()));
SendToProxy(Quoter.Take(TActivationContext::Monotonic(), 1), new TEvBlobStorage::TEvPut(id, buffer, TInstant::Max()));
Inflight.emplace(id, std::move(buffer));
}
if (!NumWritesRemaining && Inflight.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/query/query_extr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ namespace NKikimr {
quoter = QueryCtx->HullCtx->VCtx->ReplNodeResponseQuoter;
}
const TDuration duration = quoter
? quoter->Take(TActivationContext::Now(), Result->CalculateSerializedSizeCached())
? quoter->Take(TActivationContext::Monotonic(), Result->CalculateSerializedSizeCached())
: TDuration::Zero();
if (duration != TDuration::Zero()) {
Schedule(duration, new TEvents::TEvWakeup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ namespace NKikimr {
// limit bandwidth
const auto& quoter = ReplCtx->VCtx->ReplNodeRequestQuoter;
const TDuration duration = quoter
? quoter->Take(TActivationContext::Now(), actualBytes)
? quoter->Take(TActivationContext::Monotonic(), actualBytes)
: TDuration::Zero();
std::unique_ptr<TEvBlobStorage::TEvVGetResult> event(ev->Release().Release());
if (duration != TDuration::Zero()) {
Expand Down
30 changes: 17 additions & 13 deletions ydb/core/blobstorage/vdisk/repl/repl_quoter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,40 @@ namespace NKikimr {
using TPtr = std::shared_ptr<TReplQuoter>;

private:
using TAtomicInstant = std::atomic<TInstant>;
static_assert(TAtomicInstant::is_always_lock_free);
using TAtomicMonotonic = std::atomic<TMonotonic>;
static_assert(TAtomicMonotonic::is_always_lock_free);

TAtomicInstant NextQueueItemTimestamp;
const ui64 BytesPerSecond;
TAtomicMonotonic NextQueueItemTimestamp;
std::atomic_uint64_t BytesPerSecond;

public:
TReplQuoter(ui64 bytesPerSecond)
: BytesPerSecond(bytesPerSecond)
{
NextQueueItemTimestamp = TInstant::Zero();
NextQueueItemTimestamp = TMonotonic::Zero();
}

TDuration Take(TInstant now, ui64 bytes) {
TDuration duration = TDuration::MicroSeconds(bytes * 1000000 / BytesPerSecond);
TDuration Take(TMonotonic now, ui64 bytes) {
TDuration duration = TDuration::MicroSeconds(bytes * 1000000 / BytesPerSecond.load());
for (;;) {
TInstant current = NextQueueItemTimestamp;
const TInstant notBefore = now - GetCapacity();
const TInstant base = Max(current, notBefore);
TMonotonic current = NextQueueItemTimestamp;
const TMonotonic notBefore = now - GetCapacity();
const TMonotonic base = Max(current, notBefore);
const TDuration res = base - now; // time to wait until submitting desired query
const TInstant next = base + duration;
const TMonotonic next = base + duration;
if (NextQueueItemTimestamp.compare_exchange_weak(current, next)) {
return res;
}
}
}

// Replaces the configured rate limit, expressed in bytes per second.
// Only the BytesPerSecond member is written; the queue timestamp is untouched.
// NOTE(review): Take() evaluates `bytes * 1000000 / BytesPerSecond`, so a zero
// rate stored here would lead to a division by zero there -- confirm that
// callers never pass 0, or that 0 is rejected upstream.
void UpdateBytesPerSecond(ui64 bytesPerSecond) {
    BytesPerSecond.store(bytesPerSecond, std::memory_order_seq_cst);
}

static void QuoteMessage(const TPtr& quoter, std::unique_ptr<IEventHandle> ev, ui64 bytes) {
const TDuration timeout = quoter
? quoter->Take(TActivationContext::Now(), bytes)
? quoter->Take(TActivationContext::Monotonic(), bytes)
: TDuration::Zero();
if (timeout != TDuration::Zero()) {
TActivationContext::Schedule(timeout, ev.release());
Expand All @@ -46,7 +50,7 @@ namespace NKikimr {
}

ui64 GetMaxPacketSize() const {
return BytesPerSecond * GetCapacity().MicroSeconds() / 1000000;
return BytesPerSecond.load() * GetCapacity().MicroSeconds() / 1000000;
}

static constexpr TDuration GetCapacity() {
Expand Down
Loading
Loading