Skip to content

Commit

Permalink
Merge ba273f4 into 6d3e410
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury committed Feb 16, 2024
2 parents 6d3e410 + ba273f4 commit 0c4f274
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 1 deletion.
99 changes: 99 additions & 0 deletions ydb/core/tx/coordinator/coordinator_volatile_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/tx/coordinator/public/events.h>
#include <ydb/core/tx/coordinator/coordinator_impl.h>
#include <ydb/core/tx/tx.h>
#include <ydb/core/tx/tx_processing.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
Expand Down Expand Up @@ -258,6 +259,104 @@ namespace NKikimr::NFlatTxCoordinator::NTest {
observedSteps.clear();
}

// Reproduces a race between a mediator reboot (pipe disconnect) and a plan step
// that is still on its way to the coordinator's mediator queue actor:
//   1. every TEvMediatorQueueStep is captured and held back by the test;
//   2. a transaction is proposed, producing exactly one held-back step;
//   3. the mediator tablet is rebooted, and at the moment the mediator queue
//      actor is about to receive TEvClientDestroyed the held-back steps are
//      re-sent to it, so they are processed after the disconnect;
//   4. the test then requires that the target tablet still observes exactly
//      one TEvPlanStep.
Y_UNIT_TEST(MediatorReconnectPlanRace) {
// Single-node server driven by the test runtime (no real threads).
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetNodeCount(1)
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);

auto &runtime = *server->GetRuntime();
runtime.SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_DEBUG);

// Edge actor used as the sender for the proposal and for the tablet reboot.
auto sender = runtime.AllocateEdgeActor();
// Tablet ids of the coordinator, the mediator and the dummy plan target.
ui64 coordinatorId = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
ui64 mediatorId = ChangeStateStorage(Mediator, server->GetSettings().Domain);
ui64 tabletId = ChangeStateStorage(TTestTxConfig::TxTablet0, server->GetSettings().Domain);

// Start a Dummy-typed tablet whose actor is a TPlanTargetTablet; it is the
// only participant of the transaction proposed below.
CreateTestBootstrapper(runtime,
CreateTestTabletInfo(tabletId, TTabletTypes::Dummy),
[](const TActorId& tablet, TTabletStorageInfo* info) {
return new TPlanTargetTablet(tablet, info);
});

// Run the runtime until one TEvTablet::EvBoot event has been seen.
{
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot, 1));
runtime.DispatchEvents(options);
}

// Observer that intercepts every TEvMediatorQueueStep: it remembers the
// recipient (the mediator queue actor) and takes ownership of the event
// handle, storing it for later re-injection.
// NOTE(review): releasing the handle inside the observer presumably keeps the
// event from being delivered (the log line calls it "blocked") - confirm
// against the test runtime's observer contract.
TActorId mediatorQueue;
std::vector<std::unique_ptr<IEventHandle>> mediatorQueueSteps;
auto blockMediatorQueueSteps = runtime.AddObserver<TEvMediatorQueueStep>([&](TEvMediatorQueueStep::TPtr& ev) {
mediatorQueue = ev->GetRecipientRewrite();
mediatorQueueSteps.emplace_back(ev.Release());
Cerr << "... blocked TEvMediatorQueueStep for " << mediatorQueue << Endl;
});

// Observer that records the step number of every TEvPlanStep it sees.
std::vector<ui64> observedSteps;
auto stepsObserver = runtime.AddObserver<TEvTxProcessing::TEvPlanStep>([&](TEvTxProcessing::TEvPlanStep::TPtr& ev) {
auto* msg = ev->Get();
observedSteps.push_back(msg->Record.GetStep());
});

// Helper: dispatches events until `condition()` becomes true, making at most
// 5 dispatch attempts, and fails the test if the condition still does not
// hold afterwards. `description` is only used in log and failure messages.
auto waitFor = [&](const auto& condition, const TString& description) {
for (int i = 0; i < 5 && !condition(); ++i) {
Cerr << "... waiting for " << description << Endl;
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return condition();
};
runtime.DispatchEvents(options);
}
UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
};

// Propose a single volatile transaction that writes to the dummy tablet.
// The `if` with an initializer only limits the scope of `propose`:
// std::make_unique never returns null, so the branch is always taken.
ui64 txId = 12345678;
if (auto propose = std::make_unique<TEvTxProxy::TEvProposeTransaction>(coordinatorId, txId, 0, Min<ui64>(), Max<ui64>())) {
auto* tx = propose->Record.MutableTransaction();
// Not necessary, but we test volatile transactions here
tx->SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
auto* affected = tx->AddAffectedSet();
affected->SetTabletId(tabletId);
affected->SetFlags(TEvTxProxy::TEvProposeTransaction::AffectedWrite);

// Ownership of the event is handed over to the runtime here.
runtime.SendToPipe(coordinatorId, sender, propose.release());
}

// The proposal must result in exactly one intercepted mediator queue step.
waitFor([&]{ return mediatorQueueSteps.size() > 0; }, "TEvMediatorQueueStep");
UNIT_ASSERT_VALUES_EQUAL(mediatorQueueSteps.size(), 1u);

// We shouldn't see any steps yet
UNIT_ASSERT_VALUES_EQUAL(observedSteps.size(), 0u);

// Observer that fires when the pipe-destroyed notification is addressed to
// the mediator queue actor recorded above; that is the moment the held-back
// steps are released.
auto injectMediatorQueueStep = runtime.AddObserver<TEvTabletPipe::TEvClientDestroyed>([&](TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
if (ev->GetRecipientRewrite() == mediatorQueue) {
Cerr << "... found pipe disconnect at " << mediatorQueue << Endl;
// Stop blocking mediator queue steps
// This seems to be safe, since we remove someone else from std::list
// (i.e. a different observer, not the one that is currently running).
blockMediatorQueueSteps.Remove();
// Inject blocked mediator steps into queue mailbox, they will be handled after the disconnect
// Note: the loop variable `ev` below shadows this lambda's `ev` parameter.
for (auto& ev : mediatorQueueSteps) {
runtime.Send(ev.release(), 0, true);
}
// Every element was released above; drop the now-null pointers so that the
// emptiness check in the final waitFor succeeds.
mediatorQueueSteps.clear();
}
});

// Rebooting the mediator is what triggers the pipe disconnect on the
// coordinator side.
Cerr << "... rebooting mediator" << Endl;
RebootTablet(runtime, mediatorId, sender);

// The vector becomes empty only after the injection observer has run.
waitFor([&]{ return mediatorQueueSteps.empty(); }, "injected mediator steps");

// We must observe the plan step soon
runtime.SimulateSleep(TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(observedSteps.size(), 1u);
}

} // Y_UNIT_TEST_SUITE(CoordinatorVolatile)

} // namespace NKikimr::NFlatTxCoordinator::NTest
3 changes: 2 additions & 1 deletion ydb/core/tx/coordinator/mediator_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi

STFUNC(StateSync) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvMediatorQueueStep, Handle);
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
HFunc(TEvTxCoordinator::TEvCoordinatorSyncResult, Handle);
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
Expand All @@ -294,8 +295,8 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrapped<TTxCoordinatorMedi

STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
HFunc(TEvMediatorQueueStep, Handle);
HFunc(TEvTxProcessing::TEvPlanStepAck, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
CFunc(TEvents::TSystem::PoisonPill, Die)
}
Expand Down

0 comments on commit 0c4f274

Please sign in to comment.