Skip to content

Commit

Permalink
Global limit of RAM resources.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent e8e63ae commit 9593eac
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 30 deletions.
27 changes: 24 additions & 3 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -32,8 +32,6 @@
#include "XrdFileCacheFactory.hh"


XrdFileCache::Cache::WriteQ XrdFileCache::Cache::s_writeQ;

using namespace XrdFileCache;
void *ProcessWriteTaskThread(void* c)
{
Expand All @@ -44,7 +42,8 @@ void *ProcessWriteTaskThread(void* c)

Cache::Cache(XrdOucCacheStats & stats)
: m_attached(0),
m_stats(stats)
m_stats(stats),
m_RAMblocks_used(0)
{
pthread_t tid;
XrdSysThread::Run(&tid, ProcessWriteTaskThread, (void*)this, 0, "XrdFileCache WriteTasks ");
Expand Down Expand Up @@ -103,6 +102,7 @@ void Cache::getFilePathFromURL(const char* iUrl, std::string &result) const
result = Factory::GetInstance().RefConfiguration().m_cache_dir + url.GetPath();
}

// XXXX MT: is the following needed ???
//______________________________________________________________________________
bool
Cache::HaveFreeWritingSlots()
Expand Down Expand Up @@ -167,4 +167,25 @@ Cache::ProcessWriteTasks()
block->m_file->WriteBlockToDisk(block);
}
}
//______________________________________________________________________________

bool
Cache::RequestRAMBlock()
{
XrdSysMutexHelper lock(&m_RAMblock_mutex);
if ( m_RAMblocks_used > Factory::GetInstance().RefConfiguration().m_NRamBuffers )
{
m_RAMblocks_used++;
return true;
}

return false;
}

void
Cache::RAMBlockReleased()
{
XrdSysMutexHelper lock(&m_RAMblock_mutex);
m_RAMblocks_used--;
}

18 changes: 10 additions & 8 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -68,23 +68,23 @@ namespace XrdFileCache
//---------------------------------------------------------------------
//! Add downloaded block in write queue.
//---------------------------------------------------------------------
static void AddWriteTask(Block* b, bool from_read);
void AddWriteTask(Block* b, bool from_read);

//---------------------------------------------------------------------
//! Check write queue size is not over limit.
//---------------------------------------------------------------------
static bool HaveFreeWritingSlots();

//---------------------------------------------------------------------
//! \brief Remove blocks from write queue which belong to given prefetch.
//! This method is used at the time of File destruction.
//---------------------------------------------------------------------
static void RemoveWriteQEntriesFor(File *f);
void RemoveWriteQEntriesFor(File *f);

//---------------------------------------------------------------------
//! Separate task which writes blocks from ram to disk.
//---------------------------------------------------------------------
static void ProcessWriteTasks();
void ProcessWriteTasks();

bool RequestRAMBlock();

void RAMBlockReleased();

private:
//! Decrease attached count. Called from IO::Detach().
Expand All @@ -100,6 +100,8 @@ namespace XrdFileCache
unsigned int m_attached; //!< number of attached IO objects
XrdOucCacheStats &m_stats; //!< global cache usage statistics

XrdSysMutex m_RAMblock_mutex; //!< central lock for this class
int m_RAMblocks_used;

struct WriteQ
{
Expand All @@ -109,7 +111,7 @@ namespace XrdFileCache
std::list<Block*> queue; //!< container
};

static WriteQ s_writeQ;
WriteQ s_writeQ;

};

Expand Down
19 changes: 9 additions & 10 deletions src/XrdFileCache/XrdFileCacheFactory.cc
Expand Up @@ -60,7 +60,8 @@ void *CacheDirCleanupThread(void* cache_void)


Factory::Factory()
: m_log(0, "XrdFileCache_")
: m_log(0, "XrdFileCache_"),
m_cache(0)
{}

extern "C"
Expand Down Expand Up @@ -97,7 +98,9 @@ Factory &Factory::GetInstance()
XrdOucCache *Factory::Create(Parms & parms, XrdOucCacheIO::aprParms * prParms)
{
clLog()->Info(XrdCl::AppMsg, "Factory::Create() new cache object");
return new Cache(m_stats);
assert(m_cache == 0);
m_cache = new Cache(m_stats);
return m_cache;
}


Expand Down Expand Up @@ -269,10 +272,10 @@ bool Factory::Config(XrdSysLogger *logger, const char *config_filename, const ch
loff = snprintf(buff, sizeof(buff), "result\n"
"\tpfc.cachedir %s\n"
"\tpfc.blocksize %lld\n"
"\tpfc.nramread %d\n\tpfc.nramprefetch %d\n",
"\tpfc.nram %d\n\n",
m_configuration.m_cache_dir.c_str() ,
m_configuration.m_bufferSize,
m_configuration.m_NRamBuffersRead, m_configuration.m_NRamBuffersPrefetch );
m_configuration.m_NRamBuffers );

