Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pfc-V2: Recycle File object during delayed destruction #350

Merged
merged 15 commits into from
Apr 15, 2016
Merged
85 changes: 57 additions & 28 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,16 @@ int Cache::isAttached()
void Cache::Detach(XrdOucCacheIO* io)
{
clLog()->Info(XrdCl::AppMsg, "Cache::Detach() %s", io->Path());
std::map<std::string, DiskNetIO>::iterator it = m_active.begin();
while (it != m_active.end() )
{
if (it->second.io == io) {
m_active.erase(it++);
}
else {
++it;
}
}

delete io;
}
Expand All @@ -176,11 +186,11 @@ bool
Cache::HaveFreeWritingSlots()
{
const static size_t maxWriteWaits=100000;
if ( s_writeQ.size < maxWriteWaits) {
if ( m_writeQ.size < maxWriteWaits) {
return true;
}
else {
XrdCl::DefaultEnv::GetLog()->Info(XrdCl::AppMsg, "Cache::HaveFreeWritingSlots() negative", s_writeQ.size);
XrdCl::DefaultEnv::GetLog()->Info(XrdCl::AppMsg, "Cache::HaveFreeWritingSlots() negative", m_writeQ.size);
return false;
}
}
Expand All @@ -189,38 +199,38 @@ void
Cache::AddWriteTask(Block* b, bool fromRead)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::AddWriteTask() bOff=%ld", b->m_offset);
s_writeQ.condVar.Lock();
m_writeQ.condVar.Lock();
if (fromRead)
s_writeQ.queue.push_back(b);
m_writeQ.queue.push_back(b);
else
s_writeQ.queue.push_front(b); // AMT should this not be the opposite
s_writeQ.size++;
s_writeQ.condVar.Signal();
s_writeQ.condVar.UnLock();
m_writeQ.queue.push_front(b); // AMT should this not be the opposite
m_writeQ.size++;
m_writeQ.condVar.Signal();
m_writeQ.condVar.UnLock();
}

//______________________________________________________________________________
void Cache::RemoveWriteQEntriesFor(File *iFile)
{
s_writeQ.condVar.Lock();
std::list<Block*>::iterator i = s_writeQ.queue.begin();
while (i != s_writeQ.queue.end())
m_writeQ.condVar.Lock();
std::list<Block*>::iterator i = m_writeQ.queue.begin();
while (i != m_writeQ.queue.end())
{
if ((*i)->m_file == iFile)
{

XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Remove entries for %p path %s", (void*)(*i), iFile->lPath());
std::list<Block*>::iterator j = i++;
iFile->BlockRemovedFromWriteQ(*j);
s_writeQ.queue.erase(j);
--s_writeQ.size;
m_writeQ.queue.erase(j);
--m_writeQ.size;
}
else
{
++i;
}
}
s_writeQ.condVar.UnLock();
m_writeQ.condVar.UnLock();
}

//______________________________________________________________________________
Expand All @@ -229,16 +239,16 @@ Cache::ProcessWriteTasks()
{
while (true)
{
s_writeQ.condVar.Lock();
while (s_writeQ.queue.empty())
m_writeQ.condVar.Lock();
while (m_writeQ.queue.empty())
{
s_writeQ.condVar.Wait();
m_writeQ.condVar.Wait();
}
Block* block = s_writeQ.queue.front(); // AMT should not be back ???
s_writeQ.queue.pop_front();
s_writeQ.size--;
Block* block = m_writeQ.queue.front(); // AMT should not be back ???
m_writeQ.queue.pop_front();
m_writeQ.size--;
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::ProcessWriteTasks for %p path %s", (void*)(block), block->m_file->lPath());
s_writeQ.condVar.UnLock();
m_writeQ.condVar.UnLock();

block->m_file->WriteBlockToDisk(block);
}
Expand All @@ -265,6 +275,25 @@ Cache::RAMBlockReleased()
m_RAMblocks_used--;
}

//==============================================================================
//======================= File relinquish at process of dying ===================
//======================================================================
File* Cache::GetFileForLocalPath(std::string path, IO* io)
{
typedef std::map<std::string, DiskNetIO> ActiveMap_t;
ActiveMap_t::iterator it = m_active.find(path);
if (it == m_active.end())
{
return 0;
}
else {
File* file = it->second.file;
it->second.io->RelinquishFile(file);
return file;
}
}



//==============================================================================
//======================= PREFETCH ===================================
Expand All @@ -291,7 +320,7 @@ Cache::RegisterPrefetchFile(File* file)

XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Register new file BEGIN");
m_prefetch_condVar.Lock();
m_files.push_back(file);
m_prefetchList.push_back(file);
m_prefetch_condVar.Signal();
m_prefetch_condVar.UnLock();
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Register new file End");
Expand All @@ -306,9 +335,9 @@ Cache::DeRegisterPrefetchFile(File* file)
// called from last line File::InitiateClose()

m_prefetch_condVar.Lock();
for (FileList::iterator it = m_files.begin(); it != m_files.end(); ++it) {
for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it) {
if (*it == file) {
m_files.erase(it);
m_prefetchList.erase(it);
break;
}
}
Expand All @@ -321,15 +350,15 @@ File*
Cache::GetNextFileToPrefetch()
{
m_prefetch_condVar.Lock();
if (m_files.empty()) {
if (m_prefetchList.empty()) {
m_prefetch_condVar.Wait();
}

// std::sort(m_files.begin(), m_files.end(), myobject);
// std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);

size_t l = m_files.size();
size_t l = m_prefetchList.size();
int idx = rand() % l;
File* f = m_files[idx];
File* f = m_prefetchList[idx];
f->MarkPrefetch();
m_prefetch_condVar.UnLock();
return f;
Expand Down
26 changes: 18 additions & 8 deletions src/XrdFileCache/XrdFileCache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ namespace XrdFileCache
m_diskUsageLWM(-1),
m_diskUsageHWM(-1),
m_bufferSize(1024*1024),
m_NRamBuffers(8000),
m_prefetch(false),
m_RamAbsAvailable(8*1024*1024),
m_NRamBuffers(-1),
m_prefetch_max_blocks(10),
m_hdfsbsize(128*1024*1024) {}

Expand All @@ -65,7 +65,8 @@ namespace XrdFileCache
long long m_diskUsageHWM; //!< cache purge high water mark

long long m_bufferSize; //!< prefetch buffer size, default 1MB
int m_NRamBuffers; //!< number of total in-memory cache blocks
long long m_RamAbsAvailable; //!< available from configuration
int m_NRamBuffers; //!< number of total in-memory cache blocks, cached
bool m_prefetch; //!< prefetch enable state
size_t m_prefetch_max_blocks;//!< maximum number of blocks to prefetch per file

Expand Down Expand Up @@ -185,13 +186,14 @@ namespace XrdFileCache
XrdOss* GetOss() const { return m_output_fs; }

XrdSysError& GetSysError() { return m_log; }


File* GetFileForLocalPath(std::string, IO*);

private:
bool ConfigParameters(std::string, XrdOucStream&);
bool ConfigXeq(char *, XrdOucStream &);
bool xdlib(XrdOucStream &);
static Cache *m_factory; //!< this object
static Cache *m_factory; //!< this object

XrdSysError m_log; //!< XrdFileCache namespace logger
XrdOucCacheStats m_stats; //!<
Expand Down Expand Up @@ -219,11 +221,19 @@ namespace XrdFileCache
std::list<Block*> queue; //!< container
};

WriteQ s_writeQ;
WriteQ m_writeQ;

struct DiskNetIO
{
IO* io;
File* file;
};

std::map<std::string, DiskNetIO> m_active;

// prefetching
typedef std::vector<File*> FileList;
FileList m_files;
typedef std::vector<File*> PrefetchList;
PrefetchList m_prefetchList;
};

}
Expand Down
43 changes: 28 additions & 15 deletions src/XrdFileCache/XrdFileCacheConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ bool Cache::Config(XrdSysLogger *logger, const char *config_filename, const char
}
}

