Skip to content

Commit

Permalink
Propagate input change from IO to File. This problem solves warning X…
Browse files Browse the repository at this point in the history
…rdPosix: Unexpected PrepIO call.
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent dc19b05 commit ffaeaa3
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 33 deletions.
9 changes: 5 additions & 4 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -18,6 +18,7 @@


#include "XrdFileCacheFile.hh"
#include "XrdFileCacheIO.hh"
#include "XrdFileCacheTrace.hh"

#include <stdio.h>
Expand Down Expand Up @@ -71,8 +72,8 @@ namespace
Cache* cache() { return &Cache::GetInstance(); }
}

File::File(XrdOucCacheIO2 *inputIO, std::string& disk_file_path, long long iOffset, long long iFileSize) :
m_input(inputIO),
File::File(IO *io, std::string& disk_file_path, long long iOffset, long long iFileSize) :
m_io(io),
m_output(NULL),
m_infoFile(NULL),
m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
Expand Down Expand Up @@ -338,7 +339,7 @@ Block* File::RequestBlock(int i, bool prefetch)

TRACEF(Dump, "File::RequestBlock() " << i << "prefetch" << prefetch << "address " << (void*)b);
BlockResponseHandler* oucCB = new BlockResponseHandler(b);
m_input->Read(*oucCB, (char*)b->get_buff(), off, (int)this_bs);
m_io->GetInput()->Read(*oucCB, (char*)b->get_buff(), off, (int)this_bs);

m_block_map[i] = b;

Expand Down Expand Up @@ -370,7 +371,7 @@ int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,

overlap(*ii, BS, req_off, req_size, off, blk_off, size);

m_input->Read( *handler, req_buf + off, *ii * BS + blk_off, size);
m_io->GetInput()->Read( *handler, req_buf + off, *ii * BS + blk_off, size);
TRACEF(Dump, "RequestBlockDirect success, idx = " << *ii << " size = " << size);

