Skip to content

Commit

Permalink
Moving to XrdOucCache2.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja committed Mar 18, 2016
1 parent d89de8d commit d02b22c
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 81 deletions.
4 changes: 2 additions & 2 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -65,7 +65,7 @@ void *PrefetchThread(void* ptr)

extern "C"
{
XrdOucCache *XrdOucGetCache(XrdSysLogger *logger,
XrdOucCache2 *XrdOucGetCache2(XrdSysLogger *logger,
const char *config_filename,
const char *parameters)
{
Expand Down Expand Up @@ -139,7 +139,7 @@ Cache::Cache() : XrdOucCache(),

//______________________________________________________________________________

XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options)
XrdOucCacheIO2 *Cache::Attach(XrdOucCacheIO2 *io, int Options)
{
if (Cache::GetInstance().Decide(io))
{
Expand Down
8 changes: 5 additions & 3 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -22,7 +22,8 @@

#include "XrdVersion.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdOuc/XrdOucCache.hh"
#include "XrdOuc/XrdOucCache2.hh"
#include "XrdOuc/XrdOucCallBack.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdFileCacheFile.hh"
#include "XrdFileCacheDecision.hh"
Expand All @@ -38,6 +39,7 @@ class File;
class IO;
}


namespace XrdFileCache
{
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -75,7 +77,7 @@ namespace XrdFileCache
//----------------------------------------------------------------------------
//! Attaches/creates and detaches/deletes cache-io objects for disk based cache.
//----------------------------------------------------------------------------
class Cache : public XrdOucCache
class Cache : public XrdOucCache2
{
public:
//---------------------------------------------------------------------
Expand All @@ -86,7 +88,7 @@ namespace XrdFileCache
//---------------------------------------------------------------------
//! Obtain a new IO object that fronts existing XrdOucCacheIO.
//---------------------------------------------------------------------
virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options=0);
virtual XrdOucCacheIO2 *Attach(XrdOucCacheIO2 *, int Options=0);

//---------------------------------------------------------------------
//! Number of cache-io objects atteched through this cache.
Expand Down
69 changes: 24 additions & 45 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -70,7 +70,7 @@ namespace
Cache* cache() { return &Cache::GetInstance(); }
}

File::File(XrdOucCacheIO &inputIO, std::string& disk_file_path, long long iOffset, long long iFileSize) :
File::File(XrdOucCacheIO2 &inputIO, std::string& disk_file_path, long long iOffset, long long iFileSize) :
m_input(inputIO),
m_output(NULL),
m_infoFile(NULL),
Expand Down Expand Up @@ -334,7 +334,6 @@ Block* File::RequestBlock(int i, bool prefetch)
// catch the block while still in memory.
clLog()->Debug(XrdCl::AppMsg, "RequestBlock() %d pOn=(%d)", i, prefetch);

XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile;

const long long BS = m_cfi.GetBufferSize();
const int last_block = m_cfi.GetSizeInBits() - 1;
Expand All @@ -344,32 +343,25 @@ Block* File::RequestBlock(int i, bool prefetch)

Block *b = new Block(this, off, this_bs, prefetch); // should block be reused to avoid recreation

XrdCl::XRootDStatus status = client.Read(off, this_bs, (void*)b->get_buff(), new BlockResponseHandler(b));
if (status.IsOK()) {
clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() this = %p, b=%p, this idx=%d pOn=(%d) %s", (void*)this, (void*)b, i, prefetch, lPath());
m_block_map[i] = b;
BlockResponseHandler* oucCB = new BlockResponseHandler(b);
m_input.Read(*oucCB, (char*)b->get_buff(), off, (int)this_bs);

if (m_prefetchState == kOn && m_block_map.size() > Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
{
m_prefetchState = kHold;
cache()->DeRegisterPrefetchFile(this);
}
return b;
}
else {
clLog()->Error(XrdCl::AppMsg, "File::RequestBlock() error %d, this = %p, b=%p, this idx=%d pOn=(%d) %s", status.code, (void*)this, (void*)b, i, prefetch, lPath());
XrdPosixMap::Result(status);
return 0;
clLog()->Dump(XrdCl::AppMsg, "File::RequestBlock() this = %p, b=%p, this idx=%d pOn=(%d) %s", (void*)this, (void*)b, i, prefetch, lPath());
m_block_map[i] = b;

if (m_prefetchState == kOn && m_block_map.size() > Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
{
m_prefetchState = kHold;
cache()->DeRegisterPrefetchFile(this);
}
return b;
}

//------------------------------------------------------------------------------

int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,
char* req_buf, long long req_off, long long req_size)
{
XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile;

const long long BS = m_cfi.GetBufferSize();

// XXX Use readv to load more at the same time.
Expand All @@ -385,16 +377,9 @@ int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,

overlap(*ii, BS, req_off, req_size, off, blk_off, size);

XrdCl::Status status = client.Read(*ii * BS + blk_off, size, req_buf + off, handler);
if (!status.IsOK())
{
clLog()->Error(XrdCl::AppMsg, "File::RequestBlocksDirect error %s\n", lPath());
XrdPosixMap::Result(status);
return -1; // AMT all reads should be canceled in this case
}
else {
clLog()->Dump(XrdCl::AppMsg, "RequestBlockDirect success %d %ld %s", *ii, size, lPath());
}
m_input.Read( *handler, req_buf + off, *ii * BS + blk_off, size);
clLog()->Dump(XrdCl::AppMsg, "RequestBlockDirect success %d %ld %s", *ii, size, lPath());

total += size;
}

Expand Down Expand Up @@ -837,13 +822,13 @@ void File::free_block(Block* b)

//------------------------------------------------------------------------------

void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
void File::ProcessBlockResponse(Block* b, int res)
{

m_downloadCond.Lock();

clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %p, %d %s",(void*)b,(int)(b->m_offset/BufferSize()), lPath());
if (status->IsOK())
if (res >= 0)
{
b->m_downloaded = true;
clLog()->Debug(XrdCl::AppMsg, "File::ProcessBlockResponse %d finished %d %s",(int)(b->m_offset/BufferSize()), b->is_finished(), lPath());
Expand All @@ -862,8 +847,8 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
{
// AMT how long to keep?
// when to retry?
clLog()->Error(XrdCl::AppMsg, "File::ProcessBlockResponse block %p %d error=%d, [%s] %s",(void*)b,(int)(b->m_offset/BufferSize()), status->code, status->GetErrorMessage().c_str(), lPath());
XrdPosixMap::Result(*status);
clLog()->Error(XrdCl::AppMsg, "File::ProcessBlockResponse block %p %d error=%d, %s",(void*)b,(int)(b->m_offset/BufferSize()), res, lPath());
// XrdPosixMap::Result(*status);
// AMT could notfiy global cache we dont need RAM for that block
b->set_error_and_free(errno);
errno = 0;
Expand Down Expand Up @@ -999,32 +984,26 @@ void File::UnMarkPrefetch()
//================== RESPONSE HANDLER ==================================
//==============================================================================

void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response)
void BlockResponseHandler::Done(int res)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"BlockResponseHandler::HandleResponse()");

m_block->m_file->ProcessBlockResponse(m_block, status);
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"BlockResponseHandler::Done()");

delete status;
delete response;
m_block->m_file->ProcessBlockResponse(m_block, res);

delete this;
}

//------------------------------------------------------------------------------

void DirectResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response)
void DirectResponseHandler::Done(int res)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"DirectResponseHandler::HandleRespons()");
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"DirectResponseHandler::Done()");
XrdSysCondVarHelper _lck(m_cond);

