diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp index 0bba41e1d6e3..e080086837a3 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp @@ -333,6 +333,10 @@ const std::deque& TStorage::GetDLQMessages() const { return DLQQueue; } +const std::unordered_set& TStorage::GetLockedMessageGroupsId() const { + return LockedMessageGroupsId; +} + std::pair TStorage::GetMessageInt(ui64 offset, EMessageStatus expectedStatus) { auto [message, slowZone] = GetMessageInt(offset); if (!message) { diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h index 7042dfceb1ce..d0422b70488b 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h @@ -151,6 +151,7 @@ class TStorage { TInstant GetMessageDeadline(ui64 message); std::pair GetMessage(ui64 message); const std::deque& GetDLQMessages() const; + const std::unordered_set& GetLockedMessageGroupsId() const; struct TPosition { diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage__serialization.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage__serialization.cpp index c1b86964e61c..fd9a34feca6b 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage__serialization.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage__serialization.cpp @@ -310,6 +310,7 @@ bool TStorage::Initialize(const NKikimrPQ::TMLPStorageSnapshot& snapshot) { ++Metrics.UnprocessedMessageCount; break; case EMessageStatus::DLQ: + ++Metrics.DLQMessageCount; break; } } diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp index d08388e9d10b..5a60ad72ccf1 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp @@ -33,7 +33,7 @@ struct TUtils { { Storage.SetKeepMessageOrder(true); Storage.SetMaxMessageProcessingCount(1); - Storage.SetRetentionPeriod(TDuration::Seconds(7 * 13)); + Storage.SetRetentionPeriod(TDuration::Seconds(10)); } TIntrusivePtr TimeProvider; @@ -42,6 +42,10 @@ struct TUtils { TInstant BaseWriteTimestamp; ui64 Offset = 0; + NKikimrPQ::TMLPStorageSnapshot BeginSnapshot; + NKikimrPQ::TMLPStorageSnapshot EndSnapshot; + NKikimrPQ::TMLPStorageWAL WAL; + void AddMessage(size_t count) { for (size_t i = 0; i < count; ++i) { Storage.AddMessage(Offset, true, Offset, BaseWriteTimestamp + TDuration::Seconds(Offset)); @@ -49,6 +53,31 @@ struct TUtils { } } + void Begin() { + BeginSnapshot = CreateSnapshot(); + } + + void End() { + WAL = CreateWAL(); + EndSnapshot = CreateSnapshot(); + } + + void AssertLoad() { + { + TUtils utils; + utils.LoadSnapshot(BeginSnapshot); + utils.LoadWAL(WAL); + + utils.AssertEquals(*this); + } + { + TUtils utils; + utils.LoadSnapshot(EndSnapshot); + + utils.AssertEquals(*this); + } + } + NKikimrPQ::TMLPStorageSnapshot CreateSnapshot() { // Clear batch auto batch = Storage.GetBatch(); @@ -144,11 +173,12 @@ struct TUtils { UNIT_ASSERT(i == other.Storage.end()); UNIT_ASSERT(m == Storage.end()); - auto join = [](const std::deque vs) { + auto join = [](const auto& vs) { return JoinRange(",", vs.begin(), vs.end()); }; UNIT_ASSERT_VALUES_EQUAL(join(other.Storage.GetDLQMessages()), join(Storage.GetDLQMessages())); + UNIT_ASSERT_VALUES_EQUAL(join(other.Storage.GetLockedMessageGroupsId()), join(Storage.GetLockedMessageGroupsId())); auto ometrics = other.Storage.GetMetrics(); auto metrics = Storage.GetMetrics(); @@ -1545,9 +1575,9 @@ Y_UNIT_TEST(MoveBaseDeadline) { Y_UNIT_TEST(SlowZone_MoveUnprocessedToSlowZone) { TUtils utils; utils.AddMessage(6); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1557,20 +1587,16 @@ Y_UNIT_TEST(SlowZone_MoveUnprocessedToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveLockedToSlowZone) { TUtils utils; utils.AddMessage(6); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1580,29 +1606,21 @@ Y_UNIT_TEST(SlowZone_MoveLockedToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveCommittedToSlowZone) { TUtils utils; utils.AddMessage(6); UNIT_ASSERT(utils.Commit(0)); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); // Committed message isn't moved to SlowZone utils.AssertSlowZone({ }); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveDLQToSlowZone) { @@ -1610,9 +1628,9 @@ Y_UNIT_TEST(SlowZone_MoveDLQToSlowZone) { utils.AddMessage(6); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); UNIT_ASSERT(utils.Unlock(0)); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1622,20 +1640,16 @@ Y_UNIT_TEST(SlowZone_MoveDLQToSlowZone) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndLock) { TUtils utils; utils.AddMessage(6); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1645,38 +1659,30 @@ Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndLock) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndCommit) { TUtils utils; utils.AddMessage(6); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); UNIT_ASSERT(utils.Commit(0)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ }); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndDLQ) { TUtils utils; utils.AddMessage(6); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); utils.AddMessage(1); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); UNIT_ASSERT(utils.Unlock(0)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0 }); auto message = utils.GetMessage(0); @@ -1686,19 +1692,15 @@ Y_UNIT_TEST(SlowZone_MoveToSlowZoneAndDLQ) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_Lock) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0, 1 }); auto message = utils.GetMessage(0); @@ -1708,52 +1710,40 @@ Y_UNIT_TEST(SlowZone_Lock) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, utils.TimeProvider->Now() + TDuration::Seconds(13)); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_Commit_First) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT(utils.Commit(0)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({1}); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_Commit) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT(utils.Commit(1)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({0}); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_DLQ) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT_VALUES_EQUAL(utils.Next(TDuration::Seconds(13)), 0); UNIT_ASSERT(utils.Unlock(0)); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({ 0, 1 }); auto message = utils.GetMessage(0); @@ -1763,47 +1753,86 @@ Y_UNIT_TEST(SlowZone_DLQ) { UNIT_ASSERT_VALUES_EQUAL(message->ProcessingDeadline, TInstant::Zero()); UNIT_ASSERT_VALUES_EQUAL(message->WriteTimestamp, utils.BaseWriteTimestamp); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_CommitToFast) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT(utils.Commit(2)); utils.Storage.Compact(); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({0, 1}); // Compaction removed the message with offset 2 UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 7); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); - - utilsD.AssertEquals(utils); + utils.AssertLoad(); } Y_UNIT_TEST(SlowZone_CommitAndAdd) { TUtils utils; utils.AddMessage(8); - auto snapshot = utils.CreateSnapshot(); + utils.Begin(); UNIT_ASSERT(utils.Commit(1)); utils.AddMessage(1); - auto wal = utils.CreateWAL(); + utils.End(); utils.AssertSlowZone({0, 2}); - TUtils utilsD; - utilsD.LoadSnapshot(snapshot); - utilsD.LoadWAL(wal); + utils.AssertLoad(); +} + +Y_UNIT_TEST(SlowZone_Retention_1message) { + TUtils utils; + utils.AddMessage(8); + utils.Begin(); + + utils.TimeProvider->Tick(TDuration::Seconds(3)); + utils.Storage.Compact(); + + utils.End(); + + utils.AssertSlowZone({1}); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 2); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 7); + + utils.AssertLoad(); +} + +Y_UNIT_TEST(SlowZone_Retention_2message) { + TUtils utils; + utils.AddMessage(8); + utils.Begin(); + + utils.TimeProvider->Tick(TDuration::Seconds(4)); + utils.Storage.Compact(); + + utils.End(); + + utils.AssertSlowZone({}); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 2); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 6); + + utils.AssertLoad(); +} + +Y_UNIT_TEST(SlowZone_Retention_3message) { + TUtils utils; + utils.AddMessage(8); + utils.Begin(); + + utils.TimeProvider->Tick(TDuration::Seconds(5)); + utils.Storage.Compact(); + + utils.End(); + + utils.AssertSlowZone({}); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetFirstOffset(), 3); + UNIT_ASSERT_VALUES_EQUAL(utils.Storage.GetMetrics().InflyMessageCount, 5); - utilsD.AssertEquals(utils); + utils.AssertLoad(); } }