Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update xrootfs to work with XrdEC faster #1617

Merged
merged 1 commit into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/XrdCl/XrdClPlugInManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ namespace XrdCl
#ifdef WITH_XRDEC
if( lib == "XrdEcDefault" )
{
setenv("XRDCL_EC", "True", 1);
auto itr = config.find( "nbdta" );
if( itr == config.end() )
return std::make_pair<XrdOucPinLoader*, PlugInFactory*>( nullptr, nullptr );
Expand All @@ -372,6 +371,11 @@ namespace XrdCl
if( itr != config.end() )
Utils::splitString( plgr, itr->second, "," );

std::string xrdclECenv = std::to_string(nbdta) + "," +
std::to_string(nbprt) + "," +
std::to_string(chsz);
setenv("XRDCL_EC", xrdclECenv.c_str(), 1);

EcPlugInFactory *ecHandler = new EcPlugInFactory( nbdta, nbprt, chsz, std::move( plgr ) );
return std::make_pair<XrdOucPinLoader*, PlugInFactory*>( nullptr, ecHandler );
}
Expand Down
22 changes: 21 additions & 1 deletion src/XrdFfs/XrdFfsPosix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
#include "XrdFfs/XrdFfsDent.hh"
#include "XrdFfs/XrdFfsQueue.hh"

#include "XrdCl/XrdClFileSystem.hh"
#include "XrdCl/XrdClFile.hh"
#include "XrdCl/XrdClURL.hh"
#include "XrdCl/XrdClXRootDResponses.hh"

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -91,7 +96,22 @@ int XrdFfsPosix_closedir(DIR *dirp)

int XrdFfsPosix_mkdir(const char *path, mode_t mode)
{
return XrdPosixXrootd::Mkdir(path, mode);
XrdCl::URL url(path);
std::string dir = url.GetPath();
XrdCl::LocationInfo *info = nullptr;
XrdCl::FileSystem fs(path);

XrdCl::XRootDStatus st = fs.DeepLocate("*", XrdCl::OpenFlags::None, info );
std::unique_ptr<XrdCl::LocationInfo> ptr( info );

if( !st.IsOK() )
{
errno = ENOENT;
return -1;
}
std::string nodeUrl = "root://" + info->At(0).GetAddress() + "/" + dir;

return XrdPosixXrootd::Mkdir(nodeUrl.c_str(), mode);
}

int XrdFfsPosix_rmdir(const char *path)
Expand Down
135 changes: 131 additions & 4 deletions src/XrdFfs/XrdFfsWcache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
a big_writes option to allow > 4KByte writing. It will make this
smiple write caching obsolete.
*/
#define XrdFfsWcacheBufsize 131072

#if defined(__linux__)
/* For pread()/pwrite() */
Expand All @@ -63,10 +62,14 @@
extern "C" {
#endif

ssize_t XrdFfsRcacheBufsize;
ssize_t XrdFfsWcacheBufsize = 131072;

struct XrdFfsWcacheFilebuf {
off_t offset;
size_t len;
char *buf;
size_t bufsize;
pthread_mutex_t *mlock;
};

Expand Down Expand Up @@ -100,9 +103,23 @@ void XrdFfsWcache_init(int basefd, int maxfd)
XrdFfsWcacheFbufs[fd].buf = NULL;
XrdFfsWcacheFbufs[fd].mlock = NULL;
}
if (!getenv("XRDCL_EC"))
{
XrdFfsRcacheBufsize = 1024 * 128;
}
else
{
char *savptr;
int nbdat = atoi(strtok_r(getenv("XRDCL_EC"), ",", &savptr));
strtok_r(NULL, ",", &savptr);
int chsz = atoi(strtok_r(NULL, ",", &savptr));
XrdFfsRcacheBufsize = nbdat * chsz;
}
if (getenv("XROOTDFS_WCACHESZ"))
XrdFfsRcacheBufsize = atoi(getenv("XROOTDFS_WCACHESZ"));
}

