Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions ydb/core/blobstorage/base/blobstorage_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
namespace NKikimr {

TEvNodeWardenStorageConfig::TEvNodeWardenStorageConfig(std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> config,
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> proposedConfig, bool selfManagementEnabled,
TBridgeInfo::TPtr bridgeInfo)
bool selfManagementEnabled, TBridgeInfo::TPtr bridgeInfo)
: Config(std::move(config))
, ProposedConfig(std::move(proposedConfig))
, SelfManagementEnabled(selfManagementEnabled)
, BridgeInfo(std::move(bridgeInfo))
{}
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/blobstorage/base/blobstorage_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,13 +592,11 @@ namespace NKikimr {
: TEventLocal<TEvNodeWardenStorageConfig, TEvBlobStorage::EvNodeWardenStorageConfig>
{
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> Config;
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> ProposedConfig;
bool SelfManagementEnabled;
TBridgeInfo::TPtr BridgeInfo;

TEvNodeWardenStorageConfig(std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> config,
std::shared_ptr<const NKikimrBlobStorage::TStorageConfig> proposedConfig, bool selfManagementEnabled,
TBridgeInfo::TPtr bridgeInfo);
bool selfManagementEnabled, TBridgeInfo::TPtr bridgeInfo);
~TEvNodeWardenStorageConfig();
};

Expand Down
53 changes: 28 additions & 25 deletions ydb/core/blobstorage/nodewarden/distconf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ namespace NKikimr::NStorage {
ProposedStorageConfig.reset();
}

ReportStorageConfigToNodeWarden(0);
ReportStorageConfigToNodeWarden();

