Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions ydb/core/persqueue/pqtablet/partition/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,12 @@ void TPartition::InitComplete(const TActorContext& ctx) {
for (const auto& s : SourceIdStorage.GetInMemorySourceIds()) {
ss << "SYNC INIT sourceId " << s.first << " seqNo " << s.second.SeqNo << " offset " << s.second.Offset << "\n";
}
for (const auto& h : CompactionBlobEncoder.DataKeysBody) {
ss << "SYNC INIT DATA KEY: " << h.Key.ToString() << " size " << h.Size << "\n";
}
for (const auto& h : CompactionBlobEncoder.HeadKeys) {
ss << "SYNC INIT HEAD KEY: " << h.Key.ToString() << " size " << h.Size << "\n";
}
for (const auto& h : BlobEncoder.DataKeysBody) {
ss << "SYNC INIT DATA KEY: " << h.Key.ToString() << " size " << h.Size << "\n";
}
Expand Down Expand Up @@ -1123,6 +1129,10 @@ void TPartition::LogAndCollectError(NKikimrServices::EServiceKikimr service, con

const TPartitionBlobEncoder& TPartition::GetBlobEncoder(ui64 offset) const
{
if ((offset >= CompactionBlobEncoder.EndOffset) && (offset < BlobEncoder.StartOffset)) {
offset = BlobEncoder.StartOffset;
}

if (BlobEncoder.DataKeysBody.empty()) {
return CompactionBlobEncoder;
}
Expand Down Expand Up @@ -1153,17 +1163,24 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const {
}

const TPartitionBlobEncoder& blobEncoder = GetBlobEncoder(offset);
offset = Max(offset, blobEncoder.StartOffset);
const std::deque<TDataKey>& container = GetContainer(blobEncoder, offset);
PQ_ENSURE(!container.empty());
PQ_ENSURE(!container.empty())
("offset", offset)
("cz.StartOffset", CompactionBlobEncoder.StartOffset)("cz.EndOffset", CompactionBlobEncoder.EndOffset)
("fwz.StartOffset", BlobEncoder.StartOffset)("fwz.EndOffset", BlobEncoder.EndOffset)
;

auto it = std::upper_bound(container.begin(), container.end(), offset,
[](const ui64 offset, const TDataKey& p) {
return offset < p.Key.GetOffset() ||
offset == p.Key.GetOffset() && p.Key.GetPartNo() > 0;
});
// Always greater
PQ_ENSURE(it != container.begin())("StartOffset", blobEncoder.StartOffset)("HeadOffset", blobEncoder.Head.Offset)
("offset", offset)("containter size", container.size())("first-elem", container.front().Key.ToString())
PQ_ENSURE(it != container.begin())
("StartOffset", blobEncoder.StartOffset)("HeadOffset", blobEncoder.Head.Offset)
("offset", offset)
("containter size", container.size())("first-elem", container.front().Key.ToString())
("is-fast-write", blobEncoder.ForFastWrite);
PQ_ENSURE(it == container.end() ||
offset < it->Key.GetOffset() ||
Expand Down
66 changes: 32 additions & 34 deletions ydb/core/persqueue/pqtablet/partition/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,15 +494,6 @@ TMaybe<TReadAnswer> TReadInfo::AddBlobsFromBody(const TVector<NPQ::TRequestedBlo
TClientBlob &res = batch.Blobs[i];
VERIFY_RESULT_BLOB(res, i);

Y_ABORT_UNLESS(PartNo == res.GetPartNo(), "%s",
(TStringBuilder() <<
"\npos=" << pos <<
"\ni=" << i <<
"\nOffset=" << Offset <<
"\nPartNo=" << PartNo <<
"\noffset=" << offset <<
"\npartNo=" << res.GetPartNo()
).data());
AFL_ENSURE(PartNo == res.GetPartNo())("pos", pos)("i", i)("Offset", Offset)("PartNo", PartNo)("offset", offset)("partNo", res.GetPartNo());

if (userInfo) {
Expand Down Expand Up @@ -645,29 +636,34 @@ TReadAnswer TReadInfo::FormAnswer(

AddResultBlob(readResult, writeBlob, Offset);
if (writeBlob.IsLastPart()) {
PartNo = 0;
++Offset;
} else {
++PartNo;
}
if (updateUsage(writeBlob)) {
break;
}
}
}

readAnswer = AddBlobsFromBody(blobs,
CompactedBlobsCount, blobs.size(),
userInfo,
startOffset,
endOffset,
sizeLag,
tablet,
realReadOffset,
readResult,
answer,
needStop,
cnt, size, lastBlobSize,
ctx);
if (readAnswer) {
return std::move(*readAnswer);
if (!needStop && cnt < Count && size < Size) { // body blobs are fully processed and need to take more data
readAnswer = AddBlobsFromBody(blobs,
CompactedBlobsCount, blobs.size(),
userInfo,
startOffset,
endOffset,
sizeLag,
tablet,
realReadOffset,
readResult,
answer,
needStop,
cnt, size, lastBlobSize,
ctx);
if (readAnswer) {
return std::move(*readAnswer);
}
}

AFL_ENSURE(Offset <= (ui64)Max<i64>())("Offset is too big", Offset);
Expand Down Expand Up @@ -706,13 +702,14 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
OnReadRequestFinished(res->Destination, answer.Size, res->User, ctx);
}

void CollectReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount,
void CollectReadRequestFromBody(ui64 startOffset, const ui16 partNo, const ui32 maxCount,
const ui32 maxSize, ui32* rcount, ui32* rsize, ui64 lastOffset,
TBlobKeyTokens* blobKeyTokens,
TPartitionBlobEncoder& zone,
TVector<TRequestedBlob>& result)
{
AFL_ENSURE(rcount && rsize);
startOffset = Max(startOffset, zone.DataKeysBody.empty() ? zone.StartOffset : zone.DataKeysBody.front().Key.GetOffset());
auto blobs = zone.GetBlobsFromBody(startOffset,
partNo,
maxCount,
Expand Down Expand Up @@ -864,7 +861,7 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim

LOG_D("read cookie " << cookie << " Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition
<< " user " << user
<< " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << GetEndOffset()
<< " offset " << read->Offset << " partno " << read->PartNo << " count " << read->Count << " size " << read->Size << " endOffset " << GetEndOffset()
<< " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset);

if (offset == GetEndOffset()) {
Expand Down Expand Up @@ -1072,14 +1069,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
GetReadRequestFromCompactedBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset,
&info.BlobKeyTokens, blobs);
info.CompactedBlobsCount = blobs.size();
GetReadRequestFromFastWriteBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset,
&info.BlobKeyTokens, blobs);

info.Blobs = blobs;
ui64 lastOffset = blobs.empty() ? info.Offset : blobs.back().Key.GetOffset();

LOG_D("read cookie " << cookie << " added " << info.Blobs.size()
<< " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << GetEndOffset());

if (blobs.empty() ||
((info.CompactedBlobsCount > 0) && (blobs[info.CompactedBlobsCount - 1].Key == CompactionBlobEncoder.DataKeysBody.back().Key))) { // read from head only when all blobs from body processed
Expand All @@ -1091,6 +1080,15 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
info.CachedOffset = insideHeadOffset;
}

GetReadRequestFromFastWriteBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size, info.LastOffset,
&info.BlobKeyTokens, blobs);

info.Blobs = blobs;
ui64 lastOffset = blobs.empty() ? info.Offset : blobs.back().Key.GetOffset();

LOG_D("read cookie " << cookie << " added " << info.Blobs.size()
<< " blobs, size " << size << " count " << count << " last offset " << lastOffset << ", current partition end offset: " << GetEndOffset());

PQ_ENSURE(info.BlobKeyTokens.Size() == info.Blobs.size());
if (info.Destination != 0) {
++userInfo.ActiveReads;
Expand Down
60 changes: 60 additions & 0 deletions ydb/core/persqueue/ut/pq_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,66 @@ Y_UNIT_TEST(TestWritePQ) {
TestWritePQImpl(false);
}

Y_UNIT_TEST(Read_From_Different_Zones_What_Was_Written_With_Gaps)
{
// The test creates messages in different zones. There are gaps in the offsets between the zones.
// We check that the client can read from any offset from any zone.
//
// Layout built below (message seqNo -> zone):
//   seqNo 1,2 -> compaction zone body, seqNo 3,4 -> compaction zone head,
//   seqNo 5,6 -> fast-write zone body.
// NOTE(review): the trailing integer argument of each CmdWrite (100, 101, 200, ...)
// presumably encodes the expected partition offset of the written message,
// which is what creates the offset gaps between zones — TODO confirm against
// the CmdWrite helper declaration.
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
return tc.InitialEventsFilter.Prepare();
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
// Reboot-injection is disabled for the whole scenario body.
activeZone = false;
TFinalizer finalizer(tc);
tc.EnableDetailedPQLog = true;
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(100);

// Important client, lifetimeseconds=0 - never delete
PQTabletPrepare({.partitions = 1, .storageLimitBytes = 50_MB}, {{"user", true}}, tc);

TVector<std::pair<ui64, TString>> data;

// Single message; first element of the pair is the seqNo, second is the payload.
data.emplace_back(1, TString(1'000, 'x'));

// CompactZone.Body
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 100);
++data[0].first;
// Large (7 MB) payload so the blob lands in the body rather than the head.
data[0].second = TString(7'000'000, 'x');
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 101);

CmdRunCompaction(0, tc);

// CompactZone.Head
++data[0].first;
// Back to a small payload: these writes stay in the compaction-zone head.
data[0].second = TString(1'000, 'x');
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 200);
++data[0].first;
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 201);

CmdRunCompaction(0, tc);

// FastWriteZone.Body
++data[0].first;
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 300);
++data[0].first;
CmdWrite(0, "sourceid", data, tc, false, {}, true, "", -1, 301);

// NOTE(review): presumably asserts partition StartOffset=100 / EndOffset=302 — verify.
PQGetPartInfo(100, 302, tc);

// Read starting inside an inter-zone gap: the expected result lists the
// offsets of the messages that should come back (200, 201, 300, 301).
CmdRead(0, 102, Max<i32>(), Max<i32>(), 4, false, tc, {200, 201, 300, 301});
CmdRead(0, 202, Max<i32>(), Max<i32>(), 2, false, tc, {300, 301});

// The client has committed an offset between the zones
// Restart after each commit to check the committed gap offset survives
// tablet re-initialization.
CmdSetOffset(0, "user", 103, false, tc);
PQTabletRestart(tc);

CmdSetOffset(0, "user", 203, false, tc);
PQTabletRestart(tc);

// Same reads after restarts must still succeed from gap offsets.
CmdRead(0, 102, Max<i32>(), Max<i32>(), 4, false, tc, {200, 201, 300, 301});
CmdRead(0, 202, Max<i32>(), Max<i32>(), 2, false, tc, {300, 301});
});
}

Y_UNIT_TEST(TestSourceIdDropByUserWrites) {
TTestContext tc;
Expand Down
Loading