Skip to content

Commit

Permalink
Merge pull request #374 from alja/stat-defer
Browse files Browse the repository at this point in the history
pfc-V2: Fixes in reusing File object during delayed destroy
  • Loading branch information
abh3 committed Jun 10, 2016
2 parents 8350c5e + 5259757 commit afb5dcd
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 175 deletions.
2 changes: 1 addition & 1 deletion src/XrdFileCache/README
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pfc.osslib <lpath> [<params>] path to alternative plign for output file system

pfc.decisionlib <lpath> [<prams>] path to decision library and plugin parameters

pfc.trace <none|error|warning|info|debug|dump> default level is none, xrootd option -d sets debug level
pfc.trace <none|error|warning|info|debug|dump> default level is warning, xrootd option -d sets debug level

Examples

Expand Down
18 changes: 12 additions & 6 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ Cache::Cache() : XrdOucCache(),
m_RAMblocks_used(0)
{
m_trace = new XrdOucTrace(&m_log);
// default log level is Warning
m_trace->What = 2;
}

//______________________________________________________________________________
Expand Down Expand Up @@ -181,9 +183,12 @@ void Cache::Detach(XrdOucCacheIO* io)
while ( it != m_active.end() )
{
if (it->io == io) {
it->io->RelinquishFile(it->file);
delete it->file;
if (it->file) {
it->io->RelinquishFile(it->file);
delete it->file;
}
m_active.erase(it);
break;
}
else
++it;
Expand Down Expand Up @@ -302,9 +307,10 @@ File* Cache::GetFileWithLocalPath(std::string path, IO* iIo)
{
if (!strcmp(path.c_str(), it->file->lPath()))
{
it->io->RelinquishFile(it->file);
it->io = iIo;
return it->file;
File *ff = it->file;
it->io->RelinquishFile(ff);
it->file = 0;
return ff;
}
}
return 0;
Expand Down Expand Up @@ -371,7 +377,7 @@ Cache::GetNextFileToPrefetch()
size_t l = m_prefetchList.size();
int idx = rand() % l;
File* f = m_prefetchList[idx];
f->MarkPrefetch();

m_prefetch_condVar.UnLock();
return f;
}
Expand Down
225 changes: 103 additions & 122 deletions src/XrdFileCache/XrdFileCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m
m_temp_filename(disk_file_path),
m_offset(iOffset),
m_fileSize(iFileSize),
m_stopping(false),
m_stateCond(0), // We will explicitly lock the condition before use.
m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")),
m_non_flushed_cnt(0),
Expand All @@ -89,7 +88,6 @@ m_prefetchState(kOn),
m_prefetchReadCnt(0),
m_prefetchHitCnt(0),
m_prefetchScore(1),
m_prefetchCurrentCnt(0),
m_traceID("File")
{
Open();
Expand Down Expand Up @@ -117,6 +115,9 @@ File::~File()
AppendIOStatToFileInfo();
m_infoFile->Fsync();

delete m_syncer;
m_syncer = NULL;

if (m_output)
{
m_output->Close();
Expand All @@ -134,63 +135,53 @@ File::~File()
TRACEF(Debug, "File::~File() ended, prefetch score = " << m_prefetchScore);
}

bool File::InitiateClose()
bool File::ioActive()
{
// Retruns true if delay is needed

TRACEF(Debug, "File::Initiate close start");

m_stateCond.Lock();
if (!m_stopping) {
m_prefetchState = kCanceled;
cache()->DeRegisterPrefetchFile(this);
m_stopping = true;
if (m_prefetchState != kStopped) {
m_prefetchState = kStopped;
cache()->DeRegisterPrefetchFile(this);
}
m_stateCond.UnLock();

m_stateCond.Lock();
bool isPrefetching = (m_prefetchCurrentCnt > 0);
m_stateCond.UnLock();

if (isPrefetching == false)
{
m_downloadCond.Lock();

/*
// dump print
for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end(); ++it) {
Block* b = it->second;
TRACEF(Dump, "File::InitiateClose() block idx = " << b->m_offset/m_cfi.GetBufferSize() << " prefetch = " << b->preferch << " refcnt " << b->refcnt);

// remove failed blocks and check if map is empty
m_downloadCond.Lock();
/*
// high debug print
for (BlockMap_i it = m_block_map.begin(); it != m_block_map.end(); ++it) {
Block* b = it->second;
TRACEF(Dump, "File::InitiateClose() block idx = " << b->m_offset/m_cfi.GetBufferSize() << " prefetch = " << b->preferch << " refcnt " << b->refcnt);
}
*/
BlockMap_i itr = m_block_map.begin();
while (itr != m_block_map.end()) {
if (itr->second->is_failed() && itr->second->m_refcnt == 1) {
BlockMap_i toErase = itr;
++itr;
TRACEF(Debug, "Remove failed block " << itr->second->m_offset/m_cfi.GetBufferSize());
free_block(toErase->second);
}
*/

// remove failed blocks
BlockMap_i itr = m_block_map.begin();
while (itr != m_block_map.end()) {
if (itr->second->is_failed() && itr->second->m_refcnt == 1) {
BlockMap_i toErase = itr;
++itr;
TRACEF(Debug, "Remove failed block " << itr->second->m_offset/m_cfi.GetBufferSize());
free_block(toErase->second);
}
else {
++itr;
}
else {
++itr;
}
}

bool blockMapEmpty = m_block_map.empty();
m_downloadCond.UnLock();
bool blockMapEmpty = m_block_map.empty();
m_downloadCond.UnLock();

if ( blockMapEmpty)
{
// file is not active when block map is empty and sync is done
XrdSysMutexHelper _lck(&m_syncStatusMutex);
if (m_in_sync == false) {
delete m_syncer;
m_syncer = NULL;
return false;
}
if ( blockMapEmpty)
{
// file is not active when block map is empty and sync is done
XrdSysMutexHelper _lck(&m_syncStatusMutex);
if (m_in_sync == false) {
return false;
}
}

Expand All @@ -199,7 +190,13 @@ bool File::InitiateClose()

//______________________________________________________________________________


void File::WakeUp()
{
// called if this object is recycled by other IO
m_stateCond.Lock();
if (m_prefetchState != kComplete) m_prefetchState = kOn;
m_stateCond.UnLock();
}

//==============================================================================

Expand Down Expand Up @@ -231,10 +228,15 @@ bool File::Open()

// Create the info file
std::string ifn = m_temp_filename + Info::m_infoExtension;

struct stat infoStat;
bool fileExisted = (Cache::GetInstance().GetOss()->Stat(ifn.c_str(), &infoStat) == XrdOssOK);

m_output_fs.Create(Cache::GetInstance().RefConfiguration().m_username.c_str(), ifn.c_str(), 0600, myEnv, XRDOSS_mkpath);
m_infoFile = m_output_fs.newFile(Cache::GetInstance().RefConfiguration().m_username.c_str());
if (m_infoFile)
{
if (fileExisted) assert(infoStat.st_size > 0);
int res = m_infoFile->Open(ifn.c_str(), O_RDWR, 0600, myEnv);
if (res < 0)
{
Expand All @@ -243,28 +245,34 @@ bool File::Open()
m_infoFile = 0;
return false;
}
else {
if (fileExisted)
{
int res = m_cfi.Read(m_infoFile);
TRACEF(Debug, "Reading existing info file bytes = " << res);
m_downloadCond.Lock();
// this method is called from constructor, no need to lock downloadStaus
bool complete = m_cfi.IsComplete();
if (complete) m_prefetchState = kComplete;
m_downloadCond.UnLock();
}
else {
m_fileSize = m_fileSize;
int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1;
TRACEF(Debug, "Creating new file info, data size = " << m_fileSize << " num blocks = " << ss);
m_cfi.SetBufferSize(Cache::GetInstance().RefConfiguration().m_bufferSize);
m_cfi.SetFileSize(m_fileSize);
m_cfi.WriteHeader(m_infoFile);
m_infoFile->Fsync();
}
}
}
else
{
// this should be a rare case wher FD can't be created
return false;
}

if (m_cfi.Read(m_infoFile) <= 0)
{
m_fileSize = m_fileSize;
int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1;
TRACEF(Debug, "Creating new file info, data size = " << m_fileSize << " num blocks = " << ss);
m_cfi.SetBufferSize(Cache::GetInstance().RefConfiguration().m_bufferSize);
m_cfi.SetFileSize(m_fileSize);
m_cfi.WriteHeader(m_infoFile);
m_infoFile->Fsync();
}
else
{
TRACEF(Debug, "Successfully opened existing info file");
}

cache()->RegisterPrefetchFile(this);
if (m_prefetchState != kComplete) cache()->RegisterPrefetchFile(this);
return true;
}

Expand Down Expand Up @@ -808,17 +816,9 @@ void File::ProcessBlockResponse(Block* b, int res)
if (res >= 0)
{
b->m_downloaded = true;
TRACEF(Dump, "File::ProcessBlockResponse " << (int)(b->m_offset/BufferSize()) << " finished " << b->is_finished());
if (!m_stopping) { // AMT theoretically this should be under state lock, but then are double locks
TRACEF(Dump, "File::ProcessBlockResponse inc_ref_count " << (int)(b->m_offset/BufferSize()));
inc_ref_count(b);
cache()->AddWriteTask(b, true);
}
else {
// there is no refcount +/- to remove dropped prefetched blocks on destruction
if (b->m_prefetch && (b->m_refcnt == 0))
free_block(b);
}
TRACEF(Dump, "File::ProcessBlockResponse inc_ref_count " << (int)(b->m_offset/BufferSize()));
inc_ref_count(b);
cache()->AddWriteTask(b, true);
}
else
{
Expand Down Expand Up @@ -879,43 +879,42 @@ void File::AppendIOStatToFileInfo()
//______________________________________________________________________________
void File::Prefetch()
{
if (m_prefetchState == kOn)
{
TRACEF(Dump, "File::Prefetch enter to check download status");
XrdSysCondVarHelper _lck(m_stateCond);
if (m_prefetchState != kOn)
return;
}

// check index not on disk and not in RAM
TRACEF(Dump, "File::Prefetch enter to check download status");
bool found = false;
for (int f=0; f < m_cfi.GetSizeInBits(); ++f)
{
XrdSysCondVarHelper _lck(m_downloadCond);
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch enter to check download status BEGIN %s \n", lPath());

// check index not on disk and not in RAM
bool found = false;
for (int f=0; f < m_cfi.GetSizeInBits(); ++f)
{
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch test bit %d", f);
if (!m_cfi.TestBit(f))
{
f += m_offset/m_cfi.GetBufferSize();
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
TRACEF(Dump, "File::Prefetch take block " << f);
cache()->RequestRAMBlock();
RequestBlock(f, true);
m_prefetchReadCnt++;
m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt;
found = true;
break;
}
if (!m_cfi.TestBit(f))
{
f += m_offset/m_cfi.GetBufferSize();
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
TRACEF(Dump, "File::Prefetch take block " << f);
cache()->RequestRAMBlock();
RequestBlock(f, true);
m_prefetchReadCnt++;
m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt;
found = true;
break;
}
}
if (!found) {
TRACEF(Dump, "File::Prefetch no free block found ");
m_cfi.CheckComplete();
// it is possible all missing blocks are in map but downlaoded status is still not complete
// assert (m_cfi.IsComplete());
// remove block from map
cache()->DeRegisterPrefetchFile(this);
}
}

UnMarkPrefetch();

if (!found) {
TRACEF(Dump, "File::Prefetch no free block found ");
m_stateCond.Lock();
m_prefetchState = kComplete;
m_stateCond.UnLock();
cache()->DeRegisterPrefetchFile(this);
}
}


Expand Down Expand Up @@ -945,24 +944,6 @@ float File::GetPrefetchScore() const
return m_prefetchScore;
}

//______________________________________________________________________________
void File::MarkPrefetch()
{
m_stateCond.Lock();
m_prefetchCurrentCnt++;
m_stateCond.UnLock();

}

//______________________________________________________________________________
void File::UnMarkPrefetch()
{
m_stateCond.Lock();
m_prefetchCurrentCnt--;
m_stateCond.UnLock();
}


XrdOucTrace* File::GetTrace()
{
return Cache::GetInstance().GetTrace();
Expand Down
Loading

0 comments on commit afb5dcd

Please sign in to comment.