Skip to content

Commit

Permalink
Merge pull request #383 from alja/defer-hdfs
Browse files Browse the repository at this point in the history
pfc-V2 Implement deferred open for file block mode
  • Loading branch information
abh3 committed Jun 21, 2016
2 parents eebcab6 + 07f9041 commit 647ae2c
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 29 deletions.
47 changes: 23 additions & 24 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -399,7 +399,8 @@ Cache::Prepare(const char *url, int oflags, mode_t mode)
{
std::string curl(url);
XrdCl::URL xx(curl);
const std::string& spath = xx.GetPath();
std::string spath = xx.GetPath();
spath += ".cinfo";

struct stat buf;
int res = m_output_fs->Stat(spath.c_str(), &buf);
Expand All @@ -422,34 +423,32 @@ Cache::Prepare(const char *url, int oflags, mode_t mode)

int Cache::Stat(const char *curl, struct stat &sbuff)
{
if (m_configuration.m_hdfsmode == false)
{
XrdCl::URL url(curl);
std::string name = url.GetPath();
XrdCl::URL url(curl);
std::string name = url.GetPath();
name += ".cinfo";

if (m_output_fs->Stat(name.c_str(), &sbuff) == XrdOssOK) {
if ( S_ISDIR(sbuff.st_mode)) {
return 0;
}
else {
bool success = false;
XrdOssDF* infoFile = m_output_fs->newFile(m_configuration.m_username.c_str());
XrdOucEnv myEnv;
name += ".cinfo";
int res = infoFile->Open(name.c_str(), O_RDONLY, 0600, myEnv);
if (res >= 0) {
Info info(m_trace, 0);
if (info.Read(infoFile) > 0) {
sbuff.st_size = info.GetFileSize();
success = true;
}
if (m_output_fs->Stat(name.c_str(), &sbuff) == XrdOssOK) {
if ( S_ISDIR(sbuff.st_mode)) {
return 0;
}
else {
bool success = false;
XrdOssDF* infoFile = m_output_fs->newFile(m_configuration.m_username.c_str());
XrdOucEnv myEnv;
int res = infoFile->Open(name.c_str(), O_RDONLY, 0600, myEnv);
if (res >= 0) {
Info info(m_trace, 0);
if (info.Read(infoFile) > 0) {
sbuff.st_size = info.GetFileSize();
success = true;
}
infoFile->Close();
delete infoFile;
return success ? 0 : 1;
}
infoFile->Close();
delete infoFile;
return success ? 0 : 1;
}
}

return 1;
}

Expand Down
3 changes: 0 additions & 3 deletions src/XrdFileCache/XrdFileCacheIO.hh
Expand Up @@ -21,9 +21,6 @@ namespace XrdFileCache
//! Original data source.
virtual XrdOucCacheIO *Base() { return m_io; }

//! Original data source URL.
virtual long long FSize() { return m_io->FSize(); }

//! Original data source URL.
virtual const char *Path() { return m_io->Path(); }

Expand Down
97 changes: 95 additions & 2 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Expand Up @@ -21,6 +21,7 @@
#include <stdio.h>
#include <iostream>
#include <assert.h>
#include <fcntl.h>

#include "XrdFileCacheIOFileBlock.hh"
#include "XrdFileCache.hh"
Expand All @@ -30,15 +31,17 @@
#include "XrdSys/XrdSysError.hh"
#include "XrdSfs/XrdSfsInterface.hh"

#include "XrdOuc/XrdOucEnv.hh"

using namespace XrdFileCache;

//______________________________________________________________________________
IOFileBlock::IOFileBlock(XrdOucCacheIO2 *io, XrdOucCacheStats &statsGlobal, Cache & cache)
: IO(io, statsGlobal, cache)
: IO(io, statsGlobal, cache), m_localStat(0)
{
m_blocksize = Cache::GetInstance().RefConfiguration().m_hdfsbsize;
GetBlockSizeFromPath();
initLocalStat();
}

//______________________________________________________________________________
Expand Down Expand Up @@ -102,10 +105,100 @@ File* IOFileBlock::newBlockFile(long long off, int blocksize)
file = new File(this, fname, off, blocksize);
Cache::GetInstance().AddActive(this, file);
}
else {
file->WakeUp();
}

return file;
}


//______________________________________________________________________________
int IOFileBlock::FStat(struct stat &sbuff)
{
// local stat is create in constructor. if file was on disk before
// attach that the only way stat was not successful is becuse there
// were info file read errors
if (!m_localStat) return -1;

memcpy(&sbuff, m_localStat, sizeof(struct stat));
return 0;
}

//______________________________________________________________________________
long long IOFileBlock::FSize()
{
if (!m_localStat) return -1;

return m_localStat->st_size;
}

//______________________________________________________________________________
int IOFileBlock::initLocalStat()
{
XrdCl::URL url(GetPath());
std::string path = url.GetPath();
path += ".cinfo";

int res = -1;
struct stat tmpStat;
XrdOucEnv myEnv;

// try to read from existing file
if (m_cache.GetOss()->Stat(path.c_str(), &tmpStat) == XrdOssOK) {
XrdOssDF* infoFile = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
if (infoFile->Open(path.c_str(), O_RDONLY, 0600, myEnv) == XrdOssOK) {
Info info(m_cache.GetTrace());
if (info.Read(infoFile) > 0) {
tmpStat.st_size = info.GetFileSize();
TRACEIO(Info, "IOFileBlock::initCachedStat successfuly read size from existing info file = " << tmpStat.st_size);
res = 0;
}
else {
// file exist but can't read it
TRACEIO(Error, "IOFileBlock::initCachedStat failed to read file size from info file");
}
}
}

// if there is no local info file, try to read from clinet and then save stat into a new *cinfo file
if (res) {
res = GetInput()->Fstat(tmpStat);
TRACEIO(Debug, "IOFileBlock::initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size);
if (res == 0) {
if (m_cache.GetOss()->Create(m_cache.RefConfiguration().m_username.c_str(), path.c_str(), 0600, myEnv, XRDOSS_mkpath) == XrdOssOK) {
XrdOssDF* infoFile = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
if (infoFile->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK) {
Info cfi(m_cache.GetTrace(), false);
cfi.SetBufferSize(m_cache.RefConfiguration().m_bufferSize);
cfi.SetFileSize(tmpStat.st_size);
cfi.WriteHeader(infoFile);
infoFile->Fsync();
infoFile->Close();
}
else {
TRACEIO(Error, "IOFileBlock::initCachedStat can't open info file path");
}
delete infoFile;
}
else {
TRACEIO(Error, "IOFileBlock::initCachedStat can't create info file path");
}
}
}


if (res == 0)
{
std::cerr << "local stat created \n";
m_localStat = new struct stat;
memcpy(m_localStat, &tmpStat, sizeof(struct stat));
}

return res;
}


//______________________________________________________________________________
void IOFileBlock::RelinquishFile(File* f)
{
Expand Down Expand Up @@ -141,7 +234,7 @@ int IOFileBlock::Read (char *buff, long long off, int size)
{
// protect from reads over the file size

long long fileSize = GetInput()->FSize();
long long fileSize = FSize();

if (off >= fileSize)
return 0;
Expand Down
6 changes: 6 additions & 0 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.hh
Expand Up @@ -64,14 +64,20 @@ namespace XrdFileCache
//! Called to check if destruction needs to be done in a separate task.
virtual bool ioActive();

virtual int FStat(struct stat &sbuff);

virtual long long FSize();

virtual void RelinquishFile(File*);

private:
long long m_blocksize; //!< size of file-block
std::map<int, File*> m_blocks; //!< map of created blocks
XrdSysMutex m_mutex; //!< map mutex
struct stat *m_localStat;

void GetBlockSizeFromPath();
int initLocalStat();
File* newBlockFile(long long off, int blocksize);
};
}
Expand Down

0 comments on commit 647ae2c

Please sign in to comment.