Skip to content

Commit

Permalink
Streamline locking to reduce latency by at least 50% and increase para…
Browse files Browse the repository at this point in the history
…llelism.

Fix bug that unlocked an unlocked mutex.
  • Loading branch information
abh3 committed Jan 20, 2017
1 parent e32e2f8 commit 5a8f6c7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 45 deletions.
87 changes: 48 additions & 39 deletions src/XrdCms/XrdCmsCluster.cc
Expand Up @@ -268,8 +268,9 @@ XrdCmsNode *XrdCmsCluster::Add(XrdLink *lp, int port, int Status, int sport,
nP->isBad & XrdCmsNode::isSuspend ? 0 : 1,
nP->isNoStage ? 0 : 1);

// All done
// All done. Return the node locked.
//
nP->Lock(false);
return nP;
}

Expand Down Expand Up @@ -348,8 +349,7 @@ void XrdCmsCluster::BlackList(XrdOucTList *blP)
{inBL = (blP && (blRD = XrdCmsBlackList::Present(nP->Name(), blP)));
if ((!inBL && !(nP->isBad & XrdCmsNode::isBlisted))
|| ( inBL && (nP->isBad & XrdCmsNode::isBlisted))) continue;
nP->Lock(true);
STMutex.UnLock();
nP->g2nLock(STMutex); // Downgrade to only node lock
if (inBL)
{nP->isBad |= XrdCmsNode::isBlisted | XrdCmsNode::isDoomed;
if (blRD < -1)
Expand All @@ -364,8 +364,7 @@ void XrdCmsCluster::BlackList(XrdOucTList *blP)
nP->isBad &= ~(XrdCmsNode::isBlisted | XrdCmsNode::isDoomed);
Say.Emsg("Manager", nP->Name(), "removed from blacklist.");
}
nP->UnLock();
STMutex.Lock();
nP->n2gLock(STMutex);
}
}
STMutex.UnLock();
Expand All @@ -388,19 +387,19 @@ SMask_t XrdCmsCluster::Broadcast(SMask_t smask, const struct iovec *iod,
STMutex.Lock();
bmask = smask & peerMask;

// Run through the table looking for nodes to send messages to
// Run through the table looking for nodes to send messages to. We don't need
// the node lock for this but we do need to up the reference count to keep the
// node pointer valid for the duration of the send() (may or may not block).
//
for (i = 0; i <= STHi; i++)
{if ((nP = NodeTab[i]) && nP->isNode(bmask))
{if (nP->isOffline) unQueried |= nP->Mask();
else {nP->Lock(true);
STMutex.UnLock();
else {nP->g2Ref(STMutex);
if (nP->Send(iod, iovcnt, iotot) < 0)
{unQueried |= nP->Mask();
DEBUG(nP->Ident <<" is unreachable");
}
nP->UnLock();
STMutex.Lock();
nP->Ref2g(STMutex);
}
}
}
Expand Down Expand Up @@ -468,17 +467,17 @@ int XrdCmsCluster::Broadsend(SMask_t Who, XrdCms::CmsRRHdr &Hdr,
Beg = Start = (Start <= STHi ? Start+1 : 0);
Fin = STHi;

// Run through the table looking for a node to send the message to
// Run through the table looking for nodes to send messages to. We don't need
// the node lock for this but we do need to up the reference count to keep the
// node pointer valid for the duration of the send() (may or may not block).
//
do{for (i = Beg; i <= Fin; i++)
{if ((nP = NodeTab[i]) && nP->isNode(Who))
{if (nP->isOffline) continue;
nP->Lock(true);
STMutex.UnLock();
nP->g2Ref(STMutex);
if (nP->Send(ioV, 2, ioTot) >= 0) {nP->UnLock(); return 1;}
DEBUG(nP->Ident <<" is unreachable");
nP->UnLock();
STMutex.Lock();
nP->Ref2g(STMutex);
}
}
if (!Beg) break;
Expand Down Expand Up @@ -729,11 +728,9 @@ void *XrdCmsCluster::MonRefs()
{for (i = 0; i <= STHi; i++)
if ((nP = NodeTab[i])
&& (resetWR || (doReset && nP->isNode(resetMask))) )
{nP->Lock(true);
if (resetW || doReset) nP->RefW=0;
{if (resetW || doReset) nP->RefW=0;
if (resetR || doReset) nP->RefR=0;
nP->Shrem = nP->Share;
nP->UnLock();
}
if (resetWR)
{if (resetW) {SelWtot += SelWcnt; SelWcnt = 0;}
Expand Down Expand Up @@ -1095,15 +1092,15 @@ int XrdCmsCluster::Select(SMask_t pmask, int &port, char *hbuff, int &hlen,

// If we are exporting a shared-everything system then the incoming mask
// may have more than one server indicated. So, we need to do a full select.
// This is forced when isMulti is true, indicating a choice may exist.
// This is forced when isMulti is true, indicating a choice may exist. Note
// that the node, if any, is returned unlocked but we have the global mutex.
//
if (isMulti || baseFS.isDFS())
{STMutex.Lock();
nP = (Config.sched_RR ? SelbyRef(pmask,selR) : SelbyLoad(pmask,selR));
if (nP) hlen = nP->netIF.GetName(hbuff, port, nType) + 1;
else hlen = 0;
STMutex.UnLock();
if (!nP) return 0;
hlen = nP->netIF.GetName(hbuff, port, nType) + 1;
nP->UnLock();
return hlen != 1;
}

Expand All @@ -1124,20 +1121,20 @@ int XrdCmsCluster::Select(SMask_t pmask, int &port, char *hbuff, int &hlen,
if (nP)
{if (isrw)
if (nP->isNoStage || nP->DiskFree < nP->DiskMinF) nP = 0;
else {SelWcnt++; nP->RefTotW++; nP->RefW++; nP->Lock(true);}
else {SelRcnt++; nP->RefTotR++; nP->RefR++; nP->Lock(true);}
else {SelWcnt++; nP->RefTotW++; nP->RefW++;}
else {SelRcnt++; nP->RefTotR++; nP->RefR++;}
}
}
STMutex.UnLock();

// At this point either we have a node or we do not
//
if (nP)
{hlen = nP->netIF.GetName(hbuff, port, nType) + 1;
nP->RefR++;
nP->UnLock();
STMutex.UnLock();
return hlen != 1;
}
STMutex.UnLock();
return 0;
}

Expand Down Expand Up @@ -1564,7 +1561,8 @@ int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
? XrdCmsNode::allowsRW : 0);

// Scan for a primary and alternate node (alternates do staging). At this
// point we omit all peer nodes as they are our last resort.
// point we omit all peer nodes as they are our last resort. Note that Selbyxxx
// returns the node unlocked but we have the global mutex so that is OK.
//
STMutex.Lock();
mask = pmask & peerMask;
Expand All @@ -1578,12 +1576,13 @@ int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
mask = amask & peerMask; isalt = XrdCmsNode::allowsSS;
if (!(Sel.Opts & XrdCmsSelect::isMeta)) selR.needSpace |= isalt;
}
STMutex.UnLock();

// If we found an eligible node then dispatch the client to it.
// If we found an eligible node then dispatch the client to it. We will
// swap the global mutex for the node mutex to minimize interference.
//
if (nP)
{Sel.Resp.DLen = nP->netIF.GetName(Sel.Resp.Data, Sel.Resp.Port, nType);
{nP->g2nLock(STMutex);
Sel.Resp.DLen = nP->netIF.GetName(Sel.Resp.Data, Sel.Resp.Port, nType);
if (!Sel.Resp.DLen) {nP->UnLock(); return Unreachable(Sel, false);}
Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
if (isalt || (Sel.Opts & XrdCmsSelect::Create) || Sel.iovN)
Expand All @@ -1605,28 +1604,31 @@ int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
// do not forward to a peer unless we have a sufficient number of local nodes.
//
if (!selR.delay && NodeCnt < Config.SUPCount)
{Record(Sel.Path.Val, "insufficient number of nodes", true);
{STMutex.UnLock();
Record(Sel.Path.Val, "insufficient number of nodes", true);
return Config.SUPDelay;
}

// Return delay if we should avoid selecting a peer manager
//
if (selR.delay && selR.delay < Config.PSDelay)
{Record(Sel.Path.Val, selR.reason);
{STMutex.UnLock();
Record(Sel.Path.Val, selR.reason);
return selR.delay;
}

// At this point, we attempt a peer node selection (choice of last resort)
// At this point, we attempt a peer node selection (choice of last resort). Note
// that we are still holding the global lock! If we find a peer node we will
// swap it with the node lock.
//
if (Sel.Opts & XrdCmsSelect::Peers)
{const char *reason1 = selR.reason;
int delay1 = selR.delay;
bool noNet = selR.xNoNet;
STMutex.Lock();
if ((mask = (pmask | amask) & peerHost)) nP = SelbyCost(mask, selR);
STMutex.UnLock();
if (nP)
{Sel.Resp.DLen = nP->netIF.GetName(Sel.Resp.Data,Sel.Resp.Port,nType);
{nP->g2nLock(STMutex);
Sel.Resp.DLen = nP->netIF.GetName(Sel.Resp.Data,Sel.Resp.Port,nType);
if (!Sel.Resp.DLen) {nP->UnLock(); return Unreachable(Sel, false);}
Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
if (Sel.iovN && Sel.iovP) nP->Send(Sel.iovP, Sel.iovN);
Expand All @@ -1638,6 +1640,10 @@ int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
{selR.delay = delay1; selR.reason = reason1; selR.xNoNet = noNet;}
}

// At this point we don't need the global lock so let it go.
//
STMutex.UnLock();

// At this point we either don't have enough nodes or simply can't handle this
//
if (selR.delay)
Expand Down Expand Up @@ -1673,6 +1679,8 @@ int XrdCmsCluster::SelNode(XrdCmsSelect &Sel, SMask_t pmask, SMask_t amask)
// Cost selection is used only for peer node selection as peers do not
// report a load and handle their own scheduling.

// Caller must have the STMutex locked. The returned node, if any, is unlocked.

XrdCmsNode *XrdCmsCluster::SelbyCost(SMask_t mask, XrdCmsSelector &selR)
{
XrdCmsNode *np, *sp = 0;
Expand Down Expand Up @@ -1706,14 +1714,15 @@ XrdCmsNode *XrdCmsCluster::SelbyCost(SMask_t mask, XrdCmsSelector &selR)
// Check for overloaded node and return result
//
if (!sp) return calcDelay(selR);
sp->Lock(true);
RefCount(sp, Multi, selR.needSpace);
return sp;
}

/******************************************************************************/
/* S e l b y L o a d */
/******************************************************************************/

// Caller must have the STMutex locked. The returned node, if any, is unlocked.

XrdCmsNode *XrdCmsCluster::SelbyLoad(SMask_t mask, XrdCmsSelector &selR)
{
Expand Down Expand Up @@ -1757,7 +1766,6 @@ XrdCmsNode *XrdCmsCluster::SelbyLoad(SMask_t mask, XrdCmsSelector &selR)
// Check for overloaded node and return result
//
if (!sp) return calcDelay(selR);
sp->Lock(true);
RefCount(sp, Multi, selR.needSpace);
return sp;
}
Expand All @@ -1766,6 +1774,8 @@ XrdCmsNode *XrdCmsCluster::SelbyLoad(SMask_t mask, XrdCmsSelector &selR)
/* S e l b y R e f */
/******************************************************************************/

// Caller must have the STMutex locked. The returned node, if any, is unlocked.

XrdCmsNode *XrdCmsCluster::SelbyRef(SMask_t mask, XrdCmsSelector &selR)
{
XrdCmsNode *np, *sp = 0;
Expand Down Expand Up @@ -1795,7 +1805,6 @@ XrdCmsNode *XrdCmsCluster::SelbyRef(SMask_t mask, XrdCmsSelector &selR)
// Check for overloaded node and return result
//
if (!sp) return calcDelay(selR);
sp->Lock(true);
RefCount(sp, Multi, selR.needSpace);
return sp;
}
Expand Down
12 changes: 7 additions & 5 deletions src/XrdCms/XrdCmsNode.cc
Expand Up @@ -228,18 +228,20 @@ void XrdCmsNode::Delete(XrdSysMutex &gMutex)
// when the lkCount equals the ulCount. The lkCount is under control of the
// global mutex passed to us. The ulCount is under control of the node lock.
// we will wait until they are equal. As this node has been removed from all
// table at this point, the lkCount cannot change and we need only to fetch
// it once. However, we will refresh it if we timeout. Setting isGone will
// signal us whenever the ulCount changes and is under global mutex control.
// tables at this point, the lkCount cannot increase but it may decrease when
// Ref2G() is called which happens for non-lock-free operations (e.g. Send).
// However, we will refresh it if we timeout.
//
gMutex.Lock();
theLKCnt = lkCount;
isGone = 1;
gMutex.UnLock();

// Get the node lock and do some debugging
// Get the node lock and do some debugging. Set the isGone flag even though it
// should be set. We need to do that under the node lock to make sure we get
// signalled whenever the node gets unlocked by some thread (ulCount changed).
//
nodeMutex.Lock();
isGone = 1;
DEBUG(Ident <<" locks=" <<theLKCnt <<" unlocks=" <<ulCount);

// Now wait for things to simmer down. We wait for an appropriate time because
Expand Down
22 changes: 22 additions & 0 deletions src/XrdCms/XrdCmsNode.hh
Expand Up @@ -145,6 +145,28 @@ inline char *Name() {return (myName ? myName : (char *)"?");}

inline SMask_t Mask() {return NodeMask;}

inline void g2Ref(XrdSysMutex &gMutex) {lkCount++; gMutex.UnLock();}

inline void Ref2g(XrdSysMutex &gMutex) {gMutex.Lock(); lkCount--;}

// Atomically trade the global mutex for this node's mutex (lock downgrade).
// Caller must hold gMutex. Bumping lkCount while still under gMutex keeps the
// node alive (Delete() waits until ulCount catches up with lkCount), so it is
// safe to release gMutex before acquiring nodeMutex. Pair with n2gLock().
inline void g2nLock(XrdSysMutex &gMutex)
{lkCount++; // gMutex must be held
gMutex.UnLock(); // Safe because lkCount != ulCount keeps node pinned
nodeMutex.Lock(); // Downgrade to node lock
incUL = 1; // Remember to bump ulCount when the node lock is released
isLocked = 1;
}

// Reverse of g2nLock(): release this node's mutex and reacquire the global
// mutex (lock upgrade). If g2nLock() bumped lkCount (incUL set), balance it
// by bumping ulCount under nodeMutex; if a Delete() is pending (isGone set)
// signal it so it can recheck whether lkCount == ulCount.
inline void n2gLock(XrdSysMutex &gMutex)
{isLocked = 0;
if (incUL)
{ulCount++; incUL = 0;
if (isGone) nodeMutex.Signal(); // Wake a waiting Delete()
}
nodeMutex.UnLock(); // Release this node
gMutex.Lock(); // Upgrade to global mutex
}

inline void Lock(bool doinc)
{if (!doinc) nodeMutex.Lock();
else {lkCount++; // Global lock must be held
Expand Down
5 changes: 4 additions & 1 deletion src/XrdCms/XrdCmsProtocol.cc
Expand Up @@ -769,7 +769,10 @@ XrdCmsRouting *XrdCmsProtocol::Admit_Redirector(int wasSuspended)
//
myNode = new XrdCmsNode(Link); myNode->Lock(false);
if (!(RSlot = RTable.Add(myNode)))
{Say.Emsg("Protocol",myNode->Ident,"login failed; too many redirectors.");
{myNode->UnLock();
delete myNode;
myNode = 0;
Say.Emsg("Protocol",myNode->Ident,"login failed; too many redirectors.");
return 0;
} else myNode->setSlot(RSlot);

Expand Down

0 comments on commit 5a8f6c7

Please sign in to comment.