Skip to content

Commit

Permalink
Use XrdJob to schedule fsync.
Browse files Browse the repository at this point in the history
  • Loading branch information
osschar committed Feb 20, 2015
1 parent 2ec8505 commit bdb289b
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 52 deletions.
3 changes: 1 addition & 2 deletions src/XrdFileCache/XrdFileCacheInfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ void Info::AppendIOStat(const Stats* caches, XrdOssDF* fp)
{
clLog()->Info(XrdCl::AppMsg, "Info:::AppendIOStat()");


int flr = XrdOucSxeq::Serialize(fp->getFD(), 0);
if (flr) clLog()->Error(XrdCl::AppMsg, "AppendIOStat() lock failed \n");

Expand All @@ -145,7 +144,7 @@ void Info::AppendIOStat(const Stats* caches, XrdOssDF* fp)
as.BytesMissed = caches->m_BytesMissed;

flr = XrdOucSxeq::Release(fp->getFD());
if (flr) clLog()->Error(XrdCl::AppMsg, "AppendStat() un-lock failed \n");
if (flr) clLog()->Error(XrdCl::AppMsg, "AppenIOStat() un-lock failed \n");

long long ws = fp->Write(&as, off, sizeof(AStat));
if ( ws != sizeof(AStat)) { assert(0); }
Expand Down
139 changes: 95 additions & 44 deletions src/XrdFileCache/XrdFileCachePrefetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,47 @@

#include "XrdCl/XrdClLog.hh"
#include "XrdCl/XrdClConstants.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdCl/XrdClFile.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "Xrd/XrdScheduler.hh"

#include "XrdSfs/XrdSfsInterface.hh"
#include "XrdPosix/XrdPosixFile.hh"

#include "XrdFileCachePrefetch.hh"
#include "XrdFileCacheFactory.hh"
#include "XrdFileCache.hh"
const static int PREFETCH_MAX_ATTEMPTS = 10;
using namespace XrdFileCache;

bool POK = 0;
bool PFALSE = 0;
using namespace XrdFileCache;

namespace XrdPosixGlobals
{
extern XrdScheduler *schedP;
}

void *DiskSyncRunner(void * prefetch_void)
namespace
{
XrdFileCache::Prefetch *prefetch = static_cast<XrdFileCache::Prefetch *>(prefetch_void);
if (prefetch)
prefetch->Sync();
return NULL;
const int PREFETCH_MAX_ATTEMPTS = 10;

class DiskSyncer : public XrdJob
{
private:
Prefetch *m_prefetch;

public:
DiskSyncer(Prefetch *pref, const char *desc="") :
XrdJob(desc),
m_prefetch(pref)
{}

void DoIt()
{
m_prefetch->Sync();
}
};
}


Expand Down Expand Up @@ -76,6 +92,7 @@ Prefetch::Prefetch(XrdOucCacheIO &inputIO, std::string& disk_file_path, long lon
m_stopped(false),
m_stateCond(0), // We will explicitly lock the condition before use.
m_queueCond(0),
m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")),
m_non_flushed_cnt(0),
m_in_sync(false)
{
Expand Down Expand Up @@ -133,19 +150,33 @@ Prefetch::~Prefetch()
m_ram.m_writeMutex.UnLock();

// disk sync
m_syncStatusMutex.Lock();
if (m_in_sync ) writewait = true;
m_syncStatusMutex.UnLock();
{
XrdSysMutexHelper _lck(&m_syncStatusMutex);

if (m_in_sync) writewait = true;
}

if (!writewait)
break;
}

XrdSysTimer::Wait(100);
}
clLog()->Debug(XrdCl::AppMsg, "Prefetch::~Prefetch finished with writing %s",lPath() );

if( m_non_flushed_cnt) {
clLog()->Info(XrdCl::AppMsg, "Prefetch sync unflushed %d\n", m_non_flushed_cnt);
bool do_sync = false;
{
XrdSysMutexHelper _lck(&m_syncStatusMutex);
if (m_non_flushed_cnt > 0)
{
do_sync = true;
m_in_sync = true;

clLog()->Info(XrdCl::AppMsg, "Prefetch::~Prefetch sync unflushed %d\n", m_non_flushed_cnt);
}
}
if (do_sync)
{
Sync();
}

Expand All @@ -168,6 +199,8 @@ Prefetch::~Prefetch()
delete m_infoFile;
m_infoFile = NULL;
}

delete m_syncer;
}

//______________________________________________________________________________
Expand Down Expand Up @@ -514,64 +547,82 @@ Prefetch::WriteBlockToDisk(int ramIdx, size_t size)
cnt++;

if (buffer_remaining)
{
clLog()->Warning(XrdCl::AppMsg, "Prefetch::WriteToDisk() reattempt[%d] writing missing %d for block %d %s",
cnt, buffer_remaining, fileIdx, lPath());

if (cnt > PREFETCH_MAX_ATTEMPTS )
}
if (cnt > PREFETCH_MAX_ATTEMPTS)
{
clLog()->Error(XrdCl::AppMsg, "Prefetch::WriteToDisk() write failes too manny attempts %s", lPath());
return;
}

}

