Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle plugin - groundwork #22

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/XrdSfs/XrdSfsInterface.hh
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@
//
#define SFS_FCTL_GETFD 1 // Return file descriptor if possible
#define SFS_FCTL_STATV 2 // Return visa information
#define SFS_FCTL_PREAD 3 // Return 1 if a pre-read should be issued for each
// IO operation that bypasses the SFS object. This
// is useful if the SFS is collecting statistics
// about the reads performed, but would like to still
// support sendfile()- and mmap()-based IO.

// Common fsctl command values (0 to 255)
//
Expand Down
4 changes: 2 additions & 2 deletions src/XrdSys/XrdSysAtomics.hh
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@
#else
#define AtomicBeg(Mtx) Mtx.Lock()
#define AtomicEnd(Mtx) Mtx.UnLock()
#define AtomicAdd(x, y) x += y
#define AtomicAdd(x, y) x; x += y
#define AtomicCAS(x, y, z) if (x == y) x = z
#define AtomicDec(x) x--
#define AtomicFAZ(x) x; x = 0
#define AtomicGet(x) x
#define AtomicInc(x) x++
#define AtomicSub(x, y) x -= y
#define AtomicSub(x, y) x; x -= y
#endif
#endif
4 changes: 4 additions & 0 deletions src/XrdXrootd/XrdXrootdFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ XrdXrootdFile::XrdXrootdFile(const char *id, XrdSfsFile *fp, char mode,
else fdNum = fp->error.getErrInfo();
sfEnabled = (sfOK && sfok && fdNum >= 0 ? 1 : 0);

// Determine if we should issue pre-reads prior to any sendfile or mmap calls
//
prEnabled = (fp->fctl(SFS_FCTL_PREAD, 0, fp->error) == SFS_OK);

// Determine if file is memory mapped
//
if (fp->getMmap((void **)&mmAddr, mmSize) != SFS_OK) isMMapped = 0;
Expand Down
2 changes: 2 additions & 0 deletions src/XrdXrootd/XrdXrootdFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ char FileMode; // 'r' or 'w'
char AsyncMode; // 1 -> if file in async r/w mode
char isMMapped; // 1 -> file is memory mapped
char sfEnabled; // 1 -> file is sendfile enabled
char prEnabled; // 1 -> prereads should be emitted prior to
// mmap/sendfile IO
int fdNum; // File descriptor number if regular file
const char *ID; // File user

Expand Down
7 changes: 6 additions & 1 deletion src/XrdXrootd/XrdXrootdMonData.hh
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ const kXR_char XROOTD_MON_MAPTRCE = 't';
const kXR_char XROOTD_MON_MAPUSER = 'u';
const kXR_char XROOTD_MON_MAPXFER = 'x';

// The following bits are insert in the low order 4 bits of the MON_REDIRECT
// The following bits are insert in the low order 6 bits of the MON_REDIRECT
// entry code to indicate the actual operation that was requestded.
//
const kXR_char XROOTD_MON_REDSID = 0xf0; // Server Identification
Expand All @@ -129,6 +129,11 @@ const kXR_char XROOTD_MON_RMDIR = 0x0c;
const kXR_char XROOTD_MON_STAT = 0x0d;
const kXR_char XROOTD_MON_TRUNC = 0x0e;

// Note - these were added later on to support redirection on reads.
const kXR_char XROOTD_RMON_READ = 0x0f;
const kXR_char XROOTD_RMON_READV = 0x21;
const kXR_char XROOTD_RMON_WRITE = 0x22;

const kXR_char XROOTD_MON_FORCED = 0x01;
const kXR_char XROOTD_MON_BOUNDP = 0x02;

Expand Down
14 changes: 12 additions & 2 deletions src/XrdXrootd/XrdXrootdXeq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,13 @@ int XrdXrootdProtocol::do_Read()
return do_ReadAll();
}

#define DO_PREREAD(offset, amount) \
if (myFile->prEnabled) \
{int prc = myFile->XrdSfsp->read(myOffset, myIOLen); \
if (prc == SFS_REDIRECT) \
return fsError(prc, XROOTD_RMON_READ, myFile->XrdSfsp->error, myFile->XrdSfsp->FName()); \
}

/******************************************************************************/
/* d o _ R e a d A l l */
/******************************************************************************/
Expand All @@ -1667,6 +1674,7 @@ int XrdXrootdProtocol::do_ReadAll(int asyncOK)
//
if (myFile->isMMapped)
{if (myOffset >= myFile->Stats.fSize) return Response.Send();
DO_PREREAD(myOffset, myIOLen);
if (myOffset+myIOLen <= myFile->Stats.fSize)
{myFile->Stats.rdOps(myIOLen);
return Response.Send(myFile->mmAddr+myOffset, myIOLen);
Expand All @@ -1681,14 +1689,16 @@ int XrdXrootdProtocol::do_ReadAll(int asyncOK)
if (myFile->sfEnabled && myIOLen >= as_minsfsz
&& myOffset+myIOLen <= myFile->Stats.fSize)
{myFile->Stats.rdOps(myIOLen);
DO_PREREAD(myOffset, myIOLen);
return Response.Send(myFile->fdNum, myOffset, myIOLen);
}

// If we are in async mode, schedule the read to ocur asynchronously
//
if (asyncOK && myFile->AsyncMode)
{if (myIOLen >= as_miniosz && Link->UseCnt() < as_maxperlnk)
if ((rc = aio_Read()) != -EAGAIN) return rc;
if ((rc = aio_Read()) != -EAGAIN)
return rc == 0 ? 0 : fsError(rc, XROOTD_RMON_READ, myFile->XrdSfsp->error, myFile->XrdSfsp->FName());
SI->AsyncRej++;
}

Expand All @@ -1713,7 +1723,7 @@ int XrdXrootdProtocol::do_ReadAll(int asyncOK)
// Determine why we ended here
//
if (xframt == 0) return Response.Send();
return fsError(xframt, 0, myFile->XrdSfsp->error, 0);
return fsError(xframt, XROOTD_RMON_READ, myFile->XrdSfsp->error, myFile->XrdSfsp->FName());
}

/******************************************************************************/
Expand Down