Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pfc: move disk sync operations out of IO::ioActive() call #498

Merged
merged 4 commits into from Apr 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 48 additions & 4 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -61,6 +61,12 @@ void *PrefetchThread(void* ptr)
return NULL;
}

void *DyingFilesNeedSyncThread(void* ptr)
{
Cache* cache = static_cast<Cache*>(ptr);
cache->DyingFilesNeedSync();
return NULL;
}

extern "C"
{
Expand All @@ -85,9 +91,12 @@ XrdOucCache2 *XrdOucGetCache2(XrdSysLogger *logger,
pthread_t tid2;
XrdSysThread::Run(&tid2, PrefetchThread, (void*)(&factory), 0, "XrdFileCache Prefetch ");


pthread_t tid;
XrdSysThread::Run(&tid, CacheDirCleanupThread, NULL, 0, "XrdFileCache CacheDirCleanup");

pthread_t tids;
XrdSysThread::Run(&tids, DyingFilesNeedSyncThread, (void*)(&factory), 0, "XrdFileCache DyingFilesNeedSyncThread");

return &factory;
}
}
Expand Down Expand Up @@ -128,7 +137,8 @@ Cache::Cache() : XrdOucCache(),
m_trace(0),
m_traceID("Manager"),
m_prefetch_condVar(0),
m_RAMblocks_used(0)
m_RAMblocks_used(0),
m_sync_condVar(0)
{
m_trace = new XrdOucTrace(&m_log);
// default log level is Warning
Expand Down Expand Up @@ -436,8 +446,6 @@ int Cache::Stat(const char *curl, struct stat &sbuff)
}

//______________________________________________________________________________


void
Cache::Prefetch()
{
Expand All @@ -460,3 +468,39 @@ Cache::Prefetch()
}
}

//______________________________________________________________________________
void
Cache::RegisterDyingFilesNeedSync(IO* io)
{
m_sync_condVar.Lock();
m_syncIOList.push_back(io);
m_sync_condVar.Signal();
m_sync_condVar.UnLock();
}

//______________________________________________________________________________
void
Cache::DyingFilesNeedSync()
{
while (true) {
m_sync_condVar.Lock();
while (m_syncIOList.empty())
{
m_sync_condVar.Wait();
}

std::vector<IO*>::iterator it = m_syncIOList.begin();
while ( it != m_syncIOList.end())
{
if ((*it)->FinalizeSyncBeforeExit() == false)
{
IO* bye = *it;
delete bye;
it = m_syncIOList.erase(it);
}
else ++it;
}
m_sync_condVar.UnLock();
XrdSysTimer::Snooze(1);
}
}
6 changes: 6 additions & 0 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -202,6 +202,9 @@ public:

XrdOucTrace* GetTrace() { return m_trace; }

void DyingFilesNeedSync();
void RegisterDyingFilesNeedSync(IO*);

private:
bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
bool ConfigXeq(char *, XrdOucStream &);
Expand Down Expand Up @@ -250,6 +253,9 @@ private:
// prefetching
typedef std::vector<File*> PrefetchList;
PrefetchList m_prefetchList;

std::vector<IO*> m_syncIOList;
XrdSysCondVar m_sync_condVar;
};

}
Expand Down
60 changes: 32 additions & 28 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -174,47 +174,51 @@ bool File::ioActive()

blockMapEmpty = m_block_map.empty();
}


return !blockMapEmpty;
}

