Skip to content

Commit

Permalink
Review open issues, make it compile.
Browse files Browse the repository at this point in the history
  • Loading branch information
osschar committed Mar 9, 2016
1 parent 673e7e5 commit f884835
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 134 deletions.
65 changes: 32 additions & 33 deletions src/XrdFileCache/XrdFileCacheFactory.cc
Expand Up @@ -413,52 +413,51 @@ bool Factory::ConfigParameters(std::string part, XrdOucStream& config )
}

//______________________________________________________________________________
//namespace {

class FPurgeState
namespace
{
public:
struct FS
class FPurgeState
{
std::string path;
long long nByte;
public:
struct FS
{
std::string path;
long long nByte;

FS(const char* p, long long n) : path(p), nByte(n) {}
};
FS(const char* p, long long n) : path(p), nByte(n) {}
};

typedef std::multimap<time_t, FS> map_t;
typedef map_t::iterator map_i;
typedef std::multimap<time_t, FS> map_t;
typedef map_t::iterator map_i;

FPurgeState(long long iNByteReq) : nByteReq(iNByteReq), nByteAccum(0) {}
FPurgeState(long long iNByteReq) : nByteReq(iNByteReq), nByteAccum(0) {}

map_t fmap;
map_t fmap;

void checkFile (time_t iTime, const char* iPath, long long iNByte)
{
if (nByteAccum < nByteReq || iTime < fmap.rbegin()->first)
void checkFile (time_t iTime, const char* iPath, long long iNByte)
{
fmap.insert(std::pair<const time_t, FS> (iTime, FS(iPath, iNByte)));
nByteAccum += iNByte;

// remove newest files from map if necessary
while (nByteAccum > nByteReq)
if (nByteAccum < nByteReq || iTime < fmap.rbegin()->first)
{
time_t nt = fmap.begin()->first;
std::pair<map_i, map_i> ret = fmap.equal_range(nt);
for (map_i it2 = ret.first; it2 != ret.second; ++it2)
nByteAccum -= it2->second.nByte;
fmap.erase(ret.first, ret.second);
fmap.insert(std::pair<const time_t, FS> (iTime, FS(iPath, iNByte)));
nByteAccum += iNByte;

// remove newest files from map if necessary
while (nByteAccum > nByteReq)
{
time_t nt = fmap.begin()->first;
std::pair<map_i, map_i> ret = fmap.equal_range(nt);
for (map_i it2 = ret.first; it2 != ret.second; ++it2)
nByteAccum -= it2->second.nByte;
fmap.erase(ret.first, ret.second);
}
}
}
}

private:
long long nByteReq;
long long nByteAccum;
};


//}
private:
long long nByteReq;
long long nByteAccum;
};
}

