From 3a92574e633264dbead9c2db0b32f404ef6af134 Mon Sep 17 00:00:00 2001 From: alexnick88 Date: Fri, 7 Nov 2025 18:09:38 +0300 Subject: [PATCH 1/2] Changes for Ya.Metrika (#28381) --- ydb/core/persqueue/events/internal.h | 7 +++- ydb/core/persqueue/partition.cpp | 57 ++++++++++++++++++++++---- ydb/core/persqueue/partition.h | 10 +++-- ydb/core/persqueue/pq_impl.cpp | 52 ++++++++++++++++++----- ydb/core/persqueue/pq_impl.h | 4 ++ ydb/core/persqueue/transaction.cpp | 8 ++++ ydb/core/persqueue/transaction.h | 2 + ydb/core/persqueue/ut/partition_ut.cpp | 16 -------- ydb/core/protos/counters_pq.proto | 2 + ydb/core/protos/pqconfig.proto | 3 ++ 10 files changed, 122 insertions(+), 39 deletions(-) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 5440326f145d..62e229743507 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -861,11 +861,13 @@ struct TEvPQ { }; struct TEvTxCalcPredicateResult : public TEventLocal { - TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe predicate) : + TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe predicate, + const TString& issueMsg) : Step(step), TxId(txId), Partition(partition), - Predicate(predicate) + Predicate(predicate), + IssueMsg(issueMsg) { } @@ -873,6 +875,7 @@ struct TEvPQ { ui64 TxId; NPQ::TPartitionId Partition; TMaybe Predicate; + TString IssueMsg; }; struct TEvProposePartitionConfig : public TEventLocal { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 468e5f904fe8..4aed5257680a 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -125,6 +125,24 @@ static const ui32 MAX_KEYS = 10000; static const ui32 MAX_TXS = 1000; static const ui32 MAX_WRITE_CYCLE_SIZE = 16_MB; +TStringBuilder MakeTxWriteErrorMessage(TMaybe txId, + TStringBuf topicName, const TPartitionId& partitionId, + TStringBuf 
sourceId, ui64 seqNo) +{ + TStringBuilder ss; + ss << "[TxId: " << txId << ", Topic: '" << topicName << "', Partition " << partitionId << ", SourceId '" << EscapeC(sourceId) << "', SeqNo " << seqNo << "] "; + return ss; +} + +TStringBuilder MakeTxReadErrorMessage(TMaybe txId, + TStringBuf topicName, const TPartitionId& partitionId, + TStringBuf consumer) +{ + TStringBuilder ss; + ss << "[TxId: " << txId << ", Topic: '" << topicName << "', Partition " << partitionId << ", Consumer '" << consumer << "] "; + return ss; +} + auto GetStepAndTxId(ui64 step, ui64 txId) { return std::make_pair(step, txId); @@ -1205,7 +1223,8 @@ void TPartition::ProcessPendingEvent(std::unique_ptr MakeHolder(ev->Step, ev->TxId, Partition, - Nothing()).Release()); + Nothing(), + TString()).Release()); return; } } @@ -1451,7 +1470,9 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) if (auto inFlightIter = TxInflightMaxSeqNoPerSourceId.find(s.first); !inFlightIter.IsEnd()) { if (SeqnoViolation(inFlightIter->second.KafkaProducerEpoch, inFlightIter->second.SeqNo, s.second.ProducerEpoch, s.second.MinSeqNo)) { tx.Predicate = false; - tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first; + tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, inFlightIter->second.SeqNo) << + "MinSeqNo violation failure. " << + "SeqNo " << s.second.MinSeqNo); tx.WriteInfoApplied = true; break; } @@ -1460,7 +1481,9 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) if (auto existing = knownSourceIds.find(s.first); !existing.IsEnd()) { if (SeqnoViolation(existing->second.ProducerEpoch, existing->second.SeqNo, s.second.ProducerEpoch, s.second.MinSeqNo)) { tx.Predicate = false; - tx.Message = TStringBuilder() << "MinSeqNo violation failure on " << s.first; + tx.Message = (MakeTxWriteErrorMessage(tx.GetTxId(), TopicName(), Partition, s.first, existing->second.SeqNo) << + "MinSeqNo violation failure. 
" << + "SeqNo " << s.second.MinSeqNo); tx.WriteInfoApplied = true; break; } @@ -1515,6 +1538,8 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr& tx, bool isPredicate) { if (isPredicate) { + TabletCounters.Percentile()[COUNTER_LATENCY_PQ_TXCALCPREDICATE].IncrementFor((Now() - tx->CalcPredicateTimestamp).MilliSeconds()); + tx->CalcPredicateSpan.End(); tx->CalcPredicateSpan = {}; @@ -1531,7 +1556,8 @@ void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr& tx, b MakeHolder(tx->Tx->Step, tx->Tx->TxId, Partition, - *tx->Predicate).Release()); + *tx->Predicate, + tx->Message).Release()); } else { auto insRes = TransactionsInflight.emplace(tx->ProposeConfig->TxId, tx); Y_ABORT_UNLESS(insRes.second); @@ -2026,7 +2052,7 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& void TPartition::PushBackDistrTx(TSimpleSharedPtr event) { - UserActionAndTransactionEvents.emplace_back(MakeSimpleShared(std::move(event))); + UserActionAndTransactionEvents.emplace_back(MakeSimpleShared(std::move(event), Now())); RequestWriteInfoIfRequired(); } @@ -2457,7 +2483,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple ReplyToProposeOrPredicate(t, true); return EProcessResult::Continue; } - result = BeginTransaction(*t->Tx, t->Predicate); + result = BeginTransaction(*t->Tx, t->Predicate, t->Message); if (t->Predicate.Defined()) { ReplyToProposeOrPredicate(t, true); } @@ -2525,7 +2551,8 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, return true; } -TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe& predicateOut) +TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, + TMaybe& predicateOut, TString& issueMsg) { if (tx.ForcePredicateFalse) { predicateOut = false; @@ -2548,6 +2575,8 @@ TPartition::EProcessResult 
TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) { PQ_LOG_D("Partition " << Partition << " Consumer '" << consumer << "' has been removed"); + issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) << + "Consumer has been removed"); result = false; break; } @@ -2555,6 +2584,8 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr if (!UsersInfoStorage->GetIfExists(consumer)) { PQ_LOG_D("Partition " << Partition << " Unknown consumer '" << consumer << "'"); + issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) << + "Unknown consumer"); result = false; break; } @@ -2583,6 +2614,10 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr " Bad request (invalid range) " << " Begin " << operation.GetCommitOffsetsBegin() << " End " << operation.GetCommitOffsetsEnd()); + issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) << + "Invalid range. " << + "Range begin " << operation.GetCommitOffsetsBegin() << + ", range end " << operation.GetCommitOffsetsEnd()); result = false; } else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) { PQ_LOG_D("Partition " << Partition << @@ -2590,6 +2625,10 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr " Bad request (gap) " << " Offset " << userInfo.Offset << " Begin " << operation.GetCommitOffsetsBegin()); + issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) << + "Gap. 
" << + "Offset " << userInfo.Offset << + ", range begin " << operation.GetCommitOffsetsBegin()); result = false; } else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) { PQ_LOG_D("Partition " << Partition << @@ -2597,6 +2636,10 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr " Bad request (behind the last offset) " << " EndOffset " << EndOffset << " End " << operation.GetCommitOffsetsEnd()); + issueMsg = (MakeTxReadErrorMessage(tx.TxId, TopicName(), Partition, consumer) << + "Behind the last offset. " << + "Partition end offset " << EndOffset << + ", range end " << operation.GetCommitOffsetsEnd()); result = false; } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index a343746c9a61..163ec06b7c05 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -63,12 +63,14 @@ class TPartitionCompaction; struct TTransaction { - explicit TTransaction(TSimpleSharedPtr tx, - TMaybe predicate = Nothing()) + TTransaction(TSimpleSharedPtr tx, + TInstant calcPredicateTimestamp, + TMaybe predicate = Nothing()) : Tx(tx) , Predicate(predicate) , SupportivePartitionActor(tx->SupportivePartitionActor) , CalcPredicateSpan(std::move(tx->Span)) + , CalcPredicateTimestamp(calcPredicateTimestamp) { Y_ABORT_UNLESS(Tx); } @@ -128,6 +130,7 @@ struct TTransaction { NWilson::TSpan CommitSpan; TInstant WriteInfoResponseTimestamp; + TInstant CalcPredicateTimestamp; }; class TPartitionCompaction; @@ -808,7 +811,8 @@ class TPartition : public TActorBootstrapped { void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters); bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request); - [[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, TMaybe& predicate); + [[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, + TMaybe& predicate, TString& issueMsg); 
EProcessResult ApplyWriteInfoResponse(TTransaction& tx); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index fa51bc6a45a0..f9b9d522b38e 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -3304,7 +3304,7 @@ void TPersQueue::DeleteExpiredTransactions(const TActorContext& ctx) for (auto& [txId, tx] : Txs) { if ((tx.MaxStep < step) && (tx.State <= NKikimrPQ::TTransaction::PREPARED)) { - DeleteTx(tx); + BeginDeleteTransaction(ctx, tx, NKikimrPQ::TTransaction::EXPIRED); } } @@ -3366,6 +3366,15 @@ void TPersQueue::SetTxInFlyCounter() Counters->Simple()[COUNTER_PQ_TABLET_TX_IN_FLY] = Txs.size(); } +void TPersQueue::BeginDeleteTransaction(const TActorContext& ctx, + TDistributedTransaction& tx, + NKikimrPQ::TTransaction::EState state) +{ + BeginDeletePartitions(tx); + ChangeTxState(tx, state); + CheckTxState(ctx, tx); +} + void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx) { if (!InitCompleted) { @@ -3376,12 +3385,12 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record; Y_ABORT_UNLESS(event.HasTxId()); - PQ_LOG_TX_W("Handle TEvPersQueue::TEvCancelTransactionProposal for tx " << event.GetTxId()); + PQ_LOG_TX_W("Handle TEvPersQueue::TEvCancelTransactionProposal for TxId " << event.GetTxId()); if (auto tx = GetTransaction(ctx, event.GetTxId()); tx) { Y_ABORT_UNLESS(tx->State <= NKikimrPQ::TTransaction::PREPARED); - DeleteTx(*tx); + BeginDeleteTransaction(ctx, *tx, NKikimrPQ::TTransaction::CANCELED); TryWriteTxs(ctx); } @@ -4407,6 +4416,11 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx, result->Record.SetTxId(tx.TxId); result->Record.SetStep(tx.Step); + if (tx.Error.Defined() && tx.Error->GetKind() != NKikimrPQ::TError::OK) { + auto* error = result->Record.MutableErrors()->Add(); + *error = *tx.Error; + } + PQ_LOG_TX_D("TxId: " << tx.TxId 
<< " send TEvPersQueue::TEvProposeTransactionResult(" << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(result->Record.GetStatus()) << @@ -4768,9 +4782,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED); - PQ_LOG_TX_I("delete partitions for TxId " << tx.TxId); - BeginDeletePartitions(tx); - TryChangeTxState(tx, NKikimrPQ::TTransaction::EXECUTED); } @@ -4792,6 +4803,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, SendEvReadSetAckToSenders(ctx, tx); + PQ_LOG_TX_I("delete partitions for TxId " << tx.TxId); + BeginDeletePartitions(tx); + TryChangeTxState(tx, NKikimrPQ::TTransaction::WAIT_RS_ACKS); [[fallthrough]]; @@ -4808,7 +4822,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, break; - case NKikimrPQ::TTransaction::DELETING: + case NKikimrPQ::TTransaction::DELETING: { // The PQ tablet has persisted its state. Now she can delete the transaction and take the next one. TMaybe writeId = tx.WriteId; // copy writeId to save for kafka transaction after erase DeleteWriteId(writeId); @@ -4823,6 +4837,18 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, TryContinueKafkaWrites(writeId, ctx); break; } + + case NKikimrPQ::TTransaction::EXPIRED: + case NKikimrPQ::TTransaction::CANCELED: + PQ_LOG_TX_D("AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted(tx.WriteId)); + if (AllSupportivePartitionsHaveBeenDeleted(tx.WriteId)) { + DeleteTx(tx); + // implicitly switch to the state DELETING + } + + break; + + } } bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted(const TMaybe& writeId) const @@ -5314,11 +5340,13 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con // the actor's ID could have changed from the moment he sent the TEvProposeTransaction. 
you need to // update the actor ID in the transaction // - // if the transaction has progressed beyond WAIT_RS, then a response has been sent to the sender + // if the transaction has progressed beyond EXECUTED, then a response has been sent to the sender // + status = NKikimrProto::OK; + tx->SourceActor = ev->Sender; - if (tx->State <= NKikimrPQ::TTransaction::WAIT_RS) { - status = NKikimrProto::OK; + if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) { + SendEvProposeTransactionResult(ctx, *tx); } } @@ -5448,7 +5476,9 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon } if (writeInfo.TxId.Defined()) { if (auto tx = GetTransaction(ctx, *writeInfo.TxId); tx) { - if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) { + if ((tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) || + (tx->State == NKikimrPQ::TTransaction::EXPIRED) || + (tx->State == NKikimrPQ::TTransaction::CANCELED)) { TryExecuteTxs(ctx, *tx); } } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index af17db6403d9..005a5ae8686c 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -595,6 +595,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx); + void BeginDeleteTransaction(const TActorContext& ctx, + TDistributedTransaction& tx, + NKikimrPQ::TTransaction::EState state); + TIntrusivePtr SamplingControl; NWilson::TSpan WriteTxsSpan; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 42bc6d082cd8..b8e45eefb8ea 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -248,6 +248,14 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred } OnPartitionResult(event, decision); + + if (!event.IssueMsg.empty()) { + NKikimrPQ::TError error; + error.SetKind(NKikimrPQ::TError::BAD_REQUEST); + error.SetReason(event.IssueMsg); + + Error = std::move(error); 
+ } } void UpdatePartitionsData(NKikimrPQ::TPartitions& partitionsData, NKikimrPQ::TPartitions::TPartitionInfo& partition) { diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 1ec27a9db473..009f5fe44f4b 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -109,6 +109,8 @@ struct TDistributedTransaction { bool Pending = false; + TMaybe Error; + void SetExecuteSpan(NWilson::TSpan&& span); void EndExecuteSpan(); NWilson::TSpan CreatePlanStepSpan(ui64 tabletId, ui64 step); diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 891bd4ddb16b..e4e0bc5b01aa 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -282,11 +282,6 @@ class TPartitionFixture : public NUnitTest::TBaseFixture { void SendChangePartitionConfig(const TConfigParams& config = {}); void WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher = {}); - TTransaction MakeTransaction(ui64 step, ui64 txId, - TString consumer, - ui64 begin, ui64 end, - TMaybe predicate = Nothing()); - void SendSubDomainStatus(bool subDomainOutOfSpace = false); void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false); void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true); @@ -1099,17 +1094,6 @@ void TPartitionFixture::WaitPartitionConfigChanged(const TChangePartitionConfigM } } -TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId, - TString consumer, - ui64 begin, ui64 end, - TMaybe predicate) -{ - auto event = MakeSimpleShared(step, txId); - event->AddOperation(std::move(consumer), begin, end); - - return TTransaction(event, predicate); -} - template void CompareVectors(const TVector& expected, const TIterable& actual) { auto i = 0u; diff --git a/ydb/core/protos/counters_pq.proto 
b/ydb/core/protos/counters_pq.proto index 6aab74eb959f..ba05f46dedcf 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -141,6 +141,8 @@ enum EPercentileCounters { COUNTER_LATENCY_PQ_SPLIT_MESSAGE_GROUP = 19 [(CounterOpts) = {Name: "LatencySplitMessageGroup"}]; COUNTER_LATENCY_PQ_PUBLISH_READ = 20 [(CounterOpts) = {Name: "LatencyPublishRead"}]; COUNTER_LATENCY_PQ_FORGET_READ = 21 [(CounterOpts) = {Name: "LatencyForgetRead"}]; + + COUNTER_LATENCY_PQ_TXCALCPREDICATE = 22 [(CounterOpts) = {Name: "LatencyTxCalcPredicate"}]; } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 89fc8aa709eb..ed87fa01bea8 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -1144,6 +1144,9 @@ message TTransaction { EXECUTED = 9; // persist WAIT_RS_ACKS = 11; DELETING = 10; + + EXPIRED = 12; + CANCELED = 13; }; message TPredicateReceived { From 7d6a912d374e547dbf005cba53c54a29631ffde9 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Sun, 9 Nov 2025 05:08:52 +0000 Subject: [PATCH 2/2] Optimizing the calculation of the transaction predicate (#28423) --- ydb/core/persqueue/partition.cpp | 441 ++++++++++++++++--------- ydb/core/persqueue/partition.h | 119 ++++--- ydb/core/persqueue/partition_write.cpp | 33 +- ydb/core/persqueue/ut/partition_ut.cpp | 147 ++++++--- 4 files changed, 472 insertions(+), 268 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 4aed5257680a..c9fa2725cca1 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1265,6 +1265,13 @@ void TPartition::ProcessPendingEvent(std::unique_ptr ev, con } auto txIter = TransactionsInflight.begin(); + if (txIter->second->ProposeConfig) { + Y_ABORT_UNLESS(!ChangeConfig); + ChangeConfig = + MakeSimpleShared(TopicConverter, + txIter->second->ProposeConfig->Config); + PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); + } if (ChangeConfig) { 
Y_ABORT_UNLESS(TransactionsInflight.size() == 1, "PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64, @@ -1429,7 +1436,9 @@ void TPartition::WriteInfoResponseHandler( ProcessTxsAndUserActs(ctx); } -TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) { +TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ bool isImmediate = (tx.ProposeTransaction != nullptr); Y_ABORT_UNLESS(tx.WriteInfo); Y_ABORT_UNLESS(!tx.WriteInfoApplied); @@ -1448,22 +1457,23 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) EProcessResult ret = EProcessResult::Continue; const auto& knownSourceIds = SourceIdStorage.GetInMemorySourceIds(); - THashSet txSourceIds; - for (auto& s : srcIdInfo) { + TVector txSourceIds; + for (const auto& s : srcIdInfo) { if (TxAffectedSourcesIds.contains(s.first)) { PQ_LOG_TX_D("TxAffectedSourcesIds contains SourceId " << s.first << ". TxId " << tx.GetTxId()); ret = EProcessResult::Blocked; break; } if (isImmediate) { - WriteAffectedSourcesIds.insert(s.first); + txSourceIds.push_back(s.first); + PQ_LOG_D("TxId " << tx.GetTxId() << " affect SourceId " << s.first); } else { if (WriteAffectedSourcesIds.contains(s.first)) { PQ_LOG_TX_D("WriteAffectedSourcesIds contains SourceId " << s.first << ". TxId " << tx.GetTxId()); ret = EProcessResult::Blocked; break; } - txSourceIds.insert(s.first); + txSourceIds.push_back(s.first); PQ_LOG_D("TxId " << tx.GetTxId() << " affect SourceId " << s.first); } @@ -1489,14 +1499,19 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx) } } } + if (ret == EProcessResult::Continue && tx.Predicate.GetOrElse(true)) { - TxAffectedSourcesIds.insert(txSourceIds.begin(), txSourceIds.end()); + auto& sourceIds = + (isImmediate ? 
affectedSourceIdsAndConsumers.WriteSourcesIds : affectedSourceIdsAndConsumers.TxWriteSourcesIds); + sourceIds = std::move(txSourceIds); + tx.WriteInfoApplied = true; - WriteKeysSizeEstimate += tx.WriteInfo->BodyKeys.size(); - WriteKeysSizeEstimate += tx.WriteInfo->SrcIdInfo.size(); - WriteKeysSizeEstimate += tx.WriteInfo->BlobsFromHead.size(); + affectedSourceIdsAndConsumers.WriteKeysSize += tx.WriteInfo->BodyKeys.size(); + affectedSourceIdsAndConsumers.WriteKeysSize += tx.WriteInfo->SrcIdInfo.size(); + affectedSourceIdsAndConsumers.WriteKeysSize += tx.WriteInfo->BlobsFromHead.size(); + for (const auto& blob : tx.WriteInfo->BlobsFromHead) { - WriteCycleSizeEstimate += blob.GetBlobSize(); + affectedSourceIdsAndConsumers.WriteCycleSize += blob.GetBlobSize(); } } @@ -2119,22 +2134,24 @@ size_t TPartition::GetUserActCount(const TString& consumer) const } } -void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) +void TPartition::ProcessTxsAndUserActs(const TActorContext&) { if (KVWriteInProgress) { - PQ_LOG_D("Writing. Can't process transactions and user actions"); + PQ_LOG_D("Writing. Can't process user action and tx events"); return; } + if (DeletePartitionState == DELETION_INITED) { if (!PersistRequest) { PersistRequest = MakeHolder(); } + ScheduleNegativeReplies(); ScheduleDeletePartitionDone(); AddCmdDeleteRangeForAllKeys(*PersistRequest); - ctx.Send(BlobCache, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId()); + Send(BlobCache, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId()); PersistRequest = nullptr; CurrentPersistRequestSpan = std::move(PersistRequestSpan); PersistRequestSpan = NWilson::TSpan(); @@ -2143,139 +2160,241 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) return; } - PQ_LOG_D("Batching state before ContinueProcessTxsAndUserActs: " << (int)BatchingState); - if (CanProcessUserActionAndTransactionEvents()) { - ContinueProcessTxsAndUserActs(ctx); - } - // Still preprocessing? 
Waiting for something - if (CanProcessUserActionAndTransactionEvents()) { - PQ_LOG_D("Still preprocessing - waiting for something"); + + PQ_LOG_D("Process user action and tx events"); + ProcessUserActionAndTxEvents(); + DumpTheSizeOfInternalQueues(); + if (!UserActionAndTxPendingWrite.empty()) { + PQ_LOG_D("Waiting for the batch to finish"); return; } - PQ_LOG_D("Batching state after ContinueProcessTxsAndUserActs: " << (int)BatchingState); - // Preprocessing complete; + PQ_LOG_D("Process user action and tx pending commits"); + ProcessUserActionAndTxPendingCommits(); + DumpTheSizeOfInternalQueues(); + if (CurrentBatchSize > 0) { PQ_LOG_D("Batch completed (" << CurrentBatchSize << ")"); Send(SelfId(), new TEvPQ::TEvTxBatchComplete(CurrentBatchSize)); } CurrentBatchSize = 0; - if (UserActionAndTxPendingCommit.empty()) { - // Processing stopped and nothing to commit - finalize - BatchingState = ETxBatchingState::Finishing; - } else { - // Process commit queue - ProcessCommitQueue(); - } - // BatchingState can go to Finishing in ContinueProcessTxsAndUserActs. Therefore, it is necessary to check - // the size of the UserActionAndTxPendingCommit queue here. - if (!UserActionAndTxPendingCommit.empty()) { - // Still pending for come commits - PQ_LOG_D("Still pending for come commits"); - return; - } PQ_LOG_D("Try persist"); - // Here we have an empty UserActionAndTxPendingCommit queue and BatchingState is equal to Finishing. 
RunPersist(); } -bool TPartition::CanProcessUserActionAndTransactionEvents() const -{ - return (BatchingState == ETxBatchingState::PreProcessing); -} - -void TPartition::ContinueProcessTxsAndUserActs(const TActorContext&) +void TPartition::ProcessUserActionAndTxEvents() { - Y_ABORT_UNLESS(!KVWriteInProgress); - - if (WriteCycleSizeEstimate >= MAX_WRITE_CYCLE_SIZE || WriteKeysSizeEstimate >= MAX_KEYS) { - BatchingState = ETxBatchingState::Finishing; - return; - } - auto visitor = [this](auto& event) { - return this->PreProcessUserActionOrTransaction(event); - }; - while (CanProcessUserActionAndTransactionEvents() && !UserActionAndTransactionEvents.empty()) { + while (!UserActionAndTransactionEvents.empty()) { if (ChangingConfig) { - BatchingState = ETxBatchingState::Finishing; break; } + auto& front = UserActionAndTransactionEvents.front(); if (TMessage* msg = std::get_if(&front.Event); msg && msg->WaitPreviousWriteSpan) { msg->WaitPreviousWriteSpan.End(); } + + auto visitor = [this, &front](auto& event) { + return this->ProcessUserActionAndTxEvent(event, front.AffectedSourceIdsAndConsumers); + }; switch (std::visit(visitor, front.Event)) { - case EProcessResult::Continue: - MoveUserActOrTxToCommitState(); - FirstEvent = false; - break; - case EProcessResult::ContinueDrop: - UserActionAndTransactionEvents.pop_front(); - break; - case EProcessResult::Break: - MoveUserActOrTxToCommitState(); - BatchingState = ETxBatchingState::Finishing; - FirstEvent = false; - break; - case EProcessResult::Blocked: - BatchingState = ETxBatchingState::Executing; - return; - case EProcessResult::NotReady: - return; + case EProcessResult::Continue: + MoveUserActionAndTxToPendingCommitQueue(); + break; + case EProcessResult::ContinueDrop: + UserActionAndTransactionEvents.pop_front(); + break; + case EProcessResult::Break: + MoveUserActionAndTxToPendingCommitQueue(); + break; + case EProcessResult::Blocked: + return; + case EProcessResult::NotReady: + return; } - CurrentBatchSize += 1; } 
- if (UserActionAndTransactionEvents.empty()) { - BatchingState = ETxBatchingState::Executing; - return; +} + +void TPartition::DumpTheSizeOfInternalQueues() const +{ + PQ_LOG_D("Events: " << UserActionAndTransactionEvents.size() << + ", PendingCommits: " << UserActionAndTxPendingCommit.size() << + ", PendingWrites: " << UserActionAndTxPendingWrite.size()); +} + +TString GetTransactionType(const TTransaction& tx) +{ + if (tx.Tx) { + return "Tx"; + } else if (tx.ProposeTransaction) { + return "ImmediateTx"; + } else if (tx.ProposeConfig) { + return "ProposeConfig"; + } else if (tx.ChangeConfig) { + return "ChangeConfig"; + } else { + return "???"; } +} +auto TPartition::ProcessUserActionAndTxEvent(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxEvent(TEvPQ::TEvSetClientInfo)"); + return PreProcessUserActionOrTransaction(event, affectedSourceIdsAndConsumers); } -void TPartition::MoveUserActOrTxToCommitState() { - auto& front = UserActionAndTransactionEvents.front(); - UserActionAndTxPendingCommit.push_back(std::move(front)); - UserActionAndTransactionEvents.pop_front(); +auto TPartition::ProcessUserActionAndTxEvent(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxEvent(TTransaction[" << GetTransactionType(*tx) << "])"); + return PreProcessUserActionOrTransaction(tx, affectedSourceIdsAndConsumers); +} + +auto TPartition::ProcessUserActionAndTxEvent(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) -> EProcessResult +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxEvent(TMessage)"); + return PreProcessUserActionOrTransaction(msg, affectedSourceIdsAndConsumers); } -void TPartition::ProcessCommitQueue() { +bool TPartition::WritingCycleDoesNotExceedTheLimits() const +{ + return WriteCycleSizeEstimate < MAX_WRITE_CYCLE_SIZE && 
WriteKeysSizeEstimate < MAX_KEYS; +} + +void TPartition::ProcessUserActionAndTxPendingCommits() { CurrentBatchSize = 0; Y_ABORT_UNLESS(!KVWriteInProgress); if (!PersistRequest) { PersistRequest = MakeHolder(); } - auto visitor = [this, request=PersistRequest.Get()](auto& event) { - return this->ExecUserActionOrTransaction(event, request); - }; - while (!UserActionAndTxPendingCommit.empty()) { + + while (!UserActionAndTxPendingCommit.empty() && WritingCycleDoesNotExceedTheLimits()) { auto& front = UserActionAndTxPendingCommit.front(); auto state = ECommitState::Committed; + if (auto* tx = get_if>(&front.Event)) { state = tx->Get()->State; } + switch (state) { - case ECommitState::Pending: - return; - case ECommitState::Aborted: - break; - case ECommitState::Committed: - break; + case ECommitState::Pending: + return; + case ECommitState::Aborted: + break; + case ECommitState::Committed: + break; } - auto event = std::move(front.Event); + + UserActionAndTxPendingWrite.push_back(std::move(front)); UserActionAndTxPendingCommit.pop_front(); + + auto& event = UserActionAndTxPendingWrite.back().Event; + auto visitor = [this, request = PersistRequest.Get()](auto& event) { + return this->ProcessUserActionAndTxPendingCommit(event, request); + }; std::visit(visitor, event); + + ++CurrentBatchSize; } - if (UserActionAndTxPendingCommit.empty()) { - BatchingState = ETxBatchingState::Finishing; +} + +void TPartition::ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& event, + TEvKeyValue::TEvRequest* request) +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TEvPQ::TEvSetClientInfo)"); + ExecUserActionOrTransaction(event, request); +} + +void TPartition::ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& tx, + TEvKeyValue::TEvRequest* request) +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TTransaction[" << GetTransactionType(*tx) << "])"); + ExecUserActionOrTransaction(tx, request); +} + +void 
TPartition::ProcessUserActionAndTxPendingCommit(TMessage& msg, + TEvKeyValue::TEvRequest* request) +{ + PQ_LOG_D("TPartition::ProcessUserActionAndTxPendingCommit(TMessage)"); + ExecUserActionOrTransaction(msg, request); +} + +static void AppendToSet(const TVector& p, THashMap& q) +{ + for (const auto& s : p) { + ++q[s]; } } -void TPartition::RunPersist() { - TransactionsInflight.clear(); +void TPartition::AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ + AppendToSet(affectedSourceIdsAndConsumers.TxWriteSourcesIds, TxAffectedSourcesIds); + AppendToSet(affectedSourceIdsAndConsumers.WriteSourcesIds, WriteAffectedSourcesIds); + AppendToSet(affectedSourceIdsAndConsumers.TxReadConsumers, TxAffectedConsumers); + AppendToSet(affectedSourceIdsAndConsumers.ReadConsumers, SetOffsetAffectedConsumers); + + WriteKeysSizeEstimate += affectedSourceIdsAndConsumers.WriteKeysSize; +} + +void TPartition::DeleteAffectedSourceIdsAndConsumers() +{ + if (UserActionAndTxPendingWrite.empty()) { + return; + } + + for (const auto& e : UserActionAndTxPendingWrite) { + DeleteAffectedSourceIdsAndConsumers(e.AffectedSourceIdsAndConsumers); + + if (auto* tx = std::get_if>(&e.Event); tx) { + if (auto txId = (*tx)->GetTxId(); txId.Defined()) { + TransactionsInflight.erase(*txId); + } + + if ((*tx)->ChangeConfig || (*tx)->ProposeConfig) { + ChangingConfig = false; + } + } + } + + UserActionAndTxPendingWrite.clear(); +} + +void TPartition::DeleteFromSet(const TVector& p, THashMap& q) const +{ + for (const auto& s : p) { + auto i = q.find(s); + Y_ABORT_UNLESS(i != q.end()); + Y_ABORT_UNLESS(i->second > 0); + if (--i->second) { + continue; + } + q.erase(s); + } +} + +void TPartition::DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ + DeleteFromSet(affectedSourceIdsAndConsumers.TxWriteSourcesIds, TxAffectedSourcesIds); + DeleteFromSet(affectedSourceIdsAndConsumers.WriteSourcesIds, 
WriteAffectedSourcesIds); + DeleteFromSet(affectedSourceIdsAndConsumers.TxReadConsumers, TxAffectedConsumers); + DeleteFromSet(affectedSourceIdsAndConsumers.ReadConsumers, SetOffsetAffectedConsumers); + + Y_ABORT_UNLESS(WriteKeysSizeEstimate >= affectedSourceIdsAndConsumers.WriteKeysSize); + WriteKeysSizeEstimate -= affectedSourceIdsAndConsumers.WriteKeysSize; +} + +void TPartition::MoveUserActionAndTxToPendingCommitQueue() { + auto& front = UserActionAndTransactionEvents.front(); + AppendAffectedSourceIdsAndConsumers(front.AffectedSourceIdsAndConsumers); + UserActionAndTxPendingCommit.push_back(std::move(front)); + UserActionAndTransactionEvents.pop_front(); +} - Y_ABORT_UNLESS(UserActionAndTxPendingCommit.empty()); +void TPartition::RunPersist() { const auto& ctx = ActorContext(); const auto now = ctx.Now(); if (!PersistRequest) { @@ -2403,7 +2522,7 @@ bool TPartition::TryAddDeleteHeadKeysToPersistRequest() return haveChanges; } -void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) +void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const { PQ_LOG_D("=== DumpKeyValueRequest ==="); PQ_LOG_D("--- delete ----------------"); @@ -2451,7 +2570,8 @@ void TPartition::AnswerCurrentReplies(const TActorContext& ctx) Replies.clear(); } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& t) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { auto span = t->CalcPredicateSpan.CreateChild(TWilsonTopic::TopicTopLevel, "Topic.Partition.PreProcess", @@ -2463,7 +2583,7 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return EProcessResult::NotReady; } if (t->WriteInfo && !t->WriteInfoApplied) { //Recieved write info but not applied - result = ApplyWriteInfoResponse(*t); + result = ApplyWriteInfoResponse(*t, 
affectedSourceIdsAndConsumers); if (!t->WriteInfoApplied) { // Tried to apply write info but couldn't - TX must be blocked. PQ_LOG_TX_D("The TxId " << t->GetTxId() << " must be blocked"); Y_ABORT_UNLESS(result != EProcessResult::Continue); @@ -2476,23 +2596,24 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return EProcessResult::Continue; } t->Predicate.ConstructInPlace(true); - return PreProcessImmediateTx(t->ProposeTransaction->GetRecord()); + return PreProcessImmediateTx(*t, affectedSourceIdsAndConsumers); } else if (t->Tx) { // Distributed TX if (t->Predicate.Defined()) { // Predicate defined - either failed previously or Tx created with predicate defined. ReplyToProposeOrPredicate(t, true); return EProcessResult::Continue; } - result = BeginTransaction(*t->Tx, t->Predicate, t->Message); + result = BeginTransactionData(*t, affectedSourceIdsAndConsumers); if (t->Predicate.Defined()) { ReplyToProposeOrPredicate(t, true); } return result; } else if (t->ProposeConfig) { - if (!FirstEvent) { + if (HasPendingCommitsOrPendingWrites()) { + PQ_LOG_D("Wait until the operation with the config becomes the first in the queue"); return EProcessResult::Blocked; } - t->Predicate = BeginTransaction(*t->ProposeConfig); + t->Predicate = BeginTransactionConfig(); ChangingConfig = true; PendingPartitionConfig = GetPartitionConfig(t->ProposeConfig->Config); //Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found"); @@ -2502,7 +2623,8 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple Y_ABORT_UNLESS(t->ChangeConfig); Y_ABORT_UNLESS(!ChangeConfig && !ChangingConfig); - if (!FirstEvent) { + if (HasPendingCommitsOrPendingWrites()) { + PQ_LOG_D("Wait until the operation with the config becomes the first in the queue"); return EProcessResult::Blocked; } ChangingConfig = true; @@ -2514,6 +2636,11 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimple return 
result; } +bool TPartition::HasPendingCommitsOrPendingWrites() const +{ + return !UserActionAndTxPendingCommit.empty() || !UserActionAndTxPendingWrite.empty(); +} + bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, TEvKeyValue::TEvRequest*) { auto span = t->CommitSpan.CreateChild(TWilsonTopic::TopicTopLevel, @@ -2533,35 +2660,35 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, case ECommitState::Committed: break; } - const auto& ctx = ActorContext(); if (t->ChangeConfig) { Y_ABORT_UNLESS(!ChangeConfig); Y_ABORT_UNLESS(ChangingConfig); ChangeConfig = t->ChangeConfig; SendChangeConfigReply = t->SendReply; - BeginChangePartitionConfig(ChangeConfig->Config, ctx); + BeginChangePartitionConfig(ChangeConfig->Config); } else if (t->ProposeConfig) { - Y_ABORT_UNLESS(ChangingConfig); - ChangeConfig = MakeSimpleShared(TopicConverter, - t->ProposeConfig->Config); - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); + Y_ABORT_UNLESS(ChangeConfig); SendChangeConfigReply = false; } CommitTransaction(t); return true; } -TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, - TMaybe& predicateOut, TString& issueMsg) +TPartition::EProcessResult TPartition::BeginTransactionData(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { + const TEvPQ::TEvTxCalcPredicate& tx = *t.Tx; + TMaybe& predicateOut = t.Predicate; + TString& issueMsg = t.Message; + if (tx.ForcePredicateFalse) { predicateOut = false; return EProcessResult::Continue; } - THashSet consumers; + TVector consumers; bool result = true; - for (auto& operation : tx.Operations) { + for (const auto& operation : tx.Operations) { const TString& consumer = operation.GetConsumer(); if (TxAffectedConsumers.contains(consumer)) { PQ_LOG_TX_D("TxAffectedConsumers contains consumer " << consumer << ". 
TxId " << tx.TxId); @@ -2649,25 +2776,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr } break; } - consumers.insert(consumer); + consumers.push_back(consumer); PQ_LOG_TX_D("TxId " << tx.TxId << " affect consumer " << consumer); } } if (result) { - TxAffectedConsumers.insert(consumers.begin(), consumers.end()); + affectedSourceIdsAndConsumers.TxReadConsumers = std::move(consumers); } predicateOut = result; return EProcessResult::Continue; } -bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) +bool TPartition::BeginTransactionConfig() { - ChangeConfig = - MakeSimpleShared(TopicConverter, - event.Config); - PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config); - SendChangeConfigReply = false; return true; } @@ -2820,7 +2942,6 @@ void TPartition::CommitWriteOperations(TTransaction& t) void TPartition::CommitTransaction(TSimpleSharedPtr& t) { - const auto& ctx = ActorContext(); if (t->Tx) { Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate); @@ -2865,7 +2986,7 @@ void TPartition::CommitTransaction(TSimpleSharedPtr& t) } else if (t->ProposeConfig) { Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate); - BeginChangePartitionConfig(t->ProposeConfig->Config, ctx); + BeginChangePartitionConfig(t->ProposeConfig->Config); ExecChangePartitionConfig(); ChangePlanStepAndTxId(t->ProposeConfig->Step, t->ProposeConfig->TxId); @@ -2897,8 +3018,7 @@ void TPartition::RollbackTransaction(TSimpleSharedPtr& t) } } -void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, - const TActorContext& ctx) +void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config) { TSet hasReadRule; for (auto& [consumer, info] : UsersInfoStorage->GetAll()) { @@ -2912,7 +3032,7 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co } } - for (auto& consumer : config.GetConsumers()) { + for (const auto& consumer : config.GetConsumers()) { 
auto& userInfo = GetOrCreatePendingUser(consumer.GetName(), 0); TInstant ts = TInstant::MilliSeconds(consumer.GetReadFromTimestampsMs()); @@ -2927,21 +3047,22 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co auto act = MakeHolder(0, consumer.GetName(), 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen); - auto res = PreProcessUserAct(*act, ctx); - ChangeConfigActs.emplace_back(std::move(act)); - + auto res = PreProcessUserAct(*act, nullptr); Y_ABORT_UNLESS(res == EProcessResult::Continue); + + ChangeConfigActs.emplace_back(std::move(act)); } hasReadRule.erase(consumer.GetName()); } - for (auto& consumer : hasReadRule) { + for (const auto& consumer : hasReadRule) { GetOrCreatePendingUser(consumer); auto act = MakeHolder(0, consumer, 0, "", 0, 0, 0, TActorId{}, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); - auto res = PreProcessUserAct(*act, ctx); + auto res = PreProcessUserAct(*act, nullptr); Y_ABORT_UNLESS(res == EProcessResult::Continue); + ChangeConfigActs.emplace_back(std::move(act)); } } @@ -2954,14 +3075,8 @@ void TPartition::ExecChangePartitionConfig() { } void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) { - FirstEvent = true; - TxAffectedConsumers.clear(); - TxAffectedSourcesIds.clear(); - WriteAffectedSourcesIds.clear(); - SetOffsetAffectedConsumers.clear(); - BatchingState = ETxBatchingState::PreProcessing; + DeleteAffectedSourceIdsAndConsumers(); WriteCycleSizeEstimate = 0; - WriteKeysSizeEstimate = 0; if (ChangeConfig) { EndChangePartitionConfig(std::move(ChangeConfig->Config), @@ -3046,8 +3161,6 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx) ChangeConfig = nullptr; PendingPartitionConfig = nullptr; } - ChangingConfig = false; - BatchingState = ETxBatchingState::PreProcessing; } void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config, @@ -3121,15 +3234,18 @@ void 
TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId) TxIdHasChanged = true; } -TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx) +TPartition::EProcessResult TPartition::PreProcessImmediateTx(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { + const NKikimrPQ::TEvProposeTransaction& tx = t.ProposeTransaction->GetRecord(); + if (AffectedUsers.size() >= MAX_USERS) { return EProcessResult::Blocked; } Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_ABORT_UNLESS(tx.HasData()); - THashSet consumers; - for (auto& operation : tx.GetData().GetOperations()) { + TVector consumers; + for (const auto& operation : tx.GetData().GetOperations()) { if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) { continue; //Write operation - handled separately via WriteInfo } @@ -3156,10 +3272,10 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - consumers.insert(user); + consumers.push_back(user); } - SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end()); - WriteKeysSizeEstimate += consumers.size(); + affectedSourceIdsAndConsumers.WriteKeysSize += consumers.size(); // must precede the move: a moved-from vector no longer holds the collected consumers + affectedSourceIdsAndConsumers.ReadConsumers = std::move(consumers); return EProcessResult::Continue; } @@ -3252,12 +3368,14 @@ void TPartition::ExecImmediateTx(TTransaction& t) return; } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& act) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TSimpleSharedPtr& act, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { if (AffectedUsers.size() >= MAX_USERS) { return EProcessResult::Blocked; } - return PreProcessUserAct(*act, ActorContext()); + + return PreProcessUserAct(*act, &affectedSourceIdsAndConsumers); } bool 
TPartition::ExecUserActionOrTransaction( @@ -3267,7 +3385,8 @@ bool TPartition::ExecUserActionOrTransaction( return true; } -TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessage& msg) +TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) { if (WriteCycleSize >= MAX_WRITE_CYCLE_SIZE) { return EProcessResult::Blocked; @@ -3275,13 +3394,13 @@ TPartition::EProcessResult TPartition::PreProcessUserActionOrTransaction(TMessag auto result = EProcessResult::Continue; if (msg.IsWrite()) { - result = PreProcessRequest(msg.GetWrite()); + result = PreProcessRequest(msg.GetWrite(), affectedSourceIdsAndConsumers); } else if (msg.IsRegisterMessageGroup()) { - result = PreProcessRequest(msg.GetRegisterMessageGroup()); + result = PreProcessRequest(msg.GetRegisterMessageGroup(), affectedSourceIdsAndConsumers); } else if (msg.IsDeregisterMessageGroup()) { - result = PreProcessRequest(msg.GetDeregisterMessageGroup()); + result = PreProcessRequest(msg.GetDeregisterMessageGroup(), affectedSourceIdsAndConsumers); } else if (msg.IsSplitMessageGroup()) { - result = PreProcessRequest(msg.GetSplitMessageGroup()); + result = PreProcessRequest(msg.GetSplitMessageGroup(), affectedSourceIdsAndConsumers); } else { Y_ABORT_UNLESS(msg.IsOwnership()); } @@ -3318,9 +3437,9 @@ bool TPartition::ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequ return true; } -TPartition::EProcessResult TPartition::PreProcessUserAct( - TEvPQ::TEvSetClientInfo& act, const TActorContext& -) { +TPartition::EProcessResult TPartition::PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, + TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers) +{ Y_ABORT_UNLESS(!KVWriteInProgress); const TString& user = act.ClientId; @@ -3329,8 +3448,12 @@ TPartition::EProcessResult TPartition::PreProcessUserAct( return EProcessResult::Blocked; } } - WriteKeysSizeEstimate += 1; - 
SetOffsetAffectedConsumers.insert(user); + + if (affectedSourceIdsAndConsumers) { + ++affectedSourceIdsAndConsumers->WriteKeysSize; + affectedSourceIdsAndConsumers->ReadConsumers.push_back(user); + } + return EProcessResult::Continue; } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 163ec06b7c05..15dbde885fef 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -336,11 +336,31 @@ class TPartition : public TActorBootstrapped { TAutoPtr MakeHasDataInfoResponse(ui64 lagSize, const TMaybe& cookie, bool readingFinished = false); void ProcessTxsAndUserActs(const TActorContext& ctx); - void ContinueProcessTxsAndUserActs(const TActorContext& ctx); - void ProcessCommitQueue(); void RunPersist(); - void MoveUserActOrTxToCommitState(); + enum class EProcessResult; + struct TAffectedSourceIdsAndConsumers; + + void ProcessUserActionAndTxEvents(); + EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult ProcessUserActionAndTxEvent(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult ProcessUserActionAndTxEvent(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + + void MoveUserActionAndTxToPendingCommitQueue(); + + void ProcessUserActionAndTxPendingCommits(); + void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& event, + TEvKeyValue::TEvRequest* request); + void ProcessUserActionAndTxPendingCommit(TSimpleSharedPtr& tx, + TEvKeyValue::TEvRequest* request); + void ProcessUserActionAndTxPendingCommit(TMessage& msg, + TEvKeyValue::TEvRequest* request); + + bool WritingCycleDoesNotExceedTheLimits() const; + void PushBackDistrTx(TSimpleSharedPtr event); void PushBackDistrTx(TSimpleSharedPtr event); void PushFrontDistrTx(TSimpleSharedPtr event); @@ -422,14 +442,13 @@ class TPartition : public TActorBootstrapped { const TString& reason); 
THolder MakeCommitDone(ui64 step, ui64 txId); - bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event); + bool BeginTransactionConfig(); void CommitTransaction(TSimpleSharedPtr& t); void RollbackTransaction(TSimpleSharedPtr& t); - void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, - const TActorContext& ctx); + void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config); void ExecChangePartitionConfig(); void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx); @@ -720,10 +739,10 @@ class TPartition : public TActorBootstrapped { TMaybe KafkaProducerEpoch = 0; }; - THashSet TxAffectedSourcesIds; - THashSet WriteAffectedSourcesIds; - THashSet TxAffectedConsumers; - THashSet SetOffsetAffectedConsumers; + THashMap TxAffectedSourcesIds; + THashMap WriteAffectedSourcesIds; + THashMap TxAffectedConsumers; + THashMap SetOffsetAffectedConsumers; THashMap TxSourceIdForPostPersist; THashMap TxInflightMaxSeqNoPerSourceId; @@ -778,45 +797,61 @@ class TPartition : public TActorBootstrapped { TMaybe UsersInfoStorage; - // template T& GetUserActionAndTransactionEventsFront(); - // template T& GetCurrentEvent(); - //TSimpleSharedPtr& GetCurrentTransaction(); + struct TAffectedSourceIdsAndConsumers { + TVector TxWriteSourcesIds; + TVector WriteSourcesIds; + TVector TxReadConsumers; + TVector ReadConsumers; + ui32 WriteKeysSize = 0; + ui32 WriteCycleSize = 0; + }; + + void AppendAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event); - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event); - EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& tx); - EProcessResult PreProcessUserActionOrTransaction(TMessage& msg); + void DeleteAffectedSourceIdsAndConsumers(); + void DeleteAffectedSourceIdsAndConsumers(const TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); 
+ void DeleteFromSet(const TVector& p, THashMap& q) const; - bool ExecUserActionOrTransaction(TSimpleSharedPtr& event, TEvKeyValue::TEvRequest* request); + EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& event, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessUserActionOrTransaction(TSimpleSharedPtr& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessUserActionOrTransaction(TMessage& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - bool ExecUserActionOrTransaction(TSimpleSharedPtr& event, - TEvKeyValue::TEvRequest* request); + bool ExecUserActionOrTransaction(TSimpleSharedPtr& events, TEvKeyValue::TEvRequest* request); bool ExecUserActionOrTransaction(TSimpleSharedPtr& tx, TEvKeyValue::TEvRequest* request); bool ExecUserActionOrTransaction(TMessage& msg, TEvKeyValue::TEvRequest* request); - [[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx); + [[nodiscard]] EProcessResult PreProcessUserAct(TEvPQ::TEvSetClientInfo& act, + TAffectedSourceIdsAndConsumers* affectedSourceIdsAndConsumers); void CommitUserAct(TEvPQ::TEvSetClientInfo& act); - [[nodiscard]] EProcessResult PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx); + [[nodiscard]] EProcessResult PreProcessImmediateTx(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); void ExecImmediateTx(TTransaction& tx); - EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg); - EProcessResult PreProcessRequest(TWriteMsg& msg); + EProcessResult PreProcessRequest(TRegisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TDeregisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& 
affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TSplitMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); + EProcessResult PreProcessRequest(TWriteMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); void ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters); void ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters); void ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters); bool ExecRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request); - [[nodiscard]] EProcessResult BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, - TMaybe& predicate, TString& issueMsg); + [[nodiscard]] EProcessResult BeginTransactionData(TTransaction& t, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - EProcessResult ApplyWriteInfoResponse(TTransaction& tx); + EProcessResult ApplyWriteInfoResponse(TTransaction& tx, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers); - bool FirstEvent = true; bool HaveWriteMsg = false; bool HaveData = false; bool HaveCheckDisk = false; @@ -837,13 +872,12 @@ class TPartition : public TActorBootstrapped { void BeginAppendHeadWithNewWrites(const TActorContext& ctx); void EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx); + bool HasPendingCommitsOrPendingWrites() const; + // // user actions and transactions // struct TUserActionAndTransactionEvent { - std::variant, // user actions - TSimpleSharedPtr, // distributed transaction or update config - TMessage> Event; TUserActionAndTransactionEvent(TSimpleSharedPtr&& transaction) : Event(std::move(transaction)) {} @@ -853,10 +887,16 @@ class TPartition : public TActorBootstrapped { TUserActionAndTransactionEvent(TMessage&& message) : Event(std::move(message)) {} + + std::variant, // user actions + TSimpleSharedPtr, // distributed transaction or update config + TMessage> 
Event; + TAffectedSourceIdsAndConsumers AffectedSourceIdsAndConsumers; }; std::deque UserActionAndTransactionEvents; std::deque UserActionAndTxPendingCommit; + std::deque UserActionAndTxPendingWrite; TVector> WriteInfosApplied; THashMap> TransactionsInflight; @@ -879,15 +919,6 @@ class TPartition : public TActorBootstrapped { TMessageQueue Responses; ui64 CurrentBatchSize = 0; - enum class ETxBatchingState{ - PreProcessing, - Executing, - Finishing - }; - ETxBatchingState BatchingState = ETxBatchingState::PreProcessing; - // - // - // std::deque> UpdateUserInfoTimestamp; bool ReadingTimestamp; TString ReadingForUser; @@ -1071,15 +1102,15 @@ class TPartition : public TActorBootstrapped { size_t WriteNewSizeFromSupportivePartitions = 0; bool TryAddDeleteHeadKeysToPersistRequest(); - void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request); + void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) const; TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key); + void DumpTheSizeOfInternalQueues() const; TIntrusivePtr SamplingControl; TDeque TxForPersistTraceIds; TDeque TxForPersistSpans; - bool CanProcessUserActionAndTransactionEvents() const; }; } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index b7547227087f..60caf85ab22e 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -568,10 +568,6 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { TxSourceIdForPostPersist.clear(); TxInflightMaxSeqNoPerSourceId.clear(); - TxAffectedSourcesIds.clear(); - WriteAffectedSourcesIds.clear(); - TxAffectedConsumers.clear(); - SetOffsetAffectedConsumers.clear(); if (UserActionAndTransactionEvents.empty()) { WriteInfosToTx.clear(); } @@ -981,7 +977,9 @@ void TPartition::CancelOneWriteOnWrite(const TActorContext& ctx, StartProcessChangeOwnerRequests(ctx); } -TPartition::EProcessResult 
TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg) { +TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -996,7 +994,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TRegisterMessageGroupMs if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(msg.Body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId); return EProcessResult::Continue; } @@ -1013,7 +1011,9 @@ void TPartition::ExecRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& p parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange)); } -TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg) { +TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -1027,7 +1027,7 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TDeregisterMessageGroup if (TxAffectedSourcesIds.contains(msg.Body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(msg.Body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(msg.Body.SourceId); return EProcessResult::Continue; } @@ -1036,7 +1036,9 @@ void TPartition::ExecRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& } -TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg) { +TPartition::EProcessResult 
TPartition::PreProcessRequest(TSplitMessageGroupMsg& msg, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { ScheduleReplyError(msg.Cookie, false, InactivePartitionErrorCode, TStringBuilder() << "Write to inactive partition " << Partition.OriginalPartitionId); @@ -1052,16 +1054,16 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TSplitMessageGroupMsg& if (TxAffectedSourcesIds.contains(body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId); } for (auto& body : msg.Deregistrations) { if (TxAffectedSourcesIds.contains(body.SourceId)) { return EProcessResult::Blocked; } - WriteAffectedSourcesIds.insert(body.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(body.SourceId); } - return EProcessResult::Continue; + return EProcessResult::Continue; } @@ -1081,7 +1083,9 @@ void TPartition::ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& para } } -TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { +TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p, + TAffectedSourceIdsAndConsumers& affectedSourceIdsAndConsumers) +{ if (!CanWrite()) { WriteInflightSize -= p.Msg.Data.size(); ScheduleReplyError(p.Cookie, false, InactivePartitionErrorCode, @@ -1106,7 +1110,8 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { return EProcessResult::Blocked; } } - WriteAffectedSourcesIds.insert(p.Msg.SourceId); + affectedSourceIdsAndConsumers.WriteSourcesIds.push_back(p.Msg.SourceId); + return EProcessResult::Continue; } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index e4e0bc5b01aa..140b245da04f 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -1511,7 +1511,7 @@ class TPartitionTxTestHelper : public TPartitionFixture { 
BatchSizes.push_back(msg->BatchSize); } } else if (auto* msg = ev->CastAsLocal()) { - Cerr << "Got KV request\n"; + Cerr << "Got KV request" << Endl; with_lock(Lock) { HadKvRequest = true; } @@ -2175,6 +2175,9 @@ Y_UNIT_TEST_F(CorrectRange_Commit, TPartitionFixture) SendCommitTx(step, txId); + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session=session, .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=3, .PlanStep=step, .TxId=txId, .UserInfos={{1, {.Session=session, .Offset=2}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); @@ -2205,10 +2208,11 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture) SendCommitTx(step, txId_1); - WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Session=session, .Offset=0}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); - WaitCommitTxDone({.TxId=txId_1, .Partition=TPartitionId(partition)}); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_2); @@ -2216,6 +2220,9 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Transactions, TPartitionFixture) WaitCalcPredicateResult({.Step=step, .TxId=txId_3, .Partition=TPartitionId(partition), .Predicate=false}); SendRollbackTx(step, txId_3); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_2, .UserInfos={{1, {.Session=session, .Offset=1}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_3, .UserInfos={{1, {.Session=session, .Offset=1}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); } @@ -2240,6 +2247,9 @@ Y_UNIT_TEST_F(CorrectRange_Multiple_Consumers, TPartitionFixture) 
WaitCmdWrite({.Count=2, .UserInfos={{0, {.Session="session-1", .Offset=3}}}}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitCmdWrite({.Count=2, .UserInfos={{1, {.Session="session-2", .Offset=0}}}}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + WaitProxyResponse({.Cookie=1, .Status=NMsgBusProxy::MSTATUS_OK}); WaitCalcPredicateResult({.Step=step, .TxId=txId, .Partition=TPartitionId(partition), .Predicate=true}); @@ -2320,10 +2330,19 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TPartitionFixture) SendCalcPredicate(step, txId_1, client, 0, 2); WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true}); + WaitCmdWrite({ + .PlanStep=step, .TxId=txId_1, + .UserInfos={{1, {.Consumer="client", .Session="session", .Offset=0}}} + }); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + SendCalcPredicate(step, txId_2, client, 0, 5); SendRollbackTx(step, txId_1); - WaitCmdWrite({.Count=1, .PlanStep=step, .TxId=txId_1, .UserInfos={{1, {.Consumer="client", .Session="session", .Offset=0}}}}); + WaitCmdWrite({ + .PlanStep=step, .TxId=txId_1, + .UserInfos={{1, {.Consumer="client", .Session="session", .Offset=0}}} + }); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=true}); @@ -2361,6 +2380,10 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) SendCalcPredicate(step, txId_2, "client-2", 0, 2); WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=TPartitionId(partition), .Predicate=true}); + WaitCmdWrite({ + .UserInfos={{1, {.Consumer="client-1", .Session="session-1", .Offset=0}}} + }); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); SendCommitTx(step, txId_1); Cerr << "Wait cmd write (initial)\n"; WaitCmdWrite({.Count=8, @@ -2369,8 +2392,8 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) {1, {.Consumer="client-1", .Session="session-1", .Offset=2}}, }, }); - SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + Cerr << "Wait 
commit 1 done\n"; WaitCommitTxDone({.TxId=txId_1, .Partition=TPartitionId(partition)}); @@ -2394,6 +2417,7 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) {0, {.Partition=3, .Consumer="client-2"}} }}); SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + Cerr << "Wait config changed\n"; WaitPartitionConfigChanged({.Partition=TPartitionId(partition)}); @@ -2401,6 +2425,7 @@ Y_UNIT_TEST_F(ChangeConfig, TPartitionFixture) // consumer 'client-2' was deleted // WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=TPartitionId(partition), .Predicate=false}); + SendRollbackTx(step, txId_2); } @@ -2740,6 +2765,7 @@ Y_UNIT_TEST_F(DataTxCalcPredicateOk, TPartitionTxTestHelper) TString data = "data for write"; SendChangeOwner(cookie, "owner1", Ctx->Edge, true); + EmulateKVTablet(); auto ownerEvent = Ctx->Runtime->GrabEdgeEvent(TDuration::Seconds(1)); UNIT_ASSERT(ownerEvent != nullptr); auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); @@ -2826,7 +2852,7 @@ void TPartitionTxTestHelper::NonConflictingActsBatchOkTest() { WaitTxPredicateReply(tx2); WaitTxPredicateReply(tx3); - WaitBatchCompletion(5 + 6 + 6); //5 txs and immediate txs + 2 normal writes with 6 messages each; + //WaitBatchCompletion(5 + 6 + 6); //5 txs and immediate txs + 2 normal writes with 6 messages each; SendTxCommit(tx3); SendTxRollback(tx2); @@ -2870,7 +2896,7 @@ Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); WaitTxPredicateReply(tx2); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxCommit(tx1); SendTxRollback(tx2); @@ -2878,7 +2904,7 @@ Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { SendKvResponse(); WaitTxPredicateReply(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx3); //2 Normal writes with src1 & src4 @@ -2886,17 +2912,19 @@ Y_UNIT_TEST_F(ConflictingActsInSeveralBatches, TPartitionTxTestHelper) { WaitKvRequest(); 
SendKvResponse(); WaitCommitDone(tx1); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx3); WaitTxPredicateReply(tx5); - WaitBatchCompletion(6 + 2); // Normal writes produce 1 act for each message + //WaitBatchCompletion(6 + 2); // Normal writes produce 1 act for each message SendTxCommit(tx5); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx5); - WaitBatchCompletion(1 + 6); //Normal write & immTx for src4; + //WaitBatchCompletion(1 + 6); //Normal write & immTx for src4; WaitKvRequest(); SendKvResponse(); WaitImmediateTxComplete(immTx1, true); @@ -2913,7 +2941,7 @@ Y_UNIT_TEST_F(ConflictingTxIsAborted, TPartitionTxTestHelper) { WaitWriteInfoRequest(tx1, true); WaitWriteInfoRequest(tx2, true); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx1); ExpectNoKvRequest(); @@ -2927,7 +2955,7 @@ Y_UNIT_TEST_F(ConflictingTxIsAborted, TPartitionTxTestHelper) { AddAndSendNormalWrite("src2", 7, 12); auto tx3 = MakeAndSendWriteTx({{"src2", {12, 15}}}); Y_UNUSED(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); ExpectNoCommitDone(); @@ -2945,12 +2973,12 @@ Y_UNIT_TEST_F(ConflictingTxProceedAfterRollback, TPartitionTxTestHelper) { WaitWriteInfoRequest(immTx, true); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx1); WaitTxPredicateReply(tx2); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxCommit(tx2); WaitKvRequest(); @@ -2976,7 +3004,7 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxInDifferentBatches, TPartitionTxTestHelper) { Cerr << "Wait batch of 1 completion\n"; SendTxCommit(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); Cerr << "Expect KV request\n"; WaitKvRequest(); SendKvResponse(); @@ -2990,13 +3018,13 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxInDifferentBatches, TPartitionTxTestHelper) { Cerr << "Wait batch of 3 completion\n"; - WaitBatchCompletion(1); // Immediate Tx 2 - 4. 
+ //WaitBatchCompletion(1); // Immediate Tx 2 - 4. Cerr << "Expect KV request\n"; WaitKvRequest(); SendKvResponse(); SendTxRollback(tx3); SendTxRollback(tx4); - WaitBatchCompletion(2); // Immediate Tx 2 - 4. + //WaitBatchCompletion(2); // Immediate Tx 2 - 4. WaitKvRequest(); SendKvResponse(); @@ -3023,7 +3051,7 @@ Y_UNIT_TEST_F(ConflictingSrcIdTxAndWritesDifferentBatches, TPartitionTxTestHelpe WaitTxPredicateReply(tx1); SendTxCommit(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitKvRequest(); SendKvResponse(); @@ -3034,11 +3062,13 @@ Y_UNIT_TEST_F(ConflictingSrcIdTxAndWritesDifferentBatches, TPartitionTxTestHelpe WaitTxPredicateReply(tx3); SendTxRollback(tx2); SendTxCommit(tx3); - WaitBatchCompletion(2); // Tx 2 & 3. + //WaitBatchCompletion(2); // Tx 2 & 3. + WaitKvRequest(); + SendKvResponse(); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx3); - WaitBatchCompletion(3); + //WaitBatchCompletion(3); WaitKvRequest(); SendKvResponse(); WaitProxyResponse({.AlreadyWritten=true, .SeqNo=1}); @@ -3062,12 +3092,12 @@ Y_UNIT_TEST_F(ConflictingSrcIdForTxWithHead, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); SendTxCommit(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); Cerr << "Wait 1st KV request\n"; WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); - WaitBatchCompletion(3); + //WaitBatchCompletion(3); Cerr << "Wait 2nd KV request\n"; WaitKvRequest(); SendKvResponse(); @@ -3132,11 +3162,14 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(immTx1, true); WaitWriteInfoRequest(immTx2, true); - WaitBatchCompletion(4 + 1); + //WaitBatchCompletion(4 + 1); + EmulateKVTablet(); EmulateKVTablet(); WaitImmediateTxComplete(immTx1, true); WaitImmediateTxComplete(immTx2, true); + EmulateKVTablet(); } + { // 2. 
ImmTx -> WriteTx = KVRequest ResetBatchCompletion(); @@ -3146,17 +3179,19 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(immTx, true); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoTxPredicateReply(); EmulateKVTablet(); + EmulateKVTablet(); WaitImmediateTxComplete(immTx, true); ExpectNoCommitDone(); WaitTxPredicateReply(tx); SendTxCommit(tx); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitDone(tx); } + { // 3. NormWrite -> WriteTx = KVRequest ResetBatchCompletion(); @@ -3165,15 +3200,17 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { auto tx = wrapper.AddTx(); wrapper.Process(); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoTxPredicateReply(); EmulateKVTablet(); WaitTxPredicateReply(tx); SendTxCommit(tx); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); + EmulateKVTablet(); EmulateKVTablet(); WaitCommitDone(tx); } + { // 4. WriteTx -> NormWrite = 2 batches ResetBatchCompletion(); @@ -3183,14 +3220,15 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(tx, true); WaitTxPredicateReply(tx); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); ExpectNoKvRequest(); SendTxCommit(tx); EmulateKVTablet(); WaitCommitDone(tx); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); } + { // 5. 
WriteTx -> ImmTx = 2 batches ResetBatchCompletion(); @@ -3200,17 +3238,18 @@ Y_UNIT_TEST_F(DifferentWriteTxBatchingOptions, TPartitionTxTestHelper) { wrapper.Process(); WaitWriteInfoRequest(tx, true); WaitWriteInfoRequest(immTx, true); - WaitBatchCompletion(1+1); + //WaitBatchCompletion(1+1); WaitTxPredicateReply(tx); SendTxCommit(tx); ExpectNoCommitDone(); EmulateKVTablet(); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); WaitCommitDone(tx); EmulateKVTablet(); WaitImmediateTxComplete(immTx, true); } } + Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { Init({.WriterSessions={"src1", "src2"}, .EndOffset = 1}); // Failed WriteTx doesn't block @@ -3226,11 +3265,11 @@ Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { WaitWriteInfoRequest(tx, true); WaitWriteInfoRequest(immTx, true); - WaitBatchCompletion(5 + 1); + //WaitBatchCompletion(5 + 1); ExpectNoTxPredicateReply(); EmulateKVTablet(); WaitTxPredicateFailure(tx); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); SendTxRollback(tx); EmulateKVTablet(); @@ -3250,7 +3289,7 @@ Y_UNIT_TEST_F(FailedTxsDontBlock, TPartitionTxTestHelper) { WaitWriteInfoRequest(immTx, true); WaitWriteInfoRequest(tx, true); - WaitBatchCompletion(2 + 1); + //WaitBatchCompletion(2 + 1); WaitTxPredicateReply(tx); ExpectNoKvRequest(); SendTxCommit(tx); @@ -3282,12 +3321,14 @@ Y_UNIT_TEST_F(NonConflictingCommitsBatch, TPartitionTxTestHelper) { WaitTxPredicateReply(tx1); WaitTxPredicateReply(tx2); - WaitBatchCompletion(5 + 1 /*tmpTx*/); + //WaitBatchCompletion(5 + 1 /*tmpTx*/); SendTxCommit(tx1); SendTxCommit(tx2); WaitKvRequest(); SendKvResponse(); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx1); WaitCommitDone(tx2); WaitImmediateTxComplete(txImm1, false); @@ -3313,28 +3354,32 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { WaitWriteInfoRequest(txTmp, true); WaitTxPredicateReply(txTmp); - WaitBatchCompletion(2); // txTmp + act-1 SendTxRollback(txTmp); + 
//WaitBatchCompletion(2); // txTmp + act-1 ExpectNoTxPredicateReply(); WaitKvRequest(); SendKvResponse(); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); // tx1 + WaitKvRequest(); + SendKvResponse(); ExpectNoTxPredicateReply(); SendTxCommit(tx1); + //WaitBatchCompletion(1); // tx1 WaitTxPredicateReply(tx2); SendTxCommit(tx2); - WaitBatchCompletion(1); // tx2 + //WaitBatchCompletion(1); // tx2 WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx1); + WaitKvRequest(); + SendKvResponse(); WaitCommitDone(tx2); - WaitBatchCompletion(1); // act-2 + //WaitBatchCompletion(1); // act-2 WaitKvRequest(); SendKvResponse(); @@ -3345,8 +3390,8 @@ Y_UNIT_TEST_F(ConflictingCommitsInSeveralBatches, TPartitionTxTestHelper) { WaitTxPredicateReply(txTmp); SendTxRollback(txTmp); - WaitBatchCompletion(3); WaitKvRequest(); + //WaitBatchCompletion(3); SendKvResponse(); WaitImmediateTxComplete(immTx1, true); WaitImmediateTxComplete(immTx2, true); @@ -3370,7 +3415,7 @@ Y_UNIT_TEST_F(ConflictingCommitFails, TPartitionTxTestHelper) { SendTxRollback(txTmp); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1 + 1); + //WaitBatchCompletion(1 + 1); SendTxCommit(tx1); WaitTxPredicateFailure(tx2); @@ -3393,11 +3438,11 @@ Y_UNIT_TEST_F(ConflictingCommitFails, TPartitionTxTestHelper) { SendTxRollback(txTmp); ExpectNoTxPredicateReply(); - WaitBatchCompletion(2); + //WaitBatchCompletion(2); WaitKvRequest(); SendKvResponse(); WaitTxPredicateFailure(tx3); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx3); WaitKvRequest(); //No user operatiions completed but TxId has changed which will be saved @@ -3430,16 +3475,18 @@ Y_UNIT_TEST_F(ConflictingCommitProccesAfterRollback, TPartitionTxTestHelper) { auto tx2 = MakeAndSendTxOffsetCommit(1, 0, 3); WaitTxPredicateReply(tx1); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxRollback(tx1); WaitKvRequest(); SendKvResponse(); WaitTxPredicateReply(tx2); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); SendTxCommit(tx2); + 
WaitKvRequest(); + SendKvResponse(); WaitKvRequest(); SendKvResponse(); WaitCommitDone(tx2); @@ -3465,7 +3512,7 @@ Y_UNIT_TEST_F(TestBatchingWithChangeConfig, TPartitionTxTestHelper) { WaitBatchCompletion(1); EmulateKVTablet(); auto event = Ctx->Runtime->GrabEdgeEvent(); - WaitBatchCompletion(1); // immTx2 + //WaitBatchCompletion(1); // immTx2 EmulateKVTablet(); WaitImmediateTxComplete(immTx2, true); } @@ -3499,16 +3546,14 @@ Y_UNIT_TEST_F(TestBatchingWithProposeConfig, TPartitionTxTestHelper) { SendCommitTx(1, proposeTxId); //ToDo - wait propose result; - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitCommitTxDone({.TxId=proposeTxId}); - WaitBatchCompletion(1); + //WaitBatchCompletion(1); EmulateKVTablet(); WaitImmediateTxComplete(immTx2, true); } - - Y_UNIT_TEST_F(GetUsedStorage, TPartitionFixture) { auto* actor = CreatePartition({ .Partition=TPartitionId{2, TWriteId{0, 10}, 100'001},