Skip to content

Commit

Permalink
[Server] Make space usage maintenance fully thread safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed Jun 17, 2020
1 parent 960cbc5 commit 3afb8aa
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 27 deletions.
106 changes: 91 additions & 15 deletions src/XrdOss/XrdOssSpace.cc
Expand Up @@ -30,6 +30,7 @@

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <errno.h>
#include <stddef.h>
Expand All @@ -42,6 +43,7 @@
#include "XrdOuc/XrdOucStream.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysFD.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "XrdSys/XrdSysPthread.hh"

Expand All @@ -55,20 +57,30 @@ extern XrdSysError OssEroute;

const char *XrdOssSpace::qFname = 0;
const char *XrdOssSpace::uFname = 0;
const char *XrdOssSpace::uUname = 0;
XrdOssSpace::uEnt XrdOssSpace::uData[XrdOssSpace::maxEnt];
short XrdOssSpace::uDvec[XrdOssSpace::maxEnt] = {0};
int XrdOssSpace::fencEnt = 0;
int XrdOssSpace::freeEnt =-1;
int XrdOssSpace::aFD =-1;
int XrdOssSpace::uAdj = 0;
int XrdOssSpace::uSync = 0;
int XrdOssSpace::Solitary = 0;
time_t XrdOssSpace::lastMtime = 0;
time_t XrdOssSpace::lastUtime = 0;

namespace
{
XrdSysMutex uMutex;
}

/******************************************************************************/
/* A d j u s t */
/******************************************************************************/

