From 4393d13a93f65aaeca234494231e528e0b330296 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Thu, 30 Oct 2025 11:09:51 +0300 Subject: [PATCH 1/6] [-] pull the offset to the beginning of the zone --- .../pqtablet/partition/partition.cpp | 17 +++++- .../pqtablet/partition/partition_read.cpp | 3 +- ydb/core/persqueue/ut/pq_ut.cpp | 60 +++++++++++++++++++ 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 04593cc01906..07d7540793d3 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -1139,6 +1139,10 @@ void TPartition::LogAndCollectError(NKikimrServices::EServiceKikimr service, con const TPartitionBlobEncoder& TPartition::GetBlobEncoder(ui64 offset) const { + if ((offset >= CompactionBlobEncoder.EndOffset) && (offset < BlobEncoder.StartOffset)) { + offset = BlobEncoder.StartOffset; + } + if (BlobEncoder.DataKeysBody.empty()) { return CompactionBlobEncoder; } @@ -1169,8 +1173,13 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const { } const TPartitionBlobEncoder& blobEncoder = GetBlobEncoder(offset); + offset = Max(offset, blobEncoder.StartOffset); const std::deque& container = GetContainer(blobEncoder, offset); - PQ_ENSURE(!container.empty()); + PQ_ENSURE(!container.empty()) + ("offset", offset) + ("cz.StartOffset", CompactionBlobEncoder.StartOffset)("cz.EndOffset", CompactionBlobEncoder.EndOffset) + ("fwz.StartOffset", BlobEncoder.StartOffset)("fwz.EndOffset", BlobEncoder.EndOffset) + ; auto it = std::upper_bound(container.begin(), container.end(), offset, [](const ui64 offset, const TDataKey& p) { @@ -1178,8 +1187,10 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const { offset == p.Key.GetOffset() && p.Key.GetPartNo() > 0; }); // Always greater - PQ_ENSURE(it != container.begin())("StartOffset", blobEncoder.StartOffset)("HeadOffset", blobEncoder.Head.Offset) - ("offset", offset)("containter size", container.size())("first-elem", container.front().Key.ToString()) + PQ_ENSURE(it != container.begin()) + ("StartOffset", blobEncoder.StartOffset)("HeadOffset", blobEncoder.Head.Offset) + ("offset", offset) + ("containter size", container.size())("first-elem", container.front().Key.ToString()) ("is-fast-write", blobEncoder.ForFastWrite); PQ_ENSURE(it == container.end() || offset < it->Key.GetOffset() || diff --git a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp index 4d468f4d6714..ef7183feb476 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp @@ -719,13 +719,14 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct OnReadRequestFinished(res->Destination, answer.Size, res->User, ctx); } -void CollectReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, +void CollectReadRequestFromBody(ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize, ui64 lastOffset, TBlobKeyTokens* blobKeyTokens, TPartitionBlobEncoder& zone, TVector& result) { AFL_ENSURE(rcount && rsize); + startOffset = Max(startOffset, zone.StartOffset); auto blobs = zone.GetBlobsFromBody(startOffset, partNo, maxCount, diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 80b8230fa65b..8ba940b2d9e1 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -1291,6 +1291,66 @@ Y_UNIT_TEST(TestWritePQ) { TestWritePQImpl(false); } +Y_UNIT_TEST(Read_From_Different_Zones_What_Was_Written_With_Gaps) +{ + // The test creates messages in different zones. There are gaps in the offsets between the zones. + // We check that the client can read from any offset from any zone. + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function setup, bool& activeZone) { + activeZone = false; + TFinalizer finalizer(tc); + tc.EnableDetailedPQLog = true; + tc.Prepare(dispatchName, setup, activeZone); + tc.Runtime->SetScheduledLimit(100); + + // Important client, lifetimeseconds=0 - never delete + PQTabletPrepare({.partitions = 1}, {{"user", true}}, tc); + + TVector> data; + + data.emplace_back(1, TString(1'000, 'x')); + + // CompactZone.Body + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 100); + ++data[0].first; + data[0].second = TString(7'000'000, 'x'); + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 101); + + CmdRunCompaction(0, tc); + + // CompactZone.Head + ++data[0].first; + data[0].second = TString(1'000, 'x'); + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 200); + ++data[0].first; + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 201); + + CmdRunCompaction(0, tc); + + // FastWriteZone.Body + ++data[0].first; + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 300); + ++data[0].first; + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 301); + + PQGetPartInfo(100, 302, tc); + + CmdRead(0, 102, Max(), Max(), 4, false, tc, {200, 201, 300, 301}); + CmdRead(0, 202, Max(), Max(), 2, false, tc, {300, 301}); + + // The client has committed an offset between the zones + CmdSetOffset(0, "user", 103, false, tc); + PQTabletRestart(tc); + + CmdSetOffset(0, "user", 203, false, tc); + PQTabletRestart(tc); + + CmdRead(0, 102, Max(), Max(), 4, false, tc, {200, 201, 300, 301}); + CmdRead(0, 202, Max(), Max(), 2, false, tc, {300, 301}); + }); +} Y_UNIT_TEST(TestSourceIdDropByUserWrites) { TTestContext tc; From 84fa9b1cf9e878b5b383e6e12769d2f99664a144 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Thu, 30 Oct 2025 12:22:35 +0300 Subject: [PATCH 2/6] [+] logging --- ydb/core/persqueue/pqtablet/partition/partition.cpp | 6 ++++++ ydb/core/persqueue/ut/pq_ut.cpp | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 07d7540793d3..e8f2c6495e76 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -712,6 +712,12 @@ void TPartition::InitComplete(const TActorContext& ctx) { for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { ss << "SYNC INIT sourceId " << s.first << " seqNo " << s.second.SeqNo << " offset " << s.second.Offset << "\n"; } + for (const auto& h : CompactionBlobEncoder.DataKeysBody) { + ss << "SYNC INIT DATA KEY: " << h.Key.ToString() << " size " << h.Size << "\n"; + } + for (const auto& h : CompactionBlobEncoder.HeadKeys) { + ss << "SYNC INIT HEAD KEY: " << h.Key.ToString() << " size " << h.Size << "\n"; + } for (const auto& h : BlobEncoder.DataKeysBody) { ss << "SYNC INIT DATA KEY: " << h.Key.ToString() << " size " << h.Size << "\n"; } diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 8ba940b2d9e1..e0ec36efd0db 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -1306,7 +1306,7 @@ Y_UNIT_TEST(Read_From_Different_Zones_What_Was_Written_With_Gaps) tc.Runtime->SetScheduledLimit(100); // Important client, lifetimeseconds=0 - never delete - PQTabletPrepare({.partitions = 1}, {{"user", true}}, tc); + PQTabletPrepare({.partitions = 1, .storageLimitBytes = 50_MB}, {{"user", true}}, tc); TVector> data; From 4843644050b4b4a9b6033ea32f2521e88a890dc8 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Thu, 30 Oct 2025 20:51:56 +0300 Subject: [PATCH 3/6] [-] PartNo --- ydb/core/persqueue/pqtablet/partition/partition_read.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp index ef7183feb476..54a5ac0d9cf4 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp @@ -652,7 +652,10 @@ TReadAnswer TReadInfo::FormAnswer( AddResultBlob(readResult, writeBlob, Offset); if (writeBlob.IsLastPart()) { + PartNo = 0; ++Offset; + } else { + ++PartNo; } if (updateUsage(writeBlob)) { break; From a4214fc25e862d19beb3cbe1596b798105d1fa3a Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 31 Oct 2025 08:13:39 +0300 Subject: [PATCH 4/6] [/] unused code --- ydb/core/persqueue/pqtablet/partition/partition_read.cpp | 9 --------- 1 file changed, 9 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp index 54a5ac0d9cf4..51f500fbc468 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp @@ -499,15 +499,6 @@ TMaybe TReadInfo::AddBlobsFromBody(const TVector Date: Fri, 31 Oct 2025 13:43:55 +0300 Subject: [PATCH 5/6] [-] read from fwz.Body --- .../pqtablet/partition/partition_read.cpp | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp index 51f500fbc468..7aa0a8da0c2f 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp @@ -654,21 +654,23 @@ TReadAnswer TReadInfo::FormAnswer( } } - readAnswer = AddBlobsFromBody(blobs, - CompactedBlobsCount, blobs.size(), - userInfo, - startOffset, - endOffset, - sizeLag, - tablet, - realReadOffset, - readResult, - answer, - needStop, - cnt, size, lastBlobSize, - ctx); - if (readAnswer) { - return std::move(*readAnswer); + if (!needStop && cnt < Count && size < Size) { // body blobs are fully processed and need to take more data + readAnswer = AddBlobsFromBody(blobs, + CompactedBlobsCount, blobs.size(), + userInfo, + startOffset, + endOffset, + sizeLag, + tablet, + realReadOffset, + readResult, + answer, + needStop, + cnt, size, lastBlobSize, + ctx); + if (readAnswer) { + return std::move(*readAnswer); + } } AFL_ENSURE(Offset <= (ui64)Max())("Offset is too big", Offset); @@ -1082,14 +1084,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u GetReadRequestFromCompactedBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset, &info.BlobKeyTokens, blobs); info.CompactedBlobsCount = blobs.size(); - GetReadRequestFromFastWriteBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset, - &info.BlobKeyTokens, blobs); - - info.Blobs = blobs; - ui64 lastOffset = blobs.empty() ? info.Offset : blobs.back().Key.GetOffset(); - - LOG_D("read cookie " << cookie << " added " << info.Blobs.size() - << " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << GetEndOffset()); if (blobs.empty() || ((info.CompactedBlobsCount > 0) && (blobs[info.CompactedBlobsCount - 1].Key == CompactionBlobEncoder.DataKeysBody.back().Key))) { // read from head only when all blobs from body processed @@ -1101,6 +1095,15 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u info.CachedOffset = insideHeadOffset; } + GetReadRequestFromFastWriteBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset, + &info.BlobKeyTokens, blobs); + + info.Blobs = blobs; + ui64 lastOffset = blobs.empty() ? info.Offset : blobs.back().Key.GetOffset(); + + LOG_D("read cookie " << cookie << " added " << info.Blobs.size() + << " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << GetEndOffset()); + PQ_ENSURE(info.BlobKeyTokens.Size() == info.Blobs.size()); if (info.Destination != 0) { ++userInfo.ActiveReads; From ab2511695559f0b82a6758caaf719dcf6db17577 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Fri, 31 Oct 2025 20:54:29 +0300 Subject: [PATCH 6/6] [-] read from fwz.Body --- ydb/core/persqueue/pqtablet/partition/partition_read.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp index 7aa0a8da0c2f..a91996652fc7 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp @@ -722,7 +722,7 @@ void CollectReadRequestFromBody(ui64 startOffset, const ui16 partNo, const ui32 TVector& result) { AFL_ENSURE(rcount && rsize); - startOffset = Max(startOffset, zone.StartOffset); + startOffset = Max(startOffset, zone.DataKeysBody.empty() ? zone.StartOffset : zone.DataKeysBody.front().Key.GetOffset()); auto blobs = zone.GetBlobsFromBody(startOffset, partNo, maxCount, @@ -876,7 +876,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim LOG_D("read cookie " << cookie << " Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user - << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << GetEndOffset() + << " offset " << read->Offset << " partno " << read->PartNo << " count " << read->Count << " size " << read->Size << " endOffset " << GetEndOffset() << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset); if (offset == GetEndOffset() && !(read->Timeout == 0 && read->IsInternal())) { // Why? If read timeout = 0 we wait?