Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[XCache] Improve handling of failures during opening of local data and cinfo files. #940

Merged
merged 1 commit into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 28 additions & 7 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,43 @@ Cache::Cache(XrdSysLogger *logger) :

XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options)
{
const char* tpfx = "Cache::Attach() ";

if (Cache::GetInstance().Decide(io))
{
TRACE(Info, "Cache::Attach() " << io->Path());
IO* cio;
TRACE(Info, tpfx << io->Path());

IO *cio;

if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
{
cio = new IOFileBlock(io, m_stats, *this);
}
else
cio = new IOEntireFile(io, m_stats, *this);
{
// TODO if overloaded, redirect !!!

IOEntireFile *ioef = new IOEntireFile(io, m_stats, *this);

TRACE_PC(Debug, const char* loc = io->Location(),
"Cache::Attach() " << io->Path() << " location: " <<
if ( ! ioef->HasFile())
{
delete ioef;
// TODO redirect instead !!!
TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
return io;
}

cio = ioef;
}

TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
((loc && loc[0] != 0) ? loc : "<deferred open>"));

return cio;
}
else
{
TRACE(Info, "Cache::Attach() decision decline " << io->Path());
TRACE(Info, tpfx << "decision decline " << io->Path());
}
return io;
}
Expand Down Expand Up @@ -335,8 +355,9 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f
filesize = st.st_size;
}

File* file = new File(path, off, filesize);
File *file = File::FileOpen(path, off, filesize);

if (file)
{
XrdSysCondVarHelper lock(&m_active_cond);

Expand Down
8 changes: 4 additions & 4 deletions src/XrdFileCache/XrdFileCacheConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,9 @@ bool Cache::Config(const char *config_filename, const char *parameters)
if (m_configuration.m_RamAbsAvailable == 0)
{
m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
char buff2[1024];
snprintf(buff2, sizeof(buff2), "RAM usage is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
TRACE(Warning, buff2);
char buff[1024];
snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
m_log.Say("Config info: ", buff);
}
m_configuration.m_NRamBuffers = static_cast<int>(m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize);

Expand Down Expand Up @@ -354,7 +354,7 @@ bool Cache::Config(const char *config_filename, const char *parameters)
loff += snprintf(buff + loff, sizeof(buff) - loff, "%s", unameBuff);
}

m_log.Say( buff);
m_log.Say(buff);
}

m_log.Say("------ File Caching Proxy interface initialization ", retval ? "completed" : "failed");
Expand Down
57 changes: 45 additions & 12 deletions src/XrdFileCache/XrdFileCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ File::File(const std::string& path, long long iOffset, long long iFileSize) :
m_prefetchScore(1),
m_detachTimeIsLogged(false)
{
Open();
}

File::~File()
Expand All @@ -94,6 +93,17 @@ File::~File()
TRACEF(Debug, "File::~File() ended, prefetch score = " << m_prefetchScore);
}

// Factory helper: builds a File for the given path / offset / size and
// immediately attempts to Open() it. Returns the new object on success;
// if Open() fails the object is destroyed and a null pointer is returned.
File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
{
   File *opened = new File(path, offset, fileSize);

   if (opened->Open())
   {
      return opened;
   }

   // Open() reported failure -- discard the object so the caller only sees 0.
   delete opened;
   return 0;
}

//------------------------------------------------------------------------------

