Skip to content

Commit

Permalink
[Proxy] Refactor proxy server caching implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed Oct 24, 2019
1 parent 3b45832 commit 1339e37
Show file tree
Hide file tree
Showing 35 changed files with 2,458 additions and 1,046 deletions.
27 changes: 12 additions & 15 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -67,13 +67,17 @@ void *PrefetchThread(void* ptr)

extern "C"
{
XrdOucCache2 *XrdOucGetCache2(XrdSysLogger *logger,
const char *config_filename,
const char *parameters)
XrdOucCache *XrdOucGetCache(XrdSysLogger *logger,
const char *config_filename,
const char *parameters,
XrdOucEnv *envP)
{
XrdSysError err(logger, "");
err.Say("++++++ Proxy file cache initialization started.");

if (envP)
XrdFileCache::Cache::schedP = (XrdScheduler *)envP->GetPtr("XrdScheduler*");

Cache &factory = Cache::CreateInstance(logger);

if (! factory.Config(config_filename, parameters))
Expand Down Expand Up @@ -160,7 +164,7 @@ bool Cache::Decide(XrdOucCacheIO* io)
}

Cache::Cache(XrdSysLogger *logger) :
XrdOucCache2(),
XrdOucCache(),
m_log(logger, "XrdFileCache_"),
m_trace(new XrdSysTrace("XrdFileCache", logger)),
m_traceID("Manager"),
Expand All @@ -178,7 +182,7 @@ Cache::Cache(XrdSysLogger *logger) :
}


XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options)
XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options)
{
const char* tpfx = "Cache::Attach() ";

Expand Down Expand Up @@ -221,13 +225,6 @@ XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options)
}


int Cache::isAttached()
{
// virutal function of XrdOucCache, don't see it used in pfc or posix layer
return true;
}


void Cache::AddWriteTask(Block* b, bool fromRead)
{
TRACE(Dump, "Cache::AddWriteTask() bOff=%ld " << b->m_offset);
Expand Down Expand Up @@ -675,7 +672,7 @@ void Cache::Prefetch()


//==============================================================================
//=== Virtuals from XrdOucCache2
//=== Virtuals from XrdOucCache
//==============================================================================

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -801,7 +798,7 @@ int Cache::LocalFilePath(const char *curl, char *buff, int blen,
//! @return <0 Error has occurred, return value is -errno; fail open request.
//! =0 Continue with open() request.
//! >0 Defer open but treat the file as actually being open. Use the
//! XrdOucCacheIO2::Open() method to open the file at a later time.
//! XrdOucCacheIO::Open() method to open the file at a later time.
//------------------------------------------------------------------------------

int Cache::Prepare(const char *curl, int oflags, mode_t mode)
Expand Down Expand Up @@ -853,7 +850,7 @@ int Cache::Prepare(const char *curl, int oflags, mode_t mode)
}

//______________________________________________________________________________
// virtual method of XrdOucCache2.
// virtual method of XrdOucCache.
//!
//! @return <0 - Stat failed, value is -errno.
//! =0 - Stat succeeded, sbuff holds stat information.
Expand Down
26 changes: 9 additions & 17 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -25,7 +25,7 @@
#include "Xrd/XrdScheduler.hh"
#include "XrdVersion.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdOuc/XrdOucCache2.hh"
#include "XrdOuc/XrdOucCache.hh"
#include "XrdOuc/XrdOucCallBack.hh"
#include "XrdCl/XrdClDefaultEnv.hh"

Expand Down Expand Up @@ -276,7 +276,7 @@ struct PathTokenizer : private SplitParser
//----------------------------------------------------------------------------
//! Attaches/creates and detaches/deletes cache-io objects for disk based cache.
//----------------------------------------------------------------------------
class Cache : public XrdOucCache2
class Cache : public XrdOucCache
{
public:
//---------------------------------------------------------------------
Expand All @@ -287,30 +287,21 @@ public:
//---------------------------------------------------------------------
//! Obtain a new IO object that fronts existing XrdOucCacheIO.
//---------------------------------------------------------------------
using XrdOucCache2::Attach;
using XrdOucCache::Attach;

virtual XrdOucCacheIO2 *Attach(XrdOucCacheIO2 *, int Options = 0);
virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options = 0);

//---------------------------------------------------------------------
//! Number of cache-io objects atteched through this cache.
//---------------------------------------------------------------------
virtual int isAttached();

//---------------------------------------------------------------------
// Virtual function of XrdOucCache2. Used to pass environmental info.
virtual void EnvInfo(XrdOucEnv &theEnv);

//---------------------------------------------------------------------
// Virtual function of XrdOucCache2. Used for redirection to a local
// Virtual function of XrdOucCache. Used for redirection to a local
// file on a distributed FS.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0,
LFP_Reason why=ForAccess, bool forall=false);

