Skip to content

Commit

Permalink
FileBlockMode: recycle File object during delayed destruction.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent 162763f commit 9122bc3
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 40 deletions.
46 changes: 28 additions & 18 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -169,19 +169,24 @@ int Cache::isAttached()
void Cache::Detach(XrdOucCacheIO* io)
{
clLog()->Info(XrdCl::AppMsg, "Cache::Detach() %s", io->Path());
std::map<std::string, DiskNetIO>::iterator it = m_active.begin();
while (it != m_active.end() )

// Cache owns File objects
XrdSysMutexHelper lock(&m_active_mutex);
std::vector<DiskNetIO>::iterator it = m_active.begin();
while ( it != m_active.end() )
{
if (it->second.io == io) {
m_active.erase(it++);
if (it->io == io) {
it->io->RelinquishFile(it->file);
delete it->file;
m_active.erase(it);
}
else {
else
++it;
}
}

delete io;
}

//______________________________________________________________________________
bool
Cache::HaveFreeWritingSlots()
Expand Down Expand Up @@ -276,26 +281,31 @@ Cache::RAMBlockReleased()
m_RAMblocks_used--;
}

void
Cache::AddActive(IO* io, File* file)
{
XrdSysMutexHelper lock(&m_active_mutex);
m_active.push_back(DiskNetIO(io, file));
}

//==============================================================================
//======================= File relinquish at process of dying ===================
//======================================================================
File* Cache::GetFileForLocalPath(std::string path, IO* io)
File* Cache::GetFileWithLocalPath(std::string path, IO* iIo)
{
typedef std::map<std::string, DiskNetIO> ActiveMap_t;
ActiveMap_t::iterator it = m_active.find(path);
if (it == m_active.end())
XrdSysMutexHelper lock(&m_active_mutex);
for ( std::vector<DiskNetIO>::iterator it = m_active.begin(); it != m_active.end(); ++it)
{
return 0;
}
else {
File* file = it->second.file;
it->second.io->RelinquishFile(file);
return file;
if (!strcmp(path.c_str(), it->file->lPath()))
{
it->io->RelinquishFile(it->file);
it->io = iIo;
return it->file;
}
}
return 0;
}



//==============================================================================
//======================= PREFETCH ===================================
//==============================================================================
Expand Down
8 changes: 6 additions & 2 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -186,7 +186,9 @@ namespace XrdFileCache

XrdSysError& GetSysError() { return m_log; }

File* GetFileForLocalPath(std::string, IO*);
File* GetFileWithLocalPath(std::string, IO* io);

void AddActive(IO*, File*);

private:
bool ConfigParameters(std::string, XrdOucStream&);
Expand Down Expand Up @@ -224,11 +226,13 @@ namespace XrdFileCache

struct DiskNetIO
{
DiskNetIO(IO* iIO, File* iFile): io(iIO), file(iFile){}
IO* io;
File* file;
};

std::map<std::string, DiskNetIO> m_active;
std::vector<DiskNetIO> m_active;
XrdSysMutex m_active_mutex;

// prefetching
typedef std::vector<File*> PrefetchList;
Expand Down
2 changes: 2 additions & 0 deletions src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -140,6 +140,7 @@ namespace XrdFileCache
float m_prefetchScore; //cached
int m_prefetchCurrentCnt;


public:
//------------------------------------------------------------------------
//! Constructor.
Expand Down Expand Up @@ -189,6 +190,7 @@ namespace XrdFileCache
//! Log path
const char* lPath() const;

std::string GetLocalPath();
private:
bool overlap(int blk, // block to query
long long blk_size, //
Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheIO.hh
Expand Up @@ -34,7 +34,7 @@ namespace XrdFileCache

virtual void Update(XrdOucCacheIO2 &iocp) { m_io = &iocp; }

virtual void RelinquishFile(File*) {}
virtual void RelinquishFile(File*) = 0;

protected:
XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }
Expand Down
10 changes: 2 additions & 8 deletions src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Expand Up @@ -44,19 +44,19 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &
XrdCl::URL url(m_io->Path());
std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath();

if (!Cache::GetInstance().GetFileForLocalPath(fname, this))
if (!(m_file = Cache::GetInstance().GetFileWithLocalPath(fname, this)))
{
struct stat st;
Fstat(st);
m_file = new File(io, fname, 0, st.st_size);
Cache::GetInstance().AddActive(this, m_file);
}
}



IOEntireFile::~IOEntireFile()
{

delete m_localStat;
}

Expand Down Expand Up @@ -119,12 +119,6 @@ XrdOucCacheIO *IOEntireFile::Detach()
{
XrdOucCacheIO * io = m_io;

if (m_file) {
m_statsGlobal.Add(m_file->GetStats());
delete m_file;
m_file = 0;
}

// This will delete us!
m_cache.Detach(this);
return io;
Expand Down
43 changes: 33 additions & 10 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Expand Up @@ -50,18 +50,13 @@ XrdOucCacheIO* IOFileBlock::Detach()
for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
{
m_statsGlobal.Add(it->second->GetStats());
delete it->second;
}

m_cache.Detach(this); // This will delete us!

return io;
}

int IOFileBlock::Fstat(struct stat &sbuff)
{
return m_io->Fstat(sbuff);
}
//______________________________________________________________________________
void IOFileBlock::GetBlockSizeFromPath()
{
Expand Down Expand Up @@ -101,21 +96,49 @@ File* IOFileBlock::newBlockFile(long long off, int blocksize)
fname = ss.str();

clLog()->Debug(XrdCl::AppMsg, "FileBlock::FileBlock(), create XrdFileCacheFile. %s", m_io->Path());
File* prefetch = new File(m_io, fname, off, blocksize);

File* file;
if (!(file = Cache::GetInstance().GetFileWithLocalPath(fname, this)))
{
file = new File(m_io, fname, off, m_io->FSize());
Cache::GetInstance().AddActive(this, file);
}

return file;
}

return prefetch;
//______________________________________________________________________________
void IOFileBlock::RelinquishFile(File* f)
{
// called from Cache::Detach() or Cache::GetFileWithLocalPath()
// the object is in process of dying

XrdSysMutexHelper lock(&m_mutex);
for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
{
if (it->second == f)
{
m_blocks.erase(it++);
break;
}
else
{
++it;
}
}
}

//______________________________________________________________________________
bool IOFileBlock::ioActive()
{
bool res = false;
XrdSysMutexHelper lock(&m_mutex);

for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) {
if (it->second->InitiateClose())
res = true;
return true;
}

return res;
return false;
}

//______________________________________________________________________________
Expand Down
3 changes: 2 additions & 1 deletion src/XrdFileCache/XrdFileCacheIOFileBlock.hh
Expand Up @@ -64,7 +64,8 @@ namespace XrdFileCache
//! Called to check if destruction needs to be done in a separate task.
virtual bool ioActive();

virtual int Fstat(struct stat &sbuff);
virtual void RelinquishFile(File*);

private:
long long m_blocksize; //!< size of file-block
std::map<int, File*> m_blocks; //!< map of created blocks
Expand Down

0 comments on commit 9122bc3

Please sign in to comment.