tx/tm_stm: fix unboundedness of _pid_tx_id #18198

Merged: 5 commits, May 2, 2024
3 changes: 2 additions & 1 deletion src/v/cluster/tests/tm_stm_tests.cc
@@ -188,7 +188,8 @@ FIXTURE_TEST(test_tm_stm_re_tx, tm_stm_test_fixture) {
       tx_id,
       std::chrono::milliseconds(0),
       pid2,
-      expected_pid)
+      expected_pid,
+      pid1)
       .get0();
     BOOST_REQUIRE_EQUAL(op_code, op_status::success);
     auto tx7 = expect_tx(stm.get_tx(tx_id).get0());
16 changes: 10 additions & 6 deletions src/v/cluster/tm_stm.cc
@@ -114,7 +114,7 @@ tm_stm::tm_stm(
   : raft::persisted_stm<>(tm_stm_snapshot, logger, c)
   , _sync_timeout(config::shard_local_cfg().tm_sync_timeout_ms.value())
   , _transactional_id_expiration(
-      config::shard_local_cfg().transactional_id_expiration_ms.value())
+      config::shard_local_cfg().transactional_id_expiration_ms.bind())
   , _feature_table(feature_table)
   , _cache(tm_stm_cache)
   , _ctx_log(logger, ssx::sformat("[{}]", _raft->ntp())) {}
@@ -527,14 +527,16 @@ ss::future<tm_stm::op_status> tm_stm::re_register_producer(
   kafka::transactional_id tx_id,
   std::chrono::milliseconds transaction_timeout_ms,
   model::producer_identity pid,
-  model::producer_identity last_pid) {
+  model::producer_identity last_pid,
+  model::producer_identity rolled_pid) {
     vlog(
       _ctx_log.trace,
       "[tx_id={}] Registering existing transaction with new pid: {}, previous "
-      "pid: {}",
+      "pid: {}, rolled_pid: {}",
       tx_id,
       pid,
-      last_pid);
+      last_pid,
+      rolled_pid);

     auto tx_opt = co_await get_tx(tx_id);
     if (!tx_opt.has_value()) {
@@ -558,6 +560,7 @@ ss::future<tm_stm::op_status> tm_stm::re_register_producer(
     if (!r.has_value()) {
         co_return tm_stm::op_status::unknown;
     }
+    _pid_tx_id.erase(rolled_pid);
     co_return tm_stm::op_status::success;
 }

@@ -923,6 +926,7 @@ tm_stm::apply_tm_update(model::record_batch_header hdr, model::record_batch b) {
     }

     _cache->set_log(tx);
+    _pid_tx_id.erase(tx.last_pid);
     _pid_tx_id[tx.pid] = tx.id;

     return ss::now();
@@ -940,7 +944,7 @@ ss::future<> tm_stm::apply(const model::record_batch& b) {

 bool tm_stm::is_expired(const tm_transaction& tx) {
     auto now_ts = clock_type::now();
-    return _transactional_id_expiration < now_ts - tx.last_update_ts;
+    return _transactional_id_expiration() < now_ts - tx.last_update_ts;
 }

 ss::lw_shared_ptr<mutex> tm_stm::get_tx_lock(kafka::transactional_id tid) {
@@ -977,7 +981,7 @@ tm_stm::try_lock_tx(kafka::transactional_id tx_id, std::string_view lock_name) {
 absl::btree_set<kafka::transactional_id> tm_stm::get_expired_txs() {
     auto now_ts = clock_type::now();
     auto ids = _cache->filter_all_txid_by_tx([this, now_ts](auto tx) {
-        return _transactional_id_expiration < now_ts - tx.last_update_ts;
+        return _transactional_id_expiration() < now_ts - tx.last_update_ts;
     });
     return ids;
 }
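Why the two erase calls above matter: _pid_tx_id is keyed by model::producer_identity, an (id, epoch) pair, so every epoch bump of the same transactional id used to insert a fresh key while the previous epoch's key stayed behind forever, which is the unboundedness this PR fixes. Below is a minimal self-contained sketch of the failure mode and the fix; it uses simplified stand-in types (producer_identity, std::unordered_map) rather than Redpanda's actual model::producer_identity and absl::flat_hash_map.

#include <cstdint>
#include <cstdio>
#include <functional>
#include <string>
#include <unordered_map>

// Stand-in for model::producer_identity: (id, epoch) pairs compare unequal
// across epoch bumps, so every bump produces a distinct map key.
struct producer_identity {
    int64_t id;
    int16_t epoch;
    bool operator==(const producer_identity&) const = default;
};

struct pid_hash {
    size_t operator()(const producer_identity& p) const {
        return std::hash<int64_t>{}(p.id) * 31 + std::hash<int16_t>{}(p.epoch);
    }
};

using pid_map = std::unordered_map<producer_identity, std::string, pid_hash>;

// Before the fix: each re-registration inserts a new (id, epoch) key and
// never erases the previous epoch's key, so the map grows without bound.
void re_register_unbounded(pid_map& m, producer_identity pid, std::string tx) {
    m[pid] = std::move(tx);
}

// After the fix: erase the rolled (previous-epoch) key first, which keeps
// the map bounded at one live entry per transactional id.
void re_register_bounded(
  pid_map& m, producer_identity rolled, producer_identity pid, std::string tx) {
    m.erase(rolled); // mirrors _pid_tx_id.erase(rolled_pid) above
    m[pid] = std::move(tx);
}

int main() {
    pid_map unbounded, bounded;
    for (int16_t e = 0; e < 100; ++e) {
        re_register_unbounded(unbounded, {42, e}, "tx-0");
        re_register_bounded(bounded, {42, int16_t(e - 1)}, {42, e}, "tx-0");
    }
    // prints "unbounded: 100, bounded: 1"
    std::printf("unbounded: %zu, bounded: %zu\n", unbounded.size(), bounded.size());
}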
11 changes: 8 additions & 3 deletions src/v/cluster/tm_stm.h
@@ -302,12 +302,17 @@ class tm_stm final : public raft::persisted_stm<> {
     mark_tx_prepared(model::term_id, kafka::transactional_id);
     ss::future<checked<tm_transaction, tm_stm::op_status>>
     mark_tx_killed(model::term_id, kafka::transactional_id);
+    // todo: clean up last_pid and rolled_pid. It seems like they are doing
+    // the same thing, but in practice they are not: last_pid is not updated
+    // in all cases, whereas rolled_pid is needed to clean up all the state
+    // from previous epochs.
     ss::future<tm_stm::op_status> re_register_producer(
       model::term_id,
       kafka::transactional_id,
       std::chrono::milliseconds,
-      model::producer_identity,
-      model::producer_identity);
+      model::producer_identity pid_to_register,
+      model::producer_identity last_pid,
+      model::producer_identity rolled_pid);
     ss::future<tm_stm::op_status> register_new_producer(
       model::term_id,
       kafka::transactional_id,
@@ -363,7 +368,7 @@ class tm_stm final : public raft::persisted_stm<> {
     ss::future<raft::stm_snapshot> take_local_snapshot() override;

     std::chrono::milliseconds _sync_timeout;
-    std::chrono::milliseconds _transactional_id_expiration;
+    config::binding<std::chrono::milliseconds> _transactional_id_expiration;
     absl::flat_hash_map<model::producer_identity, kafka::transactional_id>
       _pid_tx_id;
     absl::flat_hash_map<kafka::transactional_id, ss::lw_shared_ptr<mutex>>
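To make the todo above concrete, here is a hypothetical walk-through of one re-registration; the values are illustrative only, with the flow inferred from do_init_tm_tx in this PR:

// transactional id "tx-0" calls InitProducerId again:
//   coordinator state before:  tx.pid          = {id=42, epoch=3}
//   freshly issued identity:   pid_to_register = {id=42, epoch=4}
//   rolled_pid = tx.pid        = {id=42, epoch=3}: the overwritten identity,
//                                whose stale _pid_tx_id entry must be erased
//   last_pid                   = set only on the KIP-360 recovery path;
//                                otherwise it stays model::unknown_pid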
30 changes: 28 additions & 2 deletions src/v/cluster/tx_gateway_frontend.cc
@@ -318,7 +318,7 @@ tx_gateway_frontend::tx_gateway_frontend(
   , _metadata_dissemination_retry_delay_ms(
       config::shard_local_cfg().metadata_dissemination_retry_delay_ms.value())
   , _transactional_id_expiration(
-      config::shard_local_cfg().transactional_id_expiration_ms.value())
+      config::shard_local_cfg().transactional_id_expiration_ms.bind())
   , _transactions_enabled(config::shard_local_cfg().enable_transactions.value())
   , _max_transactions_per_coordinator(max_transactions_per_coordinator) {
     /**
@@ -327,6 +327,8 @@
     if (_transactions_enabled) {
         start_expire_timer();
     }
+    _transactional_id_expiration.watch(
+      [this]() { rearm_expire_timer(/*force=*/true); });
 }

 void tx_gateway_frontend::start_expire_timer() {
@@ -344,6 +346,23 @@ void tx_gateway_frontend::start_expire_timer() {
     rearm_expire_timer();
 }

+void tx_gateway_frontend::rearm_expire_timer(bool force) {
+    if (ss::this_shard_id() != 0 || _gate.is_closed()) {
+        return;
+    }
+    if (force) {
+        _expire_timer.cancel();
+    }
+    if (!_expire_timer.armed()) {
+        // we need to expire transactional ids which were inactive for more
+        // than the transactional_id_expiration period. if we check for
+        // expired transactions twice per period, then in the worst case an
+        // expired id lives at most 1.5 x transactional_id_expiration
+        auto delay = _transactional_id_expiration() / 2;
+        _expire_timer.arm(model::timeout_clock::now() + delay);
+    }
+}
+
 ss::future<> tx_gateway_frontend::stop() {
     vlog(txlog.debug, "Asked to stop tx coordinator");
     _expire_timer.cancel();
@@ -1297,6 +1316,13 @@ ss::future<cluster::init_tm_tx_reply> tx_gateway_frontend::do_init_tm_tx(

     tx = r.value();
     init_tm_tx_reply reply;
+    // note: while rolled_pid and last_pid look very similar in intent, which
+    // is to track the previous incarnation of this transactional_id, it does
+    // not work quite like that in practice. last_pid is not set in all cases
+    // (refer to KIP-360 for details), whereas we want to clean up older
+    // epochs' state in all cases, hence the separate rolled_pid. This is
+    // definitely not ideal and probably needs a closer look.
+    model::producer_identity rolled_pid = tx.pid;
[Review thread on the line above]

Member: can we use last_pid to cleanup overwritten producer state here? It seems that it has exactly the same purpose as the rolled_pid.

Contributor: Right, I couldn't quite grok the difference between pid, last_pid and rolled_pid.

Author: As chatted offline, last_pid seems somewhat sketchy and not updated in all cases; added a clarifying comment.

     model::producer_identity last_pid = model::unknown_pid;

     if (expected_pid == model::unknown_pid) {
@@ -1334,7 +1360,7 @@ ss::future<cluster::init_tm_tx_reply> tx_gateway_frontend::do_init_tm_tx(
       reply.pid,
       last_pid);
     auto op_status = co_await stm->re_register_producer(
-      term, tx.id, transaction_timeout_ms, reply.pid, last_pid);
+      term, tx.id, transaction_timeout_ms, reply.pid, last_pid, rolled_pid);
     if (op_status == tm_stm::op_status::success) {
         reply.ec = tx_errc::none;
     } else if (op_status == tm_stm::op_status::conflict) {
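The worst-case bound quoted in the rearm comment checks out: polling every T/2 means an id that becomes expired immediately after a poll waits at most one more poll interval beyond its expiration, and the force flag exists because a timer already armed with an old, longer delay would otherwise keep running on stale settings after a config change. A small stand-alone sketch of the arithmetic (plain std::chrono, not Seastar's timer API):

#include <chrono>
#include <cstdio>

int main() {
    using namespace std::chrono;
    auto expiration = minutes(10);       // transactional_id_expiration (T)
    auto check_period = expiration / 2;  // the delay armed above (T / 2)
    // Worst case: the id expired right after a check ran, so it is only
    // collected at the following check, T/2 after its expiration.
    auto worst_case_lifetime = expiration + check_period;  // 1.5 x T
    std::printf(
      "worst-case lifetime: %lld minutes\n",
      static_cast<long long>(worst_case_lifetime.count()));
}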
13 changes: 2 additions & 11 deletions src/v/cluster/tx_gateway_frontend.h
@@ -97,7 +97,7 @@ class tx_gateway_frontend final
     int16_t _metadata_dissemination_retries;
     std::chrono::milliseconds _metadata_dissemination_retry_delay_ms;
     ss::timer<model::timeout_clock> _expire_timer;
-    std::chrono::milliseconds _transactional_id_expiration;
+    config::binding<std::chrono::milliseconds> _transactional_id_expiration;
     bool _transactions_enabled;
     config::binding<uint64_t> _max_transactions_per_coordinator;

@@ -116,16 +116,7 @@ class tx_gateway_frontend final

     void start_expire_timer();

-    void rearm_expire_timer() {
-        if (!_expire_timer.armed() && !_gate.is_closed()) {
-            // we need to expire transactional ids which were inactive more than
-            // transactional_id_expiration period. if we check for the expired
-            // transactions twice during the period then in the worst case an
-            // expired id lives at most 1.5 x transactional_id_expiration
-            auto delay = _transactional_id_expiration / 2;
-            _expire_timer.arm(model::timeout_clock::now() + delay);
-        }
-    }
+    void rearm_expire_timer(bool force = false);

     ss::future<std::optional<model::node_id>>
     wait_for_leader(const model::ntp&);
2 changes: 1 addition & 1 deletion src/v/config/configuration.cc
@@ -784,7 +784,7 @@ configuration::configuration()
       "transactional_id_expiration_ms",
       "Producer ids are expired once this time has elapsed after the last "
       "write with the given producer id.",
-      {.visibility = visibility::user},
+      {.needs_restart = needs_restart::no, .visibility = visibility::user},
       10080min)
     , max_concurrent_producer_ids(
       *this,
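The switch from .value() to .bind() in the .cc files, together with needs_restart::no here, is what makes the setting hot-reloadable: reads go through the binding's operator(), so they always observe the current value, and watch() fires a callback when the property changes. A minimal sketch of that observer pattern with a hand-rolled binding type (an assumption for illustration, not Redpanda's actual config::binding implementation):

#include <chrono>
#include <functional>
#include <utility>
#include <vector>

// Hand-rolled stand-in for config::binding<T>.
template <typename T>
class binding {
    T _value;
    std::vector<std::function<void()>> _watchers;

public:
    explicit binding(T v)
      : _value(std::move(v)) {}
    // Live read: callers see the current value, never a startup snapshot.
    const T& operator()() const { return _value; }
    // Register a callback to run whenever the value changes.
    void watch(std::function<void()> f) { _watchers.push_back(std::move(f)); }
    // Called when the cluster config changes at runtime.
    void update(T v) {
        _value = std::move(v);
        for (auto& f : _watchers) {
            f();
        }
    }
};

int main() {
    using namespace std::chrono;
    // 10080min is the documented default (7 days).
    binding<milliseconds> expiration(milliseconds(minutes(10080)));
    expiration.watch([] { /* e.g. rearm the expire timer with force=true */ });
    expiration.update(milliseconds(5000));  // like the test's rpk config set
}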
44 changes: 44 additions & 0 deletions tests/rptest/tests/transactions_test.py
@@ -21,6 +21,7 @@
 import random

 from ducktape.utils.util import wait_until
+from ducktape.errors import TimeoutError

 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.services.admin import Admin
@@ -357,6 +358,49 @@ def change_static_member_test(self):

         producer.abort_transaction()

+    @cluster(num_nodes=3)
+    def transaction_id_expiration_test(self):
+        admin = Admin(self.redpanda)
+        rpk = RpkTool(self.redpanda)
+        # Create an open transaction.
+        producer = ck.Producer({
+            'bootstrap.servers': self.redpanda.brokers(),
+            'transactional.id': '0',
+            'transaction.timeout.ms': 3600000,  # to avoid timing out
+        })
+        producer.init_transactions()
+        producer.begin_transaction()
+        producer.produce(self.output_t.name, "x", "y")
+        producer.flush()
+
+        # The default transactional id expiration is 7d, so the transaction
+        # should still be open.
+        def no_running_transactions():
+            return len(admin.get_all_transactions()) == 0
+
+        wait_timeout_s = 20
+        try:
+            wait_until(no_running_transactions,
+                       timeout_sec=wait_timeout_s,
+                       backoff_sec=2,
+                       err_msg="Transactions still running")
+            assert False, "No running transactions found."
+        except TimeoutError as e:
+            assert "Transactions still running" in str(e)
+
+        # Once the expiration is lowered, the transaction should be aborted.
+        rpk.cluster_config_set("transactional_id_expiration_ms", 5000)
+        wait_until(no_running_transactions,
+                   timeout_sec=wait_timeout_s,
+                   backoff_sec=2,
+                   err_msg="Transactions still running")
+
+        try:
+            producer.commit_transaction()
+            assert False, "transaction should have been aborted by now."
+        except ck.KafkaException:
+            pass
+
     @cluster(num_nodes=3)
     def expired_tx_test(self):
         # confluent_kafka client uses the same timeout both for init_transactions