Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiple sequence numbers, one for each monitor destination #1226

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/XrdXrootd/XrdXrootdMonFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ void XrdXrootdMonFile::Flush() // The bfMutex must be locked

// Write this out
//
XrdXrootdMonitor::Send(XROOTD_MON_FSTA, repBuff, bfSize);
XrdXrootdMonitor::Send(XROOTD_MON_FSTA, repBuff, bfSize, NULL);
repTOD->tBeg = repTOD->tEnd;
xfrRecs = totRecs = 0;
}
Expand Down
53 changes: 33 additions & 20 deletions src/XrdXrootd/XrdXrootdMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ XrdSysError *eDest = 0;
char *monHost = 0;
long long mySID = 0;
int32_t startTime = InitStartTime();
int seq = 0;
int seq[2] = {0, 0};
XrdSysMutex seqMutex;
}

Expand Down Expand Up @@ -623,7 +623,7 @@ int XrdXrootdMonitor::Init(XrdScheduler *sp, XrdSysError *errp,
idLen = strlen(iBuff) + sizeof(XrdXrootdMonHeader) + sizeof(kXR_int32);
idRec = (char *)malloc(idLen+1);
mP = (XrdXrootdMonMap *)idRec;
fillHeader(&(mP->hdr), XROOTD_MON_MAPIDNT, idLen);
fillHeader(&(mP->hdr), XROOTD_MON_MAPIDNT, idLen, -1);
mP->hdr.pseq = 0;
mP->dictid = 0;
strcpy(mP->info, iBuff);
Expand Down Expand Up @@ -709,14 +709,13 @@ kXR_unt32 XrdXrootdMonitor::Map(char code, XrdXrootdMonitor::User &uInfo,
// Fill in the header
//
size = sizeof(XrdXrootdMonHeader)+sizeof(kXR_int32)+size;
fillHeader(&map.hdr, code, size);

// Route the packet to all destinations that need them
//
if (code == XROOTD_MON_MAPPATH) montype = XROOTD_MON_PATH;
else if (code == XROOTD_MON_MAPUSER) montype = XROOTD_MON_USER;
else montype = XROOTD_MON_INFO;
Send(montype, (void *)&map, size);
Send(montype, (void *)&map, size, &map.hdr);

// Return the dictionary id
//
Expand Down Expand Up @@ -907,14 +906,21 @@ unsigned char XrdXrootdMonitor::do_Shift(long long xTot, unsigned int &xVal)
/******************************************************************************/

void XrdXrootdMonitor::fillHeader(XrdXrootdMonHeader *hdr,
const char id, int size)
const char id, int size, int seq_idx)
{ int myseq;

// Generate a new sequence number
//
seqMutex.Lock();
myseq = 0x00ff & (seq++);
seqMutex.UnLock();
if (seq_idx >= 0)
{
seqMutex.Lock();
myseq = 0x00ff & (seq[seq_idx]++);
seqMutex.UnLock();
}
else
{
myseq = 0;
}

// Fill in the header
//
Expand Down Expand Up @@ -945,7 +951,6 @@ void XrdXrootdMonitor::Flush()
// Fill in the header and in the process we will have the current time
//
size = (nextEnt+1)*sizeof(XrdXrootdMonTrace)+sizeof(XrdXrootdMonHeader);
fillHeader(&monBuff->hdr, XROOTD_MON_MAPTRCE, size);

// Punt on the right ending time. We are trying to keep same-sized windows
// This was corrected by Matevz Tadel, as before we were using real time which
Expand All @@ -957,8 +962,8 @@ void XrdXrootdMonitor::Flush()

// Send off the buffer and reinitialize it
//
if (this != altMon) Send(XROOTD_MON_IO, (void *)monBuff, size);
else {Send(XROOTD_MON_FILE, (void *)monBuff, size);
if (this != altMon) Send(XROOTD_MON_IO, (void *)monBuff, size, &monBuff->hdr);
else {Send(XROOTD_MON_FILE, (void *)monBuff, size, &monBuff->hdr);
FlushTime = localWindow + autoFlush;
}
setTMark(monBuff, 0, localWindow);
Expand All @@ -985,11 +990,10 @@ void XrdXrootdMonitor::Flush(XrdXrootdMonitor::MonRdrBuff *mP)
// Fill in the header and in the process we will have the current time
//
size = (mP->nextEnt+1)*sizeof(XrdXrootdMonRedir)+sizeof(XrdXrootdMonHeader)+8;
fillHeader(&(mP->Buff->hdr), XROOTD_MON_MAPREDR, size);

// Send off the buffer and reinitialize it
//
Send(XROOTD_MON_REDR, (void *)(mP->Buff), size);
Send(XROOTD_MON_REDR, (void *)(mP->Buff), size, &(mP->Buff->hdr));
mP->nextEnt = 0;
}

Expand Down Expand Up @@ -1052,7 +1056,7 @@ void XrdXrootdMonitor::Mark()
/* S e n d */
/******************************************************************************/

int XrdXrootdMonitor::Send(int monMode, void *buff, int blen)
int XrdXrootdMonitor::Send(int monMode, void *buff, int blen, XrdXrootdMonHeader *header)
{
#ifndef NODEBUG
const char *TraceID = "Monitor";
Expand All @@ -1062,14 +1066,23 @@ int XrdXrootdMonitor::Send(int monMode, void *buff, int blen)

sendMutex.Lock();
if (monMode & monMode1 && InetDest1)
{rc1 = InetDest1->Send((char *)buff, blen);
TRACE(DEBUG,blen <<" bytes sent to " <<Dest1 <<" rc=" <<rc1);
}
{
// Fill in the header
if (header != NULL) {
fillHeader(header, monMode, blen, 0);
}
rc1 = InetDest1->Send((char *)buff, blen);
TRACE(DEBUG,blen <<" bytes sent to " <<Dest1 <<" rc=" <<rc1);
}
else rc1 = 0;
if (monMode & monMode2 && InetDest2)
{rc2 = InetDest2->Send((char *)buff, blen);
TRACE(DEBUG,blen <<" bytes sent to " <<Dest2 <<" rc=" <<rc2);
}
{
if (header != NULL) {
fillHeader(header, monMode, blen, 1);
}
rc2 = InetDest2->Send((char *)buff, blen);
TRACE(DEBUG,blen <<" bytes sent to " <<Dest2 <<" rc=" <<rc2);
}
else rc2 = 0;
sendMutex.UnLock();

Expand Down
19 changes: 16 additions & 3 deletions src/XrdXrootd/XrdXrootdMonitor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ static void Defaults(int msz, int rsz, int wsz,

static int Flushing() {return autoFlush;}

static void Ident() {Send(-1, idRec, idLen);}
static void Ident() {Send(-1, idRec, idLen, NULL);}

static int Init(XrdScheduler *sp, XrdSysError *errp,
const char *iHost, const char *iProg,
Expand All @@ -134,7 +134,15 @@ static int Redirect() {return monREDR;}
static int Redirect(kXR_unt32 mID, const char *hName, int Port,
const char opC, const char *Path);

static int Send(int mmode, void *buff, int size);
/*
* Send data to the monitoring nodes
* Args:
* mmode:
* buf: Message buffer
* size: Size of the buffer
* header: Pointer to the header within the buff. If NULL, do not set a header.
*/
static int Send(int mmode, void *buff, int size, XrdXrootdMonHeader *header = NULL);

static time_t Tick();

Expand Down Expand Up @@ -223,8 +231,13 @@ inline void Add_io(kXR_unt32 duid, kXR_int32 blen, kXR_int64 offs)
static XrdXrootdMonitor *Alloc(int force=0);
unsigned char do_Shift(long long xTot, unsigned int &xVal);
void Dup(XrdXrootdMonTrace *mrec);

/*
* Fill the message header. If the seq_idx < 0, also insert 0.
* Otherwise, seq_idx is the index of the sender. Currently limited to 0 or 1
*/
static void fillHeader(XrdXrootdMonHeader *hdr,
const char id, int size);
const char id, int size, int seq_idx = 0);
static MonRdrBuff *Fetch();
void Flush();
static void Flush(MonRdrBuff *mP);
Expand Down