Skip to content

Commit

Permalink
Implement prefetch.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and osschar committed Mar 9, 2016
1 parent a908ec8 commit eeb4840
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 55 deletions.
118 changes: 103 additions & 15 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -18,11 +18,13 @@

#include <fcntl.h>
#include <sstream>
#include <algorithm>
#include <sys/statvfs.h>

#include "XrdCl/XrdClConstants.hh"
#include "XrdCl/XrdClURL.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucEnv.hh"

Expand All @@ -33,32 +35,41 @@


using namespace XrdFileCache;

void *ProcessWriteTaskThread(void* c)
{
Cache *cache = static_cast<Cache*>(c);
cache->ProcessWriteTasks();
return NULL;
}

Cache::Cache(XrdOucCacheStats & stats)
: m_attached(0),
void *PrefetchThread(void* ptr)
{
Cache* cache = static_cast<Cache*>(ptr);
cache->Prefetch();
return NULL;
}
//______________________________________________________________________________


Cache::Cache(XrdOucCacheStats & stats) : XrdOucCache(),
m_stats(stats),
m_RAMblocks_used(0)
{
pthread_t tid;
XrdSysThread::Run(&tid, ProcessWriteTaskThread, (void*)this, 0, "XrdFileCache WriteTasks ");
pthread_t tid1;
XrdSysThread::Run(&tid1, ProcessWriteTaskThread, (void*)this, 0, "XrdFileCache WriteTasks ");

pthread_t tid2;
XrdSysThread::Run(&tid2, PrefetchThread, (void*)this, 0, "XrdFileCache Prefetch ");
}

//______________________________________________________________________________

XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options)
{
if (Factory::GetInstance().Decide(io))
{
clLog()->Info(XrdCl::AppMsg, "Cache::Attach() %s", io->Path());
{
XrdSysMutexHelper lock(&m_io_mutex);
m_attached++;
}
IO* cio;
if (Factory::GetInstance().RefConfiguration().m_hdfsmode)
cio = new IOFileBlock(*io, m_stats, *this);
Expand All @@ -78,17 +89,13 @@ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options)

int Cache::isAttached()
{
XrdSysMutexHelper lock(&m_io_mutex);
return m_attached;
// virutal function of XrdOucCache, don't see it used in pfc or posix layer
return true;
}

void Cache::Detach(XrdOucCacheIO* io)
{
clLog()->Info(XrdCl::AppMsg, "Cache::Detach() %s", io->Path());
{
XrdSysMutexHelper lock(&m_io_mutex);
m_attached--;
}

delete io;
}
Expand Down Expand Up @@ -167,6 +174,7 @@ Cache::ProcessWriteTasks()
block->m_file->WriteBlockToDisk(block);
}
}

//______________________________________________________________________________

bool
Expand All @@ -178,7 +186,6 @@ Cache::RequestRAMBlock()
m_RAMblocks_used++;
return true;
}

return false;
}

Expand All @@ -189,3 +196,84 @@ Cache::RAMBlockReleased()
m_RAMblocks_used--;
}


//==============================================================================
//======================= PREFETCH ===================================
//==============================================================================

namespace {
struct prefetch_less_than
{
inline bool operator() (const File* struct1, const File* struct2)
{
return (struct1->GetPrefetchScore() < struct2->GetPrefetchScore());
}
}myobject;
}
//______________________________________________________________________________

void
Cache::RegisterPrefetchFile(File* file)
{
// called from File::Open()

if (Factory::GetInstance().RefConfiguration().m_prefetch)
{
if (Factory::GetInstance().RefConfiguration().m_hdfsmode) {
XrdSysMutexHelper lock(&m_prefetch_mutex);
m_files.push_back(file);
}
else
{
// don't need to lock it becuse it File object is created in constructor of IOEntireFile
m_files.push_back(file);
}
}
}
//______________________________________________________________________________

void
Cache::DeRegisterPrefetchFile(File* file)
{
// called from last line File::InitiateClose()

XrdSysMutexHelper lock(&m_prefetch_mutex);
for (FileList::iterator it = m_files.begin(); it != m_files.end(); ++it) {
if (*it == file) {
m_files.erase(it);
break;
}
}
}
//______________________________________________________________________________

File*
Cache::GetNextFileToPrefetch()
{
XrdSysMutexHelper lock(&m_prefetch_mutex);
if (m_files.empty())
return 0;

std::sort(m_files.begin(), m_files.end(), myobject);
File* f = m_files.back();
f->MarkPrefetch();
return f;
}

//______________________________________________________________________________


void
Cache::Prefetch()
{
while (true) {
File* f = GetNextFileToPrefetch();
if (f) {
f->Prefetch();
}
else {
// wait for new file, AMT should I wait for the signal instead ???
XrdSysTimer::Wait(10);
}
}
}
13 changes: 11 additions & 2 deletions src/XrdFileCache/XrdFileCache.hh
Expand Up @@ -86,6 +86,13 @@ namespace XrdFileCache

void RAMBlockReleased();

void RegisterPrefetchFile(File*);
void DeRegisterPrefetchFile(File*);

File* GetNextFileToPrefetch();

void Prefetch();

private:
//! Decrease attached count. Called from IO::Detach().
void Detach(XrdOucCacheIO *);
Expand All @@ -96,8 +103,7 @@ namespace XrdFileCache
//! Short log alias.
XrdCl::Log* clLog() const { return XrdCl::DefaultEnv::GetLog(); }

