From ac72f29bc8e04254fe447b4c7554854974f5be3a Mon Sep 17 00:00:00 2001 From: Wei Yang Date: Wed, 1 Dec 2021 02:04:07 -0800 Subject: [PATCH 1/2] trying EC enhancement --- src/XrdCl/XrdClEcHandler.hh | 35 +++++++- src/XrdCl/XrdClPlugInManager.cc | 1 + src/XrdEc/XrdEcReader.cc | 1 + src/XrdEc/XrdEcStrmWriter.cc | 12 ++- src/XrdFfs/XrdFfsPosix.cc | 4 + src/XrdFfs/XrdFfsXrootdfs.cc | 5 +- src/XrdPosix/XrdPosixXrootd.cc | 146 +++++++++++++++++++++++++++++++- 7 files changed, 196 insertions(+), 8 deletions(-) diff --git a/src/XrdCl/XrdClEcHandler.hh b/src/XrdCl/XrdClEcHandler.hh index f324036cf0a..79a624fc980 100644 --- a/src/XrdCl/XrdClEcHandler.hh +++ b/src/XrdCl/XrdClEcHandler.hh @@ -243,16 +243,49 @@ namespace XrdCl inline XRootDStatus LoadPlacement( const std::string &path ) { LocationInfo *info = nullptr; - XRootDStatus st = fs.DeepLocate( path, OpenFlags::None, info ); + XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, info ); std::unique_ptr ptr( info ); if( !st.IsOK() ) return st; + // The following check become meaningless if( info->GetSize() < objcfg->nbdata ) return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." ); + + uint64_t verNumMax = 0; + std::vector verNums; + std::vector xattrkeys; + std::vector xattrvals; + xattrkeys.push_back("xrdec.chunkver"); for( size_t i = 0; i < info->GetSize(); ++i ) { + FileSystem *fs_i = new FileSystem(info->At( i ).GetAddress()); + xattrvals.clear(); + st = fs_i->GetXAttr(path, xattrkeys, xattrvals, 0); + if (! xattrvals[0].value.empty()) + { + std::stringstream sstream(xattrvals[0].value); + uint64_t verNum; + sstream >> verNum; + verNums.push_back(verNum); + if (verNum > verNumMax) + verNumMax = verNum; + } + else + verNums.push_back(0); + delete fs_i; + } + + int n = 0; + for( size_t i = 0; i < info->GetSize(); ++i ) + { + if ( verNums.at(i) != verNumMax ) + continue; + else + n++; auto &location = info->At( i ); objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' ); } + if (n < objcfg->nbdata ) + return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." ); return XRootDStatus(); } diff --git a/src/XrdCl/XrdClPlugInManager.cc b/src/XrdCl/XrdClPlugInManager.cc index deb40f507dd..0e9672d4fa3 100644 --- a/src/XrdCl/XrdClPlugInManager.cc +++ b/src/XrdCl/XrdClPlugInManager.cc @@ -354,6 +354,7 @@ namespace XrdCl #ifdef WITH_XRDEC if( lib == "XrdEcDefault" ) { + setenv("XRDCL_EC", "True", 1); auto itr = config.find( "nbdta" ); if( itr == config.end() ) return std::make_pair( nullptr, nullptr ); diff --git a/src/XrdEc/XrdEcReader.cc b/src/XrdEc/XrdEcReader.cc index 589e0200bc4..b33e69aca35 100644 --- a/src/XrdEc/XrdEcReader.cc +++ b/src/XrdEc/XrdEcReader.cc @@ -594,6 +594,7 @@ namespace XrdEc { // generate the file name (blknb/strpnb) std::string fn = objcfg.GetFileName( blknb, strpnb ); + fn.replace(0, fn.rfind("/")+1, ""); // if the block/stripe does not exist it means we are reading passed the end of the file auto itr = urlmap.find( fn ); if( itr == urlmap.end() ) diff --git a/src/XrdEc/XrdEcStrmWriter.cc b/src/XrdEc/XrdEcStrmWriter.cc index 4f02d043e6b..aab62baf754 100644 --- a/src/XrdEc/XrdEcStrmWriter.cc +++ b/src/XrdEc/XrdEcStrmWriter.cc @@ -184,7 +184,9 @@ namespace XrdEc //----------------------------------------------------------------------- // Create the Write request //----------------------------------------------------------------------- - XrdCl::Pipeline p = XrdCl::AppendFile( zip, fn, crc32c, strpsize, strpbuff ) >> + std::string basefn = fn; + basefn.replace(0,basefn.rfind("/")+1,""); + XrdCl::Pipeline p = XrdCl::AppendFile( zip, basefn, crc32c, strpsize, strpbuff ) >> [=]( XrdCl::XRootDStatus &st ) mutable { //------------------------------------------------ @@ -237,10 +239,12 @@ namespace XrdEc std::string fn = std::to_string( i ); // file name (URL of the data archive) buffer_t buff( dataarchs[i]->GetCD() ); // raw data buffer (central directory of the data archive) uint32_t cksum = objcfg.digest( 0, buff.data(), buff.size() ); // digest (crc) of the buffer - lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) ); // LFH record for the buffer + std::string basefn = fn; + basefn.replace(0,basefn.rfind("/")+1,""); + lfhs.emplace_back( basefn, cksum, buff.size(), time( 0 ) ); // LFH record for the buffer LFH &lfh = lfhs.back(); cdfhs.emplace_back( &lfh, mode, offset ); // CDFH record for the buffer - offset += LFH::lfhBaseSize + fn.size() + buff.size(); // shift the offset + offset += LFH::lfhBaseSize + basefn.size() + buff.size(); // shift the offset cdsize += cdfhs.back().cdfhSize; // update central directory size buffs.emplace_back( std::move( buff ) ); // keep the buffer for later } @@ -294,6 +298,7 @@ namespace XrdEc std::vector closes; std::vector save_metadata; closes.reserve( size ); + std::string closeTime = std::to_string( time(NULL) ); for( size_t i = 0; i < size; ++i ) { //----------------------------------------------------------------------- @@ -303,6 +308,7 @@ namespace XrdEc { std::string size( std::to_string( GetSize() ) ); XrdCl::Pipeline p = XrdCl::SetXAttr( dataarchs[i]->GetFile(), "xrdec.filesize", size ) + | XrdCl::SetXAttr( dataarchs[i]->GetFile(), "xrdec.chunkver", closeTime.c_str() ) | XrdCl::CloseArchive( *dataarchs[i] ); closes.emplace_back( std::move( p ) ); } diff --git a/src/XrdFfs/XrdFfsPosix.cc b/src/XrdFfs/XrdFfsPosix.cc index b7cd20185c4..a8157970a33 100644 --- a/src/XrdFfs/XrdFfsPosix.cc +++ b/src/XrdFfs/XrdFfsPosix.cc @@ -800,8 +800,11 @@ int XrdFfsPosix_statall(const char *rdrurl, const char *path, struct stat *stbuf res = -1; errno = ENOENT; for (i = 0; i < nurls; i++) + { + time_t max_mtime = 0; if (res_i[i] == 0) { + if (stbuf_i[i].st_mtime <= max_mtime) continue; res = 0; errno = 0; memcpy((void*)stbuf, (void*)(&stbuf_i[i]), sizeof(struct stat)); @@ -813,6 +816,7 @@ int XrdFfsPosix_statall(const char *rdrurl, const char *path, struct stat *stbuf errno = ETIMEDOUT; syslog(LOG_WARNING, "WARNING: stat(%s) failed (connection timeout)", newurls[i]); } + } for (i = 0; i < nurls; i++) free(newurls[i]); diff --git a/src/XrdFfs/XrdFfsXrootdfs.cc b/src/XrdFfs/XrdFfsXrootdfs.cc index 1c684fdf770..8afb5d5374a 100644 --- a/src/XrdFfs/XrdFfsXrootdfs.cc +++ b/src/XrdFfs/XrdFfsXrootdfs.cc @@ -442,7 +442,10 @@ static int xrootdfs_create(const char *path, mode_t mode, struct fuse_file_info int res, fd; if (!S_ISREG(mode)) return -EPERM; - res = xrootdfs_do_create(path, xrootdfs.rdr, O_CREAT | O_WRONLY, true, &fd); + if (getenv("XRDCL_EC")) + res = xrootdfs_do_create(path, xrootdfs.rdr, O_CREAT | O_WRONLY | O_EXCL, true, &fd); + else + res = xrootdfs_do_create(path, xrootdfs.rdr, O_CREAT | O_WRONLY, true, &fd); if (res < 0) return res; fi->fh = fd; XrdFfsWcache_create(fd); // Unlike mknod and like open, prepare wcache. diff --git a/src/XrdPosix/XrdPosixXrootd.cc b/src/XrdPosix/XrdPosixXrootd.cc index 517dc07ecb4..3635e3b8d40 100644 --- a/src/XrdPosix/XrdPosixXrootd.cc +++ b/src/XrdPosix/XrdPosixXrootd.cc @@ -1147,7 +1147,7 @@ int XrdPosixXrootd::Stat(const char *path, struct stat *buf) // Check if we can get the stat informatation from the cache // - if (XrdPosixGlobals::theCache) + if (! getenv("XRDCL_EC") && XrdPosixGlobals::theCache) {LfnPath statX("stat", path, false); if (!statX.path) return -1; int rc = XrdPosixGlobals::theCache->Stat(statX.path, *buf); @@ -1157,7 +1157,107 @@ int XrdPosixXrootd::Stat(const char *path, struct stat *buf) // Issue the stat and verify that all went well // - if (!admin.Stat(*buf)) return -1; + if (! getenv("XRDCL_EC")) + { + if (!admin.Stat(*buf)) return -1; + } + else + { + XrdCl::URL url(path); + std::string file = url.GetPath(); + XrdCl::LocationInfo *info = nullptr; + XrdCl::FileSystem fs(path); + + std::vector xattrkeys; + std::vector xattrvals; + xattrkeys.push_back("xrdec.chunkver"); + xattrkeys.push_back("xrdec.filesize"); + + XrdCl::Buffer queryArgs(5), *queryResp; + queryArgs.FromString("role"); + XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config, queryArgs, queryResp); + // xrootdfs call this function with individual servers. but we can only do + // // fs.DeepLocate("*"...) agaist a redirector + if (!st.IsOK() || queryResp->ToString() == "server") + { + delete queryResp; + if (!admin.Stat(*buf)) + return -1; + else + { + st = fs.GetXAttr(file, xattrkeys, xattrvals, 0); + if (! xattrvals[0].value.empty()) + { + std::stringstream sstream0(xattrvals[0].value); + sstream0 >> buf->st_mtime; + std::stringstream sstream1(xattrvals[0].value); + sstream1 >> buf->st_size; + } + return 0; + } + } + else + delete queryResp; + + st = fs.DeepLocate("*", XrdCl::OpenFlags::None, info ); + std::unique_ptr ptr( info ); + if( !st.IsOK() ) + { + errno = ENOENT; + return -1; + } + + int found = 0; + uint64_t verNumMax = 0; + struct stat buf_i; + XrdPosixConfig::initStat(&buf_i); + for( size_t i = 0; i < info->GetSize(); ++i ) + { + std::string url_i = "root://" + info->At(i).GetAddress() + "/" + file; + XrdPosixAdmin *admin_i = new XrdPosixAdmin(url_i.c_str()); + + if (admin_i->Stat(buf_i)) + { + if (! S_ISREG(buf_i.st_mode)) + { + memcpy(buf, &buf_i, sizeof(struct stat)); + delete admin_i; + return 0; + } + else + { + if (verNumMax == 0) memcpy(buf, &buf_i, sizeof(struct stat)); + found = 1; + } + XrdCl::FileSystem *fs_i = new XrdCl::FileSystem(info->At( i ).GetAddress()); + + xattrvals.clear(); + st = fs_i->GetXAttr(file, xattrkeys, xattrvals, 0); + if (! xattrvals[0].value.empty()) + { + std::stringstream sstream(xattrvals[0].value); + uint64_t verNum; + sstream >> verNum; + if ( verNum > verNumMax ) + { + verNumMax = verNum; + memcpy(buf, &buf_i, sizeof(struct stat)); + buf->st_mtime = verNumMax; // assume verNum is mtime + std::stringstream sstream(xattrvals[1].value); + sstream >> buf->st_size; + } + } + delete fs_i; + } + delete admin_i; + } + if (! found) + { + errno = ENOENT; + return -1; + } + } + return 0; } @@ -1334,7 +1434,47 @@ int XrdPosixXrootd::Unlink(const char *path) // Issue the UnLink // - return XrdPosixMap::Result(admin.Xrd.Rm(admin.Url.GetPathWithParams())); + if (! getenv("XRDCL_EC")) + return XrdPosixMap::Result(admin.Xrd.Rm(admin.Url.GetPathWithParams())); + else + { + XrdCl::URL url(path); + std::string file = url.GetPath(); + XrdCl::LocationInfo *info = nullptr; + XrdCl::FileSystem fs(path); + + XrdCl::Buffer queryArgs(5), *queryResp; + queryArgs.FromString("role"); + XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config, queryArgs, queryResp); + // xrootdfs call this function with individual servers. but we can only do + // fs.DeepLocate("*"...) agaist a redirector + if (!st.IsOK() || queryResp->ToString() == "server") + { + delete queryResp; + return XrdPosixMap::Result(admin.Xrd.Rm(admin.Url.GetPathWithParams())); + } + else + delete queryResp; + + st = fs.DeepLocate("*", XrdCl::OpenFlags::None, info ); + std::unique_ptr ptr( info ); + if( !st.IsOK() ) + { + errno = ENOENT; + return -1; + } + int rc = -ENOENT; + for( size_t i = 0; i < info->GetSize(); ++i ) + { + std::string url_i = "root://" + info->At(i).GetAddress() + "/" + file; + XrdPosixAdmin *admin_i = new XrdPosixAdmin(url_i.c_str()); + int x = XrdPosixMap::Result(admin_i->Xrd.Rm(admin_i->Url.GetPathWithParams())); + if (x != -ENOENT && rc != 0) + rc = x; + delete admin_i; + } + return rc; + } } /******************************************************************************/ From b75d4ff12ff1956c6521ba3c0cc84d12b5f9d6f7 Mon Sep 17 00:00:00 2001 From: Wei Yang Date: Wed, 15 Dec 2021 23:13:07 -0800 Subject: [PATCH 2/2] Update open flag for EC in HTTP protocol --- src/XrdHttp/XrdHttpReq.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/XrdHttp/XrdHttpReq.cc b/src/XrdHttp/XrdHttpReq.cc index 115bd9c5180..99e0197255b 100644 --- a/src/XrdHttp/XrdHttpReq.cc +++ b/src/XrdHttp/XrdHttpReq.cc @@ -1367,7 +1367,10 @@ int XrdHttpReq::ProcessHTTPReq() { l = resourceplusopaque.length() + 1; xrdreq.open.dlen = htonl(l); xrdreq.open.mode = htons(kXR_ur | kXR_uw | kXR_gw | kXR_gr | kXR_or); - xrdreq.open.options = htons(kXR_mkpath | kXR_open_wrto | kXR_delete); + if (! getenv("XRDCL_EC")) + xrdreq.open.options = htons(kXR_mkpath | kXR_open_wrto | kXR_delete); + else + xrdreq.open.options = htons(kXR_mkpath | kXR_open_wrto | kXR_new); if (!prot->Bridge->Run((char *) &xrdreq, (char *) resourceplusopaque.c_str(), l)) { prot->SendSimpleResp(404, NULL, NULL, (char *) "Could not run request.", 0, keepalive);