diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 4430b436232c..b8f5f1f8741d 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -3199,12 +3199,23 @@ ss::future<> rm_stm::clear_old_tx_pids() { fragmented_vector pids_for_delete; for (auto [id, epoch] : _log_state.fence_pid_epoch) { - auto pid = model::producer_identity(id, epoch); // If pid is not inside tx_seqs it means we do not have transaction for // it right now - if (!_log_state.current_txes.contains(pid)) { - pids_for_delete.push_back(pid); + auto pid = model::producer_identity(id, epoch); + if (_log_state.current_txes.contains(pid)) { + continue; + } + pids_for_delete.push_back(pid); + + auto lock_it = _tx_locks.find(id); + // If there is no lock or the lock is not being held (presumably by + // another epoch of the same producer ID), we can cleanup entry in the + // map. + if (lock_it == _tx_locks.end() || !lock_it->second->ready()) { + continue; } + lock_it->second->broken(); + _tx_locks.erase(lock_it); } vlog(