if (m_configuration.m_hdfsmode)
{
Expand Down Expand Up @@ -360,13 +363,9 @@ bool Factory::ConfigParameters(std::string part, XrdOucStream& config )
return false;
}
}
else if (part == "nramread")
else if (part == "nram")
{
m_configuration.m_NRamBuffersRead = ::atoi(config.GetWord());
}
else if (part == "nramprefetch")
{
m_configuration.m_NRamBuffersPrefetch = ::atoi(config.GetWord());
m_configuration.m_NRamBuffers = ::atoi(config.GetWord());
}
else if ( part == "hdfsmode" )
{
Expand Down
14 changes: 10 additions & 4 deletions src/XrdFileCache/XrdFileCacheFactory.hh
Expand Up @@ -37,6 +37,10 @@ namespace XrdCl
class Log;
}

namespace XrdFileCache {
class Cache;
}

namespace XrdFileCache
{
//----------------------------------------------------------------------------
Expand All @@ -49,8 +53,7 @@ namespace XrdFileCache
m_diskUsageLWM(-1),
m_diskUsageHWM(-1),
m_bufferSize(1024*1024),
m_NRamBuffersRead(8),
m_NRamBuffersPrefetch(1),
m_NRamBuffers(8000),
m_hdfsbsize(128*1024*1024) {}

bool m_hdfsmode; //!< flag for enabling block-level operation
Expand All @@ -61,8 +64,7 @@ namespace XrdFileCache
long long m_diskUsageHWM; //!< cache purge high water mark

long long m_bufferSize; //!< prefetch buffer size, default 1MB
int m_NRamBuffersRead; //!< number of read in-memory cache blocks
int m_NRamBuffersPrefetch; //!< number of prefetch in-memory cache blocks
int m_NRamBuffers; //!< number of total in-memory cache blocks
long long m_hdfsbsize; //!< used with m_hdfsmode, default 128MB
};

Expand Down Expand Up @@ -143,6 +145,8 @@ namespace XrdFileCache
//---------------------------------------------------------------------
void CacheDirCleanup();


Cache* GetCache() { return m_cache; }
private:
bool ConfigParameters(std::string, XrdOucStream&);
bool ConfigXeq(char *, XrdOucStream &);
Expand All @@ -161,6 +165,8 @@ namespace XrdFileCache
std::map<std::string, long long> m_filesInQueue;

Configuration m_configuration; //!< configurable parameters

Cache* m_cache;
};
}

Expand Down
15 changes: 10 additions & 5 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -64,6 +64,10 @@ class DiskSyncer : public XrdJob
};
}

namespace
{
Cache* cache() {return Factory::GetInstance().GetCache();}
}

File::File(XrdOucCacheIO &inputIO, std::string& disk_file_path, long long iOffset, long long iFileSize) :
m_input(inputIO),
Expand Down Expand Up @@ -91,7 +95,7 @@ m_block_cond(0)
File::~File()
{
clLog()->Debug(XrdCl::AppMsg, "File::~File() %p %s", (void*)this, lPath());
Cache::RemoveWriteQEntriesFor(this);
cache()->RemoveWriteQEntriesFor(this);
clLog()->Info(XrdCl::AppMsg, "File::~File() check write queues ...%s", lPath());

// can I do anythong to stop waiting for asyc read callbacks ?
Expand Down Expand Up @@ -376,8 +380,6 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// passing the req to client is actually better.
// unlock

const int MaxBlocksForRead = 16; // AMT Should be config var! Or even mutable for low client counts.

m_block_cond.Lock();

size_t msize = m_block_map.size();
Expand Down Expand Up @@ -411,7 +413,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
else
{
// Is there room for one more RAM Block?
if ( msize < MaxBlocksForRead)
if ( cache()->RequestRAMBlock())
{
Block *b = RequestBlock(block_idx);
inc_ref_count(b);
Expand Down Expand Up @@ -677,6 +679,9 @@ void File::dec_ref_count(Block* b)
if (ret != 1) {
clLog()->Error(XrdCl::AppMsg, "File::OnBlockZeroRefCount did not erase %d from map.", i);
}
else {
cache()->RAMBlockReleased();
}
}
}

Expand All @@ -693,7 +698,7 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
b->m_downloaded = true;
if (!m_stopping) { // AMT theoretically this should be under state lock, but then are double locks
inc_ref_count(b);
XrdFileCache::Cache::AddWriteTask(b, true);
cache()->AddWriteTask(b, true);
}
}
else
Expand Down

0 comments on commit 9593eac

Please sign in to comment.