total += size;
Expand Down
7 changes: 4 additions & 3 deletions src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -42,7 +42,8 @@ namespace XrdFileCache
{
class BlockResponseHandler;
class DirectResponseHandler;

class IO;

struct ReadVBlockListRAM;
struct ReadVChunkListRAM;
struct ReadVBlockListDisk;
Expand Down Expand Up @@ -97,7 +98,7 @@ namespace XrdFileCache
private:
enum PrefetchState_e { kOn, kHold, kStopped, kComplete };

XrdOucCacheIO2 *m_input; //!< original data source
IO *m_io; //!< original data source
XrdOssDF *m_output; //!< file handle for data file on disk
XrdOssDF *m_infoFile; //!< file handle for data-info file on disk
Info m_cfi; //!< download status of file blocks and access statistics
Expand Down Expand Up @@ -144,7 +145,7 @@ namespace XrdFileCache
//------------------------------------------------------------------------
//! Constructor.
//------------------------------------------------------------------------
File(XrdOucCacheIO2 *io, std::string &path,
File(IO *io, std::string &path,
long long offset, long long fileSize);

//------------------------------------------------------------------------
Expand Down
31 changes: 31 additions & 0 deletions src/XrdFileCache/XrdFileCacheIO.cc
@@ -0,0 +1,31 @@
#include "XrdFileCacheIO.hh"
#include "XrdSys/XrdSysAtomics.hh"
#include "XrdPosix/XrdPosixFile.hh"

using namespace XrdFileCache;

IO::IO(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache):
m_statsGlobal(stats), m_cache(cache), m_traceID("IO"), m_io(io)
{
m_path = m_io->Path();
}

void IO::Update(XrdOucCacheIO2 &iocp)
{
SetInput(&iocp);
}


void IO::SetInput(XrdOucCacheIO2* x)
{
updMutex.Lock();
m_io = x;
updMutex.UnLock();
}

XrdOucCacheIO2* IO::GetInput()
{
AtomicBeg(updMutex);
return m_io;
AtomicEnd(updMutex);
}
19 changes: 13 additions & 6 deletions src/XrdFileCache/XrdFileCacheIO.hh
Expand Up @@ -6,7 +6,7 @@ class XrdOucTrace;
#include "XrdFileCache.hh"
#include "XrdOuc/XrdOucCache2.hh"
#include "XrdCl/XrdClDefaultEnv.hh"

#include "XrdSys/XrdSysPthread.hh"

namespace XrdFileCache
{
Expand All @@ -16,8 +16,7 @@ namespace XrdFileCache
class IO : public XrdOucCacheIO2
{
public:
IO (XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache) :
m_io(io), m_statsGlobal(stats), m_cache(cache), m_traceID("IO"){}
IO (XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &cache);

//! Original data source.
virtual XrdOucCacheIO *Base() { return m_io; }
Expand All @@ -35,18 +34,26 @@ namespace XrdFileCache
virtual int Write(char *Buffer, long long Offset, int Length)
{ errno = ENOTSUP; return -1; }

virtual void Update(XrdOucCacheIO2 &iocp) { m_io = &iocp; }
virtual void Update(XrdOucCacheIO2 &iocp);

virtual void RelinquishFile(File*) = 0;

XrdOucTrace* GetTrace() {return m_cache.GetTrace();}

XrdOucCacheIO2* GetInput();

protected:
XrdOucCacheIO2 *m_io; //!< original data source
protected:
XrdOucCacheStats &m_statsGlobal; //!< reference to Cache statistics
Cache &m_cache; //!< reference to Cache needed in detach

const char* m_traceID;
std::string m_path;
const char* GetPath() { return m_path.c_str(); }

private:
XrdOucCacheIO2 *m_io; //!< original data source
XrdSysRecMutex updMutex;
void SetInput(XrdOucCacheIO2*);
};
}

Expand Down
16 changes: 8 additions & 8 deletions src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Expand Up @@ -39,7 +39,7 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &
m_file(0),
m_localStat(0)
{
XrdCl::URL url(m_io->Path());
XrdCl::URL url(GetInput()->Path());
std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath();

if ((m_file = Cache::GetInstance().GetFileWithLocalPath(fname, this)))
Expand All @@ -55,10 +55,11 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &
if (res)
TRACEIO(Error, "IOEntireFile::IOEntireFile, could not get valid stat");

m_file = new File(io, fname, 0, st.st_size);
m_file = new File(this, fname, 0, st.st_size);
}

Cache::GetInstance().AddActive(this, m_file);
std::cout << " IOEntireFile::IOEntireFile " << this << std::endl;
}


Expand All @@ -70,7 +71,7 @@ IOEntireFile::~IOEntireFile()

int IOEntireFile::Fstat(struct stat &sbuff)
{
XrdCl::URL url(m_io->Path());
XrdCl::URL url(GetPath());
std::string name = url.GetPath();
name += ".cinfo";

Expand Down Expand Up @@ -108,9 +109,8 @@ int IOEntireFile::initCachedStat(const char* path)
if (m_cache.GetOss()->Stat(path, &tmpStat) == XrdOssOK) {
XrdOssDF* infoFile = m_cache.GetOss()->newFile(Cache::GetInstance().RefConfiguration().m_username.c_str());
XrdOucEnv myEnv;
if (infoFile->Open(path, O_RDONLY, 0600, myEnv) > 0) {
if (infoFile->Open(path, O_RDONLY, 0600, myEnv) == XrdOssOK) {
Info info(m_cache.GetTrace());
printf("reading info file ..\n");
if (info.Read(infoFile) > 0) {
tmpStat.st_size = info.GetFileSize();
TRACEIO(Info, "IOEntireFile::initCachedStat successfuly read size from info file = " << tmpStat.st_size);
Expand All @@ -126,7 +126,7 @@ int IOEntireFile::initCachedStat(const char* path)
}

if (res) {
res = m_io->Fstat(tmpStat);
res = GetInput()->Fstat(tmpStat);
TRACEIO(Debug, "IOEntireFile::initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size);
}

Expand All @@ -148,7 +148,7 @@ bool IOEntireFile::ioActive()

XrdOucCacheIO *IOEntireFile::Detach()
{
XrdOucCacheIO * io = m_io;
XrdOucCacheIO * io = GetInput();

// This will delete us!
m_cache.Detach(this);
Expand Down Expand Up @@ -203,6 +203,6 @@ int IOEntireFile::Read (char *buff, long long off, int size)
*/
int IOEntireFile::ReadV (const XrdOucIOVec *readV, int n)
{
TRACE(Dump, "IO::ReadV(), get " << n << " requests, " << m_io->Path());
TRACEIO(Dump, "IO::ReadV(), get " << n << " requests" );
return m_file->ReadV(readV, n);
}
1 change: 0 additions & 1 deletion src/XrdFileCache/XrdFileCacheIOEntireFile.hh
Expand Up @@ -101,7 +101,6 @@ namespace XrdFileCache

virtual void RelinquishFile(File*);


private:
File* m_file;
struct stat *m_localStat;
Expand Down
21 changes: 12 additions & 9 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Expand Up @@ -45,7 +45,7 @@ IOFileBlock::IOFileBlock(XrdOucCacheIO2 *io, XrdOucCacheStats &statsGlobal, Cach
XrdOucCacheIO* IOFileBlock::Detach()
{
TRACEIO(Info, "IOFileBlock::Detach() " );
XrdOucCacheIO * io = m_io;
XrdOucCacheIO * io = GetInput();


for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
Expand All @@ -62,7 +62,7 @@ XrdOucCacheIO* IOFileBlock::Detach()
void IOFileBlock::GetBlockSizeFromPath()
{
const static std::string tag = "hdfsbsize=";
std::string path= m_io->Path();
std::string path= GetInput()->Path();
size_t pos1 = path.find(tag);
size_t t = tag.length();
if ( pos1 != path.npos)
Expand All @@ -85,7 +85,7 @@ void IOFileBlock::GetBlockSizeFromPath()
//______________________________________________________________________________
File* IOFileBlock::newBlockFile(long long off, int blocksize)
{
XrdCl::URL url(m_io->Path());
XrdCl::URL url(GetInput()->Path());
std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath();

std::stringstream ss;
Expand All @@ -101,7 +101,7 @@ File* IOFileBlock::newBlockFile(long long off, int blocksize)
File* file;
if (!(file = Cache::GetInstance().GetFileWithLocalPath(fname, this)))
{
file = new File(m_io, fname, off, blocksize);
file = new File(this, fname, off, blocksize);
Cache::GetInstance().AddActive(this, file);
}

Expand Down Expand Up @@ -146,15 +146,18 @@ bool IOFileBlock::ioActive()
int IOFileBlock::Read (char *buff, long long off, int size)
{
// protect from reads over the file size
if (off >= m_io->FSize())

long long fileSize = GetInput()->FSize();

if (off >= fileSize)
return 0;
if (off < 0)
{
errno = EINVAL;
return -1;
}
if (off + size > m_io->FSize())
size = m_io->FSize() - off;
if (off + size > fileSize)
size = fileSize - off;

long long off0 = off;
int idx_first = off0/m_blocksize;
Expand All @@ -176,10 +179,10 @@ int IOFileBlock::Read (char *buff, long long off, int size)
{
size_t pbs = m_blocksize;
// check if this is last block
int lastIOFileBlock = (m_io->FSize()-1)/m_blocksize;
int lastIOFileBlock = (fileSize-1)/m_blocksize;
if (blockIdx == lastIOFileBlock )
{
pbs = m_io->FSize() - blockIdx*m_blocksize;
pbs = fileSize - blockIdx*m_blocksize;
// TRACEIO(Dump, "IOFileBlock::Read() last block, change output file size to " << pbs);
}

Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheTrace.hh
Expand Up @@ -33,7 +33,7 @@

#define TRACEIO(act, x) \
if (XRD_TRACE What >= TRACE_ ## act) \
{XRD_TRACE Beg(m_traceID); cerr << TRACE_STR_##act <<x << " " << m_io->Path(); XRD_TRACE End();}
{XRD_TRACE Beg(m_traceID); cerr << TRACE_STR_##act <<x << " " << GetPath(); XRD_TRACE End();}

#define TRACEF(act, x) \
if (XRD_TRACE What >= TRACE_ ## act) \
Expand Down
3 changes: 2 additions & 1 deletion src/XrdFileCache/XrdFileCacheVRead.cc
Expand Up @@ -4,6 +4,7 @@

#include "XrdFileCacheInfo.hh"
#include "XrdFileCacheStats.hh"
#include "XrdFileCacheIO.hh"

#include "XrdOss/XrdOss.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
Expand Down Expand Up @@ -89,7 +90,7 @@ int File::ReadV (const XrdOucIOVec *readV, int n)
direct_handler = new DirectResponseHandler(1);
// TODO check interface in the client file
// m_input.VectorRead(chunkVec, (void*) 0, direct_handler);
m_input->ReadV(*direct_handler, &chunkVec[0], chunkVec.size());
m_io->GetInput()->ReadV(*direct_handler, &chunkVec[0], chunkVec.size());
}
}

Expand Down

0 comments on commit ffaeaa3

Please sign in to comment.