Skip to content

Commit 03ee31b

Browse files
committed
Comms cleanup + address TSAN complaint
Signed-off-by: Tom Flynn <tom.flynn@gmail.com> Change-Id: Ib8195fde93061a1ec36bd5557dc9c16e2313d4f4
1 parent e247043 commit 03ee31b

File tree

3 files changed

+18
-142
lines changed

3 files changed

+18
-142
lines changed

libopflex/comms/CommunicationPeer.cpp

Lines changed: 9 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -332,17 +332,10 @@ void CommunicationPeer::readBufferZ(char const * buffer, size_t nread) {
332332
chunk_size = readChunk(buffer);
333333
nread -= chunk_size++;
334334

335-
VLOG(6)
336-
<< "nread="
337-
<< nread
338-
<< " chunk_size="
339-
<< chunk_size
340-
;
335+
VLOG(6) << "nread=" << nread << " chunk_size=" << chunk_size;
341336

342337
if(!nread) {
343-
344338
break;
345-
346339
}
347340

348341
buffer += chunk_size;
@@ -360,54 +353,10 @@ void CommunicationPeer::readBufferZ(char const * buffer, size_t nread) {
360353
}
361354
}
362355

363-
std::ostream& operator << (
364-
std::ostream& os,
365-
std::vector<iovec> const & iov) {
366-
367-
os << "INLINE_DUMP\n";
368-
369-
for (size_t i = 0; i < iov.size(); ++i) {
370-
371-
os
372-
<< "("
373-
<< i
374-
<< "@"
375-
<< std::hex
376-
<< iov[i].iov_base
377-
<< std::dec
378-
<< "+"
379-
<< iov[i].iov_len
380-
;
381-
382-
if (VLOG_IS_ON(7)) {
383-
size_t len = iov[i].iov_len;
384-
size_t offset = 0;
385-
std::string temp((const char*)iov[i].iov_base, len);
386-
387-
do {
388-
os
389-
<< "("
390-
<< temp.c_str() + offset
391-
<< ")"
392-
;
393-
offset += strlen(temp.c_str()) + 1;
394-
} while (len > offset);
395-
}
396-
397-
os << ")";
398-
399-
}
400-
return os;
401-
}
402-
403356
void CommunicationPeer::onWrite() {
404-
405357
transport_.callbacks_->onSent_(this);
406-
407358
pendingBytes_ = 0;
408-
409359
write(); /* kick the can */
410-
411360
}
412361

