diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java index 65bd3b836a..a77cedd568 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java @@ -74,8 +74,8 @@ public class TransactionStateManager { @VisibleForTesting protected final Set leavingPartitions = Sets.newHashSet(); - private final Map>> txnLogProducerMap = Maps.newHashMap(); - private final Map>> txnLogReaderMap = Maps.newHashMap(); + private final Map>> txnLogProducerMap = Maps.newConcurrentMap(); + private final Map>> txnLogReaderMap = Maps.newConcurrentMap(); // Transaction metadata cache indexed by assigned transaction topic partition ids // Map >