--m_to_wait;

if ( ! status->IsOK())
if (res < 0)
{
XrdPosixMap::Result(*status);
m_errno = errno;
}

Expand Down
21 changes: 11 additions & 10 deletions src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -21,6 +21,9 @@
#include "XrdCl/XrdClXRootDResponses.hh"
#include "XrdCl/XrdClDefaultEnv.hh"

#include "XrdOuc/XrdOucCache2.hh"
#include "XrdOuc/XrdOucIOVec.hh"

#include "XrdFileCacheInfo.hh"
#include "XrdFileCacheStats.hh"

Expand Down Expand Up @@ -94,7 +97,7 @@ namespace XrdFileCache
private:
enum PrefetchState_e { kOn, kHold, kCanceled };

XrdOucCacheIO &m_input; //!< original data source
XrdOucCacheIO2 &m_input; //!< original data source
XrdOssDF *m_output; //!< file handle for data file on disk
XrdOssDF *m_infoFile; //!< file handle for data-info file on disk
Info m_cfi; //!< download status of file blocks and access statistics
Expand Down Expand Up @@ -141,7 +144,7 @@ namespace XrdFileCache
//------------------------------------------------------------------------
//! Constructor.
//------------------------------------------------------------------------
File(XrdOucCacheIO &io, std::string &path,
File(XrdOucCacheIO2 &io, std::string &path,
long long offset, long long fileSize);

//------------------------------------------------------------------------
Expand Down Expand Up @@ -174,7 +177,7 @@ namespace XrdFileCache
//----------------------------------------------------------------------
Stats& GetStats() { return m_stats; }

void ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status);
void ProcessBlockResponse(Block* b, int res);
void WriteBlockToDisk(Block* b);

void Prefetch();
Expand Down Expand Up @@ -205,7 +208,7 @@ namespace XrdFileCache
char* req_buf, long long req_off, long long req_size);

// VRead
bool VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& blks_to_process, ReadVBlockListDisk& blks_on_disk, XrdCl::ChunkList& chunkVec);
bool VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& blks_to_process, ReadVBlockListDisk& blks_on_disk, std::vector<XrdOucIOVec>& chunkVec);
int VReadFromDisk(const XrdOucIOVec *readV, int n, ReadVBlockListDisk& blks_on_disk);
int VReadProcessBlocks(const XrdOucIOVec *readV, int n, std::vector<ReadVChunkListRAM>& blks_to_process, std::vector<ReadVChunkListRAM>& blks_rocessed);

Expand All @@ -230,18 +233,17 @@ namespace XrdFileCache

// ================================================================

