Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review and correct error handling in POSIX and XCache, implement XCache::Unlink() #997

Merged
merged 1 commit into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
192 changes: 159 additions & 33 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ bool Cache::Decide(XrdOucCacheIO* io)
}

Cache::Cache(XrdSysLogger *logger) :
XrdOucCache(),
XrdOucCache2(),
m_log(logger, "XrdFileCache_"),
m_trace(new XrdSysTrace("XrdFileCache", logger)),
m_traceID("Manager"),
Expand Down Expand Up @@ -193,14 +193,13 @@ XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options)
}
else
{
// TODO if overloaded, redirect !!!

IOEntireFile *ioef = new IOEntireFile(io, m_stats, *this);

if ( ! ioef->HasFile())
{
delete ioef;
// TODO redirect instead !!!
// TODO - redirect instead. But this is kind of an awkward place for it.
// errno is set during IOEntireFile construction.
TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
return io;
}
Expand Down Expand Up @@ -245,6 +244,8 @@ void Cache::AddWriteTask(Block* b, bool fromRead)

void Cache::RemoveWriteQEntriesFor(File *iFile)
{
std::list<Block*> removed_blocks;

m_writeQ.condVar.Lock();
std::list<Block*>::iterator i = m_writeQ.queue.begin();
while (i != m_writeQ.queue.end())
Expand All @@ -253,7 +254,7 @@ void Cache::RemoveWriteQEntriesFor(File *iFile)
{
TRACE(Dump, "Cache::Remove entries for " << (void*)(*i) << " path " << iFile->lPath());
std::list<Block*>::iterator j = i++;
iFile->BlockRemovedFromWriteQ(*j);
removed_blocks.push_back(*j);
m_writeQ.queue.erase(j);
--m_writeQ.size;
}
Expand All @@ -263,6 +264,8 @@ void Cache::RemoveWriteQEntriesFor(File *iFile)
}
}
m_writeQ.condVar.UnLock();

iFile->BlocksRemovedFromWriteQ(removed_blocks);
}


Expand Down Expand Up @@ -331,19 +334,21 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f
// Called from virtual IO::Attach

TRACE(Debug, "Cache::GetFile " << path << ", io " << io);


ActiveMap_i it;

