-
Notifications
You must be signed in to change notification settings - Fork 702
[Kafka API] Aborted transactions cleanup #19733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kafka API] Aborted transactions cleanup #19733
Conversation
🟢 |
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
ydb/core/persqueue/partition.h
Outdated
// after transaction timeout has passed. | ||
// | ||
// Total time till kafka supportive partition deletion = AppData.KafkaProxyConfig.TransactionTimeoutMs + KAFKA_TRANSACTION_DELETE_DELAY_MS | ||
static const ui32 KAFKA_TRANSACTION_DELETE_DELAY_MS = 60 * 60 * 1000; // 1 hour; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Можно TDuration::Hours(1).MilliSeconds()
. Принципиально хранить в виде целого числа? Можно поменять тип на TDuration
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Оно используется именно как число чаще всего, так как обычно над ним перед превращением в Duration делают математические операции (деление / сложение). Поэтому я бы оставил числом
@@ -925,6 +926,11 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info, | |||
if (writeId.IsTopicApiTransaction()) { | |||
SubscribeWriteId(writeId, ctx); | |||
} | |||
|
|||
if (txWrite.GetKafkaTransaction() && txWrite.HasCreatedAt()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Мы защитились feature-флагом от ситуации когда нет одного из значений?
ydb/core/persqueue/pq_impl.cpp
Outdated
}; | ||
|
||
for (auto& pair : TxWrites) { | ||
if (pair.second.KafkaTransaction && txnExpired(pair.second)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Первое условие лишнее? Оно повторно проверяется в txnExpired
} | ||
|
||
TString GetSupportivePartitionKeyFrom() { | ||
return std::string{TKeyPrefix::EServiceType::ServiceTypeData}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Лишнее преобразование из std::string
в TString
. Здесь и ниже.
const auto& result = response->Record.GetReadRangeResult(0); | ||
if (result.GetStatus() == static_cast<ui32>(NKikimrProto::OK)) { | ||
for (ui32 i = 0; i < result.PairSize(); i++) { | ||
supportivePartitionsKeys.emplace_back(result.GetPair(i).GetKey().c_str()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Без вызова c_str()
не компилируется?
ydb/core/protos/config.proto
Outdated
@@ -2305,6 +2305,7 @@ message TKafkaProxyConfig { | |||
optional TProxy Proxy = 7; | |||
optional bool MeteringV2Enabled = 10 [default = true]; | |||
optional bool AuthViaApiKey = 11 [default = true]; | |||
optional uint32 TransactionTimeoutMs = 12 [default = 86400]; // 1 day |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Значение по умолчанию надо умножить на 1000.
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
Changelog entry
This PR is part of the "Transactions in Kafka API" epic. It adds functionality to cleanup data for aborted transactions.
Changelog category
Description for reviewers
...