Skip to content

Commit

Permalink
Fix race condition when adding a new channel to a poll set. This only…
Browse files Browse the repository at this point in the history
… affected

MacOS. Revert the timeout mutex to be a simple non-recursive mutex. Produce
an event trace when the environment variable XrdSysIOE_TRACE is set.
  • Loading branch information
abh3 authored and ljanyst committed Jun 6, 2013
1 parent bcc7d22 commit e51051b
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 31 deletions.
134 changes: 104 additions & 30 deletions src/XrdSys/XrdSysIOEvents.cc
Expand Up @@ -66,6 +66,26 @@

#define ISPOLLER XrdSysThread::Same(XrdSysThread::ID(),pollTid)

// Tracing support macros. Trace records are written to cerr and are only
// produced when PollerInit::doTrace is true. Every macro that expands to a
// statement is wrapped in do {...} while(0) so that it behaves as exactly one
// statement and can safely be used as the sole body of an if/else.

// Convert a boolean expression to the text "true" or "false". The argument is
// parenthesized so that low precedence operators in it cannot capture the ?:.
//
#define BOOLNAME(x) ((x) ? "true" : "false")

// Unconditionally write one trace record. PollerInit::traceMTX is held for
// the duration of the write, serializing output produced via this macro.
//   x  - bare token that is stringized and used as the record label
//   fd - value printed after "IOE fd"
//   y  - a chain of stream insertions (a <<b <<c); it continues the cerr
//        insertion chain and therefore must NOT be parenthesized here.
//
#define DO_TRACE(x,fd,y) \
        do {PollerInit::traceMTX.Lock(); \
            cerr <<"IOE fd " <<(fd) <<' ' <<#x <<": " <<y <<endl; \
            PollerInit::traceMTX.UnLock(); \
           } while(0)

// Expression that is true when tracing has been enabled.
//
#define TRACING PollerInit::doTrace

// Write a trace record only when tracing is enabled. The conditional lives
// inside the do/while so a following "else" can never bind to it.
//
#define IF_TRACE(x,fd,y) \
        do {if (TRACING) DO_TRACE(x,fd,y);} while(0)

// Insertion chain fragment describing the channel lock state. This requires
// that a variable named "isLocked" be in scope at the point of use.
//
#define TRACE_LOK " channel now " <<(isLocked ? "locked" : "unlocked")

// Trace the outcome of a Modify() call. This requires that variables named
// "retval" and "isLocked" be in scope at the point of use.
//   y - the event mask passed to Modify()
//   z - the event mask in effect before the call
// Both masks are converted to int so that masks held in a char are displayed
// as numbers instead of as raw characters.
//
#define TRACE_MOD(x,fd,y,z) \
        IF_TRACE(x,fd,"Modify(" <<static_cast<int>(y) <<") == " \
                 <<BOOLNAME(retval) <<" [prior events=" \
                 <<static_cast<int>(z) <<']' <<TRACE_LOK)

// Trace the fact that Modify() was not called because the event mask, y, did
// not change. The mask is converted to int for the same reason as above.
//
#define TRACE_NOD(x,fd,y) \
        IF_TRACE(x,fd,"Modify(" <<static_cast<int>(y) \
                 <<") skipped; no events changed")

/******************************************************************************/
/* G l o b a l D a t a */
/******************************************************************************/
Expand Down Expand Up @@ -165,6 +185,9 @@ class PollerInit : public Poller
PollerInit() : Poller(-1, -1) {}
~PollerInit() {}

static XrdSysMutex traceMTX;
static bool doTrace;

protected:

void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
Expand All @@ -178,10 +201,16 @@ bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
}

bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
{return Init(cP, eNum, eTxt, isLocked);}
{bool rc = Init(cP, eNum, eTxt, isLocked);
IF_TRACE(Modify,cP->GetFD(), "Init() returned " <<BOOLNAME(rc));
return rc;
}

void Shutdown() {}
};