void FillFileMapRecurse( XrdOssDF* iOssDF, const std::string& path, FPurgeState& purgeState)
{
Expand Down
123 changes: 56 additions & 67 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -76,7 +76,7 @@ File::File(XrdOucCacheIO &inputIO, std::string& disk_file_path, long long iOffse
m_input(inputIO),
m_output(NULL),
m_infoFile(NULL),

m_cfi(Factory::GetInstance().RefConfiguration().m_bufferSize),
m_temp_filename(disk_file_path),
m_offset(iOffset),
m_fileSize(iFileSize),
Expand Down Expand Up @@ -178,11 +178,7 @@ File::~File()
Sync();
}
// write statistics in *cinfo file

// AMT append IO stat --- look new interface in master branch
// XXXX MT -- OK, what needs to be here?
AppendIOStatToFileInfo();
// XXXX MT END

clLog()->Info(XrdCl::AppMsg, "File::~File close data file %p",(void*)this , lPath());
if (m_output)
Expand Down Expand Up @@ -485,7 +481,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
{
clLog()->Dump(XrdCl::AppMsg, "--- File::Read() idx %d %s \n", block_idx, lPath());
clLog()->Dump(XrdCl::AppMsg, "--- File::Read() idx %d %s \n", block_idx, lPath());
BlockMap_i bi = m_block_map.find(block_idx);

// In RAM or incoming?
Expand Down Expand Up @@ -516,8 +512,8 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
Block *b = RequestBlock(block_idx, false);
// assert(b);
if (!b) {
preProcOK = false;
break;
preProcOK = false;
break;
}
inc_ref_count(b);
blks_to_process.push_back(b);
Expand All @@ -534,19 +530,14 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

}


m_downloadCond.UnLock();




if (!preProcOK) {
if (!preProcOK) {
for (BlockList_i i = blks_to_process.begin(); i!= blks_to_process.end(); ++i )
dec_ref_count(*i);
return -1; // AMT ???
}


long long bytes_read = 0;

// First, send out any direct requests.
Expand All @@ -571,78 +562,78 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

// Second, read blocks from disk.
if ((!blks_on_disk.empty()) && (bytes_read >= 0)) {
int rc = ReadBlocksFromDisk(blks_on_disk, iUserBuff, iUserOff, iUserSize);
clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p, from disk %d. %s", (void*)iUserBuff, rc, lPath());
if (rc >= 0)
{
bytes_read += rc;
}
else
{
bytes_read = rc;
clLog()->Error(XrdCl::AppMsg, "File::Read() failed to read from disk. %s", lPath());
// AMT commented line below should not be an immediate return, can have block refcount increased and map increased
// return rc;
}
int rc = ReadBlocksFromDisk(blks_on_disk, iUserBuff, iUserOff, iUserSize);
clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p, from disk %d. %s", (void*)iUserBuff, rc, lPath());
if (rc >= 0)
{
bytes_read += rc;
}
else
{
bytes_read = rc;
clLog()->Error(XrdCl::AppMsg, "File::Read() failed to read from disk. %s", lPath());
// AMT commented line below should not be an immediate return, can have block refcount increased and map increased
// return rc;
}
}

// Third, loop over blocks that are available or incoming
while ( (! blks_to_process.empty()) && (bytes_read >= 0))
{
BlockList_t finished;
BlockList_t finished;

{
XrdSysCondVarHelper _lck(m_downloadCond);
{
XrdSysCondVarHelper _lck(m_downloadCond);

BlockList_i bi = blks_to_process.begin();
while (bi != blks_to_process.end())
{
// clLog()->Dump(XrdCl::AppMsg, "File::Read() searcing for block %p finished", (void*)(*bi));
if ((*bi)->is_finished())
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() found finished block %p %s", (void*)(*bi), lPath());
finished.push_back(*bi);
BlockList_i bj = bi++;
blks_to_process.erase(bj);
}
else
{
++bi;
}
}
BlockList_i bi = blks_to_process.begin();
while (bi != blks_to_process.end())
{
// clLog()->Dump(XrdCl::AppMsg, "File::Read() searcing for block %p finished", (void*)(*bi));
if ((*bi)->is_finished())
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() found finished block %p %s", (void*)(*bi), lPath());
finished.push_back(*bi);
BlockList_i bj = bi++;
blks_to_process.erase(bj);
}
else
{
++bi;
}
}

if (finished.empty())
{
if (finished.empty())
{

clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block begin %s", lPath());
clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block begin %s", lPath());

m_downloadCond.Wait();
m_downloadCond.Wait();

clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block end %s", lPath());
clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block end %s", lPath());

continue;
}
}
continue;
}
}

clLog()->Dump(XrdCl::AppMsg, "File::Read() bytes read before processing blocks %d %s\n", bytes_read, lPath());
clLog()->Dump(XrdCl::AppMsg, "File::Read() bytes read before processing blocks %d %s\n", bytes_read, lPath());

BlockList_i bi = finished.begin();
while (bi != finished.end())
{
if ((*bi)->is_ok())
{
long long user_off; // offset in user buffer
long long off_in_block; // offset in block
long long size_to_copy; // size to copy
long long user_off; // offset in user buffer
long long off_in_block; // offset in block
long long size_to_copy; // size to copy

// clLog()->Dump(XrdCl::AppMsg, "File::Read() Block finished ok.");
overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy);
// clLog()->Dump(XrdCl::AppMsg, "File::Read() Block finished ok.");
overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy);

clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p, from finished block %d , size %d end %s", (void*)iUserBuff, (*bi)->m_offset/BS, size_to_copy, lPath());
memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy);
bytes_read += size_to_copy;
clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p, from finished block %d , size %d end %s", (void*)iUserBuff, (*bi)->m_offset/BS, size_to_copy, lPath());
memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy);
bytes_read += size_to_copy;