XrdSysMutex m_io_mutex; //!< central lock for this class
unsigned int m_attached; //!< number of attached IO objects
XrdSysMutex m_prefetch_mutex; //!< central lock for this class
XrdOucCacheStats &m_stats; //!< global cache usage statistics

XrdSysMutex m_RAMblock_mutex; //!< central lock for this class
Expand All @@ -113,6 +119,9 @@ namespace XrdFileCache

WriteQ s_writeQ;

// prefetching
typedef std::vector<File*> FileList;
FileList m_files;
};

//----------------------------------------------------------------------------
Expand Down
101 changes: 64 additions & 37 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -87,7 +87,8 @@ m_non_flushed_cnt(0),
m_in_sync(false),
m_downloadCond(0),
m_prefetchReadCnt(0),
m_prefetchHitCnt(0)
m_prefetchHitCnt(0),
m_prefetchCurrentCnt(0)
{
clLog()->Debug(XrdCl::AppMsg, "File::File() %s", m_input.Path());
Open();
Expand All @@ -104,14 +105,17 @@ File::~File()
{
m_stateCond.Lock();
bool isStopped = m_stopping;
bool isPrefetching = (m_prefetchCurrentCnt > 0);
m_stateCond.UnLock();
if (isStopped)
if ((isPrefetching == false) && isStopped)
{
printf("~FILE map size %ld \n", m_block_map.size());
if ( m_block_map.empty())
m_downloadCond.Lock();
bool blockMapEmpty = m_block_map.empty();
m_downloadCond.UnLock();
if ( blockMapEmpty)
break;
}
XrdSysTimer::Wait(100);
XrdSysTimer::Wait(10);
}
clLog()->Debug(XrdCl::AppMsg, "File::~File finished with writing %s",lPath() );

Expand Down Expand Up @@ -159,9 +163,12 @@ bool File::InitiateClose()
{
// Retruns true if delay is needed
clLog()->Debug(XrdCl::AppMsg, "File::Initiate close start", lPath());

cache()->DeRegisterPrefetchFile(this);

m_stateCond.Lock();
m_stopping = true;
m_stateCond.UnLock();
m_stateCond.UnLock();
if (m_cfi.IsComplete()) return false; // AMT maybe map size is here more meaningfull, but might hold block state lock
return true;
}
Expand Down Expand Up @@ -231,6 +238,8 @@ bool File::Open()
clLog()->Debug(XrdCl::AppMsg, "Info file read from disk: %s", m_input.Path());
}


cache()->RegisterPrefetchFile(this);
return true;
}

Expand Down Expand Up @@ -770,33 +779,40 @@ void File::AppendIOStatToFileInfo()
//______________________________________________________________________________
void File::Prefetch()
{
int block_idx = -1;
bool stopping = false;
m_stateCond.Lock();
stopping = m_stopping;
m_stateCond.UnLock();


XrdSysCondVarHelper _lck(m_downloadCond);
// AMT can this be sorted before calling Prefetch ??
if (m_cfi.IsComplete()) return;
if (!stopping) {
XrdSysCondVarHelper _lck(m_downloadCond);
if (m_cfi.IsComplete() == false)
{
int block_idx = -1;
// check index not on disk and not in RAM
for (int f = 0; f < m_cfi.GetSizeInBits(); ++f)
{
if (!m_cfi.TestBit(f))
{
BlockMap_i bi = m_block_map.find(block_idx);
if (bi == m_block_map.end()) {
block_idx = f;
break;
}
}
}

// check index not on disk and not in RAM
for (int f = 0; f < m_cfi.GetSizeInBits(); ++f)
{
if (!m_cfi.TestBit(f))
{
BlockMap_i bi = m_block_map.find(block_idx);
if (bi == m_block_map.end()) {
block_idx = f;
break;
if (cache()->RequestRAMBlock()) {
m_prefetchReadCnt++;

Block *b = RequestBlock(block_idx, true);
inc_ref_count(b);
}
}
}

assert(block_idx >= 0);

// decrease counter of globally available blocks, resources already checked in global thread
cache()->RequestRAMBlock();
m_prefetchReadCnt++;

Block *b = RequestBlock(block_idx, true);
inc_ref_count(b);

UnMarkPrefetch();
}


Expand All @@ -819,16 +835,33 @@ void File::CheckPrefetchStatDisk(int idx)
}

//______________________________________________________________________________
float File::GetPrefetchScore()
float File::GetPrefetchScore() const
{
if (m_prefetchReadCnt)
return m_prefetchHitCnt/m_prefetchReadCnt;

return 0;
return 1; // AMT not sure if this should be 0.5 ... ????
}

//==============================================================================
//______________________________________________________________________________
void File::MarkPrefetch()
{
m_stateCond.Lock();
m_prefetchCurrentCnt++;
m_stateCond.UnLock();

}

//______________________________________________________________________________
void File::UnMarkPrefetch()
{
m_stateCond.Lock();
m_prefetchCurrentCnt--;
m_stateCond.UnLock();
}

//==============================================================================
//================== RESPONSE HANDLER ==================================
//==============================================================================

void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
Expand Down Expand Up @@ -865,9 +898,3 @@ void DirectResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
}



//==============================================================================

//==============================================================================

//==============================================================================

0 comments on commit eeb4840

Please sign in to comment.