void XrdOssSpace::Adjust(int Gent, off_t Space, sType stNum)
{
XrdSysMutexHelper uHelp(uMutex);
int offset, unlk = 0;
int uOff = offsetof(uEnt,Bytes[0]) + (sizeof(long long)*stNum);

Expand Down Expand Up @@ -109,6 +121,17 @@ void XrdOssSpace::Adjust(int Gent, off_t Space, sType stNum)
if (pwrite(aFD, &uData[Gent].Bytes[stNum], ULen, offset) < 0)
OssEroute.Emsg("Adjust", errno, "update usage file", uFname);

// Update the time this occurred if we are not a server
//
if (stNum != Serv) utimes(uUname, 0);

// Check if we need to sync the file
//
if (uSync)
{uAdj++;
if (uAdj >= uSync) {fsync(aFD); uAdj = 0;}
}

// Unlock the file if we locked it
//
if (unlk) UsageLock(0);
Expand All @@ -128,7 +151,10 @@ void XrdOssSpace::Adjust(const char *GName, off_t Space, sType stNum)
/******************************************************************************/
/* A s s i g n */
/******************************************************************************/


// This is called during initialization and only needs a file lock if the
// file is going to be updated. No local mutex is needed.
//
int XrdOssSpace::Assign(const char *GName, long long &Usage)
{
off_t offset;
Expand Down Expand Up @@ -199,7 +225,7 @@ int XrdOssSpace::Init() {return (uFname ? haveUsage:0) | (qFname ? haveQuota:0);

/******************************************************************************/

int XrdOssSpace::Init(const char *aPath, const char *qPath, int isSOL)
int XrdOssSpace::Init(const char *aPath, const char *qPath, int isSOL, int us)
{
static const mode_t theMode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
struct stat buf;
Expand Down Expand Up @@ -235,8 +261,21 @@ int XrdOssSpace::Init(const char *aPath, const char *qPath, int isSOL)
}
strcpy(aP, ".Usage");
uFname = strdup(buff);
strncat(buff, ".upd", sizeof(buff));
uUname = strdup(buff);
XrdOucEnv::Export("XRDOSSUSAGEFILE", uFname);

// Create the usage update file if it does not exist
//
if ((i = open(uUname, O_CREAT|O_TRUNC|O_RDWR, theMode)) < 0)
{OssEroute.Emsg("Init", errno, "create", uUname);
return 0;
} else {
if (!fstat(i, &buf)) lastUtime = buf.st_mtime;
close(i);
utimes(uUname, 0);
}

// First check if the file really exists, if not, create it
//
if (stat(uFname, &buf))
Expand All @@ -248,9 +287,14 @@ int XrdOssSpace::Init(const char *aPath, const char *qPath, int isSOL)
{OssEroute.Emsg("Init", uFname, "has invalid size."); return 0;}
else opts = 0;

// Handle synchornization
//
if (us > 1) uSync = us;
else opts |= O_DSYNC;

// Open the target file
//
if ((aFD = open(uFname, opts|O_RDWR|O_SYNC, theMode)) < 0)
if ((aFD = XrdSysFD_Open(uFname, opts|O_RDWR, theMode)) < 0)
{OssEroute.Emsg("Init", errno, "open", uFname);
return 0;
}
Expand Down Expand Up @@ -363,22 +407,30 @@ int XrdOssSpace::Quotas()

int XrdOssSpace::Readjust()
{
static time_t lastUtime = 0;
XrdSysMutexHelper uHelp(uMutex);
struct stat buf;
int k, rwsz, updt = 0;

// Sync the usage file if need be
//
if (uSync && uAdj)
{uAdj = 0;
if (fsync(aFD))
OssEroute.Emsg("Readjust", errno, "sync usage file", uFname);
}

// No readjustment needed if we are not a server or we have nothing
//
if (fencEnt <= 0) return 0;
if (!fstat(aFD, &buf))
if (!stat(uUname, &buf))
{if (buf.st_mtime == lastUtime) return 0;
lastUtime = buf.st_mtime;
}
rwsz = sizeof(uEnt)*(uDvec[fencEnt-1] + 1);

// Lock the file
//
UsageLock();
if (!UsageLock()) return 0;

// Read the file again
//
Expand All @@ -393,8 +445,12 @@ int XrdOssSpace::Readjust()

// If we need to rewrite the data, do so
//
if (updt && pwrite(aFD, uData, rwsz, 0) < 0)
OssEroute.Emsg("Readjust", errno, "rewrite", uFname);
if (updt)
{if (pwrite(aFD, uData, rwsz, 0) < 0)
OssEroute.Emsg("Readjust", errno, "rewrite", uFname);
else if (uSync && fsync(aFD))
OssEroute.Emsg("Readjust", errno, "sync usage file", uFname);
}

// All done
//
Expand All @@ -410,9 +466,14 @@ int XrdOssSpace::Readjust(int i)
// Check if any readjustment is needed
//
if (uData[i].Bytes[Pstg] || uData[i].Bytes[Purg] || uData[i].Bytes[Admin])
{uData[i].Bytes[Serv] = uData[i].Bytes[Serv] + uData[i].Bytes[Pstg]
{long long oldVal = uData[i].Bytes[Serv];
char buff[256];
uData[i].Bytes[Serv] = uData[i].Bytes[Serv] + uData[i].Bytes[Pstg]
- uData[i].Bytes[Purg] + uData[i].Bytes[Admin];
uData[i].Bytes[Pstg] = uData[i].Bytes[Purg] = uData[i].Bytes[Admin] = 0;
snprintf(buff, sizeof(buff), "%lld to %lld bytes",
oldVal, uData[i].Bytes[Serv]);
OssEroute.Emsg("Readjust",uData[i].gName,"space usage adjusted from",buff);
return 1;
}
return 0;
Expand Down Expand Up @@ -456,9 +517,24 @@ int XrdOssSpace::Unassign(const char *GName)
/******************************************************************************/
/* U s a g e */
/******************************************************************************/

long long XrdOssSpace::Usage(int gent)
{
long long retVal;

// Safelu get the value and return it
//
uMutex.Lock();
retVal = (gent < 0 || gent >= maxEnt ? 0 : uData[gent].Bytes[Serv]);
uMutex.UnLock();
return retVal;
}

/******************************************************************************/

long long XrdOssSpace::Usage(const char *GName, struct uEnt &uVal, int rrd)
{
XrdSysMutexHelper uHelp(uMutex);
int i, rwsz;

// If we need to re-read the file, do so
Expand Down Expand Up @@ -490,6 +566,11 @@ long long XrdOssSpace::Usage(const char *GName, struct uEnt &uVal, int rrd)
/******************************************************************************/
/* private: U s a g e L o c k */
/******************************************************************************/

// Warning: The uMutex must be held when calling this method as it is the
// only thing that allows file locking to be effective in an MT environment!
// There is no need to hold the mutex when MT execution has not yet started
// such as during initialization sequencing.

int XrdOssSpace::UsageLock(int Dolock)
{
Expand All @@ -504,15 +585,10 @@ int XrdOssSpace::UsageLock(int Dolock)
if (Dolock) {lock_args.l_type = F_WRLCK; What = "lock";}
else {lock_args.l_type = F_UNLCK; What = "unlock";}

// First obtain the usage mutex or unlock it
//
if (Dolock) uMutex.Lock();
else uMutex.UnLock();

// Perform action.
//
do {rc = fcntl(aFD,F_SETLKW,&lock_args);} while(rc < 0 && errno == EINTR);
if (rc < 0) {OssEroute.Emsg("UpdateLock", errno, What, uFname); return 0;}
if (rc < 0) {OssEroute.Emsg("UsageLock", errno, What, uFname); return 0;}

// All done
//
Expand Down
25 changes: 13 additions & 12 deletions src/XrdOss/XrdOssSpace.hh
Expand Up @@ -35,6 +35,7 @@ class XrdSysError;
class XrdOssSpace
{
public:
friend class XrdOssCache;

enum sType {Serv = 0, Pstg = 1, Purg = 2, Admin = 3,
RsvA = 4, RsvB = 5, RsvC = 6, addT = 7,
Expand All @@ -47,37 +48,33 @@ static void Adjust(int Gent, off_t Space, sType=Serv);

static void Adjust(const char *GName, off_t Space, sType=Serv);

static int Assign(const char *GName, long long &bytesUsed);

static const int haveUsage = 1;
static const int haveQuota = 2;

static int Init(); // Return the "or" of havexxxx (above)

static int Init(const char *aPath, const char *qFile, int isSOL);
static int Init(const char *aPath,const char *qFile,int isSOL,int us=0);

static int Quotas();

static int Readjust();

static void Refresh();

static int Unassign(const char *GName);

static long long Usage(int gent) {return (gent < 0 || gent >= maxEnt
? 0 : uData[gent].Bytes[Serv]);}

XrdOssSpace() {} // Everything is static
~XrdOssSpace() {} // Never gets deleted
static long long Usage(int gent);

struct uEnt {char gName[minSNbsz];
long long Bytes[Totn]; // One of sType, above
};

static long long Usage(const char *GName, struct uEnt &uVal, int rrd=0);

XrdOssSpace() {} // Everything is static
~XrdOssSpace() {} // Never gets deleted

private:

static int Assign(const char *GName, long long &bytesUsed);
static int findEnt(const char *GName);
static int Readjust();
static int Readjust(int);
static int UsageLock(int Dolock=1);

Expand All @@ -87,12 +84,16 @@ static const int maxEnt = DataSz/sizeof(uEnt);

static const char *qFname;
static const char *uFname;
static const char *uUname;
static uEnt uData[maxEnt];
static short uDvec[maxEnt];
static time_t lastMtime;
static time_t lastUtime;
static int fencEnt;
static int freeEnt;
static int aFD;
static int uSync;
static int uAdj;
static int Solitary;
};
#endif

0 comments on commit 3afb8aa

Please sign in to comment.