// Definitions of the PollerInit static members used for tracing. Tracing is
// enabled when the environment variable XrdSysIOE_TRACE is defined (the value
// is not inspected); the variable is consulted once, when doTrace is
// initialized. The traceMTX mutex is the one locked by the DO_TRACE macro
// while it writes a trace record.
//
bool PollerInit::doTrace = (getenv("XrdSysIOE_TRACE") != 0);
XrdSysMutex PollerInit::traceMTX;

/******************************************************************************/
/* P o l l e r W a i t */
Expand Down Expand Up @@ -211,17 +240,14 @@ bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
}

bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
{eNum = EDEADLK;
if (eTxt) *eTxt = "modifying channel";
return false;
}
{return Init(cP, eNum, eTxt, isLocked);}

void Shutdown() {}
};

PollerErr1 pollErr1;
PollerInit pollInit;
PollerInit pollWait;
PollerWait pollWait;
};
};

Expand Down Expand Up @@ -301,9 +327,13 @@ bool XrdSys::IOEvents::Channel::Disable(int events, const char **eText)
//
chMutex.Lock();

// Get correct current events; depending on the state of the channel
//
if (chPoller == &pollWait) curev = static_cast<int>(reMod);
else curev = static_cast<int>(chEvents);

// Calculate new event mask
//
curev = static_cast<int>(chEvents);
events &= allEvents;
newev = curev & ~events;

Expand All @@ -313,6 +343,9 @@ bool XrdSys::IOEvents::Channel::Disable(int events, const char **eText)
if (newev != curev)
{chEvents = newev;
retval = chPoller->Modify(this, eNum, eText, isLocked);
TRACE_MOD(Disable,chFD,newev,curev);
} else {
TRACE_NOD(Disable,chFD,newev);
}
if (isLocked) chMutex.UnLock();

Expand All @@ -330,18 +363,27 @@ bool XrdSys::IOEvents::Channel::Enable(int events, int timeout,
const char **eText)
{
time_t newDL;
int eNum, newev, curev = static_cast<int>(chEvents);
int eNum, newev, curev;
bool retval, isLocked = true, setTO = false;

// Trace this entry
//
IF_TRACE(Enable,chFD,"->Enable(" <<events <<',' <<timeout <<')');

// Lock ourselves against any changes (this is a recursive mutex)
//
chMutex.Lock();

// Get correct current events; depending on the state of the channel
//
if (chPoller == &pollWait) curev = static_cast<int>(reMod);
else curev = static_cast<int>(chEvents);

// Establish events that should be enabled
//
events &= allEvents;
newev = (curev ^ events) & events;
chEvents |= events;
chEvents = curev | events;

// Handle timeout changes now
//
Expand Down Expand Up @@ -369,7 +411,7 @@ bool XrdSys::IOEvents::Channel::Enable(int events, int timeout,

// Check if we have to reset the timeout. We need to hold the channel lock here.
//
if (setTO && (chPoller == &pollInit || chPoller != &pollErr1))
if (setTO && chPoller != &pollErr1)
setTO = chPollXQ->TmoAdd(this);
else setTO = false;

Expand All @@ -379,7 +421,13 @@ bool XrdSys::IOEvents::Channel::Enable(int events, int timeout,
// not unlock here is because we must ensure the channel doesn't change while
// we call modify. We let modify determine what to do.
//
retval = (newev ? chPoller->Modify(this, eNum, eText, isLocked) : true);
if (newev)
{retval = chPoller->Modify(this, eNum, eText, isLocked);
TRACE_MOD(Enable,chFD,(curev | events),curev);
} else {
retval = true;
TRACE_NOD(Enable,chFD,(curev | events));
}

// We need to notify the poller thread if the added deadline is the first in the
// queue and the poller is waiting. We also optimize for the case where the
Expand Down Expand Up @@ -560,7 +608,19 @@ bool XrdSys::IOEvents::Poller::CbkXeq(XrdSys::IOEvents::Channel *cP, int events,
XrdSysMutexHelper cbkMHelp(cP->chMutex);
char oldEvents;
int isRead = 0, isWrite = 0;
bool cbok, isLocked = true;
bool cbok, retval, isLocked = true;

// Perform any required tracing
//
if (TRACING)
{const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
(cP->chPoller == &pollInit ? "init" :
(cP->chPoller == &pollWait ? "wait" : "err")));
DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
<<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
<<" callback " <<(cP->chCB ? "present" : "missing")
<<" poller=" <<cbtype);
}

// Remove this from the timeout queue if there and reset the deadlines based
// on the event we are reflecting. This separates read and write deadlines
Expand All @@ -586,7 +646,8 @@ bool XrdSys::IOEvents::Poller::CbkXeq(XrdSys::IOEvents::Channel *cP, int events,
}
oldEvents = cP->chEvents;
cP->chEvents = 0;
Modify(cP, eNum, 0, isLocked);
retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
TRACE_MOD(CbkXeq,cP->chFD,0,oldEvents);
if (!isLocked) cP->chMutex.Lock();
cP->chEvents = oldEvents;
return true;
Expand Down Expand Up @@ -618,7 +679,9 @@ bool XrdSys::IOEvents::Poller::CbkXeq(XrdSys::IOEvents::Channel *cP, int events,
cP->chStat = Channel::isCBMode;
chDead = false;
cbkMHelp.UnLock();
IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
cbok = cP->chCB->Event(cP,cP->chCBA, events);
IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));

// If channel destroyed by the callback, bail really fast. Otherwise, regain
// the channel lock.
Expand Down Expand Up @@ -801,7 +864,21 @@ bool XrdSys::IOEvents::Poller::Init(XrdSys::IOEvents::Channel *cP, int &eNum,
// The channel must be locked upon entry!
//
bool retval;
char oldEv;


// If we are already in progress then simply update the shadow events and
// resuppress all current events.
//
if (cP->chPoller == &pollWait)
{cP->reMod = cP->chEvents;
cP->chEvents = 0;
IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
return true;
}

// Trace this entry
//
IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));

