Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lost coordinator steps during mediator reconnect race #2037

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions ydb/core/tx/coordinator/coordinator_volatile_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/tx/coordinator/public/events.h>
#include <ydb/core/tx/coordinator/coordinator_impl.h>
#include <ydb/core/tx/tx.h>
#include <ydb/core/tx/tx_processing.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
Expand Down Expand Up @@ -258,6 +259,104 @@ namespace NKikimr::NFlatTxCoordinator::NTest {
observedSteps.clear();
}

Y_UNIT_TEST(MediatorReconnectPlanRace) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetNodeCount(1)
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);

auto &runtime = *server->GetRuntime();
runtime.SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_DEBUG);

auto sender = runtime.AllocateEdgeActor();
ui64 coordinatorId = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
ui64 mediatorId = ChangeStateStorage(Mediator, server->GetSettings().Domain);
ui64 tabletId = ChangeStateStorage(TTestTxConfig::TxTablet0, server->GetSettings().Domain);

CreateTestBootstrapper(runtime,
CreateTestTabletInfo(tabletId, TTabletTypes::Dummy),
[](const TActorId& tablet, TTabletStorageInfo* info) {
return new TPlanTargetTablet(tablet, info);
});

{
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot, 1));
runtime.DispatchEvents(options);
}

TActorId mediatorQueue;
std::vector<std::unique_ptr<IEventHandle>> mediatorQueueSteps;
auto blockMediatorQueueSteps = runtime.AddObserver<TEvMediatorQueueStep>([&](TEvMediatorQueueStep::TPtr& ev) {
mediatorQueue = ev->GetRecipientRewrite();
mediatorQueueSteps.emplace_back(ev.Release());
Cerr << "... blocked TEvMediatorQueueStep for " << mediatorQueue << Endl;
});

std::vector<ui64> observedSteps;
auto stepsObserver = runtime.AddObserver<TEvTxProcessing::TEvPlanStep>([&](TEvTxProcessing::TEvPlanStep::TPtr& ev) {
auto* msg = ev->Get();
observedSteps.push_back(msg->Record.GetStep());
});

auto waitFor = [&](const auto& condition, const TString& description) {
for (int i = 0; i < 5 && !condition(); ++i) {
Cerr << "... waiting for " << description << Endl;
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return condition();
};
runtime.DispatchEvents(options);
}
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
};

ui64 txId = 12345678;
if (auto propose = std::make_unique<TEvTxProxy::TEvProposeTransaction>(coordinatorId, txId, 0, Min<ui64>(), Max<ui64>())) {
auto* tx = propose->Record.MutableTransaction();
// Not necessary, but we test volatile transactions here
tx->SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
auto* affected = tx->AddAffectedSet();
affected->SetTabletId(tabletId);
affected->SetFlags(TEvTxProxy::TEvProposeTransaction::AffectedWrite);

runtime.SendToPipe(coordinatorId, sender, propose.release());
}

waitFor([&]{ return mediatorQueueSteps.size() > 0; }, "TEvMediatorQueueStep");
UNIT_ASSERT_VALUES_EQUAL(mediatorQueueSteps.size(), 1u);

// We shouldn't see any steps yet
UNIT_ASSERT_VALUES_EQUAL(observedSteps.size(), 0u);

auto injectMediatorQueueStep = runtime.AddObserver<TEvTabletPipe::TEvClientDestroyed>([&](TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
if (ev->GetRecipientRewrite() == mediatorQueue) {
Cerr << "... found pipe disconnect at " << mediatorQueue << Endl;
// Stop blocking mediator queue steps
// This seems to be safe, since we remove someone else from std::list
blockMediatorQueueSteps.Remove();
// Inject blocked mediator steps into queue mailbox, they will be handled after the disconnect
for (auto& ev : mediatorQueueSteps) {
runtime.Send(ev.release(), 0, true);
}
mediatorQueueSteps.clear();
}
});

Cerr << "... rebooting mediator" << Endl;
RebootTablet(runtime, mediatorId, sender);

waitFor([&]{ return mediatorQueueSteps.empty(); }, "injected mediator steps");

// We must observe the plan step soon
runtime.SimulateSleep(TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(observedSteps.size(), 1u);
}

} // Y_UNIT_TEST_SUITE(CoordinatorVolatile)

} // namespace NKikimr::NFlatTxCoordinator::NTest
3 changes: 2 additions & 1 deletion ydb/core/tx/coordinator/mediator_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi

STFUNC(StateSync) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvMediatorQueueStep, Handle);
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
HFunc(TEvTxCoordinator::TEvCoordinatorSyncResult, Handle);
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
Expand All @@ -294,8 +295,8 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi

STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
HFunc(TEvMediatorQueueStep, Handle);
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
CFunc(TEvents::TSystem::PoisonPill, Die)
}
Expand Down
Loading