int XrdFfsWcache_create(int fd)
int XrdFfsWcache_create(int fd, int flags)
/* Create a write cache buffer for a given file descriptor
*
* fd: file descriptor
Expand All @@ -116,12 +133,30 @@ int XrdFfsWcache_create(int fd)

XrdFfsWcacheFbufs[fd].offset = 0;
XrdFfsWcacheFbufs[fd].len = 0;
XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsWcacheBufsize);
// "flag & O_RDONLY" is not equivalant to ! (flags & O_RDWR) && ! (flags & O_WRONLY)
if ( ! (flags & O_RDWR) &&
! (flags & O_WRONLY) &&
(flags & O_DIRECT) ) // Limit the usage scenario of the read cache
{
XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsRcacheBufsize);
XrdFfsWcacheFbufs[fd].bufsize = XrdFfsRcacheBufsize;
}
else
{
XrdFfsWcacheFbufs[fd].buf = (char*)malloc(XrdFfsWcacheBufsize);
XrdFfsWcacheFbufs[fd].bufsize = XrdFfsWcacheBufsize;
}
if (XrdFfsWcacheFbufs[fd].buf == NULL)
{
errno = ENOMEM;
return 0;
}
XrdFfsWcacheFbufs[fd].mlock = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
if (XrdFfsWcacheFbufs[fd].mlock == NULL)
{
errno = ENOMEM;
return 0;
}
errno = pthread_mutex_init(XrdFfsWcacheFbufs[fd].mlock, NULL);
if (errno)
return 0;
Expand Down Expand Up @@ -164,6 +199,98 @@ ssize_t XrdFfsWcache_flush(int fd)
return rc;
}

/*
struct fd_n_offset {
int fd;
off_t offset;
fd_n_offset(int myfd, off_t myoffset) : fd(myfd), offset(myoffset) {}
};

void *XrdFfsWcache_updateReadCache(void *x)
{
struct fd_n_offset *a = (struct fd_n_offset*) x;
size_t bufsize = XrdFfsWcacheFbufs[a->fd].bufsize;

pthread_mutex_lock(XrdFfsWcacheFbufs[a->fd].mlock);
XrdFfsWcacheFbufs[a->fd].offset = (a->offset / bufsize) * bufsize;
XrdFfsWcacheFbufs[a->fd].len = XrdFfsPosix_pread(a->fd + XrdFfsPosix_baseFD,
XrdFfsWcacheFbufs[a->fd].buf,
bufsize,
XrdFfsWcacheFbufs[a->fd].offset);
pthread_mutex_unlock(XrdFfsWcacheFbufs[a->fd].mlock);
return NULL;
}
*/