// If no events are enabled at this point, just return
//
Expand All @@ -815,31 +892,28 @@ bool XrdSys::IOEvents::Poller::Init(XrdSys::IOEvents::Channel *cP, int &eNum,
return false;
}

// If we are already in progress then indicate to the other thread that it
// needs to remodify this channel w.r.t. to the poller (this is very rare).
//
if (cP->chPoller == &pollWait)
{cP->reMod = 1;
return true;
}

// So, now we can include the channel in the poll set, enable it, and if
// successful, point it to a functioning poller.
// So, now we can include the channel in the poll set. We will include it
// with no events enabled to prevent callbacks prior to completion here.
//
cP->chPoller = &pollWait; oldEv = cP->chEvents;
cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}

// Determine what future poller to use and whether something happened should we
// have lost control of the channel. If something meaningful did happen then
// we need to redo it at this point as the other thread didn't want to wait.
// Determine what future poller to use. If we can use the regular poller then
// set the correct event mask for the channel. Note that we could have lost
// control but the correct events will be reflected in the "reMod" member.
//
if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
else {cP->chPoller = cP->chPollXQ;
cP->inPSet = 1;
if (cP->reMod && cP->chEvents != oldEv)
{retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
if (cP->reMod)
{cP->chEvents = cP->reMod;
retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
TRACE_MOD(Init,cP->chFD,int(cP->reMod),0);
if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
} else {
TRACE_NOD(Init,cP->chFD,0);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/XrdSys/XrdSysIOEvents.hh
Expand Up @@ -496,7 +496,7 @@ void WakeUp();
static Poller *newPoller(int pFD[2], int &eNum, const char **eTxt);

XrdSysMutex adMutex; // Mutex for adding & detaching channels
XrdSysRecMutex toMutex; // Mutex for handling the timeout list
XrdSysMutex toMutex; // Mutex for handling the timeout list
};
};
};
Expand Down

0 comments on commit e51051b

Please sign in to comment.