Skip to content

Commit fb0f653

Browse files
authored
[improve][client-c++] Use an atomic state_ instead of the lock to improve performance (apache#16940)
### Motivation Now, Use a lot of locks to ensure the atomicity of `state_` in the `ConsumerImpl`, `ProducerImpl`, `PartitionedConsumerImpl`, and `MultiTopicsConsumerImpl`, we can use atomic `state_` instead of the lock to improve performance. ### Modifications Use an atomic `state_` instead of the lock to improve performance.
1 parent 5262e6c commit fb0f653

File tree

9 files changed

+91
-224
lines changed

9 files changed

+91
-224
lines changed

pulsar-client-cpp/lib/ConsumerImpl.cc

Lines changed: 18 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,7 @@ void ConsumerImpl::start() {
160160
}
161161

162162
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
163-
Lock lock(mutex_);
164-
const auto state = state_;
165-
lock.unlock();
166-
if (state == Closed) {
163+
if (state_ == Closed) {
167164
LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
168165
return;
169166
}
@@ -202,7 +199,6 @@ void ConsumerImpl::connectionFailed(Result result) {
202199
ConsumerImplPtr ptr = shared_from_this();
203200

204201
if (consumerCreatedPromise_.setFailed(result)) {
205-
Lock lock(mutex_);
206202
state_ = Failed;
207203
}
208204
}
@@ -274,15 +270,15 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
274270
void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
275271
LOG_INFO(getName() << "Unsubscribing");
276272

277-
Lock lock(mutex_);
278273
if (state_ != Ready) {
279-
lock.unlock();
280274
callback(ResultAlreadyClosed);
281275
LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
282276
"then call unsubscribe");
283277
return;
284278
}
285279

280+
Lock lock(mutex_);
281+
286282
ClientConnectionPtr cnx = getCnx().lock();
287283
if (cnx) {
288284
LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_);
@@ -303,7 +299,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
303299

304300
void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
305301
if (result == ResultOk) {
306-
Lock lock(mutex_);
307302
state_ = Closed;
308303
LOG_INFO(getName() << "Unsubscribed successfully");
309304
} else {
@@ -750,12 +745,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
750745
Message msg;
751746

752747
// fail the callback if consumer is closing or closed
753-
Lock stateLock(mutex_);
754748
if (state_ != Ready) {
755749
callback(ResultAlreadyClosed, msg);
756750
return;
757751
}
758-
stateLock.unlock();
759752

760753
Lock lock(pendingReceiveMutex_);
761754
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -773,12 +766,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
773766
}
774767

775768
Result ConsumerImpl::receiveHelper(Message& msg) {
776-
{
777-
Lock lock(mutex_);
778-
if (state_ != Ready) {
779-
return ResultAlreadyClosed;
780-
}
769+
if (state_ != Ready) {
770+
return ResultAlreadyClosed;
781771
}
772+
782773
if (messageListener_) {
783774
LOG_ERROR(getName() << "Can not receive when a listener has been set");
784775
return ResultInvalidConfiguration;
@@ -808,11 +799,8 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
808799
return ResultInvalidConfiguration;
809800
}
810801

811-
{
812-
Lock lock(mutex_);
813-
if (state_ != Ready) {
814-
return ResultAlreadyClosed;
815-
}
802+
if (state_ != Ready) {
803+
return ResultAlreadyClosed;
816804
}
817805

818806
if (messageListener_) {
@@ -824,7 +812,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
824812
messageProcessed(msg);
825813
return ResultOk;
826814
} else {
827-
Lock lock(mutex_);
828815
if (state_ != Ready) {
829816
return ResultAlreadyClosed;
830817
}
@@ -991,13 +978,10 @@ void ConsumerImpl::disconnectConsumer() {
991978
}
992979

993980
void ConsumerImpl::closeAsync(ResultCallback callback) {
994-
Lock lock(mutex_);
995-
996981
// Keep a reference to ensure object is kept alive
997982
ConsumerImplPtr ptr = shared_from_this();
998983

999984
if (state_ != Ready) {
1000-
lock.unlock();
1001985
if (callback) {
1002986
callback(ResultAlreadyClosed);
1003987
}
@@ -1016,7 +1000,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
10161000
ClientConnectionPtr cnx = getCnx().lock();
10171001
if (!cnx) {
10181002
state_ = Closed;
1019-
lock.unlock();
10201003
// If connection is gone, also the consumer is closed on the broker side
10211004
if (callback) {
10221005
callback(ResultOk);
@@ -1027,16 +1010,13 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
10271010
ClientImplPtr client = client_.lock();
10281011
if (!client) {
10291012
state_ = Closed;
1030-
lock.unlock();
10311013
// Client was already destroyed
10321014
if (callback) {
10331015
callback(ResultOk);
10341016
}
10351017
return;
10361018
}
10371019

1038-
// Lock is no longer required
1039-
lock.unlock();
10401020
int requestId = client->newRequestId();
10411021
Future<Result, ResponseData> future =
10421022
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
@@ -1052,9 +1032,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
10521032

10531033
void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
10541034
if (result == ResultOk) {
1055-
Lock lock(mutex_);
10561035
state_ = Closed;
1057-
lock.unlock();
10581036

10591037
ClientConnectionPtr cnx = getCnx().lock();
10601038
if (cnx) {
@@ -1074,22 +1052,14 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI
10741052
const std::string& ConsumerImpl::getName() const { return consumerStr_; }
10751053

10761054
void ConsumerImpl::shutdown() {
1077-
Lock lock(mutex_);
10781055
state_ = Closed;
1079-
lock.unlock();
10801056

10811057
consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
10821058
}
10831059

1084-
bool ConsumerImpl::isClosed() {
1085-
Lock lock(mutex_);
1086-
return state_ == Closed;
1087-
}
1060+
bool ConsumerImpl::isClosed() { return state_ == Closed; }
10881061

1089-
bool ConsumerImpl::isOpen() {
1090-
Lock lock(mutex_);
1091-
return state_ == Ready;
1092-
}
1062+
bool ConsumerImpl::isOpen() { return state_ == Ready; }
10931063

10941064
Result ConsumerImpl::pauseMessageListener() {
10951065
if (!messageListener_) {
@@ -1152,14 +1122,13 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
11521122
int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
11531123

11541124
void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
1155-
Lock lock(mutex_);
11561125
if (state_ != Ready) {
11571126
LOG_ERROR(getName() << "Client connection is not open, please try again later.")
1158-
lock.unlock();
11591127
callback(ResultConsumerNotInitialized, BrokerConsumerStats());
11601128
return;
11611129
}
11621130

1131+
Lock lock(mutex_);
11631132
if (brokerConsumerStats_.isValid()) {
11641133
LOG_DEBUG(getName() << "Serving data from cache");
11651134
BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_;
@@ -1219,16 +1188,14 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
12191188
}
12201189

12211190
void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
1222-
Lock lock(mutex_);
1223-
if (state_ == Closed || state_ == Closing) {
1224-
lock.unlock();
1191+
const auto state = state_.load();
1192+
if (state == Closed || state == Closing) {
12251193
LOG_ERROR(getName() << "Client connection already closed.");
12261194
if (callback) {
12271195
callback(ResultAlreadyClosed);
12281196
}
12291197
return;
12301198
}
1231-
lock.unlock();
12321199

12331200
this->ackGroupingTrackerPtr_->flushAndClean();
12341201
ClientConnectionPtr cnx = getCnx().lock();
@@ -1252,16 +1219,14 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
12521219
}
12531220

12541221
void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
1255-
Lock lock(mutex_);
1256-
if (state_ == Closed || state_ == Closing) {
1257-
lock.unlock();
1222+
const auto state = state_.load();
1223+
if (state == Closed || state == Closing) {
12581224
LOG_ERROR(getName() << "Client connection already closed.");
12591225
if (callback) {
12601226
callback(ResultAlreadyClosed);
12611227
}
12621228
return;
12631229
}
1264-
lock.unlock();
12651230

12661231
ClientConnectionPtr cnx = getCnx().lock();
12671232
if (cnx) {
@@ -1325,16 +1290,14 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
13251290
}
13261291

13271292
void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
1328-
Lock lock(mutex_);
1329-
if (state_ == Closed || state_ == Closing) {
1330-
lock.unlock();
1293+
const auto state = state_.load();
1294+
if (state == Closed || state == Closing) {
13311295
LOG_ERROR(getName() << "Client connection already closed.");
13321296
if (callback) {
13331297
callback(ResultAlreadyClosed, MessageId());
13341298
}
13351299
return;
13361300
}
1337-
lock.unlock();
13381301

13391302
ClientConnectionPtr cnx = getCnx().lock();
13401303
if (cnx) {
@@ -1380,10 +1343,7 @@ void ConsumerImpl::trackMessage(const MessageId& messageId) {
13801343
}
13811344
}
13821345

1383-
bool ConsumerImpl::isConnected() const {
1384-
Lock lock(mutex_);
1385-
return !getCnx().expired() && state_ == Ready;
1386-
}
1346+
bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
13871347

13881348
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
13891349

pulsar-client-cpp/lib/HandlerBase.cc

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
4343
HandlerBase::~HandlerBase() { timer_->cancel(); }
4444

4545
void HandlerBase::start() {
46-
Lock lock(mutex_);
4746
// guard against concurrent state changes such as closing
48-
if (state_ == NotStarted) {
49-
state_ = Pending;
50-
lock.unlock();
51-
47+
State state = NotStarted;
48+
if (state_.compare_exchange_strong(state, Pending)) {
5249
grabCnx();
5350
}
5451
}
@@ -97,7 +94,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
9794
return;
9895
}
9996

100-
Lock lock(handler->mutex_);
10197
State state = handler->state_;
10298

10399
ClientConnectionPtr currentConnection = handler->connection_.lock();
@@ -135,7 +131,8 @@ bool HandlerBase::isRetriableError(Result result) {
135131
}
136132

137133
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
138-
if (handler->state_ == Pending || handler->state_ == Ready) {
134+
const auto state = handler->state_.load();
135+
if (state == Pending || state == Ready) {
139136
TimeDuration delay = handler->backoff_.next();
140137

141138
LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0)

pulsar-client-cpp/lib/HandlerBase.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ class HandlerBase {
105105
Failed
106106
};
107107

108-
State state_;
108+
std::atomic<State> state_;
109109
Backoff backoff_;
110110
uint64_t epoch_;
111111

0 commit comments

Comments
 (0)