Skip to content

Commit

Permalink
[Server] Implement list with data, responde optimization, bug fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 authored and osschar committed Oct 10, 2019
1 parent 64cbb67 commit 9c34165
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 32 deletions.
3 changes: 2 additions & 1 deletion src/XProtocol/XProtocol.hh
Expand Up @@ -458,7 +458,8 @@ struct ClientFattrRequest {

// Valid options:
//
static const int isNew = 0x01; // For set, the variable must not exist
static const int isNew = 0x01; // For set, the variable must not exist
static const int aData = 0x10; // For list, return attribute value
};
struct ClientGetfileRequest {
kXR_char streamid[2];
Expand Down
55 changes: 46 additions & 9 deletions src/XrdOfs/XrdOfsFSctl.cc
Expand Up @@ -253,7 +253,6 @@ int XrdOfs::fsctl(const int cmd,
int retc, i, blen, privs, opcode = cmd & SFS_FSCTL_CMD;
const char *tident = einfo.getErrUser();
char *bP, *cP;
XTRACE(fsctl, args, "");

// Process the LOCATE request
//
Expand All @@ -269,6 +268,8 @@ int XrdOfs::fsctl(const int cmd,
int find_flag = SFS_O_LOCATE | (cmd & locMask);
XrdOucEnv loc_Env(opq ? opq+1 : 0,0,client);

ZTRACE(fsctl, "locate args=" <<(args ? args : "''"));

if (cmd & SFS_O_TRUNC) locArg = (char *)"*";
else { if (*Path == '*') {locArg = Path; Path++;}
else locArg = Path;
Expand Down Expand Up @@ -303,6 +304,7 @@ int XrdOfs::fsctl(const int cmd,
{char pbuff[1024];
const char *opq, *Path = Split(args, &opq, pbuff, sizeof(pbuff));
XrdOucEnv fs_Env(opq ? opq+1 : 0,0,client);
ZTRACE(fsctl, "statfs args=" <<(args ? args : "''"));
AUTHORIZE(client,0,AOP_Stat,"statfs",Path,einfo);
if (Finder && Finder->isRemote()
&& (retc = Finder->Space(einfo, Path, &fs_Env)))
Expand All @@ -320,6 +322,7 @@ int XrdOfs::fsctl(const int cmd,
{char pbuff[1024];
const char *opq, *Path = Split(args, &opq, pbuff, sizeof(pbuff));
XrdOucEnv statls_Env(opq ? opq+1 : 0,0,client);
ZTRACE(fsctl, "statls args=" <<(args ? args : "''"));
AUTHORIZE(client,0,AOP_Stat,"statfs",Path,einfo);
if (Finder && Finder->isRemote())
{statls_Env.Put("cms.qvfs", "1");
Expand All @@ -341,6 +344,7 @@ int XrdOfs::fsctl(const int cmd,
{char pbuff[1024];
const char *opq, *Path = Split(args, &opq, pbuff, sizeof(pbuff));
XrdOucEnv xa_Env(opq ? opq+1 : 0,0,client);
ZTRACE(fsctl, "statxa args=" <<(args ? args : "''"));
AUTHORIZE(client,0,AOP_Stat,"statxa",Path,einfo);
if (Finder && Finder->isRemote()
&& (retc = Finder->Locate(einfo,Path,SFS_O_RDONLY|SFS_O_STAT,&xa_Env)))
Expand All @@ -366,6 +370,7 @@ int XrdOfs::fsctl(const int cmd,
if (opcode == SFS_FSCTL_STATCC)
{static const int lcc_flag = SFS_O_LOCATE | SFS_O_LOCAL;
XrdOucEnv lcc_Env(0,0,client);
ZTRACE(fsctl, "statcc args=" <<(args ? args : "''"));
if (Finder) retc = Finder ->Locate(einfo,".",lcc_flag,&lcc_Env);
else if (Balancer) retc = Balancer->Locate(einfo,".",lcc_flag,&lcc_Env);
else retc = SFS_ERROR;
Expand All @@ -376,7 +381,8 @@ int XrdOfs::fsctl(const int cmd,
// Process the FATTR request.
//
if (opcode == SFS_FSCTL_FATTR)
{if (args) return ctlFAttr((XrdSfsFACtl &)*args, einfo, client);
{ZTRACE(fsctl, "fattr args=" <<(args ? "action" : "info"));
if (args) return ctlFAttr((XrdSfsFACtl &)*args, einfo, client);
else {XrdOucEnv *envP = einfo.getEnv();
if (!envP || !usxMaxNsz) return SFS_ERROR;
envP->PutInt("usxMaxNsz", usxMaxNsz);
Expand Down Expand Up @@ -503,9 +509,10 @@ int XrdOfs::ctlFALst(XrdSfsFACtl &faCtl, XrdOucEnv &faEnv, XrdOucErrInfo &einfo)
{
EPNAME("ctlFALst");
XrdSysXAttr::AList *alP, *aEnt;
char *nP;
int rc, pfLen, iX = 0, faSize = 0;
int getMsz = (faCtl.opts & XrdSfsFACtl::retvsz) != 0;
char *nP, *bP;
int bL, rc, pfLen, iX = 0, faSize = 0, fvSize = 0;
bool getMsz = (faCtl.opts & XrdSfsFACtl::retvsz) == XrdSfsFACtl::retvsz;
bool getVal = (faCtl.opts & XrdSfsFACtl::retval) == XrdSfsFACtl::retval;
bool xPlode = (faCtl.opts & XrdSfsFACtl::xplode) != 0;

// Get all of the attribute names
Expand Down Expand Up @@ -556,20 +563,50 @@ int XrdOfs::ctlFALst(XrdSfsFACtl &faCtl, XrdOucEnv &faEnv, XrdOucErrInfo &einfo)
{faCtl.info[iX].Name = nP;
faCtl.info[iX].NLen = aEnt->Nlen - pfLen;
faCtl.info[iX].VLen = aEnt->Vlen;
if (getVal && aEnt->Vlen) faCtl.info[iX].Value = aEnt->Name;
fvSize += aEnt->Vlen;
iX++;
}
nP += aEnt->Nlen-pfLen+1;
}
aEnt = aEnt->Next;
}

{faCtl.info = 0;
faCtl.iNum = 0;
// If we don't need to return values, we are done
//
if (!getVal)
{XrdSysFAttr::Xat->Free(alP);
return SFS_OK;
}
faCtl.fabP->dlen = nP - faCtl.fabP->data;

// Finish up
// Allocate a buffer to hold all of the values
//
if (!GetFABuff(faCtl, fvSize))
{XrdSysFAttr::Xat->Free(alP);
return SetNoMem(faCtl, 0);
}

// Setup to retrieve attributes
//
bP = faCtl.fabP->data;
bL = faCtl.fabP->dlen;

// Retrieve the attribute values
//
for (unsigned int i = 0; i < faCtl.iNum; i++)
{if (faCtl.info[i].VLen)
{nP = faCtl.info[i].Name;
faCtl.info[i].Name = faCtl.info[i].Value;
faCtl.info[i].Value = 0;
if (!GetFAVal(faCtl, bP, bL, i) && !GulpFAVal(faCtl, bP, bL, i))
{XrdSysFAttr::Xat->Free(alP);
return SetNoMem(faCtl, i);
}
faCtl.info[i].Name = nP;
}
}

// Free up the buffer list and return success
//
XrdSysFAttr::Xat->Free(alP);
return SFS_OK;
Expand Down
1 change: 1 addition & 0 deletions src/XrdSfs/XrdSfsFAttr.hh
Expand Up @@ -87,6 +87,7 @@ static const int accChk = 0x01; //!< Perform access check
static const int newAtr = 0x02; //!< For set the attribute must not exist
static const int xplode = 0x04; //!< Construct an info vec from faList
static const int retvsz = 0x0c; //!< Above plus return size of attr value
static const int retval = 0x1c; //!< Above plus return actual attr value

XrdSfsFACtl(const char *p, const char *opq, int anum)
: path(p), pcgi(opq), info(0), envP(0), fabP(0),
Expand Down
2 changes: 1 addition & 1 deletion src/XrdSys/XrdSysFAttr.cc
Expand Up @@ -143,7 +143,7 @@ XrdSysFAttr::AList *XrdSysFAttr::getEnt(const char *Path, int fd,

// Get the data size of this attribute if so wanted
//
if (!n || (msP && !(sz = Get(Aname, 0, 0, Path, fd)))) return 0;
if (!n || (msP && (sz = Get(Aname, 0, 0, Path, fd)) < 0)) return 0;

// Allocate a new dynamic struct
//
Expand Down
1 change: 1 addition & 0 deletions src/XrdXrootd/XrdXrootdProtocol.hh
Expand Up @@ -235,6 +235,7 @@ static int xlimit(XrdOucStream &Config);
int faALen, int faCode, bool doAChk);
int XeqFADel(XrdSfsFACtl &ctl, char *faVars, int faVLen);
int XeqFAGet(XrdSfsFACtl &ctl, char *faVars, int faVLen);
int XeqFALsd(XrdSfsFACtl &ctl);
int XeqFALst(XrdSfsFACtl &ctl);
int XeqFASet(XrdSfsFACtl &ctl, char *faVars, int faVLen);

Expand Down
134 changes: 113 additions & 21 deletions src/XrdXrootd/XrdXrootdXeqFAttr.cc
Expand Up @@ -66,8 +66,6 @@ bool verr; // True if a value is in error, otherwise it's the name
vnsz(0), iNum(anum), iEnd(0), verr(false) {}
~faCTL() {if (info) delete [] info;}
};

static const int iovNum = 16;
}

#define CRED (const XrdSecEntity *)Client
Expand Down Expand Up @@ -171,6 +169,29 @@ void FillRC(kXR_char *faRC, XrdSfsFAInfo *info, int inum)
}
}

/******************************************************************************/
/* I O V e c */
/******************************************************************************/

namespace
{
class IOVec
{
public:
struct iovec *Alloc(int &num)
{if (num > IOV_MAX) num = IOV_MAX;
theIOV = new struct iovec[num];
return theIOV;
}

IOVec() : theIOV(0) {}
~IOVec() {if (theIOV) delete [] theIOV;}

private:
struct iovec *theIOV;
};
}

/******************************************************************************/
/* S e n d E r r */
/******************************************************************************/
Expand Down Expand Up @@ -287,8 +308,8 @@ int XrdXrootdProtocol::ProcFAttr(char *faPath, char *faCgi, char *faArgs,

// Prevalidate the number of attributes (list must have zero)
//
if ((faCode == kXR_fattrList && fNumAttr != 0)
|| (faCode != kXR_fattrList && fNumAttr > kXR_faMaxVars))
if ((faCode == kXR_fattrList && fNumAttr != 0)
|| (faCode != kXR_fattrList && (fNumAttr == 0 || fNumAttr > kXR_faMaxVars)))
return Response.Send( kXR_ArgInvalid, "fattr numattr is invalid");

// Allocate an SFS control object now
Expand Down Expand Up @@ -363,11 +384,11 @@ int XrdXrootdProtocol::XeqFADel(XrdSfsFACtl &ctl, char *faVars, int faVLen)
int XrdXrootdProtocol::XeqFAGet(XrdSfsFACtl &ctl, char *faVars, int faVLen)
{
XrdOucErrInfo eInfo(Link->ID, Monitor.Did, clientPV);
struct iovec iov[iovNum];
kXR_int32 fasz[iovNum];
IOVec iovHelper;
struct iovec *iov;
kXR_char faRC[2];
XResponseType rcode;
int k, rc, dlen;
int k, rc, dlen, vLen;

// Set correct subcode
//
Expand All @@ -382,6 +403,11 @@ int XrdXrootdProtocol::XeqFAGet(XrdSfsFACtl &ctl, char *faVars, int faVLen)
//
FillRC(faRC, ctl.info, ctl.iNum);

// Allocate an iovec. We need two elements for each info entry.
//
int iovNum = ctl.iNum*2+3;
iov = iovHelper.Alloc(iovNum);

// Prefill the io vector (number of errors, vars, followed the rc-names
//
iov[1].iov_base = faRC;
Expand All @@ -394,16 +420,17 @@ int XrdXrootdProtocol::XeqFAGet(XrdSfsFACtl &ctl, char *faVars, int faVLen)
// Return the value for for each variable, segment the response, if need be
//
for (int i = 0; i < ctl.iNum; i++)
{iov[k ].iov_base = &fasz[i];
iov[k++].iov_len = sizeof(fasz[0]);
dlen += sizeof(fasz[0]);
if (ctl.info[i].faRC || ctl.info[i].VLen == 0) fasz[i] = 0;
else {fasz[i] = htonl(ctl.info[i].VLen);
{iov[k ].iov_base = &ctl.info[i].VLen;
iov[k++].iov_len = sizeof(ctl.info[i].VLen);
dlen += sizeof(ctl.info[i].VLen);
if (ctl.info[i].faRC || ctl.info[i].VLen == 0) ctl.info[i].VLen = 0;
else {vLen = ctl.info[i].VLen;
ctl.info[i].VLen = htonl(ctl.info[i].VLen);
iov[k ].iov_base = (void *)ctl.info[i].Value;
iov[k++].iov_len = ctl.info[i].VLen;
dlen += ctl.info[i].VLen;
iov[k++].iov_len = vLen;
dlen += vLen;
}
if (k+2 >= iovNum)
if (k+1 >= iovNum)
{rcode = (i+1 == ctl.iNum ? kXR_ok : kXR_oksofar);
if ((rc = Response.Send(rcode, iov, k, dlen))) return rc;
k = 1; dlen = 0;
Expand All @@ -415,25 +442,89 @@ int XrdXrootdProtocol::XeqFAGet(XrdSfsFACtl &ctl, char *faVars, int faVLen)
return (dlen ? Response.Send(iov, k, dlen) : 0);
}

/******************************************************************************/
/* X e q F A L s d */
/******************************************************************************/

int XrdXrootdProtocol::XeqFALsd(XrdSfsFACtl &ctl)
{
IOVec iovHelper;
struct iovec *iov;
XResponseType rcode;
int k = 1, rc = 0, dlen = 0, vLen;
bool xresp = false;

// Make sure we have something to send
//
if (!ctl.iNum) return Response.Send();

// Allocate an iovec. We need three elements for each info entry.
//
int iovNum = ctl.iNum*3+1;
iov = iovHelper.Alloc(iovNum);

// Return the value for for each variable, segment the response, if need be
//
for (int i = 0; i < ctl.iNum; i++)
{if (ctl.info[i].faRC) continue;
iov[k ].iov_base = ctl.info[i].Name;
iov[k++].iov_len = ctl.info[i].NLen+1;
dlen += ctl.info[i].NLen+1;

vLen = ctl.info[i].VLen;
ctl.info[i].VLen = htonl(vLen);
iov[k ].iov_base = &ctl.info[i].VLen;
iov[k++].iov_len = sizeof(ctl.info[i].VLen);
dlen += sizeof(ctl.info[i].VLen);

iov[k ].iov_base = (void *)ctl.info[i].Value;
iov[k++].iov_len = vLen;
dlen += vLen;

if (k+2 >= iovNum)
{rcode = (i+1 == ctl.iNum ? kXR_ok : kXR_oksofar);
if ((rc = Response.Send(rcode, iov, k, dlen))) return rc;
k = 1; dlen = 0; xresp = true;
}
}

// Check if we need to send out the last amount of data
//
return (dlen ? Response.Send(iov, k, dlen) : 0);

// Check if anything was sent at all
//
return (xresp ? 0 : Response.Send());
}

/******************************************************************************/
/* X e q F A L s t */
/******************************************************************************/

int XrdXrootdProtocol::XeqFALst(XrdSfsFACtl &ctl)
{
struct iovec iov[iovNum];
struct iovec iov[16];
XrdOucErrInfo eInfo(Link->ID, Monitor.Did, clientPV);
int rc;

// Set correct subcode
//
ctl.rqst = XrdSfsFACtl::faLst;

// Set correct options
//
if (Request.fattr.options & ClientFattrRequest::aData)
ctl.opts |= XrdSfsFACtl::retval;

// Execute the action
//
if ((rc = osFS->fsctl(SFS_FSCTL_FATTR, (const char *)&ctl, eInfo, CRED)))
return fsError(rc, XROOTD_MON_OPENR, eInfo, ctl.path, (char *)ctl.pcgi);

// Check for more complicated return
//
if (ctl.opts & XrdSfsFACtl::retval) return XeqFALsd(ctl);

// If there is only a single buffer, hen we can do a simple response
//
if (!ctl.fabP) return Response.Send();
Expand All @@ -451,16 +542,17 @@ int XrdXrootdProtocol::XeqFALst(XrdSfsFACtl &ctl)
dlen += dP->dlen;
dP = dP->next;
i++;
if (i == iovNum)
if (i == (int)sizeof(iov))
{rc = Response.Send((dP ? kXR_oksofar : kXR_ok), iov, i, dlen);
if (rc || dP == 0) break;
if (rc || dP == 0) return rc;
dlen = 0;
i = 1;
}
i = 1;
}

// All done
// Check if we need to send out the last amount of data
//
return rc;
return (dlen ? Response.Send(iov, i, dlen) : 0);
}

/******************************************************************************/
Expand Down

0 comments on commit 9c34165

Please sign in to comment.