Skip to content

Commit

Permalink
si] Additional bug fixes and improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed Feb 12, 2017
1 parent 38c643a commit 1b3ab4e
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 77 deletions.
1 change: 1 addition & 0 deletions src/XrdSsi.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ XrdSsi/XrdSsiPacer.cc XrdSsi/XrdSsiPacer.hh
XrdSsi/XrdSsiProvider.hh
XrdSsi/XrdSsiRRInfo.hh
XrdSsi/XrdSsiRRTable.hh
XrdSsi/XrdSsiReqAgent.hh
XrdSsi/XrdSsiRequest.cc XrdSsi/XrdSsiRequest.hh
XrdSsi/XrdSsiResponder.hh
XrdSsi/XrdSsiResource.hh
Expand Down
54 changes: 35 additions & 19 deletions src/XrdSsi/XrdSsiFileReq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "XrdSsi/XrdSsiFileReq.hh"
#include "XrdSsi/XrdSsiFileResource.hh"
#include "XrdSsi/XrdSsiFileSess.hh"
#include "XrdSsi/XrdSsiReqAgent.hh"
#include "XrdSsi/XrdSsiService.hh"
#include "XrdSsi/XrdSsiSfs.hh"
#include "XrdSsi/XrdSsiStream.hh"
Expand Down Expand Up @@ -122,7 +123,7 @@ void XrdSsiFileReq::Activate(XrdOucBuffer *oP, XrdSfsXioHandle *bR, int rSz)
void XrdSsiFileReq::Alert(XrdSsiRespInfoMsg &aMsg)
{
EPNAME("Alert");
const XrdSsiRespInfo *rP = RespP();
const XrdSsiRespInfo *rP = XrdSsiReqAgent::RespP(this);
XrdSsiAlert *aP;
int msgLen;

Expand Down Expand Up @@ -204,7 +205,7 @@ XrdSsiFileReq *XrdSsiFileReq::Alloc(XrdOucErrInfo *eiP,
}

/******************************************************************************/
/* B i n d D o n e */
/* Private: B i n d D o n e */
/******************************************************************************/

// This is called with frqMutex locked!
Expand Down Expand Up @@ -247,22 +248,22 @@ void XrdSsiFileReq::BindDone()
void XrdSsiFileReq::DoIt()
{
EPNAME("DoIt");
XrdSsiMutexMon mHelper(frqMutex);
bool cancel;

// Processing is determined by the responder's state. Only listed states are
// valid. Others should never occur in this context.
//
frqMutex.Lock();
switch(urState)
{case isNew: myState = xqReq; urState = isBegun;
DEBUGXQ("Calling service processor");
mHelper.UnLock();
frqMutex.UnLock();
Service->ProcessRequest((XrdSsiRequest &)*this,
(XrdSsiFileResource &)*fileR);
return;
break;
case isAbort: DEBUGXQ("Skipped calling service processor");
mHelper.UnLock();
frqMutex.UnLock();
Recycle();
return;
break;
Expand All @@ -271,9 +272,7 @@ void XrdSsiFileReq::DoIt()
Finished(cancel);
if (respWait) WakeUp();
if (finWait) finWait->Post();
if (cancel && !(RespP()->rType)) isPerm = true;
mHelper.UnLock();
Recycle();
frqMutex.UnLock();
return;
break;
default: break;
Expand All @@ -282,6 +281,7 @@ void XrdSsiFileReq::DoIt()
// If we get here then we have an invalid state. Report it but otherwise we
// can't really do anything else. This means some memory may be lost.
//
frqMutex.UnLock();
Log.Emsg(epname, tident, "Invalid req/rsp state; giving up on object!");
}

Expand Down Expand Up @@ -312,14 +312,16 @@ void XrdSsiFileReq::Done(int &retc, XrdOucErrInfo *eiP, const char *name)

// Do some debugging
//
DEBUGXQ("wtrsp sent; resp "<<(RespP()->rType ? "here" : "pend"));
DEBUGXQ("wtrsp sent; resp "
<<(XrdSsiReqAgent::RespP(this)->rType ? "here" : "pend"));

// We are invoked when sync() waitresp has been sent, check if a response was
// posted while this was going on. If so, make sure to send a wakeup. Note
// that the respWait flag is at this moment false as this is called in the
// sync response path for fctl() and the response may have been posted.
//
if (RespP()->rType == XrdSsiRespInfo::isNone) respWait = true;
if (XrdSsiReqAgent::RespP(this)->rType == XrdSsiRespInfo::isNone)
respWait = true;
else WakeUp();
}

Expand Down Expand Up @@ -432,9 +434,7 @@ void XrdSsiFileReq::Finalize()
DEBUGXQ("Calling Finished(" <<cancel <<')');
Finished(cancel);
if (respWait) WakeUp();
if (cancel && !(RespP()->rType)) isPerm = true;
mHelper.UnLock();
Recycle();
return;
break;

Expand Down Expand Up @@ -500,9 +500,8 @@ void XrdSsiFileReq::Init(const char *cID)
schedDone = false;
respWait = false;
strmEOF = false;
isPerm = false;
isEnding = false;
SetMutex(&frqMutex);
XrdSsiReqAgent::SetMutex(this, &frqMutex);
}