// this is a read cache
ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset)
{
ssize_t rc;
fd -= XrdFfsPosix_baseFD;
if (fd < 0)
{
errno = EBADF;
return -1;
}

char *bufptr;
size_t bufsize = XrdFfsWcacheFbufs[fd].bufsize;

pthread_mutex_lock(XrdFfsWcacheFbufs[fd].mlock);

// identity which block to cache
if (XrdFfsWcacheFbufs[fd].len == 0 ||
(offset / bufsize != XrdFfsWcacheFbufs[fd].offset / bufsize))
{
XrdFfsWcacheFbufs[fd].offset = (offset / bufsize) * bufsize;
XrdFfsWcacheFbufs[fd].len = XrdFfsPosix_pread(fd + XrdFfsPosix_baseFD,
XrdFfsWcacheFbufs[fd].buf,
bufsize,
XrdFfsWcacheFbufs[fd].offset);
} // when XrdFfsWcacheFbufs[fd].len < bufsize, the block is partially cached.


// fetch data from the cache, up to the block's upper boundary.
if (XrdFfsWcacheFbufs[fd].offset <= offset &&
offset < XrdFfsWcacheFbufs[fd].offset + (off_t)XrdFfsWcacheFbufs[fd].len)
{ // read from cache,
//----------------------------------------------------------
// FUSE doesn't like this block of the code, unless direct_io is enabled, or
// O_DIRECT flags is used. Otherwise, FUSES will stop reading prematurely
// when two processes read the same file at the same time.
bufptr = &XrdFfsWcacheFbufs[fd].buf[offset - XrdFfsWcacheFbufs[fd].offset];
rc = (len < XrdFfsWcacheFbufs[fd].len - (offset - XrdFfsWcacheFbufs[fd].offset))?
len : XrdFfsWcacheFbufs[fd].len - (offset - XrdFfsWcacheFbufs[fd].offset);
memcpy(buf, bufptr, rc);
//----------------------------------------------------------
}
else
{ // offset fall into the uncached part of the partically cached block
rc = XrdFfsPosix_pread(fd + XrdFfsPosix_baseFD, buf, len, offset);
}
pthread_mutex_unlock(XrdFfsWcacheFbufs[fd].mlock);
/*
// prefetch the next block
if ( (offset + rc) ==
(XrdFfsWcacheFbufs[fd].offset + bufsize) )
{
pthread_t thread;
pthread_attr_t attr;
//size_t stacksize = 4*1024*1024;

pthread_attr_init(&attr);
//pthread_attr_setstacksize(&attr, stacksize);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

struct fd_n_offset nextblock(fd, (offset + bufsize));
if (! pthread_create(&thread, &attr, XrdFfsWcache_updateReadCache, &nextblock))
pthread_detach(thread);
pthread_attr_destroy(&attr);
}
*/
return rc;
}

ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
{
ssize_t rc;
Expand All @@ -176,7 +303,7 @@ ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset)
}