//---------------------------------------------------------------------
// Virtual function of XrdOucCache2. Used for deferred open.
// Virtual function of XrdOucCache. Used for deferred open.
virtual int Prepare(const char *url, int oflags, mode_t mode);

// virtual function of XrdOucCache2.
// virtual function of XrdOucCache.
virtual int Stat(const char *url, struct stat &sbuff);

// virtual function of XrdOucCache.
Expand Down Expand Up @@ -409,6 +400,8 @@ public:

void ExecuteCommandUrl(const std::string& command_url);

static XrdScheduler *schedP;

private:
bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
bool ConfigXeq(char *, XrdOucStream &);
Expand All @@ -420,7 +413,6 @@ private:
int UnlinkCommon(const std::string& f_name, bool fail_if_open);

static Cache *m_factory; //!< this object
static XrdScheduler *schedP;

XrdSysError m_log; //!< XrdFileCache namespace logger
XrdSysTrace *m_trace;
Expand Down
14 changes: 2 additions & 12 deletions src/XrdFileCache/XrdFileCacheConfiguration.cc
Expand Up @@ -18,7 +18,7 @@

using namespace XrdFileCache;

XrdVERSIONINFO(XrdOucGetCache2, XrdFileCache);
XrdVERSIONINFO(XrdOucGetCache, XrdFileCache);

bool Cache::cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name)
{
Expand Down Expand Up @@ -174,7 +174,7 @@ bool Cache::Config(const char *config_filename, const char *parameters)

// Obtain OFS configurator for OSS plugin.
XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
&XrdVERSIONINFOVAR(XrdOucGetCache2));
&XrdVERSIONINFOVAR(XrdOucGetCache));
if (! ofsCfg) return false;

TmpConfiguration tmpc;
Expand Down Expand Up @@ -650,13 +650,3 @@ bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfigur

return true;
}

//______________________________________________________________________________


void Cache::EnvInfo(XrdOucEnv &theEnv)
{
// Extract out the pointer to the scheduler
//
schedP = (XrdScheduler *) theEnv.GetPtr("XrdScheduler*");
}
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -21,7 +21,7 @@
#include "XrdCl/XrdClXRootDResponses.hh"
#include "XrdCl/XrdClDefaultEnv.hh"

#include "XrdOuc/XrdOucCache2.hh"
#include "XrdOuc/XrdOucCache.hh"
#include "XrdOuc/XrdOucIOVec.hh"

#include "XrdFileCacheInfo.hh"
Expand Down
8 changes: 4 additions & 4 deletions src/XrdFileCache/XrdFileCacheIO.cc
Expand Up @@ -3,27 +3,27 @@

using namespace XrdFileCache;

IO::IO(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache) :
IO::IO(XrdOucCacheIO *io, XrdOucCacheStats &stats, Cache &cache) :
m_statsGlobal(stats), m_cache(cache), m_traceID("IO"), m_io(io)
{
m_path = m_io->Path();
}

void IO::Update(XrdOucCacheIO2 &iocp)
void IO::Update(XrdOucCacheIO &iocp)
{
SetInput(&iocp);
TRACE_PC(Info, const char* loc = m_io->Location(),
"IO::Update() " << Path() << " location: " <<
((loc && loc[0] != 0) ? loc : "<not set>"));
}

void IO::SetInput(XrdOucCacheIO2* x)
void IO::SetInput(XrdOucCacheIO* x)
{
XrdSysMutexHelper lock(&updMutex);
m_io = x;
}

XrdOucCacheIO2* IO::GetInput()
XrdOucCacheIO* IO::GetInput()
{
XrdSysMutexHelper lock(&updMutex);
return m_io;
Expand Down
20 changes: 10 additions & 10 deletions src/XrdFileCache/XrdFileCacheIO.hh
Expand Up @@ -4,7 +4,7 @@
class XrdSysTrace;

#include "XrdFileCache.hh"
#include "XrdOuc/XrdOucCache2.hh"
#include "XrdOuc/XrdOucCache.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdSys/XrdSysPthread.hh"

Expand All @@ -13,34 +13,34 @@ namespace XrdFileCache
//----------------------------------------------------------------------------
//! Base cache-io class that implements XrdOucCacheIO abstract methods.
//----------------------------------------------------------------------------
class IO : public XrdOucCacheIO2
class IO : public XrdOucCacheIO
{
public:
IO (XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache);
IO (XrdOucCacheIO *io, XrdOucCacheStats &stats, Cache &cache);

//! Original data source.
virtual XrdOucCacheIO *Base() { return m_io; }

//! Original data source URL.
virtual const char *Path() { return m_io->Path(); }

using XrdOucCacheIO2::Sync;
using XrdOucCacheIO::Sync;

virtual int Sync() { return 0; }

using XrdOucCacheIO2::Trunc;
using XrdOucCacheIO::Trunc;

virtual int Trunc(long long Offset) { return -ENOTSUP; }

using XrdOucCacheIO2::Write;
using XrdOucCacheIO::Write;

virtual int Write(char *Buffer, long long Offset, int Length) { return -ENOTSUP; }

virtual void Update(XrdOucCacheIO2 &iocp);
virtual void Update(XrdOucCacheIO &iocp);

XrdSysTrace* GetTrace() { return m_cache.GetTrace(); }

XrdOucCacheIO2* GetInput();
XrdOucCacheIO* GetInput();

protected:
XrdOucCacheStats &m_statsGlobal; //!< reference to Cache statistics
Expand All @@ -51,9 +51,9 @@ protected:
const char* GetPath() { return m_path.c_str(); }

private:
XrdOucCacheIO2 *m_io; //!< original data source
XrdOucCacheIO *m_io; //!< original data source
XrdSysMutex updMutex;
void SetInput(XrdOucCacheIO2*);
void SetInput(XrdOucCacheIO*);
};
}

