Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Xrd] Wait for the current PollE poll to finish after removing a Link #1941

Merged
merged 1 commit into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Xrd/XrdLinkCtl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ XrdLink *XrdLinkCtl::Alloc(XrdNetAddr &peer, int opts)
LTMutex.Lock();
if (LinkBat[peerFD])
{LTMutex.UnLock();
Log.Emsg("Link", "attempt to reuse active link");
snprintf(hName, sizeof(hName), "%d", peerFD);
Log.Emsg("Link", "attempt to reuse active link FD -",hName);
return (XrdLink *)0;
}

Expand Down
13 changes: 11 additions & 2 deletions src/Xrd/XrdPollE.hh
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ public:

void Start(XrdSysSemaphore *syncp, int &rc);

XrdPollE(struct epoll_event *ptab, int numfd, int pfd)
{PollTab = ptab; PollMax = numfd; PollDfd = pfd;}
XrdPollE(struct epoll_event *ptab, int numfd, int pfd, int wfd)
: WaitFdSem(0)
{PollTab = ptab; PollMax = numfd; PollDfd = pfd;
WaitFd = wfd;
}

~XrdPollE();

protected:
Expand All @@ -57,7 +61,10 @@ protected:
const char *x2Text(unsigned int evf, char *buff);

private:
int AddWaitFd();
void HandleWaitFd(const unsigned int events);
void remFD(XrdPollInfo &pInfo, unsigned int events);
void Wait4Poller();

#ifdef EPOLLONESHOT
static const int ePollOneShot = EPOLLONESHOT;
Expand All @@ -70,5 +77,7 @@ void remFD(XrdPollInfo &pInfo, unsigned int events);
struct epoll_event *PollTab;
int PollDfd;
int PollMax;
int WaitFd;
XrdSysSemaphore WaitFdSem;
};
#endif
106 changes: 102 additions & 4 deletions src/Xrd/XrdPollE.icc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

#include "Xrd/XrdPollE.hh"
#include "Xrd/XrdScheduler.hh"
Expand All @@ -43,7 +44,7 @@

XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
{
int pfd, bytes, alignment, pagsz = getpagesize();
int pfd, wfd, bytes, alignment, pagsz = getpagesize();
struct epoll_event *pp;

// Open the /dev/poll driver
Expand All @@ -56,20 +57,28 @@ XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
#endif
{Log.Emsg("Poll", errno, "create epoll device"); return 0;}

if ((wfd = eventfd(0, EFD_CLOEXEC)) < 0)
{Log.Emsg("Poll", errno,
"create an eventfd as the wait-poller descriptor");
close(pfd);
return 0;
}

// Calculate the size of the poll table and allocate it
//
bytes = maxfd * sizeof(struct epoll_event);
alignment = (bytes < pagsz ? 1024 : pagsz);
if (posix_memalign((void **)&pp, alignment, bytes))
{Log.Emsg("Poll", ENOMEM, "create poll table");
close(wfd);
close(pfd);
return 0;
}

// Create new poll object
//
memset((void *)pp, 0, bytes);
return (XrdPoll *)new XrdPollE(pp, maxfd, pfd);
return (XrdPoll *)new XrdPollE(pp, maxfd, pfd, wfd);
}

/******************************************************************************/
Expand All @@ -79,9 +88,30 @@ XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
XrdPollE::~XrdPollE()
{
if (PollTab) free(PollTab);
if (WaitFd >= 0) close(WaitFd);
if (PollDfd >= 0) close(PollDfd);
}

/******************************************************************************/
/* A d d W a i t F d */
/******************************************************************************/

int XrdPollE::AddWaitFd()
{
const unsigned int myPollEvts = EPOLLIN;
struct epoll_event myEvent = {myPollEvts, {(void *)&WaitFd}};

// Add the waitfd to the poll set
//
if (epoll_ctl(PollDfd, EPOLL_CTL_ADD, WaitFd, &myEvent) < 0)
{int rc = errno;
Log.Emsg("Poll", rc, "include the wait FD in the poll set");
return rc;
}

return 0;
}

