diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index bc994f2e9253..c9e50cefd075 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -1392,7 +1392,15 @@ void TPartition::ProcessPendingEvent(std::unique_ptr ev, con } } + auto txIter = TransactionsInflight.begin(); + if (txIter->second->ProposeConfig) { + PQ_ENSURE(!ChangeConfig); + ChangeConfig = + MakeSimpleShared(TopicConverter, + txIter->second->ProposeConfig->Config); + PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); + } if (ChangeConfig) { PQ_ENSURE(TransactionsInflight.size() == 1)("Step", ev->Step)("TxId", ev->TxId); PendingExplicitMessageGroups = ev->ExplicitMessageGroups; @@ -1549,7 +1557,9 @@ void TPartition::WriteInfoResponseHandler( ProcessTxsAndUserActs(ctx); } -TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) { +TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ bool isImmediate = (tx.ProposeTransaction != nullptr); PQ_ENSURE(tx.WriteInfo); PQ_ENSURE(!tx.WriteInfoApplied); @@ -1568,22 +1578,23 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) EProcessResult ret = EProcessResult::Continue; const auto& knownSourceIds = SourceIdStorage.GetInMemorySourceIds(); - THashSet txSourceIds; - for (auto& s : srcIdInfo) { + TVector txSourceIds; + for (const auto& s : srcIdInfo) { if (TxAffectedSourcesIds.contains(s.first)) { PQ_LOG_TX_D("TxAffectedSourcesIds contains SourceId " << s.first << ". TxId " << tx.GetTxId()); ret = EProcessResult::Blocked; break; } if (isImmediate) { - WriteAffectedSourcesIds.insert(s.first); + txSourceIds.push_back(s.first); + LOG_D("TxId " << tx.GetTxId() << " affect SourceId " << s.first); } else { if (WriteAffectedSourcesIds.contains(s.first)) { PQ_LOG_TX_D("WriteAffectedSourcesIds contains SourceId " << s.first << ". TxId " << tx.GetTxId()); ret = EProcessResult::Blocked; break; } - txSourceIds.insert(s.first); + txSourceIds.push_back(s.first); LOG_D("TxId " << tx.GetTxId() << " affect SourceId " << s.first); } @@ -1617,12 +1628,16 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) } } } + if (ret == EProcessResult::Continue && tx.Predicate.GetOrElse(true)) { - TxAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end()); + auto& sourceIds = + (isImmediate ? affectedSourceIdsAndConsumers.WriteSourcesIds : affectedSourceIdsAndConsumers.TxWriteSourcesIds); + sourceIds = std::move(txSourceIds); tx.WriteInfoApplied = true; - WriteKeysSizeEstimate += tx.WriteInfo->BodyKeys.size(); - WriteKeysSizeEstimate += tx.WriteInfo->SrcIdInfo.size(); + + affectedSourceIdsAndConsumers.WriteKeysSize += tx.WriteInfo->BodyKeys.size(); + affectedSourceIdsAndConsumers.WriteKeysSize += tx.WriteInfo->SrcIdInfo.size(); } return ret; @@ -2264,22 +2279,24 @@ size_t TPartition::GetUserActCount(const TString& consumer) const } } -void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) +void TPartition::ProcessTxsAndUserActs(const TActorContext&) { if (KVWriteInProgress) { - LOG_D("Writing. Can't process transactions and user actions"); + LOG_D("Writing. Can't process user action and tx events"); return; } + if (DeletePartitionState == DELETION_INITED) { if (!PersistRequest) { PersistRequest = MakeHolder(); } + ScheduleNegativeReplies(); ScheduleDeletePartitionDone(); AddCmdDeleteRangeForAllKeys(*PersistRequest); - ctx.Send(BlobCache, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId()); + Send(BlobCache, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId()); PersistRequest = nullptr; CurrentPersistRequestSpan = std::move(PersistRequestSpan); PersistRequestSpan = NWilson::TSpan(); @@ -2288,118 +2305,126 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) return; } - LOG_D("Batching state before ContinueProcessTxsAndUserActs: " << (int)BatchingState); - if (CanProcessUserActionAndTransactionEvents()) { - ContinueProcessTxsAndUserActs(ctx); - } - // Still preprocessing? Waiting for something - if (CanProcessUserActionAndTransactionEvents()) { - LOG_D("Still preprocessing - waiting for something"); + + LOG_D("Process user action and tx events"); + ProcessUserActionAndTxEvents(); + DumpTheSizeOfInternalQueues(); + if (!UserActionAndTxPendingWrite.empty()) { + LOG_D("Waiting for the batch to finish"); return; } - LOG_D("Batching state after ContinueProcessTxsAndUserActs: " << (int)BatchingState); - // Preprocessing complete; + LOG_D("Process user action and tx pending commits"); + ProcessUserActionAndTxPendingCommits(); + DumpTheSizeOfInternalQueues(); + if (CurrentBatchSize > 0) { LOG_D("Batch completed (" << CurrentBatchSize << ")"); Send(SelfId(), new TEvPQ::TEvTxBatchComplete(CurrentBatchSize)); } CurrentBatchSize = 0; - if (UserActionAndTxPendingCommit.empty()) { - // Processing stopped and nothing to commit - finalize - BatchingState = ETxBatchingState::Finishing; - } else { - // Process commit queue - ProcessCommitQueue(); - } - // BatchingState can go to Finishing in ContinueProcessTxsAndUserActs. Therefore, it is necessary to check - // the size of the UserActionAndTxPendingCommit queue here. - if (!UserActionAndTxPendingCommit.empty()) { - // Still pending for come commits - LOG_D("Still pending for come commits"); - return; - } LOG_D("Try persist"); - // Here we have an empty UserActionAndTxPendingCommit queue and BatchingState is equal to Finishing. RunPersist(); } -bool TPartition::CanProcessUserActionAndTransactionEvents() const +void TPartition::ProcessUserActionAndTxEvents() { - return (BatchingState == ETxBatchingState::PreProcessing); -} - -void TPartition::ContinueProcessTxsAndUserActs(const TActorContext&) -{ - PQ_ENSURE(!KVWriteInProgress); - - if (WriteCycleSizeEstimate >= MAX_WRITE_CYCLE_SIZE || WriteKeysSizeEstimate >= MAX_KEYS) { - BatchingState = ETxBatchingState::Finishing; - return; - } - auto visitor = [this](auto& event) { - return this->PreProcessUserActionOrTransaction(event); - }; - while (CanProcessUserActionAndTransactionEvents() && !UserActionAndTransactionEvents.empty()) { + while (!UserActionAndTransactionEvents.empty()) { if (ChangingConfig) { - BatchingState = ETxBatchingState::Finishing; break; } + auto& front = UserActionAndTransactionEvents.front(); if (TMessage* msg = std::get_if(&front.Event); msg && msg->WaitPreviousWriteSpan) { msg->WaitPreviousWriteSpan.End(); } + + auto visitor = [this, &front](auto& event) { + return this->ProcessUserActionAndTxEvent(event, front.AffectedSourceIdsAndConsumers); + }; switch (std::visit(visitor, front.Event)) { case EProcessResult::Continue: - MoveUserActOrTxToCommitState(); - FirstEvent = false; + MoveUserActionAndTxToPendingCommitQueue(); break; case EProcessResult::ContinueDrop: UserActionAndTransactionEvents.pop_front(); break; case EProcessResult::Break: - MoveUserActOrTxToCommitState(); - BatchingState = ETxBatchingState::Finishing; - FirstEvent = false; + MoveUserActionAndTxToPendingCommitQueue(); break; case EProcessResult::Blocked: - BatchingState = ETxBatchingState::Executing; return; case EProcessResult::NotReady: return; } - CurrentBatchSize += 1; } - if (UserActionAndTransactionEvents.empty()) { - BatchingState = ETxBatchingState::Executing; - return; +} + +void TPartition::DumpTheSizeOfInternalQueues() const +{ + LOG_D("Events: " << UserActionAndTransactionEvents.size() << + ", PendingCommits: " << UserActionAndTxPendingCommit.size() << + ", PendingWrites: " << UserActionAndTxPendingWrite.size()); +} + +TString GetTransactionType(const TTransaction& tx) +{ + if (tx.Tx) { + return "Tx"; + } else if (tx.ProposeTransaction) { + return "ImmediateTx"; + } else if (tx.ProposeConfig) { + return "ProposeConfig"; + } else if (tx.ChangeConfig) { + return "ChangeConfig"; + } else { + return "???"; } +} +auto TPartition::ProcessUserActionAndTxEvent(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + LOG_D("TPartition::ProcessUserActionAndTxEvent(TEvPQ::TEvSetClientInfo)"); + return PreProcessUserActionOrTransaction(event, affectedSourceIdsAndConsumers); } -void TPartition::MoveUserActOrTxToCommitState() { - auto& front = UserActionAndTransactionEvents.front(); - UserActionAndTxPendingCommit.push_back(std::move(front)); - UserActionAndTransactionEvents.pop_front(); +auto TPartition::ProcessUserActionAndTxEvent(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + LOG_D("TPartition::ProcessUserActionAndTxEvent(TTransaction[" << GetTransactionType(*tx) << "])"); + return PreProcessUserActionOrTransaction(tx, affectedSourceIdsAndConsumers); +} + +auto TPartition::ProcessUserActionAndTxEvent(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + LOG_D("TPartition::ProcessUserActionAndTxEvent(TMessage)"); + return PreProcessUserActionOrTransaction(msg, affectedSourceIdsAndConsumers); } -void TPartition::ProcessCommitQueue() { +bool TPartition::WritingCycleDoesNotExceedTheLimits() const +{ + return WriteCycleSizeEstimate < MAX_WRITE_CYCLE_SIZE && WriteKeysSizeEstimate < MAX_KEYS; +} + +void TPartition::ProcessUserActionAndTxPendingCommits() { CurrentBatchSize = 0; PQ_ENSURE(!KVWriteInProgress); if (!PersistRequest) { PersistRequest = MakeHolder(); } - auto visitor = [this, request = PersistRequest.Get()](auto& event) { - return this->ExecUserActionOrTransaction(event, request); - }; - while (!UserActionAndTxPendingCommit.empty()) { + + while (!UserActionAndTxPendingCommit.empty() && WritingCycleDoesNotExceedTheLimits()) { auto& front = UserActionAndTxPendingCommit.front(); auto state = ECommitState::Committed; + if (auto* tx = get_if>(&front.Event)) { state = tx->Get()->State; } + switch (state) { case ECommitState::Pending: return; @@ -2408,13 +2433,110 @@ void TPartition::ProcessCommitQueue() { case ECommitState::Committed: break; } - auto event = std::move(front.Event); + + UserActionAndTxPendingWrite.push_back(std::move(front)); UserActionAndTxPendingCommit.pop_front(); + + auto& event = UserActionAndTxPendingWrite.back().Event; + auto visitor = [this, request = PersistRequest.Get()](auto& event) { + return this->ProcessUserActionAndTxPendingCommit(event, request); + }; std::visit(visitor, event); + + ++CurrentBatchSize; } - if (UserActionAndTxPendingCommit.empty()) { - BatchingState = ETxBatchingState::Finishing; +} + +void TPartition::ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& event, + TEvKeyValue::TEvRequest* request) +{ + LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TEvPQ::TEvSetClientInfo)"); + ExecUserActionOrTransaction(event, request); +} + +void TPartition::ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& tx, + TEvKeyValue::TEvRequest* request) +{ + LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TTransaction[" << GetTransactionType(*tx) << "])"); + ExecUserActionOrTransaction(tx, request); +} + +void TPartition::ProcessUserActionAndTxPendingCommit(TMessage& msg, + TEvKeyValue::TEvRequest* request) +{ + LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TMessage)"); + ExecUserActionOrTransaction(msg, request); +} + +static void AppendToSet(const TVector& p, THashMap& q) +{ + for (const auto& s : p) { + ++q[s]; + } +} + +void TPartition::AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ + AppendToSet(affectedSourceIdsAndConsumers.TxWriteSourcesIds, TxAffectedSourcesIds); + AppendToSet(affectedSourceIdsAndConsumers.WriteSourcesIds, WriteAffectedSourcesIds); + AppendToSet(affectedSourceIdsAndConsumers.TxReadConsumers, TxAffectedConsumers); + AppendToSet(affectedSourceIdsAndConsumers.ReadConsumers, SetOffsetAffectedConsumers); + + WriteKeysSizeEstimate += affectedSourceIdsAndConsumers.WriteKeysSize; +} + +void TPartition::DeleteAffectedSourceIdsAndConsumers() +{ + if (UserActionAndTxPendingWrite.empty()) { + return; } + + for (const auto& e : UserActionAndTxPendingWrite) { + DeleteAffectedSourceIdsAndConsumers(e.AffectedSourceIdsAndConsumers); + + if (auto* tx = std::get_if>(&e.Event); tx) { + if (auto txId = (*tx)->GetTxId(); txId.Defined()) { + TransactionsInflight.erase(*txId); + } + + if ((*tx)->ChangeConfig || (*tx)->ProposeConfig) { + ChangingConfig = false; + } + } + } + + UserActionAndTxPendingWrite.clear(); +} + +void TPartition::DeleteFromSet(const TVector& p, THashMap& q) const +{ + for (const auto& s : p) { + auto i = q.find(s); + PQ_ENSURE(i != q.end()); + PQ_ENSURE(i->second > 0); + if (--i->second) { + continue; + } + q.erase(s); + } +} + +void TPartition::DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ + DeleteFromSet(affectedSourceIdsAndConsumers.TxWriteSourcesIds, TxAffectedSourcesIds); + DeleteFromSet(affectedSourceIdsAndConsumers.WriteSourcesIds, WriteAffectedSourcesIds); + DeleteFromSet(affectedSourceIdsAndConsumers.TxReadConsumers, TxAffectedConsumers); + DeleteFromSet(affectedSourceIdsAndConsumers.ReadConsumers, SetOffsetAffectedConsumers); + + PQ_ENSURE(WriteKeysSizeEstimate >= affectedSourceIdsAndConsumers.WriteKeysSize); + WriteKeysSizeEstimate -= affectedSourceIdsAndConsumers.WriteKeysSize; +} + +void TPartition::MoveUserActionAndTxToPendingCommitQueue() { + auto& front = UserActionAndTransactionEvents.front(); + AppendAffectedSourceIdsAndConsumers(front.AffectedSourceIdsAndConsumers); + UserActionAndTxPendingCommit.push_back(std::move(front)); + UserActionAndTransactionEvents.pop_front(); } ui64 TPartition::NextReadCookie() @@ -2426,9 +2548,6 @@ ui64 TPartition::NextReadCookie() } void TPartition::RunPersist() { - TransactionsInflight.clear(); - - PQ_ENSURE(UserActionAndTxPendingCommit.empty()); const auto& ctx = ActorContext(); const auto now = ctx.Now(); if (!PersistRequest) { @@ -2571,7 +2690,7 @@ bool TPartition::TryAddDeleteHeadKeysToPersistRequest() return haveChanges; } -//void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) +//void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const //{ // DBGTRACE_LOG("=== DumpKeyValueRequest ==="); // DBGTRACE_LOG("--- delete ----------------"); @@ -2639,7 +2758,8 @@ void TPartition::AnswerCurrentReplies(const TActorContext& ctx) Replies.clear(); } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& t) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { auto span = t->CalcPredicateSpan.CreateChild(TWilsonTopic::TopicTopLevel, "Topic.Partition.PreProcess", @@ -2651,7 +2771,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return EProcessResult::NotReady; } if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied - result = ApplyWriteInfoResponse(*t); + result = ApplyWriteInfoResponse(*t, affectedSourceIdsAndConsumers); if (!t->WriteInfoApplied) { // Tried to apply write info but couldn't - TX must be blocked. PQ_LOG_TX_D("The TxId " << t->GetTxId() << " must be blocked"); PQ_ENSURE(result != EProcessResult::Continue); @@ -2664,23 +2784,24 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return EProcessResult::Continue; } t->Predicate.ConstructInPlace(true); - return PreProcessImmediateTx(t->ProposeTransaction->GetRecord()); + return PreProcessImmediateTx(*t, affectedSourceIdsAndConsumers); } else if (t->Tx) { // Distributed TX if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined. ReplyToProposeOrPredicate(t, true); return EProcessResult::Continue; } - result = BeginTransaction(*t->Tx, t->Predicate, t->Message); + result = BeginTransactionData(*t, affectedSourceIdsAndConsumers); if (t->Predicate.Defined()) { ReplyToProposeOrPredicate(t, true); } return result; } else if (t->ProposeConfig) { - if (!FirstEvent) { + if (HasPendingCommitsOrPendingWrites()) { + LOG_D("Wait until the operation with the config becomes the first in the queue"); return EProcessResult::Blocked; } - t->Predicate = BeginTransaction(*t->ProposeConfig); + t->Predicate = BeginTransactionConfig(); ChangingConfig = true; PendingPartitionConfig = GetPartitionConfig(t->ProposeConfig->Config); //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found"); @@ -2690,7 +2811,8 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple PQ_ENSURE(t->ChangeConfig); PQ_ENSURE(!ChangeConfig && !ChangingConfig); - if (!FirstEvent) { + if (HasPendingCommitsOrPendingWrites()) { + LOG_D("Wait until the operation with the config becomes the first in the queue"); return EProcessResult::Blocked; } ChangingConfig = true; @@ -2702,7 +2824,13 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return result; } -bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, TEvKeyValue::TEvRequest*) +bool TPartition::HasPendingCommitsOrPendingWrites() const +{ + return !UserActionAndTxPendingCommit.empty() || !UserActionAndTxPendingWrite.empty(); +} + +bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, + TEvKeyValue::TEvRequest*) { auto span = t->CommitSpan.CreateChild(TWilsonTopic::TopicTopLevel, "Topic.Partition.Process", @@ -2721,35 +2849,35 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, case ECommitState::Committed: break; } - const auto& ctx = ActorContext(); if (t->ChangeConfig) { PQ_ENSURE(!ChangeConfig); PQ_ENSURE(ChangingConfig); ChangeConfig = t->ChangeConfig; SendChangeConfigReply = t->SendReply; - BeginChangePartitionConfig(ChangeConfig->Config, ctx); + BeginChangePartitionConfig(ChangeConfig->Config); } else if (t->ProposeConfig) { - PQ_ENSURE(ChangingConfig); - ChangeConfig = MakeSimpleShared(TopicConverter, - t->ProposeConfig->Config); - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); + PQ_ENSURE(ChangeConfig); SendChangeConfigReply = false; } CommitTransaction(t); return true; } -TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, - TMaybe& predicateOut, TString& issueMsg) +TPartition::EProcessResult TPartition::BeginTransactionData(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { + const TEvPQ::TEvTxCalcPredicate& tx = *t.Tx; + TMaybe& predicateOut = t.Predicate; + TString& issueMsg = t.Message; + if (tx.ForcePredicateFalse) { predicateOut = false; return EProcessResult::Continue; } - THashSet consumers; + TVector consumers; bool result = true; - for (auto& operation : tx.Operations) { + for (const auto& operation : tx.Operations) { const TString& consumer = operation.GetConsumer(); if (TxAffectedConsumers.contains(consumer)) { PQ_LOG_TX_D("TxAffectedConsumers contains consumer " << consumer << ". TxId " << tx.TxId); @@ -2834,25 +2962,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr } break; } - consumers.insert(consumer); + consumers.push_back(consumer); PQ_LOG_TX_D("TxId " << tx.TxId << " affect consumer " << consumer); } } if (result) { - TxAffectedConsumers.insert(consumers.begin(), consumers.end()); + affectedSourceIdsAndConsumers.TxReadConsumers = std::move(consumers); } predicateOut = result; return EProcessResult::Continue; } -bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) +bool TPartition::BeginTransactionConfig() { - ChangeConfig = - MakeSimpleShared(TopicConverter, - event.Config); - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); - SendChangeConfigReply = false; return true; } @@ -2951,7 +3074,6 @@ void TPartition::CommitWriteOperations(TTransaction& t) void TPartition::CommitTransaction(TSimpleSharedPtr& t) { - const auto& ctx = ActorContext(); if (t->Tx) { PQ_ENSURE(t->Predicate.Defined() && *t->Predicate); @@ -2996,7 +3118,7 @@ void TPartition::CommitTransaction(TSimpleSharedPtr& t) } else if (t->ProposeConfig) { PQ_ENSURE(t->Predicate.Defined() && *t->Predicate); - BeginChangePartitionConfig(t->ProposeConfig->Config, ctx); + BeginChangePartitionConfig(t->ProposeConfig->Config); ExecChangePartitionConfig(); ChangePlanStepAndTxId(t->ProposeConfig->Step, t->ProposeConfig->TxId); @@ -3028,15 +3150,14 @@ void TPartition::RollbackTransaction(TSimpleSharedPtr& t) } } -void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, - const TActorContext& ctx) +void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config) { TSet hasReadRule; for (auto&& [consumer, info] : UsersInfoStorage->ViewAll()) { hasReadRule.insert(consumer); } - for (auto& consumer : config.GetConsumers()) { + for (const auto& consumer : config.GetConsumers()) { auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0); TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs()); @@ -3052,21 +3173,22 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co auto act = MakeHolder(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen); - auto res = PreProcessUserAct(*act, ctx); - ChangeConfigActs.emplace_back(std::move(act)); - + auto res = PreProcessUserAct(*act, nullptr); PQ_ENSURE(res == EProcessResult::Continue); + + ChangeConfigActs.emplace_back(std::move(act)); } hasReadRule.erase(consumer.GetName()); } - for (auto& consumer : hasReadRule) { + for (const auto& consumer : hasReadRule) { GetOrCreatePendingUser(consumer); auto act = MakeHolder(0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); - auto res = PreProcessUserAct(*act, ctx); + auto res = PreProcessUserAct(*act, nullptr); PQ_ENSURE(res == EProcessResult::Continue); + ChangeConfigActs.emplace_back(std::move(act)); } } @@ -3079,14 +3201,8 @@ void TPartition::ExecChangePartitionConfig() { } void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) { - FirstEvent = true; - TxAffectedConsumers.clear(); - TxAffectedSourcesIds.clear(); - WriteAffectedSourcesIds.clear(); - SetOffsetAffectedConsumers.clear(); - BatchingState = ETxBatchingState::PreProcessing; + DeleteAffectedSourceIdsAndConsumers(); WriteCycleSizeEstimate = 0; - WriteKeysSizeEstimate = 0; if (ChangeConfig) { EndChangePartitionConfig(std::move(ChangeConfig->Config), @@ -3171,8 +3287,6 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) ChangeConfig = nullptr; PendingPartitionConfig = nullptr; } - ChangingConfig = false; - BatchingState = ETxBatchingState::PreProcessing; } void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config, @@ -3259,15 +3373,18 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId) TxIdHasChanged = true; } -TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx) +TPartition::EProcessResult TPartition::PreProcessImmediateTx(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { + const NKikimrPQ::TEvProposeTransaction& tx = t.ProposeTransaction->GetRecord(); + if (AffectedUsers.size() >= MAX_USERS) { return EProcessResult::Blocked; } PQ_ENSURE(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); PQ_ENSURE(tx.HasData()); - THashSet consumers; - for (auto& operation : tx.GetData().GetOperations()) { + TVector consumers; + for (const auto& operation : tx.GetData().GetOperations()) { if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) { continue; //Write operation - handled separately via WriteInfo } @@ -3294,10 +3411,10 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - consumers.insert(user); + consumers.push_back(user); } - SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end()); - WriteKeysSizeEstimate += consumers.size(); + affectedSourceIdsAndConsumers.ReadConsumers = std::move(consumers); + affectedSourceIdsAndConsumers.WriteKeysSize += consumers.size(); return EProcessResult::Continue; } @@ -3390,22 +3507,25 @@ void TPartition::ExecImmediateTx(TTransaction& t) return; } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& act) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& act, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { if (AffectedUsers.size() >= MAX_USERS) { return EProcessResult::Blocked; } - return PreProcessUserAct(*act, ActorContext()); + + return PreProcessUserAct(*act, &affectedSourceIdsAndConsumers); } -bool TPartition::ExecUserActionOrTransaction( - TSimpleSharedPtr& event, TEvKeyValue::TEvRequest* -) { +bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& event, + TEvKeyValue::TEvRequest*) +{ CommitUserAct(*event); return true; } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessage& msg) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { if (WriteCycleSize >= MAX_WRITE_CYCLE_SIZE) { return EProcessResult::Blocked; @@ -3413,20 +3533,22 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessag auto result = EProcessResult::Continue; if (msg.IsWrite()) { - result = PreProcessRequest(msg.GetWrite()); + result = PreProcessRequest(msg.GetWrite(), affectedSourceIdsAndConsumers); } else if (msg.IsRegisterMessageGroup()) { - result = PreProcessRequest(msg.GetRegisterMessageGroup()); + result = PreProcessRequest(msg.GetRegisterMessageGroup(), affectedSourceIdsAndConsumers); } else if (msg.IsDeregisterMessageGroup()) { - result = PreProcessRequest(msg.GetDeregisterMessageGroup()); + result = PreProcessRequest(msg.GetDeregisterMessageGroup(), affectedSourceIdsAndConsumers); } else if (msg.IsSplitMessageGroup()) { - result = PreProcessRequest(msg.GetSplitMessageGroup()); + result = PreProcessRequest(msg.GetSplitMessageGroup(), affectedSourceIdsAndConsumers); } else { PQ_ENSURE(msg.IsOwnership()); } + return result; } -bool TPartition::ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequest* request) +bool TPartition::ExecUserActionOrTransaction(TMessage& msg, + TEvKeyValue::TEvRequest* request) { const auto& ctx = ActorContext(); if (!HaveWriteMsg) { @@ -3456,9 +3578,9 @@ bool TPartition::ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequ return true; } -TPartition::EProcessResult TPartition::PreProcessUserAct( - TEvPQ::TEvSetClientInfo& act, const TActorContext& -) { +TPartition::EProcessResult TPartition::PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, + TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers) +{ PQ_ENSURE(!KVWriteInProgress); const TString& user = act.ClientId; @@ -3467,8 +3589,12 @@ TPartition::EProcessResult TPartition::PreProcessUserAct( return EProcessResult::Blocked; } } - WriteKeysSizeEstimate += 1; - SetOffsetAffectedConsumers.insert(user); + + if (affectedSourceIdsAndConsumers) { + ++affectedSourceIdsAndConsumers->WriteKeysSize; + affectedSourceIdsAndConsumers->ReadConsumers.push_back(user); + } + return EProcessResult::Continue; } @@ -4416,4 +4542,5 @@ void TPartition::ResetDetailedMetrics() { BytesWrittenPerPartition.Reset(); MessagesWrittenPerPartition.Reset(); } + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/pqtablet/partition/partition.h b/ydb/core/persqueue/pqtablet/partition/partition.h index 751cbcf79fb4..7bc679a681aa 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.h +++ b/ydb/core/persqueue/pqtablet/partition/partition.h @@ -354,11 +354,31 @@ class TPartition : public TBaseTabletActor { TAutoPtr MakeHasDataInfoResponse(ui64 lagSize, const TMaybe& cookie, bool readingFinished = false); void ProcessTxsAndUserActs(const TActorContext& ctx); - void ContinueProcessTxsAndUserActs(const TActorContext& ctx); - void ProcessCommitQueue(); void RunPersist(); - void MoveUserActOrTxToCommitState(); + enum class EProcessResult; + struct TAffectedSourceIdsAndConsumers; + + void ProcessUserActionAndTxEvents(); + EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult ProcessUserActionAndTxEvent(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + + void MoveUserActionAndTxToPendingCommitQueue(); + + void ProcessUserActionAndTxPendingCommits(); + void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& event, + TEvKeyValue::TEvRequest* request); + void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& tx, + TEvKeyValue::TEvRequest* request); + void ProcessUserActionAndTxPendingCommit(TMessage& msg, + TEvKeyValue::TEvRequest* request); + + bool WritingCycleDoesNotExceedTheLimits() const; + void PushBackDistrTx(TSimpleSharedPtr event); void PushBackDistrTx(TSimpleSharedPtr event); void PushFrontDistrTx(TSimpleSharedPtr event); @@ -440,14 +460,13 @@ class TPartition : public TBaseTabletActor { const TString& reason); THolder MakeCommitDone(ui64 step, ui64 txId); - bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event); + bool BeginTransactionConfig(); void CommitTransaction(TSimpleSharedPtr& t); void RollbackTransaction(TSimpleSharedPtr& t); - void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, - const TActorContext& ctx); + void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config); void ExecChangePartitionConfig(); void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx); @@ -752,10 +771,10 @@ class TPartition : public TBaseTabletActor { TMaybe KafkaProducerEpoch = 0; }; - THashSet TxAffectedSourcesIds; - THashSet WriteAffectedSourcesIds; - THashSet TxAffectedConsumers; - THashSet SetOffsetAffectedConsumers; + THashMap TxAffectedSourcesIds; + THashMap WriteAffectedSourcesIds; + THashMap TxAffectedConsumers; + THashMap SetOffsetAffectedConsumers; THashMap TxSourceIdForPostPersist; THashMap TxInflightMaxSeqNoPerSourceId; @@ -802,45 +821,60 @@ class TPartition : public TBaseTabletActor { mutable TMaybe InitLogPrefix; mutable TMaybe UnknownLogPrefix; - // template T& GetUserActionAndTransactionEventsFront(); - // template T& GetCurrentEvent(); - //TSimpleSharedPtr& GetCurrentTransaction(); + struct TAffectedSourceIdsAndConsumers { + TVector TxWriteSourcesIds; + TVector WriteSourcesIds; + TVector TxReadConsumers; + TVector ReadConsumers; + ui32 WriteKeysSize = 0; + }; - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event); - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event); - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& tx); - EProcessResult PreProcessUserActionOrTransaction(TMessage& msg); + void AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - bool ExecUserActionOrTransaction(TSimpleSharedPtr& event, TEvKeyValue::TEvRequest* request); + void DeleteAffectedSourceIdsAndConsumers(); + void DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + void DeleteFromSet(const TVector& p, THashMap& q) const; + + EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessUserActionOrTransaction(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - bool ExecUserActionOrTransaction(TSimpleSharedPtr& event, - TEvKeyValue::TEvRequest* request); + bool ExecUserActionOrTransaction(TSimpleSharedPtr& event, TEvKeyValue::TEvRequest* request); bool ExecUserActionOrTransaction(TSimpleSharedPtr& tx, TEvKeyValue::TEvRequest* request); bool ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequest* request); - [[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx); + [[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, + TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers); void CommitUserAct(TEvPQ::TEvSetClientInfo& act); - [[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx); + [[nodiscard]] EProcessResult PreProcessImmediateTx(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); void ExecImmediateTx(TTransaction& tx); - EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TWriteMsg& msg); + EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TWriteMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); void ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters); void ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters); void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters); bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request); - [[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, - TMaybe& predicate, TString& issueMsg); + [[nodiscard]] EProcessResult BeginTransactionData(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - EProcessResult ApplyWriteInfoResponse(TTransaction& tx); + EProcessResult ApplyWriteInfoResponse(TTransaction& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - bool FirstEvent = true; bool HaveWriteMsg = false; bool HaveCheckDisk = false; bool HaveDrop = false; @@ -860,13 +894,12 @@ class TPartition : public TBaseTabletActor { void BeginAppendHeadWithNewWrites(const TActorContext& ctx); void EndAppendHeadWithNewWrites(const TActorContext& ctx); + bool HasPendingCommitsOrPendingWrites() const; + // // user actions and transactions // struct TUserActionAndTransactionEvent { - std::variant, // user actions - TSimpleSharedPtr, // distributed transaction or update config - TMessage> Event; TUserActionAndTransactionEvent(TSimpleSharedPtr&& transaction) : Event(std::move(transaction)) {} @@ -876,10 +909,16 @@ class TPartition : public TBaseTabletActor { TUserActionAndTransactionEvent(TMessage&& message) : Event(std::move(message)) {} + + std::variant, // user actions + TSimpleSharedPtr, // distributed transaction or update config + TMessage> Event; + TAffectedSourceIdsAndConsumers AffectedSourceIdsAndConsumers; }; std::deque UserActionAndTransactionEvents; std::deque UserActionAndTxPendingCommit; + std::deque UserActionAndTxPendingWrite; TVector> WriteInfosApplied; THashMap> TransactionsInflight; @@ -902,15 +941,6 @@ class TPartition : public TBaseTabletActor { TMessageQueue Responses; ui64 CurrentBatchSize = 0; - enum class ETxBatchingState{ - PreProcessing, - Executing, - Finishing - }; - ETxBatchingState BatchingState = ETxBatchingState::PreProcessing; - // - // - // std::deque> UpdateUserInfoTimestamp; bool ReadingTimestamp; TString ReadingForUser; @@ -1126,7 +1156,7 @@ class TPartition : public TBaseTabletActor { size_t WriteNewSizeFromSupportivePartitions = 0; bool TryAddDeleteHeadKeysToPersistRequest(); - void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request); + void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const; TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key); @@ -1146,6 +1176,7 @@ class TPartition : public TBaseTabletActor { size_t CompactionBlobsCount = 0; void DumpZones(const char* file = nullptr, unsigned line = 0) const; + void DumpTheSizeOfInternalQueues() const; const TPartitionBlobEncoder& GetBlobEncoder(ui64 offset) const; @@ -1164,7 +1195,6 @@ class TPartition : public TBaseTabletActor { TDeque TxForPersistTraceIds; TDeque TxForPersistSpans; - bool CanProcessUserActionAndTransactionEvents() const; ui64 GetCompactedBlobSizeLowerBound() const; bool CompactRequestedBlob(const TRequestedBlob& requestedBlob, diff --git a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp index ba553eeba531..79e982591649 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp @@ -82,7 +82,7 @@ bool TPartition::ExecRequestForCompaction(TWriteMsg& p, TProcessParametersBase& auto newWrite = CompactionBlobEncoder.PartitionedBlob.Add(std::move(blob)); if (newWrite && !newWrite->Value.empty()) { - AddCmdWrite(newWrite, request, blobCreationUnixTime, ctx); + AddCmdWrite(newWrite, request, blobCreationUnixTime, ctx, false); LOG_D("Topic '" << TopicName() << "' partition " << Partition << @@ -400,7 +400,7 @@ void TPartition::RenameCompactedBlob(TDataKey& k, auto write = CompactionBlobEncoder.PartitionedBlob.Add(k.Key, size, k.Timestamp, false); if (write && !write->Value.empty()) { // надо записать содержимое головы перед первым большим блобом - AddCmdWrite(write, compactionRequest, k.Timestamp, ctx); + AddCmdWrite(write, compactionRequest, k.Timestamp, ctx, false); CompactionBlobEncoder.CompactedKeys.emplace_back(write->Key, write->Value.size()); } diff --git a/ydb/core/persqueue/pqtablet/partition/partition_write.cpp b/ydb/core/persqueue/pqtablet/partition/partition_write.cpp index bbb1cd9b9e2c..60f8118b6f2b 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_write.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_write.cpp @@ -535,10 +535,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { TxSourceIdForPostPersist.clear(); TxInflightMaxSeqNoPerSourceId.clear(); - TxAffectedSourcesIds.clear(); - WriteAffectedSourcesIds.clear(); - TxAffectedConsumers.clear(); - SetOffsetAffectedConsumers.clear(); if (UserActionAndTransactionEvents.empty()) { WriteInfosToTx.clear(); } @@ -896,7 +892,9 @@ void TPartition::CancelOneWriteOnWrite(const TActorContext& ctx, StartProcessChangeOwnerRequests(ctx); } -TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg) { +TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -911,7 +909,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMs if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(msg.Body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId); return EProcessResult::Continue; } @@ -928,7 +926,9 @@ void TPartition::ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& p parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); } -TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg) { +TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -942,7 +942,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroup if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(msg.Body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId); return EProcessResult::Continue; } @@ -951,7 +951,9 @@ void TPartition::ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& } -TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg) { +TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -967,16 +969,16 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& if (TxAffectedSourcesIds.contains(body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId); } for (auto& body : msg.Deregistrations) { if (TxAffectedSourcesIds.contains(body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId); } - return EProcessResult::Continue; + return EProcessResult::Continue; } @@ -996,7 +998,9 @@ void TPartition::ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& para } } -TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { +TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { WriteInflightSize -= p.Msg.Data.size(); ScheduleReplyError(p.Cookie, false, InactivePartitionErrorCode, @@ -1022,7 +1026,8 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { return EProcessResult::Blocked; } } - WriteAffectedSourcesIds.insert(p.Msg.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(p.Msg.SourceId); + return EProcessResult::Continue; } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 054497962c4f..4661513d9b1b 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -1520,7 +1520,7 @@ class TPartitionTxTestHelper : public TPartitionFixture { BatchSizes.push_back(msg->BatchSize); } } else if (ev->CastAsLocal()) { - Cerr << "Got KV request\n"; + Cerr << "Got KV request" << Endl; with_lock(Lock) { HadKvRequest = true; } @@ -2187,6 +2187,9 @@ Y_UNIT_TEST_F(CorrectRange_Commit, TPartitionFixture) SendCommitTx(step, txId); + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session=session, .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session=session, .Offset=2}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); @@ -2217,6 +2220,9 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture) SendCommitTx(step, txId_1); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Session=session, .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Session=session, .Offset=1}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); @@ -2228,6 +2234,9 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture) WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_3); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_2, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session=session, .Offset=1}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); } @@ -2257,6 +2266,12 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TPartitionFixture) WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=true}); SendCommitTx(step, txId); + WaitCmdWrite({.Count=5, .UserInfos={ + {1, {.Session="session-2", .Offset=0}}, + {3, {.Session="session-1", .Offset=0}} + }}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=5, .UserInfos={ {1, {.Session="session-2", .Offset=1}}, {3, {.Session="session-1", .Offset=6}} @@ -2338,6 +2353,9 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture) WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Consumer="client", .Session="session", .Offset=0}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Consumer="client", .Session="session", .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=true}); } @@ -2375,6 +2393,14 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true}); SendCommitTx(step, txId_1); Cerr << "Wait cmd write (initial)\n"; + WaitCmdWrite({.Count=8, + .PlanStep=step, .TxId=txId_1, + .UserInfos={ + {1, {.Consumer="client-1", .Session="session-1", .Offset=0}}, + }, + }); + + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitCmdWrite({.Count=8, .PlanStep=step, .TxId=txId_1, .UserInfos={ @@ -2752,6 +2778,7 @@ Y_UNIT_TEST_F(DataTxCalcPredicateOk, TPartitionTxTestHelper) TString data = "data for write"; SendChangeOwner(cookie, "owner1", Ctx->Edge, true); + EmulateKVTablet(); auto ownerEvent = Ctx->Runtime->GrabEdgeEvent(TDuration::Seconds(1)); UNIT_ASSERT(ownerEvent != nullptr); auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); @@ -2838,7 +2865,7 @@ void TPartitionTxTestHelper::NonConflictingActsBatchOkTest() { WaitTxPredicateReply(tx2); WaitTxPredicateReply(tx3); - WaitBatchCompletion(5 + 6 + 6); //5 txs and immediate txs + 2 normal writes with 6 messages each; + //WaitBatchCompletion(5 + 6 + 6); //5 txs and immediate txs + 2 normal writes with 6 messages each; SendTxCommit(tx3); SendTxRollback(tx2); @@ -2882,7 +2909,7 @@ Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); WaitTxPredicateReply(tx2); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxCommit(tx1); SendTxRollback(tx2); @@ -2890,7 +2917,7 @@ Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { SendKvResponse(); WaitTxPredicateReply(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx3); //2 Normal writes with src1 & src4 @@ -2898,17 +2925,19 @@ Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx3); WaitTxPredicateReply(tx5); - WaitBatchCompletion(6 + 2); // Normal writes produce 1 act for each message + //WaitBatchCompletion(6 + 2); // Normal writes produce 1 act for each message SendTxCommit(tx5); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx5); - WaitBatchCompletion(1 + 6); //Normal write & immTx for src4; + //WaitBatchCompletion(1 + 6); //Normal write & immTx for src4; WaitKvRequest(); SendKvResponse(); WaitImmediateTxComplete(immTx1, true); @@ -2925,7 +2954,7 @@ Y_UNIT_TEST_F(ConflictingTxIsAborted, TPartitionTxTestHelper) { WaitWriteInfoRequest(tx1, true); WaitWriteInfoRequest(tx2, true); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx1); ExpectNoKvRequest(); @@ -2939,7 +2968,7 @@ Y_UNIT_TEST_F(ConflictingTxIsAborted, TPartitionTxTestHelper) { AddAndSendNormalWrite("src2", 7, 12); auto tx3 = MakeAndSendWriteTx({{"src2", {12, 15}}}); Y_UNUSED(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); ExpectNoCommitDone(); @@ -2957,12 +2986,12 @@ Y_UNIT_TEST_F(ConflictingTxProceedAfterRollback, TPartitionTxTestHelper) { WaitWriteInfoRequest(immTx, true); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx1); WaitTxPredicateReply(tx2); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxCommit(tx2); WaitKvRequest(); @@ -2988,7 +3017,7 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxInDifferentBatches, TPartitionTxTestHelper) { Cerr << "Wait batch of 1 completion\n"; SendTxCommit(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); Cerr << "Expect KV request\n"; WaitKvRequest(); SendKvResponse(); @@ -3002,13 +3031,13 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxInDifferentBatches, TPartitionTxTestHelper) { Cerr << "Wait batch of 3 completion\n"; - WaitBatchCompletion(1); // Immediate Tx 2 - 4. + //WaitBatchCompletion(1); // Immediate Tx 2 - 4. Cerr << "Expect KV request\n"; WaitKvRequest(); SendKvResponse(); SendTxRollback(tx3); SendTxRollback(tx4); - WaitBatchCompletion(2); // Immediate Tx 2 - 4. + //WaitBatchCompletion(2); // Immediate Tx 2 - 4. WaitKvRequest(); SendKvResponse(); @@ -3035,7 +3064,7 @@ Y_UNIT_TEST_F(ConflictingSrcIdTxAndWritesDifferentBatches, TPartitionTxTestHelpe WaitTxPredicateReply(tx1); SendTxCommit(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); @@ -3046,11 +3075,13 @@ Y_UNIT_TEST_F(ConflictingSrcIdTxAndWritesDifferentBatches, TPartitionTxTestHelpe WaitTxPredicateReply(tx3); SendTxRollback(tx2); SendTxCommit(tx3); - WaitBatchCompletion(2); // Tx 2 & 3. + //WaitBatchCompletion(2); // Tx 2 & 3. + WaitKvRequest(); + SendKvResponse(); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx3); - WaitBatchCompletion(3); + //WaitBatchCompletion(3); WaitKvRequest(); SendKvResponse(); WaitProxyResponse({.AlreadyWritten=true, .SeqNo=1}); @@ -3074,12 +3105,12 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxWithHead, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); SendTxCommit(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); Cerr << "Wait 1st KV request\n"; WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); - WaitBatchCompletion(3); + //WaitBatchCompletion(3); Cerr << "Wait 2nd KV request\n"; WaitKvRequest(); SendKvResponse(); @@ -3144,7 +3175,8 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(immTx1, true); WaitWriteInfoRequest(immTx2, true); - WaitBatchCompletion(4 + 1); + //WaitBatchCompletion(4 + 1); + EmulateKVTablet(); EmulateKVTablet(); WaitImmediateTxComplete(immTx1, true); WaitImmediateTxComplete(immTx2, true); @@ -3158,14 +3190,15 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(immTx, true); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoTxPredicateReply(); EmulateKVTablet(); + EmulateKVTablet(); WaitImmediateTxComplete(immTx, true); ExpectNoCommitDone(); WaitTxPredicateReply(tx); SendTxCommit(tx); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitDone(tx); } @@ -3177,12 +3210,12 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { auto tx = wrapper.AddTx(); wrapper.Process(); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoTxPredicateReply(); EmulateKVTablet(); WaitTxPredicateReply(tx); SendTxCommit(tx); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitDone(tx); } @@ -3195,12 +3228,12 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(tx, true); WaitTxPredicateReply(tx); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoKvRequest(); SendTxCommit(tx); EmulateKVTablet(); WaitCommitDone(tx); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); } { @@ -3212,12 +3245,12 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(tx, true); WaitWriteInfoRequest(immTx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); WaitTxPredicateReply(tx); SendTxCommit(tx); ExpectNoCommitDone(); EmulateKVTablet(); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitCommitDone(tx); EmulateKVTablet(); WaitImmediateTxComplete(immTx, true); @@ -3238,11 +3271,11 @@ Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { WaitWriteInfoRequest(tx, true); WaitWriteInfoRequest(immTx, true); - WaitBatchCompletion(5 + 1); + //WaitBatchCompletion(5 + 1); ExpectNoTxPredicateReply(); EmulateKVTablet(); WaitTxPredicateFailure(tx); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxRollback(tx); EmulateKVTablet(); @@ -3262,7 +3295,7 @@ Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { WaitWriteInfoRequest(immTx, true); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(2 + 1); + //WaitBatchCompletion(2 + 1); WaitTxPredicateReply(tx); ExpectNoKvRequest(); SendTxCommit(tx); @@ -3294,12 +3327,14 @@ Y_UNIT_TEST_F(NonConflictingCommitsBatch, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); WaitTxPredicateReply(tx2); - WaitBatchCompletion(5 + 1 /*tmpTx*/); + //WaitBatchCompletion(5 + 1 /*tmpTx*/); SendTxCommit(tx1); SendTxCommit(tx2); WaitKvRequest(); SendKvResponse(); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx1); WaitCommitDone(tx2); WaitImmediateTxComplete(txImm1, false); @@ -3325,28 +3360,32 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { WaitWriteInfoRequest(txTmp, true); WaitTxPredicateReply(txTmp); - WaitBatchCompletion(2); // txTmp + act-1 SendTxRollback(txTmp); + //WaitBatchCompletion(2); // txTmp + act-1 ExpectNoTxPredicateReply(); WaitKvRequest(); SendKvResponse(); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); // tx1 + WaitKvRequest(); + SendKvResponse(); ExpectNoTxPredicateReply(); SendTxCommit(tx1); + //WaitBatchCompletion(1); // tx1 WaitTxPredicateReply(tx2); SendTxCommit(tx2); - WaitBatchCompletion(1); // tx2 + //WaitBatchCompletion(1); // tx2 WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx2); - WaitBatchCompletion(1); // act-2 + //WaitBatchCompletion(1); // act-2 WaitKvRequest(); SendKvResponse(); @@ -3357,8 +3396,8 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { WaitTxPredicateReply(txTmp); SendTxRollback(txTmp); - WaitBatchCompletion(3); WaitKvRequest(); + //WaitBatchCompletion(3); SendKvResponse(); WaitImmediateTxComplete(immTx1, true); WaitImmediateTxComplete(immTx2, true); @@ -3382,7 +3421,10 @@ Y_UNIT_TEST_F(ConflictingCommitFails, TPartitionTxTestHelper) { SendTxRollback(txTmp); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1 + 1); + WaitKvRequest(); + SendKvResponse(); + //WaitBatchCompletion(1 + 1); + //WaitBatchCompletion(1); // для txTmp отправили TEvTxRollback SendTxCommit(tx1); WaitTxPredicateFailure(tx2); @@ -3405,13 +3447,15 @@ Y_UNIT_TEST_F(ConflictingCommitFails, TPartitionTxTestHelper) { SendTxRollback(txTmp); ExpectNoTxPredicateReply(); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); WaitKvRequest(); SendKvResponse(); WaitTxPredicateFailure(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx3); + //WaitBatchCompletion(1); + WaitKvRequest(); //No user operatiions completed but TxId has changed which will be saved SendKvResponse(); @@ -3442,16 +3486,18 @@ Y_UNIT_TEST_F(ConflictingCommitProccesAfterRollback, TPartitionTxTestHelper) { auto tx2 = MakeAndSendTxOffsetCommit(1, 0, 3); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx1); WaitKvRequest(); SendKvResponse(); WaitTxPredicateReply(tx2); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx2); + WaitKvRequest(); + SendKvResponse(); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx2); @@ -3477,7 +3523,7 @@ Y_UNIT_TEST_F(TestBatchingWithChangeConfig, TPartitionTxTestHelper) { WaitBatchCompletion(1); EmulateKVTablet(); auto event = Ctx->Runtime->GrabEdgeEvent(); - WaitBatchCompletion(1); // immTx2 + //WaitBatchCompletion(1); // immTx2 EmulateKVTablet(); WaitImmediateTxComplete(immTx2, true); } @@ -3511,10 +3557,10 @@ Y_UNIT_TEST_F(TestBatchingWithProposeConfig, TPartitionTxTestHelper) { SendCommitTx(1, proposeTxId); //ToDo - wait propose result; - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitTxDone({.TxId=proposeTxId}); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitImmediateTxComplete(immTx2, true); }