bool File::FinalizeSyncBeforeExit()
{
// returns true if sync is required
// this method is called after corresponding IO is detached from PosixCache

if (blockMapEmpty)
bool schedule_sync = false;
{
// file is not active when block map is empty and sync is done
bool schedule_sync = false;

{
XrdSysCondVarHelper _lck(m_downloadCond);
XrdSysCondVarHelper _lck(m_downloadCond);

if (m_in_sync) return true;
if (m_in_sync) return true;

if (m_writes_during_sync.empty() && m_non_flushed_cnt == 0)
{
if (! m_detachTimeIsLogged)
{
m_cfi.WriteIOStatDetach(m_stats);
m_detachTimeIsLogged = true;
schedule_sync = true;
}
}
else
if (m_writes_during_sync.empty() && m_non_flushed_cnt == 0)
{
if (! m_detachTimeIsLogged)
{
// write leftovers
m_cfi.WriteIOStatDetach(m_stats);
m_detachTimeIsLogged = true;
schedule_sync = true;
}

if (schedule_sync)
m_in_sync = true;
}

if (schedule_sync)
{
XrdPosixGlobals::schedP->Schedule(m_syncer);
}
else
{
return false;
// write leftovers
schedule_sync = true;
}

if (schedule_sync)
m_in_sync = true;
}

if (schedule_sync)
{
XrdPosixGlobals::schedP->Schedule(m_syncer);
return true;
}
else
{
return false;
}

return true;
}

//------------------------------------------------------------------------------
Expand Down
8 changes: 7 additions & 1 deletion src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -155,7 +155,13 @@ public:
//! Used in XrdPosixXrootd::Close()
//----------------------------------------------------------------------
bool ioActive();


//----------------------------------------------------------------------
//! \brief I. Return true if any of blocks need sync.
//! Called from IO::DyingFilesNeedSync()
//----------------------------------------------------------------------
bool FinalizeSyncBeforeExit();

//----------------------------------------------------------------------
//! Sync file cache inf o and output data with disk
//----------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions src/XrdFileCache/XrdFileCacheIO.hh
Expand Up @@ -35,6 +35,8 @@ public:

virtual void RelinquishFile(File*) = 0;

virtual bool FinalizeSyncBeforeExit() = 0;

XrdOucTrace* GetTrace() {return m_cache.GetTrace(); }

XrdOucCacheIO2* GetInput();
Expand All @@ -47,6 +49,7 @@ protected:
std::string m_path;
const char* GetPath() { return m_path.c_str(); }

bool m_syncDiskAfterDetach;
private:
XrdOucCacheIO2 *m_io; //!< original data source
XrdSysMutex updMutex;
Expand Down
50 changes: 41 additions & 9 deletions src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Expand Up @@ -58,12 +58,21 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &
Cache::GetInstance().AddActive(m_file);
}

//______________________________________________________________________________
IOEntireFile::~IOEntireFile()
{
// called from Detach() if no sync is needed or
// from Cache's sync thread
TRACEIO(Debug, "IOEntireFile::~IOEntireFile() ");

if (m_file) {
m_cache.Detach(m_file);
m_file = 0;
}
delete m_localStat;
}

//______________________________________________________________________________
int IOEntireFile::Fstat(struct stat &sbuff)
{
XrdCl::URL url(GetPath());
Expand All @@ -81,18 +90,22 @@ int IOEntireFile::Fstat(struct stat &sbuff)
return 0;
}

//______________________________________________________________________________
long long IOEntireFile::FSize()
{
return m_file->GetFileSize();
}

//______________________________________________________________________________
void IOEntireFile::RelinquishFile(File* f)
{
TRACEIO(Info, "IOEntireFile::RelinquishFile");
assert(m_file == f);
m_file = 0;
}

//______________________________________________________________________________

int IOEntireFile::initCachedStat(const char* path)
{
// Called indirectly from the constructor.
Expand Down Expand Up @@ -141,6 +154,7 @@ int IOEntireFile::initCachedStat(const char* path)
return res;
}

//______________________________________________________________________________
bool IOEntireFile::ioActive()
{
if ( ! m_file)
Expand All @@ -149,27 +163,45 @@ bool IOEntireFile::ioActive()
}
else
{
bool active = m_file->ioActive();
if (! active && m_file)
{
TRACEIO(Debug, "IOEntireFile::ioActive() detaching file");
m_cache.Detach(m_file);
m_file = 0;
}
return active;
return m_file->ioActive();
}
}

//______________________________________________________________________________

XrdOucCacheIO *IOEntireFile::Detach()
{
// Called from XrdPosixFile destructor

TRACEIO(Debug, "IOEntireFile::Detach() ");

XrdOucCacheIO * io = GetInput();

delete this;
if ( ! FinalizeSyncBeforeExit() )
{
delete this;
}
else
{
m_cache.RegisterDyingFilesNeedSync(this);
}

return io;
}


//______________________________________________________________________________

bool IOEntireFile::FinalizeSyncBeforeExit()
{
if (m_file)
return m_file->FinalizeSyncBeforeExit();
else
return false;
}

//______________________________________________________________________________

int IOEntireFile::Read (char *buff, long long off, int size)
{
TRACEIO(Dump, "IOEntireFile::Read() "<< this << " off: " << off << " size: " << size );
Expand Down
4 changes: 3 additions & 1 deletion src/XrdFileCache/XrdFileCacheIOEntireFile.hh
Expand Up @@ -81,7 +81,9 @@ public:
//! \brief Virtual method of XrdOucCacheIO.
//! Called to check if destruction needs to be done in a separate task.
virtual bool ioActive();


virtual bool FinalizeSyncBeforeExit();

virtual int Fstat(struct stat &sbuff);

virtual long long FSize();
Expand Down