Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pfc-V2 Implement deferred open for file block mode #383

Merged
merged 1 commit into from
Jun 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 23 additions & 24 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ Cache::Prepare(const char *url, int oflags, mode_t mode)
{
std::string curl(url);
XrdCl::URL xx(curl);
const std::string& spath = xx.GetPath();
std::string spath = xx.GetPath();
spath += ".cinfo";

struct stat buf;
int res = m_output_fs->Stat(spath.c_str(), &buf);
Expand All @@ -422,34 +423,32 @@ Cache::Prepare(const char *url, int oflags, mode_t mode)

int Cache::Stat(const char *curl, struct stat &sbuff)
{
if (m_configuration.m_hdfsmode == false)
{
XrdCl::URL url(curl);
std::string name = url.GetPath();
XrdCl::URL url(curl);
std::string name = url.GetPath();
name += ".cinfo";

if (m_output_fs->Stat(name.c_str(), &sbuff) == XrdOssOK) {
if ( S_ISDIR(sbuff.st_mode)) {
return 0;
}
else {
bool success = false;
XrdOssDF* infoFile = m_output_fs->newFile(m_configuration.m_username.c_str());
XrdOucEnv myEnv;
name += ".cinfo";
int res = infoFile->Open(name.c_str(), O_RDONLY, 0600, myEnv);
if (res >= 0) {
Info info(m_trace, 0);
if (info.Read(infoFile) > 0) {
sbuff.st_size = info.GetFileSize();
success = true;
}
if (m_output_fs->Stat(name.c_str(), &sbuff) == XrdOssOK) {
if ( S_ISDIR(sbuff.st_mode)) {
return 0;
}
else {
bool success = false;
XrdOssDF* infoFile = m_output_fs->newFile(m_configuration.m_username.c_str());
XrdOucEnv myEnv;
int res = infoFile->Open(name.c_str(), O_RDONLY, 0600, myEnv);
if (res >= 0) {
Info info(m_trace, 0);
if (info.Read(infoFile) > 0) {
sbuff.st_size = info.GetFileSize();
success = true;
}
infoFile->Close();
delete infoFile;
return success ? 0 : 1;
}
infoFile->Close();
delete infoFile;
return success ? 0 : 1;
}
}

return 1;
}

Expand Down
3 changes: 0 additions & 3 deletions src/XrdFileCache/XrdFileCacheIO.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ namespace XrdFileCache
//! Original data source.
virtual XrdOucCacheIO *Base() { return m_io; }

//! Original data source URL.
virtual long long FSize() { return m_io->FSize(); }

//! Original data source URL.
virtual const char *Path() { return m_io->Path(); }

Expand Down
97 changes: 95 additions & 2 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <stdio.h>
#include <iostream>
#include <assert.h>
#include <fcntl.h>

#include "XrdFileCacheIOFileBlock.hh"
#include "XrdFileCache.hh"
Expand All @@ -30,15 +31,17 @@
#include "XrdSys/XrdSysError.hh"
#include "XrdSfs/XrdSfsInterface.hh"

#include "XrdOuc/XrdOucEnv.hh"

using namespace XrdFileCache;

//______________________________________________________________________________
IOFileBlock::IOFileBlock(XrdOucCacheIO2 *io, XrdOucCacheStats &statsGlobal, Cache & cache)
: IO(io, statsGlobal, cache)
: IO(io, statsGlobal, cache), m_localStat(0)
{
m_blocksize = Cache::GetInstance().RefConfiguration().m_hdfsbsize;
GetBlockSizeFromPath();
initLocalStat();
}

//______________________________________________________________________________
Expand Down Expand Up @@ -102,10 +105,100 @@ File* IOFileBlock::newBlockFile(long long off, int blocksize)
file = new File(this, fname, off, blocksize);
Cache::GetInstance().AddActive(this, file);
}
else {
file->WakeUp();
}

return file;
}


//______________________________________________________________________________
int IOFileBlock::FStat(struct stat &sbuff)
{
// local stat is create in constructor. if file was on disk before
// attach that the only way stat was not successful is becuse there
// were info file read errors
if (!m_localStat) return -1;

memcpy(&sbuff, m_localStat, sizeof(struct stat));
return 0;
}

//______________________________________________________________________________
long long IOFileBlock::FSize()
{
if (!m_localStat) return -1;

return m_localStat->st_size;
}

//______________________________________________________________________________
int IOFileBlock::initLocalStat()
{
XrdCl::URL url(GetPath());
std::string path = url.GetPath();
path += ".cinfo";

int res = -1;
struct stat tmpStat;
XrdOucEnv myEnv;

// try to read from existing file
if (m_cache.GetOss()->Stat(path.c_str(), &tmpStat) == XrdOssOK) {
XrdOssDF* infoFile = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
if (infoFile->Open(path.c_str(), O_RDONLY, 0600, myEnv) == XrdOssOK) {
Info info(m_cache.GetTrace());
if (info.Read(infoFile) > 0) {
tmpStat.st_size = info.GetFileSize();
TRACEIO(Info, "IOFileBlock::initCachedStat successfuly read size from existing info file = " << tmpStat.st_size);
res = 0;
}
else {
// file exist but can't read it
TRACEIO(Error, "IOFileBlock::initCachedStat failed to read file size from info file");
}
}
}

// if there is no local info file, try to read from clinet and then save stat into a new *cinfo file
if (res) {
res = GetInput()->Fstat(tmpStat);
TRACEIO(Debug, "IOFileBlock::initCachedStat get stat from client res= " << res << "size = " << tmpStat.st_size);
if (res == 0) {
if (m_cache.GetOss()->Create(m_cache.RefConfiguration().m_username.c_str(), path.c_str(), 0600, myEnv, XRDOSS_mkpath) == XrdOssOK) {
XrdOssDF* infoFile = m_cache.GetOss()->newFile(m_cache.RefConfiguration().m_username.c_str());
if (infoFile->Open(path.c_str(), O_RDWR, 0600, myEnv) == XrdOssOK) {
Info cfi(m_cache.GetTrace(), false);
cfi.SetBufferSize(m_cache.RefConfiguration().m_bufferSize);
cfi.SetFileSize(tmpStat.st_size);
cfi.WriteHeader(infoFile);
infoFile->Fsync();
infoFile->Close();
}
else {
TRACEIO(Error, "IOFileBlock::initCachedStat can't open info file path");
}
delete infoFile;
}
else {
TRACEIO(Error, "IOFileBlock::initCachedStat can't create info file path");
}
}
}


if (res == 0)
{
std::cerr << "local stat created \n";
m_localStat = new struct stat;
memcpy(m_localStat, &tmpStat, sizeof(struct stat));
}

return res;
}


//______________________________________________________________________________
void IOFileBlock::RelinquishFile(File* f)
{
Expand Down Expand Up @@ -141,7 +234,7 @@ int IOFileBlock::Read (char *buff, long long off, int size)
{
// protect from reads over the file size

long long fileSize = GetInput()->FSize();
long long fileSize = FSize();

if (off >= fileSize)
return 0;
Expand Down
6 changes: 6 additions & 0 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.hh
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,20 @@ namespace XrdFileCache
//! Called to check if destruction needs to be done in a separate task.
virtual bool ioActive();

virtual int FStat(struct stat &sbuff);

virtual long long FSize();

virtual void RelinquishFile(File*);

private:
long long m_blocksize; //!< size of file-block
std::map<int, File*> m_blocks; //!< map of created blocks
XrdSysMutex m_mutex; //!< map mutex
struct stat *m_localStat;

void GetBlockSizeFromPath();
int initLocalStat();
File* newBlockFile(long long off, int blocksize);
};
}
Expand Down