Skip to content

Commit

Permalink
Implement non-blocking send queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed Jan 20, 2017
1 parent c0afc7b commit 59024b7
Show file tree
Hide file tree
Showing 10 changed files with 650 additions and 10 deletions.
72 changes: 69 additions & 3 deletions src/Xrd/XrdLink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "Xrd/XrdInet.hh"
#include "Xrd/XrdPoll.hh"
#include "Xrd/XrdScheduler.hh"
#include "Xrd/XrdSendQ.hh"

#define TRACELINK this
#define XRD_TRACE XrdTrace->
Expand Down Expand Up @@ -163,7 +164,7 @@ void XrdLink::Reset()
Lname[1] = '\0';
ID = &Uname[sizeof(Uname)-2];
Comment = ID;
Next = 0;
sendQ = 0;
Protocol = 0;
ProtoAlt = 0;
conTime = time(0);
Expand Down Expand Up @@ -267,6 +268,19 @@ XrdLink *XrdLink::Alloc(XrdNetAddr &peer, int opts)
return lp;
}

/******************************************************************************/
/* B a c k l o g */
/******************************************************************************/

int XrdLink::Backlog()
{
XrdSysMutexHelper(wrMutex);

// Return backlog information
//
return (sendQ ? sendQ->Backlog() : 0);
}

/******************************************************************************/
/* C l i e n t */
/******************************************************************************/
Expand Down Expand Up @@ -294,6 +308,12 @@ int XrdLink::Client(char *nbuf, int nbsz)
int XrdLink::Close(int defer)
{ int csec, fd, rc = 0;

// We need to disband any non-blocking appendage we may have now
//
wrMutex.Lock();
if (sendQ) {sendQ->Terminate(); sendQ = 0;}
wrMutex.UnLock();

// If a defer close is requested, we can close the descriptor but we must
// keep the slot number to prevent a new client getting the same fd number.
// Linux is peculiar in that any in-progress operations will remain in that
Expand Down Expand Up @@ -673,6 +693,15 @@ int XrdLink::Send(const char *Buff, int Blen)
//
wrMutex.Lock();
isIdle = 0;
AtomicAdd(BytesOut, myBytes);

// Do non-blocking writes if we are setup to do so.
//
if (sendQ)
{retc = sendQ->Send(Buff, Blen);
wrMutex.UnLock();
return retc;
}

// Write the data out
//
Expand All @@ -686,7 +715,6 @@ int XrdLink::Send(const char *Buff, int Blen)

// All done
//
AtomicAdd(BytesOut, myBytes);
wrMutex.UnLock();
if (retc >= 0) return Blen;
XrdLog->Emsg("Link", errno, "send to", ID);
Expand All @@ -704,21 +732,29 @@ int XrdLink::Send(const struct iovec *iov, int iocnt, int bytes)
// Add up bytes if they were not given to us
//
if (!bytes) for (i = 0; i < iocnt; i++) bytes += iov[i].iov_len;
bytesleft = static_cast<ssize_t>(bytes);

// Get a lock and assume we will be successful (statistically we are)
//
wrMutex.Lock();
isIdle = 0;
AtomicAdd(BytesOut, bytes);

// Do non-blocking writes if we are setup to do so.
//
if (sendQ)
{retc = sendQ->Send(iov, iocnt, bytes);
wrMutex.UnLock();
return retc;
}

// Write the data out. On some version of Unix (e.g., Linux) a writev() may
// end at any time without writing all the bytes when directed to a socket.
// So, we attempt to resume the writev() using a combination of write() and
// a writev() continuation. This approach slowly converts a writev() to a
// series of writes if need be. We must do this inline because we must hold
// the lock until all the bytes are written or an error occurs.
//
bytesleft = static_cast<ssize_t>(bytes);
while(bytesleft)
{do {retc = writev(FD, iov, iocnt);} while(retc < 0 && errno == EINTR);
if (retc >= bytesleft || retc < 0) break;
Expand Down Expand Up @@ -931,6 +967,30 @@ void XrdLink::setID(const char *userid, int procid)
Comment = (const char *)ID;
}

/******************************************************************************/
/* s e t N B */
/******************************************************************************/

bool XrdLink::setNB()
{
// We don't support non-blocking I/O except for Linux at the moment
//
#if !defined(__linux__)
return false;
#else
// Trace this request
//
TRACEI(DEBUG,"enabling non-blocking output");

// If we don't already have a sendQ object get one
//
wrMutex.Lock();
if (!sendQ) sendQ = new XrdSendQ(*this, wrMutex);
wrMutex.UnLock();
return true;
#endif
}

/******************************************************************************/
/* S e t u p */
/******************************************************************************/
Expand Down Expand Up @@ -968,6 +1028,12 @@ int XrdLink::Setup(int maxfds, int idlewait)
XrdSched->Schedule((XrdJob *)ls, ichk+time(0));
}

// Initialize the send queue
//
XrdSendQ::Init(XrdLog, XrdSched);

// All done
//
return 1;
}

Expand Down
7 changes: 6 additions & 1 deletion src/Xrd/XrdLink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class XrdNetAddr;
class XrdPoll;
class XrdOucTrace;
class XrdScheduler;
class XrdSendQ;
class XrdSysError;

class XrdLink : XrdJob
Expand Down Expand Up @@ -91,6 +92,8 @@ XrdNetAddrInfo *AddrInfo() {return (XrdNetAddrInfo *)&Addr;}

static XrdLink *Alloc(XrdNetAddr &peer, int opts=0);

int Backlog();

void Bind() {} // Obsolete
void Bind(pthread_t tid) { (void)tid; } // Obsolete

Expand Down Expand Up @@ -223,6 +226,8 @@ static void setKWT(int wkSec, int kwSec);

void setLocation(XrdNetAddrInfo::LocInfo &loc) {Addr.SetLocation(loc);}

bool setNB();

XrdProtocol *setProtocol(XrdProtocol *pp);

void setRef(int cnt); // ASYNC Mode
Expand Down Expand Up @@ -301,7 +306,7 @@ XrdSysMutex rdMutex;
XrdSysMutex wrMutex;
XrdSysSemaphore IOSemaphore;
XrdSysCondVar *KillcvP; // Protected by opMutex!
XrdLink *Next;
XrdSendQ *sendQ;
XrdProtocol *Protocol;
XrdProtocol *ProtoAlt;
XrdPoll *Poller;
Expand Down
Loading

0 comments on commit 59024b7

Please sign in to comment.