/******************************************************************************/
/* D i s a b l e */
/******************************************************************************/
Expand Down Expand Up @@ -164,6 +194,44 @@ void XrdPollE::Exclude(XrdPollInfo &pInfo)
// Forcibly remove the link from the poll set
//
remFD(pInfo, 0);

// Wait to make sure the poll thread has completed handling the last set of
// events which may have included this Link. The handing includes examining the
// Link's PollInfo before deciding if to schedule the Link. After we return,
// the PollInfo may be reset and the Link could be subsequently reused.
//
Wait4Poller();
}

/******************************************************************************/
/* H a n d l e W a i t F d */
/******************************************************************************/

void XrdPollE::HandleWaitFd(const unsigned int events)
{
// Used by the polling thread to signal waiters that a polling loop has been
// completed.
//
eventfd_t ic, cnt;

// We don't expect anything but EPOLLIN. But if we get an error (errors
// can be reported, despite not being selected events) abort rather than
// potentially keeping the poll thread busy repeatedly looping.
//
if (!(events & EPOLLIN) || (events & (EPOLLERR | EPOLLHUP)))
{char eBuff[64];
Log.Emsg("Poll", "wait-poller handler:", x2Text(events, eBuff));
if (events & (EPOLLERR | EPOLLHUP))
abort();
return;
}

if (eventfd_read(WaitFd, &cnt) < 0)
{Log.Emsg("Poll", errno, "read from the wait-poller descriptor");
return;
}

for (ic=0;ic<cnt;++ic) WaitFdSem.Post();
}

/******************************************************************************/
Expand Down Expand Up @@ -222,12 +290,20 @@ void XrdPollE::remFD(XrdPollInfo &pInfo, unsigned int events)
void XrdPollE::Start(XrdSysSemaphore *syncsem, int &retcode)
{
char eBuff[64];
int i, numpolled, num2sched;
int rc, i, numpolled, num2sched;
unsigned int waitFdEvents;
bool haveWaiters;
XrdJob *jfirst, *jlast;
const short pollOK = EPOLLIN | EPOLLPRI;
XrdLink *lp;
XrdPollInfo *pInfo;

if ((rc = AddWaitFd()))
{retcode = rc;
syncsem->Post();
return;
}

// Indicate to the starting thread that all went well
//
retcode = 0;
Expand All @@ -247,8 +323,11 @@ void XrdPollE::Start(XrdSysSemaphore *syncsem, int &retcode)
// Checkout which links must be dispatched (no need to lock)
//
jfirst = jlast = 0; num2sched = 0;
haveWaiters = false; waitFdEvents = 0;
for (i = 0; i < numpolled; i++)
{if ((pInfo = (XrdPollInfo *)PollTab[i].data.ptr))
{if (PollTab[i].data.ptr == &WaitFd)
{haveWaiters = true; waitFdEvents = PollTab[i].events;}
else if ((pInfo = (XrdPollInfo *)PollTab[i].data.ptr))
{if (!(pInfo->isEnabled) && pInfo->FD >= 0)
remFD(*pInfo, PollTab[i].events);
else {pInfo->isEnabled = 0;
Expand All @@ -272,9 +351,28 @@ void XrdPollE::Start(XrdSysSemaphore *syncsem, int &retcode)
//
if (num2sched == 1) Sched.Schedule(jfirst);
else if (num2sched) Sched.Schedule(num2sched, jfirst, jlast);

if (haveWaiters) HandleWaitFd(waitFdEvents);
} while(1);
}

/******************************************************************************/
/* W a i t 4 P o l l e r */
/******************************************************************************/

void XrdPollE::Wait4Poller()
{
// Makes the caller wait for the polling thread to complete an event processing
// loop.
//
if (eventfd_write(WaitFd, 1) < 0)
{Log.Emsg("Poll", errno, "write to the wait-poller descriptor");
return;
}

WaitFdSem.Wait();
}

/******************************************************************************/
/* x 2 T e x t */
/******************************************************************************/
Expand Down