Skip to content

Commit

Permalink
[XrdSsi] Implement reusable resources.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed Feb 23, 2017
1 parent f371071 commit ee94694
Show file tree
Hide file tree
Showing 10 changed files with 471 additions and 194 deletions.
3 changes: 3 additions & 0 deletions src/XrdSsi/XrdSsiReqAgent.hh
Expand Up @@ -41,6 +41,9 @@ static XrdSsiErrInfo &ErrInfoRef(XrdSsiRequest *rP) {return rP->errInfo;}

static XrdSsiRespInfo *RespP(XrdSsiRequest *rP) {return &(rP->Resp);}

static void SetNode(XrdSsiRequest *rP, const char *name)
{rP->epNode = name;}

static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
{rP->rrMutex = mP;}
};
Expand Down
21 changes: 17 additions & 4 deletions src/XrdSsi/XrdSsiRequest.hh
Expand Up @@ -61,6 +61,8 @@
//! ProcessResponse() Initial response: Mandatary
//! ProcessResponseData() Data response: Mandatory only if response data is
//! asynchronously received.
//!
//! All callbacks are invoked with no locks outstanding unless otherwise noted.
//-----------------------------------------------------------------------------

class XrdSsiPacer;
Expand Down Expand Up @@ -115,6 +117,19 @@ virtual void Alert(XrdSsiRespInfoMsg &aMsg) {aMsg.RecycleMsg(false);}

inline uint32_t GetDetachTTL() {return detTTL;}

//-----------------------------------------------------------------------------
//! Obtain the enpoint host name.
//!
//! @return =0 Host name not available. Typically, the host name will be
//! available on the first callback to this object.
//! @return !0 Pointer to the host name. The name is valid until Finished().
//-----------------------------------------------------------------------------

inline
const char *GetEndPoint() {XrdSsiMutexMon(rrMutex);
return epNode;
}

//-----------------------------------------------------------------------------
//! Obtain the metadata associated with a response.
//!
Expand All @@ -138,7 +153,6 @@ const char *GetMetadata(int &dlen)
//! Obtain the request data sent by a client.
//!
//! This method is duplicated in XrdSsiResponder to allow calling consistency.
//! This method may be called with the object's recursive mutex unlocked!
//!
//! @param dlen holds the length of the request after the call.
//!
Expand Down Expand Up @@ -201,8 +215,6 @@ virtual bool ProcessResponse(const XrdSsiErrInfo &eInfo,
//! stream object after a successful GetResponseData() or an asynchronous
//! stream SetBuff() call.
//!
//! Note: This method is called with the object's recursive mutex locked.
//!
//! @param eInfo Error information. You can check if an error occured using
//! eInfo.hasError() or eInfo.isOK().
//! @param buff Pointer to the buffer given to XrdSsiStream::SetBuff().
Expand Down Expand Up @@ -295,7 +307,7 @@ static RDR_Info RestartDataResponse(RDR_How rhow, const char *reqid=0);

XrdSsiRequest(const char *reqid=0, uint16_t tmo=0)
: reqID(reqid), rrMutex(0),
nextRequest(0), theRespond(0), thePacer(0),
theRespond(0), thePacer(0), epNode(0),
detTTL(0), tOut(0) {}

protected:
Expand Down Expand Up @@ -368,6 +380,7 @@ XrdSsiResponder *theRespond; // Set via XrdSsiResponder::BindRequest()
XrdSsiRespInfo Resp; // Set via XrdSsiResponder::SetResponse()
XrdSsiErrInfo errInfo;
XrdSsiPacer *thePacer;
const char *epNode;
uint32_t detTTL;
uint16_t tOut;
};
Expand Down
4 changes: 2 additions & 2 deletions src/XrdSsi/XrdSsiResource.hh
Expand Up @@ -59,9 +59,9 @@ Affinity affinity;//!< Resource affinity

uint32_t rOpts; //!< Resource options. One or more of he following:
static
const uint32_t Reusable;//!> Resource context may be cached and reused
const uint32_t Reusable= 1;//!> Resource context may be cached and reused
static
const uint32_t Discard; //!> Discard cached resource if it exists
const uint32_t Discard = 2;//!> Discard cached resource if it exists

//-----------------------------------------------------------------------------
//! Constructor
Expand Down
18 changes: 17 additions & 1 deletion src/XrdSsi/XrdSsiScale.hh
Expand Up @@ -63,7 +63,23 @@ int getEnt() {entMutex.Lock();
return -1;
}

void retEnt(int xEnt) {entMutex.Lock(); pendCnt[xEnt]--; entMutex.UnLock();}
void retEnt(int xEnt) {if (xEnt >= 0 && xEnt < maxEnt)
{entMutex.Lock();
if (pendCnt[xEnt]) pendCnt[xEnt]--;
entMutex.UnLock();
}
}

bool rsvEnt(int xEnt) {if (xEnt < 0 && xEnt >= maxEnt) return false;
entMutex.Lock();
if (pendCnt[nowEnt] < maxPend)
{pendCnt[nowEnt]++;
entMutex.UnLock();
return true;
}
entMutex.UnLock();
return false;
}

