Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3322,7 +3322,7 @@ void TPersQueue::ScheduleDeleteExpiredKafkaTransactions() {
for (auto& pair : TxWrites) {
if (txnExpired(pair.second)) {
PQ_LOG_D("Transaction for Kafka producer " << pair.first.KafkaProducerInstanceId << " is expired");
BeginDeletePartitions(pair.second);
BeginDeletePartitions(pair.first, pair.second);
}
}
}
Expand Down Expand Up @@ -5420,7 +5420,7 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
}

PQ_LOG_TX_I("delete partitions for WriteId " << writeId << " (longTxService lost tx)");
BeginDeletePartitions(writeInfo);
BeginDeletePartitions(writeId, writeInfo);
}

void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
Expand Down Expand Up @@ -5470,6 +5470,14 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
DeletePartition(partitionId, ctx);

writeInfo.Partitions.erase(partitionId.OriginalPartitionId);
TryDeleteWriteId(writeId, writeInfo, ctx);
TxWritesChanged = true;

TryWriteTxs(ctx);
}

void TPersQueue::TryDeleteWriteId(const TWriteId& writeId, const TTxWriteInfo& writeInfo, const TActorContext& ctx)
{
if (writeInfo.Partitions.empty()) {
if (!writeInfo.KafkaTransaction) {
UnsubscribeWriteId(writeId, ctx);
Expand All @@ -5481,14 +5489,15 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
(tx->State == NKikimrPQ::TTransaction::CANCELED)) {
TryExecuteTxs(ctx, *tx);
}
} else {
// if the transaction is not in Txs, then it is an immediate transaction
DeleteWriteId(writeId);
}
} else if (writeInfo.KafkaTransaction) { // case when kafka transaction haven't even started in KQP, but data for it was already written in partition
} else {
// this is kafka transaction or immediate transaction
DeleteWriteId(writeId);
}
}
TxWritesChanged = true;

TryWriteTxs(ctx);
}

void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext&)
Expand All @@ -5508,20 +5517,24 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1);

BeginDeletePartitions(writeInfo);
BeginDeletePartitions(writeId, writeInfo);
}

void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
void TPersQueue::BeginDeletePartitions(const TWriteId& writeId, TTxWriteInfo& writeInfo)
{
if (writeInfo.Deleting) {
PQ_LOG_TX_D("Already deleting WriteInfo");
return;
}
for (auto& [_, partitionId] : writeInfo.Partitions) {
Y_ABORT_UNLESS(Partitions.contains(partitionId));
const TPartitionInfo& partition = Partitions.at(partitionId);
PQ_LOG_TX_D("send TEvPQ::TEvDeletePartition to partition " << partitionId);
Send(partition.Actor, new TEvPQ::TEvDeletePartition);
if (writeInfo.Partitions.empty()) {
TryDeleteWriteId(writeId, writeInfo, ActorContext());
} else {
for (auto& [_, partitionId] : writeInfo.Partitions) {
Y_ABORT_UNLESS(Partitions.contains(partitionId));
const TPartitionInfo& partition = Partitions.at(partitionId);
PQ_LOG_TX_D("send TEvPQ::TEvDeletePartition to partition " << partitionId);
Send(partition.Actor, new TEvPQ::TEvDeletePartition);
}
}
writeInfo.Deleting = true;
}
Expand All @@ -5533,7 +5546,7 @@ void TPersQueue::BeginDeletePartitions(const TDistributedTransaction& tx)
}

TTxWriteInfo& writeInfo = TxWrites.at(*tx.WriteId);
BeginDeletePartitions(writeInfo);
BeginDeletePartitions(*tx.WriteId, writeInfo);
}

TString TPersQueue::LogPrefix() const {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext& ctx);

void BeginDeletePartitions(TTxWriteInfo& writeInfo);
void BeginDeletePartitions(const TWriteId& writeId, TTxWriteInfo& writeInfo);
void BeginDeletePartitions(const TDistributedTransaction& tx);

bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
Expand Down Expand Up @@ -580,6 +580,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {

bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const;
void DeleteWriteId(const TMaybe<TWriteId>& writeId);
void TryDeleteWriteId(const TWriteId& writeId, const TTxWriteInfo& writeInfo, const TActorContext& ctx);

void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;

Expand Down
64 changes: 64 additions & 0 deletions ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class TFixture : public NUnitTest::TBaseFixture {

void DeleteSupportivePartition(const TString& topicName,
ui32 partition);
void WaitForTheTabletToDeleteTheWriteInfo(const std::string& topicName,
std::uint32_t partition);

struct TTableRecord {
TTableRecord() = default;
Expand Down Expand Up @@ -1741,6 +1743,42 @@ void TFixture::SendLongTxLockStatus(const TActorId& actorId,
runtime.SendToPipe(tabletId, actorId, event.release());
}

void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const std::string& topicName,
std::uint32_t partition)
{
auto& runtime = Setup->GetRuntime();
NActors::TActorId edge = runtime.AllocateEdgeActor();
std::uint64_t tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partition);

for (int i = 0; i < 20; ++i) {
auto request = std::make_unique<NKikimr::TEvKeyValue::TEvRequest>();
request->Record.SetCookie(12345);
request->Record.AddCmdRead()->SetKey("_txinfo");

auto& runtime = Setup->GetRuntime();

runtime.SendToPipe(tabletId, edge, request.release());
auto response = runtime.GrabEdgeEvent<NKikimr::TEvKeyValue::TEvResponse>();

UNIT_ASSERT(response->Record.HasCookie());
UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345);
UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadResultSize(), 1);

auto& read = response->Record.GetReadResult(0);

NKikimrPQ::TTabletTxInfo info;
UNIT_ASSERT(info.ParseFromString(read.GetValue()));

if (info.TxWritesSize() == 0) {
return;
}

Sleep(TDuration::MilliSeconds(100));
}

UNIT_FAIL("TTabletTxInfo.TxWrites is expected to be empty");
}

void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId,
ui64 tabletId,
const NPQ::TWriteId& writeId)
Expand Down Expand Up @@ -2301,6 +2339,32 @@ size_t TFixture::GetTableRecordsCount(const TString& tablePath)
return parser.ColumnParser(0).GetUint64();
}

Y_UNIT_TEST_F(The_TxWriteInfo_Is_Deleted_After_The_Immediate_Transaction, TFixture)
{
CreateTopic("topic_A");

NTable::TSession session = CreateTableSession();

NTable::TTransaction tx = BeginTx(session);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
RestartPQTablet("topic_A", 0);
CommitTx(tx, EStatus::SUCCESS);

tx = BeginTx(session);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
RestartPQTablet("topic_A", 0);
CommitTx(tx, EStatus::SUCCESS);

auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1");
UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2");

CheckTabletKeys("topic_A");

WaitForTheTabletToDeleteTheWriteInfo("topic_A", 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_24, TFixture)
{
TestWriteToTopic24();
Expand Down
Loading