Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure the wakePend flag does not incur undefined behavior. #170

Merged
merged 3 commits into from
Nov 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/XrdSys/XrdSysAtomics.hh
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,24 @@
#define AtomicZAP(x) x = 0
#endif
#endif

/*
* The following definitions give a mechanism for using C++ atomics
* (when using at least C++11). *Note* that these can't be relied
* on for correct behavior as they are non-atomic for C++03 compilers.
*
* Only use them for standards correctness (eliminating C++11 undefined
* behavior).
*/
#if __cplusplus >= 201103L
#include <atomic>
#define CPP_ATOMIC_LOAD(x, order) x.load(order)
#define CPP_ATOMIC_STORE(x, val, order) x.store(val, order)
#define CPP_ATOMIC_TYPE(kind) std::atomic<kind>
#else
#define CPP_ATOMIC_LOAD(x, order) x
#define CPP_ATOMIC_STORE(x, val, order) x = val
#define CPP_ATOMIC_TYPE(kind) kind
#endif


19 changes: 15 additions & 4 deletions src/XrdSys/XrdSysIOEvents.cc
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ bool XrdSys::IOEvents::Channel::Enable(int events, int timeout,
// that we cannot hold the channel mutex for this call because it may wait.
//
if (isLocked) chMutex.UnLock();
if (retval && !(chPollXQ->wakePend) && setTO && isLocked) chPollXQ->WakeUp();
bool isWakePend = CPP_ATOMIC_LOAD(chPollXQ->wakePend, std::memory_order_consume);
if (retval && !isWakePend && setTO && isLocked) chPollXQ->WakeUp();

// All done
//
Expand Down Expand Up @@ -1142,7 +1143,7 @@ int XrdSys::IOEvents::Poller::TmoGet()

// Return the value
//
wakePend = false;
CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
toMutex.UnLock();
return wtval;
}
Expand All @@ -1158,9 +1159,19 @@ void XrdSys::IOEvents::Poller::WakeUp()
// Send it off to wakeup the poller thread, but only if here is no wakeup in
// progress.
//
// We use a mutex here because we want to produce a synchronization point - all
// threads that might be interested timeouts and wakeups are going to incur a
// cache bounce for the page where wakePend resides; they will see a consistent
// view of the wakePend flag. For those threads, this is equivalent to
// an atomic with memory_order std::memory_order_seq_cst (the strongest ordering).
// However, the threads that are not interested in timeouts will not get a flush
// for their copy of the wakePend page. They will still have the weaker memory
// ordering of consume/release (which is guaranteed anyway on all current architectures
// except for DEC Alpha).
toMutex.Lock();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear what this lock is supposed to do in the first place!

if (wakePend) toMutex.UnLock();
else {wakePend = true;
bool isWakePend = CPP_ATOMIC_LOAD(wakePend, std::memory_order_consume);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note I eliminated the toMute locking here - I can't figure out what it'd be useful for.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Brian,

That bit of code that was eliminated makes it much more probable that
multiple duplicate messages will be sent. The lock is there to prevent
duplicate messages while we are trying to figure out whether or not the
poller thread is awake. It's all about time causality which is sort of
wierd in threaded code. So, while it doesn't appear to make sense it winds
up being crucial because we are trying to constrain two independent
events (i.e. sending the message and checking if we should send the
message - they are actually independant in a co-dependent way).

Andy

On Thu, 20 Nov 2014, Brian Bockelman wrote:

@@ -1158,12 +1159,11 @@ void XrdSys::IOEvents::Poller::WakeUp()
// Send it off to wakeup the poller thread, but only if here is no wakeup in
// progress.
//

  • toMutex.Lock();
  • if (wakePend) toMutex.UnLock();
  •  else {wakePend = true;
    
  •        toMutex.UnLock();
    
  •        SendCmd(cmdbuff);
    
  •       }
    
  • bool isWakePend = CPP_ATOMIC_LOAD(wakePend, std::memory_order_consume);

Note I eliminated the toMute locking here - I can't figure out what it'd be useful for.


Reply to this email directly or view it on GitHub:
https://github.com/xrootd/xrootd/pull/170/files#r20694995

########################################################################
Use REPLY-ALL to reply to list

To unsubscribe from the XROOTD-DEV list, click the following link:
https://listserv.slac.stanford.edu/cgi-bin/wa?SUBED1=XROOTD-DEV&A=1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll add it back with a distilled version of your above comment.

if (isWakePend) {toMutex.UnLock();}
else {CPP_ATOMIC_STORE(wakePend, true, std::memory_order_release);
toMutex.UnLock();
SendCmd(cmdbuff);
}
Expand Down
3 changes: 2 additions & 1 deletion src/XrdSys/XrdSysIOEvents.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <sys/types.h>

#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysAtomics.hh"

//-----------------------------------------------------------------------------
//! IOEvents
Expand Down Expand Up @@ -506,7 +507,7 @@ PipeData reqBuff; // Buffer used by poller thread to recv data
char *pipeBuff; // Read resumption point in buffer
int pipeBlen; // Number of outstanding bytes
char tmoMask; // Timeout mask
bool wakePend; // Wakeup is effectively pending (don't send)
CPP_ATOMIC_TYPE(bool) wakePend; // Wakeup is effectively pending (don't send)
bool chDead; // True if channel deleted by callback

static time_t maxTime; // Maximum time allowed
Expand Down
5 changes: 3 additions & 2 deletions src/XrdSys/XrdSysIOEventsPollE.icc
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ void XrdSys::IOEvents::PollE::Begin(XrdSysSemaphore *syncsem,
//
do {do {numpolled = epoll_wait(pollDfd, pollTab, pollMax, TmoGet());}
while (numpolled < 0 && errno == EINTR);
wakePend = true; numPoll = numpolled;
if (numpolled == 0) CbkTMO();
CPP_ATOMIC_STORE(wakePend, true, std::memory_order_release);
numPoll = numpolled;
if (numpolled == 0) CbkTMO();
else if (numpolled < 0)
{int rc = errno;
cerr <<"EPoll: " <<strerror(rc) <<" polling for events" <<endl;
Expand Down