Skip to content

Commit

Permalink
[Server] Make sure no temporary popens occur during error recovery.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed Mar 11, 2018
1 parent 41662c4 commit 2c16914
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 81 deletions.
39 changes: 10 additions & 29 deletions src/XrdXrootd/XrdXrootdFile.cc
Expand Up @@ -65,18 +65,18 @@ extern XrdOucTrace *XrdXrootdTrace;
/* C o n s t r u c t o r */
/******************************************************************************/

XrdXrootdFile::XrdXrootdFile(const char *id, XrdSfsFile *fp, char mode,
char async, int sfok, struct stat *sP)
XrdXrootdFile::XrdXrootdFile(const char *id, const char *path, XrdSfsFile *fp,
char mode, bool async, int sfok, struct stat *sP)
{
static XrdSysMutex seqMutex;
struct stat buf;
off_t mmSize;

XrdSfsp = fp;
FileKey = strdup(fp->FName());
FileKey = strdup(path);
mmAddr = 0;
FileMode = mode;
AsyncMode= async;
AsyncMode= (async ? 1 : 0);
ID = id;

Stats.Init();
Expand All @@ -101,22 +101,6 @@ XrdXrootdFile::XrdXrootdFile(const char *id, XrdSfsFile *fp, char mode,
fp->stat(sP);
if (!isMMapped) Stats.fSize = static_cast<long long>(sP->st_size);
}

// Develop a unique hash for this file. The key will not be longer than 33 bytes
// including the null character. We now use the filename to avoid plugin
// vagaries. We will keep the code here commented out for now.
//
// if (sP->st_dev != 0 || sP->st_ino != 0)
// {i = bin2hex( FileKey, (char *)&sP->st_dev, sizeof(sP->st_dev));
// i = bin2hex(&FileKey[i],(char *)&sP->st_ino, sizeof(sP->st_ino));
// }
// else if (fdNum > 0)
// {strcpy( FileKey, "fdno");
// bin2hex(&FileKey[4], (char *)&fdNum, sizeof(fdNum));
// }
// else {strcpy( FileKey, "sfsp");
// bin2hex(&FileKey[4], (char *)&XrdSfsp, sizeof(XrdSfsp));
// }
}