// set bit fetched
clLog()->Dump(XrdCl::AppMsg, "Prefetch::WriteToDisk() success set bit for block [%d] size [%d] %s", fileIdx, size, lPath());
int pfIdx = fileIdx - m_offset/m_cfi.GetBufferSize();
m_downloadStatusMutex.Lock();
m_cfi.SetBitFetched( pfIdx);
m_cfi.SetBitFetched(pfIdx);
m_downloadStatusMutex.UnLock();


// set bit synced
m_syncStatusMutex.Lock();
if (m_in_sync) {
m_write_called_while_in_sync.push_back(pfIdx);
}
else {
m_cfi.SetBitWriteCalled(fileIdx);
bool schedule_sync = false;
{
XrdSysMutexHelper _lck(&m_syncStatusMutex);

if (m_in_sync)
{
m_writes_during_sync.push_back(pfIdx);
}
else
{
m_cfi.SetBitWriteCalled(pfIdx);
++m_non_flushed_cnt;
}

if (m_non_flushed_cnt >= 100)
{
schedule_sync = true;
m_in_sync = true;
m_non_flushed_cnt = 0;
}
}
++m_non_flushed_cnt;
m_syncStatusMutex.UnLock();
if (m_non_flushed_cnt > 100 ) {
pthread_t tid;
clLog()->Info(XrdCl::AppMsg, "Running sync from Prefetch::WriteBlockToDisk %s", lPath());
XrdSysThread::Run(&tid, DiskSyncRunner, (void *)(this), 0, "XrdFileCache Prefetcher");

if (schedule_sync)
{
XrdPosixGlobals::schedP->Schedule(m_syncer);
}
}

//______________________________________________________________________________
void Prefetch::Sync()
{
clLog()->Dump(XrdCl::AppMsg, "Prefetch sync %s", lPath());
m_syncStatusMutex.Lock();
m_in_sync = true;
m_syncStatusMutex.UnLock();
clLog()->Dump(XrdCl::AppMsg, "Prefetch::Sync %s", lPath());

m_output->Fsync();
m_infoFile->Fsync();

m_syncStatusMutex.Lock();
m_in_sync = false;
m_cfi.WriteHeader(m_infoFile);
m_non_flushed_cnt = (int)m_write_called_while_in_sync.size();
for (std::vector<int>::iterator i = m_write_called_while_in_sync.begin(); i != m_write_called_while_in_sync.end(); ++i)
m_cfi.SetBitWriteCalled(*i);
m_write_called_while_in_sync.clear();

clLog()->Dump(XrdCl::AppMsg, "Prefetch sync left %d", m_non_flushed_cnt);
int written_while_in_sync;
{
XrdSysMutexHelper _lck(&m_syncStatusMutex);

m_syncStatusMutex.UnLock();
for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
{
m_cfi.SetBitWriteCalled(*i);
}
written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
m_writes_during_sync.clear();

m_in_sync = false;
}

clLog()->Dump(XrdCl::AppMsg, "Prefetch::Sync %d blocks written during sync.", written_while_in_sync);

m_infoFile->Fsync();
}

//______________________________________________________________________________
void Prefetch::DecRamBlockRefCount(int ramIdx)
{
Expand Down
15 changes: 9 additions & 6 deletions src/XrdFileCache/XrdFileCachePrefetch.hh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include "XrdFileCacheInfo.hh"
#include "XrdFileCacheStats.hh"

class XrdJob;
class XrdOucIOVec;

namespace XrdCl
{
class Log;
Expand Down Expand Up @@ -179,18 +181,19 @@ namespace XrdFileCache
bool m_stopped; //!< prefetch is stopped
XrdSysCondVar m_stateCond; //!< state condition variable

XrdSysMutex m_downloadStatusMutex; //!< mutex locking access to m_cfi object
XrdSysMutex m_downloadStatusMutex; //!< mutex locking access to m_cfi object

std::deque<Task*> m_tasks_queue; //!< download queue
XrdSysCondVar m_queueCond; //!< m_tasks_queue condition variable

Stats m_stats; //!< cache statistics, used in IO detach
Stats m_stats; //!< cache statistics, used in IO detach

// fsync
XrdSysMutex m_syncStatusMutex; //!< mutex locking fsync status
std::vector<int> m_write_called_while_in_sync;
int m_non_flushed_cnt;
bool m_in_sync;
XrdSysMutex m_syncStatusMutex; //!< mutex locking fsync status
XrdJob *m_syncer;
std::vector<int> m_writes_during_sync;
int m_non_flushed_cnt;
bool m_in_sync;
};
}
#endif

0 comments on commit bdb289b

Please sign in to comment.