diff --git a/src/XrdSsi/XrdSsiReqAgent.hh b/src/XrdSsi/XrdSsiReqAgent.hh index 38263bb55ca..fdab7806e6f 100644 --- a/src/XrdSsi/XrdSsiReqAgent.hh +++ b/src/XrdSsi/XrdSsiReqAgent.hh @@ -41,6 +41,9 @@ static XrdSsiErrInfo &ErrInfoRef(XrdSsiRequest *rP) {return rP->errInfo;} static XrdSsiRespInfo *RespP(XrdSsiRequest *rP) {return &(rP->Resp);} +static void SetNode(XrdSsiRequest *rP, const char *name) + {rP->epNode = name;} + static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP) {rP->rrMutex = mP;} }; diff --git a/src/XrdSsi/XrdSsiRequest.hh b/src/XrdSsi/XrdSsiRequest.hh index cdf353c4607..54c2276439c 100644 --- a/src/XrdSsi/XrdSsiRequest.hh +++ b/src/XrdSsi/XrdSsiRequest.hh @@ -61,6 +61,8 @@ //! ProcessResponse() Initial response: Mandatary //! ProcessResponseData() Data response: Mandatory only if response data is //! asynchronously received. +//! +//! All callbacks are invoked with no locks outstanding unless otherwise noted. //----------------------------------------------------------------------------- class XrdSsiPacer; @@ -115,6 +117,19 @@ virtual void Alert(XrdSsiRespInfoMsg &aMsg) {aMsg.RecycleMsg(false);} inline uint32_t GetDetachTTL() {return detTTL;} +//----------------------------------------------------------------------------- +//! Obtain the enpoint host name. +//! +//! @return =0 Host name not available. Typically, the host name will be +//! available on the first callback to this object. +//! @return !0 Pointer to the host name. The name is valid until Finished(). +//----------------------------------------------------------------------------- + +inline +const char *GetEndPoint() {XrdSsiMutexMon(rrMutex); + return epNode; + } + //----------------------------------------------------------------------------- //! Obtain the metadata associated with a response. //! @@ -138,7 +153,6 @@ const char *GetMetadata(int &dlen) //! Obtain the request data sent by a client. //! //! This method is duplicated in XrdSsiResponder to allow calling consistency. -//! This method may be called with the object's recursive mutex unlocked! //! //! @param dlen holds the length of the request after the call. //! @@ -201,8 +215,6 @@ virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo, //! stream object after a successful GetResponseData() or an asynchronous //! stream SetBuff() call. //! -//! Note: This method is called with the object's recursive mutex locked. -//! //! @param eInfo Error information. You can check if an error occured using //! eInfo.hasError() or eInfo.isOK(). //! @param buff Pointer to the buffer given to XrdSsiStream::SetBuff(). @@ -295,7 +307,7 @@ static RDR_Info RestartDataResponse(RDR_How rhow, const char *reqid=0); XrdSsiRequest(const char *reqid=0, uint16_t tmo=0) : reqID(reqid), rrMutex(0), - nextRequest(0), theRespond(0), thePacer(0), + theRespond(0), thePacer(0), epNode(0), detTTL(0), tOut(0) {} protected: @@ -368,6 +380,7 @@ XrdSsiResponder *theRespond; // Set via XrdSsiResponder::BindRequest() XrdSsiRespInfo Resp; // Set via XrdSsiResponder::SetResponse() XrdSsiErrInfo errInfo; XrdSsiPacer *thePacer; +const char *epNode; uint32_t detTTL; uint16_t tOut; }; diff --git a/src/XrdSsi/XrdSsiResource.hh b/src/XrdSsi/XrdSsiResource.hh index ca37d04ef6e..ceef83b283a 100644 --- a/src/XrdSsi/XrdSsiResource.hh +++ b/src/XrdSsi/XrdSsiResource.hh @@ -59,9 +59,9 @@ Affinity affinity;//!< Resource affinity uint32_t rOpts; //!< Resource options. One or more of he following: static -const uint32_t Reusable;//!> Resource context may be cached and reused +const uint32_t Reusable= 1;//!> Resource context may be cached and reused static -const uint32_t Discard; //!> Discard cached resource if it exists +const uint32_t Discard = 2;//!> Discard cached resource if it exists //----------------------------------------------------------------------------- //! Constructor diff --git a/src/XrdSsi/XrdSsiScale.hh b/src/XrdSsi/XrdSsiScale.hh index 2e8293fbbbe..160911cdc6e 100644 --- a/src/XrdSsi/XrdSsiScale.hh +++ b/src/XrdSsi/XrdSsiScale.hh @@ -63,7 +63,23 @@ int getEnt() {entMutex.Lock(); return -1; } -void retEnt(int xEnt) {entMutex.Lock(); pendCnt[xEnt]--; entMutex.UnLock();} +void retEnt(int xEnt) {if (xEnt >= 0 && xEnt < maxEnt) + {entMutex.Lock(); + if (pendCnt[xEnt]) pendCnt[xEnt]--; + entMutex.UnLock(); + } + } + +bool rsvEnt(int xEnt) {if (xEnt < 0 && xEnt >= maxEnt) return false; + entMutex.Lock(); + if (pendCnt[nowEnt] < maxPend) + {pendCnt[nowEnt]++; + entMutex.UnLock(); + return true; + } + entMutex.UnLock(); + return false; + } XrdSsiScale() : nowEnt(0) {memset(pendCnt, 0, sizeof(uint16_t)*maxEnt);} ~XrdSsiScale() {} diff --git a/src/XrdSsi/XrdSsiServReal.cc b/src/XrdSsi/XrdSsiServReal.cc index e40917198ff..6536e0ffc89 100644 --- a/src/XrdSsi/XrdSsiServReal.cc +++ b/src/XrdSsi/XrdSsiServReal.cc @@ -166,8 +166,13 @@ bool XrdSsiServReal::GenURL(XrdSsiResource *rP, char *buff, int blen, int uEnt) void XrdSsiServReal::ProcessRequest(XrdSsiRequest &reqRef, XrdSsiResource &resRef) { - XrdSsiSessReal *sObj; + static const uint32_t useCache = XrdSsiResource::Reusable + | XrdSsiResource::Discard; + XrdSysMutexHelper mHelp; + XrdSsiSessReal *sObj; + std::string resKey; int uEnt; + bool hold = (resRef.rOpts & XrdSsiResource::Reusable) != 0; char epURL[4096]; // Validate the resource name @@ -177,6 +182,14 @@ void XrdSsiServReal::ProcessRequest(XrdSsiRequest &reqRef, return; } +// Check if this is a reusable resource. Reusable resources are a bit more +// complicated to pull off. In any case, we need to hold the cache lock. +// + if (resRef.rOpts & useCache) + {mHelp.Lock(&rcMutex); + if (ResReuse(reqRef, resRef, resKey)) return; + } + // Get a sid entry number // if ((uEnt = sidScale.getEnt()) < 0) @@ -192,43 +205,92 @@ void XrdSsiServReal::ProcessRequest(XrdSsiRequest &reqRef, return; } -// Obtain a new session object (note the first request uses the session mutex) +// Obtain a new session object // - if (!(sObj = Alloc(resRef.rName.c_str(), uEnt))) + if (!(sObj = Alloc(resRef.rName.c_str(), uEnt, hold))) {XrdSsiUtils::RetErr(reqRef, "Insufficient memory.", ENOMEM); sidScale.retEnt(uEnt); return; - } else XrdSsiReqAgent::SetMutex(&reqRef, sObj->MutexP()); + } // Now just provision this resource which will execute the request should it -// be successful. +// be successful. If Provision() fails, we need to delete the session object +// because its file object now is in an usable state (funky client interface). // - if (!(sObj->Provision(&reqRef, epURL))) Recycle(sObj); + if (!(sObj->Provision(&reqRef, epURL))) Recycle(sObj, false); + +// If this was started with a reusable resource, put the session in the cache. +// The resource key was constructed by the call to ResReuse() and teh cache +// mutex is still held at this point (will be released upon return). +// + if (hold) resCache[resKey] = sObj; } /******************************************************************************/ /* R e c y c l e */ /******************************************************************************/ -void XrdSsiServReal::Recycle(XrdSsiSessReal *sObj) +void XrdSsiServReal::Recycle(XrdSsiSessReal *sObj, bool reuse) { EPNAME("Recycle"); static const char *tident = "ServReal"; +// Clear all pending events (likely not needed) +// + sObj->ClrEvent(); + // Add to queue unless we have too many of these // myMutex.Lock(); actvSes--; - DEBUG("Sessions: free=" <second; + if (resRef.rOpts & XrdSsiResource::Discard || !sesP->Run(&reqRef)) + {resCache.erase(it); + sesP->UnHold(); + return false; + } + +// All done, the request should have been sent off via Reusable() call. +// + return true; +} + /******************************************************************************/ /* S t o p */ /******************************************************************************/ diff --git a/src/XrdSsi/XrdSsiServReal.hh b/src/XrdSsi/XrdSsiServReal.hh index 1dff1fc54b0..53fc08a0c19 100644 --- a/src/XrdSsi/XrdSsiServReal.hh +++ b/src/XrdSsi/XrdSsiServReal.hh @@ -29,6 +29,9 @@ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ +#include +#include + #include "XrdSsi/XrdSsiService.hh" #include "XrdSys/XrdSysPthread.hh" @@ -41,7 +44,7 @@ public: void ProcessRequest(XrdSsiRequest &reqRef, XrdSsiResource &resRef); -void Recycle(XrdSsiSessReal *sObj); +void Recycle(XrdSsiSessReal *sObj, bool reuse); bool Stop(); @@ -52,8 +55,13 @@ bool Stop(); ~XrdSsiServReal(); private: -XrdSsiSessReal *Alloc(const char *sName, int uent, bool hold=false); +XrdSsiSessReal *Alloc(const char *sName, int uent, bool hold); bool GenURL(XrdSsiResource *rP, char *buff, int blen, int uEnt); +bool ResReuse(XrdSsiRequest &reqRef, XrdSsiResource &resRef, + std::string &resKey); + +std::map resCache; +XrdSysMutex rcMutex; char *manNode; XrdSysMutex myMutex; diff --git a/src/XrdSsi/XrdSsiSessReal.cc b/src/XrdSsi/XrdSsiSessReal.cc index abe9b94f40e..a9a10ce4365 100644 --- a/src/XrdSsi/XrdSsiSessReal.cc +++ b/src/XrdSsi/XrdSsiSessReal.cc @@ -37,9 +37,9 @@ #include #include "XrdSsi/XrdSsiAtomics.hh" +#include "XrdSsi/XrdSsiReqAgent.hh" #include "XrdSsi/XrdSsiRequest.hh" #include "XrdSsi/XrdSsiRRInfo.hh" -#include "XrdSsi/XrdSsiUtils.hh" #include "XrdSsi/XrdSsiScale.hh" #include "XrdSsi/XrdSsiServReal.hh" #include "XrdSsi/XrdSsiSessReal.hh" @@ -47,7 +47,9 @@ #include "XrdSsi/XrdSsiTrace.hh" #include "XrdSsi/XrdSsiUtils.hh" +#include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysHeaders.hh" +#include "Xrd/XrdScheduler.hh" using namespace XrdSsi; @@ -88,113 +90,47 @@ namespace namespace XrdSsi { -extern XrdSsiScale sidScale; -} - -/******************************************************************************/ -/* D e s t r u c t o r */ -/******************************************************************************/ - -XrdSsiSessReal::~XrdSsiSessReal() -{ - XrdSsiTaskReal *tP; - - if (sessName) free(sessName); - if (sessNode) free(sessNode); +extern XrdScheduler *schedP; - while((tP = freeTask)) {freeTask = tP->attList.next; delete tP;} +extern XrdSysError Log; +extern XrdSsiScale sidScale; } /******************************************************************************/ -/* Private: E x e c u t e */ +/* L o c a l C l a s s e s */ /******************************************************************************/ - -// Called with sessMutex locked! -bool XrdSsiSessReal::Execute(XrdSsiRequest *reqP) +namespace { - XrdCl::XRootDStatus Status; - XrdSsiRRInfo rrInfo; - XrdSsiTaskReal *tP, *ptP; - char *reqBuff; - int reqBlen; - -// Get the request information -// - reqBuff = reqP->GetRequest(reqBlen); - -// Allocate a task object for this request -// - if (!(tP = NewTask(reqP))) return false; +class CleanUp : public XrdJob +{ +public: -// Construct the info for this request -// - rrInfo.Id(tP->ID()); - rrInfo.Size(reqBlen); +void DoIt() {sessP->Lock(); + sessP->Unprovision(); + delete this; + } -// Issue the write -// - Status = epFile.Write(rrInfo.Info(), (uint32_t)reqBlen, reqBuff, - (XrdCl::ResponseHandler *)tP, reqP->GetTimeOut()); + CleanUp(XrdSsiSessReal *sP) : sessP(sP) {} + ~CleanUp() {} -// Determine ending status. If it's bad, return an error. -// - if (!Status.IsOK()) - {std::string eText; - int eNum = XrdSsiUtils::GetErr(Status, eText); - RelTask(tP); - SetErrResponse(eText.c_str(), eNum); - return false; - } - -// Insert the task into our list of tasks -// - if ((ptP = attBase)) {INSERT(attList, ptP, tP);} - else attBase = tP; - -// We now need to change the binding to the task. -// - tP->BindRequest(*reqP); - numAT++; - return true; +private: +XrdSsiSessReal *sessP; +}; } /******************************************************************************/ -/* F i n i s h e d */ +/* D e s t r u c t o r */ /******************************************************************************/ -// Note that if we are called then Finished() must have been called while we -// were still in the open phase or in the task dispatch phase. - -void XrdSsiSessReal::Finished(XrdSsiRequest &rqstR, - const XrdSsiRespInfo &rInfo, bool cancel) +XrdSsiSessReal::~XrdSsiSessReal() { - EPNAME("SessReqFin"); - -// Document this call as it rarely happens at this point -// - DEBUG("Request="<<&rqstR<<" cancel="<Finished(rqstR, rInfo, cancel); - else {if (requestP) XrdSsi::sidScale.retEnt(uEnt); - requestP = 0; - if (doStop) - {XrdCl::XRootDStatus zStat; - epFile.IsOpen() ? Unprovision() : Shutdown(zStat); - return; - } - } + if (sessName) free(sessName); + if (sessNode) free(sessNode); -// Unlock our mutex as we are done here -// - sessMutex.UnLock(); + while((tP = freeTask)) {freeTask = tP->attList.next; delete tP;} } /******************************************************************************/ @@ -212,8 +148,8 @@ void XrdSsiSessReal::InitSession(XrdSsiServReal *servP, const char *sName, nextTID = 0; alocLeft = XrdSsiRRInfo::maxID; isHeld = hold; - doStop = false; inOpen = false; + noReuse = false; if (sessName) free(sessName); sessName = (sName ? strdup(sName) : 0); if (sessNode) free(sessNode); @@ -229,13 +165,13 @@ void XrdSsiSessReal::InitSession(XrdSsiServReal *servP, const char *sName, XrdSsiTaskReal *XrdSsiSessReal::NewTask(XrdSsiRequest *reqP) { EPNAME("NewTask"); - XrdSsiTaskReal *tP; + XrdSsiTaskReal *ptP, *tP; // Allocate a task object for this request // if ((tP = freeTask)) freeTask = tP->attList.next; else {if (!alocLeft || !(tP = new XrdSsiTaskReal(this, nextTID))) - {SetErrResponse("Too many active requests.", EMLINK); + {XrdSsiUtils::RetErr(*reqP, "Too many active requests.", EMLINK); return 0; } alocLeft--; nextTID++; @@ -245,6 +181,17 @@ XrdSsiTaskReal *XrdSsiSessReal::NewTask(XrdSsiRequest *reqP) // tP->Init(reqP, reqP->GetTimeOut()); DEBUG("Task=" <Recycle(this); + myService->Recycle(this, !noReuse); } } @@ -341,10 +321,9 @@ void XrdSsiSessReal::TaskFinished(XrdSsiTaskReal *tP) if (tP == attBase || tP->attList.next != tP) {REMOVE(attBase, attList, tP);} -// Clear asny pending task events and decrease active count +// Clear any pending task events and decrease active count // tP->ClrEvent(); - numAT--; // Return the request entry number // @@ -355,10 +334,24 @@ void XrdSsiSessReal::TaskFinished(XrdSsiTaskReal *tP) // unlock it before we return. // RelTask(tP); - if (!isHeld && numAT < 1) Unprovision(); + if (!isHeld && !attBase) Unprovision(); else sessMutex.UnLock(); } +/******************************************************************************/ +/* U n H o l d */ +/******************************************************************************/ + +void XrdSsiSessReal::UnHold() +{ + XrdSsiMutexMon sessMon(sessMutex); + +// Turn off the hold flag and if we have no attached tasks, schedule shutdown +// + isHeld = false; + if (!attBase) XrdSsi::schedP->Schedule(new CleanUp(this)); +} + /******************************************************************************/ /* Private: U n p r o v i s i o n */ /******************************************************************************/ @@ -367,18 +360,23 @@ void XrdSsiSessReal::TaskFinished(XrdSsiTaskReal *tP) void XrdSsiSessReal::Unprovision() // Called with sessMutex locked! { + EPNAME("Unprovision"); XrdCl::XRootDStatus uStat; -// Close the file this will schedule a shutdown if successful +// Clear any pending events // + DEBUG("Closing " <IsOK(); + +// If we have no requests then we may want to simply shoutdown. // Note that shutdown and unprovision unlock the sessMutex. // - if (!requestP) - {if (!status->IsOK()) Shutdown(*status); - else {if (!isHeld) Unprovision();} + if (!tP) + {if (isHeld) + {sessMutex.UnLock(); + return false; + } + if (!status->IsOK()) Shutdown(*status, false); + else {if (!isHeld) Unprovision(); + else sessMutex.UnLock(); + } return false; } // We are here because the open finally completed. If the open failed, then -// tell Finish() to do a shutdown and post an error response. +// schedule an error for all pending tasks. The Finish() call on each will +// drive the cleanup of this session. // if (!status->IsOK()) - {std::string eTxt; - int eNum = XrdSsiUtils::GetErr(*status, eTxt); - doStop = true; + {XrdSsiErrInfo eInfo; + XrdSsiUtils::SetErr(*status, eInfo); + do {tP->SchedError(&eInfo); tP = tP->attList.next;} + while(tP != attBase); sessMutex.UnLock(); - SetErrResponse(eTxt.c_str(), eNum); return false; } @@ -432,14 +441,14 @@ bool XrdSsiSessReal::XeqEvent(XrdCl::XRootDStatus *status, sessNode = strdup(currNode.c_str()); } else sessNode = strdup("Unknown!"); -// Execute the request. If we failed and this is a single request session then -// we need to disband he session. We delay this until Finish() is called. +// Execute each pending request. // - if (!Execute(requestP) && !isHeld) - {if (!requestP) Unprovision(); - else {doStop = true; - sessMutex.UnLock(); - } - } else sessMutex.UnLock(); - return false; + do {if (!tP->SendRequest(sessNode)) noReuse = true; + tP = tP->attList.next; + } while(tP != attBase); + +// We are done, field the next event +// + sessMutex.UnLock(); + return true; } diff --git a/src/XrdSsi/XrdSsiSessReal.hh b/src/XrdSsi/XrdSsiSessReal.hh index 3dca14c65c0..d69e2df7d6e 100644 --- a/src/XrdSsi/XrdSsiSessReal.hh +++ b/src/XrdSsi/XrdSsiSessReal.hh @@ -30,28 +30,23 @@ /******************************************************************************/ #include - + #include "XrdCl/XrdClFile.hh" + #include "XrdSsi/XrdSsiAtomics.hh" #include "XrdSsi/XrdSsiEvent.hh" -#include "XrdSsi/XrdSsiResponder.hh" + #include "XrdSys/XrdSysPthread.hh" class XrdSsiServReal; class XrdSsiTaskReal; -class XrdSsiSessReal : public XrdSsiEvent, public XrdSsiResponder +class XrdSsiSessReal : public XrdSsiEvent { public: XrdSsiSessReal *nextSess; - bool Execute(XrdSsiRequest *reqP); - - void Finished( XrdSsiRequest &rqstR, - const XrdSsiRespInfo &rInfo, - bool cancel=false); - void InitSession(XrdSsiServReal *servP, const char *sName, int uent, @@ -63,10 +58,16 @@ XrdSsiMutex *MutexP() {return &sessMutex;} bool Provision(XrdSsiRequest *reqP, const char *epURL); + bool Run(XrdSsiRequest *reqP); + void TaskFinished(XrdSsiTaskReal *tP); + void UnHold(); + void UnLock() {sessMutex.UnLock();} + void Unprovision(); + bool XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP); @@ -86,23 +87,20 @@ XrdCl::File epFile; private: XrdSsiTaskReal *NewTask(XrdSsiRequest *reqP); void RelTask(XrdSsiTaskReal *tP); -void Shutdown(XrdCl::XRootDStatus &epStatus); -void Unprovision(); +void Shutdown(XrdCl::XRootDStatus &epStatus, bool onClose); XrdSsiMutex sessMutex; XrdSsiServReal *myService; XrdSsiTaskReal *attBase; XrdSsiTaskReal *freeTask; -XrdSsiTaskReal *pendTask; XrdSsiRequest *requestP; char *sessName; char *sessNode; int16_t nextTID; int16_t alocLeft; -int16_t numAT; // Number of active tasks int16_t uEnt; // User index for scaling bool isHeld; bool inOpen; -bool doStop; +bool noReuse; }; #endif diff --git a/src/XrdSsi/XrdSsiTaskReal.cc b/src/XrdSsi/XrdSsiTaskReal.cc index 5744197b06a..e1e8e5221ed 100644 --- a/src/XrdSsi/XrdSsiTaskReal.cc +++ b/src/XrdSsi/XrdSsiTaskReal.cc @@ -27,6 +27,7 @@ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ +#include #include #include "XrdSsi/XrdSsiAtomics.hh" @@ -38,7 +39,10 @@ #include "XrdSsi/XrdSsiTaskReal.hh" #include "XrdSsi/XrdSsiTrace.hh" #include "XrdSsi/XrdSsiUtils.hh" + +#include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysHeaders.hh" +#include "Xrd/XrdScheduler.hh" using namespace XrdSsi; @@ -48,8 +52,8 @@ using namespace XrdSsi; namespace { -const char *statName[] = {"isWrite", "isSync", "isReady", - "isDone", "isDead"}; +const char *statName[] = {"isPend", "isWrite", "isSync", + "isReady", "isDone", "isDead"}; XrdSsiSessReal voidSession(0, "voidSession", 0); @@ -66,7 +70,9 @@ char zedData = 0; namespace XrdSsi { -extern XrdSsiScale sidScale; +extern XrdSysError Log; +extern XrdScheduler *schedP; +extern XrdSsiScale sidScale; } /******************************************************************************/ @@ -89,6 +95,23 @@ void RecycleMsg(bool sent=true) {delete respObj; delete this;} private: XrdCl::AnyObject *respObj; }; + +/******************************************************************************/ + +class SchedEmsg : public XrdJob +{ +public: + +void DoIt() {taskP->SendError(); + delete this; + } + + SchedEmsg(XrdSsiTaskReal *tP) : taskP(tP) {} + ~SchedEmsg() {} + +private: +XrdSsiTaskReal *taskP; +}; } /******************************************************************************/ @@ -265,12 +288,30 @@ bool XrdSsiTaskReal::Kill() // Called with session mutex locked! break; case isDead: return !(mhPend || defer); break; - default: cerr <<"XrdSsiTaskReal: Invalid state " <UnLock(); + wSem.Wait(); + sessP->Lock(); + } + // If we are here then the request is potentially still active at the server. // We will send a synchronous cancel request. It shouldn't take long. // @@ -278,6 +319,7 @@ bool XrdSsiTaskReal::Kill() // Called with session mutex locked! DEBUG("Sending cancel request id=" <epFile.Truncate(rInfo.Info(), tmOut); + // If we are in the message handler or if we have a message pending, then // the message handler will dispose of the task. // @@ -305,8 +347,10 @@ void XrdSsiTaskReal::Redrive() {case XrdSsiRequest::PRD_Normal: break; case XrdSsiRequest::PRD_Hold: Hold(0); break; case XrdSsiRequest::PRD_HoldLcl: Hold(rqstP->GetRequestID()); break; - default: cerr <<"Redrive: ProcessResponseData() return invalid enum " - " - " <Schedule((XrdJob *)(new SchedEmsg(this))); + defer = true; +} + +/******************************************************************************/ +/* S e n d E r r o r */ +/******************************************************************************/ + +void XrdSsiTaskReal::SendError() +{ +// Lock the associated session +// + sessP->Lock(); + +// If there was no call to finished then we need to call to send an error +// response which will precipitate a finished call (or should). +// + if (tStat != isDead) + {int eNum; + const char *eTxt = errInfo.Get(eNum).c_str(); + sessP->UnLock(); + SetErrResponse(eTxt, eNum); + sessP->Lock(); + defer = false; + if (tStat != isDead) + {sessP->UnLock(); + return; + } + } + +// It is now safe to finish this up +// + sessP->UnLock(); + sessP->TaskFinished(this); +} + +/******************************************************************************/ +/* S e n d R e q u e s t */ +/******************************************************************************/ + +// Called with sessMutex locked! + +bool XrdSsiTaskReal::SendRequest(const char *node) +{ + XrdCl::XRootDStatus Status; + XrdSsiRRInfo rrInfo; + char *reqBuff; + int reqBlen; + +// We must be in pend state to send a request. If we are not then the request +// must have been cancelled. It also means we have a logic error since this +// should never have happened. Issue a message and ignore this request. +// + if (tStat != isPend) + {Log.Emsg("SendRequest", "Invalid state", statName[tStat], + "; should be isPend!"); + return false; + } + +// Establish the endpoint +// + XrdSsiReqAgent::SetNode(reqP, node); + +// Get the request information +// + reqBuff = reqP->GetRequest(reqBlen); + +// Construct the info for this request +// + rrInfo.Id(tskID); + rrInfo.Size(reqBlen); + tStat = isWrite; + +// Issue the write +// + Status = sessP->epFile.Write(rrInfo.Info(), (uint32_t)reqBlen, reqBuff, + (XrdCl::ResponseHandler *)this, tmOut); + +// Determine ending status. If it's bad, schedule an error. Note that calls to +// Finished() will be defered until the error thread gets control. +// + if (!Status.IsOK()) + {XrdSsiUtils::SetErr(Status, errInfo); + SchedError(); + return false; + } + +// Indicate a message handler call outstanding +// + mhPend = true; + return true; +} + /******************************************************************************/ /* S e t B u f f */ /******************************************************************************/ @@ -495,9 +646,12 @@ bool XrdSsiTaskReal::XeqEvent(XrdCl::XRootDStatus *status, switch(tStat) {case isWrite: if (!aOK) return RespErr(status); // Unlocks the mutex! - DEBUG("Calling RelBuff id=" <GetRequestID()); break; - default: cerr <<"XeqEvent: ProcessResponseData() return invalid enum " - " - " <GetRequestID();} +void SchedError(XrdSsiErrInfo *eInfo=0); + +void SendError(); + +bool SendRequest(const char *node); + int SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen, bool &last); bool SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen); @@ -80,7 +90,7 @@ bool XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP); XrdSsiTaskReal(XrdSsiSessReal *sP, short tid) : XrdSsiEvent("TaskReal"), XrdSsiStream(XrdSsiStream::isPassive), - sessP(sP), mdResp(0), tskID(tid), + sessP(sP), mdResp(0), wPost(0), tskID(tid), mhPend(false), defer(false) {} @@ -98,11 +108,11 @@ respType GetResp(XrdCl::AnyObject **respP, char *&dbuf, int &dlen); bool RespErr(XrdCl::XRootDStatus *status); bool XeqEnd(bool getLock); -XrdSysRecMutex rrMutex; -XrdSysMutex taskMutex; +XrdSsiErrInfo errInfo; XrdSsiSessReal *sessP; XrdSsiRequest *rqstP; XrdCl::AnyObject *mdResp; +XrdSysSemaphore *wPost; char *dataBuff; int dataRlen; TaskStat tStat;