Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing EC enhancement #1563

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 34 additions & 1 deletion src/XrdCl/XrdClEcHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,49 @@ namespace XrdCl
inline XRootDStatus LoadPlacement( const std::string &path )
{
LocationInfo *info = nullptr;
XRootDStatus st = fs.DeepLocate( path, OpenFlags::None, info );
XRootDStatus st = fs.DeepLocate( "*", OpenFlags::None, info );
std::unique_ptr<LocationInfo> ptr( info );
if( !st.IsOK() ) return st;
// The following check become meaningless
if( info->GetSize() < objcfg->nbdata )
return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );

uint64_t verNumMax = 0;
std::vector<uint64_t> verNums;
std::vector<std::string> xattrkeys;
std::vector<XrdCl::XAttr> xattrvals;
xattrkeys.push_back("xrdec.chunkver");
for( size_t i = 0; i < info->GetSize(); ++i )
{
FileSystem *fs_i = new FileSystem(info->At( i ).GetAddress());
xattrvals.clear();
st = fs_i->GetXAttr(path, xattrkeys, xattrvals, 0);
if (! xattrvals[0].value.empty())
{
std::stringstream sstream(xattrvals[0].value);
uint64_t verNum;
sstream >> verNum;
verNums.push_back(verNum);
if (verNum > verNumMax)
verNumMax = verNum;
}
else
verNums.push_back(0);
delete fs_i;
}

int n = 0;
for( size_t i = 0; i < info->GetSize(); ++i )
{
if ( verNums.at(i) != verNumMax )
continue;
else
n++;
auto &location = info->At( i );
objcfg->plgr.emplace_back( "root://" + location.GetAddress() + '/' );
}
if (n < objcfg->nbdata )
return XRootDStatus( stError, errInvalidOp, 0, "Too few data servers." );
return XRootDStatus();
}

