Skip to content

Commit

Permalink
Make it possible to return more than 2K via XrdOucErrInfo object.
Browse files Browse the repository at this point in the history
Enable locate to return host names not just ip addreses.
  • Loading branch information
abh3 committed Aug 18, 2013
1 parent e268c8b commit fa5ecba
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 83 deletions.
2 changes: 1 addition & 1 deletion src/XProtocol/YProtocol.hh
Expand Up @@ -223,7 +223,7 @@ enum {kYR_refresh = 0x01,
};
// kXR_string Path;

static const int RILen = 32; // Max length of each response item
static const int RHLen =266; // Max length of each host response item
};

/******************************************************************************/
Expand Down
2 changes: 1 addition & 1 deletion src/XrdCms/XrdCmsClientMan.cc
Expand Up @@ -49,7 +49,7 @@ using namespace XrdCms;
/* G l o b a l s */
/******************************************************************************/

XrdOucBuffPool XrdCmsClientMan::BuffPool(XrdOucEI::Max_Error_Len, 65536, 64);
XrdOucBuffPool XrdCmsClientMan::BuffPool(XrdOucEI::Max_Error_Len, 65536, 1, 16);

XrdInet *XrdCmsClientMan::Network = 0;

Expand Down
16 changes: 11 additions & 5 deletions src/XrdCms/XrdCmsCluster.cc
Expand Up @@ -432,6 +432,7 @@ XrdCmsSelected *XrdCmsCluster::List(SMask_t mask, CmsLSOpts opts, int &nsel)
{
int i, lsall = opts & LS_All, retIP = opts & LS_IPO;
int retIP4 = opts & LS_IP4, onlyIP4 = (opts & LS_IP4) && !(opts & LS_IP6);
int retID = opts & LS_IDNT;
XrdCmsNode *nP;
XrdCmsSelected *sipp = 0, *sip;

Expand All @@ -443,14 +444,19 @@ XrdCmsSelected *XrdCmsCluster::List(SMask_t mask, CmsLSOpts opts, int &nsel)
if ((nP=NodeTab[i]) && (lsall || (nP->NodeMask & mask)))
{nsel++;
if (onlyIP4 && !(nP->IPV4Len)) continue;
sip = new XrdCmsSelected(retIP ? 0 : nP->Name(), sipp);
sip = new XrdCmsSelected(sipp);
if (retIP)
{if (retIP4 && nP->IPV4Len)
{sip->IPV6Len = nP->IPV4Len; strcpy(sip->IPV6, nP->IPV4);
{sip->IdentLen = nP->IPV4Len; strcpy(sip->Ident, nP->IPV4);
} else {
sip->IPV6Len = nP->IPV6Len; strcpy(sip->IPV6, nP->IPV6);
sip->IdentLen = nP->IPV6Len; strcpy(sip->Ident, nP->IPV6);
}
}
} else if (retID)
{strcpy(sip->Ident, nP->locName);
sip->IdentLen = nP->locNLen;
} else {strcpy(sip->Ident, nP->myName);
sip->IdentLen = nP->myNlen;
}
sip->Mask = nP->NodeMask;
sip->Id = nP->NodeID;
sip->Port = nP->Port;
Expand Down Expand Up @@ -1115,7 +1121,7 @@ int XrdCmsCluster::Statt(char *bfr, int bln)
*stp = 0;
if (AddShr) snprintf(shrBuff, sizeof(shrBuff), statfmt3,
(sp->Share ? sp->Share : 100), sp->Shrin);
mlen = snprintf(bfr, bln, statfmt2, n, sp->Name,
mlen = snprintf(bfr, bln, statfmt2, n, sp->Ident,
XrdCmsRole::Type(static_cast<XrdCmsRole::RoleID>(sp->RoleID)),
stat, sp->RefTotR, sp->RefTotW, shrBuff);
bfr += mlen; bln -= mlen; tlen += mlen;
Expand Down
3 changes: 2 additions & 1 deletion src/XrdCms/XrdCmsCluster.hh
Expand Up @@ -137,7 +137,8 @@ SMask_t getMask(const char *Cid);

// Extracts out node information. Opts are one or more of CmsLSOpts
//
enum CmsLSOpts {LS_NULL=0, LS_IP4=1, LS_IP6=2, LS_IPO=3, LS_All=8};
enum CmsLSOpts {LS_NULL=0, LS_IP4=1, LS_IP6=2, LS_IPO=3,
LS_IDNT=4, LS_All=8};

XrdCmsSelected *List(SMask_t mask, CmsLSOpts opts, int &nsel);

Expand Down
45 changes: 29 additions & 16 deletions src/XrdCms/XrdCmsNode.cc
Expand Up @@ -137,6 +137,7 @@ XrdCmsNode::XrdCmsNode(XrdLink *lnkp, int port,
TZValid = 0;
TimeZone = 0;
IPV6Len = 0; *IPV6 = 0; IPV4Len = 0; *IPV4 = 0;
locName = 0;

// setName() will set Ident, netID, IPV6, myName, myNlen, & Port!
//
Expand All @@ -161,6 +162,7 @@ XrdCmsNode::~XrdCmsNode()
if (Ident) free(Ident);
if (myNID) free(myNID);
if (myName)free(myName);
if (locName)free(locName);
}

/******************************************************************************/
Expand Down Expand Up @@ -196,13 +198,17 @@ void XrdCmsNode::setName(XrdLink *lnkp, int port)
if (Ident) free(Ident);
Ident = strdup(buff);

if (!Port) fmtOpts |= XrdNetAddr::noPort;
else oldPort = netID.Port(Port);
// Format locate target name
//
locNLen = sprintf(buff, "%s:%d", myName, (Port ? Port : 1094));
if (locName) free(locName);
locName = strdup(buff);

// Format out the address in IPv6 format
//
oldPort = netID.Port((Port ? Port : 1094));
netID.Format(buff, sizeof(buff), XrdNetAddr::fmtAdv6, fmtOpts);
if (oldPort) netID.Port(oldPort);
netID.Port(oldPort);
strlcpy(IPV6, buff, sizeof(IPV6));
IPV6Len = strlen(IPV6);

Expand Down Expand Up @@ -514,7 +520,7 @@ const char *XrdCmsNode::do_Locate(XrdCmsRRData &Arg)
XrdCmsSelect Sel(0, Arg.Path, Arg.PathLen-1);
XrdCmsSelected *sP = 0;
struct {kXR_unt32 Val;
char outbuff[CmsLocateRequest::RILen*STMax];} Resp;
char outbuff[CmsLocateRequest::RHLen*STMax];} Resp;
struct iovec ioV[2] = {{(char *)&Arg.Request, sizeof(Arg.Request)},
{(char *)&Resp, 0}};
const char *Why;
Expand All @@ -530,12 +536,27 @@ const char *XrdCmsNode::do_Locate(XrdCmsRRData &Arg)
if (Xmi_Select->Select(&Req, XMI_LOCATE, Arg.Path, Arg.Opaque)) return 0;
}

// Get the right options
//
if (Arg.Opts & CmsLocateRequest::kYR_retname)
lsopts = XrdCmsCluster::LS_IDNT;
else if (Arg.Opts & CmsLocateRequest::kYR_retipv6)
{lsopts |= XrdCmsCluster::LS_IP6;
if (Arg.Opts & CmsLocateRequest::kYR_retipv4)
lsopts |= XrdCmsCluster::LS_IP4;
}
else if (Arg.Opts & CmsLocateRequest::kYR_retipv4)
lsopts |= XrdCmsCluster::LS_IP4;
else lsopts = XrdCmsCluster::LS_IPO;

// Grab the refresh option (the only one we support)
//
if (Arg.Opts & CmsLocateRequest::kYR_refresh)
{Sel.Opts = XrdCmsSelect::Refresh; *toP++='s';}
if (Arg.Opts & CmsLocateRequest::kYR_asap)
{Sel.Opts |= XrdCmsSelect::Asap; *toP++='i'; Sel.InfoP = &reqInfo;}
{Sel.Opts |= XrdCmsSelect::Asap; *toP++='i'; Sel.InfoP = &reqInfo;
reqInfo.lsLU = static_cast<char>(lsopts);
}
else Sel.InfoP = 0;

// Do some debugging
Expand All @@ -558,14 +579,6 @@ const char *XrdCmsNode::do_Locate(XrdCmsRRData &Arg)
}
} else {Why = "?"; bytes = 0;}

