Skip to content

Commit

Permalink
Merge branch 'pfc-hdfs-opt' of https://github.com/alja/xrootd
Browse files Browse the repository at this point in the history
  • Loading branch information
ljanyst committed Dec 8, 2014
2 parents 66c6a41 + f6757bb commit 1718162
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options)
m_attached++;
}
IO* cio;
if (Factory::GetInstance().RefConfiguration().m_prefetchFileBlocks)
if (Factory::GetInstance().RefConfiguration().m_hdfsmode)
cio = new IOFileBlock(*io, m_stats, *this);
else
cio = new IOEntireFile(*io, m_stats, *this);
Expand Down
12 changes: 6 additions & 6 deletions src/XrdFileCache/XrdFileCacheFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ bool Factory::Config(XrdSysLogger *logger, const char *config_filename, const ch
m_configuration.m_NRamBuffersRead, m_configuration.m_NRamBuffersPrefetch );


if (m_configuration.m_prefetchFileBlocks)
if (m_configuration.m_hdfsmode)
{
char buff2[512];
snprintf(buff2, sizeof(buff2), "\tpfc.filefragmentmode filefragmentsize %lld \n", m_configuration.m_fileFragmentSize);
snprintf(buff2, sizeof(buff2), "\tpfc.hdfsmode hdfsbsize %lld \n", m_configuration.m_hdfsbsize);
m_log.Emsg("", buff, buff2);
}
else {
Expand Down Expand Up @@ -326,17 +326,17 @@ bool Factory::ConfigParameters(std::string part, XrdOucStream& config )
{
m_configuration.m_NRamBuffersPrefetch = ::atoi(config.GetWord());
}
else if ( part == "filefragmentmode" )
else if ( part == "hdfsmode" )
{
m_configuration.m_prefetchFileBlocks = true;
m_configuration.m_hdfsmode = true;

const char* params = config.GetWord();
if (params) {
if (!strncmp("filefragmentsize", params, 9)) {
if (!strncmp("hdfsbsize", params, 9)) {
long long minBlSize = 128 * 1024;
long long maxBlSize = 1024 * 1024 * 1024;
params = config.GetWord();
if ( XrdOuca2x::a2sz(m_log, "Error getting file fragment size", params, &m_configuration.m_fileFragmentSize, minBlSize, maxBlSize))
if ( XrdOuca2x::a2sz(m_log, "Error getting file fragment size", params, &m_configuration.m_hdfsbsize, minBlSize, maxBlSize))
{
return false;
}
Expand Down
8 changes: 4 additions & 4 deletions src/XrdFileCache/XrdFileCacheFactory.hh
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ namespace XrdFileCache
struct Configuration
{
Configuration() :
m_prefetchFileBlocks(false),
m_hdfsmode(false),
m_cache_dir("/var/tmp/xrootd-file-cache"),
m_username("nobody"),
m_lwm(0.95),
m_hwm(0.9),
m_bufferSize(1024*1024),
m_NRamBuffersRead(8),
m_NRamBuffersPrefetch(1),
m_fileFragmentSize(128*1024*1024) {}
m_hdfsbsize(128*1024*1024) {}

bool m_prefetchFileBlocks; //!< flag for enabling block-level operation
bool m_hdfsmode; //!< flag for enabling block-level operation
std::string m_cache_dir; //!< path of disk cache
std::string m_username; //!< username passed to oss plugin

Expand All @@ -65,7 +65,7 @@ namespace XrdFileCache
long long m_bufferSize; //!< prefetch buffer size, default 1MB
int m_NRamBuffersRead; //!< number of read in-memory cache blocks
int m_NRamBuffersPrefetch; //!< number of prefetch in-memory cache blocks
long long m_fileFragmentSize; //!< used with m_prefetchFileBlocks, default 128MB
long long m_hdfsbsize; //!< used with m_hdfsmode, default 128MB
};


Expand Down
51 changes: 38 additions & 13 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void *PrefetchRunnerBl(void * prefetch_void)
IOFileBlock::IOFileBlock(XrdOucCacheIO &io, XrdOucCacheStats &statsGlobal, Cache & cache)
: IO(io, statsGlobal, cache)
{
m_fileFragmentSize = Factory::GetInstance().RefConfiguration().m_fileFragmentSize;
m_blocksize = Factory::GetInstance().RefConfiguration().m_hdfsbsize;
GetBlockSizeFromPath();
}

//______________________________________________________________________________
Expand All @@ -65,6 +66,30 @@ XrdOucCacheIO* IOFileBlock::Detach()
return io;
}

//______________________________________________________________________________
void IOFileBlock::GetBlockSizeFromPath()
{
const static std::string tag = "hdfsbsize=";
std::string path= m_io.Path();
size_t pos1 = path.find(tag);
size_t t = tag.length();
if ( pos1 != path.npos)
{
pos1 += t;
size_t pos2 = path.find("&", pos1 );
if (pos2 != path.npos )
{
std::string bs = path.substr(pos1, pos2 - pos1);
m_blocksize = atoi(bs.c_str());
}
else {
m_blocksize = atoi(path.substr(pos1).c_str());
}

clLog()->Debug(XrdCl::AppMsg, "FileBlock::GetBlockSizeFromPath(), blocksize = %lld. %s", m_blocksize, m_io.Path());
}
}

//______________________________________________________________________________
Prefetch* IOFileBlock::newBlockPrefetcher(long long off, int blocksize, XrdOucCacheIO* io)
{
Expand All @@ -74,7 +99,7 @@ Prefetch* IOFileBlock::newBlockPrefetcher(long long off, int blocksize, XrdOucCa
ss << fname;
char offExt[64];
// filename like <origpath>___<size>_<offset>
sprintf(&offExt[0],"___%lld_%lld", m_fileFragmentSize, off );
sprintf(&offExt[0],"___%lld_%lld", m_blocksize, off );
ss << &offExt[0];
fname = ss.str();

Expand Down Expand Up @@ -102,8 +127,8 @@ bool IOFileBlock::ioActive()
int IOFileBlock::Read (char *buff, long long off, int size)
{
long long off0 = off;
int idx_first = off0/m_fileFragmentSize;
int idx_last = (off0+size-1)/m_fileFragmentSize;
int idx_first = off0/m_blocksize;
int idx_last = (off0+size-1)/m_blocksize;
int bytes_read = 0;
clLog()->Debug(XrdCl::AppMsg, "IOFileBlock::Read() %lld@%d block range [%d-%d] \n %s", off, size, idx_first, idx_last, m_io.Path());

Expand All @@ -119,16 +144,16 @@ int IOFileBlock::Read (char *buff, long long off, int size)
}
else
{
size_t pbs = m_fileFragmentSize;
size_t pbs = m_blocksize;
// check if this is last block
int lastIOFileBlock = (m_io.FSize()-1)/m_fileFragmentSize;
int lastIOFileBlock = (m_io.FSize()-1)/m_blocksize;
if (blockIdx == lastIOFileBlock )
{
pbs = m_io.FSize() - blockIdx*m_fileFragmentSize;
pbs = m_io.FSize() - blockIdx*m_blocksize;
clLog()->Debug(XrdCl::AppMsg, "IOFileBlock::Read() last block, change output file size to %lld \n %s", pbs, m_io.Path());
}

fb = newBlockPrefetcher(blockIdx*m_fileFragmentSize, pbs, &m_io);
fb = newBlockPrefetcher(blockIdx*m_blocksize, pbs, &m_io);
m_blocks.insert(std::pair<int,Prefetch*>(blockIdx, (Prefetch*) fb));
}
m_mutex.UnLock();
Expand All @@ -139,26 +164,26 @@ int IOFileBlock::Read (char *buff, long long off, int size)
{
if (blockIdx == idx_first)
{
readBlockSize = (blockIdx + 1) *m_fileFragmentSize - off0;
readBlockSize = (blockIdx + 1) *m_blocksize - off0;
clLog()->Debug(XrdCl::AppMsg, "Read partially till the end of the block %s", m_io.Path());
}
else if (blockIdx == idx_last)
{
readBlockSize = (off0+size) - blockIdx*m_fileFragmentSize;
readBlockSize = (off0+size) - blockIdx*m_blocksize;
clLog()->Debug(XrdCl::AppMsg, "Read partially from beginning of block %s", m_io.Path());
}
else
{
readBlockSize = m_fileFragmentSize;
readBlockSize = m_blocksize;
}
}
assert(readBlockSize);

clLog()->Info(XrdCl::AppMsg, "IOFileBlock::Read() block[%d] read-block-size[%d], offset[%lld] %s", blockIdx, readBlockSize, off, m_io.Path());

long long min = blockIdx*m_fileFragmentSize;
long long min = blockIdx*m_blocksize;
if ( off < min) { assert(0); }
assert(off+readBlockSize <= (min + m_fileFragmentSize));
assert(off+readBlockSize <= (min + m_blocksize));
int retvalBlock = fb->Read(buff, off, readBlockSize);

clLog()->Debug(XrdCl::AppMsg, "IOFileBlock::Read() Block read returned %d %s", retvalBlock , m_io.Path());
Expand Down
3 changes: 2 additions & 1 deletion src/XrdFileCache/XrdFileCacheIOFileBlock.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ namespace XrdFileCache
virtual bool ioActive();

private:
long long m_fileFragmentSize; //!< size of file-block
long long m_blocksize; //!< size of file-block
std::map<int, Prefetch*> m_blocks; //!< map of created blocks
XrdSysMutex m_mutex; //!< map mutex

void GetBlockSizeFromPath();
Prefetch* newBlockPrefetcher(long long off, int blocksize, XrdOucCacheIO* io);
};
}
Expand Down

0 comments on commit 1718162

Please sign in to comment.