Skip to content

Commit

Permalink
Revert "Provide a poller pause/resume method to ease forking without …
Browse files Browse the repository at this point in the history
…spurious errors."

This reverts commit 1f47a78.
  • Loading branch information
ljanyst committed Oct 31, 2013
1 parent 254cc36 commit 5e8457c
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 71 deletions.
27 changes: 1 addition & 26 deletions src/XrdSys/XrdSysIOEvents.cc
Original file line number Diff line number Diff line change
Expand Up @@ -920,29 +920,6 @@ bool XrdSys::IOEvents::Poller::Init(XrdSys::IOEvents::Channel *cP, int &eNum,
return retval;
}

/******************************************************************************/
/* P a u s e */
/******************************************************************************/

void XrdSys::IOEvents::Poller::Pause(bool yes)
{
PipeData cmdbuff;

// Initialize the pipdata structure
//
memset(&cmdbuff, 0, sizeof(cmdbuff));
cmdbuff.req = (yes ? PipeData::Wait : PipeData::Cont);

// Lock all of this
//
adMutex.Lock();

// If we are still active then issue the pause/resume (SendCmd unlocks us)
//
if (cmdFD != -1) SendCmd(cmdbuff, true);
else adMutex.UnLock();
}

/******************************************************************************/
/* P o l l 2 E n u m */
/******************************************************************************/
Expand All @@ -962,7 +939,7 @@ int XrdSys::IOEvents::Poller::Poll2Enum(short events)
/* S e n d C m d */
/******************************************************************************/

int XrdSys::IOEvents::Poller::SendCmd(PipeData &cmd, bool unlock)
int XrdSys::IOEvents::Poller::SendCmd(PipeData &cmd)
{
int wlen;

Expand All @@ -975,12 +952,10 @@ int XrdSys::IOEvents::Poller::SendCmd(PipeData &cmd, bool unlock)
cmd.theSem = &mySem;
do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
while (wlen < 0 && errno == EINTR);
if (unlock) adMutex.UnLock();
if (wlen > 0) mySem.Wait();
} else {
do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
while (wlen < 0 && errno == EINTR);
if (unlock) adMutex.UnLock();
}

// All done
Expand Down
31 changes: 9 additions & 22 deletions src/XrdSys/XrdSysIOEvents.hh
Original file line number Diff line number Diff line change
Expand Up @@ -378,25 +378,12 @@ public:
static Poller *Create(int &eNum, const char **eTxt=0);

//-----------------------------------------------------------------------------
//! Pause or resume a poller object. This method is usefule to prevent file
//! descriptor activity when doing a fork(). Beware, Pause() blocks until the
//! poller thread is paused. However, no internal locks are held.
//!
//! @param true Pause the poller object. Active and pending callbacks are
//! completed. After which the poller event thread pauses
//! until Pause(false) is called.
//! @param false Resume a paused poller object. This is a noop if the poller
//! is not paused.
//-----------------------------------------------------------------------------

void Pause(bool resume=false);

//-----------------------------------------------------------------------------
//! Stop a poller object. Active and pending callbacks are completed. After
//! which the poller event thread exits. Subsequently, each associated channel
//! is disabled and removed from the poller object. If the channel is enabled
//! for a StopEvent, the stop callback is invoked. However, any attempt to use
//! the channel methods that require an active poller will return an error.
//! Stop a poller object. Active callbacks are completed. Pending callbacks are
//! discarded. After which the poller event thread exits. Subsequently, each
//! associated channel is disabled and removed from the poller object. If the
//! channel is enabled for a StopEvent, the stop callback is invoked. However,
//! any attempt to use the channel methods that require an active poller will
//! return an error.
//!
//! Since a stopped poller cannot be restarted; the only thing left is to delete
//! it. This also applies to all the associated channels since they no longer
Expand Down Expand Up @@ -431,7 +418,7 @@ inline int GetPollEnt(Channel *cP) {return cP->pollEnt;}
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd);
inline void LockChannel(Channel *cP) {cP->chMutex.Lock();}
int Poll2Enum(short events);
int SendCmd(PipeData &cmd, bool unlock=false);
int SendCmd(PipeData &cmd);
void SetPollEnt(Channel *cP, int ptEnt);
bool TmoAdd(Channel *cP);
void TmoDel(Channel *cP);
Expand Down Expand Up @@ -486,8 +473,8 @@ int cmdFD; // FD to send PipeData commands
int reqFD; // FD to recv PipeData requests
struct PipeData {char req; char evt; short ent; int fd;
XrdSysSemaphore *theSem;
enum cmd {NoOp = 0, MdFD = 1, Cont = 2, Post = 3,
MiFD = 4, RmFD = 5, Wait = 6, Stop = 7};
enum cmd {NoOp = 0, MdFD = 1, Post = 2,
MiFD = 3, RmFD = 4, Stop = 5};
};
PipeData reqBuff; // Buffer used by poller thread to recv data
char *pipeBuff; // Read resumption point in buffer
Expand Down
34 changes: 11 additions & 23 deletions src/XrdSys/XrdSysIOEventsPollE.icc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ void XrdSys::IOEvents::PollE::Begin(XrdSysSemaphore *syncsem,
{
int i, numpolled;
Channel *cP;
bool doProcess;

// Indicate to the starting thread that all went well
//
Expand All @@ -209,14 +208,11 @@ void XrdSys::IOEvents::PollE::Begin(XrdSysSemaphore *syncsem,
cerr <<"EPoll: " <<strerror(rc) <<" polling for events" <<endl;
abort();
}
else {doProcess = false;
for (i = 0; i < numpolled; i++)
{if ((cP = (Channel *)pollTab[i].data.ptr))
Dispatch(cP, pollTab[i].events);
else doProcess = true;
}
if (doProcess && !Process()) return;
}
else for (i = 0; i < numpolled; i++)
{if ((cP = (Channel *)pollTab[i].data.ptr))
Dispatch(cP, pollTab[i].events);
else if (!Process()) return;
}

if (pollMax < pollNum) AllocPT(pollNum);

Expand Down Expand Up @@ -354,22 +350,14 @@ bool XrdSys::IOEvents::PollE::Modify(XrdSys::IOEvents::Channel *cP,

bool XrdSys::IOEvents::PollE::Process()
{
bool paused = false;

// Get the pipe request and check out actions of interest.
//

do{if (GetRequest())
{ if (reqBuff.req == PipeData::Post) reqBuff.theSem->Post();
else if (reqBuff.req == PipeData::Wait){reqBuff.theSem->Post();
paused = true;
}
else if (reqBuff.req == PipeData::Cont) break;
else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
return false;
}
}
} while(paused);
if (GetRequest())
{ if (reqBuff.req == PipeData::Post) reqBuff.theSem->Post();
else if (reqBuff.req == PipeData::Stop){reqBuff.theSem->Post();
return false;
}
}

// Return true
//
Expand Down

0 comments on commit 5e8457c

Please sign in to comment.