Skip to content

Commit

Permalink
Implement write queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and abh3 committed Jun 30, 2016
1 parent f95eb91 commit dbcdaee
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 95 deletions.
29 changes: 12 additions & 17 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "XrdOuc/XrdOucEnv.hh"

#include "XrdFileCache.hh"
#include "XrdFileCacheFile.hh"
#include "XrdFileCacheIOEntireFile.hh"
#include "XrdFileCacheIOFileBlock.hh"
#include "XrdFileCacheFactory.hh"
Expand Down Expand Up @@ -115,30 +114,29 @@ Cache::HaveFreeWritingSlots()

//______________________________________________________________________________
void
Cache::AddWriteTask(File* p, int ri, size_t s, bool fromRead)
Cache::AddWriteTask(Block* b, bool fromRead)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::AddWriteTask() wqsize = %d, bi=%d", s_writeQ.size, ri);
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::AddWriteTask() bOff=%ld", b->m_offset);
s_writeQ.condVar.Lock();
if (fromRead)
s_writeQ.queue.push_back(WriteTask(p, ri, s));
s_writeQ.queue.push_back(b);
else
s_writeQ.queue.push_front(WriteTask(p, ri, s));
s_writeQ.queue.push_front(b); // AMT should this not be the opposite
s_writeQ.size++;
s_writeQ.condVar.Signal();
s_writeQ.condVar.UnLock();
}

//______________________________________________________________________________
void Cache::RemoveWriteQEntriesFor(File *p)
{/*
void Cache::RemoveWriteQEntriesFor(File *iFile)
{
s_writeQ.condVar.Lock();
std::list<WriteTask>::iterator i = s_writeQ.queue.begin();
std::list<Block*>::iterator i = s_writeQ.queue.begin();
while (i != s_writeQ.queue.end())
{
if (i->prefetch == p)
if ((*i)->m_file == iFile)
{
std::list<WriteTask>::iterator j = i++;
j->prefetch->DecRamBlockRefCount(j->ramBlockIdx);
std::list<Block*>::iterator j = i++;
s_writeQ.queue.erase(j);
--s_writeQ.size;
}
Expand All @@ -148,28 +146,25 @@ void Cache::RemoveWriteQEntriesFor(File *p)
}
}
s_writeQ.condVar.UnLock();
*/
}

//______________________________________________________________________________
void
Cache::ProcessWriteTasks()
{
/*
while (true)
{
s_writeQ.condVar.Lock();
while (s_writeQ.queue.empty())
{
s_writeQ.condVar.Wait();
}
WriteTask t = s_writeQ.queue.front();
Block* block = s_writeQ.queue.front(); // AMT should not be back ???
s_writeQ.queue.pop_front();
s_writeQ.size--;
s_writeQ.condVar.UnLock();

t.prefetch->WriteBlockToDisk(t.ramBlockIdx, t.size);
t.prefetch->DecRamBlockRefCount(t.ramBlockIdx);
block->m_file->WriteBlockToDisk(block);
}
*/
}

14 changes: 4 additions & 10 deletions src/XrdFileCache/XrdFileCache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "XrdSys/XrdSysPthread.hh"
#include "XrdOuc/XrdOucCache.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdFileCacheFile.hh"

namespace XrdCl {
class Log;
Expand Down Expand Up @@ -67,7 +68,7 @@ namespace XrdFileCache
//---------------------------------------------------------------------
//! Add downloaded block in write queue.
//---------------------------------------------------------------------
static void AddWriteTask(File* p, int ramBlockidx, size_t size, bool fromRead);
static void AddWriteTask(Block* b, bool from_read);

//---------------------------------------------------------------------
//! Check write queue size is not over limit.
Expand All @@ -78,7 +79,7 @@ namespace XrdFileCache
//! \brief Remove blocks from write queue which belong to given prefetch.
//! This method is used at the time of File destruction.
//---------------------------------------------------------------------
static void RemoveWriteQEntriesFor(File *p);
static void RemoveWriteQEntriesFor(File *f);

//---------------------------------------------------------------------
//! Separate task which writes blocks from ram to disk.
Expand All @@ -99,20 +100,13 @@ namespace XrdFileCache
unsigned int m_attached; //!< number of attached IO objects
XrdOucCacheStats &m_stats; //!< global cache usage statistics

struct WriteTask
{
File* prefetch; //!< object queued for writing
int ramBlockIdx; //!< in memory cache index
size_t size; //!< write size -- block size except in case this is the end file block
WriteTask(File* p, int ri, size_t s):prefetch(p), ramBlockIdx(ri), size(s){}
};

struct WriteQ
{
WriteQ() : condVar(0), size(0) {}
XrdSysCondVar condVar; //!< write list condVar
size_t size; //!< cache size of a container
std::list<WriteTask> queue; //!< container
std::list<Block*> queue; //!< container
};

static WriteQ s_writeQ;
Expand Down
Loading

0 comments on commit dbcdaee

Please sign in to comment.