/******************************************************************************/
Expand All @@ -125,16 +109,13 @@ XrdXrootdFile::XrdXrootdFile(const char *id, XrdSfsFile *fp, char mode,

XrdXrootdFile::~XrdXrootdFile()
{
char *fn;

if (XrdSfsp) {Locker->Unlock(this);
if (TRACING(TRACE_FS))
{if (!(fn = (char *)XrdSfsp->FName())) fn = (char *)"?";
TRACEI(FS, "closing " <<FileMode <<' ' <<fn);
}
delete XrdSfsp;
XrdSfsp = 0;
}
if (XrdSfsp)
{TRACEI(FS, "closing " <<FileMode <<' ' <<FileKey);
delete XrdSfsp;
XrdSfsp = 0;
Locker->Unlock(FileKey, FileMode);
}
if (FileKey) free(FileKey);
}

Expand Down
5 changes: 3 additions & 2 deletions src/XrdXrootd/XrdXrootdFile.hh
Expand Up @@ -60,8 +60,9 @@ XrdXrootdFileStats Stats; // File access statistics

static void Init(XrdXrootdFileLock *lp, int sfok) {Locker = lp; sfOK = sfok;}

XrdXrootdFile(const char *id, XrdSfsFile *fp, char mode='r',
char async='\0', int sfOK=0, struct stat *sP=0);
XrdXrootdFile(const char *id, const char *path, XrdSfsFile *fp,
char mode='r', bool async=false, int sfOK=0,
struct stat *sP=0);
~XrdXrootdFile();

private:
Expand Down
8 changes: 3 additions & 5 deletions src/XrdXrootd/XrdXrootdFileLock.hh
Expand Up @@ -29,17 +29,15 @@
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/

#include "XrdXrootd/XrdXrootdFile.hh"

class XrdXrootdFileLock
{
public:

virtual int Lock(XrdXrootdFile *fp, int force=0) = 0;
virtual int Lock(const char *path, char mode, bool force) = 0;

virtual void numLocks(XrdXrootdFile *fp, int &rcnt, int &wcnt) = 0;
virtual void numLocks(const char *path, int &rcnt, int &wcnt) = 0;

virtual int Unlock(XrdXrootdFile *fp) = 0;
virtual int Unlock(const char *path, char mode) = 0;

XrdXrootdFileLock() {}
virtual ~XrdXrootdFileLock() {}
Expand Down
24 changes: 12 additions & 12 deletions src/XrdXrootd/XrdXrootdFileLock1.cc
Expand Up @@ -45,8 +45,8 @@ int numReaders;
int numWriters;

XrdXrootdFileLockInfo(char mode)
{if ('r' == mode) {numReaders = 1; numWriters = 0;}
else {numReaders = 0; numWriters = 1;}
{if (mode == 'r') {numReaders = 1; numWriters = 0;}
else {numReaders = 0; numWriters = 1;}
}
~XrdXrootdFileLockInfo() {}
};
Expand Down Expand Up @@ -77,15 +77,15 @@ const char *XrdXrootdFileLock1::TraceID = "FileLock1";
/* L o c k */
/******************************************************************************/

int XrdXrootdFileLock1::Lock(XrdXrootdFile *fp, int force)
int XrdXrootdFileLock1::Lock(const char *path, char mode, bool force)
{
XrdXrootdLockFileLock locker(&LTMutex);
XrdXrootdFileLockInfo *lp;

// See if we already have a lock on this file
//
if ((lp = XrdXrootdLockTable.Find(fp->FileKey)))
{if (fp->FileMode == 'r')
if ((lp = XrdXrootdLockTable.Find(path)))
{if (mode == 'r')
{if (lp->numWriters && !force)
return -lp->numWriters;
lp->numReaders++;
Expand All @@ -99,7 +99,7 @@ int XrdXrootdFileLock1::Lock(XrdXrootdFile *fp, int force)

// Item does not exist, add it to the table
//
XrdXrootdLockTable.Add(fp->FileKey, new XrdXrootdFileLockInfo(fp->FileMode));
XrdXrootdLockTable.Add(path, new XrdXrootdFileLockInfo(mode));
return 0;
}

Expand All @@ -109,31 +109,31 @@ int XrdXrootdFileLock1::Lock(XrdXrootdFile *fp, int force)
/* */
/******************************************************************************/

void XrdXrootdFileLock1::numLocks(XrdXrootdFile *fp, int &rcnt, int &wcnt)
void XrdXrootdFileLock1::numLocks(const char *path, int &rcnt, int &wcnt)
{
XrdXrootdLockFileLock locker(&LTMutex);
XrdXrootdFileLockInfo *lp;

if (!(lp = XrdXrootdLockTable.Find(fp->FileKey))) rcnt = wcnt = 0;
if (!(lp = XrdXrootdLockTable.Find(path))) rcnt = wcnt = 0;
else {rcnt = lp->numReaders; wcnt = lp->numWriters;}
}

/******************************************************************************/
/* U n l o c k */
/******************************************************************************/

int XrdXrootdFileLock1::Unlock(XrdXrootdFile *fp)
int XrdXrootdFileLock1::Unlock(const char *path, char mode)
{
XrdXrootdLockFileLock locker(&LTMutex);
XrdXrootdFileLockInfo *lp;

// See if we already have a lock on this file
//
if (!(lp = XrdXrootdLockTable.Find(fp->FileKey))) return 1;
if (!(lp = XrdXrootdLockTable.Find(path))) return 1;

// Adjust the lock information
//
if (fp->FileMode == 'r')
if (mode == 'r')
{if (lp->numReaders == 0) return 1;
lp->numReaders--;
} else {
Expand All @@ -144,6 +144,6 @@ int XrdXrootdFileLock1::Unlock(XrdXrootdFile *fp)
// Delete the entry if we no longer need it
//
if (lp->numReaders == 0 && lp->numWriters == 0)
XrdXrootdLockTable.Del(fp->FileKey);
XrdXrootdLockTable.Del(path);
return 0;
}
7 changes: 3 additions & 4 deletions src/XrdXrootd/XrdXrootdFileLock1.hh
Expand Up @@ -30,7 +30,6 @@
/******************************************************************************/

#include "XrdSys/XrdSysPthread.hh"
#include "XrdXrootd/XrdXrootdFile.hh"
#include "XrdXrootd/XrdXrootdFileLock.hh"

// This class implements a single server per host lock manager by simply using
Expand All @@ -40,11 +39,11 @@ class XrdXrootdFileLock1 : XrdXrootdFileLock
{
public:

int Lock(XrdXrootdFile *fp, int force=0);
int Lock(const char *path, char mode, bool force);

void numLocks(XrdXrootdFile *fp, int &rcnt, int &wcnt);
void numLocks(const char *path, int &rcnt, int &wcnt);

int Unlock(XrdXrootdFile *fp);
int Unlock(const char *path, char mode);

XrdXrootdFileLock1() {}
~XrdXrootdFileLock1() {} // This object is never destroyed!
Expand Down
88 changes: 59 additions & 29 deletions src/XrdXrootd/XrdXrootdXeq.cc
Expand Up @@ -1179,16 +1179,39 @@ int XrdXrootdProtocol::do_OffloadIO()
/******************************************************************************/
/* d o _ O p e n */
/******************************************************************************/

namespace
{
struct OpenHelper
{XrdSfsFile *fp;
XrdXrootdFile *xp;
XrdXrootdFileLock *Locker;
const char *path;
char mode;
bool isOK;

OpenHelper(XrdXrootdFileLock *lkP, const char *fn)
: fp(0), xp(0), Locker(lkP), path(fn), mode(0),
isOK(false) {}

~OpenHelper() {if (!isOK)
{if (xp) delete xp;
else if (fp) delete fp;
if (mode) Locker->Unlock(path,mode);
}
}
};
}

int XrdXrootdProtocol::do_Open()
{
static XrdXrootdCallBack openCB("open file", XROOTD_MON_OPENR);
int fhandle;
int rc, mode, opts, openopts, doforce = 0, compchk = 0;
int rc, mode, opts, openopts, compchk = 0;
int popt, retStat = 0;
char *opaque, usage, ebuff[2048], opC;
bool doDig;
char *fn = argp->buff, opt[16], *op=opt, isAsync = '\0';
bool doDig, doforce = false, isAsync = false;
char *fn = argp->buff, opt[16], *op=opt;
XrdSfsFile *fp;
XrdXrootdFile *xp;
struct stat statbuf;
Expand Down Expand Up @@ -1235,9 +1258,9 @@ int XrdXrootdProtocol::do_Open()
}
if (opts & kXR_compress)
{openopts |= SFS_O_RAWIO; *op++ = 'c'; compchk = 1;}
if (opts & kXR_force) {*op++ = 'f'; doforce = 1;}
if (opts & kXR_force) {*op++ = 'f'; doforce = true;}
if ((opts & kXR_async || as_force) && !as_noaio)
{*op++ = 'a'; isAsync = '1';}
{*op++ = 'a'; isAsync = true;}
if (opts & kXR_refresh) {*op++ = 's'; openopts |= SFS_O_RESET;
SI->Bump(SI->Refresh);
}
Expand Down Expand Up @@ -1268,6 +1291,27 @@ int XrdXrootdProtocol::do_Open()
//
if (popt & XROOTDXP_NOMWCHK) openopts |= SFS_O_MULTIW;

// Construct an open helper to release resources should we exit due to an error.
//
OpenHelper oHelp(Locker, fn);

// Lock this file
//
if (!(popt & XROOTDXP_NOLK))
{if ((rc = Locker->Lock(fn, usage, doforce)))
{const char *who;
if (rc > 0) who = (rc > 1 ? "readers" : "reader");
else { rc = -rc;
who = (rc > 1 ? "writers" : "writer");
}
snprintf(ebuff, sizeof(ebuff)-1,
"%s file %s is already opened by %d %s; open denied.",
('r' == usage ? "Input" : "Output"), fn, rc, who);
eDest.Emsg("Xeq", ebuff);
return Response.Send(kXR_FileLocked, ebuff);
} else oHelp.mode = usage;
}

// Get a file object
//
if (doDig) fp = digFS->newFile(Link->ID, Monitor.Did);
Expand All @@ -1280,6 +1324,7 @@ int XrdXrootdProtocol::do_Open()
eDest.Emsg("Xeq", ebuff);
return Response.Send(kXR_NoMemory, ebuff);
}
oHelp.fp = fp;

// The open is elegible for a defered response, indicate we're ok with that
//
Expand All @@ -1290,47 +1335,31 @@ int XrdXrootdProtocol::do_Open()
//
if ((rc = fp->open(fn, (XrdSfsFileOpenMode)openopts,
(mode_t)mode, CRED, opaque)))
{rc = fsError(rc, opC, fp->error, fn, opaque); delete fp; return rc;}
{rc = fsError(rc, opC, fp->error, fn, opaque); return rc;}

// Obtain a hyper file object
//
if (!(xp=new XrdXrootdFile(Link->ID,fp,usage,isAsync,Link->sfOK,&statbuf)))
{delete fp;
snprintf(ebuff, sizeof(ebuff)-1, "Insufficient memory to open %s", fn);
xp = new XrdXrootdFile(Link->ID,fn,fp,usage,isAsync,Link->sfOK,&statbuf);
if (!xp)
{snprintf(ebuff, sizeof(ebuff)-1, "Insufficient memory to open %s", fn);
eDest.Emsg("Xeq", ebuff);
return Response.Send(kXR_NoMemory, ebuff);
}
oHelp.xp = xp;

// Serialize the link
//
Link->Serialize();
*ebuff = '\0';

// Lock this file
//
if (!(popt & XROOTDXP_NOLK) && (rc = Locker->Lock(xp, doforce)))
{const char *who;
if (rc > 0) who = (rc > 1 ? "readers" : "reader");
else { rc = -rc;
who = (rc > 1 ? "writers" : "writer");
}
snprintf(ebuff, sizeof(ebuff)-1,
"%s file %s is already opened by %d %s; open denied.",
('r' == usage ? "Input" : "Output"), fn, rc, who);
delete fp; xp->XrdSfsp = 0; delete xp;
eDest.Emsg("Xeq", ebuff);
return Response.Send(kXR_FileLocked, ebuff);
}

// Create a file table for this link if it does not have one
//
if (!FTab) FTab = new XrdXrootdFileTable(Monitor.Did);

// Insert this file into the link's file table
//
if (!FTab || (fhandle = FTab->Add(xp)) < 0)
{delete xp;
snprintf(ebuff, sizeof(ebuff)-1, "Insufficient memory to open %s", fn);
{snprintf(ebuff, sizeof(ebuff)-1, "Insufficient memory to open %s", fn);
eDest.Emsg("Xeq", ebuff);
return Response.Send(kXR_NoMemory, ebuff);
}
Expand All @@ -1339,7 +1368,7 @@ int XrdXrootdProtocol::do_Open()
//
if (doforce)
{int rdrs, wrtrs;
Locker->numLocks(xp, rdrs, wrtrs);
Locker->numLocks(fn, rdrs, wrtrs);
if (('r' == usage && wrtrs) || ('w' == usage && rdrs) || wrtrs > 1)
{snprintf(ebuff, sizeof(ebuff)-1,
"%s file %s forced opened with %d reader(s) and %d writer(s).",
Expand Down Expand Up @@ -1389,8 +1418,9 @@ int XrdXrootdProtocol::do_Open()
memcpy((void *)myResp.fhandle,(const void *)&fhandle,sizeof(myResp.fhandle));
numFiles++;

// Respond
// Respond (failure is not an option now)
//
oHelp.isOK = true;
if (retStat) return Response.Send(IOResp, 3, resplen);
else return Response.Send((void *)&myResp, resplen);
}
Expand Down

0 comments on commit 2c16914

Please sign in to comment.