From 548706bbb69aa15d4726aaecb7a59e9bd87edd42 Mon Sep 17 00:00:00 2001 From: Alexander Kotov Date: Sun, 23 Nov 2025 00:15:41 +0300 Subject: [PATCH] Entries remain in the TxWrites after the immediate transaction (#29285) --- ydb/core/persqueue/pq_impl.cpp | 41 ++++++++---- ydb/core/persqueue/pq_impl.h | 3 +- .../src/client/topic/ut/topic_to_table_ut.cpp | 64 +++++++++++++++++++ 3 files changed, 93 insertions(+), 15 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index f9b9d522b38e..174db9360a23 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -3322,7 +3322,7 @@ void TPersQueue::ScheduleDeleteExpiredKafkaTransactions() { for (auto& pair : TxWrites) { if (txnExpired(pair.second)) { PQ_LOG_D("Transaction for Kafka producer " << pair.first.KafkaProducerInstanceId << " is expired"); - BeginDeletePartitions(pair.second); + BeginDeletePartitions(pair.first, pair.second); } } } @@ -5420,7 +5420,7 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e } PQ_LOG_TX_I("delete partitions for WriteId " << writeId << " (longTxService lost tx)"); - BeginDeletePartitions(writeInfo); + BeginDeletePartitions(writeId, writeInfo); } void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) @@ -5470,6 +5470,14 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon DeletePartition(partitionId, ctx); writeInfo.Partitions.erase(partitionId.OriginalPartitionId); + TryDeleteWriteId(writeId, writeInfo, ctx); + TxWritesChanged = true; + + TryWriteTxs(ctx); +} + +void TPersQueue::TryDeleteWriteId(const TWriteId& writeId, const TTxWriteInfo& writeInfo, const TActorContext& ctx) +{ if (writeInfo.Partitions.empty()) { if (!writeInfo.KafkaTransaction) { UnsubscribeWriteId(writeId, ctx); @@ -5481,14 +5489,15 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon (tx->State == NKikimrPQ::TTransaction::CANCELED)) { TryExecuteTxs(ctx, *tx); } + } else { + // if the transaction is not in Txs, then it is an immediate transaction + DeleteWriteId(writeId); } - } else if (writeInfo.KafkaTransaction) { // case when kafka transaction haven't even started in KQP, but data for it was already written in partition + } else { + // this is kafka transaction or immediate transaction DeleteWriteId(writeId); } } - TxWritesChanged = true; - - TryWriteTxs(ctx); } void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext&) @@ -5508,20 +5517,24 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo TTxWriteInfo& writeInfo = TxWrites.at(writeId); Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1); - BeginDeletePartitions(writeInfo); + BeginDeletePartitions(writeId, writeInfo); } -void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo) +void TPersQueue::BeginDeletePartitions(const TWriteId& writeId, TTxWriteInfo& writeInfo) { if (writeInfo.Deleting) { PQ_LOG_TX_D("Already deleting WriteInfo"); return; } - for (auto& [_, partitionId] : writeInfo.Partitions) { - Y_ABORT_UNLESS(Partitions.contains(partitionId)); - const TPartitionInfo& partition = Partitions.at(partitionId); - PQ_LOG_TX_D("send TEvPQ::TEvDeletePartition to partition " << partitionId); - Send(partition.Actor, new TEvPQ::TEvDeletePartition); + if (writeInfo.Partitions.empty()) { + TryDeleteWriteId(writeId, writeInfo, ActorContext()); + } else { + for (auto& [_, partitionId] : writeInfo.Partitions) { + Y_ABORT_UNLESS(Partitions.contains(partitionId)); + const TPartitionInfo& partition = Partitions.at(partitionId); + PQ_LOG_TX_D("send TEvPQ::TEvDeletePartition to partition " << partitionId); + Send(partition.Actor, new TEvPQ::TEvDeletePartition); + } } writeInfo.Deleting = true; } @@ -5533,7 +5546,7 @@ void TPersQueue::BeginDeletePartitions(const TDistributedTransaction& tx) } TTxWriteInfo& writeInfo = TxWrites.at(*tx.WriteId); - BeginDeletePartitions(writeInfo); + BeginDeletePartitions(*tx.WriteId, writeInfo); } TString TPersQueue::LogPrefix() const { diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 005a5ae8686c..4837a1a49d51 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -547,7 +547,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext& ctx); - void BeginDeletePartitions(TTxWriteInfo& writeInfo); + void BeginDeletePartitions(const TWriteId& writeId, TTxWriteInfo& writeInfo); void BeginDeletePartitions(const TDistributedTransaction& tx); bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation, @@ -580,6 +580,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe& writeId) const; void DeleteWriteId(const TMaybe& writeId); + void TryDeleteWriteId(const TWriteId& writeId, const TTxWriteInfo& writeInfo, const TActorContext& ctx); void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const; diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp index 237469864e0c..0dc6f032c38a 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp @@ -158,6 +158,8 @@ class TFixture : public NUnitTest::TBaseFixture { void DeleteSupportivePartition(const TString& topicName, ui32 partition); + void WaitForTheTabletToDeleteTheWriteInfo(const std::string& topicName, + std::uint32_t partition); struct TTableRecord { TTableRecord() = default; @@ -1741,6 +1743,42 @@ void TFixture::SendLongTxLockStatus(const TActorId& actorId, runtime.SendToPipe(tabletId, actorId, event.release()); } +void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const std::string& topicName, + std::uint32_t partition) +{ + auto& runtime = Setup->GetRuntime(); + NActors::TActorId edge = runtime.AllocateEdgeActor(); + std::uint64_t tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partition); + + for (int i = 0; i < 20; ++i) { + auto request = std::make_unique(); + request->Record.SetCookie(12345); + request->Record.AddCmdRead()->SetKey("_txinfo"); + + auto& runtime = Setup->GetRuntime(); + + runtime.SendToPipe(tabletId, edge, request.release()); + auto response = runtime.GrabEdgeEvent(); + + UNIT_ASSERT(response->Record.HasCookie()); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345); + UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadResultSize(), 1); + + auto& read = response->Record.GetReadResult(0); + + NKikimrPQ::TTabletTxInfo info; + UNIT_ASSERT(info.ParseFromString(read.GetValue())); + + if (info.TxWritesSize() == 0) { + return; + } + + Sleep(TDuration::MilliSeconds(100)); + } + + UNIT_FAIL("TTabletTxInfo.TxWrites is expected to be empty"); +} + void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, ui64 tabletId, const NPQ::TWriteId& writeId) @@ -2301,6 +2339,32 @@ size_t TFixture::GetTableRecordsCount(const TString& tablePath) return parser.ColumnParser(0).GetUint64(); } +Y_UNIT_TEST_F(The_TxWriteInfo_Is_Deleted_After_The_Immediate_Transaction, TFixture) +{ + CreateTopic("topic_A"); + + NTable::TSession session = CreateTableSession(); + + NTable::TTransaction tx = BeginTx(session); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + RestartPQTablet("topic_A", 0); + CommitTx(tx, EStatus::SUCCESS); + + tx = BeginTx(session); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + RestartPQTablet("topic_A", 0); + CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2"); + + CheckTabletKeys("topic_A"); + + WaitForTheTabletToDeleteTheWriteInfo("topic_A", 0); +} + Y_UNIT_TEST_F(WriteToTopic_Demo_24, TFixture) { TestWriteToTopic24();