Expand Down
8 changes: 4 additions & 4 deletions src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Expand Up @@ -32,7 +32,7 @@
using namespace XrdFileCache;

//______________________________________________________________________________
IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache & cache) :
IOEntireFile::IOEntireFile(XrdOucCacheIO *io, XrdOucCacheStats &stats, Cache & cache) :
IO(io, stats, cache),
m_file(0),
m_localStat(0)
Expand Down Expand Up @@ -135,7 +135,7 @@ bool IOEntireFile::ioActive()
}

//______________________________________________________________________________
XrdOucCacheIO *IOEntireFile::Detach()
bool IOEntireFile::Detach(XrdOucCacheIOCD &iocdP)
{
// Called from XrdPosixFile destructor.

Expand All @@ -147,9 +147,9 @@ XrdOucCacheIO *IOEntireFile::Detach()
m_file->RequestSyncOfDetachStats();
Cache::GetInstance().ReleaseFile(m_file, this);
}
XrdOucCacheIO *io = GetInput();
delete this;
return io;
//???? This needs to be coordinated ith old ioActive() and return false if active
return true;
}

//______________________________________________________________________________
Expand Down
8 changes: 4 additions & 4 deletions src/XrdFileCache/XrdFileCacheIOEntireFile.hh
Expand Up @@ -43,7 +43,7 @@ public:
//------------------------------------------------------------------------
//! Constructor
//------------------------------------------------------------------------
IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache);
IOEntireFile(XrdOucCacheIO *io, XrdOucCacheStats &stats, Cache &cache);

//------------------------------------------------------------------------
//! Destructor
Expand All @@ -64,7 +64,7 @@ public:
//!
//! @return number of bytes read
//---------------------------------------------------------------------
using XrdOucCacheIO2::Read;
using XrdOucCacheIO::Read;

virtual int Read(char *Buffer, long long Offset, int Length);

Expand All @@ -76,7 +76,7 @@ public:
//!
//! @return total bytes read
//---------------------------------------------------------------------
using XrdOucCacheIO2::ReadV;
using XrdOucCacheIO::ReadV;

virtual int ReadV(const XrdOucIOVec *readV, int n);

Expand All @@ -85,7 +85,7 @@ public:
//!
//! @return original source \ref XrdPosixFile
//---------------------------------------------------------------------
virtual XrdOucCacheIO* Detach();
virtual bool Detach(XrdOucCacheIOCD &iocdP);

//! \brief Virtual method of XrdOucCacheIO.
//! Called to check if destruction needs to be done in a separate task.
Expand Down
10 changes: 5 additions & 5 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Expand Up @@ -36,7 +36,7 @@
using namespace XrdFileCache;

//______________________________________________________________________________
IOFileBlock::IOFileBlock(XrdOucCacheIO2 *io, XrdOucCacheStats &statsGlobal, Cache & cache) :
IOFileBlock::IOFileBlock(XrdOucCacheIO *io, XrdOucCacheStats &statsGlobal, Cache & cache) :
IO(io, statsGlobal, cache), m_localStat(0), m_info(cache.GetTrace(), false), m_infoFile(0)
{
m_blocksize = Cache::GetInstance().RefConfiguration().m_hdfsbsize;
Expand All @@ -54,9 +54,9 @@ IOFileBlock::~IOFileBlock()
}

//______________________________________________________________________________
XrdOucCacheIO* IOFileBlock::Detach()
bool IOFileBlock::Detach(XrdOucCacheIOCD &iocdP)
{
// Called from XrdPosixFile destructor
// Called from XrdPosixFile destructor ???? Not really tue

TRACEIO(Info, "Detach IOFileBlock");

Expand All @@ -72,9 +72,9 @@ XrdOucCacheIO* IOFileBlock::Detach()
}
}
}
XrdOucCacheIO *io = GetInput();
delete this;
return io;
//???? This needs to be coordinated with old ioActive() and return false if active
return true;
}


Expand Down

0 comments on commit 1339e37

Please sign in to comment.