{
XrdSysCondVarHelper lock(&m_active_cond);

while (true)
{
ActiveMap_i it = m_active.find(path);
it = m_active.find(path);

// File is not open or being opened. Mark it as being opened and
// proceed to opening it outside of while loop.
if (it == m_active.end())
{
m_active[path] = 0;
it = m_active.insert(std::make_pair(path, (File*) 0)).first;
break;
}

Expand All @@ -366,24 +371,38 @@ File* Cache::GetFile(const std::string& path, IO* io, long long off, long long f
{
struct stat st;
int res = io->Fstat(st);
if (res) {
if (res < 0) {
errno = res;
TRACE(Error, "Cache::GetFile, could not get valid stat");
return 0;
} else if (res > 0) {
errno = ENOTSUP;
TRACE(Error, "Cache::GetFile, stat returned positive value, this should NOT happen here");
} else {
filesize = st.st_size;
}

filesize = st.st_size;
}

File *file = File::FileOpen(path, off, filesize);
File *file = 0;

if (filesize > 0)
{
file = File::FileOpen(path, off, filesize);
}

if (file)
{
XrdSysCondVarHelper lock(&m_active_cond);

inc_ref_cnt(file, false, true);
m_active[file->GetLocalPath()] = file;
if (file)
{
inc_ref_cnt(file, false, true);
it->second = file;

file->AddIO(io);
file->AddIO(io);
}
else
{
m_active.erase(it);
}

m_active_cond.Broadcast();
}
Expand Down Expand Up @@ -498,36 +517,63 @@ void Cache::dec_ref_cnt(File* f, bool high_debug)
// Called from ReleaseFile() or DiskSync callback.

int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
int cnt;

{
XrdSysCondVarHelper lock(&m_active_cond);

m_active_cond.Lock();
int cnt = f->get_ref_cnt();
m_active_cond.UnLock();
cnt = f->get_ref_cnt();

if (f->is_in_emergency_shutdown())
{
// In this case file has been already removed from m_active map and
// does not need to be synced.

if (cnt == 1)
{
TRACE_INT(tlvl, "Cache::dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
<< " -- deleting File object without further ado");
delete f;
}
else
{
TRACE_INT(tlvl, "Cache::dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
<< " -- waiting");
}

return;
}
}

TRACE_INT(tlvl, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);

if (cnt == 1)
{
if (f->FinalizeSyncBeforeExit())
{
// Note, here we "reuse" the existing reference count for the
// final sync.

TRACE(Debug, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
schedule_file_sync(f, true, true);
return;
}
}

m_active_cond.Lock();
cnt = f->dec_ref_cnt();
TRACE_INT(tlvl, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
if (cnt == 0)
{
ActiveMap_i it = m_active.find(f->GetLocalPath());
m_active.erase(it);
delete f;
XrdSysCondVarHelper lock(&m_active_cond);

cnt = f->dec_ref_cnt();
TRACE_INT(tlvl, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
if (cnt == 0)
{
ActiveMap_i it = m_active.find(f->GetLocalPath());
m_active.erase(it);
delete f;
}
}
m_active_cond.UnLock();
}


bool Cache::IsFileActiveOrPurgeProtected(const std::string& path)
{
XrdSysCondVarHelper lock(&m_active_cond);
Expand Down Expand Up @@ -646,7 +692,7 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen, LFP_Reason why)

XrdCl::URL url(curl);
std::string f_name = url.GetPath();
std::string i_name = f_name + ".cinfo";
std::string i_name = f_name + Info::m_infoExtension;

if (why == ForPath)
{
Expand Down Expand Up @@ -740,7 +786,7 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode)
{
XrdCl::URL url(curl);
std::string f_name = url.GetPath();
std::string i_name = f_name + ".cinfo";
std::string i_name = f_name + Info::m_infoExtension;

// Do not allow write access.
if (oflags & (O_WRONLY | O_RDWR))
Expand Down Expand Up @@ -785,7 +831,7 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode)
}

//______________________________________________________________________________
// virtual method of XrdOucCache2::Stat()
// virtual method of XrdOucCache2.
//!
//! @return <0 - Stat failed, value is -errno.
//! =0 - Stat succeeded, sbuff holds stat information.
Expand All @@ -796,7 +842,7 @@ int Cache::Stat(const char *curl, struct stat &sbuff)
{
XrdCl::URL url(curl);
std::string f_name = url.GetPath();
std::string i_name = f_name + ".cinfo";
std::string i_name = f_name + Info::m_infoExtension;

{
XrdSysCondVarHelper lock(&m_active_cond);
Expand Down Expand Up @@ -832,3 +878,83 @@ int Cache::Stat(const char *curl, struct stat &sbuff)

return 1;
}

//______________________________________________________________________________
// virtual method of XrdOucCache.
//!
//! @return <0 - Stat failed, value is -errno.
//! =0 - Stat succeeded, sbuff holds stat information.
//------------------------------------------------------------------------------

int Cache::Unlink(const char *curl)
{
XrdCl::URL url(curl);
std::string f_name = url.GetPath();

// printf("Cache::Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());

return UnlinkCommon(f_name, false);
}


int Cache::UnlinkUnlessOpen(const std::string& f_name)
{
return UnlinkCommon(f_name, true);
}

int Cache::UnlinkCommon(const std::string& f_name, bool fail_if_open)
{
ActiveMap_i it;
File *file = 0;
{
XrdSysCondVarHelper lock(&m_active_cond);

it = m_active.find(f_name);

if (it != m_active.end())
{
if (fail_if_open)
{
TRACE(Info, "Cache::UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
return -EBUSY;
}

// Null File* in m_active map means an operation is ongoing, probably
// Attach() with possible File::Open(). Ask for retry.
if (it->second == 0)
{
TRACE(Info, "Cache::UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
return -EAGAIN;
}

file = it->second;
file->initiate_emergency_shutdown();
it->second = 0;
}
else
{
it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
}
}

if (file)
{
RemoveWriteQEntriesFor(file);
}

std::string i_name = f_name + Info::m_infoExtension;

// Unlink file & cinfo
int f_ret = m_output_fs->Unlink(f_name.c_str());
int i_ret = m_output_fs->Unlink(i_name.c_str());

TRACE(Debug, "Cache::UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);

{
XrdSysCondVarHelper lock(&m_active_cond);

m_active.erase(it);
}

return std::min(f_ret, i_ret);
}
16 changes: 13 additions & 3 deletions src/XrdFileCache/XrdFileCache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct Configuration
m_wqueue_threads(4),
m_prefetch_max_blocks(10),
m_hdfsbsize(128*1024*1024),
m_flushCnt(100)
m_flushCnt(2000)
{}

bool are_file_usage_limits_set() const { return m_fileUsageMax > 0; }
Expand Down Expand Up @@ -119,7 +119,7 @@ struct TmpConfiguration

TmpConfiguration() :
m_diskUsageLWM("0.90"), m_diskUsageHWM("0.95"),
m_flushRaw("100")
m_flushRaw("")
{}
};

Expand Down Expand Up @@ -160,9 +160,12 @@ public:
// Virtual function of XrdOucCache2. Used for deferred open.
virtual int Prepare(const char *url, int oflags, mode_t mode);

// virtual function of XrdOucCache2::Stat()
// virtual function of XrdOucCache2.
virtual int Stat(const char *url, struct stat &sbuff);

// virtual function of XrdOucCache.
virtual int Unlink(const char *url);

//--------------------------------------------------------------------
//! \brief Makes decision if the original XrdOucCacheIO should be cached.
//!
Expand Down Expand Up @@ -207,6 +210,11 @@ public:
//---------------------------------------------------------------------
void Purge();

//---------------------------------------------------------------------
//! Remove file from cache unless it is currently open.
//---------------------------------------------------------------------
int UnlinkUnlessOpen(const std::string& f_name);

//---------------------------------------------------------------------
//! Add downloaded block in write queue.
//---------------------------------------------------------------------
Expand Down Expand Up @@ -259,6 +267,8 @@ private:

bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name);

int UnlinkCommon(const std::string& f_name, bool fail_if_open);

static Cache *m_factory; //!< this object
static XrdScheduler *schedP;

Expand Down