if (IsSelfStatic) {
PersistConfig({});
Expand Down Expand Up @@ -177,15 +177,7 @@ namespace NKikimr::NStorage {
}

void TDistributedConfigKeeper::HandleConfigConfirm(STATEFN_SIG) {
if (ev->Cookie) {
STLOG(PRI_DEBUG, BS_NODE, NWDC46, "HandleConfigConfirm", (Cookie, ev->Cookie),
(ProposedStorageConfigCookie, ProposedStorageConfigCookie),
(ProposedStorageConfigCookieUsage, ProposedStorageConfigCookieUsage));
if (ev->Cookie == ProposedStorageConfigCookie && ProposedStorageConfigCookieUsage) {
--ProposedStorageConfigCookieUsage;
}
FinishAsyncOperation(ev->Cookie);
}
Y_UNUSED(ev);
}

void TDistributedConfigKeeper::SendEvent(ui32 nodeId, ui64 cookie, TActorId sessionId, std::unique_ptr<IEventBase> ev) {
Expand Down Expand Up @@ -233,13 +225,22 @@ namespace NKikimr::NStorage {

for (const auto& [cookie, task] : ScatterTasks) {
for (const ui32 nodeId : task.PendingNodes) {
const auto it = DirectBoundNodes.find(nodeId);
Y_ABORT_UNLESS(it != DirectBoundNodes.end());
TBoundNode& info = it->second;
Y_ABORT_UNLESS(info.ScatterTasks.contains(cookie));
if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
TBoundNode& info = it->second;
Y_ABORT_UNLESS(info.ScatterTasks.contains(cookie));
} else {
Y_ABORT_UNLESS(AddedNodesScatterTasks.contains({nodeId, cookie}));
}
}
}

for (const auto& [nodeId, cookie] : AddedNodesScatterTasks) {
const auto it = ScatterTasks.find(cookie);
Y_ABORT_UNLESS(it != ScatterTasks.end());
TScatterTask& task = it->second;
Y_ABORT_UNLESS(task.PendingNodes.contains(nodeId));
}

for (const auto& [nodeId, info] : DirectBoundNodes) {
for (const ui64 cookie : info.ScatterTasks) {
const auto it = ScatterTasks.find(cookie);
Expand All @@ -250,10 +251,12 @@ namespace NKikimr::NStorage {
}

for (const auto& [cookie, task] : ScatterTasks) {
if (task.Origin) {
Y_ABORT_UNLESS(Binding);
Y_ABORT_UNLESS(task.Origin == Binding);
}
std::visit(TOverloaded{
[&](const TBinding& origin) { Y_ABORT_UNLESS(origin == Binding); },
[&](const TActorId& /*actorId*/) { Y_ABORT_UNLESS(!Binding); },
[&](const TScatterTaskOriginFsm&) {},
[&](const TScatterTaskOriginTargeted&) {}
}, task.Origin);
}

for (const auto& [nodeId, subs] : SubscribedSessions) {
Expand All @@ -277,6 +280,10 @@ namespace NKikimr::NStorage {
if (UnsubscribeQueue.contains(nodeId)) {
okay = true;
}
if (!okay) {
const auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0});
okay = it != AddedNodesScatterTasks.end() && std::get<0>(*it) == nodeId;
}
Y_ABORT_UNLESS(okay);
if (subs.SubscriptionCookie) {
const auto it = SubscriptionCookieMap.find(subs.SubscriptionCookie);
Expand Down Expand Up @@ -377,16 +384,12 @@ namespace NKikimr::NStorage {
}
}

void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden(ui64 cookie) {
void TDistributedConfigKeeper::ReportStorageConfigToNodeWarden() {
Y_ABORT_UNLESS(StorageConfig);
const TActorId wardenId = MakeBlobStorageNodeWardenID(SelfId().NodeId());
const auto& config = SelfManagementEnabled ? StorageConfig : BaseConfig;
auto proposedConfig = ProposedStorageConfig && SelfManagementEnabled
? std::make_shared<NKikimrBlobStorage::TStorageConfig>(*ProposedStorageConfig)
: nullptr;
auto ev = std::make_unique<TEvNodeWardenStorageConfig>(config, std::move(proposedConfig), SelfManagementEnabled,
BridgeInfo);
Send(wardenId, ev.release(), 0, cookie);
auto ev = std::make_unique<TEvNodeWardenStorageConfig>(config, SelfManagementEnabled, BridgeInfo);
Send(wardenId, ev.release());
}

STFUNC(TDistributedConfigKeeper::StateFunc) {
Expand Down
39 changes: 29 additions & 10 deletions ydb/core/blobstorage/nodewarden/distconf.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,22 +172,39 @@ namespace NKikimr::NStorage {
ui64 Id = RandomNumber<ui64>(); // unique id
};

struct TScatterTaskOriginFsm {
TString ToString() const { return "fsm"; }
};

struct TScatterTaskOriginTargeted {
TActorId Sender;
ui64 Cookie;
TActorId InterconnectSessionId;

TString ToString() const { return TStringBuilder() << "{Sender# " << Sender << " Cookie# " << Cookie
<< " InterconnectSessionId# " << InterconnectSessionId << '}'; }
};

using TScatterTaskOrigin = std::variant<
TBinding, // when scatter is received from root
TActorId, // locally generated by invoke processor
TScatterTaskOriginFsm, // locally generated by configuration change FSM
TScatterTaskOriginTargeted // when targeted scatter is issued by cluster leader to newly added nodes
>;

struct TScatterTask {
const std::optional<TBinding> Origin;
const TScatterTaskOrigin Origin;
const ui64 ScepterCounter;
const TActorId ActorId;

THashSet<ui32> PendingNodes;
ui32 AsyncOperationsPending = 0;
TEvScatter Request;
TEvGather Response;
std::vector<TEvGather> CollectedResponses; // from bound nodes

TScatterTask(const std::optional<TBinding>& origin, TEvScatter&& request,
ui64 scepterCounter, TActorId actorId)
: Origin(origin)
TScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, ui64 scepterCounter)
: Origin(std::move(origin))
, ScepterCounter(scepterCounter)
, ActorId(actorId)
{
Request.Swap(&request);
if (Request.HasCookie()) {
Expand Down Expand Up @@ -218,8 +235,7 @@ namespace NKikimr::NStorage {

// proposed storage configuration of the cluster
std::optional<NKikimrBlobStorage::TStorageConfig> ProposedStorageConfig; // proposed one
ui64 ProposedStorageConfigCookie; // if set, then this configuration is being written right now
ui32 ProposedStorageConfigCookieUsage = 0;
std::optional<ui64> ProposedStorageConfigCookie; // if set, then this configuration is being written right now

// most relevant proposed config
using TPersistCallback = std::function<void(TEvPrivate::TEvStorageConfigStored&)>;
Expand Down Expand Up @@ -266,6 +282,7 @@ namespace NKikimr::NStorage {
ui64 NextScatterCookie = RandomNumber<ui64>();
using TScatterTasks = THashMap<ui64, TScatterTask>;
TScatterTasks ScatterTasks;
std::set<std::tuple<ui32, ui64>> AddedNodesScatterTasks;

std::optional<TActorId> StateStorageSelfHealActor;

Expand All @@ -284,6 +301,7 @@ namespace NKikimr::NStorage {
NKikimrBlobStorage::TStorageConfig StorageConfig; // storage config being proposed
TActorId ActorId; // actor id waiting for this operation to complete
bool MindPrev; // mind previous configuration quorum
std::vector<TNodeIdentifier> AddedNodes; // a list of nodes being added in this configuration change
};
std::optional<TProposition> CurrentProposition;

Expand Down Expand Up @@ -348,7 +366,7 @@ namespace NKikimr::NStorage {
void Halt(); // cease any distconf activity, unbind and reject any bindings
bool ApplyStorageConfig(const NKikimrBlobStorage::TStorageConfig& config, bool fromBinding = false);
void HandleConfigConfirm(STATEFN_SIG);
void ReportStorageConfigToNodeWarden(ui64 cookie);
void ReportStorageConfigToNodeWarden();

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// PDisk configuration retrieval and storing
Expand Down Expand Up @@ -411,6 +429,7 @@ namespace NKikimr::NStorage {
void BecomeRoot();
void UnbecomeRoot();
void HandleErrorTimeout();
void UndoCurrentPropositionNodeChange(TProposition& proposition);
void ProcessGather(TEvGather *res);
bool HasConnectedNodeQuorum(const NKikimrBlobStorage::TStorageConfig& config, bool local) const;

Expand Down Expand Up @@ -487,7 +506,7 @@ namespace NKikimr::NStorage {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Scatter/gather logic

void IssueScatterTask(std::optional<TActorId> actorId, TEvScatter&& request);
void IssueScatterTask(TScatterTaskOrigin&& origin, TEvScatter&& request, std::span<TNodeIdentifier> addedNodes = {});
void CheckCompleteScatterTask(TScatterTasks::iterator it);
void FinishAsyncOperation(ui64 cookie);
void IssueScatterTaskForNode(ui32 nodeId, TBoundNode& info, ui64 cookie, TScatterTask& task);
Expand Down
59 changes: 46 additions & 13 deletions ydb/core/blobstorage/nodewarden/distconf_binding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ namespace NKikimr::NStorage {
void TDistributedConfigKeeper::Handle(TEvInterconnect::TEvNodesInfo::TPtr ev) {
STLOG(PRI_DEBUG, BS_NODE, NWDC11, "TEvNodesInfo");

if (SelfManagementEnabled) {
// we obtain node list only from current StorageConfig
return;
}

std::vector<std::tuple<TNodeIdentifier, TNodeLocation>> newNodeList;
for (const auto& item : ev->Get()->Nodes) {
if (item.IsStatic) {
Expand All @@ -18,6 +23,8 @@ namespace NKikimr::NStorage {
}

void TDistributedConfigKeeper::ApplyNewNodeList(std::span<std::tuple<TNodeIdentifier, TNodeLocation>> newNodeList) {
STLOG(PRI_DEBUG, BS_NODE, NWDC13, "ApplyNewNodeList", (NewNodeList, newNodeList));

// do not start configuration negotiation for dynamic nodes
if (!IsSelfStatic) {
std::optional<TString> expectedBridgePileName;
Expand Down Expand Up @@ -49,7 +56,8 @@ namespace NKikimr::NStorage {
break;
}
}
Y_ABORT_UNLESS(found);
auto fn = [](const auto& x) { return TStringBuilder() << std::get<0>(x); };
Y_VERIFY_S(found, "SelfNodeId# " << selfNodeId << " NewNodeList# " << FormatList(newNodeList | std::views::transform(fn)));

// process all other nodes, find bindable ones (from our current pile) and build list of all nodes
AllNodeIds.clear();
Expand All @@ -61,7 +69,7 @@ namespace NKikimr::NStorage {

for (const auto& [item, location] : newNodeList) {
const ui32 nodeId = item.NodeId();
AllNodeIds.insert(item.NodeId());
AllNodeIds.insert(nodeId);

// check if node is from the same pile (as this one)
if (location.GetBridgePileName() == SelfNodeBridgePileName) {
Expand Down Expand Up @@ -297,6 +305,13 @@ namespace NKikimr::NStorage {
if (Binding && Binding->NodeId == nodeId) {
AbortBinding("disconnection", false);
}

// abort scatter tasks issued to newly added nodes
for (auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); it != AddedNodesScatterTasks.end() &&
std::get<0>(*it) == nodeId; it = AddedNodesScatterTasks.erase(it)) {
const auto& [nodeId, cookie] = *it;
AbortScatterTask(cookie, nodeId);
}
}

void TDistributedConfigKeeper::UnsubscribeInterconnect(ui32 nodeId) {
Expand All @@ -309,6 +324,10 @@ namespace NKikimr::NStorage {
if (ConnectedDynamicNodes.contains(nodeId)) {
return;
}
if (const auto it = AddedNodesScatterTasks.lower_bound({nodeId, 0}); it != AddedNodesScatterTasks.end() &&
std::get<0>(*it) == nodeId) {
return;
}
if (const auto it = SubscribedSessions.find(nodeId); it != SubscribedSessions.end()) {
TSessionSubscription& subs = it->second;
STLOG(PRI_DEBUG, BS_NODE, NWDC55, "UnsubscribeInterconnect", (NodeId, nodeId), (Subscription, subs));
Expand All @@ -319,11 +338,17 @@ namespace NKikimr::NStorage {
Y_ABORT_UNLESS(jt != SubscriptionCookieMap.end());
Y_ABORT_UNLESS(jt->second == nodeId);
SubscriptionCookieMap.erase(jt);
if (!AllNodeIds.contains(nodeId)) {
TActivationContext::Send(new IEventHandle(TEvInterconnect::EvDisconnect, 0,
TActivationContext::InterconnectProxy(nodeId), {}, nullptr, 0));
}
} else {
// we already had TEvNodeConnected, so we have to unsubscribe
Y_ABORT_UNLESS(subs.SessionId);
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, subs.SessionId, SelfId(),
nullptr, 0));
ui32 event = AllNodeIds.contains(nodeId)
? TEvents::TSystem::Unsubscribe
: TEvents::TSystem::Poison;
TActivationContext::Send(new IEventHandle(event, 0, subs.SessionId, SelfId(), nullptr, 0));
}
SubscribedSessions.erase(it);
}
Expand Down Expand Up @@ -403,7 +428,8 @@ namespace NKikimr::NStorage {
return; // possible race with unbinding
}

Y_ABORT_UNLESS(Binding->RootNodeId || ScatterTasks.empty());
auto isTargeted = [](const TScatterTaskOrigin& origin) { return std::holds_alternative<TScatterTaskOriginTargeted>(origin); };
Y_ABORT_UNLESS(Binding->RootNodeId || std::ranges::all_of(ScatterTasks | std::views::values, isTargeted, &TScatterTask::Origin));

// check if this binding was accepted and if it is acceptable from our point of view
bool bindingUpdate = false;
Expand Down Expand Up @@ -546,13 +572,10 @@ namespace NKikimr::NStorage {
Y_ABORT_UNLESS(senderNodeId != SelfId().NodeId());
auto& record = ev->Get()->Record;

STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush", (NodeId, senderNodeId), (Cookie, ev->Cookie),
(SessionId, ev->InterconnectSession), (Binding, Binding), (Record, record),
(RootNodeId, GetRootNodeId()));

// check if we have to send our current config to the peer
const NKikimrBlobStorage::TStorageConfig *configToPeer = nullptr;
std::optional<ui64> requestStorageConfigGeneration;
const bool knownNode = AllNodeIds.contains(senderNodeId);
if (StorageConfig) {
for (const auto& item : record.GetBoundNodes()) {
if (item.GetNodeId().GetNodeId() == senderNodeId) {
Expand All @@ -566,9 +589,19 @@ namespace NKikimr::NStorage {
}
}

if (!AllNodeIds.contains(senderNodeId)) {
STLOG(PRI_DEBUG, BS_NODE, NWDC02, "TEvNodeConfigPush",
(NodeId, senderNodeId),
(Cookie, ev->Cookie),
(SessionId, ev->InterconnectSession),
(Binding, Binding),
(Record, record),
(RootNodeId, GetRootNodeId()),
(StorageConfigGeneration, StorageConfig ? (i64)StorageConfig->GetGeneration() : -1),
(KnownNode, knownNode));

if (!knownNode) {
// node has been already deleted from the config, but new subscription is coming through -- ignoring it
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(configToPeer));
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(nullptr));
return;
}

Expand All @@ -577,7 +610,7 @@ namespace NKikimr::NStorage {
STLOG(PRI_DEBUG, BS_NODE, NWDC28, "TEvNodeConfigPush rejected", (NodeId, senderNodeId),
(Cookie, ev->Cookie), (SessionId, ev->InterconnectSession), (Binding, Binding),
(Record, record));
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(configToPeer));
SendEvent(*ev, TEvNodeConfigReversePush::MakeRejected(nullptr));
return;
}

Expand All @@ -589,7 +622,7 @@ namespace NKikimr::NStorage {
// nodes AND this is the root one
} else {
// this is either not the root node, or no quorum for connection
auto response = TEvNodeConfigReversePush::MakeRejected(configToPeer);
auto response = TEvNodeConfigReversePush::MakeRejected(nullptr);
if (Binding && Binding->RootNodeId) {
// command peer to join this specific node
response->Record.SetRootNodeId(Binding->RootNodeId);
Expand Down
Loading
Loading