Skip to content

Commit

Permalink
[Server] Phase 2 of XrdLink implementation cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed May 12, 2020
1 parent e76f280 commit 022aadb
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 44 deletions.
23 changes: 20 additions & 3 deletions src/Xrd/XrdLink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ XrdLink::XrdLink(XrdLinkXeq &lxq) : XrdJob("connection"), linkXQ(lxq)
void XrdLink::ResetLink()
{
if (HostName) {free(HostName); HostName = 0;}
FD = -1;
Instance = 0;
isBridged= false;
isTLS = false;
Expand Down Expand Up @@ -170,6 +169,17 @@ void XrdLink::Enable()
if (linkXQ.PollInfo.Poller) linkXQ.PollInfo.Poller->Enable(this);
}

/******************************************************************************/
/* F D n u m */
/******************************************************************************/

int XrdLink::FDnum()
{
int fd = linkXQ.PollInfo.FD;

return (fd < 0 ? -fd : fd);
}

/******************************************************************************/
/* F i n d */
/******************************************************************************/
Expand Down Expand Up @@ -245,6 +255,13 @@ void XrdLink::Hold(bool lk)

bool XrdLink::isFlawed() const {return linkXQ.LinkInfo.Etext != 0;}

/******************************************************************************/
/* i s I n s t a n c e */
/******************************************************************************/

bool XrdLink::isInstance(unsigned int inst) const
{return Instance == inst && linkXQ.PollInfo.FD >= 0;}

