From 18f767de257e81700d3978ef11512b22bac0bea2 Mon Sep 17 00:00:00 2001 From: Andrew Hanushevsky Date: Fri, 3 Feb 2017 17:23:16 -0800 Subject: [PATCH] [XrdSsi] Version 2 changes. --- src/XrdSsi.cmake | 5 +- src/XrdSsi/XrdSsiAlert.cc | 151 ++++++ src/XrdSsi/XrdSsiAlert.hh | 72 +++ src/XrdSsi/XrdSsiClient.cc | 24 +- src/XrdSsi/XrdSsiErrInfo.hh | 73 +-- src/XrdSsi/XrdSsiEvent.cc | 4 +- src/XrdSsi/XrdSsiEvent.hh | 5 +- src/XrdSsi/XrdSsiFileReq.cc | 159 ++++-- src/XrdSsi/XrdSsiFileReq.hh | 45 +- src/XrdSsi/XrdSsiFileResource.cc | 81 +++ .../{XrdSsiSSRun.hh => XrdSsiFileResource.hh} | 41 +- src/XrdSsi/XrdSsiFileSess.cc | 95 +--- src/XrdSsi/XrdSsiFileSess.hh | 12 +- src/XrdSsi/XrdSsiProvider.hh | 92 +++- src/XrdSsi/XrdSsiRRInfo.hh | 3 +- src/XrdSsi/XrdSsiRequest.cc | 93 ++-- src/XrdSsi/XrdSsiRequest.hh | 235 ++++---- src/XrdSsi/XrdSsiResource.hh | 54 +- src/XrdSsi/XrdSsiRespInfo.hh | 56 ++ src/XrdSsi/XrdSsiResponder.hh | 210 ++++---- src/XrdSsi/XrdSsiSSRun.cc | 144 ----- src/XrdSsi/XrdSsiScale.hh | 77 +++ src/XrdSsi/XrdSsiServReal.cc | 160 +++--- src/XrdSsi/XrdSsiServReal.hh | 9 +- src/XrdSsi/XrdSsiService.hh | 108 ++-- src/XrdSsi/XrdSsiSessReal.cc | 502 ++++++++---------- src/XrdSsi/XrdSsiSessReal.hh | 81 ++- src/XrdSsi/XrdSsiSession.hh | 159 ------ src/XrdSsi/XrdSsiSfsConfig.cc | 11 +- src/XrdSsi/XrdSsiStream.hh | 52 +- src/XrdSsi/XrdSsiTaskReal.cc | 285 +++++++--- src/XrdSsi/XrdSsiTaskReal.hh | 27 +- src/XrdSsi/XrdSsiUtils.cc | 131 ++++- src/XrdSsi/XrdSsiUtils.hh | 14 + 34 files changed, 1839 insertions(+), 1431 deletions(-) create mode 100644 src/XrdSsi/XrdSsiAlert.cc create mode 100644 src/XrdSsi/XrdSsiAlert.hh create mode 100644 src/XrdSsi/XrdSsiFileResource.cc rename src/XrdSsi/{XrdSsiSSRun.hh => XrdSsiFileResource.hh} (68%) delete mode 100644 src/XrdSsi/XrdSsiSSRun.cc create mode 100644 src/XrdSsi/XrdSsiScale.hh delete mode 100644 src/XrdSsi/XrdSsiSession.hh diff --git a/src/XrdSsi.cmake b/src/XrdSsi.cmake index 5cab3015b4d..37bc3c97952 100644 --- a/src/XrdSsi.cmake +++ b/src/XrdSsi.cmake @@ -15,6 +15,7 @@ set( XRD_SSI_SHMAP_SOVERSION 1 ) add_library( XrdSsiLib SHARED +XrdSsi/XrdSsiAlert.cc XrdSsi/XrdSsiAlert.hh XrdSsi/XrdSsiAtomics.hh XrdSsi/XrdSsiBVec.hh XrdSsi/XrdSsiClient.cc @@ -22,6 +23,7 @@ XrdSsi/XrdSsiClient.cc XrdSsi/XrdSsiCms.cc XrdSsi/XrdSsiCms.hh XrdSsi/XrdSsiErrInfo.hh XrdSsi/XrdSsiEvent.cc XrdSsi/XrdSsiEvent.hh +XrdSsi/XrdSsiFileResource.cc XrdSsi/XrdSsiFileResource.hh XrdSsi/XrdSsiLogger.cc XrdSsi/XrdSsiLogger.hh XrdSsi/XrdSsiPacer.cc XrdSsi/XrdSsiPacer.hh XrdSsi/XrdSsiProvider.hh @@ -30,11 +32,10 @@ XrdSsi/XrdSsiPacer.cc XrdSsi/XrdSsiPacer.hh XrdSsi/XrdSsiRequest.cc XrdSsi/XrdSsiRequest.hh XrdSsi/XrdSsiResponder.hh XrdSsi/XrdSsiResource.hh + XrdSsi/XrdSsiScale.hh XrdSsi/XrdSsiServReal.cc XrdSsi/XrdSsiServReal.hh XrdSsi/XrdSsiService.hh XrdSsi/XrdSsiSessReal.cc XrdSsi/XrdSsiSessReal.hh -XrdSsi/XrdSsiSSRun.cc XrdSsi/XrdSsiSSRun.hh - XrdSsi/XrdSsiSession.hh XrdSsi/XrdSsiStream.hh XrdSsi/XrdSsiTaskReal.cc XrdSsi/XrdSsiTaskReal.hh XrdSsi/XrdSsiTrace.hh diff --git a/src/XrdSsi/XrdSsiAlert.cc b/src/XrdSsi/XrdSsiAlert.cc new file mode 100644 index 00000000000..d8a71870e6e --- /dev/null +++ b/src/XrdSsi/XrdSsiAlert.cc @@ -0,0 +1,151 @@ +/******************************************************************************/ +/* */ +/* X r d S s i A l e r t . c c */ +/* */ +/* (c) 2017 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* All Rights Reserved */ +/* Produced by Andrew Hanushevsky for Stanford University under contract */ +/* DE-AC02-76-SFO0515 with the Department of Energy */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include + +#include "XrdOuc/XrdOucErrInfo.hh" +#include "XrdSsi/XrdSsiAlert.hh" +#include "XrdSsi/XrdSsiRRInfo.hh" + +/******************************************************************************/ +/* S t a t i c s */ +/******************************************************************************/ + +XrdSysMutex XrdSsiAlert::aMutex; +XrdSsiAlert *XrdSsiAlert::free = 0; +int XrdSsiAlert::fNum = 0; +int XrdSsiAlert::fMax = XrdSsiAlert::fmaxDflt; + +/******************************************************************************/ +/* A l l o c */ +/******************************************************************************/ + +XrdSsiAlert *XrdSsiAlert::Alloc(XrdSsiRespInfoMsg &aMsg) +{ + XrdSsiAlert *aP; + +// Obtain a lock +// + aMutex.Lock(); + +// Allocate via stack or a new call +// + if (!(aP = free)) aP = new XrdSsiAlert(); + else {free = aP->next; fNum--;} + +// Unlock mutex +// + aMutex.UnLock(); + +// Fill out object and return it +// + aP->next = 0; + aP->theMsg = &aMsg; + return aP; +} + +/******************************************************************************/ +/* D o n e */ +/******************************************************************************/ + +// Gets invoked only after query() on wtresp signal was sent + +void XrdSsiAlert::Done(int &retc, XrdOucErrInfo *eiP, const char *name) +{ + +// This is an async callback so we need to delete our errinfo object. +// + delete eiP; + +// Simply recycle this object. +// + Recycle(); +} + +/******************************************************************************/ +/* R e c y c l e */ +/******************************************************************************/ + +void XrdSsiAlert::Recycle() +{ + +// Issue callback to release the message if we have one +// + if (theMsg) theMsg->Recycle(); + +// Place object on the queue unless we have too many +// + aMutex.Lock(); + if (fNum >= fMax) delete this; + else {next = free; free = this; fNum++;} + aMutex.UnLock(); +} + +/******************************************************************************/ +/* S e t I n f o */ +/******************************************************************************/ + +void XrdSsiAlert::SetInfo(XrdOucErrInfo &eInfo) +{ + static const int aIovSz = 3; + struct AlrtResp {struct iovec ioV[aIovSz]; XrdSsiRRInfoAttn aHdr;}; + + AlrtResp *alrtResp; + char *mBuff; + int n; + +// We will be constructing the response in the message buffer. This is +// gauranteed to be big enough for our purposes so no need to check the size. +// + mBuff = eInfo.getMsgBuff(n); + +// Initialize the response +// + alrtResp = (AlrtResp *)mBuff; + memset(alrtResp, 0, sizeof(AlrtResp)); + alrtResp->aHdr.pfxLen = htons(sizeof(XrdSsiRRInfoAttn)); + +// Fill out iovec to point to our header +// +// alrtResp->ioV[0].iov_len = sizeof(XrdSsiRRInfoAttn) + msgBlen; + alrtResp->ioV[1].iov_base = mBuff+offsetof(struct AlrtResp, aHdr); + alrtResp->ioV[1].iov_len = sizeof(XrdSsiRRInfoAttn); + +// Fill out the iovec for the alert data +// + alrtResp->ioV[2].iov_base = theMsg->GetMsg(n); + alrtResp->ioV[2].iov_len = n; + alrtResp->aHdr.mdLen = htonl(n); + alrtResp->aHdr.tag = XrdSsiRRInfoAttn::alrtResp; + +// Setup to have metadata actually sent to the requestor +// + eInfo.setErrCode(aIovSz); +} diff --git a/src/XrdSsi/XrdSsiAlert.hh b/src/XrdSsi/XrdSsiAlert.hh new file mode 100644 index 00000000000..b881e3b769c --- /dev/null +++ b/src/XrdSsi/XrdSsiAlert.hh @@ -0,0 +1,72 @@ +#ifndef _XRDSSIALERT_H +#define _XRDSSIALERT_H +/******************************************************************************/ +/* */ +/* X r d S s i A l e r t . h h */ +/* */ +/* (c) 2017 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* All Rights Reserved */ +/* Produced by Andrew Hanushevsky for Stanford University under contract */ +/* DE-AC02-76-SFO0515 with the Department of Energy */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include "XrdOuc/XrdOucErrInfo.hh" +#include "XrdSsi/XrdSsiRequest.hh" +#include "XrdSys/XrdSysPthread.hh" + +class XrdSsiAlert : public XrdOucEICB +{ +public: + +XrdSsiAlert *next; + +static XrdSsiAlert *Alloc(XrdSsiRespInfoMsg &aMsg); + + void Recycle(); + + void SetInfo(XrdOucErrInfo &eInfo); + +static void SetMax(int maxval) {fMax = maxval;} + +// OucEICB methods +// + void Done(int &Result, XrdOucErrInfo *cbInfo, + const char *path=0); + + int Same(unsigned long long arg1, unsigned long long arg2) + {return 0;} + + XrdSsiAlert() {} + ~XrdSsiAlert() {} +private: + +static XrdSysMutex aMutex; +static XrdSsiAlert *free; +static int fNum; +static int fMax; + +static const int fmaxDflt = 100; + +XrdSsiRespInfoMsg *theMsg; +}; +#endif diff --git a/src/XrdSsi/XrdSsiClient.cc b/src/XrdSsi/XrdSsiClient.cc index 9500d04e0a9..fd7eaa659cb 100644 --- a/src/XrdSsi/XrdSsiClient.cc +++ b/src/XrdSsi/XrdSsiClient.cc @@ -29,8 +29,12 @@ #include #include +#include +#include #include +#include #include +#include #include "Xrd/XrdScheduler.hh" #include "Xrd/XrdTrace.hh" @@ -84,15 +88,15 @@ class XrdSsiClientProvider : public XrdSsiProvider { public: -XrdSsiService *GetService(XrdSsiErrInfo &eInfo, - const char *contact, - int oHold=256 +XrdSsiService *GetService(XrdSsiErrInfo &eInfo, + const std::string &contact, + int oHold=256 ); virtual bool Init(XrdSsiLogger *logP, XrdSsiCluster *clsP, - const char *cfgFn, - const char *parms, + std::string cfgFn, + std::string parms, int argc, char **argv ) {return true;} @@ -117,9 +121,9 @@ void SetScheduler(); /* X r d S s i C l i e n t P r o v i d e r : : G e t S e r v i c e */ /******************************************************************************/ -XrdSsiService *XrdSsiClientProvider::GetService(XrdSsiErrInfo &eInfo, - const char *contact, - int oHold) +XrdSsiService *XrdSsiClientProvider::GetService(XrdSsiErrInfo &eInfo, + const std::string &contact, + int oHold) { static const int maxTMO = 0x7fffffff; XrdNetAddr netAddr; @@ -143,12 +147,12 @@ XrdSsiService *XrdSsiClientProvider::GetService(XrdSsiErrInfo &eInfo, // If no contact is given then declare an error // - if (!contact || !(*contact)) + if (contact.empty()) {eInfo.Set("Contact not specified.", EINVAL); return 0;} // Validate the given contact // - if ((eText = netAddr.Set(contact))) + if ((eText = netAddr.Set(contact.c_str()))) {eInfo.Set(eText, EINVAL); return 0;} // Construct new binding diff --git a/src/XrdSsi/XrdSsiErrInfo.hh b/src/XrdSsi/XrdSsiErrInfo.hh index 2731689ed9b..55dc1409ee5 100644 --- a/src/XrdSsi/XrdSsiErrInfo.hh +++ b/src/XrdSsi/XrdSsiErrInfo.hh @@ -29,7 +29,7 @@ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ -#include +#include #include //----------------------------------------------------------------------------- @@ -45,43 +45,27 @@ public: //! Reset and clear error information. //----------------------------------------------------------------------------- - void Clr() {if (errText) {free(errText); errText = 0;}; - errArg = errNum = 0; - } - -//----------------------------------------------------------------------------- -//! Determine whether or not error information is present. -//! -//! @return true Error information is present. -//! @return false Error information not present. -//----------------------------------------------------------------------------- - - bool Dirty() {return errText != 0;} + void Clr() {errText.clear(); errArg = errNum = 0;} //----------------------------------------------------------------------------- //! Get current error information. //! //! @param eNum place where the error number is to be placed. //! -//! @return =0 no error information is available, eNum also is zero. -//! @return !0 pointer to a string describing the error associated with eNum. -//! The pointer is valid until the object is deleted or Set(). +//! @return The error text and the error number value. //----------------------------------------------------------------------------- -const char *Get(int &eNum) - {if (!(eNum = errNum)) return 0; - return errText; - } +const +std::string &Get(int &eNum) const {eNum = errNum; return errText;} //----------------------------------------------------------------------------- //! Get current error text. //! -//! @return =0 no error information is available. -//! @return !0 pointer to a string describing the error. -//! The pointer is valid until the object is deleted or Set(). +//! @return The error text. //----------------------------------------------------------------------------- -const char *Get() {return errText;} +const +std::string &Get() const {return errText;} //----------------------------------------------------------------------------- //! Get current error argument. @@ -89,20 +73,41 @@ const char *Get() {return errText;} //! @return the error argument value. //----------------------------------------------------------------------------- - int GetArg() {return errArg;} + int GetArg() const {return errArg;} + +//----------------------------------------------------------------------------- +//! Check if there is an error. +//! +//! @return True if an error exists and false otherwise. +//----------------------------------------------------------------------------- + + bool hasError() const {return errNum != 0;} + +//----------------------------------------------------------------------------- +//! Check if there is no error. +//! +//! @return True if no error exists and false otherwise. +//----------------------------------------------------------------------------- + + bool isOK() const {return errNum == 0;} //----------------------------------------------------------------------------- -//! Set new error information. +//! Set new error information. There are two obvious variations. //! -//! @param eMsg pointer to a string describing the error. If zero, the eNum +//! @param eMsg pointer to a string describing the error. If nil, the eNum //! is taken as errno and strerror(eNum) is used. //! @param eNum the error number associated with the error. //! @param eArg the error argument, if any (see XrdSsiService::Provision()). //----------------------------------------------------------------------------- void Set(const char *eMsg=0, int eNum=0, int eArg=0) - {if (errText) free(errText); - errText = strdup((eMsg && *eMsg ? eMsg : strerror(eNum))); + {errText = (eMsg && *eMsg ? eMsg : strerror(eNum)); + errNum = eNum; + errArg = eArg; + } + + void Set(const std::string &eMsg, int eNum=0, int eArg=0) + {errText = (eMsg.empty() ? strerror(eNum) : eMsg); errNum = eNum; errArg = eArg; } @@ -127,14 +132,14 @@ XrdSsiErrInfo &operator=(XrdSsiErrInfo const &rhs) //! Constructor and Destructor //----------------------------------------------------------------------------- - XrdSsiErrInfo() : errText(0), errNum(0), errArg(0) {} + XrdSsiErrInfo() : errNum(0), errArg(0) {} - ~XrdSsiErrInfo() {Clr();} + ~XrdSsiErrInfo() {} private: -char *errText; -int errNum; -int errArg; +std::string errText; +int errNum; +int errArg; }; #endif diff --git a/src/XrdSsi/XrdSsiEvent.cc b/src/XrdSsi/XrdSsiEvent.cc index 52ed70bcd87..0b7c73592bd 100644 --- a/src/XrdSsi/XrdSsiEvent.cc +++ b/src/XrdSsi/XrdSsiEvent.cc @@ -137,8 +137,8 @@ void XrdSsiEvent::DoIt() EventData myEvent, *edP = &myEvent; // Process all of the events in our list. This is a tricky proposition because -// the event executor may delete us (it returns false if so). To prevent -// double frees and the like, we move out the event and work off a local copy. +// the event executor may delete us when false is returned. To prevent double +// frees and the like, we move out the event and work off a local copy. // evMutex.Lock(); do{thisEvent.Move2(myEvent); diff --git a/src/XrdSsi/XrdSsiEvent.hh b/src/XrdSsi/XrdSsiEvent.hh index 5aaac057247..e41e339b9bb 100644 --- a/src/XrdSsi/XrdSsiEvent.hh +++ b/src/XrdSsi/XrdSsiEvent.hh @@ -51,8 +51,9 @@ virtual void HandleResponse(XrdCl::XRootDStatus *status, virtual bool XeqEvent(XrdCl::XRootDStatus *st, XrdCl::AnyObject **resp) = 0; - XrdSsiEvent(const char *hName="") : XrdJob(hName), lastEvent(0), - running(false) {} + XrdSsiEvent(const char *hName="") : XrdJob(hName), lastEvent(0), + running(false) + {} ~XrdSsiEvent() {ClrEvent();} diff --git a/src/XrdSsi/XrdSsiFileReq.cc b/src/XrdSsi/XrdSsiFileReq.cc index 3ee619beea4..a0f05800c1b 100644 --- a/src/XrdSsi/XrdSsiFileReq.cc +++ b/src/XrdSsi/XrdSsiFileReq.cc @@ -37,8 +37,11 @@ #include "XrdOuc/XrdOucErrInfo.hh" #include "XrdSfs/XrdSfsDio.hh" #include "XrdSfs/XrdSfsXio.hh" +#include "XrdSsi/XrdSsiAlert.hh" #include "XrdSsi/XrdSsiFileReq.hh" +#include "XrdSsi/XrdSsiFileResource.hh" #include "XrdSsi/XrdSsiFileSess.hh" +#include "XrdSsi/XrdSsiService.hh" #include "XrdSsi/XrdSsiSfs.hh" #include "XrdSsi/XrdSsiStream.hh" #include "XrdSsi/XrdSsiTrace.hh" @@ -58,6 +61,7 @@ namespace XrdSsi { extern XrdSysError Log; extern XrdScheduler *Sched; +extern XrdSsiService *Service; }; using namespace XrdSsi; @@ -111,16 +115,62 @@ void XrdSsiFileReq::Activate(XrdOucBuffer *oP, XrdSfsXioHandle *bR, int rSz) Sched->Schedule((XrdJob *)this); } +/******************************************************************************/ +/* A l e r t */ +/******************************************************************************/ + +void XrdSsiFileReq::Alert(XrdSsiRespInfoMsg &aMsg) +{ + EPNAME("Alert"); + const XrdSsiRespInfo *rP = RespP(); + XrdSsiAlert *aP; + int msgLen; + +// Do some debugging +// + aMsg.GetMsg(msgLen); + DEBUGXQ(msgLen <<" byte alert presented wtr=" <rType != XrdSsiRespInfo::isNone || isEnding) + {frqMutex.UnLock(); + aMsg.Recycle(); + return; + } + +// Allocate an alert object and chain it into the pending queue +// + aP = XrdSsiAlert::Alloc(aMsg); + +// If the client is waiting for a response then we can send the alert now. +// Otherwise, we need to queue it until he client comes back to us. +// + if (respWait) WakeUp(aP); + else {if (alrtLast) alrtLast->next = aP; + else alrtPend = aP; + alrtLast = aP; + } + +// All done +// + frqMutex.UnLock(); +} + /******************************************************************************/ /* A l l o c */ /******************************************************************************/ -XrdSsiFileReq *XrdSsiFileReq::Alloc(XrdOucErrInfo *eiP, - XrdSsiFileSess *fP, - XrdSsiSession *sP, - const char *sID, - const char *cID, - int rnum) +XrdSsiFileReq *XrdSsiFileReq::Alloc(XrdOucErrInfo *eiP, + XrdSsiFileResource *rP, + XrdSsiFileSess *fP, + const char *sID, + const char *cID, + int rnum) { XrdSsiFileReq *nP; @@ -141,7 +191,7 @@ XrdSsiFileReq *XrdSsiFileReq::Alloc(XrdOucErrInfo *eiP, // if (nP) {nP->sessN = sID; - nP->sessP = sP; + nP->fileR = rP; nP->fileP = fP; nP->cbInfo = eiP; nP->reqID = rnum; @@ -157,19 +207,19 @@ XrdSsiFileReq *XrdSsiFileReq::Alloc(XrdOucErrInfo *eiP, /* B i n d D o n e */ /******************************************************************************/ -// This is called with reqMutex locked! +// This is called with frqMutex locked! -void XrdSsiFileReq::BindDone(XrdSsiSession *sP) +void XrdSsiFileReq::BindDone() { EPNAME("BindDone"); // Do some debugging // - DEBUGXQ("Bind called; session " <<(sP ? "set" : "nil")); + DEBUGXQ("Bind called; for request " <ProcessRequest((XrdSsiRequest *)this); + Service->ProcessRequest((XrdSsiRequest &)*this, + (XrdSsiFileResource &)*fileR); return; break; - case isAbort: DEBUGXQ("Skipped calling session Process"); + case isAbort: DEBUGXQ("Skipped calling service processor"); mHelper.UnLock(); Recycle(); return; @@ -243,7 +294,7 @@ void XrdSsiFileReq::DoIt() void XrdSsiFileReq::Done(int &retc, XrdOucErrInfo *eiP, const char *name) { EPNAME("Done"); - XrdSsiMutexMon(reqMutex); + XrdSsiMutexMon mHelper(frqMutex); // We may need to delete the errinfo object if this callback was async. Note // that the following test is valid even if the file object has been deleted. @@ -312,7 +363,7 @@ int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value // Get correct error code and message // - eMsg = eObj.Get(eNum); + eMsg = eObj.Get(eNum).c_str(); if (eNum <= 0) eNum = EFAULT; if (!eMsg || !(*eMsg)) eMsg = "reason unknown"; @@ -337,9 +388,21 @@ int XrdSsiFileReq::Emsg(const char *pfx, // Message prefix value void XrdSsiFileReq::Finalize() { EPNAME("Finalize"); - XrdSsiMutexMon mHelper(reqMutex); + XrdSsiMutexMon mHelper(frqMutex); bool cancel = (myState != odRsp); +// Release any unsent alerts (prevent any new alerts from being accepted) +// + isEnding = true; + if (alrtSent || alrtPend) + {XrdSsiAlert *dP, *aP = alrtSent; + if (aP) aP->next = alrtPend; + else aP = alrtPend; + mHelper.UnLock(); + while((dP = aP)) {aP = aP->next; dP->Recycle();} + mHelper.Lock(frqMutex); + } + // Processing is determined by the responder's state // switch(urState) @@ -420,8 +483,10 @@ void XrdSsiFileReq::Init(const char *cID) cbInfo = 0; respCB = 0; respCBarg = 0; + alrtSent = 0; + alrtPend = 0; + alrtLast = 0; sessN = "???"; - sessP = 0; oucBuff = 0; sfsBref = 0; strBuff = 0; @@ -436,16 +501,19 @@ void XrdSsiFileReq::Init(const char *cID) respWait = false; strmEOF = false; isPerm = false; + isEnding = false; + SetMutex(&frqMutex); } /******************************************************************************/ /* Protected: P r o c e s s R e s p o n s e */ /******************************************************************************/ -bool XrdSsiFileReq::ProcessResponse(const XrdSsiRespInfo &Resp, bool isOK) +bool XrdSsiFileReq::ProcessResponse(const XrdSsiErrInfo &eInfo, + const XrdSsiRespInfo &Resp) { EPNAME("ProcessResponse"); - XrdSsiMutexMon mHelper(reqMutex); + XrdSsiMutexMon mHelper(frqMutex); // Do some debugging // @@ -673,7 +741,7 @@ void XrdSsiFileReq::Recycle() void XrdSsiFileReq::RelRequestBuffer() { EPNAME("RelReqBuff"); - XrdSsiMutexMon mHelper(reqMutex); + XrdSsiMutexMon mHelper(frqMutex); // Do some debugging // @@ -814,8 +882,31 @@ int XrdSsiFileReq::sendStrmA(XrdSsiStream *strmP, bool XrdSsiFileReq::WantResponse(XrdOucErrInfo &eInfo) { - XrdSsiMutexMon(reqMutex); - const XrdSsiRespInfo *rspP = RespP(); + EPNAME("WantResp"); + XrdSsiMutexMon frqMon; + const XrdSsiRespInfo *rspP; + +// Check if we have a previos alert that was sent (we need to recycle it). We +// don't need a lock for this as it's fully serialized via serial fsctl calls. +// + if (alrtSent) {alrtSent->Recycle(); alrtSent = 0;} + +// Serialize the remainder of this code +// + frqMon.Lock(frqMutex); + rspP = RespP(); + +// If we have a pending alert then we need to send it now. Suppress the callback +// as we will recycle the alert on the next call (there should be one). +// + if (alrtPend) + {alrtSent = alrtPend; + if (!(alrtPend = alrtPend->next)) alrtLast = 0; + alrtSent->SetInfo(eInfo); + eInfo.setErrCB((XrdOucEICB *)0); + DEBUGXQ("alert sent; " <<(alrtPend ? "" : "no ") <<"more pending"); + return true; + } // Check if a response is here (well, ProcessResponse was called) // @@ -843,7 +934,7 @@ bool XrdSsiFileReq::WantResponse(XrdOucErrInfo &eInfo) /* Private: W a k e U p */ /******************************************************************************/ -void XrdSsiFileReq::WakeUp() // Called with reqMutex locked! +void XrdSsiFileReq::WakeUp(XrdSsiAlert *aP) // Called with frqMutex locked! { EPNAME("WakeUp"); XrdOucErrInfo *wuInfo = @@ -855,15 +946,21 @@ void XrdSsiFileReq::WakeUp() // Called with reqMutex locked! // DEBUGXQ("respCBarg=" <AttnInfo(*wuInfo, rspP, reqID)) - {wuInfo->setErrCB((XrdOucEICB *)this, respCBarg); myState = odRsp;} + if (aP) + {aP->SetInfo(*wuInfo); + wuInfo->setErrCB((XrdOucEICB *)aP, respCBarg); + } else { + if (fileP->AttnInfo(*wuInfo, rspP, reqID)) + {wuInfo->setErrCB((XrdOucEICB *)this, respCBarg); myState = odRsp;} + } -// Tell the client to issue a read now or handle the full response. +// Tell the client to issue a read now or handle the alert or full response. // - respWait = false; + respWait = false; respCB->Done(respCode, wuInfo, sessN); } diff --git a/src/XrdSsi/XrdSsiFileReq.hh b/src/XrdSsi/XrdSsiFileReq.hh index dc98557a0b5..1e53215f38d 100644 --- a/src/XrdSsi/XrdSsiFileReq.hh +++ b/src/XrdSsi/XrdSsiFileReq.hh @@ -42,8 +42,12 @@ class XrdOucErrInfo; class XrdSfsXioHandle; +class XrdSsiAlert; +class XrdSsiFileResource; class XrdSsiFileSess; +class XrdSsiRespInfoMsg; class XrdSsiRRInfo; +class XrdSsiService; class XrdSsiStream; class XrdSsiFileReq : public XrdSsiRequest, public XrdSsiResponder, @@ -54,19 +58,28 @@ public: // SsiRequest methods // -static XrdSsiFileReq *Alloc(XrdOucErrInfo *eP, XrdSsiFileSess *fP, - XrdSsiSession *sP, const char *sn, - const char *id, int rnum); - void Activate(XrdOucBuffer *oP, XrdSfsXioHandle *bR, int rSz); - void BindDone(XrdSsiSession *sP); + void Alert(XrdSsiRespInfoMsg &aMsg); + +static XrdSsiFileReq *Alloc(XrdOucErrInfo *eP, XrdSsiFileResource *rP, + XrdSsiFileSess *fP, const char *sn, + const char *id, int rnum); + + void BindDone(); void Finalize(); + using XrdSsiRequest::Finished; + + void Finished( XrdSsiRequest &rqstR, + const XrdSsiRespInfo &rInfo, + bool cancel=false) {} + char *GetRequest(int &rLen); - bool ProcessResponse(const XrdSsiRespInfo &resp, bool isOK); + bool ProcessResponse(const XrdSsiErrInfo &eInfo, + const XrdSsiRespInfo &resp); XrdSfsXferSize Read(bool &done, char *buffer, @@ -78,13 +91,6 @@ static XrdSsiFileReq *Alloc(XrdOucErrInfo *eP, XrdSsiFileSess *fP, static void SetMax(int mVal) {freeMax = mVal;} - void SSRun(XrdSsiService &, XrdSsiResource &, - unsigned short tmo=0) {(void)tmo;} - - void SSRun(XrdSsiService &, const char *, - const char *ruser=0, unsigned short tmo=0) - {(void)ruser; (void)tmo;} - bool WantResponse(XrdOucErrInfo &eInfo); // OucEICB methods @@ -101,7 +107,8 @@ static void SetMax(int mVal) {freeMax = mVal;} // Constructor and destructor // XrdSsiFileReq(const char *cID=0) - : XrdSsiResponder(this, (void *)0) {Init(cID);} + : frqMutex(XrdSsiMutex::Recursive) + {Init(cID);} virtual ~XrdSsiFileReq() {if (tident) free(tident);} @@ -121,23 +128,28 @@ XrdSfsXferSize readStrmP(XrdSsiStream *strmP, char *buff, int sendStrmA(XrdSsiStream *strmP, XrdSfsDio *sfDio, XrdSfsXferSize blen); void Recycle(); -void WakeUp(); +void WakeUp(XrdSsiAlert *aP=0); static XrdSysMutex aqMutex; static XrdSsiFileReq *freeReq; static int freeCnt; static int freeMax; +XrdSsiMutex frqMutex; XrdSsiFileReq *nextReq; XrdSysSemaphore *finWait; XrdOucEICB *respCB; unsigned long long respCBarg; +XrdSsiAlert *alrtSent; +XrdSsiAlert *alrtPend; +XrdSsiAlert *alrtLast; + char *tident; const char *sessN; XrdOucErrInfo *cbInfo; +XrdSsiFileResource *fileR; XrdSsiFileSess *fileP; -XrdSsiSession *sessP; char *respBuf; long long respOff; union {long long fileSz; @@ -154,6 +166,7 @@ bool respWait; bool strmEOF; bool schedDone; bool isPerm; +bool isEnding; char rID[8]; }; #endif diff --git a/src/XrdSsi/XrdSsiFileResource.cc b/src/XrdSsi/XrdSsiFileResource.cc new file mode 100644 index 00000000000..cd2d82a395e --- /dev/null +++ b/src/XrdSsi/XrdSsiFileResource.cc @@ -0,0 +1,81 @@ +/******************************************************************************/ +/* */ +/* X r d S s i F i l e R e s o u r c e . c c */ +/* */ +/* (c) 2017 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* Produced by Andrew Hanushevsky for Stanford University under contract */ +/* DE-AC02-76-SFO0515 with the Department of Energy */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include + +#include "XrdOuc/XrdOucEnv.hh" + +#include "XrdNet/XrdNetAddrInfo.hh" +#include "XrdSec/XrdSecEntity.hh" +#include "XrdSsi/XrdSsiFileResource.hh" + +/******************************************************************************/ +/* I n i t */ +/******************************************************************************/ + +void XrdSsiFileResource::Init(const char *path, XrdOucEnv &envX, int atype) +{ + const XrdSecEntity *entP = envX.secEnv(); + const char *rVal; + int n; + +// Always supply the trace identity +// + mySec.tident = (entP ? entP->tident : "ssi"); + +// Construct the security information +// + if (atype && entP) + {strncpy(mySec.prot, entP->prot, XrdSsiPROTOIDSIZE); + mySec.name = entP->name; + mySec.host = (atype <= 1 ? entP->host + : entP->addrInfo->Name(entP->host)); + mySec.role = entP->vorg; + mySec.role = entP->role; + mySec.grps = entP->grps; + mySec.endorsements = entP->endorsements; + mySec.creds = entP->creds; + mySec.credslen = entP->credslen; + client = &mySec; + } + +// Fill out the resource name and user +// + rName = path; + if ((rVal = envX.Get("ssi.user"))) rUser = rVal; + else rUser.clear(); + +// Fill out the the optional cgi info +// + if (!(rVal = envX.Get("ssi.cgi"))) rInfo.clear(); + else {rVal = envX.Env(n); + if (!(rVal = strstr(rVal, "ssi.cgi="))) rInfo.clear(); + else rInfo = rVal+8; + } +} diff --git a/src/XrdSsi/XrdSsiSSRun.hh b/src/XrdSsi/XrdSsiFileResource.hh similarity index 68% rename from src/XrdSsi/XrdSsiSSRun.hh rename to src/XrdSsi/XrdSsiFileResource.hh index 366cb068fcc..bac0bf31c18 100644 --- a/src/XrdSsi/XrdSsiSSRun.hh +++ b/src/XrdSsi/XrdSsiFileResource.hh @@ -1,10 +1,10 @@ -#ifndef __XRDSSISSRUN_HH__ -#define __XRDSSISSRUN_HH__ +#ifndef __SSI_FILERESOURCE_H__ +#define __SSI_FILERESOURCE_H__ /******************************************************************************/ /* */ -/* X r d S s i S S R u n . h h */ +/* X r d S s i F i l e R e s o u r c e . h h */ /* */ -/* (c) 2016 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* (c) 2017 by the Board of Trustees of the Leland Stanford, Jr., University */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ @@ -29,34 +29,27 @@ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ -#include "XrdSsi/XrdSsiService.hh" - -//----------------------------------------------------------------------------- -//! The XrdSsiSSRun object effects the SSRun() methods in the SsiRequest object. -//----------------------------------------------------------------------------- +#include +#include +#include -class XrdSsiRequest; +#include "XrdSsi/XrdSsiEntity.hh" +#include "XrdSsi/XrdSsiResource.hh" -class XrdSsiSSRun : public XrdSsiService::Resource +class XrdOucEnv; + +class XrdSsiFileResource : public XrdSsiResource { public: -static -XrdSsiSSRun *Alloc(XrdSsiRequest *reqp, XrdSsiResource &rsrc, - unsigned short tmo=0); - -void ProvisionDone(XrdSsiSession *sessP); +void Init(const char *path, XrdOucEnv &envP, int atype); - XrdSsiSSRun(XrdSsiRequest *reqp, unsigned short tmo=0) - : XrdSsiService::Resource(0), - theReq(reqp), tOut(tmo) {} + XrdSsiFileResource() : XrdSsiResource(std::string("")) + {memset(&mySec, 0, sizeof(mySec));} - ~XrdSsiSSRun() {} + ~XrdSsiFileResource() {} private: -union {XrdSsiRequest *theReq; - XrdSsiSSRun *freeNext; - }; -unsigned short tOut; +XrdSsiEntity mySec; }; #endif diff --git a/src/XrdSsi/XrdSsiFileSess.cc b/src/XrdSsi/XrdSsiFileSess.cc index cdf5307d1b8..fc3e138190d 100644 --- a/src/XrdSsi/XrdSsiFileSess.cc +++ b/src/XrdSsi/XrdSsiFileSess.cc @@ -51,7 +51,7 @@ #include "XrdSsi/XrdSsiEntity.hh" #include "XrdSsi/XrdSsiFileSess.hh" -#include "XrdSsi/XrdSsiService.hh" +#include "XrdSsi/XrdSsiProvider.hh" #include "XrdSsi/XrdSsiSfs.hh" #include "XrdSsi/XrdSsiStream.hh" #include "XrdSsi/XrdSsiTrace.hh" @@ -59,47 +59,6 @@ #include "XrdSys/XrdSysError.hh" -/******************************************************************************/ -/* L o c a l C l a s s e s */ -/******************************************************************************/ - -namespace XrdSsi -{ -class FileResource : public XrdSsiService::Resource -{ -public: - -void ProvisionDone(XrdSsiSession *sessP) {mySess=sessP; mySem.Post();} - -void ProvisionWait() {mySem.Wait();} - -XrdSsiSession *Session() {return mySess;} - - FileResource(const char *path, const XrdSecEntity *entP, int atype) - : XrdSsiService::Resource(path), mySem(0), mySess(0) - {if (atype && entP) - {strncpy(mySec.prot, entP->prot, XrdSsiPROTOIDSIZE); - mySec.name = entP->name; - mySec.host = (atype <= 1 ? entP->host - : entP->addrInfo->Name(entP->host)); - mySec.role = entP->vorg; - mySec.role = entP->role; - mySec.grps = entP->grps; - mySec.endorsements = entP->endorsements; - mySec.creds = entP->creds; - mySec.credslen = entP->credslen; - mySec.tident = entP->tident; - } - } - ~FileResource() {} - -private: -XrdSysSemaphore mySem; -XrdSsiSession *mySess; -XrdSsiEntity mySec; -}; -} - /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ @@ -107,13 +66,17 @@ XrdSsiEntity mySec; namespace XrdSsi { extern XrdOucBuffPool *BuffPool; -extern XrdSsiService *Service; +extern XrdSsiProvider *Provider; extern XrdSysError Log; extern int respWT; }; using namespace XrdSsi; +/******************************************************************************/ +/* L o c a l C l a s s e s */ +/******************************************************************************/ + namespace { class nullCallBack : public XrdOucEICB @@ -218,7 +181,7 @@ bool XrdSsiFileSess::AttnInfo(XrdOucErrInfo &eInfo, const XrdSsiRespInfo *respP, // Fill out iovec to point to our header // - attnResp->ioV[0].iov_len = sizeof(XrdSsiRRInfoAttn) + respP->mdlen; +//?attnResp->ioV[0].iov_len = sizeof(XrdSsiRRInfoAttn) + respP->mdlen; attnResp->ioV[1].iov_base = mBuff+offsetof(struct AttnResp, aHdr); attnResp->ioV[1].iov_len = sizeof(XrdSsiRRInfoAttn); @@ -276,10 +239,6 @@ int XrdSsiFileSess::close(bool viaDel) // rTab.Reset(); -// Stop the session -// - if (sessP) {sessP->Unprovision(viaDel); sessP = 0;} - // Free any in-progress buffers // if (inProg) @@ -358,7 +317,6 @@ void XrdSsiFileSess::Init(XrdOucErrInfo &einfo, const char *user, bool forReuse) fsUser = 0; xioP = 0; oucBuff = 0; - sessP = 0; reqSize = 0; reqLeft = 0; isOpen = false; @@ -383,7 +341,7 @@ bool XrdSsiFileSess::NewRequest(int reqid, // Allocate a new request object // if ((reqid > XrdSsiRRInfo::maxID) - || !(reqP = XrdSsiFileReq::Alloc(eInfo, this, sessP, gigID, tident, reqid))) + || !(reqP = XrdSsiFileReq::Alloc(eInfo,&fileResource,this,gigID,tident,reqid))) return false; // Add it to the table @@ -415,9 +373,9 @@ int XrdSsiFileSess::open(const char *path, // In */ { static const char *epname = "open"; - FileResource fileResource(path, theEnv.secEnv(), authXQ); - const char *eText, *usr; - int eNum, n; + XrdSsiErrInfo errInfo; + const char *eText; + int eNum; // Verify that this object is not already associated with an open file // @@ -429,46 +387,43 @@ int XrdSsiFileSess::open(const char *path, // In // if (open_mode != SFS_O_RDWR) // return XrdSsiUtils::Emsg(epname, EPROTOTYPE, "open session", path, *eInfo); -// Handle the cgi information +// Setup the file resource object // - fileResource.rDesc.rUser = fsUser = ((usr = theEnv.Get("ssi.user")) - ? strdup(usr) : 0); - fileResource.rDesc.rInfo = theEnv.Env(n); + fileResource.Init(path, theEnv, authXQ); -// Obtain a session +// Notify the provider that we will be executing a request // - Service->Provision(&fileResource); - fileResource.ProvisionWait(); - if ((sessP = fileResource.Session())) - {if (!fsUser) gigID = strdup(path); + if (Provider->Prepare(errInfo, fileResource)) + {const char *usr = fileResource.rUser.c_str(); + if (!(*usr)) gigID = strdup(path); else {char gBuff[2048]; - snprintf(gBuff, sizeof(gBuff), "%s:%s", fsUser, path); + snprintf(gBuff, sizeof(gBuff), "%s:%s", usr, path); gigID = strdup(gBuff); } - DEBUG(gigID <<" provisioned."); + DEBUG(gigID <<" prepared."); isOpen = true; return SFS_OK; } // Get error information // - eText = fileResource.eInfo.Get(eNum); + eText = errInfo.Get(eNum).c_str(); if (!eNum) - {eNum = ENOMSG; eText = "Service returned invalid session response.";} + {eNum = ENOMSG; eText = "Provider returned invalid prepare response.";} // Decode the error // switch(eNum) {case EAGAIN: if (!eText || !(*eText)) break; - eNum = fileResource.eInfo.GetArg(); + eNum = errInfo.GetArg(); DEBUG(path <<" --> " <setErrInfo(eNum, eText); return SFS_REDIRECT; break; case EBUSY: - eNum = fileResource.eInfo.GetArg(); - if (!eText || !(*eText)) eText = "Service is busy."; + eNum = errInfo.GetArg(); + if (!eText || !(*eText)) eText = "Provider is busy."; DEBUG(path <<" dly " <setErrInfo(eNum, eText); @@ -484,7 +439,7 @@ int XrdSsiFileSess::open(const char *path, // In // Something is quite wrong here // - Log.Emsg(epname, "Service redirect returned no target host name!"); + Log.Emsg(epname, "Provider redirect returned no target host name!"); eInfo->setErrInfo(ENOMSG, "Server logic error"); return SFS_ERROR; } diff --git a/src/XrdSsi/XrdSsiFileSess.hh b/src/XrdSsi/XrdSsiFileSess.hh index 56728e045bb..1811ddd909b 100644 --- a/src/XrdSsi/XrdSsiFileSess.hh +++ b/src/XrdSsi/XrdSsiFileSess.hh @@ -35,13 +35,13 @@ #include "XrdSfs/XrdSfsInterface.hh" #include "XrdSsi/XrdSsiBVec.hh" #include "XrdSsi/XrdSsiFileReq.hh" +#include "XrdSsi/XrdSsiFileResource.hh" #include "XrdSsi/XrdSsiRRTable.hh" #include "XrdSys/XrdSysPthread.hh" class XrdOucEnv; class XrdSfsXioHandle; struct XrdSsiRespInfo; -class XrdSsiSession; class XrdSsiFileSess { @@ -72,7 +72,9 @@ static XrdSsiFileSess *Alloc(XrdOucErrInfo &einfo, const char *user); XrdSfsXferSize buffer_size); void Recycle(); - + +XrdSsiFileResource &Resource() {return fileResource;} + int SendData(XrdSfsDio *sfDio, XrdSfsFileOffset offset, XrdSfsXferSize size); @@ -113,6 +115,7 @@ static int freeAbs; static int maxRSZ; static int authXQ; +XrdSsiFileResource fileResource; char *tident; XrdOucErrInfo *eInfo; char *gigID; @@ -120,10 +123,7 @@ char *fsUser; XrdSysMutex myMutex; XrdSfsXio *xioP; XrdOucBuffer *oucBuff; -union { - XrdSsiSession *sessP; - XrdSsiFileSess *nextFree; - }; +XrdSsiFileSess *nextFree; int reqSize; int reqLeft; bool isOpen; diff --git a/src/XrdSsi/XrdSsiProvider.hh b/src/XrdSsi/XrdSsiProvider.hh index 6d1683dd03f..644a98efb17 100644 --- a/src/XrdSsi/XrdSsiProvider.hh +++ b/src/XrdSsi/XrdSsiProvider.hh @@ -34,7 +34,7 @@ //! for two purposes: //! 1) To ascertain the availability of a resource on a server node in an SSI //! cluster. -//! 2) To obtain a service object that can provision one or more resources. +//! 2) To obtain a service object that can process one or more requests. //! //! Client-side: A providor object is predefined in libXrdSsi.so and must be //! used by the client code to get service objects, as follows: @@ -74,39 +74,54 @@ //! oss.statlib -2 /libXrdSsi.so //----------------------------------------------------------------------------- +#include + +#include "XrdSsi/XrdSsiErrInfo.hh" +#include "XrdSsi/XrdSsiResource.hh" + class XrdSsiCluster; -class XrdSsiErrInfo; class XrdSsiLogger; class XrdSsiService; - class XrdSsiProvider { public: //----------------------------------------------------------------------------- -//! Obtain a client-side service object. +//! Obtain a service object (client-side or server-side). //! //! @param eInfo the object where error status is to be placed. -//! @param contact the point of first contact when provisioning the service. +//! @param contact the point of first contact when processing a request. //! The contact may be "host:port" where "host" is a DNS name, //! an IPV4 address (i.e. d.d.d.d), or an IPV6 address //! (i.e. [x:x:x:x:x:x]), and "port" is either a numeric port //! number or the service name assigned to the port number. -//! This pointer is nil if the call is being made server-side. +//! This is a null string if the call is being made server-side. //! Note that only one service object is obtained by a server. -//! @param oHold the maximum number of session objects that should be held -//! in reserve for future Provision() calls. +//! @param oHold the maximum number of request objects that should be held +//! in reserve for future calls. //! //! @return =0 A service object could not be created, eInfo has the reason. //! @return !0 Pointer to a service object. //----------------------------------------------------------------------------- virtual -XrdSsiService *GetService(XrdSsiErrInfo &eInfo, - const char *contact, - int oHold=256 - ) {return 0;} +XrdSsiService *GetService(XrdSsiErrInfo &eInfo, + const std::string &contact, + int oHold=256 + ) {eInfo.Set("Service not implemented!", ENOTSUP); + return 0; + } + +//----------------------------------------------------------------------------- +//! Obtain the version of the abstract class used by underlying implementation. +//! The version returned must match the version compiled in the loading library. +//! If it does not, initialization fails. +//----------------------------------------------------------------------------- + +static const int SsiVersion = 0x00010000; + + int GetVersion() {return SsiVersion;} //----------------------------------------------------------------------------- //! Initialize server-side processing. This method is invoked prior to any @@ -116,8 +131,8 @@ XrdSsiService *GetService(XrdSsiErrInfo &eInfo, //! @param clsP pointer to the cluster management object. This pointer is nil //! when a service object is being obtained by an unclustered //! system (i.e. a stand-alone server). -//! @param cfgFn pointer to the conifiguration file name. -//! @param parms pointer to the conifiguration parameters or nil if none. +//! @param cfgFn file path to the the conifiguration file. +//! @param parms conifiguration parameters, if any. //! @param argc The count of command line arguments (always >= 1). //! @param argv Pointer to a null terminated array of tokenized command line //! arguments. These arguments are taken from the command line @@ -131,21 +146,45 @@ XrdSsiService *GetService(XrdSsiErrInfo &eInfo, virtual bool Init(XrdSsiLogger *logP, XrdSsiCluster *clsP, - const char *cfgFn, - const char *parms, + std::string cfgFn, + std::string parms, int argc, char **argv ) = 0; //----------------------------------------------------------------------------- -//! Obtain the version of the abstract class used by underlying implementation. -//! The version returned must match the version compiled in the loading library. -//! If it does not, initialization fails. +//! @brief Prepare for processing subsequent resource request. +//! +//! This method is meant to be used server-side to optimize subsequent request +//! processing, perform authorization, and allow a provider to stall or redirect +//! requests. It is optional and a default implementation is provided. +//! +//! @param eInfo The object where error information is to be placed. +//! @param rDesc Reference to the resource object that describes the +//! resource subsequent requests will use. +//! +//! @return true Continue normally, no issues detected. +//! false An exception occurred, the eInfo object has the reason. +//! +//! Special notes for server-side processing: +//! +//! 1) Two special errors are recognized that allow for a client retry: +//! +//! resP->eInfo.eNum = EAGAIN (client should retry elsewhere) +//! resP->eInfo.eMsg = the host name where the client is redirected +//! resP->eInfo.eArg = the port number to be used by the client +//! +//! resP->eInfo.eNum = EBUSY (client should wait and then retry). +//! resP->eInfo.eMsg = an optional reason for the wait. +//! resP->eInfo.eArg = the number of seconds the client should wait. //----------------------------------------------------------------------------- -static const int SsiVersion = 0x00010000; - - int GetVersion() {return SsiVersion;} +virtual bool Prepare(XrdSsiErrInfo &eInfo, const XrdSsiResource &rDesc) + {if (QueryResource(rDesc.rName.c_str()) != notPresent) + return true; + eInfo.Set("Resource not available.", ENOENT); + return false; + } //----------------------------------------------------------------------------- //! Obtain the status of a resource. @@ -153,13 +192,14 @@ static const int SsiVersion = 0x00010000; //! resource relative to a particular endpoint. //! Server-Side: When configured via oss.statlib directive, this is called //! server-side by the XrdSsiCluster object to see if the resource -//! can be provided by the providor via a service object. +//! can be provided by the providor via a service object. This +//! method is also used server-side to determine resource status. //! //! @param rName Pointer to the resource name. -//! @param contact the point of first contact that would be used to provision -//! the resource (see client-side GetService() for details). +//! @param contact the point of first contact that would be used to process +//! the request relative to the resource (see ProcessRequest()). //! A nil pointer indicates a query for availibity at the -//! local node (e.g. a cmsd query for resource availability). +//! local node (e.g. a query for local resource availability). //! //! @return One of the rStat enums, as follows: //! notPresent - resource not present on this node. diff --git a/src/XrdSsi/XrdSsiRRInfo.hh b/src/XrdSsi/XrdSsiRRInfo.hh index 51d2c1a363d..e8e3f574cec 100644 --- a/src/XrdSsi/XrdSsiRRInfo.hh +++ b/src/XrdSsi/XrdSsiRRInfo.hh @@ -87,13 +87,14 @@ int reqSize; struct XrdSsiRRInfoAttn { +static const int alrtResp = '!'; // In tag: response data is an alert static const int fullResp = ':'; // In tag: response data is present static const int pendResp = '*'; // In tag: response data is pending char tag; char flags; unsigned short pfxLen; // Length of prefix -unsigned int mdLen; // Length of metadata +unsigned int mdLen; // Length of alert or meta data int rsvd1; int rsvd2; }; diff --git a/src/XrdSsi/XrdSsiRequest.cc b/src/XrdSsi/XrdSsiRequest.cc index 62d8d8c9653..15f44e7e1ee 100644 --- a/src/XrdSsi/XrdSsiRequest.cc +++ b/src/XrdSsi/XrdSsiRequest.cc @@ -33,10 +33,9 @@ #include "XrdSsi/XrdSsiPacer.hh" #include "XrdSsi/XrdSsiRespInfo.hh" +#include "XrdSsi/XrdSsiResponder.hh" #include "XrdSsi/XrdSsiRequest.hh" -#include "XrdSsi/XrdSsiResource.hh" -#include "XrdSsi/XrdSsiService.hh" -#include "XrdSsi/XrdSsiSSRun.hh" +#include "XrdSsi/XrdSsiStream.hh" /******************************************************************************/ /* Private: C o p y D a t a */ @@ -49,24 +48,24 @@ bool XrdSsiRequest::CopyData(char *buff, int blen) // Make sure the buffer length is valid // if (blen <= 0) - {eInfo.Set("Buffer length invalid", EINVAL); + {errInfo.Set("Buffer length invalid", EINVAL); return false; } // Check if we have any data here // - reqMutex.Lock(); + rrMutex->Lock(); if (Resp.blen > 0) {if (Resp.blen > blen) last = false; else {blen = Resp.blen; last = true;} memcpy(buff, Resp.buff, blen); Resp.buff += blen; Resp.blen -= blen; } else {blen = 0; last = true;} - reqMutex.UnLock(); + rrMutex->UnLock(); // Invoke the callback // - ProcessResponseData(buff, blen, last); + ProcessResponseData(errInfo, buff, blen, last); return true; } @@ -76,76 +75,46 @@ bool XrdSsiRequest::CopyData(char *buff, int blen) bool XrdSsiRequest::Finished(bool cancel) { - XrdSsiMutexMon(reqMutex); + XrdSsiResponder *respP; -// If there is no session, return failure +// Obtain the responder // - if (!theSession) return false; - -// Tell the session we are finished -// - theSession->RequestFinished(this, Resp, cancel); + rrMutex->Lock(); + if ((respP = theRespond)) theRespond->reqP = 0; + theRespond = 0; + rrMutex->UnLock(); -// Clear response and error information +// If there is no responder, return failure // - Resp.Init(); - eInfo.Clr(); + if (!respP) return false; -// Clear pointers and return +// Tell any responder we are finished // - theRespond = 0; - theSession = 0; + respP->Finished(*this, Resp, cancel); return true; } - + /******************************************************************************/ -/* S S R u n */ +/* G e t R e s p o n s e D a t a */ /******************************************************************************/ -void XrdSsiRequest::SSRun(XrdSsiService &srvc, - XrdSsiResource &rsrc, - unsigned short tmo) +void XrdSsiRequest::GetResponseData(char *buff, int blen) { - XrdSsiSSRun *runP; + XrdSsiMutexMon mHelper(rrMutex); -// Make sure that atleats the resource name was specified +// If this is really a stream then just call the stream object to get the data. +// In the degenrate case, it's actually a data response, then we must copy it. // - if (!rsrc.rName || !(*rsrc.rName)) - {eInfo.Set("Resource name missing.", EINVAL); - Resp.eMsg = eInfo.Get(); - Resp.eNum = EINVAL; - Resp.rType = XrdSsiRespInfo::isError; - ProcessResponse(Resp, false); - return; - } - -// Now allocate memory to copy all the members + if (Resp.rType == XrdSsiRespInfo::isStream) + {if (Resp.strmP->SetBuff(errInfo, buff, blen)) return;} + else if (Resp.rType == XrdSsiRespInfo::isData) + {if (CopyData(buff, blen)) return;} + else errInfo.Set("Not a stream", ENODATA); + +// If we got here then an error occured during the setup, reflect the error +// via the callback (in the future we will schedule a new thread). // - runP = XrdSsiSSRun::Alloc(this, rsrc, tmo); - if (!runP) - {eInfo.Set(0, ENOMEM); - Resp.eMsg = eInfo.Get(); - Resp.eNum = ENOMEM; - Resp.rType = XrdSsiRespInfo::isError; - ProcessResponse(Resp, false); - return; - } - -// Now provision the resource and we are done here. The SSRun object takes over. -// - srvc.Provision(runP, tmo); -} - -/******************************************************************************/ - -void XrdSsiRequest::SSRun(XrdSsiService &srvc, - const char *rname, - const char *ruser, - unsigned short tmo) -{ - XrdSsiResource myRes(rname, ruser); - - SSRun(srvc, myRes, tmo); + ProcessResponseData(errInfo, buff, -1, true); } /******************************************************************************/ diff --git a/src/XrdSsi/XrdSsiRequest.hh b/src/XrdSsi/XrdSsiRequest.hh index 7f48d122d14..66e5e83dc65 100644 --- a/src/XrdSsi/XrdSsiRequest.hh +++ b/src/XrdSsi/XrdSsiRequest.hh @@ -29,63 +29,68 @@ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ +#include #include #include #include "XrdSsi/XrdSsiAtomics.hh" #include "XrdSsi/XrdSsiErrInfo.hh" #include "XrdSsi/XrdSsiRespInfo.hh" -#include "XrdSsi/XrdSsiSession.hh" //----------------------------------------------------------------------------- //! The XrdSsiRequest class describes a client request and is used to effect a //! response to the request via a companion object described by XrdSsiResponder. //! Client-Side: Use this object to encapsulate your request and hand it off -//! to XrdSsiSession::ProcessRequest() either use GetResponseData() -//! or the actual response tructure to get the response data. +//! to XrdSsiService::Execute() either use GetResponseData() or +//! the actual response structure to get the response data once the +//! ProcessResponse() callback is invoked. //! -//! Server-side: XrdSsiSession::ProcessRequest() is called with this object. +//! Server-side: XrdSsiService::ProcessRequest() is called with this object. //! Use the XrdSsiResponder object to post a response. //! -//! In either case, once the the XrdSsiRequest::Finished() must be invoked -//! after the client-server exchange is complete in order to revert ownership -//! of this object to the object's creator. After which, the object me be -//! deleted or reused. +//! In either case, the client must invoke XrdSsiRequest::Finished() after the +//! client-server exchange is complete in order to revert ownership of this +//! object to the object's creator to allow it to be deleted or reused. //! //! This is an abstract class and several methods need to be implemented: //! +//! Alert() Optional, allows receiving of server alerts. //! GetRequest() Manndatory to supply the buffer holding the request //! along with its length. //! RelRequestBuffer() Optional, allows memory optimization. //! ProcessResponse() Initial response: Mandatary -//! ProcessResponseData() Data response: Mandatory ony if response data is +//! ProcessResponseData() Data response: Mandatory only if response data is //! asynchronously received. //----------------------------------------------------------------------------- class XrdSsiPacer; -class XrdSsiResource; class XrdSsiResponder; -class XrdSsiService; -class XrdSsiStream; class XrdSsiRequest { public: friend class XrdSsiResponder; -friend class XrdSsiSSRun; friend class XrdSsiTaskReal; //----------------------------------------------------------------------------- -//! The following object is used to relay error information from any method -//! dealing with the request object that returns a failure. Also, any error -//! response sent by a server will be recorded in the eInfo object as well. +//! @brief Send or receive a server generated alert. +//! +//! The Alert() method is used server-side to send one or more alerts before a +//! response is posted (alerts afterwards are ignored). To avoid race conditions, +//! server-side alerts should be sent via the Responder's Alert() method. +//! Clients must implement this method in order to receive alerts. +//! +//! @param aMsg Reference to the message object containing the alert message. +//! Non-positive alert lengths cause the alert call to be +//! ignored. You should call the message Recycle() method once +//! you have consumed the message to release its resources. //----------------------------------------------------------------------------- -XrdSsiErrInfo eInfo; +virtual void Alert(XrdSsiRespInfoMsg &aMsg) {aMsg.Recycle(false);} //----------------------------------------------------------------------------- //! Indicate that request processing has been finished. This method calls -//! XrdSsiSession::Complete() on the session object associated with Request(). +//! XrdSsiResponder::Finished() on the associated responder object. //! //! Note: This method locks the object's recursive mutex. //! @@ -93,16 +98,25 @@ XrdSsiErrInfo eInfo; //! True -> the request/response sequence aborted because of an //! error or the client cancelled the request. //! -//! @return true Complete accepted. Request object may be reclaimed. -//! @return false Complete cannot be accepted because this request object is -//! not bound to a session. This indicates a logic error. +//! @return true Finish accepted. Request object may be reclaimed. +//! @return false Finish cannot be accepted because this request object is +//! not bound to a responder. This indicates a logic error. //----------------------------------------------------------------------------- bool Finished(bool cancel=false); +//----------------------------------------------------------------------------- +//! Obtain the detached request time to live value. +//! +//! @return The detached time to live value in seconds. +//----------------------------------------------------------------------------- + +inline uint32_t GetDetachTTL() {return detTTL;} + //----------------------------------------------------------------------------- //! Obtain the metadata associated with a response. //! +//! //! Note: This method locks the object's recursive mutex. //! //! @param dlen holds the length of the metadata after the call. @@ -113,7 +127,7 @@ XrdSsiErrInfo eInfo; inline const char *GetMetadata(int &dlen) - {XrdSsiMutexMon(reqMutex); + {XrdSsiMutexMon(rrMutex); if ((dlen = Resp.mdlen)) return Resp.mdata; return 0; } @@ -131,62 +145,54 @@ const char *GetMetadata(int &dlen) virtual char *GetRequest(int &dlen) = 0; + //----------------------------------------------------------------------------- -//! Obtain the responder associated with this request. This member is set by the -//! responder and needs serialization. -//! -//! Note: This method locks the object's recursive mutex. +//! Get the request ID established at object creation time. //! -//! @return !0 - pointer to the responder object. -//! @retuen =0 - no alternate responder associated with this request. +//! @return Pointer to the request ID or nil if there is none. //----------------------------------------------------------------------------- + inline -XrdSsiResponder*GetResponder() {XrdSsiMutexMon(reqMutex); return theRespond;} +const char *GetRequestID() {return reqID;} //----------------------------------------------------------------------------- //! Asynchronously obtain response data. This is a helper method that allows a //! client to deal with a passive stream response. This method also handles //! data response, albeit ineffeciently by copying the data response. However, //! this allows for uniform response processing regardless of response type. -//! See the other from of GetResponseData() for a possible better approach. //! //! @param buff pointer to the buffer to receive the data. The buffer must -//! remain valid until the ProcessResponse() is called. +//! remain valid until ProcessResponseData() is called. //! @param blen the length of the buffer (i.e. maximum that can be returned). //! //! @return true A data return has been successfully scheduled. -//! @return false The stream could not be scheduled; eInfo holds the reason. +//! @return false The stream could not be scheduled; eRef holds the reason. //----------------------------------------------------------------------------- - bool GetResponseData(char *buff, int blen); + void GetResponseData(char *buff, int blen); //----------------------------------------------------------------------------- -//! Obtain the session associated with this request. This member is set by the -//! responder and needs seririalization. +//! Get timeout for initiating the request. //! -//! Note: This method locks the object's recursive mutex. -//! -//! @return !0 - pointer to the session object. -//! @retuen =0 - no session associated with this request. +//! @return The timeout value. //----------------------------------------------------------------------------- -inline -XrdSsiSession *GetSession() {XrdSsiMutexMon(reqMutex); return theSession;} + + uint16_t GetTimeOut() {return tOut;} //----------------------------------------------------------------------------- //! Notify request that a response is ready to be processed. This method must //! be supplied by the request object's implementation. //! -//! Note: This method is called with the object's recursive mutex locked. -//! +//! @param eInfo Error information. You can check if an error occured using +//! eInfo.hasError() or eInfo.isOK(). //! @param rInfo Raw response information. -//! @param isOK True: Normal response. -//! False: Error response, the eInfo object holds information. //! //! @return true Response processed. //! @return false Response could not be processed, the request is not active. //----------------------------------------------------------------------------- -virtual bool ProcessResponse(const XrdSsiRespInfo &rInfo, bool isOK=true)=0; +virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo, + const XrdSsiRespInfo &rInfo)=0; //----------------------------------------------------------------------------- //! Handle incomming async stream data or error. This method is called by a @@ -195,25 +201,28 @@ virtual bool ProcessResponse(const XrdSsiRespInfo &rInfo, bool isOK=true)=0; //! //! Note: This method is called with the object's recursive mutex locked. //! -//! @param buff Pointer to the buffer given to XrdSsiStream::SetBuff(). -//! @param blen The number of bytes in buff or an error indication if blen < 0. -//! @param last true This is the last stream segment, no more data remains. -//! @param false More data may remain in the stream. -//! @return One of the enum PRD_Xeq: -//! PRD_Normal - Processing completeted normally, continue. -//! PRD_Hold - Processing could not be done now, place request -//! in the global FIFO hold queue and resume when -//! RestartDataResponse() is called. -//! PRD_HoldLcl - Processing could not be done now, place request -//! in the request ID FIFO local queue and resume when -//! RestartDataResponse() is called with the ID that -//! passed to the this request object constructor. +//! @param eInfo Error information. You can check if an error occured using +//! eInfo.hasError() or eInfo.isOK(). +//! @param buff Pointer to the buffer given to XrdSsiStream::SetBuff(). +//! @param blen The number of bytes in buff or an error indication if blen < 0. +//! @param last true This is the last stream segment, no more data remains. +//! @param false More data may remain in the stream. +//! @return One of the enum PRD_Xeq: +//! PRD_Normal - Processing completeted normally, continue. +//! PRD_Hold - Processing could not be done now, place request +//! in the global FIFO hold queue and resume when +//! RestartDataResponse() is called. +//! PRD_HoldLcl - Processing could not be done now, place request +//! in the request ID FIFO local queue and resume +//! when RestartDataResponse() is called with the ID +//! that was passed to the this request object +//! constructor. //----------------------------------------------------------------------------- enum PRD_Xeq {PRD_Normal = 0, PRD_Hold = 1, PRD_HoldLcl = 2}; -virtual PRD_Xeq ProcessResponseData(char *buff, int blen, bool last) - {return PRD_Normal;} +virtual PRD_Xeq ProcessResponseData(const XrdSsiErrInfo &eInfo, char *buff, + int blen, bool last) {return PRD_Normal;} //----------------------------------------------------------------------------- //! Restart a ProcessResponseData() call for a request that was previosly held @@ -261,49 +270,31 @@ struct RDR_Info{int rCount; //!< Number restarted static RDR_Info RestartDataResponse(RDR_How rhow, const char *reqid=0); //----------------------------------------------------------------------------- -//! Run this request using a single session (variant 1). +//! @brief Set the detached request time to live value. //! -//! @param srvc Reference to the service object to be used. -//! @param rsrc Reference to the resource description for the request. -//! Members in this object are copied so the resource object may -//! be deleted upon return. -//! @param tmo the maximum number seconds the operation may last before -//! it is considered to fail. A zero value uses the default. +//! By deafult, rqeuests are executed in the foreground (i.e. during its +//! execution, if the TCP connection drops, the request is automatically +//! cancelled. When a non-zero time to live is set, the request is executed in +//! the background (i.e. detached) and no persistent TCP connection is required. +//! You must use the XrdSsiService::Attach() method to foreground such a +//! request within the number of seconds specified for dttl or the request is +//! automatically cancelled. The value must be set before passing the request +//! to XrdSsiService::ProcessRequest(). Once the request is started, a request +//! handle is returned which can be passed to XrdSsiService::Attach(). //! -//! @return The results of this call are reflected to the request via -//! it's ProcessResponse() callback method. +//! @param detttl The detach time to live value. //----------------------------------------------------------------------------- -virtual void SSRun(XrdSsiService &srvc, - XrdSsiResource &rsrc, - unsigned short tmo=0); +inline void SetDetachTTL(uint32_t dttl) {detTTL = dttl;} //----------------------------------------------------------------------------- -//! Run this request using a single session (variant 2). Use the resource name -//! and optional resource user with all other resource values defaulted. +//! Set timeout for initiating the request. If a non-default value is desired, +//! it must be set prior to calling XrdSsiService::ProcessRequest(). //! -//! @param srvc Reference to the service object to be used. -//! @param rname Pointer to the resource name. It is copied. -//! @param ruser Pointer to the resource user. It is copied. -//! @param tmo the maximum number seconds the operation may last before -//! it is considered to fail. A zero value uses the default. -//! -//! @return The results of this call are reflected to the request via -//! it's ProcessResponse() callback method. -//----------------------------------------------------------------------------- - -virtual void SSRun(XrdSsiService &srvc, - const char *rname, - const char *ruser=0, - unsigned short tmo=0); - -//----------------------------------------------------------------------------- -//! A handy pointer to allow for chaining these objects. It is initialized to 0. -//! It should only be touched by any object that is the current owner of this -//! object (e.g. the XrdSsiSession object after its ProcessRequest() is called). +//! @param tmo The timeout value. //----------------------------------------------------------------------------- -XrdSsiRequest *nextRequest; + void SetTimeOut(uint16_t tmo) {tOut = tmo;} //----------------------------------------------------------------------------- //! Constructor @@ -312,26 +303,33 @@ XrdSsiRequest *nextRequest; //! See ProcessResponseData() and RestartDataReponse(). If reqid //! is nil then held responses are placed in the global queue. //! The pointer must be valid for the life of this object. +//! +//! @param tmo The request initiation timeout value 0 equals default). //----------------------------------------------------------------------------- - XrdSsiRequest(const char *reqid=0) - : nextRequest(0), - reqMutex(XrdSsiMutex::Recursive), reqID(reqid), - theSession(0), theRespond(0), thePacer(0) {} + XrdSsiRequest(const char *reqid=0, uint16_t tmo=0) + : rrMutex(0), reqID(reqid), + nextRequest(0), theRespond(0), thePacer(0), + detTTL(0), tOut(0) {} + +// The following are for internal use only! +// +void SetMutex(XrdSsiMutex *mP) {rrMutex = mP;} protected: + //----------------------------------------------------------------------------- -//! Notify the underlying object that the request was bound to a session. This -//! method is meant for server-side internal use only. +//! Notify the underlying object that the request was bound to a responder. +//! This method is meant for server-side internal use only. //----------------------------------------------------------------------------- -virtual void BindDone(XrdSsiSession *sessP) {} +virtual void BindDone() {} //----------------------------------------------------------------------------- //! Release the request buffer. Use this method to optimize storage use; this //! is especially relevant for long-running requests. If the request buffer //! has been consumed and is no longer needed, early return of the buffer will -//! minimize memory usage. This method is only invoked via XrdSsiResponder. +//! minimize memory usage. This method is also invoked via XrdSsiResponder. //! //! //! Note: This method is called with the object's recursive mutex locked when @@ -342,10 +340,10 @@ virtual void BindDone(XrdSsiSession *sessP) {} virtual void RelRequestBuffer() {} //----------------------------------------------------------------------------- -//! Destructor. This object can only be deleted by the object creator. When the -//! object is passed and accepted by XrdSsiSession::ProcessRequest() it may -//! only be deleted after Finished() is called to allow the session object to -//! reclaim any resources granted to the request object. +//! Destructor. This object can only be deleted by the object creator. Once the +//! object is passed to XrdSsiService::ProcessRequest() it may only be deleted +//! after Finished() is called to allow the service to reclaim any resources +//! allocated for the request object. //----------------------------------------------------------------------------- virtual ~XrdSsiRequest() {} @@ -364,31 +362,18 @@ const XrdSsiRespInfo *RespP() {return &Resp;} //! It can also be used to serialize access to the underlying object. //----------------------------------------------------------------------------- -XrdSsiMutex reqMutex; +XrdSsiMutex *rrMutex; const char *reqID; private: bool CopyData(char *buff, int blen); -XrdSsiSession *theSession; // Set via XrdSsiResponder::BindRequest() +XrdSsiRequest *nextRequest; XrdSsiResponder *theRespond; // Set via XrdSsiResponder::BindRequest() XrdSsiRespInfo Resp; // Set via XrdSsiResponder::SetResponse() +XrdSsiErrInfo errInfo; XrdSsiPacer *thePacer; +uint32_t detTTL; +uint16_t tOut; }; - -//----------------------------------------------------------------------------- -//! We define the GetResponseData() helper method here as we need it to be -//! available in all compilation units and it depends on XrdSsiStream. -//----------------------------------------------------------------------------- - -#include "XrdSsi/XrdSsiStream.hh" - -inline bool XrdSsiRequest::GetResponseData(char *buff, int blen) - {XrdSsiMutexMon(reqMutex); - if (Resp.rType == XrdSsiRespInfo::isStream) - return Resp.strmP->SetBuff(this, buff, blen); - if (Resp.rType == XrdSsiRespInfo::isData) - return CopyData(buff, blen); - eInfo.Set(0, ENODATA); return false; - } #endif diff --git a/src/XrdSsi/XrdSsiResource.hh b/src/XrdSsi/XrdSsiResource.hh index 16033952616..23749560524 100644 --- a/src/XrdSsi/XrdSsiResource.hh +++ b/src/XrdSsi/XrdSsiResource.hh @@ -28,10 +28,13 @@ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ + +#include +#include //----------------------------------------------------------------------------- //! The XrdSsiResource object is used by the Scalable Service Interface to -//! describe a resource that may be provisioned by XrdSsiService. +//! describe the resource that a request needs in order to execute. //----------------------------------------------------------------------------- class XrdSsiEntity; @@ -40,10 +43,10 @@ class XrdSsiResource { public: -const char *rName; //!< -> Name of the resource to be provisioned -const char *rUser; //!< -> Name of the resource user (nil if anonymous) -const char *rInfo; //!< -> Additional information in CGI format -const char *hAvoid; //!< -> Comma separated list of hosts to avoid +std::string rName; //!< -> Name of the resource to be used +std::string rUser; //!< -> Name of the resource user (nil if anonymous) +std::string rInfo; //!< -> Additional information in CGI format +std::string hAvoid; //!< -> Comma separated list of hosts to avoid XrdSsiEntity *client; //!< -> Pointer to client identification (server-side) enum Affinity {Default, //!< Use configured affinity @@ -55,43 +58,48 @@ enum Affinity {Default, //!< Use configured affinity Affinity affinity;//!< Resource affinity static const -unsigned short autoUnP = 0x0001; //!< Auto unprovision on Finish() +uint16_t reusable= 0x0001; //!< Make this resource is reusable -unsigned short rOpts; //!< Resource options (see above) -unsigned short rsvd; +uint16_t rOpts; //!< Resource options (see above) +uint16_t reUse; //!< Number of minutes resource may be reused //----------------------------------------------------------------------------- //! Constructor //! -//! @param rname points to the name of the resource. If using directory +//! @param rname the name of the resource. If using directory //! notation (i.e. slash separated names); duplicate slashes //! and dot-shashes are compressed out. //! //! @param havoid if not null then points to a comma separated list of -//! hostnames to avoid when provisioning the resource. This +//! hostnames to avoid when finding the resource. This //! argument is only meaningful client-side. //! -//! @param ruser points to the name of the resource user. If nil the user -//! is anonymous (unnamed). By default, all resources share +//! @param ruser the name of the resource user. If nil the user is +//! anonymous (unnamed). By default, all resources share //! the TCP connection to any endpoint. Different users have -//! separate connections only if so requested during the -//! provision call (see XrdSsiService::Provision()). +//! separate connections only if so requested vis the newConn +//! option (see options above). //! -//! @param rinfo points to additional information to be passed to the -//! endpoint that provides the resource. The string should be -//! in cgi format (e.g. var=val&var2-val2&....). +//! @param rinfo additional information to be passed to the endpoint that +//! that provides the resource. The string should be in cgi +//! format (e.g. var=val&var2=val2&....). //! //! @param raff resource affinity (see Affinity enum). +//! +//! @param ropts resource handling options (see enum). +//! +//! @param reUse number of times the resource may be reused. //----------------------------------------------------------------------------- - XrdSsiResource(const char *rname, - const char *havoid=0, - const char *ruser=0, - const char *rinfo=0, + XrdSsiResource(std::string rname, + std::string havoid="", + std::string ruser="", + std::string rinfo="", + uint16_t ropts=0, Affinity raff=Default ) : rName(rname), rUser(ruser), rInfo(rinfo), - hAvoid(havoid), client(0), affinity(Default), - rOpts(0), rsvd(0) {} + hAvoid(havoid), client(0), affinity(raff), + rOpts(ropts), reUse(0) {} //----------------------------------------------------------------------------- //! Destructor diff --git a/src/XrdSsi/XrdSsiRespInfo.hh b/src/XrdSsi/XrdSsiRespInfo.hh index de1ee8e200c..13845930c47 100644 --- a/src/XrdSsi/XrdSsiRespInfo.hh +++ b/src/XrdSsi/XrdSsiRespInfo.hh @@ -69,4 +69,60 @@ struct XrdSsiRespInfo XrdSsiRespInfo() {Init();} ~XrdSsiRespInfo() {} }; + +/******************************************************************************/ +/* X r d S s i R e s p M s g */ +/******************************************************************************/ + +//----------------------------------------------------------------------------- +//! The RespInfoMsg class describes an async response message sent to the +//! XrdSsiRequest::Alert() method. It encapsulates the message sent and is +//! responsible for recovering any resources used by the message via Recycle(). +//----------------------------------------------------------------------------- + +class XrdSsiRespInfoMsg +{ +public: + +//----------------------------------------------------------------------------- +//! Obtain the message associated with the message object. +//! +//! @param mlen holds the length of the message after the call. +//! +//! @return =0 No message available, dlen has been set to zero. +//! @return !0 Pointer to the buffer holding the message, dlen has the length +//----------------------------------------------------------------------------- + +inline char *GetMsg(int &mlen) {mlen = msgLen; return msgBuf;} + +//----------------------------------------------------------------------------- +//! Release resources used by the message. This method must be called after the +//! message is processed by the XrdSsiRequest::Alert() method. +//! +//! Qparam sent When true, the message was sent. Otherwise, it was not sent. +//----------------------------------------------------------------------------- + +virtual void Recycle(bool sent=true) = 0; + +//----------------------------------------------------------------------------- +//! Contructor +//! +//! @param msgP Pointer to the message buffer. +//! @param mlen length of the message. +//----------------------------------------------------------------------------- + + XrdSsiRespInfoMsg(char *msgP, int mlen) + : msgBuf(msgP), msgLen(mlen) {} + +protected: + +//----------------------------------------------------------------------------- +//! Destructor. This object may not be deleted. Use Recycle() instead. +//----------------------------------------------------------------------------- + +virtual ~XrdSsiRespInfoMsg() {} + +char *msgBuf; +int msgLen; +}; #endif diff --git a/src/XrdSsi/XrdSsiResponder.hh b/src/XrdSsi/XrdSsiResponder.hh index 3292ac17183..53de6476766 100644 --- a/src/XrdSsi/XrdSsiResponder.hh +++ b/src/XrdSsi/XrdSsiResponder.hh @@ -35,71 +35,45 @@ #include "XrdSsi/XrdSsiRequest.hh" //----------------------------------------------------------------------------- -//! The XrdSsiResponder.hh class provides the session object and its agents a way to +//! The XrdSsiResponder.hh class provides a request processing object a way to //! respond to a request. It is a companion (friend) class of XrdSsiRequest. //! This class is only practically meaningful server-side. //! //! Any class that needs to post a response, release a request buffer or post //! stream data to the request object should inherit this class and use its -//! methods to get access to the request object. Typically, the XrdSsiSession -//! implementation class and its agent classes (i.e. classes that do work on -//! its behalf) inherit this class. +//! methods to get access to the request object. Typically, a task class that +//! is created by XrdSsiService::Execute() to handle te request would inherit +//! this class so it can respond. The object that wantsto post a response must +//! first call BindRequest() to establish the request-responder association. //! //! When the XrdSsiResponder::SetResponse() method is called to post a response //! the request object's ProcessResponse() method is called. Ownership of the //! request object does not revert back to the object's creator until the -//! XrdSsiRequest::Finished() method returns. This allows the session to +//! XrdSsiRequest::Finished() method returns. That method first calls +//! XrdSsiResponder::Finished() to break the request-responder association and //! reclaim any response data buffer or stream resource before it gives up -//! control of the request object. -//! -//! This is a real class that interposes itself between the abstract request -//! object and the real (i.e. derived class) session object or its agent. +//! control of the request object. This means you must provide an implementation +//! To the Finished() method defined here. //----------------------------------------------------------------------------- -#define SSI_VAL_RESPONSE(rX) XrdSsiRequest *rX = Atomic_GET(reqP);\ - if (!rX) return notPosted; \ - Atomic_SET(reqP, 0); \ - rX->reqMutex.Lock(); \ - if (rX->theRespond != this) \ - {rX->reqMutex.UnLock(); return notActive;} +#define SSI_VAL_RESPONSE(rX) rrMutex->Lock();\ + XrdSsiRequest *rX = reqP;\ + if (!rX)\ + {rrMutex->UnLock(); return notActive;}\ + reqP = 0;\ + if (rX->theRespond != this)\ + {rrMutex->UnLock(); return notActive;} -#define SSI_XEQ_RESPONSE(rX,oK) rX->reqMutex.UnLock(); \ - return (rP->ProcessResponse(rX->Resp, oK)\ - ? wasPosted : notActive) +#define SSI_XEQ_RESPONSE(rX) rrMutex->UnLock();\ + return (rX->ProcessResponse(rX->errInfo,rX->Resp)\ + ? wasPosted : notActive) -class XrdSsiSession; +class XrdSsiStream; class XrdSsiResponder { public: - -//----------------------------------------------------------------------------- -//! Obtain the object that inherited this responder (Version 1). Use this -//! version if the object is identified by an int type value. This member is -//! set at construction time and does not need serialization. -//! -//! @param oType Place to put object's type established at construction time. -//! @param oInfo Place to put Object's info established at construction time. -//! -//! @return The pointer to the object expressed as a void pointer along with -//! oType set to the object's type and oInfo set to any information. -//----------------------------------------------------------------------------- - -inline void *GetObject(int &oType, int &oInfo) - {oType=objIdent[0]; oInfo=objIdent[1]; return objVal;} - -//----------------------------------------------------------------------------- -//! Obtain the object that inherited this responder (Version 2). Use this -//! version if the object is identified by void * handle (e.g. constructor). -//! This member is set at construction time and does not need serialization. -//! -//! @param oHndl Place to put Object's handle established at construction time. -//! -//! @return The pointer to object expressed as a void pointer along with -//! oHndl set the the object's handle. -//----------------------------------------------------------------------------- - -inline void *GetObject(void *&oHndl) {oHndl = objHandle; return objVal;} +friend class XrdSsiRequest; //----------------------------------------------------------------------------- //! The maximum amount of metedata+data (i.e. the sum of two blen arguments in @@ -113,34 +87,70 @@ static const int MaxDirectXfr = 2097152; //< Max (metadata+data) direct xfr protected: //----------------------------------------------------------------------------- -//! Take ownership of a request object by binding a request object, to a session -//! object and a responder object if it differs from the session object. -//! This method should only be used by the session object or its agent. +//! Send an alert message to the request. This is a convenience method that +//! avoids race condistions with Finished() so it is safe to use in all cases. +//! This is a server-side call. The service is responsible for creating a +//! RespInfoMsg object containing the message and supplying a Recycle() method. //! -//! @param rqstP the pointer to the request object. -//! @param sessP the pointer to the session object. -//! @param respP the pointer to the responder object (optional). +//! @param aMsg reference to the message to be sent. //----------------------------------------------------------------------------- -inline void BindRequest(XrdSsiRequest *rqstP, - XrdSsiSession *sessP, +inline void Alert(XrdSsiRespInfoMsg &aMsg) + {XrdSsiMutexMon(rrMutex); + if (reqP) reqP->Alert(aMsg); + else aMsg.Recycle(false); + } + +//----------------------------------------------------------------------------- +//! Take ownership of a request object by binding the request object to a +//! responder object. This method must be called by the responder before +//! posting any responses. +//! +//! @param rqstP reference to the request object. +//! @param respP pointer to the responder object if it differs from this +//! object (i.e. setting on behalf of someone else). +//----------------------------------------------------------------------------- + +inline void BindRequest(XrdSsiRequest &rqstR, XrdSsiResponder *respP=0) - {XrdSsiMutexMon(rqstP->reqMutex); - rqstP->theSession = sessP; - rqstP->theRespond =(respP ? respP : this); - if (respP) {Atomic_SET(respP->reqP, rqstP);} - else {Atomic_SET(reqP, rqstP);} - rqstP->Resp.Init(); - rqstP->BindDone(sessP); + {XrdSsiMutexMon(rqstR.rrMutex); + if (!respP) respP = this; + rqstR.theRespond = respP; + respP->reqP = &rqstR; + respP->rrMutex = rqstR.rrMutex; + rqstR.Resp.Init(); + rqstR.errInfo.Clr(); + rqstR.BindDone(); } +//----------------------------------------------------------------------------- +//! Notify the responder that a request either completed or was canceled. This +//! allows the responder to release any resources given to the request object +//! (e.g. data response buffer or a stream). Upon return the object is owned by +//! the request object's creator who is responsible for releaasing or recycling +//! the object. This method is automatically invoked by XrdSsiRequest::Finish(). +//! +//! @param rqstP reference to the object describing the request. +//! @param rInfo reference to the object describing the response. +//! @param cancel False -> the request/response interaction completed. +//! True -> the request/response interaction aborted because +//! of an error or the client requested that the +//! request be canceled. +//----------------------------------------------------------------------------- + +virtual void Finished( XrdSsiRequest &rqstR, + const XrdSsiRespInfo &rInfo, + bool cancel=false) = 0; + //----------------------------------------------------------------------------- //! Release the request buffer of the request bound to this object. This is //! tricky member that requires atomics to correctly synchronize request ptr. +//! This is a convenience function as the same method may be called on the +//! request object itself. //----------------------------------------------------------------------------- -inline void ReleaseRequestBuffer() {XrdSsiRequest *rP = Atomic_GET(reqP); - if (rP) rP->RelRequestBuffer(); +inline void ReleaseRequestBuffer() {XrdSsiMutexMon(rrMutex); + if (reqP) reqP->RelRequestBuffer(); } //----------------------------------------------------------------------------- @@ -159,22 +169,20 @@ enum Status {wasPosted=0, //!< Success: The response was successfully posted //! Set a pointer to metadata to be sent out-of-band ahead of the response. //! //! @param buff pointer to a buffer holding the metadata. The buffer must -//! remain valid until XrdSsiSession::RequestFinished() is called. +//! remain valid until XrdSsiResponder::Finished() is called. //! @param blen the length of the metadata in buff that is to be sent. It must //! in the range 0 <= blen <= MaxMetaDataSZ. //! //! @return See Status enum for possible values. //----------------------------------------------------------------------------- -static const int MaxMetaDataSZ = 2097152; //< 2MB metadata limit +static const int MaxMetaDataSZ = 2097152; //!< 2MB metadata limit inline Status SetMetadata(const char *buff, int blen) - {XrdSsiRequest *rP = Atomic_GET(reqP); - if (!rP || blen < 0 || blen > MaxMetaDataSZ) + {XrdSsiMutexMon(rrMutex); + if (!reqP || blen < 0 || blen > MaxMetaDataSZ) return notPosted; - rP->reqMutex.Lock(); - rP->Resp.mdata = buff; rP->Resp.mdlen = blen; - rP->reqMutex.UnLock(); + reqP->Resp.mdata = buff; reqP->Resp.mdlen = blen; return wasPosted; } @@ -190,10 +198,10 @@ inline Status SetMetadata(const char *buff, int blen) inline Status SetErrResponse(const char *eMsg, int eNum) {SSI_VAL_RESPONSE(rP); - rP->eInfo.Set(eMsg, eNum); - rP->Resp.eMsg = rP->eInfo.Get(rP->Resp.eNum); + rP->errInfo.Set(eMsg, eNum); + rP->Resp.eMsg = rP->errInfo.Get(rP->Resp.eNum).c_str(); rP->Resp.rType = XrdSsiRespInfo::isError; - SSI_XEQ_RESPONSE(rP,false); + SSI_XEQ_RESPONSE(rP); } //----------------------------------------------------------------------------- @@ -208,7 +216,7 @@ inline Status SetNilResponse() {return SetResponse((const char *)0,0);} //! Set a memory buffer containing data as the request response. //! //! @param buff pointer to a buffer holding the response. The buffer must -//! remain valid until XrdSsiSession::RequestFinished() is called. +//! remain valid until XrdSsiResponder::Finished() is called. //! @param blen the length of the response in buff that is to be sent. //! //! @return See Status enum for possible values. @@ -218,7 +226,7 @@ inline Status SetResponse(const char *buff, int blen) {SSI_VAL_RESPONSE(rP); rP->Resp.buff = buff; rP->Resp.blen = blen; rP->Resp.rType = XrdSsiRespInfo::isData; - SSI_XEQ_RESPONSE(rP,true); + SSI_XEQ_RESPONSE(rP); } //----------------------------------------------------------------------------- @@ -235,7 +243,7 @@ inline Status SetResponse(long long fsize, int fdnum) rP->Resp.fdnum = fdnum; rP->Resp.fsize = fsize; rP->Resp.rType = XrdSsiRespInfo::isFile; - SSI_XEQ_RESPONSE(rP,true); + SSI_XEQ_RESPONSE(rP); } //----------------------------------------------------------------------------- @@ -252,48 +260,19 @@ inline Status SetResponse(XrdSsiStream *strmP) rP->Resp.eNum = 0; rP->Resp.strmP = strmP; rP->Resp.rType = XrdSsiRespInfo::isStream; - SSI_XEQ_RESPONSE(rP,true); + SSI_XEQ_RESPONSE(rP); } -//----------------------------------------------------------------------------- -//! Get the request bound to this object. -//! -//! @return A pointer to the request object. If it is nil then no request -//! is currently bound to this object. -//----------------------------------------------------------------------------- -inline -XrdSsiRequest *TheRequest() {return Atomic_GET(reqP);} - -//----------------------------------------------------------------------------- -//! The constructor is protected. This class is meant to be inherited by an -//! object (e.g. XrdSsiSession) that will actually post responses. This version -//! should be used if the object that inherited the responder is identified by -//! an integer type. -//! -//! @param objP Pointer to the underlying object (i.e. this pointer). -//! @param oType The value that identifies the object's type. -//! @param oInfo Optional additional information associated with the object. -//----------------------------------------------------------------------------- - - XrdSsiResponder(void *objP, int oType, int oInfo=0) - : reqP(0), objVal(objP) - {objIdent[0] = oType; objIdent[1] = oInfo;} //----------------------------------------------------------------------------- -//! The constructor is protected. This class is meant to be inherited by an -//! object (e.g. XrdSsiSession) that will actually post responses. This version -//! should be used if the object that inherited the responder is identified by -//! a void * handle (e.g. the underlying constructor). -//! -//! @param objP Pointer to the underlying object (i.e. this pointer). -//! @param objH The handle value that identifies the object's type. +//! This class is meant to be inherited by an object that will actually posts +//! responses. //----------------------------------------------------------------------------- - XrdSsiResponder(void *objP, void *objH) - : reqP(0), objVal(objP), objHandle(objH) {} + XrdSsiResponder() : reqP(0) {} //----------------------------------------------------------------------------- -//! Destructor is protected. You cannot use delete on a respond object, as it +//! Destructor is protected. You cannot use delete on a responder object, as it //! is meant to be inherited by a class and not separately instantiated. //----------------------------------------------------------------------------- @@ -302,10 +281,9 @@ protected: virtual ~XrdSsiResponder() {} private: -Atomic(XrdSsiRequest *) reqP; // Can be set or retrieved w/o a mutex -void *objVal; -union {int objIdent[2]; - void *objHandle; - }; +inline void Unbind() {rrMutex->Lock(); reqP = 0; rrMutex->UnLock();} + +XrdSsiMutex *rrMutex; +XrdSsiRequest *reqP; }; #endif diff --git a/src/XrdSsi/XrdSsiSSRun.cc b/src/XrdSsi/XrdSsiSSRun.cc deleted file mode 100644 index d7169b10735..00000000000 --- a/src/XrdSsi/XrdSsiSSRun.cc +++ /dev/null @@ -1,144 +0,0 @@ -/******************************************************************************/ -/* */ -/* X r d S s i S S R u n . c c */ -/* */ -/* (c) 2016 by the Board of Trustees of the Leland Stanford, Jr., University */ -/* Produced by Andrew Hanushevsky for Stanford University under contract */ -/* DE-AC02-76-SFO0515 with the Department of Energy */ -/* */ -/* This file is part of the XRootD software suite. */ -/* */ -/* XRootD is free software: you can redistribute it and/or modify it under */ -/* the terms of the GNU Lesser General Public License as published by the */ -/* Free Software Foundation, either version 3 of the License, or (at your */ -/* option) any later version. */ -/* */ -/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ -/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ -/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ -/* License for more details. */ -/* */ -/* You should have received a copy of the GNU Lesser General Public License */ -/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ -/* COPYING (GPL license). If not, see . */ -/* */ -/* The copyright holder's institutional names and contributor's names may not */ -/* be used to endorse or promote products derived from this software without */ -/* specific prior written permission of the institution or contributor. */ -/******************************************************************************/ - -#include "XrdSsi/XrdSsiAtomics.hh" -#include "XrdSsi/XrdSsiRequest.hh" -#include "XrdSsi/XrdSsiSSRun.hh" - -/******************************************************************************/ -/* L o c a l S t a t i c s */ -/******************************************************************************/ - -namespace -{ -XrdSsiMutex ssrMutex; -XrdSsiSSRun *freeSSR = 0; -int freeNum = 0; - -static const int maxFree = 512; -} - -/******************************************************************************/ -/* A l l o c */ -/******************************************************************************/ - -XrdSsiSSRun *XrdSsiSSRun::Alloc(XrdSsiRequest *reqp, - XrdSsiResource &rsrc, - unsigned short tmo) -{ - XrdSsiSSRun *ssrP; - char *datP; - int totLen = 0, rNameLen = 0, rUserLen = 0, rInfoLen = 0, hAvoidLen = 0; - -// The first step is to get the lengths of all the resource data members -// - {rNameLen = strlen(rsrc.rName) +1; totLen = rNameLen;} - if (rsrc.rUser) {rUserLen = strlen(rsrc.rUser) +1; totLen += rUserLen;} - if (rsrc.rInfo) {rInfoLen = strlen(rsrc.rInfo) +1; totLen += rInfoLen;} - if (rsrc.hAvoid) {hAvoidLen = strlen(rsrc.hAvoid)+1; totLen += hAvoidLen;} - -// Now allocate memory to copy all the members -// - datP = (char *)malloc(totLen); - if (!datP) return 0; - -// Now allocate a new SSRun object -// - ssrMutex.Lock(); - if ((ssrP = freeSSR)) - {freeSSR = ssrP->freeNext; - freeNum--; - ssrMutex.UnLock(); - } else ssrP = new XrdSsiSSRun(reqp, tmo); - -// Copy all the members (we are assured to have an rName by the caller). -// - {ssrP->rDesc.rName = datP; - strcpy(datP, rsrc.rName); - datP += rNameLen; - } - if (rUserLen) - {ssrP->rDesc.rUser = datP; - strcpy(datP, rsrc.rUser); - datP += rUserLen; - } - if (rInfoLen) - {ssrP->rDesc.rInfo = datP; - strcpy(datP, rsrc.rInfo); - datP += rInfoLen; - } - if (hAvoidLen) - {ssrP->rDesc.hAvoid = datP; - strcpy(datP, rsrc.hAvoid); - } - ssrP->rDesc.affinity = rsrc.affinity; - -// Indicate wwe want an automatic unprovision upon finish -// - ssrP->rDesc.rOpts |= XrdSsiResource::autoUnP; - -// Return the object -// - return ssrP; -} - -/******************************************************************************/ -/* P r o v i s i o n D o n e */ -/******************************************************************************/ - -void XrdSsiSSRun::ProvisionDone(XrdSsiSession *sessP) -{ - -// If provisioning was successful and if so, run it. -// - if (sessP) sessP->ProcessRequest(theReq, tOut); - else {const char *eText; int eNum; - eText = eInfo.Get(eNum); - theReq->eInfo.Set(eText, eNum); - theReq->Resp.eMsg = theReq->eInfo.Get(theReq->Resp.eNum); - theReq->Resp.rType = XrdSsiRespInfo::isError; - theReq->ProcessResponse(theReq->Resp, false); - } - -// We are done, recycle ourselves prior to returning. -// - if (rDesc.rName) free((void *)rDesc.rName); - memset(&rDesc, 0, sizeof(rDesc)); - - ssrMutex.Lock(); - if (freeNum < maxFree) - {freeNext = freeSSR; - freeSSR = this; - freeNum++; - ssrMutex.UnLock(); - } else { - ssrMutex.UnLock(); - delete this; - } -} diff --git a/src/XrdSsi/XrdSsiScale.hh b/src/XrdSsi/XrdSsiScale.hh new file mode 100644 index 00000000000..2e8293fbbbe --- /dev/null +++ b/src/XrdSsi/XrdSsiScale.hh @@ -0,0 +1,77 @@ +#ifndef __XRDSSISCALE_HH__ +#define __XRDSSISCALE_HH__ +/******************************************************************************/ +/* */ +/* X r d S s i S c a l e . h h */ +/* */ +/* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */ +/* Produced by Andrew Hanushevsky for Stanford University under contract */ +/* DE-AC02-76-SFO0515 with the Department of Energy */ +/* */ +/* This file is part of the XRootD software suite. */ +/* */ +/* XRootD is free software: you can redistribute it and/or modify it under */ +/* the terms of the GNU Lesser General Public License as published by the */ +/* Free Software Foundation, either version 3 of the License, or (at your */ +/* option) any later version. */ +/* */ +/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ +/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ +/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ +/* License for more details. */ +/* */ +/* You should have received a copy of the GNU Lesser General Public License */ +/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ +/* COPYING (GPL license). If not, see . */ +/* */ +/* The copyright holder's institutional names and contributor's names may not */ +/* be used to endorse or promote products derived from this software without */ +/* specific prior written permission of the institution or contributor. */ +/******************************************************************************/ + +#include + +#include "XrdSys/XrdSysPthread.hh" + +class XrdSsiScale +{ +public: + +static const int maxEnt = 32; +static const unsigned int maxPend = 65500; + +int getEnt() {entMutex.Lock(); + if (pendCnt[nowEnt] < maxPend) + {pendCnt[nowEnt]++; + entMutex.UnLock(); + return nowEnt; + } + int xEnt = (nowEnt < maxEnt ? nowEnt+1 : 0); + int zEnt = maxEnt; + do {for (int i = xEnt; i < zEnt; i++) + {if (pendCnt[i] < maxPend) + {pendCnt[i]++; + nowEnt = i; + entMutex.UnLock(); + return i; + } + } + if (!xEnt) break; + xEnt = 0; zEnt = nowEnt; + } while(true); + entMutex.UnLock(); + return -1; + } + +void retEnt(int xEnt) {entMutex.Lock(); pendCnt[xEnt]--; entMutex.UnLock();} + + XrdSsiScale() : nowEnt(0) {memset(pendCnt, 0, sizeof(uint16_t)*maxEnt);} + ~XrdSsiScale() {} + +private: + +XrdSysMutex entMutex; +uint16_t pendCnt[maxEnt]; +int nowEnt; +}; +#endif diff --git a/src/XrdSsi/XrdSsiServReal.cc b/src/XrdSsi/XrdSsiServReal.cc index 14c4c6ddb45..76d49cbdf44 100644 --- a/src/XrdSsi/XrdSsiServReal.cc +++ b/src/XrdSsi/XrdSsiServReal.cc @@ -31,8 +31,23 @@ #include #include +#include "XrdSsi/XrdSsiResource.hh" +#include "XrdSsi/XrdSsiScale.hh" #include "XrdSsi/XrdSsiServReal.hh" #include "XrdSsi/XrdSsiSessReal.hh" +#include "XrdSsi/XrdSsiTrace.hh" +#include "XrdSsi/XrdSsiUtils.hh" + +/******************************************************************************/ +/* S t a t i c s & G l o b a l s */ +/******************************************************************************/ + +namespace XrdSsi +{ + XrdSsiScale sidScale; +} + +using namespace XrdSsi; /******************************************************************************/ /* D e s t r u c t o r */ @@ -58,11 +73,11 @@ XrdSsiServReal::~XrdSsiServReal() /* Private: A l l o c */ /******************************************************************************/ -XrdSsiSessReal *XrdSsiServReal::Alloc(const char *sName) +XrdSsiSessReal *XrdSsiServReal::Alloc(const char *sName, int uent, bool hold) { XrdSsiSessReal *sP; -// Check if we can grab this from out queue +// Reuse or allocate a new session object and return it // myMutex.Lock(); actvSes++; @@ -70,71 +85,73 @@ XrdSsiSessReal *XrdSsiServReal::Alloc(const char *sName) {freeCnt--; freeSes = sP->nextSess; myMutex.UnLock(); - sP->InitSession(this, sName); + sP->InitSession(this, sName, uent, hold); } else { myMutex.UnLock(); - if (!(sP = new XrdSsiSessReal(this, sName))) + if (!(sP = new XrdSsiSessReal(this, sName, uent, hold))) {myMutex.Lock(); actvSes--; myMutex.UnLock();} } - -// Return the pointer -// return sP; } - + /******************************************************************************/ /* Private: G e n U R L */ /******************************************************************************/ -bool XrdSsiServReal::GenURL(XrdSsiService::Resource *rP, - char *buff, int blen, bool uCon) +bool XrdSsiServReal::GenURL(XrdSsiResource *rP, char *buff, int blen, int uEnt) { static const char affTab[] = "\0\0n\0w\0s\0S"; - const char *xUsr, *xAt, *iSep, *iVal, *tVar, *tVal, *uVar, *uVal; - const char *aVar, *aVal; + const char *xUsr, *xAt, *iSep, *iVal, *tVar, *tVal, *uVar, *uVal; + const char *aVar, *aVal, *qVal = ""; + char uBuff[8]; int n; - bool xCGI = false; // Preprocess avoid list, if any // - if (!(rP->rDesc.hAvoid) || !*(rP->rDesc.hAvoid)) tVar = tVal = ""; - else {tVar = "?tried="; - tVal = rP->rDesc.hAvoid; - xCGI = true; + if (rP->hAvoid.length() == 0) tVar = tVal = ""; + else {tVar = "&tried="; + tVal = rP->hAvoid.c_str(); + qVal = "?"; } // Preprocess affinity // - if (!(rP->rDesc.affinity)) aVar = aVal = ""; - else {aVar = (xCGI ? "&cms.aff=" : "?cms.aff="); - aVal = &affTab[rP->rDesc.affinity*2]; - xCGI = true; + if (!(rP->affinity)) aVar = aVal = ""; + else {aVar = "&cms.aff="; + aVal = &affTab[rP->affinity*2]; + qVal = "?"; } -// Check if we need to qualify the host with a user name +// Check if we need to add a user name // - if (!rP->rDesc.rUser || !(*rP->rDesc.rUser)) xUsr = xAt = uVar = uVal = ""; - else {uVar = (xCGI ? "&ssi.user=" : "?ssi.user="); - uVal = rP->rDesc.rUser; - xCGI = true; - if (!uCon) xUsr = xAt = ""; - else {xUsr = rP->rDesc.rUser; xAt = "@";} + if (rP->rUser.length() == 0) uVar = uVal = ""; + else {uVar = "&ssi.user="; + uVal = rP->rUser.c_str(); + qVal = "?"; } // Preprocess the cgi information // - if (!(rP->rDesc.rInfo) || !*(rP->rDesc.rInfo)) iSep = iVal = ""; - else {iVal = rP->rDesc.rInfo; - if (xCGI) iSep = (*iVal == '&' ? "" : "&"); - else iSep = (*iVal == '?' ? "" : "?"); + if (rP->rInfo.length() == 0) iSep = iVal = ""; + else {iVal = rP->rInfo.c_str(); + iSep = "&ssi.cgi="; + qVal = "?"; + } + +// Check if we need to qualify the host with a user index +// + if (uEnt == 0) xUsr = xAt = ""; + else {snprintf(uBuff, sizeof(uBuff), "%d", uEnt); + uVal = uBuff; + xAt = "@"; } // Generate appropriate url -// t a u i - n = snprintf(buff, blen, "xroot://%s%s%s/%s%s%s%s%s%s%s%s%s", - xUsr, xAt, manNode, - rP->rDesc.rName, tVar, tVal, aVar, aVal, - uVar, uVal, iSep, iVal); +// ? t a u i + n = snprintf(buff, blen, "xroot://%s%s%s/%s%s%s%s%s%s%s%s%s%s", + xUsr, xAt, manNode, rP->rName.c_str(), qVal, + tVar, tVal, aVar, aVal, + uVar, uVal, iSep, iVal); // Return overflow or not // @@ -142,48 +159,50 @@ bool XrdSsiServReal::GenURL(XrdSsiService::Resource *rP, } /******************************************************************************/ -/* P r o v i s i o n */ +/* P r o c e s s R e q u e s t */ /******************************************************************************/ - -void XrdSsiServReal::Provision(XrdSsiService::Resource *resP, - unsigned short timeOut, - bool userConn - ) + +void XrdSsiServReal::ProcessRequest(XrdSsiRequest &reqRef, + XrdSsiResource &resRef) { XrdSsiSessReal *sObj; - char epURL[4096]; + int uEnt; + char epURL[4096]; // Validate the resource name // - if (!resP->rDesc.rName || !(*resP->rDesc.rName)) - {resP->eInfo.Set("Resource name missing.", EINVAL); - resP->ProvisionDone(0); + if (resRef.rName.length() == 0) + {XrdSsiUtils::RetErr(reqRef, "Resource name missing.", EINVAL); return; } -// Construct url +// Get a sid entry number // - if (!GenURL(resP, epURL, sizeof(epURL), userConn)) - {resP->eInfo.Set("Resource url is too long.", ENAMETOOLONG); - resP->ProvisionDone(0); + if ((uEnt = sidScale.getEnt()) < 0) + {XrdSsiUtils::RetErr(reqRef, "Out of stream resources.", ENOSR); return; } -// Obtain a new session object +// Construct url // - if (!(sObj = Alloc(resP->rDesc.rName))) - {resP->eInfo.Set("Insufficient memory.", ENOMEM); - resP->ProvisionDone(0); + if (!GenURL(&resRef, epURL, sizeof(epURL), uEnt)) + {XrdSsiUtils::RetErr(reqRef, "Resource url is too long.", ENAMETOOLONG); + sidScale.retEnt(uEnt); return; } -// Now just effect an open to this resource +// Obtain a new session object (note the first request uses the session mutex) // - if (!(sObj->Open(resP, epURL, timeOut, - (resP->rDesc.rOpts & XrdSsiResource::autoUnP) != 0))) - {Recycle(sObj); - resP->ProvisionDone(0); - } + if (!(sObj = Alloc(resRef.rName.c_str(), uEnt))) + {XrdSsiUtils::RetErr(reqRef, "Insufficient memory.", ENOMEM); + sidScale.retEnt(uEnt); + return; + } else reqRef.SetMutex(sObj->MutexP()); + +// Now just provision this resource which will execute the request should it +// be successful. +// + if (!(sObj->Provision(&reqRef, epURL))) Recycle(sObj); } /******************************************************************************/ @@ -192,11 +211,14 @@ void XrdSsiServReal::Provision(XrdSsiService::Resource *resP, void XrdSsiServReal::Recycle(XrdSsiSessReal *sObj) { + EPNAME("Recycle"); + static const char *tident = "ServReal"; // Add to queue unless we have too many of these // myMutex.Lock(); actvSes--; + DEBUG("Sessions: free=" < +#include //----------------------------------------------------------------------------- //! The XrdSsiService object is used by the Scalable Service Interface to @@ -47,8 +47,8 @@ //! XrdSsiProviderServer defined in the plugin shared library. //----------------------------------------------------------------------------- -class XrdSsiEntity; -class XrdSsiSession; +class XrdSsiRequest; +class XrdSsiResource; class XrdSsiService { @@ -60,91 +60,61 @@ public: //! If it does not, initialization fails. //----------------------------------------------------------------------------- -static const int SsiVersion = 0x00010000; +static const int SsiVersion = 0x00020000; int GetVersion() {return SsiVersion;} //----------------------------------------------------------------------------- -//! The Resource class describes the session resource to be provisioned and is -//! and is used to communicate the results of the provisioning request. -//----------------------------------------------------------------------------- - -class Resource -{ -public: -XrdSsiResource rDesc; //!< The resource description -XrdSsiErrInfo eInfo; //!< Holds error information upon failure - -//----------------------------------------------------------------------------- -//! Handle the ending results of a Provision() call. It is called by the -//! Service object when provisioning that has actually started completes. +//! @brief Attach to a backgrounded request. //! -//! @param sessP !0 -> to the successfully provisioned session object. -//! =0 Provisioning failed, the eInfo object holds the reason. -//----------------------------------------------------------------------------- - -virtual void ProvisionDone(XrdSsiSession *sessP) = 0; //!< Callback - -//----------------------------------------------------------------------------- -//! Constructor - See XrdSsiResource for details on the arguments as they are -//! the same here as there. All XrdSsiResource member pointers must remain -//! stable until ProvisionDone() is called. -//----------------------------------------------------------------------------- - - Resource(const char *rname, - const char *havoid=0, - const char *ruser=0, - const char *rinfo=0, - XrdSsiResource::Affinity raff=XrdSsiResource::Default - ) : rDesc(rname, havoid, ruser, rinfo, raff) {} - -//----------------------------------------------------------------------------- -//! Destructor +//! When a client calls Attach() the server-side Attach() is invoked to +//! indicate that the backgrounded request is now a foreground request. Many +//! times such notfication is not needed so a default nil implementation is +//! provided. Server-side Attach() calls are always passed the original request +//! object reference so that it can pair up the request with the attach. +//! +//! @param reqRef Reference to the request object that is to attach to the +//! backgrounded request. It need not be the original request +//! object (client-side) but it always is the original request +//! object server-side. +//! +//! @param handle The handle provided to the XrdSsiRequest::ProcessResponse() +//! callback method via isHandle message type. +//! +//! @return All results are returned via the request object callback methods. //----------------------------------------------------------------------------- -virtual ~Resource() {} -}; +virtual void Attach(XrdSsiRequest &reqRef, std::string handle) {} //----------------------------------------------------------------------------- -//! Provision of service session; client-side or server-side. -//! -//! @param resP Pointer to the Resource object (see above) that describes -//! the resource to be provisioned. -//! -//! @param timeOut the maximum number seconds the operation may last before -//! it is considered to fail. A zero value uses the default. -//! -//! @param userConn when false, prexisting TCP connections are shared even when -//! rUser is unique. Otherwise, TCP connections are only -//! shared for the same rUser to the same endpoint. -//! -//! @return The method returns all results via resP->ProvisionDone() callback -//! which may use the calling thread or a new thread. +//! @brief Process a request; client-side or server-side. //! -//! Special notes for server-side processing: +//! When a client calls ProcessRequest() the same method is called server-side +//! with the same parameters that the client specified except for timeOut which +//! is always set to zero server-side. //! -//! 1) Two special errors are recognized that allow for a client retry: +//! @param reqRef Reference to the Request object that describes the +//! request. //! -//! resP->eInfo.eNum = EAGAIN (client should retry elsewhere) -//! resP->eInfo.eMsg = the host name where the client is redirected -//! resP->eInfo.eArg = the port number to be used by the client +//! @param resRef Reference to the Resource object that describes the +//! resource that the request will be using. //! -//! resP->eInfo.eNum = EBUSY (client should wait and then retry). -//! resP->eInfo.eMsg = an optional reason for the wait. -//! resP->eInfo.eArg = the number of seconds the client should wait. +//! @return All results are returned via the request object callback methods. +//! For background queries, the XrdSsiRequest::ProcessResponse() is +//! called with a response type of isHandle when the request is handed +//! off to the endpoint for execution (see XrdSsiRequest::SetDetachTTL). //----------------------------------------------------------------------------- -virtual void Provision(Resource *resP, - unsigned short timeOut=0, - bool userConn=false - ) = 0; +virtual void ProcessRequest(XrdSsiRequest &reqRef, + XrdSsiResource &resRef + ) = 0; //----------------------------------------------------------------------------- -//! Stop the client-side service. This is never called server-side. +//! @brief Stop the client-side service. This is never called server-side. //! //! @return true Service has been stopped and this object has been deleted. //! @return false Service cannot be stopped because there are still active -//! sessions. Unprovision all the sessions then call Stop(). +//! foreground requests. Cancel the requests then call Stop(). //----------------------------------------------------------------------------- virtual bool Stop() {return false;} diff --git a/src/XrdSsi/XrdSsiSessReal.cc b/src/XrdSsi/XrdSsiSessReal.cc index 1403c578a4e..a8e1cad790b 100644 --- a/src/XrdSsi/XrdSsiSessReal.cc +++ b/src/XrdSsi/XrdSsiSessReal.cc @@ -32,14 +32,21 @@ #include #include #include +#include #include #include +#include "XrdSsi/XrdSsiAtomics.hh" #include "XrdSsi/XrdSsiRequest.hh" #include "XrdSsi/XrdSsiRRInfo.hh" +#include "XrdSsi/XrdSsiUtils.hh" +#include "XrdSsi/XrdSsiScale.hh" #include "XrdSsi/XrdSsiServReal.hh" #include "XrdSsi/XrdSsiSessReal.hh" +#include "XrdSsi/XrdSsiTaskReal.hh" #include "XrdSsi/XrdSsiTrace.hh" +#include "XrdSsi/XrdSsiUtils.hh" + #include "XrdSys/XrdSysHeaders.hh" using namespace XrdSsi; @@ -75,6 +82,15 @@ namespace const char *tident = 0; } +/******************************************************************************/ +/* G l o b a l s */ +/******************************************************************************/ + +namespace XrdSsi +{ +extern XrdSsiScale sidScale; +} + /******************************************************************************/ /* D e s t r u c t o r */ /******************************************************************************/ @@ -90,119 +106,26 @@ XrdSsiSessReal::~XrdSsiSessReal() } /******************************************************************************/ -/* I n i t S e s s i o n */ +/* Private: E x e c u t e */ /******************************************************************************/ -void XrdSsiSessReal::InitSession(XrdSsiServReal *servP, const char *sName) -{ - resource = 0; - attBase = 0; - freeTask = 0; - pendTask = 0; - myService = servP; - nextTID = 0; - alocLeft = XrdSsiRRInfo::maxID; - doUnProv = false; - stopping = false; - if (sName) - {if (sessName) free(sessName); - sessName = strdup(sName); - } -} +// Called with sessMutex locked! -/******************************************************************************/ -/* Private: M a p E r r */ -/******************************************************************************/ - -int XrdSsiSessReal::MapErr(int xEnum) -{ - switch(xEnum) - {case kXR_NotFound: return ENOENT; - case kXR_NotAuthorized: return EACCES; - case kXR_IOError: return EIO; - case kXR_NoMemory: return ENOMEM; - case kXR_NoSpace: return ENOSPC; - case kXR_ArgTooLong: return ENAMETOOLONG; - case kXR_noserver: return EHOSTUNREACH; - case kXR_NotFile: return ENOTBLK; - case kXR_isDirectory: return EISDIR; - case kXR_FSError: return ENOSYS; - default: return ECANCELED; - } -} - -/******************************************************************************/ -/* O p e n */ -/******************************************************************************/ - -bool XrdSsiSessReal::Open(XrdSsiService::Resource *resP, - const char *epURL, - unsigned short tOut, - bool finup) -{ - EPNAME("SessOpen"); - XrdCl::XRootDStatus epStatus; - -// Set resource, we will need to call ProvisionDone() on it later -// - resource = resP; - doUnProv = finup; - -// Issue the open and if the open was started, return success. -// - DEBUG("Opening " <eInfo); - return false; -} - -/******************************************************************************/ -/* P r o c e s s R e q u e s t */ -/******************************************************************************/ - -void XrdSsiSessReal::ProcessRequest(XrdSsiRequest *reqP, unsigned short tOut) +bool XrdSsiSessReal::Execute(XrdSsiRequest *reqP) { - EPNAME("SessProcReq"); XrdCl::XRootDStatus Status; XrdSsiRRInfo rrInfo; XrdSsiTaskReal *tP, *ptP; char *reqBuff; int reqBlen; -// Make sure the file is open -// - if (!epFile.IsOpen()) - {RequestFailed(reqP, "Session not provisioned.", ENOTCONN); return;} - // Get the request information // reqBuff = reqP->GetRequest(reqBlen); -// Remainder of the code here must have the mutex to hold off stops -// - myMutex.Lock(); - // Allocate a task object for this request // - if ((tP = freeTask)) freeTask = tP->attList.next; - else {if (!alocLeft || !(tP = new XrdSsiTaskReal(this, nextTID))) - {myMutex.UnLock(); - RequestFailed(reqP, "Too many active requests.", EMLINK); - return; - } - alocLeft--; nextTID++; - } - -// Initialize the task -// - tP->Init(reqP, tOut); - DEBUG("Task=" <attList.next;} - -// If we found it unchain and release it +// Insert the task into our list of tasks // - if (ntP) - {if (ptP) ptP->attList.next = ntP->attList.next; - else pendTask = ntP->attList.next; - RelTask(tP); - } else { - DEBUG("Task=" <GetResponder(); - XrdSsiTaskReal *rtP; - void *objHandle; - -// If we have no task then we are really done here (we may need to unprovision) -// - DEBUG("Request="<Kill()) RelTask(rtP); - else {rtP->attList.next = pendTask; pendTask = rtP; - DEBUG("Removal deferred; Task="<Init(reqP, reqP->GetTimeOut()); + DEBUG("Task=" <GetTimeOut()); + +// If there was an error, scuttle the request +// + if (!epStatus.IsOK()) + {std::string eTxt; + int eNum = XrdSsiUtils::GetErr(epStatus, eTxt); + XrdSsiUtils::RetErr(*requestP, eTxt.c_str(), eNum); + XrdSsi::sidScale.retEnt(uEnt); + return false; } + +// We succeeded. So, bind to this request so we can respond with any errors +// + inOpen = true; + requestP = reqP; + BindRequest(*reqP); + return true; } +/******************************************************************************/ +/* Private: R e l T a s k */ /******************************************************************************/ -void XrdSsiSessReal::SetErr(XrdCl::XRootDStatus &Status, - int &eNum, const char **eText) +void XrdSsiSessReal::RelTask(XrdSsiTaskReal *tP) // sessMutex locked! { -// If this is an xrootd error then get the xrootd generated error +// Delete this task or place it on the free list // - if (Status.code == XrdCl::errErrorResponse) - {*eText = Status.GetErrorMessage().c_str(); - eNum = MapErr(Status.errNo); - } else { - *eText = Status.ToStr().c_str(), - eNum = (Status.errNo ? Status.errNo : EFAULT); - } + if (!isHeld) delete tP; + else {tP->attList.next = freeTask; + freeTask = tP; + } } /******************************************************************************/ /* Private: S h u t d o w n */ /******************************************************************************/ + +// Called with sessMutex locked and return with it unlocked void XrdSsiSessReal::Shutdown(XrdCl::XRootDStatus &epStatus) { // If the close failed then we cannot recycle this object as it is not reusable +//?? Future: notify service of this if we are being held. // - if (!epStatus.IsOK()) - {XrdSsiErrInfo eInfo; - const char *eText; - int eNum; - SetErr(epStatus, eInfo); - eText = eInfo.Get(eNum); - cerr <<"Unprovision "<Recycle(this); } } - + /******************************************************************************/ -/* U n p r o v i s i o n */ +/* T a s k F i n i s h e d */ /******************************************************************************/ -bool XrdSsiSessReal::Unprovision(bool forced) +void XrdSsiSessReal::TaskFinished(XrdSsiTaskReal *tP) { - XrdSysMutexHelper rHelp(&myMutex); - XrdCl::XRootDStatus epStatus; - XrdSsiTaskReal *tP = pendTask; +// Lock our mutex +// + sessMutex.Lock(); -// Make sure there are no outstanding requests +// Remove task from the task list if it's in it // - if (attBase) return false; + if (tP == attBase || tP->attList.next != tP) + {REMOVE(attBase, attList, tP);} -// Make sure we are bing called only once +// Clear asny pending task events and decrease active count // - if (stopping) return true; - stopping = true; + tP->ClrEvent(); + numAT--; -// If we have any pending tasks then detach them, they will be deleted later +// Return the request entry number // - numPT = 0; - while(tP) {tP->Detach(true); tP = tP->attList.next; numPT++;} - pendTask = 0; + XrdSsi::sidScale.retEnt(uEnt); -// Close the file associated with this session if we have no pending tasks. If -// error occurs, the move ahead to Shutdown(). We cannot be holding our mutex! +// Place the task on the free list. If we can shutdown, then unprovision which +// will drive a shutdown. The returns without the sessMutex, otherwise we must +// unlock it before we return. // - if (!numPT) - {epStatus = epFile.Close((XrdCl::ResponseHandler *)this); - if (!epStatus.IsOK()) {rHelp.UnLock(); Shutdown(epStatus);} - } + RelTask(tP); + if (!isHeld && numAT < 1) Unprovision(); + else sessMutex.UnLock(); +} -// All done +/******************************************************************************/ +/* Private: U n p r o v i s i o n */ +/******************************************************************************/ + +// Called with sessMutex locked and returns with it unlocked + +void XrdSsiSessReal::Unprovision() // Called with sessMutex locked! +{ + XrdCl::XRootDStatus uStat; + +// Close the file this will schedule a shutdown if successful // - return true; + ClrEvent(); + uStat = epFile.Close((XrdCl::ResponseHandler *)this); + +// If this was not successful then we can do the shutdown right now. Note that +// Shutdown() unlocks the sessMutex. +// + if (!uStat.IsOK()) Shutdown(uStat); + else sessMutex.UnLock(); } /******************************************************************************/ @@ -452,34 +388,58 @@ bool XrdSsiSessReal::Unprovision(bool forced) bool XrdSsiSessReal::XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP) { - XrdSysMutexHelper rHelp(&myMutex); - XrdSsiSessReal *sObj = 0; -// If we are stopping then simply recycle ourselves. We must not be holding -// our mutex when we do so! +// Lock the session. We keep the lock if there is going to any continuation +// via the event handler. Otherwise, drop the lock. +// + sessMutex.Lock(); + +// If we are not in the open phase then this is due to a close event. Simply +// do a shutdown and return to stop event processing. +// + if (!inOpen) + {Shutdown(*status); + return false; + } + inOpen = false; + +// Check if the request that triggered the open was cancelled. If so, bail. +// Note that shutdown and unprovision unlock the sessMutex. +// + if (!requestP) + {if (!status->IsOK()) Shutdown(*status); + else {if (!isHeld) Unprovision();} + return false; + } + +// We are here because the open finally completed. If the open failed, then +// tell Finish() to do a shutdown and post an error response. // - if (stopping) - {rHelp.UnLock(); - Shutdown(*status); + if (!status->IsOK()) + {std::string eTxt; + int eNum = XrdSsiUtils::GetErr(*status, eTxt); + doStop = true; + sessMutex.UnLock(); + SetErrResponse(eTxt.c_str(), eNum); return false; } -// We are here because the open finally completed. Check for errors. +// Obtain the endpoint name // - if (status->IsOK()) - {std::string currNode; - if (epFile.GetProperty(dsProperty, currNode)) - {if (sessNode) free(sessNode); - sessNode = strdup(currNode.c_str()); - sObj = this; - } else resource->eInfo.Set("Unable to get node name!",EADDRNOTAVAIL); - } else SetErr(*status, resource->eInfo); - -// Do appropriate callback. Be careful, as the below is set up to allow the -// callback to implicitly delete us should it unprovision the session. So, -// we drop out lock at this point so we neither deadlock nor get SEGV. + std::string currNode; + if (epFile.GetProperty(dsProperty, currNode)) + {if (sessNode) free(sessNode); + sessNode = strdup(currNode.c_str()); + } else sessNode = strdup("Unknown!"); + +// Execute the request. If we failed and this is a single request session then +// we need to disband he session. We delay this until Finish() is called. // - rHelp.UnLock(); - resource->ProvisionDone(sObj); - return stopping; + if (!Execute(requestP) && !isHeld) + {if (!requestP) Unprovision(); + else {doStop = true; + sessMutex.UnLock(); + } + } else sessMutex.UnLock(); + return false; } diff --git a/src/XrdSsi/XrdSsiSessReal.hh b/src/XrdSsi/XrdSsiSessReal.hh index c06fdb04011..3dca14c65c0 100644 --- a/src/XrdSsi/XrdSsiSessReal.hh +++ b/src/XrdSsi/XrdSsiSessReal.hh @@ -32,80 +32,77 @@ #include #include "XrdCl/XrdClFile.hh" +#include "XrdSsi/XrdSsiAtomics.hh" #include "XrdSsi/XrdSsiEvent.hh" #include "XrdSsi/XrdSsiResponder.hh" -#include "XrdSsi/XrdSsiService.hh" -#include "XrdSsi/XrdSsiSession.hh" -#include "XrdSsi/XrdSsiTaskReal.hh" #include "XrdSys/XrdSysPthread.hh" class XrdSsiServReal; +class XrdSsiTaskReal; -class XrdSsiSessReal : public XrdSsiSession, public XrdSsiResponder, - public XrdSsiEvent +class XrdSsiSessReal : public XrdSsiEvent, public XrdSsiResponder { public: -union -{XrdSsiSessReal *nextSess; - XrdSsiService::Resource *resource; -}; - - - void InitSession(XrdSsiServReal *servP=0, const char *sName=0); - - void Lock() {myMutex.Lock();} - -XrdSysMutex *MutexP() {return &myMutex;} +XrdSsiSessReal *nextSess; - bool Open(XrdSsiService::Resource *resP, const char *epURL, - unsigned short tOut, bool finup); + bool Execute(XrdSsiRequest *reqP); - void ProcessRequest(XrdSsiRequest *reqP, unsigned short tOut=0); + void Finished( XrdSsiRequest &rqstR, + const XrdSsiRespInfo &rInfo, + bool cancel=false); - void Recycle(XrdSsiTaskReal *tP); + void InitSession(XrdSsiServReal *servP, + const char *sName, + int uent, + bool hold); - void RequestFinished( XrdSsiRequest *reqP, - const XrdSsiRespInfo &rInfo, - bool cancel=false); + void Lock() {sessMutex.Lock();} -static void SetErr(XrdCl::XRootDStatus &Status, - int &eNum, const char **eText); +XrdSsiMutex *MutexP() {return &sessMutex;} -static void SetErr(XrdCl::XRootDStatus &Status, XrdSsiErrInfo &eInfo); + bool Provision(XrdSsiRequest *reqP, const char *epURL); - void UnLock() {myMutex.UnLock();} + void TaskFinished(XrdSsiTaskReal *tP); - bool Unprovision(bool forced=false); + void UnLock() {sessMutex.UnLock();} bool XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP); - XrdSsiSessReal(XrdSsiServReal *servP, const char *sName) - : XrdSsiSession(strdup(sName), 0), - XrdSsiResponder(this, (void *)0), - XrdSsiEvent("SessReal") - {InitSession(servP);} + XrdSsiSessReal(XrdSsiServReal *servP, + const char *sName, + int uent, + bool hold=false) + : XrdSsiEvent("SessReal"), + sessMutex(XrdSsiMutex::Recursive), + sessName(0), sessNode(0) + {InitSession(servP, sName, uent, hold);} ~XrdSsiSessReal(); -XrdCl::File epFile; +XrdCl::File epFile; private: -static int MapErr(int xEnum); +XrdSsiTaskReal *NewTask(XrdSsiRequest *reqP); void RelTask(XrdSsiTaskReal *tP); -void RequestFailed(XrdSsiRequest *rqstP,const char *eText,int eCode); void Shutdown(XrdCl::XRootDStatus &epStatus); +void Unprovision(); -XrdSysRecMutex myMutex; +XrdSsiMutex sessMutex; XrdSsiServReal *myService; XrdSsiTaskReal *attBase; XrdSsiTaskReal *freeTask; XrdSsiTaskReal *pendTask; -short nextTID; -short alocLeft; -short numPT; -bool doUnProv; -bool stopping; +XrdSsiRequest *requestP; +char *sessName; +char *sessNode; +int16_t nextTID; +int16_t alocLeft; +int16_t numAT; // Number of active tasks +int16_t uEnt; // User index for scaling +bool isHeld; +bool inOpen; +bool doStop; }; #endif diff --git a/src/XrdSsi/XrdSsiSession.hh b/src/XrdSsi/XrdSsiSession.hh deleted file mode 100644 index 0c26b6e6f44..00000000000 --- a/src/XrdSsi/XrdSsiSession.hh +++ /dev/null @@ -1,159 +0,0 @@ -#ifndef __XRDSSISESSION_HH__ -#define __XRDSSISESSION_HH__ -/******************************************************************************/ -/* */ -/* X r d S s i S e s s i o n . h h */ -/* */ -/* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */ -/* Produced by Andrew Hanushevsky for Stanford University under contract */ -/* DE-AC02-76-SFO0515 with the Department of Energy */ -/* */ -/* This file is part of the XRootD software suite. */ -/* */ -/* XRootD is free software: you can redistribute it and/or modify it under */ -/* the terms of the GNU Lesser General Public License as published by the */ -/* Free Software Foundation, either version 3 of the License, or (at your */ -/* option) any later version. */ -/* */ -/* XRootD is distributed in the hope that it will be useful, but WITHOUT */ -/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ -/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ -/* License for more details. */ -/* */ -/* You should have received a copy of the GNU Lesser General Public License */ -/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ -/* COPYING (GPL license). If not, see . */ -/* */ -/* The copyright holder's institutional names and contributor's names may not */ -/* be used to endorse or promote products derived from this software without */ -/* specific prior written permission of the institution or contributor. */ -/******************************************************************************/ - -#include - -#include "XrdSsi/XrdSsiErrInfo.hh" - -//----------------------------------------------------------------------------- -//! The XrdSsiSession object is used by the Scalable Service Interface to -//! process client requests. There is one session object per logical connection. -//! The XrdSsiServer::Provision() method supplies the object. You must supply a -//! server-side implementation based on the abstract class below. One is -//! already provided for client-side use. -//----------------------------------------------------------------------------- - -class XrdSsiRequest; -struct XrdSsiRespInfo; -class XrdSsiTask; - -class XrdSsiSession -{ -public: -friend class XrdSsiRequest; - -//----------------------------------------------------------------------------- -//! Get the session's location (i.e. endpoint hostname and port) -//! -//! @return A pointer to the session's location. It remains valid until the -//! session is unprovisioned. A null means the session is not bound -//! to any endpoint. -//----------------------------------------------------------------------------- -inline -const char *Location() {return sessNode;} - -//----------------------------------------------------------------------------- -//! Get the session's name. -//! -//! @return A pointer to the session's name. It remains valid until the -//! session is unprovisioned. -//----------------------------------------------------------------------------- -inline -const char *Name() {return sessName;} - -//----------------------------------------------------------------------------- -//! Process a new request -//! -//! Client-side: ProcessRequest() should be called with an XrdSsiRequest object -//! to process a new request by a session. The session object -//! assumes ownership of the request object until the request is -//! completed or canceled. -//! -//! Server-side: ProcessRequest() is called when a new request is received. -//! It is always called using a new thread. -//! -//! @param reqP pointer to the object describing the request. This object -//! is also used to effect a response. Ownwership of the -//! request object is transfered to the session object. It must -//! remain valid until after XrdSsiRequest::Finished() is called. -//! -//! @param tOut the maximum time the request should take. A value of zero -//! uses the default value. -//! -//! @return All results are returned via the request's ProcessResponse callback. -//----------------------------------------------------------------------------- - -virtual void ProcessRequest(XrdSsiRequest *reqP, unsigned short tOut=0) = 0; - -//----------------------------------------------------------------------------- -//! Unprovision a session. -//! -//! Client-side: All outstanding requests must be finished by calling -//! XrdSsiRequest::Finished() prior to calling Unprovision(). -//! -//! Server-side: Unprovision() is called only after XrdSsiRequest::Finished() -//! is called on all outstanding requests indicating cancellation. -//! -//! The session object should release all if its resources and recycle or -//! delete itself. -//! -//! @param forced when true, the connection was lost. -//! -//! @return true Session unprovisioned, the object is no longer valid. -//! -//! @return false Session could not be unprovisioned because there are still -//! unfinished requests. -//----------------------------------------------------------------------------- - -virtual bool Unprovision(bool forced=false) = 0; - -//----------------------------------------------------------------------------- -//! Constructor -//! -//! @param sname pointer to session's name. -//! @param sloc pointer to session's location as "hostname:port" (optional). -//----------------------------------------------------------------------------- - - XrdSsiSession(char *sname, char *sloc=0) - : sessName(sname), sessNode(sloc) {} - -protected: - -//----------------------------------------------------------------------------- -//! Notify a session that a request either completed or was canceled. This -//! allows the session to release any resources given to the request object -//! (e.g. data response buffer or a stream). Upon return the object is owned by -//! the object's creator who is responsible for releaasing or recyling the -//! object. This method is automatically invoked by XrdSsiRequest::Finish(). -//! -//! @param rqstP pointer to the object describing the request. -//! @param rInfo reference to the object describing the response. -//! @param cancel False -> the request/response interaction completed. -//! True -> the request/response interaction aborted because -//! of an error or the client requested that the -//! request be canceled. -//----------------------------------------------------------------------------- - -virtual void RequestFinished( XrdSsiRequest *rqstP, - const XrdSsiRespInfo &rInfo, - bool cancel=false) = 0; - -//----------------------------------------------------------------------------- -//! Destructor is protected. You cannot use delete on a session, use -//! Unprovision() to effectively delete the session object. -//----------------------------------------------------------------------------- - -virtual ~XrdSsiSession() {} - -char *sessName; -char *sessNode; -}; -#endif diff --git a/src/XrdSsi/XrdSsiSfsConfig.cc b/src/XrdSsi/XrdSsiSfsConfig.cc index 6534f45915a..d05dee34334 100644 --- a/src/XrdSsi/XrdSsiSfsConfig.cc +++ b/src/XrdSsi/XrdSsiSfsConfig.cc @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -380,8 +381,10 @@ int XrdSsiSfsConfig::ConfigSvc(char **myArgv, int myArgc) // Initialize the provider // - if (!(Provider->Init(&SsiLogger, (XrdSsiCluster *)SsiCms, ConfigFN, - SvcParms, myArgc, myArgv))) + if (!(Provider->Init(&SsiLogger, (XrdSsiCluster *)SsiCms, + std::string(ConfigFN), + std::string(SvcParms ? SvcParms : ""), + myArgc, myArgv))) {Log.Emsg("Config", "Provider initialization failed."); return 1; } @@ -392,8 +395,8 @@ int XrdSsiSfsConfig::ConfigSvc(char **myArgv, int myArgc) // Otherwise we need to get the service object (we get only one) // - if (!(Service = Provider->GetService(eInfo, 0))) - {const char *eText = eInfo.Get(); + if (!(Service = Provider->GetService(eInfo, ""))) + {const char *eText = eInfo.Get().c_str(); Log.Emsg("Config", "Unable to obtain server-side service object;", (eText ? eText : "reason unknown.")); } diff --git a/src/XrdSsi/XrdSsiStream.hh b/src/XrdSsi/XrdSsiStream.hh index b914ad30ec9..0e18ed9fb75 100644 --- a/src/XrdSsi/XrdSsiStream.hh +++ b/src/XrdSsi/XrdSsiStream.hh @@ -31,10 +31,13 @@ #include +#include "XrdSsi/XrdSsiErrInfo.hh" + //----------------------------------------------------------------------------- //! The XrdSsiStream class describes an object capable of providing data for a //! response in real time. A pointer to such an object may be used to set this //! response mode via XrdSsiResponder::SetResponse(). Two kinds of streams exist: +//! //! Active the stream supplies the buffer that contains the response data. //! The buffer is recycled via Buffer::Recycle() once the response data //! is sent. Active streams are supported only server-side. @@ -47,9 +50,6 @@ //! must supply an implementation for the associated stream type. //----------------------------------------------------------------------------- -class XrdSsiErrInfo; -class XrdSsiRequest; - class XrdSsiStream { public: @@ -75,7 +75,7 @@ virtual ~Buffer() {} //----------------------------------------------------------------------------- //! Synchronously obtain data from an active stream (server-side only). //! -//! @param eInfo The object to receive any error description. +//! @param eRef The object to receive any error description. //! @param dlen input: the optimal amount of data wanted (this is a hint) //! output: the actual amount of data returned in the buffer. //! @param last input: should be set to false. @@ -84,36 +84,38 @@ virtual ~Buffer() {} //! //! @return =0 No more data remains or an error occurred: //! last = true: No more data remains. -//! last = false: A fatal error occurred, eInfo has the reason. +//! last = false: A fatal error occurred, eRef has the reason. //! @return !0 Pointer to the Buffer object that contains a pointer to the //! the data (see below). The buffer must be returned to the //! stream using Buffer::Recycle(). The next member is usable. //----------------------------------------------------------------------------- -virtual Buffer *GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) - {eInfo.Set("Not an active stream", EOPNOTSUPP); return 0;} +virtual Buffer *GetBuff(XrdSsiErrInfo &eRef, int &dlen, bool &last) + {eRef.Set("Not an active stream", EOPNOTSUPP); return 0;} //----------------------------------------------------------------------------- //! Asynchronously obtain data from a passive stream (client-side only). //! -//! @param rqstP The request object whose ProcessResponseData() method is to be -//! called when data or an async error is ready for proccesing. +//! @param eRef reference to where error information is to be placed for +//! encountered before during the stream initiation. When data is +//! ready for processing, the ProcessResponseData() callback is +//! called on the request associated with this stream. //! Also see XrdSsiRequest::GetResponseData() helper method. //! @param buff pointer to the buffer to receive the data. The buffer must -//! remain valid until rqstP->ProcessResponse() is called. +//! remain valid until ProcessResponse() is called. //! @param blen the length of the buffer (i.e. maximum that can be returned). //! //! @return true The stream has been successfully scheduled to return the data. -//! @return false The stream could not be scheduled; rqstP->eInfo holds the -//! the reson for the failure. +//! @return false The stream could not be scheduled; eRef holds the reason. //----------------------------------------------------------------------------- -virtual bool SetBuff(XrdSsiRequest *rqstP, char *buff, int blen); +virtual bool SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen) + {eRef.Set("Not a passive stream", EOPNOTSUPP); return false;} //----------------------------------------------------------------------------- //! Synchronously obtain data from a passive stream (client- or server-side). //! -//! @param eInfo The object to receive any error description. +//! @param eRef The object to receive any error description. //! @param buff pointer to the buffer to receive the data. //! request object is notified that the operation completed. //! @param blen the length of the buffer (i.e. maximum that can be returned). @@ -123,11 +125,11 @@ virtual bool SetBuff(XrdSsiRequest *rqstP, char *buff, int blen); //! //! @return >0 The number of bytes placed in buff. //! @return =0 No more data remains and the stream becomes invalid. -//! @return <0 Fatal error occured; eInfo holds the reason. +//! @return <0 Fatal error occured; eRef holds the reason. //----------------------------------------------------------------------------- -virtual int SetBuff(XrdSsiErrInfo &eInfo, char *buff, int blen, bool &last) - {eInfo.Set("Not a passive stream", EOPNOTSUPP); return 0;} +virtual int SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen, bool &last) + {eRef.Set("Not a passive stream", EOPNOTSUPP); return 0;} //----------------------------------------------------------------------------- //! Stream type descriptor: @@ -157,20 +159,4 @@ protected: const StreamType SType; }; - -//----------------------------------------------------------------------------- -//! The following are default implementation for methods within this class so -//! that unneeded methods needs not be supplied. This implementation must be -//! available for all compilation units and is relies on XrdSsiRequest. Be -//! aware that server-side passive streams you must supply an synchronous -//! implementation. Normally, you will get better performance server-side using -//! an active stream as data will not be copied before it is sent to the client. -//----------------------------------------------------------------------------- - -#include "XrdSsi/XrdSsiRequest.hh" - -inline bool XrdSsiStream::SetBuff(XrdSsiRequest *rqstP, char *buff, int blen) - {rqstP->eInfo.Set("Not a passive stream", EOPNOTSUPP); - return false; - } #endif diff --git a/src/XrdSsi/XrdSsiTaskReal.cc b/src/XrdSsi/XrdSsiTaskReal.cc index cd4035f63df..2b1019991ba 100644 --- a/src/XrdSsi/XrdSsiTaskReal.cc +++ b/src/XrdSsi/XrdSsiTaskReal.cc @@ -27,17 +27,22 @@ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ +#include + +#include "XrdSsi/XrdSsiAtomics.hh" #include "XrdSsi/XrdSsiRRInfo.hh" #include "XrdSsi/XrdSsiRequest.hh" +#include "XrdSsi/XrdSsiScale.hh" #include "XrdSsi/XrdSsiSessReal.hh" #include "XrdSsi/XrdSsiTaskReal.hh" #include "XrdSsi/XrdSsiTrace.hh" +#include "XrdSsi/XrdSsiUtils.hh" #include "XrdSys/XrdSysHeaders.hh" using namespace XrdSsi; /******************************************************************************/ -/* G l o b a l D a t a */ +/* L o c a l S t a t i c s */ /******************************************************************************/ namespace @@ -45,12 +50,88 @@ namespace const char *statName[] = {"isWrite", "isSync", "isReady", "isDone", "isDead"}; -XrdSsiSessReal voidSession(0, "voidSession"); +XrdSsiSessReal voidSession(0, "voidSession", 0); + +std::string pName("ReadRecovery"); +std::string pValue("false"); -char zedData = 0; const char *tident = 0; +char zedData = 0; } + +/******************************************************************************/ +/* G l o b a l s */ +/******************************************************************************/ + +namespace XrdSsi +{ +extern XrdSsiScale sidScale; +} + +/******************************************************************************/ +/* L o c a l C l a s s e s */ +/******************************************************************************/ +namespace +{ +class AlertMsg : public XrdSsiRespInfoMsg +{ +public: + +void Recycle(bool sent=true) {delete respObj; delete this;} + + AlertMsg(XrdCl::AnyObject *resp, char *dbuff, int dlen) + : XrdSsiRespInfoMsg(dbuff, dlen), respObj(resp) {} + + ~AlertMsg() {} + +private: +XrdCl::AnyObject *respObj; +}; +} + +/******************************************************************************/ +/* Private: A s k 4 R e s p */ +/******************************************************************************/ + +// Called with session mutex locked and returns with it unlocked! + +bool XrdSsiTaskReal::Ask4Resp() +{ + EPNAME("Ask4Resp"); + + XrdCl::XRootDStatus epStatus; + XrdSsiRRInfo rInfo; + XrdCl::Buffer qBuff(sizeof(unsigned long long)); + +// Disable read recovery +// + sessP->epFile.SetProperty(pName, pValue); + +// Compose request to wait for the response +// + rInfo.Id(tskID); rInfo.Cmd(XrdSsiRRInfo::Rwt); + memcpy(qBuff.GetBuffer(), rInfo.Data(), sizeof(long long)); + +// Do some debugging +// + DEBUG("Calling fcntl id=" <epFile.Fcntl(qBuff, (ResponseHandler *)this, tmOut); + +// Dianose any errors. If any occurred we simply return an error response but +// otherwise let this go as it really is not a logic error. +// + if (!epStatus.IsOK()) return RespErr(&epStatus); + mhPend = true; + defer = false; + tStat = isSync; + sessP->UnLock(); + return true; +} + /******************************************************************************/ /* D e t a c h */ /******************************************************************************/ @@ -60,6 +141,30 @@ void XrdSsiTaskReal::Detach(bool force) if (force) sessP = &voidSession; } +/******************************************************************************/ +/* F i n i s h e d */ +/******************************************************************************/ + +// Note that if we are called then Finished() must have been called while we +// were still in the open phase. + +void XrdSsiTaskReal::Finished(XrdSsiRequest &rqstR, + const XrdSsiRespInfo &rInfo, bool cancel) +{ + EPNAME("TaskReqFin"); + XrdSsiMutexMon rHelp(sessP->MutexP()); + +// Do some debugging +// + DEBUG("Request="<<&rqstR<<" cancel="<UnLock(); DEBUG("Redriving ProcessResponseData; len="<UnLock(); // Reflect an error to the request object. // - eText = eInfo.Get(eNum); - SetErrResponse(eText, eNum); + SetErrResponse(eTxt.c_str(), eNum); + return false; } /******************************************************************************/ /* S e t B u f f */ /******************************************************************************/ -int XrdSsiTaskReal::SetBuff(XrdSsiErrInfo &eInfo, +int XrdSsiTaskReal::SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen, bool &last) { EPNAME("TaskSetBuff"); - XrdSysMutexHelper rHelp(sessP->MutexP()); + XrdSsiMutexMon rHelp(sessP->MutexP()); XrdCl::XRootDStatus epStatus; XrdSsiRRInfo rrInfo; union {uint32_t ubRead; int ibRead;}; @@ -241,7 +351,7 @@ int XrdSsiTaskReal::SetBuff(XrdSsiErrInfo &eInfo, DEBUG("Sync Status=" <MutexP()); + XrdSsiMutexMon rHelp(sessP->MutexP()); XrdCl::XRootDStatus epStatus; XrdSsiRRInfo rrInfo; @@ -278,21 +388,21 @@ bool XrdSsiTaskReal::SetBuff(XrdSsiRequest *reqP, char *buff, int blen) // DEBUG("Async Status=" <eInfo.Set("Stream is already active", EINPROGRESS); + {eRef.Set("Stream is already active", EINPROGRESS); return false; } // Make sure the buffer length is valid // if (blen <= 0) - {reqP->eInfo.Set("Buffer length invalid", EINVAL); + {eRef.Set("Buffer length invalid", EINVAL); return false; } @@ -302,7 +412,7 @@ bool XrdSsiTaskReal::SetBuff(XrdSsiRequest *reqP, char *buff, int blen) // Issue a read // - dataBuff = buff; dataRlen = blen; rqstP = reqP; + dataBuff = buff; dataRlen = blen; epStatus = sessP->epFile.Read(rrInfo.Info(), (uint32_t)blen, buff, (XrdCl::ResponseHandler *)this, tmOut); @@ -312,12 +422,40 @@ bool XrdSsiTaskReal::SetBuff(XrdSsiRequest *reqP, char *buff, int blen) // We failed, return an error // - XrdSsiSessReal::SetErr(epStatus, reqP->eInfo); + XrdSsiUtils::SetErr(epStatus, eRef); tStat = isDone; DEBUG("Task Async SetBuff error id=" <Lock(); + +// Check if finished has been called while we were defered +// + if (tStat == isDead) + {DEBUG("Task Handler calling TaskFinished."); + sessP->UnLock(); + sessP->TaskFinished(this); + return false; + } + +// We can continue, no deferals are needed at this point +// + defer = false; + sessP->UnLock(); + return true; +} + /******************************************************************************/ /* X e q E v e n t */ /******************************************************************************/ @@ -326,48 +464,52 @@ bool XrdSsiTaskReal::XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP) { EPNAME("TaskXeqEvent"); - static std::string pName("ReadRecovery"); - static std::string pValue("false"); - XrdCl::XRootDStatus epStatus; XrdCl::AnyObject *response = *respP; - XrdSsiRRInfo rInfo; + XrdSsiRespInfoMsg *aMsg; char *dBuff; - XrdCl::Buffer qBuff(sizeof(unsigned long long)); union {uint32_t ubRead; int ibRead;}; int dLen; XrdSsiRequest::PRD_Xeq prdVal; bool last, aOK = status->IsOK(); -// Affect proper response +// Obtain a lock and indicate the any Finish() calls should be defered until +// we return from this method. The reason is that any callback that we do here +// may precipitate a Finish() call not to mention some other thread doing so. // sessP->Lock(); + defer = true; + mhPend = false; + +// Do some debugging +// DEBUG(" sess="<<(sessP==&voidSession?"no":"ok") <<" id=" <UnLock(); + SetErrResponse("Missing response", EFAULT); } - return true; break; + return XeqEnd(true); + case isReady: break; + case isDead: if (sessP != &voidSession) - {DEBUG("Task Handler calling Recycle."); - sessP->Recycle(this); + {DEBUG("Task Handler calling TaskFinished."); sessP->UnLock(); + sessP->TaskFinished(this); } else { DEBUG("Deleting task."); sessP->UnLock(); delete this; } - return false; break; + return false; + default: cerr <<"XrdSsiTaskReal: Invalid state " <eInfo); - else rqstP->eInfo.Set("Missing response", EFAULT); + if (!aOK) XrdSsiUtils::SetErr(*status, rqstP->errInfo); + else rqstP->errInfo.Set("Missing response", EFAULT); } else { XrdCl::ChunkInfo *cInfo = 0; response->Get(cInfo); @@ -412,18 +560,21 @@ bool XrdSsiTaskReal::XeqEvent(XrdCl::XRootDStatus *status, // if (ibRead < dataRlen) {tStat = isDone; dataRlen = ibRead;} dBuff = dataBuff; - mhPend = false; last = tStat == isDone; sessP->UnLock(); DEBUG("Calling ProcessResponseData; len="<reqID;} +char *RequestID() {return rqstP->GetRequestID();} -int SetBuff(XrdSsiErrInfo &eInfo, char *buff, int blen, bool &last); +int SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen, bool &last); -bool SetBuff(XrdSsiRequest *reqP, char *buff, int blen); +bool SetBuff(XrdSsiErrInfo &eRef, char *buff, int blen); void SetTaskID(short tid) {tskID = tid;} @@ -74,24 +79,27 @@ bool XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP); XrdSsiTaskReal(XrdSsiSessReal *sP, short tid) : XrdSsiEvent("TaskReal"), - XrdSsiResponder(this, (void *)0), XrdSsiStream(XrdSsiStream::isPassive), - sessP(sP), mdResp(0), tskID(tid) + sessP(sP), mdResp(0), tskID(tid), + mhPend(false), defer(false) {} ~XrdSsiTaskReal() {if (mdResp) delete mdResp;} -void RespErr(XrdCl::XRootDStatus *status); - struct dlQ {XrdSsiTaskReal *next; XrdSsiTaskReal *prev;}; dlQ attList; -enum respType {isBad=0, isData, isStream}; +enum respType {isBad=0, isAlert, isData, isStream}; private: +bool Ask4Resp(); respType GetResp(XrdCl::AnyObject **respP, char *&dbuf, int &dlen); +bool RespErr(XrdCl::XRootDStatus *status); +bool XeqEnd(bool getLock); +XrdSysRecMutex rrMutex; +XrdSysMutex taskMutex; XrdSsiSessReal *sessP; XrdSsiRequest *rqstP; XrdCl::AnyObject *mdResp; @@ -101,5 +109,6 @@ TaskStat tStat; unsigned short tmOut; short tskID; bool mhPend; +bool defer; }; #endif diff --git a/src/XrdSsi/XrdSsiUtils.cc b/src/XrdSsi/XrdSsiUtils.cc index 7bf26d629a2..808e34c6b4e 100644 --- a/src/XrdSsi/XrdSsiUtils.cc +++ b/src/XrdSsi/XrdSsiUtils.cc @@ -31,10 +31,23 @@ #include #include +#include "XProtocol/XProtocol.hh" + +#include "Xrd/XrdScheduler.hh" + +#include "XrdCl/XrdClXRootDResponses.hh" + #include "XrdOuc/XrdOucERoute.hh" #include "XrdOuc/XrdOucErrInfo.hh" + #include "XrdSfs/XrdSfsInterface.hh" + +#include "XrdSsi/XrdSsiAtomics.hh" +#include "XrdSsi/XrdSsiErrInfo.hh" +#include "XrdSsi/XrdSsiRequest.hh" +#include "XrdSsi/XrdSsiResponder.hh" #include "XrdSsi/XrdSsiUtils.hh" + #include "XrdSys/XrdSysError.hh" /******************************************************************************/ @@ -43,11 +56,57 @@ namespace XrdSsi { -extern XrdSysError Log; +extern XrdSysError Log; +extern XrdScheduler *schedP; }; using namespace XrdSsi; +/******************************************************************************/ +/* L o c a l C l a s s e s */ +/******************************************************************************/ + +class PostError : public XrdJob, public XrdSsiResponder +{ +public: + +void DoIt() {myMutex.Lock(); + if ( isActive) SetErrResponse(eTxt, eNum); + if (!isActive) delete this; + else {isActive = false; + myMutex.UnLock(); + } + } + +virtual void Finished( XrdSsiRequest &rqstR, + const XrdSsiRespInfo &rInfo, + bool cancel=false) + {myMutex.Lock(); + if (!isActive) delete this; + else {isActive = false; + myMutex.UnLock(); + } + } + + PostError(XrdSsiRequest *rP, char *emsg, int ec) + : myMutex(XrdSsiMutex::Recursive), + reqP(rP), eTxt(emsg), eNum(ec), isActive(true) + {rP->SetMutex(&myMutex); + BindRequest(*reqP); + } + +virtual ~PostError() {myMutex.UnLock(); + if (eTxt) free(eTxt); + } + +private: +XrdSsiMutex myMutex; // Allow possible rentry via SetErrResponse() +XrdSsiRequest *reqP; +char *eTxt; +int eNum; +bool isActive; +}; + /******************************************************************************/ /* E m s g */ /******************************************************************************/ @@ -78,3 +137,73 @@ int XrdSsiUtils::Emsg(const char *pfx, // Message prefix value eDest.setErrInfo(ecode, buffer); return SFS_ERROR; } + + +/******************************************************************************/ +/* G e t E r r */ +/******************************************************************************/ + +int XrdSsiUtils::GetErr(XrdCl::XRootDStatus &Status, std::string &eText) +{ + +// If this is an xrootd error then get the xrootd generated error +// + if (Status.code == XrdCl::errErrorResponse) + {eText = Status.GetErrorMessage(); + return MapErr(Status.errNo); + } + +// Internal error, we will need to copy strings here +// + eText = Status.ToStr(); + return (Status.errNo ? Status.errNo : EFAULT); +} + +/******************************************************************************/ +/* M a p E r r */ +/******************************************************************************/ + +int XrdSsiUtils::MapErr(int xEnum) +{ + switch(xEnum) + {case kXR_NotFound: return ENOENT; + case kXR_NotAuthorized: return EACCES; + case kXR_IOError: return EIO; + case kXR_NoMemory: return ENOMEM; + case kXR_NoSpace: return ENOSPC; + case kXR_ArgTooLong: return ENAMETOOLONG; + case kXR_noserver: return EHOSTUNREACH; + case kXR_NotFile: return ENOTBLK; + case kXR_isDirectory: return EISDIR; + case kXR_FSError: return ENOSYS; + default: return ECANCELED; + } +} + +/******************************************************************************/ +/* R e t E r r */ +/******************************************************************************/ + +void XrdSsiUtils::RetErr(XrdSsiRequest &reqP, const char *eTxt, int eNum) +{ + +// Schedule an error callback +// + XrdSsi::schedP->Schedule(new PostError(&reqP, strdup(eTxt), eNum)); +} + +/******************************************************************************/ +/* S e t E r r */ +/******************************************************************************/ + +void XrdSsiUtils::SetErr(XrdCl::XRootDStatus &Status, XrdSsiErrInfo &eInfo) +{ + +// If this is an xrootd error then get the xrootd generated error +// + if (Status.code == XrdCl::errErrorResponse) + {eInfo.Set(Status.GetErrorMessage().c_str(), MapErr(Status.errNo)); + } else { + eInfo.Set(Status.ToStr().c_str(), (Status.errNo ? Status.errNo:EFAULT)); + } +} diff --git a/src/XrdSsi/XrdSsiUtils.hh b/src/XrdSsi/XrdSsiUtils.hh index d55e695bfb2..513a04b75aa 100644 --- a/src/XrdSsi/XrdSsiUtils.hh +++ b/src/XrdSsi/XrdSsiUtils.hh @@ -29,7 +29,13 @@ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ +#include + +namespace XrdCl {class XRootDStatus;} + class XrdOucErrInfo; +class XrdSsiErrInfo; +class XrdSsiRequest; class XrdSsiUtils { @@ -41,6 +47,14 @@ static int Emsg(const char *pfx, // Message prefix value const char *path, // Operation target XrdOucErrInfo &eDest); // Plase to put error +static int GetErr(XrdCl::XRootDStatus &Status, std::string &eText); + +static int MapErr(int xEnum); + +static void RetErr(XrdSsiRequest &reqP, const char *eTxt, int eNum); + +static void SetErr(XrdCl::XRootDStatus &Status, XrdSsiErrInfo &eInfo); + XrdSsiUtils() {} ~XrdSsiUtils() {} };