Skip to content

Commit

Permalink
Fix partial distributed commit of uncommitted changes during shard re…
Browse files Browse the repository at this point in the history
…start race (#2148)
  • Loading branch information
snaury committed Feb 21, 2024
1 parent b3705e4 commit d32315a
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 30 deletions.
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/datashard_dep_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,11 +673,13 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera

if (lock) {
lock->SetLastOpId(op->GetTxId());
if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent()) {
if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent() && !lock->IsFrozen()) {
// This lock was cached before, and since we know
// it's persistent, we know it was also frozen
// during that lock caching. Restore the frozen
// flag for this lock.
// Note: this code path is only for older shards
// which didn't persist the frozen flag.
lock->SetFrozen();
}
}
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_common_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,17 @@ namespace NKqpHelpers {
if (result.result_sets_size() == 0) {
return "<empty>";
}
return FormatResult(result.result_sets(0));
if (result.result_sets_size() == 1) {
return FormatResult(result.result_sets(0));
}
TStringBuilder sb;
for (int i = 0; i < result.result_sets_size(); ++i) {
if (i != 0) {
sb << "\n";
}
sb << FormatResult(result.result_sets(i));
}
return sb;
}

inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) {
Expand Down
165 changes: 165 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/core/tx/tx_proxy/upload_rows.h>

#include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD)
#include <ydb/core/mind/local.h>

#include <ydb/library/yql/minikql/mkql_node_printer.h>

Expand Down Expand Up @@ -3823,6 +3824,170 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
UNIT_ASSERT_C(duration <= TDuration::MilliSeconds(200), "UPSERT takes too much time: " << duration);
}

Y_UNIT_TEST(UncommittedWriteRestartDuringCommit) {
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);

TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100)
.SetAppConfig(app)
// Bug was with non-volatile transactions
.SetEnableDataShardVolatileTransactions(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;
CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);

// Insert some initial data
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);"));

const auto shards1 = GetTableShards(server, sender, "/Root/table-1");

TString sessionId, txId;

// Start inserting a row into table-1
Cerr << "... sending initial upsert" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
)")),
"<empty>");

// We want to block readsets
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
readSets.emplace_back(ev.Release());
});

// Start committing an additional read/write
// Note: select on table-1 flushes accumulated changes
// Note: select on table-2 ensures we have an outgoing readset
Cerr << "... sending commit request" << Endl;
auto commitFuture = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
SELECT key, value FROM `/Root/table-1`
UNION ALL
SELECT key, value FROM `/Root/table-2`
ORDER BY key;
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
)"), sessionId, txId, true /* commitTx */));

WaitFor(runtime, [&]{ return readSets.size() >= 2; }, "readset exchange");
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);

// We want to block local boot to make sure it stays down during rollback
std::vector<std::unique_ptr<IEventHandle>> blockedLocalBoot;
auto blockLocalBoot = runtime.AddObserver<TEvLocal::TEvBootTablet>([&](TEvLocal::TEvBootTablet::TPtr& ev) {
Cerr << "... blocking TEvLocal::TEvBootTablet" << Endl;
blockedLocalBoot.emplace_back(std::move(ev.Release()));
});

// Kill current datashard actor with TEvPoison (so it doesn't have a chance to reply)
Cerr << "... sending TEvPoison to " << shards1.at(0) << Endl;
ForwardToTablet(runtime, shards1.at(0), sender, new TEvents::TEvPoison);

// Wait until hive tries to boot a new instance (old instance is dead by that point)
WaitFor(runtime, [&]{ return blockedLocalBoot.size() > 0; }, "blocked local boot", 3);

// Stop blocking and resend readsets
blockReadSets.Remove();
Cerr << "... resending readsets" << Endl;
for (auto& ev : readSets) {
runtime.Send(ev.release(), 0, true);
}
readSets.clear();

// Wait until commit fails with UNDETERMINED
Cerr << "... waiting for commit result" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(commitFuture))),
"ERROR: UNDETERMINED");

// Sleep a little to make sure everything settles
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// We want to detect a restarting datashard and block its progress queue
TActorId shard1actor;
std::vector<std::unique_ptr<IEventHandle>> blockedProgress;
auto blockProgressQueue = runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvTablet::TEvBoot::EventType: {
auto* msg = ev->Get<TEvTablet::TEvBoot>();
Cerr << "... observed TEvBoot for " << msg->TabletID << " at " << ev->GetRecipientRewrite() << Endl;
if (msg->TabletID == shards1.at(0)) {
shard1actor = ev->GetRecipientRewrite();
}
break;
}
case EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0 /* EvProgressTransaction */: {
if (shard1actor && ev->GetRecipientRewrite() == shard1actor) {
Cerr << "... blocking TEvProgressTranasction at " << ev->GetRecipientRewrite() << Endl;
blockedProgress.emplace_back(ev.Release());
return;
}
break;
}
}
});

// Unblock local boot
blockLocalBoot.Remove();
Cerr << "... unblocking local boot" << Endl;
for (auto& ev : blockedLocalBoot) {
runtime.Send(ev.release(), 0, true);
}
blockedLocalBoot.clear();

// Wait until a new instance starts and is blocked at progress queue handling
WaitFor(runtime, [&]{ return blockedProgress.size() > 0; }, "blocked progress", 10);

// Sleep a little to make sure datashard subscribes to lock and handles the response
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// Unblock progress queue and resend blocked messages
Cerr << "... resending progress queue" << Endl;
blockProgressQueue.Remove();
for (auto& ev : blockedProgress) {
runtime.Send(ev.release(), 0, true);
}
blockedProgress.clear();

// Sleep a little to make sure everything settles
Cerr << "... sleeping for 1 second" << Endl;
runtime.SimulateSleep(TDuration::Seconds(1));

