Skip to content

Commit

Permalink
Use same lock for RAM and disk download status. Add Prefetch function().
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent 9593eac commit 676ce7b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 24 deletions.
85 changes: 67 additions & 18 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -85,8 +85,7 @@ m_stateCond(0), // We will explicitly lock the condition before use.
m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")),
m_non_flushed_cnt(0),
m_in_sync(false),

m_block_cond(0)
m_downloadCond(0)
{
clLog()->Debug(XrdCl::AppMsg, "File::File() %s", m_input.Path());
Open();
Expand Down Expand Up @@ -275,7 +274,7 @@ namespace

//------------------------------------------------------------------------------

Block* File::RequestBlock(int i)
Block* File::RequestBlock(int i, bool prefetch)
{
// Must be called w/ block_map locked.
// Checks on size etc should be done before.
Expand All @@ -292,7 +291,7 @@ Block* File::RequestBlock(int i)
long long off = i * BS;
long long this_bs = (i == last_block) ? m_input.FSize() - off : BS;

Block *b = new Block(this, off, this_bs); // should block be reused to avoid recreation
Block *b = new Block(this, off, this_bs, prefetch); // should block be reused to avoid recreation
m_block_map[i] = b;

client.Read(off, this_bs, (void*)b->get_buff(), new BlockResponseHandler(b));
Expand Down Expand Up @@ -380,7 +379,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// passing the req to client is actually better.
// unlock

m_block_cond.Lock();
m_downloadCond.Lock();

size_t msize = m_block_map.size();
// XXX Check for blocks to free? Later ...
Expand All @@ -393,7 +392,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
{
BlockMap_i bi = m_block_map.find(block_idx);
BlockMap_i bi = m_block_map.find(block_idx);

// In RAM or incoming?
if (bi != m_block_map.end())
Expand All @@ -415,7 +414,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// Is there room for one more RAM Block?
if ( cache()->RequestRAMBlock())
{
Block *b = RequestBlock(block_idx);
Block *b = RequestBlock(block_idx, false);
inc_ref_count(b);
blks_to_process.push_back(b);
++msize;
Expand All @@ -428,7 +427,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
}
}

m_block_cond.UnLock();
m_downloadCond.UnLock();

long long bytes_read = 0;

Expand Down Expand Up @@ -463,7 +462,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
BlockList_t finished;

{
XrdSysCondVarHelper _lck(m_block_cond);
XrdSysCondVarHelper _lck(m_downloadCond);

BlockList_i bi = blks_to_process.begin();
while (bi != blks_to_process.end())
Expand All @@ -487,7 +486,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

clLog()->Dump(XrdCl::AppMsg, "wait block begin");

m_block_cond.Wait();
m_downloadCond.Wait();

clLog()->Dump(XrdCl::AppMsg, "wait block end");

Expand Down Expand Up @@ -549,7 +548,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

// Last, stamp and release blocks, release file.
{
XrdSysCondVarHelper _lck(m_block_cond);
XrdSysCondVarHelper _lck(m_downloadCond);

// XXXX stamp file
// AMT ??? fetched status stampled in WriteDisk callback , what dies stamp mean ??
Expand Down Expand Up @@ -602,12 +601,14 @@ void File::WriteBlockToDisk(Block* b)
// set bit fetched
clLog()->Dump(XrdCl::AppMsg, "File::WriteToDisk() success set bit for block [%ld] size [%d] %s", b->m_offset, size, lPath());
int pfIdx = (b->m_offset - m_offset)/m_cfi.GetBufferSize();
m_downloadStatusMutex.Lock();

m_downloadCond.Lock();
m_cfi.SetBitFetched(pfIdx);
m_downloadStatusMutex.UnLock();
m_downloadCond.UnLock();


{
XrdSysCondVarHelper _lck(m_block_cond);
XrdSysCondVarHelper _lck(m_downloadCond);
dec_ref_count(b);
}

Expand Down Expand Up @@ -691,7 +692,7 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
{
clLog()->Dump(XrdCl::AppMsg, "File::ProcessBlockResponse %d ",(int)(b->m_offset/BufferSize()));

m_block_cond.Lock();
m_downloadCond.Lock();

if (status->IsOK())
{
Expand All @@ -715,9 +716,9 @@ void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
// inc_ref_count(b);
}

m_block_cond.Broadcast();
m_downloadCond.Broadcast();

m_block_cond.UnLock();
m_downloadCond.UnLock();
}


Expand All @@ -740,8 +741,56 @@ const char* File::lPath() const
return m_temp_filename.c_str();
}

// XXXX MT: is this needed ????
//______________________________________________________________________________
void File::AppendIOStatToFileInfo()
{
// lock in case several IOs want to write in *cinfo file
if (m_infoFile)
{
Info::AStat as;
as.DetachTime = time(0);
as.BytesDisk = m_stats.m_BytesDisk;
as.BytesRam = m_stats.m_BytesRam;
as.BytesMissed = m_stats.m_BytesMissed;
m_cfi.AppendIOStat(as, (XrdOssDF*)m_infoFile);
}
else
{
clLog()->Warning(XrdCl::AppMsg, "Prefetch::AppendIOStatToFileInfo() info file not opened %s", lPath());
}
}


//______________________________________________________________________________
void File::Prefetch()
{
int block_idx = -1;

XrdSysCondVarHelper _lck(m_downloadCond);
// AMT can this be sorted before calling Prefetch ??
if (m_cfi.IsComplete()) return;

// check index not on disk and not in RAM
for (int f = 0; f < m_cfi.GetSizeInBits(); ++f)
{
if (!m_cfi.TestBit(f))
{
BlockMap_i bi = m_block_map.find(block_idx);
if (bi == m_block_map.end()) {
block_idx = f;
break;
}
}
}

assert(block_idx >= 0);

// decrease counter of globally available blocks, resources already checked in global thread
cache()->RequestRAMBlock();

Block *b = RequestBlock(block_idx, true);
inc_ref_count(b);
}


//==============================================================================
Expand Down
13 changes: 7 additions & 6 deletions src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -59,12 +59,13 @@ namespace XrdFileCache
std::vector<char> m_buff;
long long m_offset;
File *m_file;
bool m_prefetch;
int m_refcnt;
int m_errno;
bool m_downloaded;

Block(File *f, long long off, int size) :
m_offset(off), m_file(f), m_refcnt(0),
Block(File *f, long long off, int size, bool m_prefetch) :
m_offset(off), m_file(f), m_prefetch(false), m_refcnt(0),
m_errno(0), m_downloaded(false)
{
m_buff.resize(size);
Expand Down Expand Up @@ -100,8 +101,6 @@ namespace XrdFileCache

XrdSysCondVar m_stateCond; //!< state condition variable

XrdSysMutex m_downloadStatusMutex; //!< mutex locking access to m_cfi object

// fsync
XrdSysMutex m_syncStatusMutex; //!< mutex locking fsync status
XrdJob *m_syncer;
Expand All @@ -121,7 +120,7 @@ namespace XrdFileCache

BlockMap_t m_block_map;

XrdSysCondVar m_block_cond;
XrdSysCondVar m_downloadCond;

Stats m_stats; //!< cache statistics, used in IO detach

Expand Down Expand Up @@ -166,8 +165,10 @@ namespace XrdFileCache
void ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status);
void WriteBlockToDisk(Block* b);

void Prefetch();

private:
Block* RequestBlock(int i);
Block* RequestBlock(int i, bool prefetch);

int RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,
char* buff, long long req_off, long long req_size);
Expand Down

0 comments on commit 676ce7b

Please sign in to comment.