void File::BlockRemovedFromWriteQ(Block* b)
Expand Down Expand Up @@ -127,7 +137,7 @@ bool File::ioActive(IO *io)
if (mi != m_io_map.end())
{
TRACEF(Info, "ioActive for io " << io <<
", active_prefetces " << mi->second.m_active_prefetches <<
", active_prefetches " << mi->second.m_active_prefetches <<
", allow_prefetching " << mi->second.m_allow_prefetching <<
"; (block_map.size() = " << m_block_map.size() << ").");

Expand Down Expand Up @@ -273,18 +283,31 @@ void File::RemoveIO(IO *io)

bool File::Open()
{
TRACEF(Dump, "File::Open() open file for disk cache ");
TRACEF(Dump, "File::Open() open file for disk cache");

if (m_is_open)
{
TRACEF(Error, "File::Open() file is already opened.");
return true;
}

const Configuration &conf = Cache::GetInstance().RefConfiguration();

XrdOss &myOss = * Cache::GetInstance().GetOss();
const char *myUser = conf.m_username.c_str();
XrdOucEnv myEnv;
struct stat data_stat, info_stat;

std::string ifn = m_filename + Info::m_infoExtension;

bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);

// Create the data file itself.
char size_str[32]; sprintf(size_str, "%lld", m_fileSize);
myEnv.Put("oss.asize", size_str);
myEnv.Put("oss.cgroup", conf.m_data_space.c_str());

if (myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK)
{
TRACEF(Error, "File::Open() Create failed for data file " << m_filename << ERRNO_AND_ERRSTR);
Expand All @@ -299,12 +322,7 @@ bool File::Open()
return false;
}

// Create the info file
std::string ifn = m_filename + Info::m_infoExtension;

struct stat infoStat;
bool fileExisted = (myOss.Stat(ifn.c_str(), &infoStat) == XrdOssOK);

// Create the info file.
myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
if (myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK)
Expand All @@ -323,11 +341,26 @@ bool File::Open()
return false;
}

if (fileExisted && m_cfi.Read(m_infoFile, ifn))
bool initialize_info_file = true;