// Now make a read query, we must not observe partial commit
Cerr << "... checking final table state" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, Q_(R"(
SELECT key, value FROM `/Root/table-1`
UNION ALL
SELECT key, value FROM `/Root/table-2`
ORDER BY key;
)")),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 40 } }");
}

}

} // namespace NKikimr
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "datashard_impl.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "datashard_locks_db.h"

namespace NKikimr {
namespace NDataShard {
Expand Down Expand Up @@ -62,7 +63,8 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op,
ui64 lockId = pr.first;
auto lock = DataShard.SysLocksTable().GetRawLock(lockId, TRowVersion::Min());
if (lock && lock->IsPersistent()) {
lock->SetFrozen();
TDataShardLocksDb locksDb(DataShard, txc);
lock->SetFrozen(&locksDb);
}
}
tx->MarkLocksStored();
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/store_and_send_write_out_rs_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "datashard_impl.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "datashard_locks_db.h"

namespace NKikimr {
namespace NDataShard {
Expand Down Expand Up @@ -62,7 +63,8 @@ EExecutionStatus TStoreAndSendWriteOutRSUnit::Execute(TOperation::TPtr op,
ui64 lockId = pr.first;
auto lock = DataShard.SysLocksTable().GetRawLock(lockId, TRowVersion::Min());
if (lock && lock->IsPersistent()) {
lock->SetFrozen();
TDataShardLocksDb locksDb(DataShard, txc);
lock->SetFrozen(&locksDb);
}
}
writeOp->MarkLocksStored();
Expand Down
49 changes: 29 additions & 20 deletions ydb/core/tx/locks/locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId)
, CreationTime(TAppData::TimeProvider->Now())
{}

TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs)
TLockInfo::TLockInfo(TLockLocker * locker, const ILocksDb::TLockRow& row)
: Locker(locker)
, LockId(lockId)
, LockNodeId(lockNodeId)
, Generation(generation)
, Counter(counter)
, CreationTime(createTs)
, LockId(row.LockId)
, LockNodeId(row.LockNodeId)
, Generation(row.Generation)
, Counter(row.Counter)
, CreationTime(TInstant::MicroSeconds(row.CreateTs))
, Flags(ELockFlags(row.Flags))
, Persistent(true)
{
if (counter == Max<ui64>()) {
if (Counter == Max<ui64>()) {
BreakVersion.emplace(TRowVersion::Min());
}
}
Expand Down Expand Up @@ -144,7 +145,7 @@ void TLockInfo::OnRemoved() {
void TLockInfo::PersistLock(ILocksDb* db) {
Y_ABORT_UNLESS(!IsPersistent());
Y_ABORT_UNLESS(db, "Cannot persist lock without a db");
db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds());
db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds(), ui64(Flags));
Persistent = true;

PersistRanges(db);
Expand Down Expand Up @@ -297,11 +298,11 @@ void TLockInfo::CleanupConflicts() {
}
}

void TLockInfo::RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags) {
void TLockInfo::RestorePersistentRange(const ILocksDb::TLockRange& rangeRow) {
auto& range = PersistentRanges.emplace_back();
range.Id = rangeId;
range.TableId = tableId;
range.Flags = flags;
range.Id = rangeRow.RangeId;
range.TableId = rangeRow.TableId;
range.Flags = ELockRangeFlags(rangeRow.Flags);

if (!!(range.Flags & ELockRangeFlags::Read)) {
if (ReadTables.insert(range.TableId).second) {
Expand Down Expand Up @@ -333,6 +334,14 @@ void TLockInfo::RestorePersistentVolatileDependency(ui64 txId) {
VolatileDependencies.insert(txId);
}

void TLockInfo::SetFrozen(ILocksDb* db) {
Y_ABORT_UNLESS(IsPersistent());
Flags |= ELockFlags::Frozen;
if (db) {
db->PersistLockFlags(LockId, ui64(Flags));
}
}

// TTableLocks

void TTableLocks::AddShardLock(TLockInfo* lock) {
Expand Down Expand Up @@ -549,14 +558,14 @@ TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
return lock;
}

TLockInfo::TPtr TLockLocker::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) {
Y_ABORT_UNLESS(Locks.find(lockId) == Locks.end());
TLockInfo::TPtr TLockLocker::AddLock(const ILocksDb::TLockRow& row) {
Y_ABORT_UNLESS(Locks.find(row.LockId) == Locks.end());

TLockInfo::TPtr lock(new TLockInfo(this, lockId, lockNodeId, generation, counter, createTs));
TLockInfo::TPtr lock(new TLockInfo(this, row));
Y_ABORT_UNLESS(lock->IsPersistent());
Locks[lockId] = lock;
if (lockNodeId) {
PendingSubscribeLocks.emplace_back(lockId, lockNodeId);
Locks[row.LockId] = lock;
if (row.LockNodeId) {
PendingSubscribeLocks.emplace_back(row.LockId, row.LockNodeId);
}
return lock;
}
Expand Down Expand Up @@ -1158,9 +1167,9 @@ bool TSysLocks::Load(ILocksDb& db) {
Locker.Clear();

for (auto& lockRow : rows) {
TLockInfo::TPtr lock = Locker.AddLock(lockRow.LockId, lockRow.LockNodeId, lockRow.Generation, lockRow.Counter, TInstant::MicroSeconds(lockRow.CreateTs));
TLockInfo::TPtr lock = Locker.AddLock(lockRow);
for (auto& rangeRow : lockRow.Ranges) {
lock->RestorePersistentRange(rangeRow.RangeId, rangeRow.TableId, ELockRangeFlags(rangeRow.Flags));
lock->RestorePersistentRange(rangeRow);
}
}

Expand Down
Loading

0 comments on commit d32315a

Please sign in to comment.