Skip to content

Commit

Permalink
[Server] Fix bridge waitResp issues to allow merge of pull #902 (to b…
Browse files Browse the repository at this point in the history
…e tested).
  • Loading branch information
abh3 authored and simonmichal committed Mar 18, 2019
1 parent 524c81a commit 08b4312
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 100 deletions.
160 changes: 74 additions & 86 deletions src/XrdXrootd/XrdXrootdTransit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ int XrdXrootdTransit::Attn(XrdLink *lP, short *theSID, int rcode,
int XrdXrootdTransit::AttnCont(XrdXrootdTransPend *tP, int rcode,
const struct iovec *ioV, int ioN, int ioL)
{
XrdLink *theLink = tP->link;
int rc;

// Refresh the request structure
Expand All @@ -137,18 +136,16 @@ int XrdXrootdTransit::AttnCont(XrdXrootdTransPend *tP, int rcode,
//
rc = Send(rcode, ioV, ioN, ioL);

// If no wait needed, enable the link. Otherwise, handle the wait (rare)
// Handle end based on current state
//
if (rc >= 0)
{if (runDone && !runWait)
if (rc >= 0 && !runWait)
{if (runDone)
{AtomicBeg(runMutex);
AtomicZAP(runStatus);
AtomicEnd(runMutex);
theLink->Enable();
} else {
if (runWait >= 0)
Sched->Schedule((XrdJob *)&waitJob, time(0)+runWait);
}
if (reInvoke) Sched->Schedule((XrdJob *)&respJob);
else Link->Enable();
}

// All done
Expand Down Expand Up @@ -322,42 +319,41 @@ void XrdXrootdTransit::Init(XrdXrootd::Bridge::Result *respP, // Private
}

/******************************************************************************/
/* P r o c e s s */
/* P r o c e e d */
/******************************************************************************/

int XrdXrootdTransit::Process(XrdLink *lp)
void XrdXrootdTransit::Proceed()
{
int rc, bridgeActive;
int rc;

// This entry is serialized via link processing. First, get the run status.
// If we were interrupted in a reinvoke state, resume that state.
//
AtomicBeg(runMutex);
bridgeActive = AtomicGet(runStatus);
AtomicEnd(runMutex);
if (reInvoke) rc = Process(Link);
else rc = 0;

// If we are running then we need to reflect this to the xrootd protocol as
// data is now available. One of the following will be returned.
// Handle ending status
//
if (rc >= 0) Link->Enable();
else if (rc != -EINPROGRESS) Link->Close();
}

/******************************************************************************/
/* P r o c e s s */
/******************************************************************************/

int XrdXrootdTransit::Process(XrdLink *lp)
{
int rc;

// This entry is serialized via link processing and data is now available.
// One of the following will be returned.
//
// < 0 -> Stop getting requests,
// -EINPROGRESS leave link disabled but otherwise all is well
// -n Error, disable and close the link
// = 0 -> OK, get next request, if allowed, o/w enable the link
// > 0 -> Slow link, stop getting requests and enable the link
//
if (bridgeActive)
{rc = XrdXrootdProtocol::Process(lp);
if (rc < 0) return rc;
if (runWait)
{if (runWait >= 0)
Sched->Schedule((XrdJob *)&waitJob, time(0)+runWait);
return -EINPROGRESS;
}
if (!runDone) return rc;
AtomicBeg(runMutex);
AtomicZAP(runStatus);
AtomicEnd(runMutex);
if (!reInvoke) return 1;
}

// Reflect data is present to the underlying protocol and if Run() has been
// called we need to dispatch that request. This may be iterative.
Expand All @@ -369,11 +365,7 @@ do{rc = realProt->Process((reInvoke ? 0 : lp));
else {runDone = false;
rc = (Resume ? XrdXrootdProtocol::Process(lp) : Process2());
if (rc >= 0)
{if (runWait)
{if (runWait >= 0)
Sched->Schedule((XrdJob *)&waitJob, time(0)+runWait);
return -EINPROGRESS;
}
{if (runWait) rc = -EINPROGRESS;
if (!runDone) return rc;
AtomicBeg(runMutex);
AtomicZAP(runStatus);
Expand All @@ -396,9 +388,51 @@ do{rc = realProt->Process((reInvoke ? 0 : lp));
return (rc ? rc : 1);
}

/******************************************************************************/
/* R e c y c l e */
/******************************************************************************/

int XrdXrootdTransit::Process()
void XrdXrootdTransit::Recycle(XrdLink *lp, int consec, const char *reason)
{

// Set ourselves as active so we can't get more requests
//
AtomicBeg(runMutex);
AtomicInc(runStatus);
AtomicEnd(runMutex);

// If we were active then we will need to quiesce before dismantling ourselves.
// Note that Recycle() can only be called if the link is enabled. So, this bit
// of code is improbable but we check it anyway.
//
if (runWait > 0) Sched->Cancel(&waitJob);

// First we need to recycle the real protocol
//
if (realProt) realProt->Recycle(lp, consec, reason);

// Now we need to recycle our xrootd part
//
XrdXrootdProtocol::Recycle(lp, consec, reason);

// Release the argument buffer
//
if (runArgs) {free(runArgs); runArgs = 0;}

// Delete all pending requests
//
XrdXrootdTransPend::Clear(this);

// Now just free up our object.
//
TranStack.Push(&TranLink);
}

/******************************************************************************/
/* R e d r i v e */
/******************************************************************************/

void XrdXrootdTransit::Redrive()
{
static int eCode = htonl(kXR_NoMemory);
static char eText[] = "Insufficent memory to re-issue request";
Expand Down Expand Up @@ -426,10 +460,7 @@ int XrdXrootdTransit::Process()

// Defer the request if need be
//
if (rc >= 0 && runWait)
{if (runWait > 0) Sched->Schedule((XrdJob *)&waitJob, time(0)+runWait);
return 0;
}
if (rc >= 0 && runWait) return;
runWTot = 0;

// Indicate we are no longer active
Expand All @@ -445,50 +476,6 @@ int XrdXrootdTransit::Process()
//
if (rc < 0) Link->Close();
else Link->Enable();

// All done
//
return 0;
}

/******************************************************************************/
/* R e c y c l e */
/******************************************************************************/

void XrdXrootdTransit::Recycle(XrdLink *lp, int consec, const char *reason)
{

// Set ourselves as active so we can't get more requests
//
AtomicBeg(runMutex);
AtomicInc(runStatus);
AtomicEnd(runMutex);

// If we were active then we will need to quiesce before dismantling ourselves.
// Note that Recycle() can only be called if the link is enabled. So, this bit
// of code is improbable but we check it anyway.
//
if (runWait > 0) Sched->Cancel(&waitJob);

// First we need to recycle the real protocol
//
if (realProt) realProt->Recycle(lp, consec, reason);

// Now we need to recycle our xrootd part
//
XrdXrootdProtocol::Recycle(lp, consec, reason);

// Release the argument buffer
//
if (runArgs) {free(runArgs); runArgs = 0;}

// Delete all pending requests
//
XrdXrootdTransPend::Clear(this);

// Now just free up our object.
//
TranStack.Push(&TranLink);
}

/******************************************************************************/
Expand Down Expand Up @@ -772,9 +759,10 @@ int XrdXrootdTransit::Wait(XrdXrootd::Bridge::Context &rInfo,
//
if (runWCall && !(respObj->Wait(rInfo, runWait, eMsg))) return -1;

// All done, the process driver will effect the wait
// All done, schedule the wait
//
TRACEP(REQ, "Bridge delaying request " <<runWait <<" sec (" <<eMsg <<")");
Sched->Schedule((XrdJob *)&waitJob, time(0)+runWait);
return 0;
}

Expand Down
35 changes: 21 additions & 14 deletions src/XrdXrootd/XrdXrootdTransit.hh
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ bool Disc();
static void Init(XrdScheduler *schedP, int qMax, int qTTL);

//-----------------------------------------------------------------------------
//! Handle link activation (replaces parent activation).
//! Resume processing after a waitresp completion.
//-----------------------------------------------------------------------------

int Process(XrdLink *lp);
void Proceed();

//-----------------------------------------------------------------------------
//! Handle protocol redrive after wait.
//! Handle link activation (replaces parent activation).
//-----------------------------------------------------------------------------

int Process();
int Process(XrdLink *lp); // XrdProtocol override

//-----------------------------------------------------------------------------
//! Handle link shutdown.
Expand All @@ -103,10 +103,10 @@ int Process();
void Recycle(XrdLink *lp, int consec, const char *reason);

//-----------------------------------------------------------------------------
//! Reissue a request after a wait
//! Redrive a request after a wait
//-----------------------------------------------------------------------------

void Reissue();
void Redrive();

//-----------------------------------------------------------------------------
//! Initialize the valid request table.
Expand Down Expand Up @@ -156,7 +156,12 @@ void SetWait(int wtime, bool notify=false)
//! Constructor & Destructor
//-----------------------------------------------------------------------------

XrdXrootdTransit() : TranLink(this), waitJob(this) {}
XrdXrootdTransit() : TranLink(this),
respJob(this, &XrdXrootdTransit::Proceed,
"Transit proceed"),
waitJob(this, &XrdXrootdTransit::Redrive,
"Transit redrive")
{}
virtual ~XrdXrootdTransit() {}

private:
Expand All @@ -174,22 +179,24 @@ int Wait(XrdXrootd::Bridge::Context &rInfo,
int WaitResp(XrdXrootd::Bridge::Context &rInfo,
const struct iovec *ioV, int ioN, int ioL);

class WaitReq : public XrdJob
class SchedReq : public XrdJob
{public:
void DoIt() {spanP->Process();}
typedef void (XrdXrootdTransit::*callbackFP)();
void DoIt() {(spanP->*cbFunc)();}

WaitReq(XrdXrootdTransit *tP)
: XrdJob("Transit Redrive"), spanP(tP)
{}
~WaitReq() {}
SchedReq(XrdXrootdTransit *tP, callbackFP cbP, const char *why)
: XrdJob(why), spanP(tP), cbFunc(cbP) {}
~SchedReq() {}
private:
XrdXrootdTransit *spanP;
callbackFP cbFunc;
};

static XrdObjectQ<XrdXrootdTransit> TranStack;
XrdObject<XrdXrootdTransit> TranLink;

WaitReq waitJob;
SchedReq respJob;
SchedReq waitJob;
XrdSysMutex runMutex;
static const char *reqTab;
XrdProtocol *realProt;
Expand Down

0 comments on commit 08b4312

Please sign in to comment.