class BlockResponseHandler : public XrdCl::ResponseHandler
class BlockResponseHandler : public XrdOucCacheIOCB
{
public:
Block *m_block;

BlockResponseHandler(Block *b) : m_block(b) {}

void HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response);
virtual void Done(int result);
};

class DirectResponseHandler : public XrdCl::ResponseHandler
class DirectResponseHandler : public XrdOucCacheIOCB
{
public:
XrdSysCondVar m_cond;
Expand All @@ -254,8 +256,7 @@ namespace XrdFileCache
bool is_ok() { XrdSysCondVarHelper _lck(m_cond); return m_to_wait == 0 && m_errno == 0; }
bool is_failed() { XrdSysCondVarHelper _lck(m_cond); return m_errno != 0; }

void HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response);
virtual void Done(int result);
};

}
Expand Down
8 changes: 4 additions & 4 deletions src/XrdFileCache/XrdFileCacheIO.hh
Expand Up @@ -2,18 +2,18 @@
#define __XRDFILECACHE_CACHE_IO_HH__

#include "XrdFileCache.hh"
#include "XrdOuc/XrdOucCache.hh"
#include "XrdOuc/XrdOucCache2.hh"
#include "XrdCl/XrdClDefaultEnv.hh"

namespace XrdFileCache
{
//----------------------------------------------------------------------------
//! Base cache-io class that implements XrdOucCacheIO abstract methods.
//----------------------------------------------------------------------------
class IO : public XrdOucCacheIO
class IO : public XrdOucCacheIO2
{
public:
IO (XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache &cache) :
IO (XrdOucCacheIO2 &io, XrdOucCacheStats &stats, Cache &cache) :
m_io(io), m_statsGlobal(stats), m_cache(cache) {}

//! Original data source.
Expand All @@ -36,7 +36,7 @@ namespace XrdFileCache
protected:
XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }

XrdOucCacheIO &m_io; //!< original data source
XrdOucCacheIO2 &m_io; //!< original data source
XrdOucCacheStats &m_statsGlobal; //!< reference to Cache statistics
Cache &m_cache; //!< reference to Cache needed in detach
};
Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Expand Up @@ -31,7 +31,7 @@ using namespace XrdFileCache;
//______________________________________________________________________________


IOEntireFile::IOEntireFile(XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache & cache)
IOEntireFile::IOEntireFile(XrdOucCacheIO2 &io, XrdOucCacheStats &stats, Cache & cache)
: IO(io, stats, cache),
m_file(0)
{
Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheIOEntireFile.hh
Expand Up @@ -43,7 +43,7 @@ namespace XrdFileCache
//------------------------------------------------------------------------
//! Constructor
//------------------------------------------------------------------------
IOEntireFile(XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache &cache);
IOEntireFile(XrdOucCacheIO2 &io, XrdOucCacheStats &stats, Cache &cache);

//------------------------------------------------------------------------
//! Destructor
Expand Down
4 changes: 2 additions & 2 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Expand Up @@ -33,7 +33,7 @@
using namespace XrdFileCache;

//______________________________________________________________________________
IOFileBlock::IOFileBlock(XrdOucCacheIO &io, XrdOucCacheStats &statsGlobal, Cache & cache)
IOFileBlock::IOFileBlock(XrdOucCacheIO2 &io, XrdOucCacheStats &statsGlobal, Cache & cache)
: IO(io, statsGlobal, cache)
{
m_blocksize = Cache::GetInstance().RefConfiguration().m_hdfsbsize;
Expand Down Expand Up @@ -83,7 +83,7 @@ void IOFileBlock::GetBlockSizeFromPath()
}

//______________________________________________________________________________
File* IOFileBlock::newBlockFile(long long off, int blocksize, XrdOucCacheIO* io)
File* IOFileBlock::newBlockFile(long long off, int blocksize, XrdOucCacheIO2* io)
{
XrdCl::URL url(io->Path());
std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath();
Expand Down
6 changes: 3 additions & 3 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.hh
Expand Up @@ -20,7 +20,7 @@
#include <map>
#include <string>

#include "XrdOuc/XrdOucCache.hh"
#include "XrdOuc/XrdOucCache2.hh"
#include "XrdSys/XrdSysPthread.hh"

#include "XrdFileCacheIO.hh"
Expand All @@ -41,7 +41,7 @@ namespace XrdFileCache
//------------------------------------------------------------------------
//! Constructor.
//------------------------------------------------------------------------
IOFileBlock(XrdOucCacheIO &io, XrdOucCacheStats &stats, Cache &cache);
IOFileBlock(XrdOucCacheIO2 &io, XrdOucCacheStats &stats, Cache &cache);

//------------------------------------------------------------------------
//! Destructor.
Expand Down Expand Up @@ -70,7 +70,7 @@ namespace XrdFileCache
XrdSysMutex m_mutex; //!< map mutex

void GetBlockSizeFromPath();
File* newBlockFile(long long off, int blocksize, XrdOucCacheIO* io);
File* newBlockFile(long long off, int blocksize, XrdOucCacheIO2* io);
};
}

Expand Down

0 comments on commit d02b22c

Please sign in to comment.