Skip to content

Commit

Permalink
Merge pull request #371 from alja/hdfs-healing
Browse files Browse the repository at this point in the history
pfc-V2: transition of hdfs healing code from V1 to V2
  • Loading branch information
abh3 committed May 24, 2016
2 parents 4bfa65b + 03d1dd6 commit 5370374
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 65 deletions.
90 changes: 51 additions & 39 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,24 @@ int Cache::isAttached()
void Cache::Detach(XrdOucCacheIO* io)
{
clLog()->Info(XrdCl::AppMsg, "Cache::Detach() %s", io->Path());
std::map<std::string, DiskNetIO>::iterator it = m_active.begin();
while (it != m_active.end() )

// Cache owns File objects
XrdSysMutexHelper lock(&m_active_mutex);
std::vector<DiskNetIO>::iterator it = m_active.begin();
while ( it != m_active.end() )
{
if (it->second.io == io) {
m_active.erase(it++);
if (it->io == io) {
it->io->RelinquishFile(it->file);
delete it->file;
m_active.erase(it);
}
else {
else
++it;
}
}

delete io;
}

//______________________________________________________________________________
bool
Cache::HaveFreeWritingSlots()
Expand Down Expand Up @@ -276,26 +281,31 @@ Cache::RAMBlockReleased()
m_RAMblocks_used--;
}

void
Cache::AddActive(IO* io, File* file)
{
XrdSysMutexHelper lock(&m_active_mutex);
m_active.push_back(DiskNetIO(io, file));
}

//==============================================================================
//======================= File relinquish at process of dying ===================
//======================================================================
File* Cache::GetFileForLocalPath(std::string path, IO* io)
File* Cache::GetFileWithLocalPath(std::string path, IO* iIo)
{
typedef std::map<std::string, DiskNetIO> ActiveMap_t;
ActiveMap_t::iterator it = m_active.find(path);
if (it == m_active.end())
XrdSysMutexHelper lock(&m_active_mutex);
for ( std::vector<DiskNetIO>::iterator it = m_active.begin(); it != m_active.end(); ++it)
{
return 0;
}
else {
File* file = it->second.file;
it->second.io->RelinquishFile(file);
return file;
if (!strcmp(path.c_str(), it->file->lPath()))
{
it->io->RelinquishFile(it->file);
it->io = iIo;
return it->file;
}
}
return 0;
}



//==============================================================================
//======================= PREFETCH ===================================
//==============================================================================
Expand Down Expand Up @@ -403,32 +413,34 @@ Cache::Prepare(const char *url, int oflags, mode_t mode)