413362
int CommunicationPeer::write() {
@@ -491,54 +440,31 @@ void CommunicationPeer::timeout() {
491440

492441
if (uvRefCnt_ == 1) {
493442
/* we already have a pending close */
494-
VLOG(4)
495-
<< this
496-
<< " Already closing"
497-
;
443+
VLOG(4) << this << " Already closing";
498444
return;
499445
}
500446

501-
if (rtt <= (keepAliveInterval_ >> 3) ) {
502-
503-
VLOG(5)
504-
<< this
505-
<< " still waiting"
506-
;
507-
447+
if (rtt <= (keepAliveInterval_ >> 3u) ) {
448+
VLOG(5) << this << " still waiting";
508449
return;
509-
510450
}
511451

512-
if (rtt > (keepAliveInterval_ << 3) ) {
513-
514-
LOG(WARNING)
515-
<< this
516-
<< " tearing down the connection upon timeout"
517-
;
518-
452+
if (rtt > (keepAliveInterval_ << 3u) ) {
453+
LOG(WARNING) << this << " tearing down the connection upon timeout";
519454
/* close the connection and hope for the best */
520455
this->onDisconnect();
521456
return;
522457
}
523458

524459
/* send echo request */
525-
VLOG(5)
526-
<< this
527-
<< " sending a ping for keep-alive"
528-
;
529-
460+
VLOG(5) << this << " sending a ping for keep-alive";
530461
sendEchoReq();
531462
}
532463

533464
int comms::internal::CommunicationPeer::choke() const {
534465

535466
if (choked_) {
536-
537-
LOG(WARNING)
538-
<< this
539-
<< " already choked"
540-
;
541-
467+
LOG(WARNING) << this << " already choked";
542468
return 0;
543469
}
544470

@@ -559,24 +485,15 @@ int comms::internal::CommunicationPeer::choke() const {
559485
const_cast<CommunicationPeer *>(this)->onDisconnect();
560486

561487
} else {
562-
563488
choked_ = 1;
564-
565489
}
566490

567491
return rc;
568-
569492
}
570493

571494
int comms::internal::CommunicationPeer::unchoke() const {
572-
573495
if (!choked_) {
574-
575-
LOG(WARNING)
576-
<< this
577-
<< " already unchoked"
578-
;
579-
496+
LOG(WARNING) << this << " already unchoked";
580497
return 0;
581498
}
582499

@@ -599,13 +516,10 @@ int comms::internal::CommunicationPeer::unchoke() const {
599516
const_cast<CommunicationPeer *>(this)->onDisconnect();
600517

601518
} else {
602-
603519
choked_ = 0;
604-
605520
}
606521

607522
return rc;
608-
609523
}
610524

611525
yajr::rpc::InboundMessage * comms::internal::CommunicationPeer::parseFrame() {

libopflex/comms/loopdata.cpp

Lines changed: 7 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,11 @@ void internal::Peer::LoopData::onPrepareLoop() {
4242
uv_walk(prepare_.loop, walkAndCountHandlesCb, &countHandle);
4343

4444
if (countHandle.counter) {
45-
LOG(INFO)
46-
<< "Still waiting on "
47-
<< countHandle.counter
48-
<< " handles"
49-
;
50-
45+
LOG(INFO) << "Still waiting on " << countHandle.counter << " handles";
5146
return;
5247
}
5348

54-
LOG(INFO)
55-
<< this
56-
<< " Stopping and closing loop watcher"
57-
;
49+
LOG(INFO) << this << " Stopping and closing loop watcher";
5850
uv_prepare_stop(&prepare_);
5951
uv_close((uv_handle_t*)&prepare_, &fini);
6052

@@ -78,19 +70,12 @@ void internal::Peer::LoopData::onPrepareLoop() {
7870
if (peers[RETRY_TO_CONNECT].begin() !=
7971
peers[RETRY_TO_CONNECT].end()) {
8072

81-
LOG(INFO)
82-
<< "retrying first RETRY_TO_CONNECT peer"
83-
;
73+
LOG(INFO) << "retrying first RETRY_TO_CONNECT peer";
8474

8575
/* retry just the first active peer in the queue */
8676
peers[RETRY_TO_CONNECT]
8777
.erase_and_dispose(peers[RETRY_TO_CONNECT].begin(), RetryPeer());
8878

89-
VLOG(4)
90-
<< " Stopping prepareAgain_ @"
91-
<< reinterpret_cast<void *>(&prepareAgain_)
92-
;
93-
9479
uv_timer_stop(&prepareAgain_);
9580
/* if there are more, we will uv_timer_start() down below */
9681
}
@@ -126,30 +111,20 @@ void internal::Peer::LoopData::onPrepareLoop() {
126111
}
127112

128113
void internal::Peer::LoopData::onPrepareLoop(uv_prepare_t * h) {
129-
130114
static_cast< ::yajr::comms::internal::Peer::LoopData *>(h->data)
131115
->onPrepareLoop();
132-
133116
}
134117

135118
void internal::Peer::LoopData::fini(uv_handle_t * h) {
136119
LOG(INFO);
137-
138120
delete static_cast< ::yajr::comms::internal::Peer::LoopData *>(h->data);
139121
}
140122

141123
void internal::Peer::LoopData::destroy(bool now) {
142124
LOG(INFO);
143-
144-
assert(prepare_.data == this);
145-
146125
assert(!destroying_);
147126
if (destroying_) {
148-
LOG(WARNING)
149-
<< this
150-
<< " Double destroy() detected"
151-
;
152-
127+
LOG(WARNING) << this << " Double destroy() detected";
153128
return;
154129
}
155130

@@ -240,13 +215,8 @@ void Peer::LoopData::RetryPeer::operator () (Peer *peer)
240215

241216
void Peer::LoopData::PeerDeleter::operator () (Peer *peer)
242217
{
243-
VLOG(1)
244-
<< peer
245-
<< " deleting abruptedly"
246-
;
247-
218+
VLOG(1) << peer << " deleting abruptedly";
248219
assert(!"peers should never get deleted this way");
249-
250220
delete peer;
251221
}
252222

@@ -276,13 +246,7 @@ void Peer::LoopData::down() {
276246

277247
Peer::LoopData::~LoopData() {
278248

279-
VLOG(1)
280-
<< "Delete on Loop"
281-
;
282-
VLOG(1)
283-
<< this
284-
<< " is being deleted"
285-
;
249+
VLOG(1) << this << " is being deleted" ;
286250

287251
for (size_t i=0; i < Peer::LoopData::TOTAL_STATES; ++i) {
288252
assert(peers[Peer::LoopData::PeerState(i)].empty());
@@ -294,10 +258,7 @@ Peer::LoopData::~LoopData() {
294258

295259
void Peer::LoopData::PeerDisposer::operator () (Peer *peer)
296260
{
297-
LOG(INFO)
298-
<< peer
299-
<< " destroy() because this communication thread is shutting down"
300-
;
261+
LOG(INFO) << peer << " destroy() because this communication thread is shutting down";
301262
peer->destroy(now_);
302263
}
303264

libopflex/include/opflex/yajr/internal/comms.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <rapidjson/document.h>
2323
#include <uv.h>
2424

25+
#include <boost/atomic.hpp>
2526
#include <boost/intrusive/list.hpp>
2627

2728
#include <sstream> /* for basic_stringstream<> */
@@ -423,7 +424,7 @@ class Peer : public SafeListBaseHook {
423424
*/
424425
::yajr::Peer::UvLoopSelector uvLoopSelector_;
425426
/** reference count */
426-
unsigned int uvRefCnt_;
427+
boost::atomic<unsigned int> uvRefCnt_;
427428
/** Is this peer connected */
428429
unsigned char connected_ :1;
429430
/** Is the peer begin destroyed */

0 commit comments

Comments
 (0)