Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,16 +551,19 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TP

if (ev->Get()->Graceful || !DirectRead) {
if (!partitionInfo.Releasing) {
auto p = partitionInfo.Partition;
return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
<< "graceful release of partition that is not requested for release is forbiden for " << p, ctx);
<< "graceful release of partition that is not requested for release is forbiden for " << partitionInfo.Partition, ctx);
}

if (partitionInfo.Stopping) { // Ignore release for graceful request if alredy got stopping
return;
}
if (!DirectRead) {
ReleasePartition(it, true, ctx);
} else {

if (DirectRead) {
if (!partitionInfo.DirectReads.empty()) {
return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
<< "releasing partition, but not all direct reads are done " << partitionInfo.Partition, ctx);
}
SendReleaseSignal(it->second, true, ctx);
}
} else {
Expand All @@ -569,9 +572,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TP
return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
<< "release of partition that is not requested is forbiden for " << partitionInfo.Partition, ctx);
}

//TODO: filter all direct reads
ReleasePartition(it, true, ctx);
}

ReleasePartition(it, true, ctx);
}

template <bool UseMigrationProtocol>
Expand Down Expand Up @@ -1744,9 +1749,8 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessBalancerDead(ui64 tabletId,
if (jt->second.LockSent) {
SendReleaseSignal(jt->second, true, ctx);
}
if (!DirectRead || !jt->second.LockSent) { // in direct read mode wait for final release from client
ReleasePartition(jt, true, ctx);
}

ReleasePartition(jt, true, ctx);
} else {
++it;
}
Expand Down
77 changes: 71 additions & 6 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}


Y_UNIT_TEST(StreamReadCreateAndDestroyMsgs) {
void TestStreamReadCreateAndDestroyMsgs(bool directRead = false) {
TPersQueueV1TestServer server;
SET_LOCALS;
MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService);
Expand All @@ -400,8 +399,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Ydb::Topic::StreamReadMessage::FromServer resp;

req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1");

req.mutable_init_request()->set_consumer("user");
req.mutable_init_request()->set_direct_read(directRead);

if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
Expand Down Expand Up @@ -480,8 +479,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Ydb::Topic::StreamReadMessage::FromServer resp;

req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1");

req.mutable_init_request()->set_consumer("user");
req.mutable_init_request()->set_direct_read(directRead);

if (!readStreamSecond->Write(req)) {
ythrow yexception() << "write fail";
Expand All @@ -507,6 +506,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {

req.Clear();
req.mutable_stop_partition_session_response()->set_partition_session_id(stream_id);
if (directRead) {
req.mutable_stop_partition_session_response()->set_graceful(resp.stop_partition_session_request().graceful());
}

if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
}
Expand Down Expand Up @@ -546,6 +549,13 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}

Y_UNIT_TEST(StreamReadCreateAndDestroyMsgs) {
TestStreamReadCreateAndDestroyMsgs(false);
}

Y_UNIT_TEST(StreamReadCreateAndDestroyMsgs_DirectRead) {
TestStreamReadCreateAndDestroyMsgs(true);
}

Y_UNIT_TEST(StreamReadCommitAndStatusMsgs) {
TPersQueueV1TestServer server;
Expand Down Expand Up @@ -933,6 +943,26 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}

ui64 ExpectAnyPartitionRelease() {
Cerr << "Get StopPartitionSessionRequest\n";
Topic::StreamReadMessage::FromServer resp;
UNIT_ASSERT(ControlStream->Read(&resp));

TStringBuilder msg;
msg << "Got message from control session (expect partition release): " << resp.DebugString() << "\n";

Cerr << msg;
UNIT_ASSERT(resp.server_message_case() == Topic::StreamReadMessage::FromServer::kStopPartitionSessionRequest);
auto assignId = resp.stop_partition_session_request().partition_session_id();
Topic::StreamReadMessage::FromClient req;
req.mutable_stop_partition_session_response()->set_partition_session_id(assignId);
req.mutable_stop_partition_session_response()->set_graceful(resp.stop_partition_session_request().graceful());
if (!ControlStream->Write(req)) {
ythrow yexception() << "write fail";
}
return assignId;
}

void DoWrite(NYdb::TDriver* driver, const TString& topic, ui64 size, ui32 count,
const TString& srcId = "srcID", const std::optional<ui64>& partGroup = {})
{
Expand Down Expand Up @@ -1606,8 +1636,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << "Kill PQRB \n";
server.Server->AnnoyingClient->KillTablet(*(server.Server->CleverServer), tabletId);
setup.DoWrite(pqClient->GetDriver(), "acc/topic3", 10_MB, 1);
setup.SendDirectReadAck(assignId, 1);
range = setup.ReadDataNoAck(assignId, 2).Range;
setup.ExpectPartitionRelease(assignId);
setup.ExpectDestroyPartitionSession(assignId);
assignRes = setup.GetNextAssign("acc/topic3");
Expand Down Expand Up @@ -1707,6 +1735,43 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
checkCachedData(nextAssignRes.AssignId, 5, 1, nextAssignRes.Generation);
}

Y_UNIT_TEST(DirectReadTwoSessions) {
TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
SET_LOCALS;
TString topicPath{"acc/topic2"};
TString oldPath{"/Root/PQ/rt3.dc1--acc--topic2"};

// Create a topic with two partitions:
server.Server->AnnoyingClient->CreateTopicNoLegacy({ .Name = oldPath, .PartsCount = 2 });

TDirectReadTestSetup setup1{server};
TDirectReadTestSetup setup2{server};

setup1.InitControlSession(topicPath);
auto assignResult1 = setup1.GetNextAssign(topicPath);
auto assignResult2 = setup1.GetNextAssign(topicPath);
UNIT_ASSERT(assignResult1.PartitionId == 0 && assignResult2.PartitionId == 1 ||
assignResult1.PartitionId == 1 && assignResult2.PartitionId == 0);
setup1.InitDirectSession(topicPath);

setup1.SendReadSessionAssign(assignResult1.AssignId, assignResult1.Generation);
setup1.SendReadSessionAssign(assignResult2.AssignId, assignResult2.Generation);

setup2.InitControlSession(topicPath);

auto releasedAssignId = setup1.ExpectAnyPartitionRelease();
auto partitionId = releasedAssignId == assignResult1.AssignId
? assignResult1.PartitionId
: assignResult2.PartitionId;

auto assignResult3 = setup2.GetNextAssign(topicPath);
UNIT_ASSERT(assignResult3.PartitionId == partitionId);
setup2.InitDirectSession(topicPath);

setup2.SendReadSessionAssign(assignResult3.AssignId, assignResult3.Generation);

}

Y_UNIT_TEST(StreamReadManyUpdateTokenAndRead) {
TPersQueueV1TestServer server;
SET_LOCALS;
Expand Down
Loading