int Cache::Stat(const char *curl, struct stat &sbuff)
{
XrdCl::URL url(curl);
std::string name = url.GetPath();
if (m_configuration.m_hdfsmode == false)
{
XrdCl::URL url(curl);
std::string name = url.GetPath();

if (m_output_fs->Stat(name.c_str(), &sbuff) == XrdOssOK) {
if ( S_ISDIR(sbuff.st_mode)) {
return 0;
}
else {
bool success = false;
XrdOssDF* infoFile = m_output_fs->newFile(m_configuration.m_username.c_str());
XrdOucEnv myEnv;
name += ".cinfo";
int res = infoFile->Open(name.c_str(), O_RDONLY, 0600, myEnv);
if (res >= 0) {
Info info(0);
if (info.Read(infoFile) > 0) {
sbuff.st_size = info.GetFileSize();
success = true;
if (m_output_fs->Stat(name.c_str(), &sbuff) == XrdOssOK) {
if ( S_ISDIR(sbuff.st_mode)) {
return 0;
}
else {
bool success = false;
XrdOssDF* infoFile = m_output_fs->newFile(m_configuration.m_username.c_str());
XrdOucEnv myEnv;
name += ".cinfo";
int res = infoFile->Open(name.c_str(), O_RDONLY, 0600, myEnv);
if (res >= 0) {
Info info(0);
if (info.Read(infoFile) > 0) {
sbuff.st_size = info.GetFileSize();
success = true;
}
}
infoFile->Close();
delete infoFile;
return success ? 0 : 1;
}
infoFile->Close();
delete infoFile;
return success ? 0 : 1;
}
}

return 1;
}

Expand Down
8 changes: 6 additions & 2 deletions src/XrdFileCache/XrdFileCache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ namespace XrdFileCache

XrdSysError& GetSysError() { return m_log; }

File* GetFileForLocalPath(std::string, IO*);
File* GetFileWithLocalPath(std::string, IO* io);

void AddActive(IO*, File*);

private:
bool ConfigParameters(std::string, XrdOucStream&);
Expand Down Expand Up @@ -224,11 +226,13 @@ namespace XrdFileCache

struct DiskNetIO
{
DiskNetIO(IO* iIO, File* iFile): io(iIO), file(iFile){}
IO* io;
File* file;
};

std::map<std::string, DiskNetIO> m_active;
std::vector<DiskNetIO> m_active;
XrdSysMutex m_active_mutex;

// prefetching
typedef std::vector<File*> PrefetchList;
Expand Down
18 changes: 12 additions & 6 deletions src/XrdFileCache/XrdFileCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,8 @@ int File::ReadBlocksFromDisk(std::list<int>& blocks,

overlap(*ii, BS, req_off, req_size, off, blk_off, size);

long long rs = m_output->Read(req_buf + off, *ii * BS + blk_off, size);
long long rs = m_output->Read(req_buf + off, *ii * BS + blk_off -m_offset, size);
clLog()->Dump(XrdCl::AppMsg, "File::ReadBlocksFromDisk block %d size %d %s", *ii, size, lPath());


if (rs < 0) {
clLog()->Error(XrdCl::AppMsg, "File::ReadBlocksFromDisk neg retval %ld (%ld@%d) %s", rs, *ii * BS + blk_off, lPath());
Expand Down Expand Up @@ -464,7 +463,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
m_stats.m_BytesRam++; // AMT what if block fails
}
// On disk?
else if (m_cfi.TestBit(block_idx))
else if (m_cfi.TestBit(offsetIdx(block_idx)))
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p read from disk %d %s", (void*)iUserBuff, block_idx, lPath());
blks_on_disk.push_back(block_idx);
Expand Down Expand Up @@ -671,7 +670,7 @@ void File::WriteBlockToDisk(Block* b)
int retval = 0;
// write block buffer into disk file
long long offset = b->m_offset - m_offset;
long long size = (b->m_offset + m_cfi.GetBufferSize()) > m_fileSize ? (m_fileSize - b->m_offset) : m_cfi.GetBufferSize();
long long size = (offset + m_cfi.GetBufferSize()) > m_fileSize ? (m_fileSize - offset) : m_cfi.GetBufferSize();
int buffer_remaining = size;
int buffer_offset = 0;
int cnt = 0;
Expand Down Expand Up @@ -752,7 +751,7 @@ void File::Sync()
XrdSysMutexHelper _lck(&m_syncStatusMutex);
for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
{
m_cfi.SetBitWriteCalled(*i);
m_cfi.SetBitWriteCalled(offsetIdx(*i));
}
written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
m_writes_during_sync.clear();
Expand Down Expand Up @@ -861,6 +860,12 @@ const char* File::lPath() const
return m_temp_filename.c_str();
}

//______________________________________________________________________________
int File::offsetIdx(int iIdx)
{
return iIdx - m_offset/m_cfi.GetBufferSize();
}

//______________________________________________________________________________
void File::AppendIOStatToFileInfo()
{
Expand Down Expand Up @@ -896,6 +901,7 @@ void File::Prefetch()
// clLog()->Dump(XrdCl::AppMsg, "File::Prefetch test bit %d", f);
if (!m_cfi.TestBit(f))
{
f += m_offset/m_cfi.GetBufferSize();
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
clLog()->Dump(XrdCl::AppMsg, "File::Prefetch take block %d %s", f, lPath());
Expand Down Expand Up @@ -939,7 +945,7 @@ void File::CheckPrefetchStatRAM(Block* b)
void File::CheckPrefetchStatDisk(int idx)
{
if (Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks) {
if (m_cfi.TestPrefetchBit(idx))
if (m_cfi.TestPrefetchBit(offsetIdx(idx)))
m_prefetchHitCnt++;
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/XrdFileCache/XrdFileCacheFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ namespace XrdFileCache
float m_prefetchScore; //cached
int m_prefetchCurrentCnt;


public:
//------------------------------------------------------------------------
//! Constructor.
Expand Down Expand Up @@ -189,6 +190,7 @@ namespace XrdFileCache
//! Log path
const char* lPath() const;

std::string GetLocalPath();
private:
bool overlap(int blk, // block to query
long long blk_size, //
Expand Down Expand Up @@ -227,7 +229,8 @@ namespace XrdFileCache
void inc_ref_count(Block*);
void dec_ref_count(Block*);
void free_block(Block*);


int offsetIdx(int idx);
};


Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheIO.hh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace XrdFileCache

virtual void Update(XrdOucCacheIO2 &iocp) { m_io = &iocp; }

virtual void RelinquishFile(File*) {}
virtual void RelinquishFile(File*) = 0;

protected:
XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }
Expand Down
10 changes: 2 additions & 8 deletions src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ IOEntireFile::IOEntireFile(XrdOucCacheIO2 *io, XrdOucCacheStats &stats, Cache &
XrdCl::URL url(m_io->Path());
std::string fname = Cache::GetInstance().RefConfiguration().m_cache_dir + url.GetPath();

if (!Cache::GetInstance().GetFileForLocalPath(fname, this))
if (!(m_file = Cache::GetInstance().GetFileWithLocalPath(fname, this)))
{
struct stat st;
Fstat(st);
m_file = new File(io, fname, 0, st.st_size);
Cache::GetInstance().AddActive(this, m_file);
}
}



IOEntireFile::~IOEntireFile()
{

delete m_localStat;
}

Expand Down Expand Up @@ -119,12 +119,6 @@ XrdOucCacheIO *IOEntireFile::Detach()
{
XrdOucCacheIO * io = m_io;

if (m_file) {
m_statsGlobal.Add(m_file->GetStats());
delete m_file;
m_file = 0;
}

// This will delete us!
m_cache.Detach(this);
return io;
Expand Down
39 changes: 33 additions & 6 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ XrdOucCacheIO* IOFileBlock::Detach()
for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
{
m_statsGlobal.Add(it->second->GetStats());
delete it->second;
}

m_cache.Detach(this); // This will delete us!
Expand Down Expand Up @@ -97,21 +96,49 @@ File* IOFileBlock::newBlockFile(long long off, int blocksize)
fname = ss.str();

clLog()->Debug(XrdCl::AppMsg, "FileBlock::FileBlock(), create XrdFileCacheFile. %s", m_io->Path());
File* prefetch = new File(m_io, fname, off, blocksize);

File* file;
if (!(file = Cache::GetInstance().GetFileWithLocalPath(fname, this)))
{
file = new File(m_io, fname, off, m_io->FSize());
Cache::GetInstance().AddActive(this, file);
}

return file;
}

return prefetch;
//______________________________________________________________________________
void IOFileBlock::RelinquishFile(File* f)
{
// called from Cache::Detach() or Cache::GetFileWithLocalPath()
// the object is in process of dying

XrdSysMutexHelper lock(&m_mutex);
for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it)
{
if (it->second == f)
{
m_blocks.erase(it++);
break;
}
else
{
++it;
}
}
}

//______________________________________________________________________________
bool IOFileBlock::ioActive()
{
bool res = false;
XrdSysMutexHelper lock(&m_mutex);

for (std::map<int, File*>::iterator it = m_blocks.begin(); it != m_blocks.end(); ++it) {
if (it->second->InitiateClose())
res = true;
return true;
}

return res;
return false;
}

//______________________________________________________________________________
Expand Down
2 changes: 2 additions & 0 deletions src/XrdFileCache/XrdFileCacheIOFileBlock.hh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ namespace XrdFileCache
//! Called to check if destruction needs to be done in a separate task.
virtual bool ioActive();

virtual void RelinquishFile(File*);

private:
long long m_blocksize; //!< size of file-block
std::map<int, File*> m_blocks; //!< map of created blocks
Expand Down
4 changes: 2 additions & 2 deletions src/XrdFileCache/XrdFileCacheVRead.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ bool File::VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& b
inc_ref_count(bi->second);
clLog()->Debug(XrdCl::AppMsg, "VReadPreProcess block %d in map", block_idx);
}
else if (m_cfi.TestBit(block_idx))
else if (m_cfi.TestBit(offsetIdx(block_idx)))
{
blocks_on_disk.AddEntry(block_idx, iov_idx);
clLog()->Debug(XrdCl::AppMsg, "VReadPreProcess block %d , chunk idx = %d on disk", block_idx,iov_idx );
Expand Down Expand Up @@ -229,7 +229,7 @@ int File::VReadFromDisk(const XrdOucIOVec *readV, int n, ReadVBlockListDisk& blo
clLog()->Debug(XrdCl::AppMsg, "VReadFromDisk block=%d chunk=%d", blockIdx, chunkIdx);
overlap(blockIdx, m_cfi.GetBufferSize(), readV[chunkIdx].offset, readV[chunkIdx].size, off, blk_off, size);

int rs = m_output->Read(readV[chunkIdx].data + off, blockIdx*m_cfi.GetBufferSize() + blk_off, size);
int rs = m_output->Read(readV[chunkIdx].data + off, blockIdx*m_cfi.GetBufferSize() + blk_off - m_offset, size);
if (rs >=0 ) {
bytes_read += rs;
}
Expand Down

0 comments on commit 5370374

Please sign in to comment.