diff --git a/ydb/core/persqueue/pqtablet/pq_impl.cpp b/ydb/core/persqueue/pqtablet/pq_impl.cpp index e90174e5751c..4e84852b0150 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.cpp +++ b/ydb/core/persqueue/pqtablet/pq_impl.cpp @@ -3040,7 +3040,7 @@ void TPersQueue::ScheduleDeleteExpiredKafkaTransactions() { for (auto& pair : TxWrites) { if (txnExpired(pair.second)) { PQ_LOG_D("Transaction for Kafka producer " << pair.first.KafkaProducerInstanceId << " is expired"); - BeginDeletePartitions(pair.second); + BeginDeletePartitions(pair.first, pair.second); } } } @@ -5087,7 +5087,7 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e } PQ_LOG_TX_I("delete partitions for WriteId " << writeId << " (longTxService lost tx)"); - BeginDeletePartitions(writeInfo); + BeginDeletePartitions(writeId, writeInfo); } void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) @@ -5153,6 +5153,14 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon DeletePartition(partitionId, ctx); writeInfo.Partitions.erase(partitionId.OriginalPartitionId); + TryDeleteWriteId(writeId, writeInfo, ctx); + TxWritesChanged = true; + + TryWriteTxs(ctx); +} + +void TPersQueue::TryDeleteWriteId(const TWriteId& writeId, const TTxWriteInfo& writeInfo, const TActorContext& ctx) +{ if (writeInfo.Partitions.empty()) { if (!writeInfo.KafkaTransaction) { UnsubscribeWriteId(writeId, ctx); @@ -5164,14 +5172,15 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon (tx->State == NKikimrPQ::TTransaction::CANCELED)) { TryExecuteTxs(ctx, *tx); } + } else { + // if the transaction is not in Txs, then it is an immediate transaction + DeleteWriteId(writeId); } - } else if (writeInfo.KafkaTransaction) { // case when kafka transaction haven't even started in KQP, but data for it was already written in partition + } else { + // this is kafka transaction or immediate transaction DeleteWriteId(writeId); } } - TxWritesChanged = true; - - TryWriteTxs(ctx); } void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext&) @@ -5189,20 +5198,24 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo TTxWriteInfo& writeInfo = TxWrites.at(writeId); PQ_ENSURE(writeInfo.Partitions.size() == 1); - BeginDeletePartitions(writeInfo); + BeginDeletePartitions(writeId, writeInfo); } -void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo) +void TPersQueue::BeginDeletePartitions(const TWriteId& writeId, TTxWriteInfo& writeInfo) { if (writeInfo.Deleting) { PQ_LOG_TX_D("Already deleting WriteInfo"); return; } - for (auto& [_, partitionId] : writeInfo.Partitions) { - PQ_ENSURE(Partitions.contains(partitionId)); - const TPartitionInfo& partition = Partitions.at(partitionId); - PQ_LOG_TX_D("send TEvPQ::TEvDeletePartition to partition " << partitionId); - Send(partition.Actor, new TEvPQ::TEvDeletePartition); + if (writeInfo.Partitions.empty()) { + TryDeleteWriteId(writeId, writeInfo, ActorContext()); + } else { + for (auto& [_, partitionId] : writeInfo.Partitions) { + PQ_ENSURE(Partitions.contains(partitionId)); + const TPartitionInfo& partition = Partitions.at(partitionId); + PQ_LOG_TX_D("send TEvPQ::TEvDeletePartition to partition " << partitionId); + Send(partition.Actor, new TEvPQ::TEvDeletePartition); + } } writeInfo.Deleting = true; } @@ -5214,7 +5227,7 @@ void TPersQueue::BeginDeletePartitions(const TDistributedTransaction& tx) } TTxWriteInfo& writeInfo = TxWrites.at(*tx.WriteId); - BeginDeletePartitions(writeInfo); + BeginDeletePartitions(*tx.WriteId, writeInfo); } TString TPersQueue::LogPrefix() const { diff --git a/ydb/core/persqueue/pqtablet/pq_impl.h b/ydb/core/persqueue/pqtablet/pq_impl.h index bafc37ac98fd..822aa743cf5b 100644 --- a/ydb/core/persqueue/pqtablet/pq_impl.h +++ b/ydb/core/persqueue/pqtablet/pq_impl.h @@ -549,7 +549,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext& ctx); - void BeginDeletePartitions(TTxWriteInfo& writeInfo); + void BeginDeletePartitions(const TWriteId& writeId, TTxWriteInfo& writeInfo); void BeginDeletePartitions(const TDistributedTransaction& tx); bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation, @@ -582,6 +582,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe& writeId) const; void DeleteWriteId(const TMaybe& writeId); + void TryDeleteWriteId(const TWriteId& writeId, const TTxWriteInfo& writeInfo, const TActorContext& ctx); void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const; diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp index 457d440cbf83..3be83bed5fb8 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp @@ -167,6 +167,8 @@ class TFixture : public NUnitTest::TBaseFixture { void DeleteSupportivePartition(const std::string& topicName, std::uint32_t partition); + void WaitForTheTabletToDeleteTheWriteInfo(const std::string& topicName, + std::uint32_t partition); struct TTableRecord { TTableRecord() = default; @@ -1544,6 +1546,42 @@ void TFixture::SendLongTxLockStatus(const TActorId& actorId, runtime.SendToPipe(tabletId, actorId, event.release()); } +void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const std::string& topicName, + std::uint32_t partition) +{ + auto& runtime = Setup->GetRuntime(); + NActors::TActorId edge = runtime.AllocateEdgeActor(); + std::uint64_t tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partition); + + for (int i = 0; i < 20; ++i) { + auto request = std::make_unique(); + request->Record.SetCookie(12345); + request->Record.AddCmdRead()->SetKey("_txinfo"); + + auto& runtime = Setup->GetRuntime(); + + runtime.SendToPipe(tabletId, edge, request.release()); + auto response = runtime.GrabEdgeEvent(); + + UNIT_ASSERT(response->Record.HasCookie()); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345); + UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadResultSize(), 1); + + auto& read = response->Record.GetReadResult(0); + + NKikimrPQ::TTabletTxInfo info; + UNIT_ASSERT(info.ParseFromString(read.GetValue())); + + if (info.TxWritesSize() == 0) { + return; + } + + std::this_thread::sleep_for(100ms); + } + + UNIT_FAIL("TTabletTxInfo.TxWrites is expected to be empty"); +} + void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, std::uint64_t tabletId, const NPQ::TWriteId& writeId) @@ -2091,6 +2129,32 @@ std::size_t TFixture::GetTableRecordsCount(const std::string& tablePath) return parser.ColumnParser(0).GetUint64(); } +Y_UNIT_TEST_F(The_TxWriteInfo_Is_Deleted_After_The_Immediate_Transaction, TFixtureTable) +{ + CreateTopic("topic_A"); + + auto session = CreateSession(); + + auto tx = session->BeginTx(); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", tx.get()); + RestartPQTablet("topic_A", 0); + session->CommitTx(*tx, EStatus::SUCCESS); + + tx = session->BeginTx(); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", tx.get()); + RestartPQTablet("topic_A", 0); + session->CommitTx(*tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1"); + UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2"); + + CheckTabletKeys("topic_A"); + + WaitForTheTabletToDeleteTheWriteInfo("topic_A", 0); +} + Y_UNIT_TEST_F(WriteToTopic_Demo_24_Table, TFixtureTable) { TestWriteToTopic24();