From 39e5acaba482dd64439a2c60b700e567aee758fd Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 7 Nov 2025 16:24:02 +0000 Subject: [PATCH 1/6] fix --- .../pqtablet/partition/mlp/mlp_storage.cpp | 4 + .../pqtablet/partition/mlp/mlp_storage.h | 1 + .../mlp/mlp_storage__serialization.cpp | 1 + .../pqtablet/partition/mlp/mlp_storage_ut.cpp | 97 +++++++++---------- 4 files changed, 50 insertions(+), 53 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp index 0bba41e1d6e3..53ebefa61dce 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp @@ -333,6 +333,10 @@ const std::deque& TStorage::GetDLQMessages() const { return DLQQueue; } +const std::unordered_set TStorage::GetLockedMessageGroupsId() const { + return LockedMessageGroupsId; +} + std::pair TStorage::GetMessageInt(ui64 offset, EMessageStatus expectedStatus) { auto [message, slowZone] = GetMessageInt(offset); if (!message) { diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h index 7042dfceb1ce..709fb4935525 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h @@ -151,6 +151,7 @@ class TStorage { TInstant GetMessageDeadline(ui64 message); std::pair GetMessage(ui64 message); const std::deque& GetDLQMessages() const; + const std::unordered_set GetLockedMessageGroupsId() const; struct TPosition { diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage__serialization.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage__serialization.cpp index c1b86964e61c..fd9a34feca6b 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage__serialization.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage__serialization.cpp @@ -310,6 +310,7 @@ bool TStorage::Initialize(const NKikimrPQ::TMLPStorageSnapshot& snapshot) { ++Metrics.UnprocessedMessageCount; break; case EMessageStatus::DLQ: + ++Metrics.DLQMessageCount; break; } } diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp index d08388e9d10b..0bef176a822b 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp @@ -144,11 +144,12 @@ struct TUtils { UNIT_ASSERT(i == other.Storage.end()); UNIT_ASSERT(m == Storage.end()); - auto join = [](const std::deque vs) { + auto join = [](const auto& vs) { return JoinRange(",", vs.begin(), vs.end()); }; UNIT_ASSERT_VALUES_EQUAL(join(other.Storage.GetDLQMessages()), join(Storage.GetDLQMessages())); + UNIT_ASSERT_VALUES_EQUAL(join(other.Storage.GetLockedMessageGroupsId()), join(Storage.GetLockedMessageGroupsId())); auto ometrics = other.Storage.GetMetrics(); auto metrics = Storage.GetMetrics(); @@ -1542,6 +1543,22 @@ Y_UNIT_TEST(MoveBaseDeadline) { } } +auto assertDeserialization = [](TUtils& original, auto& firstSnapshot, auto& wal, auto& endSnapshot) { + { + TUtils utils; + utils.LoadSnapshot(firstSnapshot); + utils.LoadWAL(wal); + + utils.AssertEquals(original); + } + { + TUtils utils; + utils.LoadSnapshot(endSnapshot); + + utils.AssertEquals(original); + } +}; + Y_UNIT_TEST(SlowZone_MoveUnprocessedToSlowZone) { TUtils utils; utils.AddMessage(6); @@ -1557,11 +1574,9 @@ Y_UNIT_TEST(SlowZone_MoveUnprocessedToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_MoveLockedToSlowZone) { @@ -1580,11 +1595,9 @@ Y_UNIT_TEST(SlowZone_MoveLockedToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_MoveCommittedToSlowZone) { @@ -1598,11 +1611,9 @@ Y_UNIT_TEST(SlowZone_MoveCommittedToSlowZone) { // Committed message isn't moved to SlowZone utils.AssertSlowZone({ }); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_MoveDLQToSlowZone) { @@ -1622,11 +1633,9 @@ Y_UNIT_TEST(SlowZone_MoveDLQToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndLock) { @@ -1645,11 +1654,9 @@ Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndLock) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndCommit) { @@ -1662,11 +1669,9 @@ Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndCommit) { utils.AssertSlowZone({ }); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndDLQ) { @@ -1686,11 +1691,9 @@ Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndDLQ) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_Lock) { @@ -1708,11 +1711,9 @@ Y_UNIT_TEST(SlowZone_Lock) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_Commit_First) { @@ -1724,11 +1725,9 @@ Y_UNIT_TEST(SlowZone_Commit_First) { utils.AssertSlowZone({1}); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_Commit) { @@ -1740,11 +1739,9 @@ Y_UNIT_TEST(SlowZone_Commit) { utils.AssertSlowZone({0}); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_DLQ) { @@ -1763,11 +1760,9 @@ Y_UNIT_TEST(SlowZone_DLQ) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_CommitToFast) { @@ -1782,11 +1777,9 @@ Y_UNIT_TEST(SlowZone_CommitToFast) { // Compaction removed the message with offset 2 UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 7); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } Y_UNIT_TEST(SlowZone_CommitAndAdd) { @@ -1799,11 +1792,9 @@ Y_UNIT_TEST(SlowZone_CommitAndAdd) { utils.AssertSlowZone({0, 2}); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + auto snapshot2 = utils.CreateSnapshot(); - utilsD.AssertEquals(utils); + assertDeserialization(utils, snapshot, wal, snapshot2); } } From 9b6344370d1981113344fe56a648cf1c9f1f67cb Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 7 Nov 2025 16:59:28 +0000 Subject: [PATCH 2/6] fix --- .../pqtablet/partition/mlp/mlp_storage_ut.cpp | 150 ++++++++---------- 1 file changed, 68 insertions(+), 82 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp index 0bef176a822b..13d7105227fd 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp @@ -42,6 +42,10 @@ struct TUtils { TInstant BaseWriteTimestamp; ui64 Offset = 0; + NKikimrPQ::TMLPStorageSnapshot BeginSnapshot; + NKikimrPQ::TMLPStorageSnapshot EndSnapshot; + NKikimrPQ::TMLPStorageWAL WAL; + void AddMessage(size_t count) { for (size_t i = 0; i < count; ++i) { Storage.AddMessage(Offset, true, Offset, BaseWriteTimestamp + TDuration::Seconds(Offset)); @@ -49,6 +53,31 @@ struct TUtils { } } + void Begin() { + BeginSnapshot = CreateSnapshot(); + } + + void End() { + WAL = CreateWAL(); + EndSnapshot = CreateSnapshot(); + } + + void AssertLoad() { + { + TUtils utils; + utils.LoadSnapshot(BeginSnapshot); + utils.LoadWAL(WAL); + + utils.AssertEquals(*this); + } + { + TUtils utils; + utils.LoadSnapshot(EndSnapshot); + + utils.AssertEquals(*this); + } + } + NKikimrPQ::TMLPStorageSnapshot CreateSnapshot() { // Clear batch auto batch = Storage.GetBatch(); @@ -1543,28 +1572,12 @@ Y_UNIT_TEST(MoveBaseDeadline) { } } -auto assertDeserialization = [](TUtils& original, auto& firstSnapshot, auto& wal, auto& endSnapshot) { - { - TUtils utils; - utils.LoadSnapshot(firstSnapshot); - utils.LoadWAL(wal); - - utils.AssertEquals(original); - } - { - TUtils utils; - utils.LoadSnapshot(endSnapshot); - - utils.AssertEquals(original); - } -}; - Y_UNIT_TEST(SlowZone_MoveUnprocessedToSlowZone) { TUtils utils; utils.AddMessage(6); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1574,18 +1587,16 @@ Y_UNIT_TEST(SlowZone_MoveUnprocessedToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveLockedToSlowZone) { TUtils utils; utils.AddMessage(6); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1595,25 +1606,21 @@ Y_UNIT_TEST(SlowZone_MoveLockedToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveCommittedToSlowZone) { TUtils utils; utils.AddMessage(6); UNIT_ASSERT(utils.Commit(0)); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); // Committed message isn't moved to SlowZone utils.AssertSlowZone({ }); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveDLQToSlowZone) { @@ -1621,9 +1628,9 @@ Y_UNIT_TEST(SlowZone_MoveDLQToSlowZone) { utils.AddMessage(6); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); UNIT_ASSERT(utils.Unlock(0)); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1633,18 +1640,16 @@ Y_UNIT_TEST(SlowZone_MoveDLQToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndLock) { TUtils utils; utils.AddMessage(6); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1654,34 +1659,30 @@ Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndLock) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndCommit) { TUtils utils; utils.AddMessage(6); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); UNIT_ASSERT(utils.Commit(0)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ }); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndDLQ) { TUtils utils; utils.AddMessage(6); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); UNIT_ASSERT(utils.Unlock(0)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1691,17 +1692,15 @@ Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndDLQ) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_Lock) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0, 1 }); auto message = utils.GetMessage(0); @@ -1711,46 +1710,39 @@ Y_UNIT_TEST(SlowZone_Lock) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_Commit_First) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT(utils.Commit(0)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({1}); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); -} + utils.AssertLoad();} Y_UNIT_TEST(SlowZone_Commit) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT(utils.Commit(1)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({0}); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_DLQ) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); UNIT_ASSERT(utils.Unlock(0)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0, 1 }); auto message = utils.GetMessage(0); @@ -1760,41 +1752,35 @@ Y_UNIT_TEST(SlowZone_DLQ) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_CommitToFast) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT(utils.Commit(2)); utils.Storage.Compact(); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({0, 1}); // Compaction removed the message with offset 2 UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 7); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_CommitAndAdd) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT(utils.Commit(1)); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({0, 2}); - auto snapshot2 = utils.CreateSnapshot(); - - assertDeserialization(utils, snapshot, wal, snapshot2); + utils.AssertLoad(); } } From d6ecd9335381996d1627c16723c5d678b64ea55e Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 7 Nov 2025 17:26:24 +0000 Subject: [PATCH 3/6] more tests --- .../pqtablet/partition/mlp/mlp_storage_ut.cpp | 50 ++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp index 13d7105227fd..2a3a881a86e9 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp @@ -33,7 +33,7 @@ struct TUtils { { Storage.SetKeepMessageOrder(true); Storage.SetMaxMessageProcessingCount(1); - Storage.SetRetentionPeriod(TDuration::Seconds(7 * 13)); + Storage.SetRetentionPeriod(TDuration::Seconds(10)); } TIntrusivePtr TimeProvider; @@ -1783,6 +1783,54 @@ Y_UNIT_TEST(SlowZone_CommitAndAdd) { utils.AssertLoad(); } +Y_UNIT_TEST(SlowZone_Retention_1message) { + TUtils utils; + utils.AddMessage(8); + utils.Begin(); + + utils.TimeProvider->Tick(TDuration::Seconds(3)); + utils.Storage.Compact(); + + utils.End(); + + utils.AssertSlowZone({1}); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 2); + + utils.AssertLoad(); +} + +Y_UNIT_TEST(SlowZone_Retention_2message) { + TUtils utils; + utils.AddMessage(8); + utils.Begin(); + + utils.TimeProvider->Tick(TDuration::Seconds(4)); + utils.Storage.Compact(); + + utils.End(); + + utils.AssertSlowZone({}); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 2); + + utils.AssertLoad(); +} + +Y_UNIT_TEST(SlowZone_Retention_3message) { + TUtils utils; + utils.AddMessage(8); + utils.Begin(); + + utils.TimeProvider->Tick(TDuration::Seconds(5)); + utils.Storage.Compact(); + + utils.End(); + + utils.AssertSlowZone({}); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 3); + + utils.AssertLoad(); +} + } } // namespace NKikimr::NPQ::NMLP From e394529ba39c3fdbd2b1d1795203bb2fbd35c05f Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 7 Nov 2025 17:34:05 +0000 Subject: [PATCH 4/6] more tests --- ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp index 2a3a881a86e9..1c44964bbd68 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp @@ -1795,6 +1795,7 @@ Y_UNIT_TEST(SlowZone_Retention_1message) { utils.AssertSlowZone({1}); UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 2); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 7); utils.AssertLoad(); } @@ -1811,6 +1812,7 @@ Y_UNIT_TEST(SlowZone_Retention_2message) { utils.AssertSlowZone({}); UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 2); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 6); utils.AssertLoad(); } @@ -1827,6 +1829,7 @@ Y_UNIT_TEST(SlowZone_Retention_3message) { utils.AssertSlowZone({}); UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 3); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 5); utils.AssertLoad(); } From 759bbfcf350d878cd1364e5e48ae5cb128ad6014 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 7 Nov 2025 17:45:59 +0000 Subject: [PATCH 5/6] fix --- ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h index 709fb4935525..d0422b70488b 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h @@ -151,7 +151,7 @@ class TStorage { TInstant GetMessageDeadline(ui64 message); std::pair GetMessage(ui64 message); const std::deque& GetDLQMessages() const; - const std::unordered_set GetLockedMessageGroupsId() const; + const std::unordered_set& GetLockedMessageGroupsId() const; struct TPosition { From 18d0ba654ffb3d84e551d6aeeb228f1acd1945f8 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 7 Nov 2025 17:47:46 +0000 Subject: [PATCH 6/6] fix --- ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp | 2 +- ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp index 53ebefa61dce..e080086837a3 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp @@ -333,7 +333,7 @@ const std::deque& TStorage::GetDLQMessages() const { return DLQQueue; } -const std::unordered_set TStorage::GetLockedMessageGroupsId() const { +const std::unordered_set& TStorage::GetLockedMessageGroupsId() const { return LockedMessageGroupsId; } diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp index 1c44964bbd68..5a60ad72ccf1 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp @@ -1722,7 +1722,8 @@ Y_UNIT_TEST(SlowZone_Commit_First) { utils.AssertSlowZone({1}); - utils.AssertLoad();} + utils.AssertLoad(); +} Y_UNIT_TEST(SlowZone_Commit) { TUtils utils;