XrdSsiScale() : nowEnt(0) {memset(pendCnt, 0, sizeof(uint16_t)*maxEnt);}
~XrdSsiScale() {}
Expand Down
84 changes: 73 additions & 11 deletions src/XrdSsi/XrdSsiServReal.cc
Expand Up @@ -166,8 +166,13 @@ bool XrdSsiServReal::GenURL(XrdSsiResource *rP, char *buff, int blen, int uEnt)
void XrdSsiServReal::ProcessRequest(XrdSsiRequest &reqRef,
XrdSsiResource &resRef)
{
XrdSsiSessReal *sObj;
static const uint32_t useCache = XrdSsiResource::Reusable
| XrdSsiResource::Discard;
XrdSysMutexHelper mHelp;
XrdSsiSessReal *sObj;
std::string resKey;
int uEnt;
bool hold = (resRef.rOpts & XrdSsiResource::Reusable) != 0;
char epURL[4096];

// Validate the resource name
Expand All @@ -177,6 +182,14 @@ void XrdSsiServReal::ProcessRequest(XrdSsiRequest &reqRef,
return;
}

// Check if this is a reusable resource. Reusable resources are a bit more
// complicated to pull off. In any case, we need to hold the cache lock.
//
if (resRef.rOpts & useCache)
{mHelp.Lock(&rcMutex);
if (ResReuse(reqRef, resRef, resKey)) return;
}

// Get a sid entry number
//
if ((uEnt = sidScale.getEnt()) < 0)
Expand All @@ -192,43 +205,92 @@ void XrdSsiServReal::ProcessRequest(XrdSsiRequest &reqRef,
return;
}

// Obtain a new session object (note the first request uses the session mutex)
// Obtain a new session object
//
if (!(sObj = Alloc(resRef.rName.c_str(), uEnt)))
if (!(sObj = Alloc(resRef.rName.c_str(), uEnt, hold)))
{XrdSsiUtils::RetErr(reqRef, "Insufficient memory.", ENOMEM);
sidScale.retEnt(uEnt);
return;
} else XrdSsiReqAgent::SetMutex(&reqRef, sObj->MutexP());
}

// Now just provision this resource which will execute the request should it
// be successful.
// be successful. If Provision() fails, we need to delete the session object
// because its file object now is in an usable state (funky client interface).
//
if (!(sObj->Provision(&reqRef, epURL))) Recycle(sObj);
if (!(sObj->Provision(&reqRef, epURL))) Recycle(sObj, false);

// If this was started with a reusable resource, put the session in the cache.
// The resource key was constructed by the call to ResReuse() and teh cache
// mutex is still held at this point (will be released upon return).
//
if (hold) resCache[resKey] = sObj;
}

/******************************************************************************/
/* R e c y c l e */
/******************************************************************************/

void XrdSsiServReal::Recycle(XrdSsiSessReal *sObj)
void XrdSsiServReal::Recycle(XrdSsiSessReal *sObj, bool reuse)
{
EPNAME("Recycle");
static const char *tident = "ServReal";

// Clear all pending events (likely not needed)
//
sObj->ClrEvent();

// Add to queue unless we have too many of these
//
myMutex.Lock();
actvSes--;
DEBUG("Sessions: free=" <<freeCnt <<" active=" <<actvSes);
if (freeCnt >= freeMax) {myMutex.UnLock(); delete sObj;}
else {sObj->ClrEvent();
sObj->nextSess = freeSes;
DEBUG("reuse=" <<reuse <<"; sessions: free=" <<freeCnt <<" active=" <<actvSes);
if (!reuse || freeCnt >= freeMax) {myMutex.UnLock(); delete sObj;}
else {sObj->nextSess = freeSes;
freeSes = sObj;
freeCnt++;
myMutex.UnLock();
}
}

/******************************************************************************/
/* Private: R e s R e u s e */
/******************************************************************************/

// Called with rcMutex held!

bool XrdSsiServReal::ResReuse(XrdSsiRequest &reqRef,
XrdSsiResource &resRef,
std::string &resKey)
{
std::map<std::string, XrdSsiSessReal *>::iterator it;
XrdSsiSessReal *sesP;

// Construct lookup key
//
resKey.reserve(resRef.rUser.size() + resRef.rName.size() + 2);
resKey = resRef.rUser;
resKey += "@";
resKey += resRef.rName;

// Find the cache entry
//
it = resCache.find(resKey);
if (it == resCache.end()) return false;

// Entry found, check if this session can actually be reused
//
sesP = it->second;
if (resRef.rOpts & XrdSsiResource::Discard || !sesP->Run(&reqRef))
{resCache.erase(it);
sesP->UnHold();
return false;
}

// All done, the request should have been sent off via Reusable() call.
//
return true;
}

/******************************************************************************/
/* S t o p */
/******************************************************************************/
Expand Down
12 changes: 10 additions & 2 deletions src/XrdSsi/XrdSsiServReal.hh
Expand Up @@ -29,6 +29,9 @@
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/

#include <map>
#include <string>

#include "XrdSsi/XrdSsiService.hh"
#include "XrdSys/XrdSysPthread.hh"

Expand All @@ -41,7 +44,7 @@ public:

void ProcessRequest(XrdSsiRequest &reqRef, XrdSsiResource &resRef);

void Recycle(XrdSsiSessReal *sObj);
void Recycle(XrdSsiSessReal *sObj, bool reuse);

bool Stop();

Expand All @@ -52,8 +55,13 @@ bool Stop();
~XrdSsiServReal();
private:

XrdSsiSessReal *Alloc(const char *sName, int uent, bool hold=false);
XrdSsiSessReal *Alloc(const char *sName, int uent, bool hold);
bool GenURL(XrdSsiResource *rP, char *buff, int blen, int uEnt);
bool ResReuse(XrdSsiRequest &reqRef, XrdSsiResource &resRef,
std::string &resKey);

std::map<std::string, XrdSsiSessReal *> resCache;
XrdSysMutex rcMutex;

char *manNode;
XrdSysMutex myMutex;
Expand Down

0 comments on commit ee94694

Please sign in to comment.