Skip to content

Commit

Permalink
Add changed files missing in the previous commit c00a774.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent 0a0b3e9 commit fdd85ea
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 37 deletions.
3 changes: 2 additions & 1 deletion src/XrdFileCache.cmake
Expand Up @@ -17,7 +17,8 @@ add_library(
${LIB_XRD_FILECACHE}
MODULE
XrdFileCache/XrdFileCache.cc XrdFileCache/XrdFileCache.hh
XrdFileCache/XrdFileCacheFactory.cc XrdFileCache/XrdFileCacheFactory.hh
XrdFileCache/XrdFileCacheConfiguration.cc
XrdFileCache/XrdFileCachePurge.cc
XrdFileCache/XrdFileCacheFile.cc XrdFileCache/XrdFileCacheFile.hh
XrdFileCache/XrdFileCacheVRead.cc
XrdFileCache/XrdFileCacheStats.hh
Expand Down
89 changes: 81 additions & 8 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -27,15 +27,27 @@
#include "XrdSys/XrdSysTimer.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucUtils.hh"

#include "XrdFileCache.hh"
#include "XrdFileCacheIOEntireFile.hh"
#include "XrdFileCacheIOFileBlock.hh"
#include "XrdFileCacheFactory.hh"
//#include "XrdFileCacheConfiguration.cc"
//#include "XrdFileCachePurge.cc"


using namespace XrdFileCache;



Cache * Cache::m_factory = NULL;

void *CacheDirCleanupThread(void* cache_void)
{
Cache::GetInstance().CacheDirCleanup();
return NULL;
}

void *ProcessWriteTaskThread(void* c)
{
Cache *cache = static_cast<Cache*>(c);
Expand All @@ -49,12 +61,73 @@ void *PrefetchThread(void* ptr)
cache->Prefetch();
return NULL;
}


extern "C"
{
XrdOucCache *XrdOucGetCache(XrdSysLogger *logger,
const char *config_filename,
const char *parameters)
{
XrdSysError err(0, "");
err.logger(logger);
err.Emsg("Retrieve", "Retrieving a caching proxy factory.");
Cache &factory = Cache::GetInstance();
if (!factory.Config(logger, config_filename, parameters))
{
err.Emsg("Retrieve", "Error - unable to create a factory.");
return NULL;
}
err.Emsg("Retrieve", "Success - returning a factory.");


pthread_t tid;
XrdSysThread::Run(&tid, CacheDirCleanupThread, NULL, 0, "XrdFileCache CacheDirCleanup");
return &factory;
}
}

Cache &Cache::GetInstance()
{
if (m_factory == NULL)
m_factory = new Cache();
return *m_factory;
}

// !AMT will be obsolete in future
XrdOucCache *Cache::Create(Parms & parms, XrdOucCacheIO::aprParms * prParms)
{
return this;
}


//______________________________________________________________________________

bool Cache::Decide(XrdOucCacheIO* io)
{
if (!m_decisionpoints.empty())
{
std::string filename = io->Path();
std::vector<Decision*>::const_iterator it;
for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
{
XrdFileCache::Decision *d = *it;
if (!d) continue;
if (!d->Decide(filename, *m_output_fs))
{
return false;
}
}
}

return true;
}
//______________________________________________________________________________


Cache::Cache(XrdOucCacheStats & stats) : XrdOucCache(),
Cache::Cache() : XrdOucCache(),
m_log(0, "XrdFileCache_"),
m_prefetch_condVar(0),
m_stats(stats),
m_RAMblocks_used(0)
{
pthread_t tid1;
Expand All @@ -68,11 +141,11 @@ Cache::Cache(XrdOucCacheStats & stats) : XrdOucCache(),

XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options)
{
if (Factory::GetInstance().Decide(io))
if (Cache::GetInstance().Decide(io))
{
clLog()->Info(XrdCl::AppMsg, "Cache::Attach() %s", io->Path());
IO* cio;
if (Factory::GetInstance().RefConfiguration().m_hdfsmode)
if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
cio = new IOFileBlock(*io, m_stats, *this);
else
cio = new IOEntireFile(*io, m_stats, *this);
Expand Down Expand Up @@ -179,7 +252,7 @@ bool
Cache::RequestRAMBlock()
{
XrdSysMutexHelper lock(&m_RAMblock_mutex);
if ( m_RAMblocks_used < Factory::GetInstance().RefConfiguration().m_NRamBuffers )
if ( m_RAMblocks_used < Cache::GetInstance().RefConfiguration().m_NRamBuffers )
{
m_RAMblocks_used++;
return true;
Expand Down Expand Up @@ -215,7 +288,7 @@ Cache::RegisterPrefetchFile(File* file)
{
// called from File::Open()

if (Factory::GetInstance().RefConfiguration().m_prefetch)
if (Cache::GetInstance().RefConfiguration().m_prefetch)
{

XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Register new file BEGIN");
Expand Down Expand Up @@ -269,7 +342,7 @@ Cache::GetNextFileToPrefetch()
void
Cache::Prefetch()
{
const static int limitRAM= Factory::GetInstance().RefConfiguration().m_NRamBuffers * 0.7;
const static int limitRAM= Cache::GetInstance().RefConfiguration().m_NRamBuffers * 0.7;

XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Prefetch thread start");

Expand Down
105 changes: 99 additions & 6 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -20,10 +20,15 @@
#include <string>
#include <list>

#include "XrdVersion.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdOuc/XrdOucCache.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdFileCacheFile.hh"
#include "XrdFileCacheDecision.hh"

class XrdOucStream;
class XrdSysError;

namespace XrdCl {
class Log;
Expand All @@ -35,6 +40,38 @@ class IO;

namespace XrdFileCache
{
//----------------------------------------------------------------------------
//! Contains parameters configurable from the xrootd config file.
//----------------------------------------------------------------------------
struct Configuration
{
Configuration() :
m_hdfsmode(false),
m_diskUsageLWM(-1),
m_diskUsageHWM(-1),
m_bufferSize(1024*1024),
m_NRamBuffers(8000),
m_prefetch(false),
m_prefetch_max_blocks(10),
m_hdfsbsize(128*1024*1024) {}

bool m_hdfsmode; //!< flag for enabling block-level operation
std::string m_cache_dir; //!< path of disk cache
std::string m_username; //!< username passed to oss plugin

long long m_diskUsageLWM; //!< cache purge low water mark
long long m_diskUsageHWM; //!< cache purge high water mark

long long m_bufferSize; //!< prefetch buffer size, default 1MB
int m_NRamBuffers; //!< number of total in-memory cache blocks
bool m_prefetch; //!< prefetch enable state
size_t m_prefetch_max_blocks;//!< maximum number of blocks to prefetch per file

long long m_hdfsbsize; //!< used with m_hdfsmode, default 128MB
};



//----------------------------------------------------------------------------
//! Attaches/creates and detaches/deletes cache-io objects for disk based cache.
//----------------------------------------------------------------------------
Expand All @@ -44,7 +81,7 @@ namespace XrdFileCache
//---------------------------------------------------------------------
//! Constructor
//---------------------------------------------------------------------
Cache(XrdOucCacheStats&);
Cache();

//---------------------------------------------------------------------
//! Obtain a new IO object that fronts existing XrdOucCacheIO.
Expand All @@ -57,11 +94,49 @@ namespace XrdFileCache
virtual int isAttached();

//---------------------------------------------------------------------
//! \brief Unused abstract method. Plugin instantiation role is given
//! to the Factory class.
// this is an obsolete method
virtual XrdOucCache* Create(XrdOucCache::Parms&, XrdOucCacheIO::aprParms*);

//--------------------------------------------------------------------
//! \brief Makes decision if the original XrdOucCacheIO should be cached.
//!
//! @param & URL of file
//!
//! @return decision if IO object will be cached.
//--------------------------------------------------------------------
bool Decide(XrdOucCacheIO*);

//------------------------------------------------------------------------
//! Reference XrdFileCache configuration
//------------------------------------------------------------------------
const Configuration& RefConfiguration() const { return m_configuration; }


//---------------------------------------------------------------------
//! \brief Parse configuration file
//!
//! @param logger xrootd logger
//! @param config_filename path to configuration file
//! @param parameters optional parameters to be passed
//!
//! @return parse status
//---------------------------------------------------------------------
bool Config(XrdSysLogger *logger, const char *config_filename, const char *parameters);

//---------------------------------------------------------------------
//! Singleton access.
//---------------------------------------------------------------------
static Cache &GetInstance();

//---------------------------------------------------------------------
//! Version check.
//---------------------------------------------------------------------
static bool VCheck(XrdVersionInfo &urVersion) { return true; }

//---------------------------------------------------------------------
//! Thread function running disk cache purge periodically.
//---------------------------------------------------------------------
virtual XrdOucCache* Create(XrdOucCache::Parms&, XrdOucCacheIO::aprParms*)
{ return NULL; }
void CacheDirCleanup();

//---------------------------------------------------------------------
//! Add downloaded block in write queue.
Expand Down Expand Up @@ -98,13 +173,31 @@ namespace XrdFileCache
//! Decrease attached count. Called from IO::Detach().
void Detach(XrdOucCacheIO *);

XrdOss* GetOss() const { return m_output_fs; }

XrdSysError& GetSysError() { return m_log; }


private:
bool ConfigParameters(std::string, XrdOucStream&);
bool ConfigXeq(char *, XrdOucStream &);
bool xdlib(XrdOucStream &);
static Cache *m_factory; //!< this object

XrdSysError m_log; //!< XrdFileCache namespace logger
XrdOucCacheStats m_stats; //!<
XrdOss *m_output_fs; //!< disk cache file system

std::vector<XrdFileCache::Decision*> m_decisionpoints; //!< decision plugins

std::map<std::string, long long> m_filesInQueue;

Configuration m_configuration; //!< configurable parameters

//! Short log alias.
XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }

XrdSysCondVar m_prefetch_condVar; //!< central lock for this class
XrdOucCacheStats &m_stats; //!< global cache usage statistics

XrdSysMutex m_RAMblock_mutex; //!< central lock for this class
int m_RAMblocks_used;
Expand Down

0 comments on commit fdd85ea

Please sign in to comment.