From ab3a0a9bcea244f0f4d3d38b30864eb505575c6a Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sat, 25 Oct 2025 12:58:54 +0500 Subject: [PATCH] fixed MaxCommittedTimeLag (#27553) --- ydb/core/persqueue/partition.cpp | 9 +++---- .../persqueue/ut/ut_with_sdk/topic_ut.cpp | 27 ++++++++++--------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index cb5bb8104d47..468e5f904fe8 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -758,9 +758,6 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const { if (userInfo.Offset >= static_cast(EndOffset)) { result.LastCommittedMessage.CreateTimestamp = now; result.LastCommittedMessage.WriteTimestamp = now; - } else if (userInfo.ActualTimestamps) { - result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp; - result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp; } else { auto timestamp = GetWriteTimeEstimate(userInfo.Offset); result.LastCommittedMessage.CreateTimestamp = timestamp; @@ -782,14 +779,14 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const { result.LastReadMessage.WriteTimestamp = userInfo.ReadWriteTimestamp; } else { auto timestamp = GetWriteTimeEstimate(readOffset); - result.LastCommittedMessage.CreateTimestamp = timestamp; - result.LastCommittedMessage.WriteTimestamp = timestamp; + result.LastReadMessage.CreateTimestamp = timestamp; + result.LastReadMessage.WriteTimestamp = timestamp; } if (readOffset < (i64)EndOffset) { result.ReadLag = result.LastReadTimestamp - result.LastReadMessage.WriteTimestamp; } - result.CommitedLag = result.LastCommittedMessage.WriteTimestamp - now; + result.CommitedLag = now - result.LastCommittedMessage.WriteTimestamp; result.TotalLag = TDuration::MilliSeconds(userInfo.GetWriteLagMs()) + result.ReadLag + (now - result.LastReadTimestamp); return result; diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp index aadf1ad16276..986fb5e21893 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(WithSDK) { session->Close(TDuration::Seconds(5)); }; - // Check describe for empty topic + Cerr << ">>>>> Check describe for empty topic\n"; { auto d = describe(); UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); @@ -75,9 +75,12 @@ Y_UNIT_TEST_SUITE(WithSDK) { } write(3); + Sleep(TDuration::Seconds(2)); write(7); + Sleep(TDuration::Seconds(2)); + write(11); - // Check describe for topic which contains messages, but consumer hasn`t read + Cerr << ">>>>> Check describe for topic which contains messages, but consumer hasn`t read\n"; { auto d = describe(); UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); @@ -85,20 +88,20 @@ Y_UNIT_TEST_SUITE(WithSDK) { auto& p = d.GetPartitions()[0]; UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId()); UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); - UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); + UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset()); auto& c = p.GetPartitionConsumerStats(); UNIT_ASSERT_VALUES_EQUAL(true, c.has_value()); UNIT_ASSERT_VALUES_EQUAL(0, c->GetCommittedOffset()); UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); // UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); - UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag()); UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero? UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset()); } UNIT_ASSERT(setup.Commit(TString{TEST_TOPIC}, TEST_CONSUMER, 0, 1).IsSuccess()); - // Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example) + Cerr << ">>>>> Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)\n"; { auto d = describe(); UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); @@ -106,13 +109,13 @@ Y_UNIT_TEST_SUITE(WithSDK) { auto& p = d.GetPartitions()[0]; UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId()); UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); - UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); + UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset()); auto& c = p.GetPartitionConsumerStats(); UNIT_ASSERT_VALUES_EQUAL(true, c.has_value()); UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset()); UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); - UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag()); UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero? UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset()); } @@ -143,7 +146,7 @@ Y_UNIT_TEST_SUITE(WithSDK) { session->Close(TDuration::Seconds(1)); } - // Check describe for topic wich contains messages, has commited offset of first message and read second message + Cerr << ">>>>> Check describe for topic wich contains messages, has commited offset of first message and read second message\n"; { auto d = describe(); UNIT_ASSERT_STRINGS_EQUAL(TEST_CONSUMER, d.GetConsumer().GetConsumerName()); @@ -151,15 +154,15 @@ Y_UNIT_TEST_SUITE(WithSDK) { auto& p = d.GetPartitions()[0]; UNIT_ASSERT_VALUES_EQUAL(0, p.GetPartitionId()); UNIT_ASSERT_VALUES_EQUAL(true, p.GetActive()); - UNIT_ASSERT_VALUES_EQUAL(2, p.GetPartitionStats()->GetEndOffset()); + UNIT_ASSERT_VALUES_EQUAL(3, p.GetPartitionStats()->GetEndOffset()); auto& c = p.GetPartitionConsumerStats(); UNIT_ASSERT_VALUES_EQUAL(true, c.has_value()); UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset()); - UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); + //UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag()); UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag()); - UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxCommittedTimeLag()); + UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag()); UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); - UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset()); + UNIT_ASSERT_VALUES_EQUAL(3, c->GetLastReadOffset()); } } }