// Get the right options
//
if (Arg.Opts & CmsLocateRequest::kYR_retipv4)
{lsopts |= XrdCmsCluster::LS_IP4;
if (Arg.Opts & CmsLocateRequest::kYR_retipv6)
lsopts |= XrdCmsCluster::LS_IP6;
} else lsopts |= XrdCmsCluster::LS_IP6;

// List the servers
//
if (!rc)
Expand All @@ -591,7 +604,7 @@ const char *XrdCmsNode::do_Locate(XrdCmsRRData &Arg)
DEBUGR(Why <<Arg.Path);
} else {
bytes = do_LocFmt(Resp.outbuff, sP, Sel.Vec.pf, Sel.Vec.wf, lsall)
+ sizeof(Resp.Val)+1;
+ sizeof(Resp.Val) + 1;
Resp.Val = 0;
Arg.Request.rrCode = kYR_data;
}
Expand Down Expand Up @@ -626,7 +639,7 @@ if (lsall)
{*oP = (sP->Status & XrdCmsSelected::isMangr ? 'M' : 'S');
if (sP->Status & Hung) *oP = tolower(*oP);
*(oP+1) = (sP->Mask & wfVec ? 'w' : 'r');
strcpy(oP+2, sP->IPV6); oP += sP->IPV6Len + 2;
strcpy(oP+2, sP->Ident); oP += sP->IdentLen + 2;
if (sP->next) *oP++ = ' ';
pP = sP; sP = sP->next; delete pP;
}
Expand All @@ -636,7 +649,7 @@ if (lsall)
{*oP = (sP->Status & XrdCmsSelected::isMangr ? 'M' : 'S');
if (sP->Mask & pfVec) *oP = tolower(*oP);
*(oP+1) = (sP->Mask & wfVec ? 'w' : 'r');
strcpy(oP+2, sP->IPV6); oP += sP->IPV6Len + 2;
strcpy(oP+2, sP->Ident); oP += sP->IdentLen + 2;
if (sP->next) *oP++ = ' ';
}
pP = sP; sP = sP->next; delete pP;
Expand Down
7 changes: 5 additions & 2 deletions src/XrdCms/XrdCmsNode.hh
Expand Up @@ -184,10 +184,13 @@ XrdNetAddr netID;
XrdCmsNode *Next;
time_t DropTime;
XrdCmsDrop *DropJob;
int IPV6Len; // 12345678901234567890123456
char *locName;
short locNLen;
short IPV6Len; // 12345678901234567890123456
char IPV6[INET6_ADDRSTRLEN+10]; // [full_ipv6_address]:123456
int IPV4Len; // 12345678901234567890123456
short IPV4Len; // 12345678901234567890123456
char IPV4[INET_ADDRSTRLEN+12]; // [::123.123.123.123]:123456
char rsv1[4];