/******************************************************************************/
/* N a m e */
/******************************************************************************/
Expand Down Expand Up @@ -521,7 +538,7 @@ int XrdLink::Terminate(const char *owner, int fdnum, unsigned int inst)
// If this link is now dead, simply ignore the request. Typically, this
// indicates a race condition that the server won.
//
if ( FD != fdnum || Instance != inst
if ( linkXQ.PollInfo.FD != fdnum || Instance != inst
|| !linkXQ.PollInfo.Poller || !linkXQ.getProtocol()) return -EPIPE;

// Check if we have too many tries here
Expand Down Expand Up @@ -606,7 +623,7 @@ const char *XrdLink::verTLS()

int XrdLink::Wait4Data(int timeout)
{
struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
struct pollfd polltab = {linkXQ.PollInfo.FD, POLLIN|POLLRDNORM, 0};
int retc;

// Issue poll and do preliminary check
Expand Down
16 changes: 7 additions & 9 deletions src/Xrd/XrdLink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void Enable();
//! @return The file descriptor number.
//-----------------------------------------------------------------------------

inline int FDnum() {int fd = FD; return (fd < 0 ? -fd : fd);}
int FDnum();

//-----------------------------------------------------------------------------
//! Translate a file descriptor number to the corresponding link object.
Expand Down Expand Up @@ -235,7 +235,7 @@ char *ID; // This is referenced a lot (should have been const).
//!
//! @return The link's instance number.
//-----------------------------------------------------------------------------
inline

unsigned int Inst() const {return Instance;}

//-----------------------------------------------------------------------------
Expand All @@ -256,9 +256,8 @@ bool isFlawed() const;
//! @return true the link matches the instance number.
//! false the link differs the instance number.
//-----------------------------------------------------------------------------
inline
bool isInstance(unsigned int inst) const
{return FD >= 0 && Instance == inst;}

bool isInstance(unsigned int inst) const;

//-----------------------------------------------------------------------------
//! Obtain the domain trimmed name of the end-point. The returned value should
Expand Down Expand Up @@ -554,7 +553,7 @@ void armBridge();
//! @return false this link is a plain old link.
//-----------------------------------------------------------------------------

inline bool hasBridge() const {return isBridged;}
bool hasBridge() const {return isBridged;}

//-----------------------------------------------------------------------------
//! Determine if this link is using TLS.
Expand All @@ -566,7 +565,7 @@ inline bool hasBridge() const {return isBridged;}
//! @return false this link not using TLS.
//-----------------------------------------------------------------------------

inline bool hasTLS() const {return isTLS;}
bool hasTLS() const {return isTLS;}

//-----------------------------------------------------------------------------
//! Return TLS protocol version being used.
Expand Down Expand Up @@ -595,10 +594,9 @@ int Wait4Data(int timeout);
void *rsvd1[3]; // Reserved for future use
XrdLinkXeq &linkXQ; // The implementation
char *HostName; // Pointer to the hostname
int FD; // File descriptor number (may be negative)
unsigned int Instance; // Instance number of this object
bool isBridged; // If true, this link is an in-memory bridge
bool isTLS; // If true, this link uses TLS for all I/O
char rsvd2[6];
char rsvd2[2];
};
#endif
2 changes: 1 addition & 1 deletion src/Xrd/XrdLinkCtl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ XrdLink *XrdLinkCtl::Alloc(XrdNetAddr &peer, int opts)
unp = lp->Uname + sizeof(Uname) - bl - 1; // Solaris compatability
memcpy(unp, buff, bl);
lp->ID = unp;
lp->FD = lp->PollInfo.pollFD = peerFD;
lp->PollInfo.FD = peerFD;
lp->Comment = (const char *)unp;

// Set options as needed
Expand Down
63 changes: 34 additions & 29 deletions src/Xrd/XrdLinkXeq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ int XrdLinkXeq::Close(bool defer)
// closing it. On most platforms, this informs readers that the connection is
// gone (though not on old (i.e. <= 2.3) versions of Linux, sigh). Also, if
// nonblocking mode is enabled, we need to do this in a separate thread as
// a shutdown may block for a pretty long time is lot of messages are queued.
// a shutdown may block for a pretty long time if lots\ of messages are queued.
// We will ask the SendQ object to schedule the shutdown for us before it
// commits suicide.
// Note that we can hold the opMutex while we also get the wrMutex.
Expand All @@ -201,7 +201,7 @@ int XrdLinkXeq::Close(bool defer)
{if (!sendQ) Shutdown(false);
else {TRACEI(DEBUG, "Shutdown FD only via SendQ");
LinkInfo.InUse++;
FD = -FD;
PollInfo.FD = -PollInfo.FD;
wrMutex.Lock();
sendQ->Terminate(this);
sendQ = 0;
Expand Down Expand Up @@ -260,10 +260,10 @@ int XrdLinkXeq::Close(bool defer)
// table needs to be cleaned up prior to actually closing the socket. So, we
// do some fancy footwork to prevent multiple closes of this link.
//
fd = (FD < 0 ? -FD : FD);
if (FD != -1)
fd = (PollInfo.FD < 0 ? -PollInfo.FD : PollInfo.FD);
if (PollInfo.FD > 0)
{if (PollInfo.Poller) {XrdPoll::Detach(this); PollInfo.Poller = 0;}
FD = -1;
PollInfo.FD = -1;
opHelper.UnLock();
XrdLinkCtl::Unhook(fd);
} else opHelper.UnLock();
Expand Down Expand Up @@ -327,7 +327,7 @@ XrdTlsPeerCerts *XrdLinkXeq::getPeerCerts()
int XrdLinkXeq::Peek(char *Buff, int Blen, int timeout)
{
XrdSysMutexHelper theMutex;
struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
ssize_t mlen;
int retc;

Expand All @@ -353,7 +353,7 @@ int XrdLinkXeq::Peek(char *Buff, int Blen, int timeout)

// Do the peek.
//
do {mlen = recv(FD, Buff, Blen, MSG_PEEK);}
do {mlen = recv(PollInfo.FD, Buff, Blen, MSG_PEEK);}
while(mlen < 0 && errno == EINTR);

// Return the result
Expand All @@ -376,12 +376,12 @@ int XrdLinkXeq::Recv(char *Buff, int Blen)
//
if (LockReads) rdMutex.Lock();
isIdle = 0;
do {rlen = read(FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
do {rlen = read(PollInfo.FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
if (rlen > 0) AtomicAdd(BytesIn, rlen);
if (LockReads) rdMutex.UnLock();

if (rlen >= 0) return int(rlen);
if (FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
if (PollInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
return -1;
}

Expand All @@ -390,7 +390,7 @@ int XrdLinkXeq::Recv(char *Buff, int Blen)
int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
{
XrdSysMutexHelper theMutex;
struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
ssize_t rlen, totlen = 0;
int retc;

Expand All @@ -412,7 +412,7 @@ int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
}
return int(totlen);
}
return (FD >= 0 ? Log.Emsg("Link", -errno, "poll", ID) : -1);
return (PollInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
}

// Verify it is safe to read now
Expand All @@ -426,10 +426,12 @@ int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
// Read as much data as you can. Note that we will force an error
// if we get a zero-length read after poll said it was OK.
//
do {rlen = recv(FD, Buff, Blen, 0);} while(rlen < 0 && errno == EINTR);
do {rlen = recv(PollInfo.FD, Buff, Blen, 0);}
while(rlen < 0 && errno == EINTR);
if (rlen <= 0)
{if (!rlen) return -ENOMSG;
return (FD<0 ? -1 : Log.Emsg("Link",-errno,"receive from",ID));
if (PollInfo.FD > 0) Log.Emsg("Link", -errno, "receive from", ID);
return -1;
}
totlen += rlen; Blen -= rlen; Buff += rlen;
}
Expand All @@ -445,7 +447,7 @@ int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)

int XrdLinkXeq::RecvAll(char *Buff, int Blen, int timeout)
{
struct pollfd polltab = {FD, POLLIN|POLLRDNORM, 0};
struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
ssize_t rlen;
int retc;

Expand All @@ -469,14 +471,15 @@ int XrdLinkXeq::RecvAll(char *Buff, int Blen, int timeout)
//
if (LockReads) rdMutex.Lock();
isIdle = 0;
do {rlen = recv(FD,Buff,Blen,MSG_WAITALL);} while(rlen < 0 && errno == EINTR);
do {rlen = recv(PollInfo.FD, Buff, Blen, MSG_WAITALL);}
while(rlen < 0 && errno == EINTR);
if (rlen > 0) AtomicAdd(BytesIn, rlen);
if (LockReads) rdMutex.UnLock();

if (int(rlen) == Blen) return Blen;
if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
else if (rlen > 0) Log.Emsg("RecvAll","Premature end from", ID);
else if (FD >= 0) Log.Emsg("Link",errno,"recieve from",ID);
if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
else if (rlen > 0) Log.Emsg("RecvAll", "Premature end from", ID);
else if (PollInfo.FD >= 0) Log.Emsg("Link", errno, "recieve from", ID);
return -1;
}

Expand Down Expand Up @@ -524,7 +527,7 @@ int XrdLinkXeq::Send(const char *Buff, int Blen)
// Write the data out
//
while(bytesleft)
{if ((retc = write(FD, Buff, bytesleft)) < 0)
{if ((retc = write(PollInfo.FD, Buff, bytesleft)) < 0)
{if (errno == EINTR) continue;
else break;
}
Expand Down Expand Up @@ -625,7 +628,7 @@ int XrdLinkXeq::Send(const sfVec *sfP, int sfN)
//
wrMutex.Lock();
isIdle = 0;
do{retc = sendfilev(FD, vecSFP, sfN, &xframt);
do{retc = sendfilev(PollInfo.FD, vecSFP, sfN, &xframt);

// Check if all went well and return if so (usual case)
//
Expand Down Expand Up @@ -673,7 +676,7 @@ do{retc = sendfilev(FD, vecSFP, sfN, &xframt);
// In linux we need to cork the socket. On permanent errors we do not uncork
// the socket because it will be closed in short order.
//
if (setsockopt(FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
if (setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
{Log.Emsg("Link", errno, "cork socket for", ID);
uncork = 0; sfOK = 0;
}
Expand All @@ -684,7 +687,7 @@ do{retc = sendfilev(FD, vecSFP, sfN, &xframt);
{if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
while(bytesleft
&& (retc=sendfile(FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
&& (retc=sendfile(PollInfo.FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
{bytesleft -= retc; xIntr++;}
}
if (retc < 0 && errno == EINTR) continue;
Expand All @@ -703,7 +706,8 @@ do{retc = sendfilev(FD, vecSFP, sfN, &xframt);

// Now uncork the socket
//
if (uncork && setsockopt(FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
if (uncork
&& setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
Log.Emsg("Link", errno, "uncork socket for", ID);

// All done
Expand All @@ -727,7 +731,7 @@ int XrdLinkXeq::sendData(const char *Buff, int Blen)
// Write the data out
//
while(bytesleft)
{if ((retc = write(FD, Buff, bytesleft)) < 0)
{if ((retc = write(PollInfo.FD, Buff, bytesleft)) < 0)
{if (errno == EINTR) continue;
else break;
}
Expand Down Expand Up @@ -757,13 +761,14 @@ int XrdLinkXeq::SendIOV(const struct iovec *iov, int iocnt, int bytes)
//
bytesleft = static_cast<ssize_t>(bytes);
while(bytesleft)
{do {retc = writev(FD, iov, iocnt);} while(retc < 0 && errno == EINTR);
{do {retc = writev(PollInfo.FD, iov, iocnt);}
while(retc < 0 && errno == EINTR);
if (retc >= bytesleft || retc < 0) break;
bytesleft -= retc;
while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
{retc -= n; iov++; iocnt--;}
Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
while(n) {if ((retc = write(FD, Buff, n)) < 0)
while(n) {if ((retc = write(PollInfo.FD, Buff, n)) < 0)
{if (errno == EINTR) continue;
else break;
}
Expand All @@ -788,7 +793,7 @@ void XrdLinkXeq::setID(const char *userid, int procid)
char buff[sizeof(Uname)], *bp, *sp;
int ulen;

snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, FD);
snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, PollInfo.FD);
ulen = strlen(buff);
sp = buff + ulen - 1;
bp = &Uname[sizeof(Uname)-1];
Expand Down Expand Up @@ -891,7 +896,7 @@ bool XrdLinkXeq::setTLS(bool enable, XrdTlsContext *ctx)
// We want to initialize TLS, do so now.
//
if (!ctx) ctx = tlsCtx;
eNote = tlsIO.Init(*ctx, FD, rwMode, hsMode, false, ID);
eNote = tlsIO.Init(*ctx, PollInfo.FD, rwMode, hsMode, false, ID);

// Check for errors
//
Expand Down Expand Up @@ -947,7 +952,7 @@ void XrdLinkXeq::Shutdown(bool getLock)
//
temp = Instance; Instance = 0;
if (!KeepFD)
{theFD = (FD < 0 ? -FD : FD);
{theFD = (PollInfo.FD < 0 ? -PollInfo.FD : PollInfo.FD);
shutdown(theFD, SHUT_RDWR);
if (dup2(devNull, theFD) < 0)
{Instance = temp;
Expand Down
4 changes: 2 additions & 2 deletions src/Xrd/XrdPollInfo.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ XrdPollInfo *Next; // Chain of links waiting for a PollPoll event
XrdLink *Link; // Link associated with this object
struct pollfd *PollEnt; // Used only by PollPoll
XrdPoll *Poller; // -> Poller object associated with this object
int pollFD; // Associated file descriptor number
int FD; // Associated target file descriptor number
bool inQ; // True -> in a PollPoll event queue
bool isEnabled; // True -> interrupts are enabled
char rsv[2]; // Reserved for future flags

void Zorch() {Next = 0; PollEnt = 0;
Poller = 0; pollFD = -1;
Poller = 0; FD = -1;
isEnabled = false; inQ = false;
rsv[0] = 0; rsv[1] = 0;
}
Expand Down

0 comments on commit 022aadb

Please sign in to comment.