CheckPrefetchStatRAM(*bi);
CheckPrefetchStatRAM(*bi);
}
else // it has failed ... krap up.
{
Expand Down Expand Up @@ -683,7 +674,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

delete direct_handler;
}
clLog()->Debug(XrdCl::AppMsg, "File::Read() before assert %s.", lPath());
clLog()->Debug(XrdCl::AppMsg, "File::Read() before assert %s.", lPath());
assert(iUserSize >= bytes_read);

// Last, stamp and release blocks, release file.
Expand Down Expand Up @@ -747,7 +738,6 @@ void File::WriteBlockToDisk(Block* b)
m_cfi.SetBitFetched(pfIdx);
m_downloadCond.UnLock();


{
XrdSysCondVarHelper _lck(m_downloadCond);
// clLog()->Dump(XrdCl::AppMsg, "File::WriteToDisk() dec_ref_count %d %s", pfIdx, lPath());
Expand Down Expand Up @@ -901,7 +891,6 @@ const char* File::lPath() const
return m_temp_filename.c_str();
}

// XXXX MT: is this needed ????
//______________________________________________________________________________
void File::AppendIOStatToFileInfo()
{
Expand Down
30 changes: 13 additions & 17 deletions src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -138,7 +138,6 @@ namespace XrdFileCache
int m_prefetchCurrentCnt;

public:

//------------------------------------------------------------------------
//! Constructor.
//------------------------------------------------------------------------
Expand Down Expand Up @@ -175,7 +174,6 @@ namespace XrdFileCache
//----------------------------------------------------------------------
Stats& GetStats() { return m_stats; }


void ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status);
void WriteBlockToDisk(Block* b);

Expand All @@ -185,20 +183,18 @@ namespace XrdFileCache

void MarkPrefetch();



//! Log path
const char* lPath() const;

private:

bool overlap(int blk, // block to query
long long blk_size, //
long long req_off, // offset of user request
int req_size, // size of user request
// output:
long long &off, // offset in user buffer
long long &blk_off, // offset in block
long long &size);
long long blk_size, //
long long req_off, // offset of user request
int req_size, // size of user request
// output:
long long &off, // offset in user buffer
long long &blk_off, // offset in block
long long &size);
// Read
Block* RequestBlock(int i, bool prefetch);

Expand All @@ -210,11 +206,12 @@ namespace XrdFileCache

// VRead
bool VReadPreProcess(const XrdOucIOVec *readV, int n, ReadVBlockListRAM& blks_to_process, ReadVBlockListDisk& blks_on_disk, XrdCl::ChunkList& chunkVec);
int VReadFromDisk(const XrdOucIOVec *readV, int n, ReadVBlockListDisk& blks_on_disk);
int VReadProcessBlocks(const XrdOucIOVec *readV, int n, std::vector<ReadVChunkListRAM>& blks_to_process, std::vector<ReadVChunkListRAM>& blks_rocessed);
int VReadFromDisk(const XrdOucIOVec *readV, int n, ReadVBlockListDisk& blks_on_disk);
int VReadProcessBlocks(const XrdOucIOVec *readV, int n, std::vector<ReadVChunkListRAM>& blks_to_process, std::vector<ReadVChunkListRAM>& blks_rocessed);



long long BufferSize();
long long BufferSize();
void AppendIOStatToFileInfo();

void CheckPrefetchStatRAM(Block* b);
void CheckPrefetchStatDisk(int idx);
Expand All @@ -224,7 +221,6 @@ namespace XrdFileCache
//! Short log alias.
XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }


void inc_ref_count(Block*);
void dec_ref_count(Block*);
void free_block(Block*);
Expand Down

0 comments on commit f884835

Please sign in to comment.