SMask_t NodeMask;
int NodeID;
Expand Down
4 changes: 2 additions & 2 deletions src/XrdCms/XrdCmsParser.cc
Expand Up @@ -289,7 +289,7 @@ int XrdCmsParser::Decode(const char *Man, CmsRRHdr &hdr, XrdOucBuffer *dBuff,
EPNAME("Decode");
static const int mvsz = static_cast<int>(sizeof(kXR_unt32));
kXR_unt32 uval;
int Result, msgval, msglen, dlen = dBuff->Length();
int Result, msgval, msglen, dlen = dBuff->DataLen();
const char *Path = eInfo->getErrData(), *User = eInfo->getErrUser();
const char *Mgr = (Man ? Man : "?");
char *msg, *data = dBuff->Buffer();
Expand Down Expand Up @@ -327,7 +327,7 @@ int XrdCmsParser::Decode(const char *Man, CmsRRHdr &hdr, XrdOucBuffer *dBuff,
{XrdOucBuffer *myBuff=dBuff->Highjack(XrdOucEI::Max_Error_Len);
if (myBuff)
{myBuff->SetLen(msglen, (msglen ? mvsz : 0));
eInfo->setErrInfo(msglen, "");
eInfo->setErrInfo(msglen, myBuff);
return Result;
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/XrdCms/XrdCmsRRQ.cc
Expand Up @@ -287,6 +287,7 @@ void XrdCmsRRQ::sendLocResp(XrdCmsRRQSlot *lP)
static const int ovhd = sizeof(kXR_unt32);
XrdCmsSelected *sP;
XrdCmsNode *nP;
XrdCmsCluster::CmsLSOpts lsopts;
int bytes, nsel;

// Send a delay if we timed out
Expand All @@ -300,7 +301,8 @@ void XrdCmsRRQ::sendLocResp(XrdCmsRRQSlot *lP)
// client to wait as this should never happen and the long path is called for.
// ASAP responses always respond in with IPv6 addresses or mapped IPv4 ones.
//
if (!(sP = Cluster.List(lP->Arg1, XrdCmsCluster::LS_IPO, nsel))
lsopts = static_cast<XrdCmsCluster::CmsLSOpts>(lP->Info.lsLU);
if (!(sP = Cluster.List(lP->Arg1, lsopts, nsel))
|| (!(bytes = XrdCmsNode::do_LocFmt(databuff,sP,lP->Arg2,lP->Info.rwVec))))
{sendLwtResp(lP);
return;
Expand Down
7 changes: 4 additions & 3 deletions src/XrdCms/XrdCmsRRQ.hh
Expand Up @@ -54,13 +54,14 @@ char isRW; // True if r/w access wanted
char isLU; // True if locate response wanted
char minR; // Minimum number of responses for fast redispatch
char actR; // Actual number of responses
short Rsvd;
char lsLU; // Lookup options
char Rsvd;
SMask_t rwVec; // R/W servers for corresponding path (if isLU is true)

XrdCmsRRQInfo() : isLU(0) {}
XrdCmsRRQInfo(int rinst, short rnum, kXR_unt32 id, int minQ=0)
: Key(0), ID(id), Rinst(rinst), Rnum(rnum),
isRW(0), isLU(0), minR(minQ), actR(0), Rsvd(0),
isRW(0), isLU(0), minR(minQ), actR(0), lsLU(0), Rsvd(0),
rwVec(0) {}
~XrdCmsRRQInfo() {}
};
Expand Down Expand Up @@ -157,7 +158,7 @@ static const int iov_cnt = 2;
XrdCms::CmsResponse redrResp;
XrdCms::CmsResponse waitResp;
union {char hostbuff[288];
char databuff[XrdCms::CmsLocateRequest::RILen
char databuff[XrdCms::CmsLocateRequest::RHLen
*STMax];
};
Info Stats;
Expand Down
11 changes: 4 additions & 7 deletions src/XrdCms/XrdCmsSelect.hh
Expand Up @@ -94,13 +94,11 @@ class XrdCmsSelected // Argument to List() after select or locate
public:

XrdCmsSelected *next;
char *Name;
SMask_t Mask;
int Id;
int Rsvc;
int IdentLen; // 12345678901234567890123456
char Ident[264]; // [::123.123.123.123]:123456
int Port;
int IPV6Len; // 12345678901234567890123456
char IPV6[INET6_ADDRSTRLEN+10]; // [::123.123.123.123]:123456
int RefTotW;
int RefTotR;
int Shrin; // Share intervals used
Expand All @@ -117,9 +115,8 @@ enum {Disable = 0x0001,
isMangr = 0x0100
};

XrdCmsSelected(const char *sname, XrdCmsSelected *np=0)
{Name = (sname ? strdup(sname) : 0); next=np;}
XrdCmsSelected(XrdCmsSelected *np=0) : next(np) {}

~XrdCmsSelected() {if (Name) free(Name);}
~XrdCmsSelected() {}
};
#endif
45 changes: 34 additions & 11 deletions src/XrdOuc/XrdOucBuffer.cc
Expand Up @@ -53,7 +53,8 @@ int XrdOucBuffPool::alignit = sysconf(_SC_PAGESIZE);
/* C o n s t r u c t o r */
/******************************************************************************/

XrdOucBuffPool::XrdOucBuffPool(int minsz, int maxsz, int maxh, bool fixh)
XrdOucBuffPool::XrdOucBuffPool(int minsz, int maxsz,
int minh, int maxh, int rate)
{
int keep, pct, i, n = 0;

Expand All @@ -65,6 +66,10 @@ XrdOucBuffPool::XrdOucBuffPool(int minsz, int maxsz, int maxh, bool fixh)
incBsz = 1024*(1<<n);
shfBsz = 10 + n;
rndBsz = incBsz - 1;
if (maxh < 0) maxh = 0;
if (minh < 0) minh = 0;
if (maxh < minh) maxh = minh;
if (rate < 0) rate = 0;

// Round up the maxsz and make it a multiple of 4k
//
Expand All @@ -81,15 +86,13 @@ XrdOucBuffPool::XrdOucBuffPool(int minsz, int maxsz, int maxh, bool fixh)
n = incBsz;
for (i = 0; i < slots; i++)
{bSlot[i].size = n; n += incBsz;
if (fixh) bSlot[i].maxbuff = maxh;
else {pct = (slots - i)*100/slots;
if (pct >= 100) keep = maxh;
else {keep = ((maxh * pct) + 55)/100;
if (keep > maxh) keep = maxh;
else if (keep <= 0) keep = 1;
}
bSlot[i].maxbuff = keep;
pct = (slots - i + 1)*100/slots;
if (pct >= 100) keep = maxh;
else {keep = ((maxh * pct) + 55)/100 - i*rate;
if (keep > maxh) keep = maxh;
else if (keep < minh) keep = minh;
}
bSlot[i].maxbuff = keep;
}
}

Expand Down Expand Up @@ -117,7 +120,7 @@ XrdOucBuffer *XrdOucBuffPool::Alloc(int bsz)
//
if ((bP = sP->buffFree))
{sP->buffFree = bP->buffNext;
bP->Init(this, snum);
bP->buffPool = this;
sP->numbuff--;
} else {
if ((bP = new XrdOucBuffer(this, snum)))
Expand Down Expand Up @@ -161,6 +164,7 @@ void XrdOucBuffPool::BuffSlot::Recycle(XrdOucBuffer *bP)
//
if (numbuff >= maxbuff) {delete bP; return;}
bP->dlen = 0;
bP->doff = 0;

// Add the buffer to the recycle list
//
Expand All @@ -175,6 +179,24 @@ void XrdOucBuffPool::BuffSlot::Recycle(XrdOucBuffer *bP)
/******************************************************************************/
/* X r d O u c B u f f e r M e t h o d s */
/******************************************************************************/
/******************************************************************************/
/* P u b l i c C o n s t r u c t o r */
/******************************************************************************/

XrdOucBuffer::XrdOucBuffer(char *buff, int blen)
{
static XrdOucBuffPool nullPool(0, 0, 0, 0, 0);

// Initialize the one time buffer
//
data = buff;
dlen = blen;
doff = 0;
size = blen;
slot = 0;
buffPool = &nullPool;
};

/******************************************************************************/
/* C l o n e */
/******************************************************************************/
Expand Down Expand Up @@ -206,7 +228,7 @@ XrdOucBuffer *XrdOucBuffer::Clone(bool trim)

XrdOucBuffer *XrdOucBuffer::Highjack(int xsz)
{
XrdOucBuffer tempBuff(0,0), *newbP;
XrdOucBuffer tempBuff, *newbP;

// Adjust the size to revert highjacked buffer
//
Expand All @@ -221,6 +243,7 @@ XrdOucBuffer *XrdOucBuffer::Highjack(int xsz)
tempBuff = *this;
*this = *newbP;
*newbP = tempBuff;
tempBuff.data = 0;
return newbP;
}

Expand Down

0 comments on commit fa5ecba

Please sign in to comment.