diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 35f0a960ec08..ebd968b3fb46 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -551,16 +551,19 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReleased::TP if (ev->Get()->Graceful || !DirectRead) { if (!partitionInfo.Releasing) { - auto p = partitionInfo.Partition; return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() - << "graceful release of partition that is not requested for release is forbiden for " << p, ctx); + << "graceful release of partition that is not requested for release is forbiden for " << partitionInfo.Partition, ctx); } + if (partitionInfo.Stopping) { // Ignore release for graceful request if alredy got stopping return; } - if (!DirectRead) { - ReleasePartition(it, true, ctx); - } else { + + if (DirectRead) { + if (!partitionInfo.DirectReads.empty()) { + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() + << "releasing partition, but not all direct reads are done " << partitionInfo.Partition, ctx); + } SendReleaseSignal(it->second, true, ctx); } } else { @@ -569,9 +572,11 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReleased::TP return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() << "release of partition that is not requested is forbiden for " << partitionInfo.Partition, ctx); } + //TODO: filter all direct reads - ReleasePartition(it, true, ctx); } + + ReleasePartition(it, true, ctx); } template @@ -1744,9 +1749,8 @@ void TReadSessionActor::ProcessBalancerDead(ui64 tabletId, if (jt->second.LockSent) { SendReleaseSignal(jt->second, true, ctx); } - if (!DirectRead || !jt->second.LockSent) { // in direct read mode wait for final release from client - ReleasePartition(jt, true, ctx); - } + + ReleasePartition(jt, true, ctx); } else { ++it; } diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 26983ef68474..db9ba2366968 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -379,8 +379,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } } - - Y_UNIT_TEST(StreamReadCreateAndDestroyMsgs) { + void TestStreamReadCreateAndDestroyMsgs(bool directRead = false) { TPersQueueV1TestServer server; SET_LOCALS; MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService); @@ -400,8 +399,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Ydb::Topic::StreamReadMessage::FromServer resp; req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1"); - req.mutable_init_request()->set_consumer("user"); + req.mutable_init_request()->set_direct_read(directRead); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; @@ -480,8 +479,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Ydb::Topic::StreamReadMessage::FromServer resp; req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1"); - req.mutable_init_request()->set_consumer("user"); + req.mutable_init_request()->set_direct_read(directRead); if (!readStreamSecond->Write(req)) { ythrow yexception() << "write fail"; @@ -507,6 +506,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.Clear(); req.mutable_stop_partition_session_response()->set_partition_session_id(stream_id); + if (directRead) { + req.mutable_stop_partition_session_response()->set_graceful(resp.stop_partition_session_request().graceful()); + } + if (!readStream->Write(req)) { ythrow yexception() << "write fail"; } @@ -546,6 +549,13 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } } + Y_UNIT_TEST(StreamReadCreateAndDestroyMsgs) { + TestStreamReadCreateAndDestroyMsgs(false); + } + + Y_UNIT_TEST(StreamReadCreateAndDestroyMsgs_DirectRead) { + TestStreamReadCreateAndDestroyMsgs(true); + } Y_UNIT_TEST(StreamReadCommitAndStatusMsgs) { TPersQueueV1TestServer server; @@ -933,6 +943,26 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } } + ui64 ExpectAnyPartitionRelease() { + Cerr << "Get StopPartitionSessionRequest\n"; + Topic::StreamReadMessage::FromServer resp; + UNIT_ASSERT(ControlStream->Read(&resp)); + + TStringBuilder msg; + msg << "Got message from control session (expect partition release): " << resp.DebugString() << "\n"; + + Cerr << msg; + UNIT_ASSERT(resp.server_message_case() == Topic::StreamReadMessage::FromServer::kStopPartitionSessionRequest); + auto assignId = resp.stop_partition_session_request().partition_session_id(); + Topic::StreamReadMessage::FromClient req; + req.mutable_stop_partition_session_response()->set_partition_session_id(assignId); + req.mutable_stop_partition_session_response()->set_graceful(resp.stop_partition_session_request().graceful()); + if (!ControlStream->Write(req)) { + ythrow yexception() << "write fail"; + } + return assignId; + } + void DoWrite(NYdb::TDriver* driver, const TString& topic, ui64 size, ui32 count, const TString& srcId = "srcID", const std::optional& partGroup = {}) { @@ -1606,8 +1636,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Cerr << "Kill PQRB \n"; server.Server->AnnoyingClient->KillTablet(*(server.Server->CleverServer), tabletId); setup.DoWrite(pqClient->GetDriver(), "acc/topic3", 10_MB, 1); - setup.SendDirectReadAck(assignId, 1); - range = setup.ReadDataNoAck(assignId, 2).Range; setup.ExpectPartitionRelease(assignId); setup.ExpectDestroyPartitionSession(assignId); assignRes = setup.GetNextAssign("acc/topic3"); @@ -1707,6 +1735,43 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { checkCachedData(nextAssignRes.AssignId, 5, 1, nextAssignRes.Generation); } + Y_UNIT_TEST(DirectReadTwoSessions) { + TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; + SET_LOCALS; + TString topicPath{"acc/topic2"}; + TString oldPath{"/Root/PQ/rt3.dc1--acc--topic2"}; + + // Create a topic with two partitions: + server.Server->AnnoyingClient->CreateTopicNoLegacy({ .Name = oldPath, .PartsCount = 2 }); + + TDirectReadTestSetup setup1{server}; + TDirectReadTestSetup setup2{server}; + + setup1.InitControlSession(topicPath); + auto assignResult1 = setup1.GetNextAssign(topicPath); + auto assignResult2 = setup1.GetNextAssign(topicPath); + UNIT_ASSERT(assignResult1.PartitionId == 0 && assignResult2.PartitionId == 1 || + assignResult1.PartitionId == 1 && assignResult2.PartitionId == 0); + setup1.InitDirectSession(topicPath); + + setup1.SendReadSessionAssign(assignResult1.AssignId, assignResult1.Generation); + setup1.SendReadSessionAssign(assignResult2.AssignId, assignResult2.Generation); + + setup2.InitControlSession(topicPath); + + auto releasedAssignId = setup1.ExpectAnyPartitionRelease(); + auto partitionId = releasedAssignId == assignResult1.AssignId + ? assignResult1.PartitionId + : assignResult2.PartitionId; + + auto assignResult3 = setup2.GetNextAssign(topicPath); + UNIT_ASSERT(assignResult3.PartitionId == partitionId); + setup2.InitDirectSession(topicPath); + + setup2.SendReadSessionAssign(assignResult3.AssignId, assignResult3.Generation); + + } + Y_UNIT_TEST(StreamReadManyUpdateTokenAndRead) { TPersQueueV1TestServer server; SET_LOCALS;