Expand Down
1 change: 1 addition & 0 deletions src/XrdCl/XrdClPlugInManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ namespace XrdCl
#ifdef WITH_XRDEC
if( lib == "XrdEcDefault" )
{
setenv("XRDCL_EC", "True", 1);
auto itr = config.find( "nbdta" );
if( itr == config.end() )
return std::make_pair<XrdOucPinLoader*, PlugInFactory*>( nullptr, nullptr );
Expand Down
1 change: 1 addition & 0 deletions src/XrdEc/XrdEcReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ namespace XrdEc
{
// generate the file name (blknb/strpnb)
std::string fn = objcfg.GetFileName( blknb, strpnb );
fn.replace(0, fn.rfind("/")+1, "");
// if the block/stripe does not exist it means we are reading passed the end of the file
auto itr = urlmap.find( fn );
if( itr == urlmap.end() )
Expand Down
12 changes: 9 additions & 3 deletions src/XrdEc/XrdEcStrmWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ namespace XrdEc
//-----------------------------------------------------------------------
// Create the Write request
//-----------------------------------------------------------------------
XrdCl::Pipeline p = XrdCl::AppendFile( zip, fn, crc32c, strpsize, strpbuff ) >>
std::string basefn = fn;
basefn.replace(0,basefn.rfind("/")+1,"");
XrdCl::Pipeline p = XrdCl::AppendFile( zip, basefn, crc32c, strpsize, strpbuff ) >>
[=]( XrdCl::XRootDStatus &st ) mutable
{
//------------------------------------------------
Expand Down Expand Up @@ -237,10 +239,12 @@ namespace XrdEc
std::string fn = std::to_string( i ); // file name (URL of the data archive)
buffer_t buff( dataarchs[i]->GetCD() ); // raw data buffer (central directory of the data archive)
uint32_t cksum = objcfg.digest( 0, buff.data(), buff.size() ); // digest (crc) of the buffer
lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) ); // LFH record for the buffer
std::string basefn = fn;
basefn.replace(0,basefn.rfind("/")+1,"");
lfhs.emplace_back( basefn, cksum, buff.size(), time( 0 ) ); // LFH record for the buffer
LFH &lfh = lfhs.back();
cdfhs.emplace_back( &lfh, mode, offset ); // CDFH record for the buffer
offset += LFH::lfhBaseSize + fn.size() + buff.size(); // shift the offset
offset += LFH::lfhBaseSize + basefn.size() + buff.size(); // shift the offset
cdsize += cdfhs.back().cdfhSize; // update central directory size
buffs.emplace_back( std::move( buff ) ); // keep the buffer for later
}
Expand Down Expand Up @@ -294,6 +298,7 @@ namespace XrdEc
std::vector<XrdCl::Pipeline> closes;
std::vector<XrdCl::Pipeline> save_metadata;
closes.reserve( size );
std::string closeTime = std::to_string( time(NULL) );
for( size_t i = 0; i < size; ++i )
{
//-----------------------------------------------------------------------
Expand All @@ -303,6 +308,7 @@ namespace XrdEc
{
std::string size( std::to_string( GetSize() ) );
XrdCl::Pipeline p = XrdCl::SetXAttr( dataarchs[i]->GetFile(), "xrdec.filesize", size )
| XrdCl::SetXAttr( dataarchs[i]->GetFile(), "xrdec.chunkver", closeTime.c_str() )
| XrdCl::CloseArchive( *dataarchs[i] );
closes.emplace_back( std::move( p ) );
}
Expand Down
4 changes: 4 additions & 0 deletions src/XrdFfs/XrdFfsPosix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -800,8 +800,11 @@ int XrdFfsPosix_statall(const char *rdrurl, const char *path, struct stat *stbuf
res = -1;
errno = ENOENT;
for (i = 0; i < nurls; i++)
{
time_t max_mtime = 0;
if (res_i[i] == 0)
{
if (stbuf_i[i].st_mtime <= max_mtime) continue;
res = 0;
errno = 0;
memcpy((void*)stbuf, (void*)(&stbuf_i[i]), sizeof(struct stat));
Expand All @@ -813,6 +816,7 @@ int XrdFfsPosix_statall(const char *rdrurl, const char *path, struct stat *stbuf
errno = ETIMEDOUT;
syslog(LOG_WARNING, "WARNING: stat(%s) failed (connection timeout)", newurls[i]);
}
}

for (i = 0; i < nurls; i++)
free(newurls[i]);
Expand Down
5 changes: 4 additions & 1 deletion src/XrdFfs/XrdFfsXrootdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,10 @@ static int xrootdfs_create(const char *path, mode_t mode, struct fuse_file_info
int res, fd;
if (!S_ISREG(mode))
return -EPERM;
res = xrootdfs_do_create(path, xrootdfs.rdr, O_CREAT | O_WRONLY, true, &fd);
if (getenv("XRDCL_EC"))
res = xrootdfs_do_create(path, xrootdfs.rdr, O_CREAT | O_WRONLY | O_EXCL, true, &fd);
else
res = xrootdfs_do_create(path, xrootdfs.rdr, O_CREAT | O_WRONLY, true, &fd);
if (res < 0) return res;
fi->fh = fd;
XrdFfsWcache_create(fd); // Unlike mknod and like open, prepare wcache.
Expand Down
5 changes: 4 additions & 1 deletion src/XrdHttp/XrdHttpReq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1367,7 +1367,10 @@ int XrdHttpReq::ProcessHTTPReq() {
l = resourceplusopaque.length() + 1;
xrdreq.open.dlen = htonl(l);
xrdreq.open.mode = htons(kXR_ur | kXR_uw | kXR_gw | kXR_gr | kXR_or);
xrdreq.open.options = htons(kXR_mkpath | kXR_open_wrto | kXR_delete);
if (! getenv("XRDCL_EC"))
xrdreq.open.options = htons(kXR_mkpath | kXR_open_wrto | kXR_delete);
else
xrdreq.open.options = htons(kXR_mkpath | kXR_open_wrto | kXR_new);

if (!prot->Bridge->Run((char *) &xrdreq, (char *) resourceplusopaque.c_str(), l)) {
prot->SendSimpleResp(404, NULL, NULL, (char *) "Could not run request.", 0, keepalive);
Expand Down
146 changes: 143 additions & 3 deletions src/XrdPosix/XrdPosixXrootd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ int XrdPosixXrootd::Stat(const char *path, struct stat *buf)

// Check if we can get the stat informatation from the cache
//
if (XrdPosixGlobals::theCache)
if (! getenv("XRDCL_EC") && XrdPosixGlobals::theCache)
{LfnPath statX("stat", path, false);
if (!statX.path) return -1;
int rc = XrdPosixGlobals::theCache->Stat(statX.path, *buf);
Expand All @@ -1157,7 +1157,107 @@ int XrdPosixXrootd::Stat(const char *path, struct stat *buf)

// Issue the stat and verify that all went well
//
if (!admin.Stat(*buf)) return -1;
if (! getenv("XRDCL_EC"))
{
if (!admin.Stat(*buf)) return -1;
}
else
{
XrdCl::URL url(path);
std::string file = url.GetPath();
XrdCl::LocationInfo *info = nullptr;
XrdCl::FileSystem fs(path);

std::vector<std::string> xattrkeys;
std::vector<XrdCl::XAttr> xattrvals;
xattrkeys.push_back("xrdec.chunkver");
xattrkeys.push_back("xrdec.filesize");

XrdCl::Buffer queryArgs(5), *queryResp;
queryArgs.FromString("role");
XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config, queryArgs, queryResp);
// xrootdfs call this function with individual servers. but we can only do
// // fs.DeepLocate("*"...) agaist a redirector
if (!st.IsOK() || queryResp->ToString() == "server")
{
delete queryResp;
if (!admin.Stat(*buf))
return -1;
else
{
st = fs.GetXAttr(file, xattrkeys, xattrvals, 0);
if (! xattrvals[0].value.empty())
{
std::stringstream sstream0(xattrvals[0].value);
sstream0 >> buf->st_mtime;
std::stringstream sstream1(xattrvals[0].value);
sstream1 >> buf->st_size;
}
return 0;
}
}
else
delete queryResp;

st = fs.DeepLocate("*", XrdCl::OpenFlags::None, info );
std::unique_ptr<XrdCl::LocationInfo> ptr( info );
if( !st.IsOK() )
{
errno = ENOENT;
return -1;
}

int found = 0;
uint64_t verNumMax = 0;
struct stat buf_i;
XrdPosixConfig::initStat(&buf_i);
for( size_t i = 0; i < info->GetSize(); ++i )
{
std::string url_i = "root://" + info->At(i).GetAddress() + "/" + file;
XrdPosixAdmin *admin_i = new XrdPosixAdmin(url_i.c_str());

if (admin_i->Stat(buf_i))
{
if (! S_ISREG(buf_i.st_mode))
{
memcpy(buf, &buf_i, sizeof(struct stat));
delete admin_i;
return 0;
}
else
{
if (verNumMax == 0) memcpy(buf, &buf_i, sizeof(struct stat));
found = 1;
}
XrdCl::FileSystem *fs_i = new XrdCl::FileSystem(info->At( i ).GetAddress());

xattrvals.clear();
st = fs_i->GetXAttr(file, xattrkeys, xattrvals, 0);
if (! xattrvals[0].value.empty())
{
std::stringstream sstream(xattrvals[0].value);
uint64_t verNum;
sstream >> verNum;
if ( verNum > verNumMax )
{
verNumMax = verNum;
memcpy(buf, &buf_i, sizeof(struct stat));
buf->st_mtime = verNumMax; // assume verNum is mtime
std::stringstream sstream(xattrvals[1].value);
sstream >> buf->st_size;
}
}
delete fs_i;
}
delete admin_i;
}
if (! found)
{
errno = ENOENT;
return -1;
}
}

return 0;
}

Expand Down Expand Up @@ -1334,7 +1434,47 @@ int XrdPosixXrootd::Unlink(const char *path)

// Issue the UnLink
//
return XrdPosixMap::Result(admin.Xrd.Rm(admin.Url.GetPathWithParams()));
if (! getenv("XRDCL_EC"))
return XrdPosixMap::Result(admin.Xrd.Rm(admin.Url.GetPathWithParams()));
else
{
XrdCl::URL url(path);
std::string file = url.GetPath();
XrdCl::LocationInfo *info = nullptr;
XrdCl::FileSystem fs(path);

XrdCl::Buffer queryArgs(5), *queryResp;
queryArgs.FromString("role");
XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config, queryArgs, queryResp);
// xrootdfs call this function with individual servers. but we can only do
// fs.DeepLocate("*"...) agaist a redirector
if (!st.IsOK() || queryResp->ToString() == "server")
{
delete queryResp;
return XrdPosixMap::Result(admin.Xrd.Rm(admin.Url.GetPathWithParams()));
}
else
delete queryResp;

st = fs.DeepLocate("*", XrdCl::OpenFlags::None, info );
std::unique_ptr<XrdCl::LocationInfo> ptr( info );
if( !st.IsOK() )
{
errno = ENOENT;
return -1;
}
int rc = -ENOENT;
for( size_t i = 0; i < info->GetSize(); ++i )
{
std::string url_i = "root://" + info->At(i).GetAddress() + "/" + file;
XrdPosixAdmin *admin_i = new XrdPosixAdmin(url_i.c_str());
int x = XrdPosixMap::Result(admin_i->Xrd.Rm(admin_i->Url.GetPathWithParams()));
if (x != -ENOENT && rc != 0)
rc = x;
delete admin_i;
}
return rc;
}
}

/******************************************************************************/
Expand Down