diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index b3923051323a..7bed9fd82f72 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1265,6 +1265,13 @@ void TPartition::ProcessPendingEvent(std::unique_ptr ev, con } auto txIter = TransactionsInflight.begin(); + if (txIter->second->ProposeConfig) { + Y_ABORT_UNLESS(!ChangeConfig); + ChangeConfig = + MakeSimpleShared(TopicConverter, + txIter->second->ProposeConfig->Config); + PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); + } if (ChangeConfig) { Y_ABORT_UNLESS(TransactionsInflight.size() == 1, "PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64, @@ -1425,7 +1432,9 @@ void TPartition::WriteInfoResponseHandler( ProcessTxsAndUserActs(ctx); } -TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) { +TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ bool isImmediate = (tx.ProposeTransaction != nullptr); Y_ABORT_UNLESS(tx.WriteInfo); Y_ABORT_UNLESS(!tx.WriteInfoApplied); @@ -1444,22 +1453,23 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) EProcessResult ret = EProcessResult::Continue; const auto& knownSourceIds = SourceIdStorage.GetInMemorySourceIds(); - THashSet txSourceIds; - for (auto& s : srcIdInfo) { + TVector txSourceIds; + for (const auto& s : srcIdInfo) { if (TxAffectedSourcesIds.contains(s.first)) { PQ_LOG_TX_D("TxAffectedSourcesIds contains SourceId " << s.first << ". TxId " << tx.GetTxId()); ret = EProcessResult::Blocked; break; } if (isImmediate) { - WriteAffectedSourcesIds.insert(s.first); + txSourceIds.push_back(s.first); + PQ_LOG_D("TxId " << tx.GetTxId() << " affect SourceId " << s.first); } else { if (WriteAffectedSourcesIds.contains(s.first)) { PQ_LOG_TX_D("WriteAffectedSourcesIds contains SourceId " << s.first << ". TxId " << tx.GetTxId()); ret = EProcessResult::Blocked; break; } - txSourceIds.insert(s.first); + txSourceIds.push_back(s.first); PQ_LOG_D("Tx " << tx.GetTxId() << " affect SourceId " << s.first); } @@ -1485,14 +1495,19 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) } } } + if (ret == EProcessResult::Continue && tx.Predicate.GetOrElse(true)) { - TxAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end()); + auto& sourceIds = + (isImmediate ? affectedSourceIdsAndConsumers.WriteSourcesIds : affectedSourceIdsAndConsumers.TxWriteSourcesIds); + sourceIds = std::move(txSourceIds); + tx.WriteInfoApplied = true; - WriteKeysSizeEstimate += tx.WriteInfo->BodyKeys.size(); - WriteKeysSizeEstimate += tx.WriteInfo->SrcIdInfo.size(); - WriteKeysSizeEstimate += tx.WriteInfo->BlobsFromHead.size(); + affectedSourceIdsAndConsumers.WriteKeysSize += tx.WriteInfo->BodyKeys.size(); + affectedSourceIdsAndConsumers.WriteKeysSize += tx.WriteInfo->SrcIdInfo.size(); + affectedSourceIdsAndConsumers.WriteKeysSize += tx.WriteInfo->BlobsFromHead.size(); + for (const auto& blob : tx.WriteInfo->BlobsFromHead) { - WriteCycleSizeEstimate += blob.GetBlobSize(); + affectedSourceIdsAndConsumers.WriteCycleSize += blob.GetBlobSize(); } } @@ -2115,22 +2130,24 @@ size_t TPartition::GetUserActCount(const TString& consumer) const } } -void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) +void TPartition::ProcessTxsAndUserActs(const TActorContext&) { if (KVWriteInProgress) { - PQ_LOG_D("Can't process txs"); + PQ_LOG_D("Writing. Can't process user action and tx events"); return; } + if (DeletePartitionState == DELETION_INITED) { if (!PersistRequest) { PersistRequest = MakeHolder(); } + ScheduleNegativeReplies(); ScheduleDeletePartitionDone(); AddCmdDeleteRangeForAllKeys(*PersistRequest); - ctx.Send(BlobCache, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId()); + Send(BlobCache, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId()); PersistRequest = nullptr; CurrentPersistRequestSpan = std::move(PersistRequestSpan); PersistRequestSpan = NWilson::TSpan(); @@ -2139,145 +2156,241 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) return; } - PQ_LOG_D("Batching state before ContinueProcessTxsAndUserActs: " << (int)BatchingState); - while (true) { - if (CanProcessUserActionAndTransactionEvents()) { - ContinueProcessTxsAndUserActs(ctx); - } - if (BatchingState == ETxBatchingState::PreProcessing) { - PQ_LOG_D("Still preprocessing - waiting for something"); - return; // Still preprocessing - waiting for something; - } - PQ_LOG_D("Batching state after ContinueProcessTxsAndUserActs: " << (int)BatchingState); - // Preprocessing complete; - if (CurrentBatchSize > 0) { - Send(SelfId(), new TEvPQ::TEvTxBatchComplete(CurrentBatchSize)); - } - CurrentBatchSize = 0; + PQ_LOG_D("Process user action and tx events"); + ProcessUserActionAndTxEvents(); + DumpTheSizeOfInternalQueues(); + if (!UserActionAndTxPendingWrite.empty()) { + PQ_LOG_D("Waiting for the batch to finish"); + return; + } - if (UserActionAndTxPendingCommit.empty()) { - // Processing stopped and nothing to commit - finalize - BatchingState = ETxBatchingState::Finishing; - } else { - // Process commit queue - ProcessCommitQueue(); - } - if (!UserActionAndTxPendingCommit.empty()) { - // Still pending for come commits - PQ_LOG_D("Still pending for come commits"); - return; - } - // Commit queue processing complete. Now can either swith to persist or continue preprocessing; - if (BatchingState == ETxBatchingState::Finishing) { // Persist required; - RunPersist(); - return; - } - BatchingState = ETxBatchingState::PreProcessing; + PQ_LOG_D("Process user action and tx pending commits"); + ProcessUserActionAndTxPendingCommits(); + DumpTheSizeOfInternalQueues(); + + if (CurrentBatchSize > 0) { + PQ_LOG_D("Batch completed (" << CurrentBatchSize << ")"); + Send(SelfId(), new TEvPQ::TEvTxBatchComplete(CurrentBatchSize)); } -} + CurrentBatchSize = 0; -bool TPartition::CanProcessUserActionAndTransactionEvents() const -{ - return - (BatchingState == ETxBatchingState::PreProcessing) || - (BatchingState == ETxBatchingState::Executing); + PQ_LOG_D("Try persist"); + RunPersist(); } -void TPartition::ContinueProcessTxsAndUserActs(const TActorContext&) +void TPartition::ProcessUserActionAndTxEvents() { - Y_ABORT_UNLESS(!KVWriteInProgress); - - if (WriteCycleSizeEstimate >= MAX_WRITE_CYCLE_SIZE || WriteKeysSizeEstimate >= MAX_KEYS) { - BatchingState = ETxBatchingState::Finishing; - return; - } - auto visitor = [this](auto& event) { - return this->PreProcessUserActionOrTransaction(event); - }; - while (CanProcessUserActionAndTransactionEvents() && !UserActionAndTransactionEvents.empty()) { + while (!UserActionAndTransactionEvents.empty()) { if (ChangingConfig) { - BatchingState = ETxBatchingState::Finishing; break; } + auto& front = UserActionAndTransactionEvents.front(); if (TMessage* msg = std::get_if(&front.Event); msg && msg->WaitPreviousWriteSpan) { msg->WaitPreviousWriteSpan.End(); } + + auto visitor = [this, &front](auto& event) { + return this->ProcessUserActionAndTxEvent(event, front.AffectedSourceIdsAndConsumers); + }; switch (std::visit(visitor, front.Event)) { - case EProcessResult::Continue: - MoveUserActOrTxToCommitState(); - FirstEvent = false; - break; - case EProcessResult::ContinueDrop: - UserActionAndTransactionEvents.pop_front(); - break; - case EProcessResult::Break: - MoveUserActOrTxToCommitState(); - BatchingState = ETxBatchingState::Finishing; - FirstEvent = false; - break; - case EProcessResult::Blocked: - BatchingState = ETxBatchingState::Executing; - return; - case EProcessResult::NotReady: - return; + case EProcessResult::Continue: + MoveUserActionAndTxToPendingCommitQueue(); + break; + case EProcessResult::ContinueDrop: + UserActionAndTransactionEvents.pop_front(); + break; + case EProcessResult::Break: + MoveUserActionAndTxToPendingCommitQueue(); + break; + case EProcessResult::Blocked: + return; + case EProcessResult::NotReady: + return; } - CurrentBatchSize += 1; } - if (UserActionAndTransactionEvents.empty()) { - BatchingState = ETxBatchingState::Executing; - return; +} + +void TPartition::DumpTheSizeOfInternalQueues() const +{ + PQ_LOG_D("Events: " << UserActionAndTransactionEvents.size() << + ", PendingCommits: " << UserActionAndTxPendingCommit.size() << + ", PendingWrites: " << UserActionAndTxPendingWrite.size()); +} + +TString GetTransactionType(const TTransaction& tx) +{ + if (tx.Tx) { + return "Tx"; + } else if (tx.ProposeTransaction) { + return "ImmediateTx"; + } else if (tx.ProposeConfig) { + return "ProposeConfig"; + } else if (tx.ChangeConfig) { + return "ChangeConfig"; + } else { + return "???"; } +} +auto TPartition::ProcessUserActionAndTxEvent(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxEvent(TEvPQ::TEvSetClientInfo)"); + return PreProcessUserActionOrTransaction(event, affectedSourceIdsAndConsumers); } -void TPartition::MoveUserActOrTxToCommitState() { - auto& front = UserActionAndTransactionEvents.front(); - UserActionAndTxPendingCommit.push_back(std::move(front)); - UserActionAndTransactionEvents.pop_front(); +auto TPartition::ProcessUserActionAndTxEvent(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxEvent(TTransaction[" << GetTransactionType(*tx) << "])"); + return PreProcessUserActionOrTransaction(tx, affectedSourceIdsAndConsumers); } -void TPartition::ProcessCommitQueue() { +auto TPartition::ProcessUserActionAndTxEvent(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxEvent(TMessage)"); + return PreProcessUserActionOrTransaction(msg, affectedSourceIdsAndConsumers); +} + +bool TPartition::WritingCycleDoesNotExceedTheLimits() const +{ + return WriteCycleSizeEstimate < MAX_WRITE_CYCLE_SIZE && WriteKeysSizeEstimate < MAX_KEYS; +} + +void TPartition::ProcessUserActionAndTxPendingCommits() { CurrentBatchSize = 0; Y_ABORT_UNLESS(!KVWriteInProgress); if (!PersistRequest) { PersistRequest = MakeHolder(); } - auto visitor = [this, request=PersistRequest.Get()](auto& event) { - return this->ExecUserActionOrTransaction(event, request); - }; - while (!UserActionAndTxPendingCommit.empty()) { + + while (!UserActionAndTxPendingCommit.empty() && WritingCycleDoesNotExceedTheLimits()) { auto& front = UserActionAndTxPendingCommit.front(); auto state = ECommitState::Committed; + if (auto* tx = get_if>(&front.Event)) { state = tx->Get()->State; } + switch (state) { - case ECommitState::Pending: - return; - case ECommitState::Aborted: - break; - case ECommitState::Committed: - break; + case ECommitState::Pending: + return; + case ECommitState::Aborted: + break; + case ECommitState::Committed: + break; } - auto event = std::move(front.Event); + + UserActionAndTxPendingWrite.push_back(std::move(front)); UserActionAndTxPendingCommit.pop_front(); + + auto& event = UserActionAndTxPendingWrite.back().Event; + auto visitor = [this, request = PersistRequest.Get()](auto& event) { + return this->ProcessUserActionAndTxPendingCommit(event, request); + }; std::visit(visitor, event); + + ++CurrentBatchSize; } - if (UserActionAndTxPendingCommit.empty()) { - TxAffectedConsumers.clear(); - TxAffectedSourcesIds.clear(); - Y_ABORT_UNLESS(UserActionAndTxPendingCommit.empty()); - TransactionsInflight.clear(); +} + +void TPartition::ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& event, + TEvKeyValue::TEvRequest* request) +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TEvPQ::TEvSetClientInfo)"); + ExecUserActionOrTransaction(event, request); +} + +void TPartition::ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& tx, + TEvKeyValue::TEvRequest* request) +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TTransaction[" << GetTransactionType(*tx) << "])"); + ExecUserActionOrTransaction(tx, request); +} + +void TPartition::ProcessUserActionAndTxPendingCommit(TMessage& msg, + TEvKeyValue::TEvRequest* request) +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TMessage)"); + ExecUserActionOrTransaction(msg, request); +} + +static void AppendToSet(const TVector& p, THashMap& q) +{ + for (const auto& s : p) { + ++q[s]; } } -void TPartition::RunPersist() { - TransactionsInflight.clear(); +void TPartition::AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ + AppendToSet(affectedSourceIdsAndConsumers.TxWriteSourcesIds, TxAffectedSourcesIds); + AppendToSet(affectedSourceIdsAndConsumers.WriteSourcesIds, WriteAffectedSourcesIds); + AppendToSet(affectedSourceIdsAndConsumers.TxReadConsumers, TxAffectedConsumers); + AppendToSet(affectedSourceIdsAndConsumers.ReadConsumers, SetOffsetAffectedConsumers); + + WriteKeysSizeEstimate += affectedSourceIdsAndConsumers.WriteKeysSize; +} + +void TPartition::DeleteAffectedSourceIdsAndConsumers() +{ + if (UserActionAndTxPendingWrite.empty()) { + return; + } + + for (const auto& e : UserActionAndTxPendingWrite) { + DeleteAffectedSourceIdsAndConsumers(e.AffectedSourceIdsAndConsumers); + + if (auto* tx = std::get_if>(&e.Event); tx) { + if (auto txId = (*tx)->GetTxId(); txId.Defined()) { + TransactionsInflight.erase(*txId); + } + + if ((*tx)->ChangeConfig || (*tx)->ProposeConfig) { + ChangingConfig = false; + } + } + } - Y_ABORT_UNLESS(UserActionAndTxPendingCommit.empty()); + UserActionAndTxPendingWrite.clear(); +} + +void TPartition::DeleteFromSet(const TVector& p, THashMap& q) const +{ + for (const auto& s : p) { + auto i = q.find(s); + Y_ABORT_UNLESS(i != q.end()); + Y_ABORT_UNLESS(i->second > 0); + if (--i->second) { + continue; + } + q.erase(s); + } +} + +void TPartition::DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ + DeleteFromSet(affectedSourceIdsAndConsumers.TxWriteSourcesIds, TxAffectedSourcesIds); + DeleteFromSet(affectedSourceIdsAndConsumers.WriteSourcesIds, WriteAffectedSourcesIds); + DeleteFromSet(affectedSourceIdsAndConsumers.TxReadConsumers, TxAffectedConsumers); + DeleteFromSet(affectedSourceIdsAndConsumers.ReadConsumers, SetOffsetAffectedConsumers); + + Y_ABORT_UNLESS(WriteKeysSizeEstimate >= affectedSourceIdsAndConsumers.WriteKeysSize); + WriteKeysSizeEstimate -= affectedSourceIdsAndConsumers.WriteKeysSize; +} + +void TPartition::MoveUserActionAndTxToPendingCommitQueue() { + auto& front = UserActionAndTransactionEvents.front(); + AppendAffectedSourceIdsAndConsumers(front.AffectedSourceIdsAndConsumers); + UserActionAndTxPendingCommit.push_back(std::move(front)); + UserActionAndTransactionEvents.pop_front(); +} + +void TPartition::RunPersist() { const auto& ctx = ActorContext(); const auto now = ctx.Now(); if (!PersistRequest) { @@ -2405,7 +2518,7 @@ bool TPartition::TryAddDeleteHeadKeysToPersistRequest() return haveChanges; } -void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) +void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const { PQ_LOG_D("=== DumpKeyValueRequest ==="); PQ_LOG_D("--- delete ----------------"); @@ -2453,7 +2566,8 @@ void TPartition::AnswerCurrentReplies(const TActorContext& ctx) Replies.clear(); } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& t) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { auto span = t->CalcPredicateSpan.CreateChild(TWilsonTopic::TopicTopLevel, "Topic.Partition.PreProcess", @@ -2465,7 +2579,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return EProcessResult::NotReady; } if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied - result = ApplyWriteInfoResponse(*t); + result = ApplyWriteInfoResponse(*t, affectedSourceIdsAndConsumers); if (!t->WriteInfoApplied) { // Tried to apply write info but couldn't - TX must be blocked. PQ_LOG_TX_D("The Tx " << t->GetTxId() << " must be blocked"); Y_ABORT_UNLESS(result != EProcessResult::Continue); @@ -2478,23 +2592,24 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return EProcessResult::Continue; } t->Predicate.ConstructInPlace(true); - return PreProcessImmediateTx(t->ProposeTransaction->GetRecord()); + return PreProcessImmediateTx(*t, affectedSourceIdsAndConsumers); } else if (t->Tx) { // Distributed TX if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined. ReplyToProposeOrPredicate(t, true); return EProcessResult::Continue; } - result = BeginTransaction(*t->Tx, t->Predicate, t->Message); + result = BeginTransactionData(*t, affectedSourceIdsAndConsumers); if (t->Predicate.Defined()) { ReplyToProposeOrPredicate(t, true); } return result; } else if (t->ProposeConfig) { - if (!FirstEvent) { + if (HasPendingCommitsOrPendingWrites()) { + PQ_LOG_D("Wait until the operation with the config becomes the first in the queue"); return EProcessResult::Blocked; } - t->Predicate = BeginTransaction(*t->ProposeConfig); + t->Predicate = BeginTransactionConfig(); ChangingConfig = true; PendingPartitionConfig = GetPartitionConfig(t->ProposeConfig->Config); //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found"); @@ -2504,7 +2619,8 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple Y_ABORT_UNLESS(t->ChangeConfig); Y_ABORT_UNLESS(!ChangeConfig && !ChangingConfig); - if (!FirstEvent) { + if (HasPendingCommitsOrPendingWrites()) { + PQ_LOG_D("Wait until the operation with the config becomes the first in the queue"); return EProcessResult::Blocked; } ChangingConfig = true; @@ -2516,6 +2632,11 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return result; } +bool TPartition::HasPendingCommitsOrPendingWrites() const +{ + return !UserActionAndTxPendingCommit.empty() || !UserActionAndTxPendingWrite.empty(); +} + bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, TEvKeyValue::TEvRequest*) { auto span = t->CommitSpan.CreateChild(TWilsonTopic::TopicTopLevel, @@ -2535,35 +2656,35 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, case ECommitState::Committed: break; } - const auto& ctx = ActorContext(); if (t->ChangeConfig) { Y_ABORT_UNLESS(!ChangeConfig); Y_ABORT_UNLESS(ChangingConfig); ChangeConfig = t->ChangeConfig; SendChangeConfigReply = t->SendReply; - BeginChangePartitionConfig(ChangeConfig->Config, ctx); + BeginChangePartitionConfig(ChangeConfig->Config); } else if (t->ProposeConfig) { - Y_ABORT_UNLESS(ChangingConfig); - ChangeConfig = MakeSimpleShared(TopicConverter, - t->ProposeConfig->Config); - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); + Y_ABORT_UNLESS(ChangeConfig); SendChangeConfigReply = false; } CommitTransaction(t); return true; } -TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, - TMaybe& predicateOut, TString& issueMsg) +TPartition::EProcessResult TPartition::BeginTransactionData(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { + const TEvPQ::TEvTxCalcPredicate& tx = *t.Tx; + TMaybe& predicateOut = t.Predicate; + TString& issueMsg = t.Message; + if (tx.ForcePredicateFalse) { predicateOut = false; return EProcessResult::Continue; } - THashSet consumers; + TVector consumers; bool result = true; - for (auto& operation : tx.Operations) { + for (const auto& operation : tx.Operations) { const TString& consumer = operation.GetConsumer(); if (TxAffectedConsumers.contains(consumer)) { PQ_LOG_TX_D("TxAffectedConsumers contains consumer " << consumer << ". TxId " << tx.TxId); @@ -2651,25 +2772,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr } break; } - consumers.insert(consumer); + consumers.push_back(consumer); PQ_LOG_TX_D("Tx " << tx.TxId << " affect consumer " << consumer); } } if (result) { - TxAffectedConsumers.insert(consumers.begin(), consumers.end()); + affectedSourceIdsAndConsumers.TxReadConsumers = std::move(consumers); } predicateOut = result; return EProcessResult::Continue; } -bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) +bool TPartition::BeginTransactionConfig() { - ChangeConfig = - MakeSimpleShared(TopicConverter, - event.Config); - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); - SendChangeConfigReply = false; return true; } @@ -2822,7 +2938,6 @@ void TPartition::CommitWriteOperations(TTransaction& t) void TPartition::CommitTransaction(TSimpleSharedPtr& t) { - const auto& ctx = ActorContext(); if (t->Tx) { Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate); @@ -2867,7 +2982,7 @@ void TPartition::CommitTransaction(TSimpleSharedPtr& t) } else if (t->ProposeConfig) { Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate); - BeginChangePartitionConfig(t->ProposeConfig->Config, ctx); + BeginChangePartitionConfig(t->ProposeConfig->Config); ExecChangePartitionConfig(); ChangePlanStepAndTxId(t->ProposeConfig->Step, t->ProposeConfig->TxId); @@ -2899,8 +3014,7 @@ void TPartition::RollbackTransaction(TSimpleSharedPtr& t) } } -void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, - const TActorContext& ctx) +void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config) { TSet hasReadRule; for (auto& [consumer, info] : UsersInfoStorage->GetAll()) { @@ -2914,7 +3028,7 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co } } - for (auto& consumer : config.GetConsumers()) { + for (const auto& consumer : config.GetConsumers()) { auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0); TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs()); @@ -2929,21 +3043,22 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co auto act = MakeHolder(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen); - auto res = PreProcessUserAct(*act, ctx); - ChangeConfigActs.emplace_back(std::move(act)); - + auto res = PreProcessUserAct(*act, nullptr); Y_ABORT_UNLESS(res == EProcessResult::Continue); + + ChangeConfigActs.emplace_back(std::move(act)); } hasReadRule.erase(consumer.GetName()); } - for (auto& consumer : hasReadRule) { + for (const auto& consumer : hasReadRule) { GetOrCreatePendingUser(consumer); auto act = MakeHolder(0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); - auto res = PreProcessUserAct(*act, ctx); + auto res = PreProcessUserAct(*act, nullptr); Y_ABORT_UNLESS(res == EProcessResult::Continue); + ChangeConfigActs.emplace_back(std::move(act)); } } @@ -2956,14 +3071,8 @@ void TPartition::ExecChangePartitionConfig() { } void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) { - FirstEvent = true; - TxAffectedConsumers.clear(); - TxAffectedSourcesIds.clear(); - WriteAffectedSourcesIds.clear(); - SetOffsetAffectedConsumers.clear(); - BatchingState = ETxBatchingState::PreProcessing; + DeleteAffectedSourceIdsAndConsumers(); WriteCycleSizeEstimate = 0; - WriteKeysSizeEstimate = 0; if (ChangeConfig) { EndChangePartitionConfig(std::move(ChangeConfig->Config), @@ -3048,8 +3157,6 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) ChangeConfig = nullptr; PendingPartitionConfig = nullptr; } - ChangingConfig = false; - BatchingState = ETxBatchingState::PreProcessing; } void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config, @@ -3123,15 +3230,18 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId) TxIdHasChanged = true; } -TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx) +TPartition::EProcessResult TPartition::PreProcessImmediateTx(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { + const NKikimrPQ::TEvProposeTransaction& tx = t.ProposeTransaction->GetRecord(); + if (AffectedUsers.size() >= MAX_USERS) { return EProcessResult::Blocked; } Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(tx.HasData()); - THashSet consumers; - for (auto& operation : tx.GetData().GetOperations()) { + TVector consumers; + for (const auto& operation : tx.GetData().GetOperations()) { if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) { continue; //Write operation - handled separately via WriteInfo } @@ -3158,10 +3268,10 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - consumers.insert(user); + consumers.push_back(user); } - SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end()); - WriteKeysSizeEstimate += consumers.size(); + affectedSourceIdsAndConsumers.ReadConsumers = std::move(consumers); + affectedSourceIdsAndConsumers.WriteKeysSize += consumers.size(); return EProcessResult::Continue; } @@ -3254,12 +3364,14 @@ void TPartition::ExecImmediateTx(TTransaction& t) return; } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& act) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& act, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { if (AffectedUsers.size() >= MAX_USERS) { return EProcessResult::Blocked; } - return PreProcessUserAct(*act, ActorContext()); + + return PreProcessUserAct(*act, &affectedSourceIdsAndConsumers); } bool TPartition::ExecUserActionOrTransaction( @@ -3269,7 +3381,8 @@ bool TPartition::ExecUserActionOrTransaction( return true; } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessage& msg) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { if (WriteCycleSize >= MAX_WRITE_CYCLE_SIZE) { return EProcessResult::Blocked; @@ -3277,13 +3390,13 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessag auto result = EProcessResult::Continue; if (msg.IsWrite()) { - result = PreProcessRequest(msg.GetWrite()); + result = PreProcessRequest(msg.GetWrite(), affectedSourceIdsAndConsumers); } else if (msg.IsRegisterMessageGroup()) { - result = PreProcessRequest(msg.GetRegisterMessageGroup()); + result = PreProcessRequest(msg.GetRegisterMessageGroup(), affectedSourceIdsAndConsumers); } else if (msg.IsDeregisterMessageGroup()) { - result = PreProcessRequest(msg.GetDeregisterMessageGroup()); + result = PreProcessRequest(msg.GetDeregisterMessageGroup(), affectedSourceIdsAndConsumers); } else if (msg.IsSplitMessageGroup()) { - result = PreProcessRequest(msg.GetSplitMessageGroup()); + result = PreProcessRequest(msg.GetSplitMessageGroup(), affectedSourceIdsAndConsumers); } else { Y_ABORT_UNLESS(msg.IsOwnership()); } @@ -3320,9 +3433,9 @@ bool TPartition::ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequ return true; } -TPartition::EProcessResult TPartition::PreProcessUserAct( - TEvPQ::TEvSetClientInfo& act, const TActorContext& -) { +TPartition::EProcessResult TPartition::PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, + TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers) +{ Y_ABORT_UNLESS(!KVWriteInProgress); const TString& user = act.ClientId; @@ -3331,8 +3444,12 @@ TPartition::EProcessResult TPartition::PreProcessUserAct( return EProcessResult::Blocked; } } - WriteKeysSizeEstimate += 1; - SetOffsetAffectedConsumers.insert(user); + + if (affectedSourceIdsAndConsumers) { + ++affectedSourceIdsAndConsumers->WriteKeysSize; + affectedSourceIdsAndConsumers->ReadConsumers.push_back(user); + } + return EProcessResult::Continue; } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 163ec06b7c05..15dbde885fef 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -336,11 +336,31 @@ class TPartition : public TActorBootstrapped { TAutoPtr MakeHasDataInfoResponse(ui64 lagSize, const TMaybe& cookie, bool readingFinished = false); void ProcessTxsAndUserActs(const TActorContext& ctx); - void ContinueProcessTxsAndUserActs(const TActorContext& ctx); - void ProcessCommitQueue(); void RunPersist(); - void MoveUserActOrTxToCommitState(); + enum class EProcessResult; + struct TAffectedSourceIdsAndConsumers; + + void ProcessUserActionAndTxEvents(); + EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult ProcessUserActionAndTxEvent(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + + void MoveUserActionAndTxToPendingCommitQueue(); + + void ProcessUserActionAndTxPendingCommits(); + void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& event, + TEvKeyValue::TEvRequest* request); + void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& tx, + TEvKeyValue::TEvRequest* request); + void ProcessUserActionAndTxPendingCommit(TMessage& msg, + TEvKeyValue::TEvRequest* request); + + bool WritingCycleDoesNotExceedTheLimits() const; + void PushBackDistrTx(TSimpleSharedPtr event); void PushBackDistrTx(TSimpleSharedPtr event); void PushFrontDistrTx(TSimpleSharedPtr event); @@ -422,14 +442,13 @@ class TPartition : public TActorBootstrapped { const TString& reason); THolder MakeCommitDone(ui64 step, ui64 txId); - bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event); + bool BeginTransactionConfig(); void CommitTransaction(TSimpleSharedPtr& t); void RollbackTransaction(TSimpleSharedPtr& t); - void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, - const TActorContext& ctx); + void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config); void ExecChangePartitionConfig(); void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx); @@ -720,10 +739,10 @@ class TPartition : public TActorBootstrapped { TMaybe KafkaProducerEpoch = 0; }; - THashSet TxAffectedSourcesIds; - THashSet WriteAffectedSourcesIds; - THashSet TxAffectedConsumers; - THashSet SetOffsetAffectedConsumers; + THashMap TxAffectedSourcesIds; + THashMap WriteAffectedSourcesIds; + THashMap TxAffectedConsumers; + THashMap SetOffsetAffectedConsumers; THashMap TxSourceIdForPostPersist; THashMap TxInflightMaxSeqNoPerSourceId; @@ -778,45 +797,61 @@ class TPartition : public TActorBootstrapped { TMaybe UsersInfoStorage; - // template T& GetUserActionAndTransactionEventsFront(); - // template T& GetCurrentEvent(); - //TSimpleSharedPtr& GetCurrentTransaction(); + struct TAffectedSourceIdsAndConsumers { + TVector TxWriteSourcesIds; + TVector WriteSourcesIds; + TVector TxReadConsumers; + TVector ReadConsumers; + ui32 WriteKeysSize = 0; + ui32 WriteCycleSize = 0; + }; + + void AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event); - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event); - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& tx); - EProcessResult PreProcessUserActionOrTransaction(TMessage& msg); + void DeleteAffectedSourceIdsAndConsumers(); + void DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + void DeleteFromSet(const TVector& p, THashMap& q) const; - bool ExecUserActionOrTransaction(TSimpleSharedPtr& event, TEvKeyValue::TEvRequest* request); + EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessUserActionOrTransaction(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - bool ExecUserActionOrTransaction(TSimpleSharedPtr& event, - TEvKeyValue::TEvRequest* request); + bool ExecUserActionOrTransaction(TSimpleSharedPtr& events, TEvKeyValue::TEvRequest* request); bool ExecUserActionOrTransaction(TSimpleSharedPtr& tx, TEvKeyValue::TEvRequest* request); bool ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequest* request); - [[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx); + [[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, + TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers); void CommitUserAct(TEvPQ::TEvSetClientInfo& act); - [[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx); + [[nodiscard]] EProcessResult PreProcessImmediateTx(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); void ExecImmediateTx(TTransaction& tx); - EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TWriteMsg& msg); + EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TWriteMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); void ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters); void ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters); void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters); bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request); - [[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, - TMaybe& predicate, TString& issueMsg); + [[nodiscard]] EProcessResult BeginTransactionData(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - EProcessResult ApplyWriteInfoResponse(TTransaction& tx); + EProcessResult ApplyWriteInfoResponse(TTransaction& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - bool FirstEvent = true; bool HaveWriteMsg = false; bool HaveData = false; bool HaveCheckDisk = false; @@ -837,13 +872,12 @@ class TPartition : public TActorBootstrapped { void BeginAppendHeadWithNewWrites(const TActorContext& ctx); void EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx); + bool HasPendingCommitsOrPendingWrites() const; + // // user actions and transactions // struct TUserActionAndTransactionEvent { - std::variant, // user actions - TSimpleSharedPtr, // distributed transaction or update config - TMessage> Event; TUserActionAndTransactionEvent(TSimpleSharedPtr&& transaction) : Event(std::move(transaction)) {} @@ -853,10 +887,16 @@ class TPartition : public TActorBootstrapped { TUserActionAndTransactionEvent(TMessage&& message) : Event(std::move(message)) {} + + std::variant, // user actions + TSimpleSharedPtr, // distributed transaction or update config + TMessage> Event; + TAffectedSourceIdsAndConsumers AffectedSourceIdsAndConsumers; }; std::deque UserActionAndTransactionEvents; std::deque UserActionAndTxPendingCommit; + std::deque UserActionAndTxPendingWrite; TVector> WriteInfosApplied; THashMap> TransactionsInflight; @@ -879,15 +919,6 @@ class TPartition : public TActorBootstrapped { TMessageQueue Responses; ui64 CurrentBatchSize = 0; - enum class ETxBatchingState{ - PreProcessing, - Executing, - Finishing - }; - ETxBatchingState BatchingState = ETxBatchingState::PreProcessing; - // - // - // std::deque> UpdateUserInfoTimestamp; bool ReadingTimestamp; TString ReadingForUser; @@ -1071,15 +1102,15 @@ class TPartition : public TActorBootstrapped { size_t WriteNewSizeFromSupportivePartitions = 0; bool TryAddDeleteHeadKeysToPersistRequest(); - void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request); + void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const; TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key); + void DumpTheSizeOfInternalQueues() const; TIntrusivePtr SamplingControl; TDeque TxForPersistTraceIds; TDeque TxForPersistSpans; - bool CanProcessUserActionAndTransactionEvents() const; }; } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 3d4d94aa392e..3aac92d28133 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -568,10 +568,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { TxSourceIdForPostPersist.clear(); TxInflightMaxSeqNoPerSourceId.clear(); - TxAffectedSourcesIds.clear(); - WriteAffectedSourcesIds.clear(); - TxAffectedConsumers.clear(); - SetOffsetAffectedConsumers.clear(); if (UserActionAndTransactionEvents.empty()) { WriteInfosToTx.clear(); } @@ -981,7 +977,9 @@ void TPartition::CancelOneWriteOnWrite(const TActorContext& ctx, StartProcessChangeOwnerRequests(ctx); } -TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg) { +TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -996,7 +994,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMs if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(msg.Body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId); return EProcessResult::Continue; } @@ -1013,7 +1011,9 @@ void TPartition::ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& p parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); } -TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg) { +TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -1027,7 +1027,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroup if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(msg.Body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId); return EProcessResult::Continue; } @@ -1036,7 +1036,9 @@ void TPartition::ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& } -TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg) { +TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -1052,16 +1054,16 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& if (TxAffectedSourcesIds.contains(body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId); } for (auto& body : msg.Deregistrations) { if (TxAffectedSourcesIds.contains(body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId); } - return EProcessResult::Continue; + return EProcessResult::Continue; } @@ -1081,7 +1083,9 @@ void TPartition::ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& para } } -TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { +TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { WriteInflightSize -= p.Msg.Data.size(); ScheduleReplyError(p.Cookie, false, InactivePartitionErrorCode, @@ -1106,7 +1110,8 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { return EProcessResult::Blocked; } } - WriteAffectedSourcesIds.insert(p.Msg.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(p.Msg.SourceId); + return EProcessResult::Continue; } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 566398a44a9a..48dcd47cdacf 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -1510,7 +1510,7 @@ class TPartitionTxTestHelper : public TPartitionFixture { BatchSizes.push_back(msg->BatchSize); } } else if (auto* msg = ev->CastAsLocal()) { - Cerr << "Got KV request\n"; + Cerr << "Got KV request" << Endl; with_lock(Lock) { HadKvRequest = true; } @@ -2174,6 +2174,9 @@ Y_UNIT_TEST_F(CorrectRange_Commit, TPartitionFixture) SendCommitTx(step, txId); + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session=session, .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session=session, .Offset=2}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); @@ -2204,13 +2207,22 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture) SendCommitTx(step, txId_1); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Session=session, .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_2); WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_3); - WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_2, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session=session, .Offset=1}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitCommitTxDone({.TxId=txId_1, .Partition=TPartitionId(partition)}); @@ -2236,6 +2248,9 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TPartitionFixture) WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-1", .Offset=3}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=2, .UserInfos={{1, {.Session="session-2", .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitProxyResponse({.Cookie=1, .Status=NMsgBusProxy::MSTATUS_OK}); WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=true}); @@ -2316,9 +2331,21 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture) SendCalcPredicate(step, txId_1, client, 0, 2); WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true}); + WaitCmdWrite({ + .PlanStep=step, .TxId=txId_1, + .UserInfos={{1, {.Consumer="client", .Session="session", .Offset=0}}} + }); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + SendCalcPredicate(step, txId_2, client, 0, 5); SendRollbackTx(step, txId_1); + WaitCmdWrite({ + .PlanStep=step, .TxId=txId_1, + .UserInfos={{1, {.Consumer="client", .Session="session", .Offset=0}}} + }); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=true}); } @@ -2354,6 +2381,10 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) SendCalcPredicate(step, txId_2, "client-2", 0, 2); WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true}); + WaitCmdWrite({ + .UserInfos={{1, {.Consumer="client-1", .Session="session-1", .Offset=0}}} + }); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); SendCommitTx(step, txId_1); Cerr << "Wait cmd write (initial)\n"; WaitCmdWrite({.Count=8, @@ -2362,8 +2393,8 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) {1, {.Consumer="client-1", .Session="session-1", .Offset=2}}, }, }); - SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + Cerr << "Wait commit 1 done\n"; WaitCommitTxDone({.TxId=txId_1, .Partition=TPartitionId(partition)}); @@ -2387,6 +2418,7 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) {0, {.Partition=3, .Consumer="client-2"}} }}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + Cerr << "Wait config changed\n"; WaitPartitionConfigChanged({.Partition=TPartitionId(partition)}); @@ -2394,6 +2426,7 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) // consumer 'client-2' was deleted // WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false}); + SendRollbackTx(step, txId_2); } @@ -2733,6 +2766,7 @@ Y_UNIT_TEST_F(DataTxCalcPredicateOk, TPartitionTxTestHelper) TString data = "data for write"; SendChangeOwner(cookie, "owner1", Ctx->Edge, true); + EmulateKVTablet(); auto ownerEvent = Ctx->Runtime->GrabEdgeEvent(TDuration::Seconds(1)); UNIT_ASSERT(ownerEvent != nullptr); auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); @@ -2819,7 +2853,7 @@ void TPartitionTxTestHelper::NonConflictingActsBatchOkTest() { WaitTxPredicateReply(tx2); WaitTxPredicateReply(tx3); - WaitBatchCompletion(5 + 6 + 6); //5 txs and immediate txs + 2 normal writes with 6 messages each; + //WaitBatchCompletion(5 + 6 + 6); //5 txs and immediate txs + 2 normal writes with 6 messages each; SendTxCommit(tx3); SendTxRollback(tx2); @@ -2863,31 +2897,37 @@ Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); WaitTxPredicateReply(tx2); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxCommit(tx1); SendTxRollback(tx2); - ExpectNoKvRequest(); + WaitKvRequest(); + SendKvResponse(); WaitTxPredicateReply(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx3); //2 Normal writes with src1 & src4 - WaitBatchCompletion(6 + 2); // Normal writes produce 1 act for each message ExpectNoTxPredicateReply(); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx3); WaitTxPredicateReply(tx5); - WaitBatchCompletion(1); + //WaitBatchCompletion(6 + 2); // Normal writes produce 1 act for each message SendTxCommit(tx5); - WaitBatchCompletion(1 + 6); //Normal write & immTx for src4; + //WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx5); + + //WaitBatchCompletion(1 + 6); //Normal write & immTx for src4; + WaitKvRequest(); + SendKvResponse(); WaitImmediateTxComplete(immTx1, true); } @@ -2902,7 +2942,7 @@ Y_UNIT_TEST_F(ConflictingTxIsAborted, TPartitionTxTestHelper) { WaitWriteInfoRequest(tx1, true); WaitWriteInfoRequest(tx2, true); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx1); ExpectNoKvRequest(); @@ -2916,7 +2956,7 @@ Y_UNIT_TEST_F(ConflictingTxIsAborted, TPartitionTxTestHelper) { AddAndSendNormalWrite("src2", 7, 12); auto tx3 = MakeAndSendWriteTx({{"src2", {12, 15}}}); Y_UNUSED(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); ExpectNoCommitDone(); @@ -2934,13 +2974,12 @@ Y_UNIT_TEST_F(ConflictingTxProceedAfterRollback, TPartitionTxTestHelper) { WaitWriteInfoRequest(immTx, true); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx1); - ExpectNoKvRequest(); WaitTxPredicateReply(tx2); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxCommit(tx2); WaitKvRequest(); @@ -2966,27 +3005,28 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxInDifferentBatches, TPartitionTxTestHelper) { Cerr << "Wait batch of 1 completion\n"; SendTxCommit(tx1); - WaitBatchCompletion(1); - Cerr << "Expect no KV request\n"; - ExpectNoKvRequest(); + //WaitBatchCompletion(1); + Cerr << "Expect KV request\n"; + WaitKvRequest(); + SendKvResponse(); WaitTxPredicateReply(tx2); SendTxCommit(tx2); - Cerr << "Waif or tx 3 predicate failure\n"; + Cerr << "Wait for tx 3 predicate failure\n"; WaitTxPredicateFailure(tx3); - Cerr << "Waif or tx 4 predicate failure\n"; + Cerr << "Wait for tx 4 predicate failure\n"; WaitTxPredicateFailure(tx4); Cerr << "Wait batch of 3 completion\n"; - WaitBatchCompletion(1); // Immediate Tx 2 - 4. - Cerr << "Expect no KV request\n"; - ExpectNoKvRequest(); + //WaitBatchCompletion(1); // Immediate Tx 2 - 4. + Cerr << "Expect KV request\n"; + WaitKvRequest(); + SendKvResponse(); SendTxRollback(tx3); SendTxRollback(tx4); - WaitBatchCompletion(2); // Immediate Tx 2 - 4. + //WaitBatchCompletion(2); // Immediate Tx 2 - 4. - ExpectNoCommitDone(); WaitKvRequest(); SendKvResponse(); Cerr << "Wait for commits\n"; @@ -3012,20 +3052,24 @@ Y_UNIT_TEST_F(ConflictingSrcIdTxAndWritesDifferentBatches, TPartitionTxTestHelpe WaitTxPredicateReply(tx1); SendTxCommit(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); + + WaitKvRequest(); + SendKvResponse(); + + WaitCommitDone(tx1); - ExpectNoKvRequest(); WaitTxPredicateFailure(tx2); WaitTxPredicateReply(tx3); SendTxRollback(tx2); SendTxCommit(tx3); - WaitBatchCompletion(2); // Tx 2 & 3. - ExpectNoCommitDone(); + //WaitBatchCompletion(2); // Tx 2 & 3. + WaitKvRequest(); + SendKvResponse(); WaitKvRequest(); SendKvResponse(); - WaitCommitDone(tx1); WaitCommitDone(tx3); - WaitBatchCompletion(3); + //WaitBatchCompletion(3); WaitKvRequest(); SendKvResponse(); WaitProxyResponse({.AlreadyWritten=true, .SeqNo=1}); @@ -3049,12 +3093,12 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxWithHead, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); SendTxCommit(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); Cerr << "Wait 1st KV request\n"; WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); - WaitBatchCompletion(3); + //WaitBatchCompletion(3); Cerr << "Wait 2nd KV request\n"; WaitKvRequest(); SendKvResponse(); @@ -3119,11 +3163,14 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(immTx1, true); WaitWriteInfoRequest(immTx2, true); - WaitBatchCompletion(4 + 1); + //WaitBatchCompletion(4 + 1); + EmulateKVTablet(); EmulateKVTablet(); WaitImmediateTxComplete(immTx1, true); WaitImmediateTxComplete(immTx2, true); + EmulateKVTablet(); } + { // 2. ImmTx -> WriteTx = KVRequest ResetBatchCompletion(); @@ -3133,17 +3180,19 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(immTx, true); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoTxPredicateReply(); EmulateKVTablet(); + EmulateKVTablet(); WaitImmediateTxComplete(immTx, true); ExpectNoCommitDone(); WaitTxPredicateReply(tx); SendTxCommit(tx); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitDone(tx); } + { // 3. NormWrite -> WriteTx = KVRequest ResetBatchCompletion(); @@ -3152,15 +3201,17 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { auto tx = wrapper.AddTx(); wrapper.Process(); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoTxPredicateReply(); EmulateKVTablet(); WaitTxPredicateReply(tx); SendTxCommit(tx); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); + EmulateKVTablet(); EmulateKVTablet(); WaitCommitDone(tx); } + { // 4. WriteTx -> NormWrite = 2 batches ResetBatchCompletion(); @@ -3170,13 +3221,15 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(tx, true); WaitTxPredicateReply(tx); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoKvRequest(); SendTxCommit(tx); - WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitDone(tx); + //WaitBatchCompletion(1); + EmulateKVTablet(); } + { // 5. WriteTx -> ImmTx = 2 batches ResetBatchCompletion(); @@ -3186,17 +3239,18 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(tx, true); WaitWriteInfoRequest(immTx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); WaitTxPredicateReply(tx); SendTxCommit(tx); - WaitBatchCompletion(1); ExpectNoCommitDone(); EmulateKVTablet(); + //WaitBatchCompletion(1); WaitCommitDone(tx); + EmulateKVTablet(); WaitImmediateTxComplete(immTx, true); } - } + Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { Init({.WriterSessions={"src1", "src2"}, .EndOffset = 1}); // Failed WriteTx doesn't block @@ -3212,11 +3266,11 @@ Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { WaitWriteInfoRequest(tx, true); WaitWriteInfoRequest(immTx, true); - WaitBatchCompletion(5 + 1); + //WaitBatchCompletion(5 + 1); ExpectNoTxPredicateReply(); EmulateKVTablet(); WaitTxPredicateFailure(tx); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxRollback(tx); EmulateKVTablet(); @@ -3236,7 +3290,7 @@ Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { WaitWriteInfoRequest(immTx, true); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(2 + 1); + //WaitBatchCompletion(2 + 1); WaitTxPredicateReply(tx); ExpectNoKvRequest(); SendTxCommit(tx); @@ -3268,12 +3322,14 @@ Y_UNIT_TEST_F(NonConflictingCommitsBatch, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); WaitTxPredicateReply(tx2); - WaitBatchCompletion(5 + 1 /*tmpTx*/); + //WaitBatchCompletion(5 + 1 /*tmpTx*/); SendTxCommit(tx1); SendTxCommit(tx2); WaitKvRequest(); SendKvResponse(); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx1); WaitCommitDone(tx2); WaitImmediateTxComplete(txImm1, false); @@ -3290,39 +3346,43 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { //Just block processing so every message arrives before batching starts auto txTmp = MakeAndSendWriteTx({}); - MakeAndSendNormalOffsetCommit(1, 2); + MakeAndSendNormalOffsetCommit(1, 2); // act-1 auto tx1 = MakeAndSendTxOffsetCommit(1, 2, 5); auto tx2 = MakeAndSendTxOffsetCommit(1, 5, 10); - MakeAndSendNormalOffsetCommit(1, 20); + MakeAndSendNormalOffsetCommit(1, 20); // act-2 ResetBatchCompletion(); WaitWriteInfoRequest(txTmp, true); WaitTxPredicateReply(txTmp); - WaitBatchCompletion(2); SendTxRollback(txTmp); + //WaitBatchCompletion(2); // txTmp + act-1 ExpectNoTxPredicateReply(); WaitKvRequest(); SendKvResponse(); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); + WaitKvRequest(); + SendKvResponse(); ExpectNoTxPredicateReply(); SendTxCommit(tx1); - ExpectNoKvRequest(); + //WaitBatchCompletion(1); // tx1 WaitTxPredicateReply(tx2); - WaitBatchCompletion(1); SendTxCommit(tx2); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); // tx2 WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx2); - + //WaitBatchCompletion(1); // act-2 + WaitKvRequest(); + SendKvResponse(); txTmp = MakeAndSendWriteTx({}); auto immTx1 = MakeAndSendImmediateTxOffsetCommit(2, 0, 5); @@ -3331,8 +3391,8 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { WaitTxPredicateReply(txTmp); SendTxRollback(txTmp); - WaitBatchCompletion(2 + 1); WaitKvRequest(); + //WaitBatchCompletion(3); SendKvResponse(); WaitImmediateTxComplete(immTx1, true); WaitImmediateTxComplete(immTx2, true); @@ -3356,7 +3416,7 @@ Y_UNIT_TEST_F(ConflictingCommitFails, TPartitionTxTestHelper) { SendTxRollback(txTmp); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1 + 1); + //WaitBatchCompletion(1 + 1); SendTxCommit(tx1); WaitTxPredicateFailure(tx2); @@ -3379,11 +3439,11 @@ Y_UNIT_TEST_F(ConflictingCommitFails, TPartitionTxTestHelper) { SendTxRollback(txTmp); ExpectNoTxPredicateReply(); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); WaitKvRequest(); SendKvResponse(); WaitTxPredicateFailure(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx3); WaitKvRequest(); //No user operatiions completed but TxId has changed which will be saved @@ -3416,15 +3476,18 @@ Y_UNIT_TEST_F(ConflictingCommitProccesAfterRollback, TPartitionTxTestHelper) { auto tx2 = MakeAndSendTxOffsetCommit(1, 0, 3); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx1); - ExpectNoKvRequest(); + WaitKvRequest(); + SendKvResponse(); WaitTxPredicateReply(tx2); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx2); + WaitKvRequest(); + SendKvResponse(); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx2); @@ -3450,7 +3513,7 @@ Y_UNIT_TEST_F(TestBatchingWithChangeConfig, TPartitionTxTestHelper) { WaitBatchCompletion(1); EmulateKVTablet(); auto event = Ctx->Runtime->GrabEdgeEvent(); - WaitBatchCompletion(1); // immTx2 + //WaitBatchCompletion(1); // immTx2 EmulateKVTablet(); WaitImmediateTxComplete(immTx2, true); } @@ -3484,16 +3547,14 @@ Y_UNIT_TEST_F(TestBatchingWithProposeConfig, TPartitionTxTestHelper) { SendCommitTx(1, proposeTxId); //ToDo - wait propose result; - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitTxDone({.TxId=proposeTxId}); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitImmediateTxComplete(immTx2, true); } - - Y_UNIT_TEST_F(GetUsedStorage, TPartitionFixture) { auto* actor = CreatePartition({ .Partition=TPartitionId{2, TWriteId{0, 10}, 100'001},