/******************************************************************************/
Expand Down Expand Up @@ -577,7 +576,7 @@ XrdSfsXferSize XrdSsiFileReq::Read(bool &done, // Out
{
static const char *epname = "read";
XrdSfsXferSize nbytes;
XrdSsiRespInfo const *Resp = RespP();
XrdSsiRespInfo const *Resp = XrdSsiReqAgent::RespP(this);

// A read should never be issued unless a response has been set
//
Expand Down Expand Up @@ -726,7 +725,7 @@ void XrdSsiFileReq::Recycle()
//
aqMutex.Lock();
if (tident) {free(tident); tident = 0;}
if (freeCnt >= freeMax && !isPerm) {aqMutex.UnLock(); delete this;}
if (freeCnt >= freeMax) {aqMutex.UnLock(); delete this;}
else {nextReq = freeReq;
freeReq = this;
freeCnt++;
Expand Down Expand Up @@ -761,7 +760,7 @@ void XrdSsiFileReq::RelRequestBuffer()
int XrdSsiFileReq::Send(XrdSfsDio *sfDio, XrdSfsXferSize blen)
{
static const char *epname = "send";
XrdSsiRespInfo const *Resp = RespP();
XrdSsiRespInfo const *Resp = XrdSsiReqAgent::RespP(this);
XrdOucSFVec sfVec[2];
int rc;

Expand Down Expand Up @@ -876,6 +875,23 @@ int XrdSsiFileReq::sendStrmA(XrdSsiStream *strmP,
return Emsg(epname, rc, "send");
}

/******************************************************************************/
/* U n b i n d */
/******************************************************************************/

void XrdSsiFileReq::Unbind(XrdSsiResponder *respP) // Caled with frqMutex unlocked!
{
EPNAME("Unbind");

// Do some debugging
//
DEBUGXQ("Recycling request...");

// Simply recycle the object
//
Recycle();
}

/******************************************************************************/
/* W a n t R e s p o n s e */
/******************************************************************************/
Expand All @@ -894,7 +910,7 @@ bool XrdSsiFileReq::WantResponse(XrdOucErrInfo &eInfo)
// Serialize the remainder of this code
//
frqMon.Lock(frqMutex);
rspP = RespP();
rspP = XrdSsiReqAgent::RespP(this);

// If we have a pending alert then we need to send it now. Suppress the callback
// as we will recycle the alert on the next call (there should be one).
Expand Down Expand Up @@ -939,7 +955,7 @@ void XrdSsiFileReq::WakeUp(XrdSsiAlert *aP) // Called with frqMutex locked!
EPNAME("WakeUp");
XrdOucErrInfo *wuInfo =
new XrdOucErrInfo(tident,(XrdOucEICB *)0,respCBarg);
const XrdSsiRespInfo *rspP = RespP();
const XrdSsiRespInfo *rspP = XrdSsiReqAgent::RespP(this);
int respCode = SFS_DATAVEC;

// Do some debugging
Expand Down
5 changes: 2 additions & 3 deletions src/XrdSsi/XrdSsiFileReq.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ static XrdSsiFileReq *Alloc(XrdOucErrInfo *eP, XrdSsiFileResource *rP,
XrdSsiFileSess *fP, const char *sn,
const char *id, int rnum);

void BindDone();

void Finalize();

using XrdSsiRequest::Finished;
Expand Down Expand Up @@ -117,6 +115,7 @@ enum rspState {isNew=0, isBegun, isBound, isAbort, isDone, isMax};

private:

void BindDone(); // Override
int Emsg(const char *pfx, int ecode, const char *op);
int Emsg(const char *pfx, XrdSsiErrInfo &eObj,
const char *op);
Expand All @@ -128,6 +127,7 @@ XrdSfsXferSize readStrmP(XrdSsiStream *strmP, char *buff,
int sendStrmA(XrdSsiStream *strmP, XrdSfsDio *sfDio,
XrdSfsXferSize blen);
void Recycle();
void Unbind(XrdSsiResponder *respP); // Override
void WakeUp(XrdSsiAlert *aP=0);

static XrdSysMutex aqMutex;
Expand Down Expand Up @@ -165,7 +165,6 @@ int reqID;
bool respWait;
bool strmEOF;
bool schedDone;
bool isPerm;
bool isEnding;
char rID[8];
};
Expand Down
2 changes: 1 addition & 1 deletion src/XrdSsi/XrdSsiRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ bool XrdSsiRequest::Finished(bool cancel)
// Obtain the responder
//
rrMutex->Lock();
if ((respP = theRespond)) theRespond->reqP = 0;
respP = theRespond;
theRespond = 0;
rrMutex->UnLock();

Expand Down
38 changes: 10 additions & 28 deletions src/XrdSsi/XrdSsiRequest.hh
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class XrdSsiResponder;
class XrdSsiRequest
{
public:
friend class XrdSsiReqAgent;
friend class XrdSsiResponder;
friend class XrdSsiTaskReal;

//-----------------------------------------------------------------------------
//! @brief Send or receive a server generated alert.
Expand All @@ -82,8 +82,8 @@ friend class XrdSsiTaskReal;
//!
//! @param aMsg Reference to the message object containing the alert message.
//! Non-positive alert lengths cause the alert call to be
//! ignored. You should call the message Recycle() method once
//! you have consumed the message to release its resources.
//! ignored. You should call the message RecycleMsg() method
//! once you have consumed the message to release its resources.
//-----------------------------------------------------------------------------

virtual void Alert(XrdSsiRespInfoMsg &aMsg) {aMsg.RecycleMsg(false);}
Expand Down Expand Up @@ -294,23 +294,12 @@ static RDR_Info RestartDataResponse(RDR_How rhow, const char *reqid=0);
//-----------------------------------------------------------------------------

XrdSsiRequest(const char *reqid=0, uint16_t tmo=0)
: rrMutex(0), reqID(reqid),
: reqID(reqid), rrMutex(0),
nextRequest(0), theRespond(0), thePacer(0),
detTTL(0), tOut(0) {}

// The following are for internal use only!
//
void SetMutex(XrdSsiMutex *mP) {rrMutex = mP;}

protected:

//-----------------------------------------------------------------------------
//! Notify the underlying object that the request was bound to a responder.
//! This method is meant for server-side internal use only.
//-----------------------------------------------------------------------------

virtual void BindDone() {}

//-----------------------------------------------------------------------------
//! Release the request buffer. Use this method to optimize storage use; this
//! is especially relevant for long-running requests. If the request buffer
Expand All @@ -319,8 +308,8 @@ virtual void BindDone() {}
//!
//!
//! Note: This method is called with the object's recursive mutex locked when
//! it is invoked via XrdSsiResponder's ReleaseRequestBuffer() which is
//! the only proper way of invoking this method.
//! it is invoked via XrdSsiResponder's ReleaseRequestBuffer() or the
//! one defined in this class.
//-----------------------------------------------------------------------------

virtual void RelRequestBuffer() {}
Expand Down Expand Up @@ -361,26 +350,19 @@ inline void SetDetachTTL(uint32_t dttl) {detTTL = dttl;}

virtual ~XrdSsiRequest() {}

//-----------------------------------------------------------------------------
//! Get a pointer to the RespInfo structure. This is meant to be used by
//! classes that inherit this class to simplify response handling.
//!
//! @return Pointer to the RespInfo structure.
//-----------------------------------------------------------------------------
inline
const XrdSsiRespInfo *RespP() {return &Resp;}

//-----------------------------------------------------------------------------
//! The following mutex is used to serialize acccess to the request object.
//! It can also be used to serialize access to the underlying object.
//-----------------------------------------------------------------------------

XrdSsiMutex *rrMutex;
const char *reqID;

private:
virtual void BindDone() {}
bool CopyData(char *buff, int blen);
virtual void Unbind(XrdSsiResponder *respP) {}

const char *reqID;
XrdSsiMutex *rrMutex;
XrdSsiRequest *nextRequest;
XrdSsiResponder *theRespond; // Set via XrdSsiResponder::BindRequest()
XrdSsiRespInfo Resp; // Set via XrdSsiResponder::SetResponse()
Expand Down
34 changes: 24 additions & 10 deletions src/XrdSsi/XrdSsiResponder.hh
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@

#define SSI_VAL_RESPONSE(rX) rrMutex->Lock();\
XrdSsiRequest *rX = reqP;\
if (!rX)\
if (!rX || rX->theRespond != this)\
{rrMutex->UnLock(); return notActive;}\
reqP = 0;\
if (rX->theRespond != this)\
{rrMutex->UnLock(); return notActive;}
if (rX->Resp.rType)\
{rrMutex->UnLock(); return notPosted;}

#define SSI_XEQ_RESPONSE(rX) rrMutex->UnLock();\
return (rX->ProcessResponse(rX->errInfo,rX->Resp)\
Expand Down Expand Up @@ -102,6 +101,25 @@ inline void BindRequest(XrdSsiRequest &rqstR)
rqstR.BindDone();
}

//-----------------------------------------------------------------------------
//! Unbind this responder from the request object it is bound to.
//!
//! @return true Request successfully unbound.
//! false UnBindRequest already called or called prior to Finish().
//-----------------------------------------------------------------------------

inline bool UnBindRequest() {rrMutex->Lock();
XrdSsiRequest *rP = reqP;
if (reqP && reqP->theRespond == 0)
{reqP = 0;
rrMutex->UnLock();
rP->Unbind(this);
return true;
}
rrMutex->UnLock();
return false;
}

protected:

//-----------------------------------------------------------------------------
Expand All @@ -124,7 +142,7 @@ inline void Alert(XrdSsiRespInfoMsg &aMsg)
//! allows the responder to release any resources given to the request object
//! (e.g. data response buffer or a stream). Upon return the object is owned by
//! the request object's creator who is responsible for releaasing or recycling
//! the object. This method is automatically invoked by XrdSsiRequest::Finish().
//! the object. This method is invoked by XrdSsiRequest::Finished().
//!
//! @param rqstP reference to the object describing the request.
//! @param rInfo reference to the object describing the response.
Expand All @@ -141,7 +159,7 @@ virtual void Finished( XrdSsiRequest &rqstR,
//-----------------------------------------------------------------------------
//! Obtain the request data sent by a client.
//!
//! Note: This method may be called with the object's recursive mutex unlocked!
//! Note: This method is called with the object's recursive mutex unlocked!
//!
//! @param dlen holds the length of the request after the call.
//!
Expand Down Expand Up @@ -274,7 +292,6 @@ inline Status SetResponse(XrdSsiStream *strmP)
SSI_XEQ_RESPONSE(rP);
}


//-----------------------------------------------------------------------------
//! This class is meant to be inherited by an object that will actually posts
//! responses.
Expand All @@ -291,9 +308,6 @@ protected:

virtual ~XrdSsiResponder() {}

private:
inline void Unbind() {rrMutex->Lock(); reqP = 0; rrMutex->UnLock();}

XrdSsiMutex *rrMutex;
XrdSsiRequest *reqP;
};
Expand Down
Loading

0 comments on commit 1b3ab4e

Please sign in to comment.