From 61916d18522b5a31d0094766b29cb37ea16b4f73 Mon Sep 17 00:00:00 2001 From: k2la Date: Mon, 9 Mar 2020 16:45:37 +0900 Subject: [PATCH] [pulsar-client-cpp] Fix Redelivery of Messages on UnackedMessageTracker When Ack Messages . (#6498) ### Motivation Because of #6391 , acked messages were counted as unacked messages. Although messages from brokers were acknowledged, the following log was output. ``` 2020-03-06 19:44:51.790 INFO ConsumerImpl:174 | [persistent://public/default/t1, sub1, 0] Created consumer on broker [127.0.0.1:58860 -> 127.0.0.1:6650] my-message-0: Fri Mar 6 19:45:05 2020 my-message-1: Fri Mar 6 19:45:05 2020 my-message-2: Fri Mar 6 19:45:05 2020 2020-03-06 19:45:15.818 INFO UnAckedMessageTrackerEnabled:53 | [persistent://public/default/t1, sub1, 0] : 3 Messages were not acked within 10000 time ``` This behavior happened on master branch. (cherry picked from commit 67f8cf30d33f3cb7e8d9309cadb2d80626dd25bc) --- .../lib/UnAckedMessageTrackerEnabled.cc | 14 +++++++++----- .../lib/UnAckedMessageTrackerEnabled.h | 3 +-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc index 7894e64a874f6..9185dba70bb3b 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc @@ -90,8 +90,10 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long bool UnAckedMessageTrackerEnabled::add(const MessageId& m) { std::lock_guard acquire(lock_); if (messageIdPartitionMap.count(m) == 0) { - bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second; - return insert && timePartitions.back().insert(m).second; + std::set& partition = timePartitions.back(); + bool emplace = messageIdPartitionMap.emplace(m, partition).second; + bool insert = partition.insert(m).second; + return emplace && insert; } return false; } @@ -104,7 +106,8 @@ bool UnAckedMessageTrackerEnabled::isEmpty() { bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) { std::lock_guard acquire(lock_); bool removed = false; - std::map>::iterator exist = messageIdPartitionMap.find(m); + + std::map&>::iterator exist = messageIdPartitionMap.find(m); if (exist != messageIdPartitionMap.end()) { removed = exist->second.erase(m); } @@ -121,7 +124,7 @@ void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) { for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) { MessageId msgIdInMap = it->first; if (msgIdInMap < msgId) { - std::map>::iterator exist = messageIdPartitionMap.find(msgId); + std::map&>::iterator exist = messageIdPartitionMap.find(msgId); if (exist != messageIdPartitionMap.end()) { exist->second.erase(msgId); } @@ -135,7 +138,8 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) { MessageId msgIdInMap = it->first; if (msgIdInMap.getTopicName().compare(topic) == 0) { - std::map>::iterator exist = messageIdPartitionMap.find(msgIdInMap); + std::map&>::iterator exist = + messageIdPartitionMap.find(msgIdInMap); if (exist != messageIdPartitionMap.end()) { exist->second.erase(msgIdInMap); } diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h index c2b4012adb184..9195b30d70d69 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h @@ -23,7 +23,6 @@ #include namespace pulsar { - class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface { public: ~UnAckedMessageTrackerEnabled(); @@ -41,7 +40,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface { void timeoutHandlerHelper(); bool isEmpty(); long size(); - std::map> messageIdPartitionMap; + std::map&> messageIdPartitionMap; std::deque> timePartitions; std::mutex lock_; DeadlineTimerPtr timer_;