if (info_existed && m_cfi.Read(m_infoFile, ifn))
{
TRACEF(Debug, "Read existing info file.");
TRACEF(Debug, "Open - reading existing info file. (data_existed=" << data_existed <<
", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");

// Check if data file exists and is of reasonable size.
if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
{
initialize_info_file = false;
}
else
{
TRACEF(Warning, "Open - basic sanity checks on data file failed, resetting info file.");
m_cfi.ResetAllAccessStats();
}
}
else
if (initialize_info_file)
{
m_cfi.SetBufferSize(conf.m_bufferSize);
m_cfi.SetFileSize(m_fileSize);
Expand Down
7 changes: 7 additions & 0 deletions src/XrdFileCache/XrdFileCacheFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,19 @@ public:
//------------------------------------------------------------------------
File(const std::string &path, long long offset, long long fileSize);

//------------------------------------------------------------------------
//! Static constructor that also does Open. Returns null ptr if Open fails.
//------------------------------------------------------------------------
static File* FileOpen(const std::string &path, long long offset, long long fileSize);

//------------------------------------------------------------------------
//! Destructor.
//------------------------------------------------------------------------
~File();

//! Handle removal of a block from Cache's write queue.
void BlockRemovedFromWriteQ(Block*);

//! Open file handle for data file and info file on local disk.
bool Open();

Expand Down
6 changes: 3 additions & 3 deletions src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &
m_file(0),
m_localStat(0)
{
XrdCl::URL url(GetInput()->Path());
XrdCl::URL url(GetInput()->Path());
std::string fname = url.GetPath();
m_file = Cache::GetInstance().GetFile(fname, this);
}
Expand All @@ -55,14 +55,14 @@ IOEntireFile::~IOEntireFile()
//______________________________________________________________________________
int IOEntireFile::Fstat(struct stat &sbuff)
{
XrdCl::URL url(GetPath());
XrdCl::URL url(GetPath());
std::string name = url.GetPath();
name += Info::m_infoExtension;

int res = 0;
if( ! m_localStat)
{
res = initCachedStat(name.c_str());
res = initCachedStat(name.c_str());
if (res) return res;
}

Expand Down
5 changes: 5 additions & 0 deletions src/XrdFileCache/XrdFileCacheIOEntireFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public:
//------------------------------------------------------------------------
~IOEntireFile();

//------------------------------------------------------------------------
//! Check if File was opened successfully.
//!
//! @return true when the m_file pointer is non-null, false otherwise.
//------------------------------------------------------------------------
bool HasFile() const { return m_file != 0; }

//---------------------------------------------------------------------
//! Pass Read request to the corresponding File object.
//!
Expand Down
38 changes: 25 additions & 13 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ XrdOucCacheIO* IOFileBlock::Detach()
XrdSysMutexHelper lock(&m_mutex);
for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
{
it->second->RequestSyncOfDetachStats();
m_cache.ReleaseFile(it->second, this);
if (it->second)
{
it->second->RequestSyncOfDetachStats();
m_cache.ReleaseFile(it->second, this);
}
}
}
XrdOucCacheIO *io = GetInput();
Expand Down Expand Up @@ -101,13 +104,15 @@ void IOFileBlock::CloseInfoFile()
void IOFileBlock::GetBlockSizeFromPath()
{
const static std::string tag = "hdfsbsize=";

std::string path = GetInput()->Path();
size_t pos1 = path.find(tag);
size_t t = tag.length();
if ( pos1 != path.npos)
size_t pos1 = path.find(tag);
size_t t = tag.length();

if (pos1 != path.npos)
{
pos1 += t;
size_t pos2 = path.find("&", pos1 );
size_t pos2 = path.find("&", pos1);
if (pos2 != path.npos )
{
std::string bs = path.substr(pos1, pos2 - pos1);
Expand All @@ -125,14 +130,16 @@ void IOFileBlock::GetBlockSizeFromPath()
//______________________________________________________________________________
File* IOFileBlock::newBlockFile(long long off, int blocksize)
{
// NOTE: Can return 0 if opening of a local file fails!

XrdCl::URL url(GetInput()->Path());
std::string fname = url.GetPath();

std::stringstream ss;
ss << fname;
char offExt[64];
// filename like <origpath>___<size>_<offset>
sprintf(&offExt[0],"___%lld_%lld", m_blocksize, off );
sprintf(&offExt[0], "___%lld_%lld", m_blocksize, off);
ss << &offExt[0];
fname = ss.str();

Expand Down Expand Up @@ -241,11 +248,13 @@ int IOFileBlock::initLocalStat()
bool IOFileBlock::ioActive()
{
XrdSysMutexHelper lock(&m_mutex);

bool active = false;

for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
{
// Need to initiate stop on all File / block objects.
if (it->second->ioActive(this))
if (it->second && it->second->ioActive(this))
{
active = true;
}
Expand Down Expand Up @@ -280,7 +289,7 @@ int IOFileBlock::Read(char *buff, long long off, int size)
for (int blockIdx = idx_first; blockIdx <= idx_last; ++blockIdx)
{
// locate block
File* fb;
File *fb;
m_mutex.Lock();
std::map<int, File*>::iterator it = m_blocks.find(blockIdx);
if (it != m_blocks.end())
Expand All @@ -298,8 +307,9 @@ int IOFileBlock::Read(char *buff, long long off, int size)
// TRACEIO(Dump, "IOFileBlock::Read() last block, change output file size to " << pbs);
}

// Note: File* can be 0 and stored as 0 if local open fails!
fb = newBlockFile(blockIdx*m_blocksize, pbs);
m_blocks.insert(std::pair<int,File*>(blockIdx, (File*) fb));
m_blocks.insert(std::make_pair(blockIdx, fb));
}
m_mutex.UnLock();

Expand All @@ -309,12 +319,12 @@ int IOFileBlock::Read(char *buff, long long off, int size)
{
if (blockIdx == idx_first)
{
readBlockSize = (blockIdx + 1) *m_blocksize - off0;
readBlockSize = (blockIdx + 1) * m_blocksize - off0;
TRACEIO(Dump, "Read partially till the end of the block");
}
else if (blockIdx == idx_last)
{
readBlockSize = (off0+size) - blockIdx*m_blocksize;
readBlockSize = (off0 + size) - blockIdx * m_blocksize;
TRACEIO(Dump, "Read partially till the end of the block %s");
}
else
Expand All @@ -325,7 +335,9 @@ int IOFileBlock::Read(char *buff, long long off, int size)

TRACEIO(Dump, "IOFileBlock::Read() block[ " << blockIdx << "] read-block-size[" << readBlockSize << "], offset[" << readBlockSize << "] off = " << off );

int retvalBlock = fb->Read(this, buff, off, readBlockSize);
int retvalBlock = (fb != 0) ?
fb->Read(this, buff, off, readBlockSize) :
GetInput()->Read(buff, off, readBlockSize);

TRACEIO(Dump, "IOFileBlock::Read() Block read returned " << retvalBlock);
if (retvalBlock == readBlockSize)
Expand Down