/* do not use caching under these cases */
if (len > XrdFfsWcacheBufsize/2 || fd >= XrdFfsWcacheNFILES)
if (len > (size_t)(XrdFfsWcacheBufsize/2) || fd >= XrdFfsWcacheNFILES)
{
rc = XrdFfsPosix_pwrite(fd + XrdFfsPosix_baseFD, buf, len, offset);
return rc;
Expand Down
3 changes: 2 additions & 1 deletion src/XrdFfs/XrdFfsWcache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
#endif

void XrdFfsWcache_init(int basefd, int maxfd);
int XrdFfsWcache_create(int fd);
int XrdFfsWcache_create(int fd, int flags);
void XrdFfsWcache_destroy(int fd);
ssize_t XrdFfsWcache_flush(int fd);
ssize_t XrdFfsWcache_pread(int fd, char *buf, size_t len, off_t offset);
ssize_t XrdFfsWcache_pwrite(int fd, char *buf, size_t len, off_t offset);

#ifdef __cplusplus
Expand Down
35 changes: 27 additions & 8 deletions src/XrdFfs/XrdFfsXrootdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ static void* xrootdfs_init(struct fuse_conn_info *conn)
free(pwbuf);

/* put Xrootd related initialization calls here, after fuse daemonize itself. */
if (getenv("XRDCL_EC")) usingEC = true;
XrdPosixXrootd *abc = new XrdPosixXrootd(-xrootdfs.maxfd);
XrdFfsMisc_xrd_init(xrootdfs.rdr,xrootdfs.urlcachelife,0);
XrdFfsWcache_init(abc->fdOrigin(), xrootdfs.maxfd);
Expand Down Expand Up @@ -451,7 +450,7 @@ static int xrootdfs_create(const char *path, mode_t mode, struct fuse_file_info
res = xrootdfs_do_create(path, xrootdfs.rdr, O_CREAT | O_WRONLY, true, &fd);
if (res < 0) return res;
fi->fh = fd;
XrdFfsWcache_create(fd); // Unlike mknod and like open, prepare wcache.
XrdFfsWcache_create(fd, fi->flags); // Unlike mknod and like open, prepare wcache.
if (xrootdfs.cns != NULL)
{
xrootdfs_do_create(path, xrootdfs.cns, O_CREAT | O_EXCL, false, &fd);
Expand Down Expand Up @@ -791,7 +790,7 @@ static int xrootdfs_open(const char *path, struct fuse_file_info *fi)

fi->fh = fd;
// be careful, 0 means error for this function
if (XrdFfsWcache_create(fi->fh))
if (XrdFfsWcache_create(fi->fh, fi->flags))
return 0;
else
return -errno;
Expand All @@ -804,7 +803,7 @@ static int xrootdfs_read(const char *path, char *buf, size_t size, off_t offset,
int res;

fd = (int) fi->fh;
XrdFfsWcache_flush(fd); /* in case is the file is reading/writing */
if (fi->flags & O_RDWR) XrdFfsWcache_flush(fd);

if (usingEC)
{
Expand All @@ -817,11 +816,18 @@ static int xrootdfs_read(const char *path, char *buf, size_t size, off_t offset,
return 0;

size = (size_t)(fsize - offset) > size ? size : fsize - offset;
// Restrict the use of read cache to O_DIRECT use case
// See comment in XRdFfsWcache_pread()
if ( ! (fi->flags & O_RDWR) && (fi->flags & O_DIRECT) )
res = XrdFfsWcache_pread(fd, buf, size, offset);
else
res = XrdFfsPosix_pread(fd, buf, size, offset);
}
else
res = XrdFfsPosix_pread(fd, buf, size, offset);

res = XrdFfsPosix_pread(fd, buf, size, offset);
if (res == -1)
res = -errno;
res = -errno;

return res;
}
Expand Down Expand Up @@ -1253,6 +1259,7 @@ static void xrootdfs_usage(const char *progname)
"\n"
"Default options:\n"
" fsname=xrootdfs,allow_other,max_write=131072,attr_timeout=10,entry_timeout=10,negative_timeout=5\n"
" In case of an Erasure Encoding storage, entry_timeout=0\n"
"\n"
"[Required]\n"
" -o rdr=redirector_url root URL of the Xrootd redirector\n"
Expand Down Expand Up @@ -1328,13 +1335,25 @@ int main(int argc, char *argv[])
/* Define XrootdFS options */
char **cmdline_opts;

if (getenv("XRDCL_EC")) usingEC = true;

cmdline_opts = (char **) malloc(sizeof(char*) * (argc -1 + 3));
cmdline_opts[0] = argv[0];
cmdline_opts[1] = strdup("-o");
if (getenv("XROOTDFS_NO_ALLOW_OTHER") != NULL && ! strcmp(getenv("XROOTDFS_NO_ALLOW_OTHER"),"1") )
cmdline_opts[2] = strdup("fsname=xrootdfs,max_write=131072,attr_timeout=10,entry_timeout=10,negative_timeout=5");
{
if (! usingEC)
cmdline_opts[2] = strdup("fsname=xrootdfs,max_write=131072,attr_timeout=10,entry_timeout=10,negative_timeout=5");
else
cmdline_opts[2] = strdup("fsname=xrootdfs,max_write=131072,attr_timeout=10,entry_timeout=0,negative_timeout=5");
}
else
cmdline_opts[2] = strdup("fsname=xrootdfs,allow_other,max_write=131072,attr_timeout=10,entry_timeout=10,negative_timeout=5");
{
if (! usingEC)
cmdline_opts[2] = strdup("fsname=xrootdfs,allow_other,max_write=131072,attr_timeout=10,entry_timeout=10,negative_timeout=5");
else
cmdline_opts[2] = strdup("fsname=xrootdfs,allow_other,max_write=131072,attr_timeout=10,entry_timeout=0,negative_timeout=5");
}

for (int i = 1; i < argc; i++)
cmdline_opts[i+2] = argv[i];
Expand Down