// get number of available RAM blocks after process configuration
m_configuration.m_NRamBuffers = static_cast<int>(m_configuration.m_RamAbsAvailable/ m_configuration.m_bufferSize);
if (retval)
{
int loff = 0;
char buff[2048];
loff = snprintf(buff, sizeof(buff), "result\n"
"\tpfc.cachedir %s\n"
"\tpfc.blocksize %lld\n"
"\tpfc.prefetch %d\n"
"\tpfc.nram %d\n\n",
m_configuration.m_cache_dir.c_str() ,
"\tpfc.nramblocks %d\n\n",
m_configuration.m_bufferSize,
m_configuration.m_prefetch, // AMT not sure what parsing should be
m_configuration.m_NRamBuffers );
Expand Down Expand Up @@ -227,12 +227,12 @@ bool Cache::ConfigParameters(std::string part, XrdOucStream& config )
errno = 0;
float lwmf = strtod(minV.c_str(), &eP);
if (errno || eP == minV.c_str()) {
m_log.Emsg("Cache::ConfigParameters() error parsing diskusage parameter ", minV.c_str());
m_log.Emsg("Factory::ConfigParameters() error parsing diskusage parameter ", minV.c_str());
return false;
}
float hwmf = strtod(maxV.c_str(), &eP);
if (errno || eP == maxV.c_str()) {
m_log.Emsg("Cache::ConfigParameters() error parsing diskusage parameter ", maxV.c_str());
m_log.Emsg("Factory::ConfigParameters() error parsing diskusage parameter ", maxV.c_str());
return false;
}

Expand All @@ -251,20 +251,33 @@ bool Cache::ConfigParameters(std::string part, XrdOucStream& config )
return false;
}
}
else if (part == "prefetch" )
else if (part == "prefetch_max_blocks" )
{
int p = ::atoi(config.GetWord());
if (p > 0) {
printf("prefetch enabled, max blocks per file=%d\n", p);
m_configuration.m_prefetch = true;
m_configuration.m_prefetch_max_blocks = p;
} else {
m_configuration.m_prefetch = false;
const char* params = config.GetWord();
if (params) {
int p = ::atoi(params);
if (p > 0) {
printf("prefetch enabled, max blocks per file=%d\n", p);
m_configuration.m_prefetch_max_blocks = p;
} else {
m_log.Emsg("Config", "Prefetch is disabled");
m_configuration.m_prefetch_max_blocks = 0;
}
}
else
{
m_log.Emsg("Config", "Error setting prefetch level.");
return false;
}
}
else if (part == "nram" )
else if (part == "ram" )
{
m_configuration.m_NRamBuffers = ::atoi(config.GetWord());
long long minRAM = 1024* 1024 * 1024;;
long long maxRAM = 100 * minRAM;
if ( XrdOuca2x::a2sz(m_log, "get RAM available", config.GetWord(), &m_configuration.m_RamAbsAvailable, minRAM, maxRAM))
{
return false;
}
}
else if ( part == "hdfsmode" )
{
Expand Down
10 changes: 5 additions & 5 deletions src/XrdFileCache/XrdFileCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ File::File(XrdOucCacheIO2 *inputIO, std::string& disk_file_path, long long iOffs
m_input(inputIO),
m_output(NULL),
m_infoFile(NULL),
m_cfi(Cache::GetInstance().RefConfiguration().m_bufferSize, Cache::GetInstance().RefConfiguration().m_prefetch),
m_cfi(Cache::GetInstance().RefConfiguration().m_bufferSize, Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
m_temp_filename(disk_file_path),
m_offset(iOffset),
m_fileSize(iFileSize),
Expand Down Expand Up @@ -251,7 +251,7 @@ bool File::Open()
return false;
}

if (m_cfi.Read(m_infoFile, Cache::GetInstance().RefConfiguration().m_prefetch) <= 0)
if (m_cfi.Read(m_infoFile) <= 0)
{
m_fileSize = m_fileSize;
int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1;
Expand Down Expand Up @@ -726,7 +726,7 @@ void File::WriteBlockToDisk(Block* b)
++m_non_flushed_cnt;
}

if (m_non_flushed_cnt >= 100 && (m_cfi.IsComplete() && m_non_flushed_cnt > 0))
if (m_non_flushed_cnt >= 100 )
{
schedule_sync = true;
m_in_sync = true;
Expand Down Expand Up @@ -927,7 +927,7 @@ void File::Prefetch()
//______________________________________________________________________________
void File::CheckPrefetchStatRAM(Block* b)
{
if (Cache::GetInstance().RefConfiguration().m_prefetch) {
if (Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) {
if (b->m_prefetch) {
m_prefetchHitCnt++;
m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt;
Expand All @@ -938,7 +938,7 @@ void File::CheckPrefetchStatRAM(Block* b)
//______________________________________________________________________________
void File::CheckPrefetchStatDisk(int idx)
{
if (Cache::GetInstance().RefConfiguration().m_prefetch) {
if (Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) {
if (m_cfi.TestPrefetchBit(idx))
m_prefetchHitCnt++;
}
Expand Down
4 changes: 3 additions & 1 deletion src/XrdFileCache/XrdFileCacheIO.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ namespace XrdFileCache
virtual int Write(char *Buffer, long long Offset, int Length)
{ errno = ENOTSUP; return -1; }

virtual void Update(XrdOucCacheIO2 &iocp) { m_io = &iocp; }
virtual void Update(XrdOucCacheIO2 &iocp) { m_io = &iocp; }

virtual void RelinquishFile(File*) {}

protected:
XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }
Expand Down