diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 04593cc01906..e8f2c6495e76 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -712,6 +712,12 @@ void TPartition::InitComplete(const TActorContext& ctx) { for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) { ss << "SYNC INIT sourceId " << s.first << " seqNo " << s.second.SeqNo << " offset " << s.second.Offset << "\n"; } + for (const auto& h : CompactionBlobEncoder.DataKeysBody) { + ss << "SYNC INIT DATA KEY: " << h.Key.ToString() << " size " << h.Size << "\n"; + } + for (const auto& h : CompactionBlobEncoder.HeadKeys) { + ss << "SYNC INIT HEAD KEY: " << h.Key.ToString() << " size " << h.Size << "\n"; + } for (const auto& h : BlobEncoder.DataKeysBody) { ss << "SYNC INIT DATA KEY: " << h.Key.ToString() << " size " << h.Size << "\n"; } @@ -1139,6 +1145,10 @@ void TPartition::LogAndCollectError(NKikimrServices::EServiceKikimr service, con const TPartitionBlobEncoder& TPartition::GetBlobEncoder(ui64 offset) const { + if ((offset >= CompactionBlobEncoder.EndOffset) && (offset < BlobEncoder.StartOffset)) { + offset = BlobEncoder.StartOffset; + } + if (BlobEncoder.DataKeysBody.empty()) { return CompactionBlobEncoder; } @@ -1169,8 +1179,13 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const { } const TPartitionBlobEncoder& blobEncoder = GetBlobEncoder(offset); + offset = Max(offset, blobEncoder.StartOffset); const std::deque& container = GetContainer(blobEncoder, offset); - PQ_ENSURE(!container.empty()); + PQ_ENSURE(!container.empty()) + ("offset", offset) + ("cz.StartOffset", CompactionBlobEncoder.StartOffset)("cz.EndOffset", CompactionBlobEncoder.EndOffset) + ("fwz.StartOffset", BlobEncoder.StartOffset)("fwz.EndOffset", BlobEncoder.EndOffset) + ; auto it = std::upper_bound(container.begin(), container.end(), offset, [](const ui64 offset, const TDataKey& 
p) { @@ -1178,8 +1193,10 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const { offset == p.Key.GetOffset() && p.Key.GetPartNo() > 0; }); // Always greater - PQ_ENSURE(it != container.begin())("StartOffset", blobEncoder.StartOffset)("HeadOffset", blobEncoder.Head.Offset) - ("offset", offset)("containter size", container.size())("first-elem", container.front().Key.ToString()) + PQ_ENSURE(it != container.begin()) + ("StartOffset", blobEncoder.StartOffset)("HeadOffset", blobEncoder.Head.Offset) + ("offset", offset) + ("containter size", container.size())("first-elem", container.front().Key.ToString()) ("is-fast-write", blobEncoder.ForFastWrite); PQ_ENSURE(it == container.end() || offset < it->Key.GetOffset() || diff --git a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp index 4d468f4d6714..a91996652fc7 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp @@ -499,15 +499,6 @@ TMaybe TReadInfo::AddBlobsFromBody(const TVector())("Offset is too big", Offset); @@ -719,13 +715,14 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct OnReadRequestFinished(res->Destination, answer.Size, res->User, ctx); } -void CollectReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, +void CollectReadRequestFromBody(ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize, ui64 lastOffset, TBlobKeyTokens* blobKeyTokens, TPartitionBlobEncoder& zone, TVector& result) { AFL_ENSURE(rcount && rsize); + startOffset = Max(startOffset, zone.DataKeysBody.empty() ? 
zone.StartOffset : zone.DataKeysBody.front().Key.GetOffset()); auto blobs = zone.GetBlobsFromBody(startOffset, partNo, maxCount, @@ -879,7 +876,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim LOG_D("read cookie " << cookie << " Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user - << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << GetEndOffset() + << " offset " << read->Offset << " partno " << read->PartNo << " count " << read->Count << " size " << read->Size << " endOffset " << GetEndOffset() << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset); if (offset == GetEndOffset() && !(read->Timeout == 0 && read->IsInternal())) { // Why? If read timeout = 0 we wait? @@ -1087,14 +1084,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u GetReadRequestFromCompactedBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset, &info.BlobKeyTokens, blobs); info.CompactedBlobsCount = blobs.size(); - GetReadRequestFromFastWriteBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset, - &info.BlobKeyTokens, blobs); - - info.Blobs = blobs; - ui64 lastOffset = blobs.empty() ? 
info.Offset : blobs.back().Key.GetOffset(); - - LOG_D("read cookie " << cookie << " added " << info.Blobs.size() - << " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << GetEndOffset()); if (blobs.empty() || ((info.CompactedBlobsCount > 0) && (blobs[info.CompactedBlobsCount - 1].Key == CompactionBlobEncoder.DataKeysBody.back().Key))) { // read from head only when all blobs from body processed @@ -1106,6 +1095,15 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u info.CachedOffset = insideHeadOffset; } + GetReadRequestFromFastWriteBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset, + &info.BlobKeyTokens, blobs); + + info.Blobs = blobs; + ui64 lastOffset = blobs.empty() ? info.Offset : blobs.back().Key.GetOffset(); + + LOG_D("read cookie " << cookie << " added " << info.Blobs.size() + << " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << GetEndOffset()); + PQ_ENSURE(info.BlobKeyTokens.Size() == info.Blobs.size()); if (info.Destination != 0) { ++userInfo.ActiveReads; diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 80b8230fa65b..e0ec36efd0db 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -1291,6 +1291,66 @@ Y_UNIT_TEST(TestWritePQ) { TestWritePQImpl(false); } +Y_UNIT_TEST(Read_From_Different_Zones_What_Was_Written_With_Gaps) +{ + // The test creates messages in different zones. There are gaps in the offsets between the zones. + // We check that the client can read from any offset from any zone, including offsets that fall into a gap (102, 202), both before and after a tablet restart.
+ TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + activeZone = false; + TFinalizer finalizer(tc); + tc.EnableDetailedPQLog = true; + tc.Prepare(dispatchName, setup, activeZone); + tc.Runtime->SetScheduledLimit(100); + + // Important client, lifetimeseconds=0 - never delete + PQTabletPrepare({.partitions = 1, .storageLimitBytes = 50_MB}, {{"user", true}}, tc); + + TVector<std::pair<ui64, TString>> data; + + data.emplace_back(1, TString(1'000, 'x')); + + // CompactZone.Body + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 100); + ++data[0].first; + data[0].second = TString(7'000'000, 'x'); + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 101); + + CmdRunCompaction(0, tc); + + // CompactZone.Head + ++data[0].first; + data[0].second = TString(1'000, 'x'); + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 200); + ++data[0].first; + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 201); + + CmdRunCompaction(0, tc); + + // FastWriteZone.Body + ++data[0].first; + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 300); + ++data[0].first; + CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 301); + + PQGetPartInfo(100, 302, tc); + + CmdRead(0, 102, Max<i32>(), Max<i32>(), 4, false, tc, {200, 201, 300, 301}); + CmdRead(0, 202, Max<i32>(), Max<i32>(), 2, false, tc, {300, 301}); + + // The client has committed an offset between the zones (103, then 203); repeat the reads after each commit and tablet restart + CmdSetOffset(0, "user", 103, false, tc); + PQTabletRestart(tc); + + CmdSetOffset(0, "user", 203, false, tc); + PQTabletRestart(tc); + + CmdRead(0, 102, Max<i32>(), Max<i32>(), 4, false, tc, {200, 201, 300, 301}); + CmdRead(0, 202, Max<i32>(), Max<i32>(), 2, false, tc, {300, 301}); + }); +} Y_UNIT_TEST(TestSourceIdDropByUserWrites) { TTestContext tc;