Skip to content

Commit

Permalink
Merge pull request #398 from osschar/pssasync-mt-3
Browse files Browse the repository at this point in the history
Pssasync mt 3
  • Loading branch information
abh3 committed Jul 25, 2016
2 parents 4e299ed + b8c70c9 commit 7979386
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 42 deletions.
91 changes: 58 additions & 33 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -219,6 +219,8 @@ bool File::Open()
XrdOucEnv myEnv;

// Create the data file itself.
char size_str[16]; sprintf(size_str, "%lld", m_fileSize);
myEnv.Put("oss.asize", size_str);
if (myOss.Create(myUser, m_temp_filename.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK)
{
TRACEF(Error, "File::Open() Create failed for data file " << m_temp_filename
Expand All @@ -241,6 +243,7 @@ bool File::Open()
struct stat infoStat;
bool fileExisted = (myOss.Stat(ifn.c_str(), &infoStat) == XrdOssOK);

myEnv.Put("oss.asize", "64k"); // MT-XXX Calculate? Do not know length of access lists ...
if (myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath) != XrdOssOK)
{
TRACEF(Error, "File::Open() Create failed for info file " << ifn
Expand Down Expand Up @@ -320,7 +323,7 @@ bool File::overlap(int blk, // block to query

//------------------------------------------------------------------------------

Block* File::RequestBlock(int i, bool prefetch)
Block* File::PrepareBlockRequest(int i, bool prefetch)
{
// Must be called w/ block_map locked.
// Checks on size etc should be done before.
Expand All @@ -336,20 +339,32 @@ Block* File::RequestBlock(int i, bool prefetch)

Block *b = new Block(this, off, this_bs, prefetch); // should block be reused to avoid recreation

TRACEF(Dump, "File::RequestBlock() " << i << "prefetch" << prefetch << "address " << (void*)b);
BlockResponseHandler* oucCB = new BlockResponseHandler(b);
m_io->GetInput()->Read(*oucCB, (char*)b->get_buff(), off, (int)this_bs);

m_block_map[i] = b;

// Actual Read request is issued in ProcessBlockRequests().
TRACEF(Dump, "File::PrepareBlockRequest() " << i << "prefetch" << prefetch << "address " << (void*)b);

if (m_prefetchState == kOn && m_block_map.size() > Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
{
m_prefetchState = kHold;
cache()->DeRegisterPrefetchFile(this);
}

return b;
}

void File::ProcessBlockRequests(BlockList_t& blks)
{
// This *must not* be called with block_map locked.

for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
{
Block *b = *bi;
BlockResponseHandler* oucCB = new BlockResponseHandler(b);
m_io->GetInput()->Read(*oucCB, b->get_buff(), b->get_offset(), b->get_size());
}
}

//------------------------------------------------------------------------------

int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,
Expand Down Expand Up @@ -445,16 +460,18 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// passing the req to client is actually better.
// unlock

bool preProcOK = true;
BlockList_t blks;
bool preProcOK = true;

m_downloadCond.Lock();

// XXX Check for blocks to free? Later ...

const int idx_first = iUserOff / BS;
const int idx_last = (iUserOff + iUserSize - 1) / BS;

BlockList_t blks_to_process, blks_processed;
IntList_t blks_on_disk, blks_direct;
BlockList_t blks_to_request, blks_to_process, blks_processed;
IntList_t blks_on_disk, blks_direct;

for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
{
Expand Down Expand Up @@ -484,14 +501,16 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
if (cache()->RequestRAMBlock())
{
TRACEF(Dump, "File::Read() inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
Block *b = RequestBlock(block_idx, false);
// assert(b);
if (!b) {
Block *b = PrepareBlockRequest(block_idx, false);
// MT XXX this can not fail (other than out of memory which we don't handle).
if ( ! b)
{
preProcOK = false;
break;
}
inc_ref_count(b);
blks_to_process.push_back(b);
blks_to_request.push_back(b);
}
// Nope ... read this directly without caching.
else
Expand All @@ -500,18 +519,19 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
blks_direct.push_back(block_idx);
}
}

}

m_downloadCond.UnLock();

if (!preProcOK)
if ( ! preProcOK)
{
for (BlockList_i i = blks_to_process.begin(); i != blks_to_process.end(); ++i)
dec_ref_count(*i);
return -1; // AMT ???
}

ProcessBlockRequests(blks_to_request);

long long bytes_read = 0;

// First, send out any direct requests.
Expand Down Expand Up @@ -578,12 +598,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

if (finished.empty())
{
TRACEF(Dump, "File::Read() wait block begin" );

m_downloadCond.Wait();

TRACEF(Dump, "File::Read() wait block end");

continue;
}
}
Expand All @@ -600,17 +615,18 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// clLog()->Dump(XrdCl::AppMsg, "File::Read() Block finished ok.");
overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy);

TRACEF(Dump, "File::Read() ub=" << (void*)iUserBuff << " from finished block " << (*bi)->m_offset/BS << " size " << size_to_copy);
TRACEF(Dump, "File::Read() ub=" << (void*)iUserBuff << " from finished block " << (*bi)->m_offset/BS << " size " << size_to_copy);
memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy);
bytes_read += size_to_copy;
m_stats.m_BytesRam += size_to_copy;
CheckPrefetchStatRAM(*bi);
}
else // it has failed ... krap up.
{
TRACEF(Error, "File::Read(), block "<< (*bi)->m_offset/BS << "finished with error ");
bytes_read = -1;
errno = (*bi)->m_errno;
TRACEF(Error, "File::Read(), block "<< (*bi)->m_offset/BS << " finished with error "
<< errno << " " << strerror(errno));
break;
}
++bi;
Expand All @@ -621,12 +637,12 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
}

// Fourth, make sure all direct requests have arrived
if ((direct_handler != 0) && (bytes_read >= 0 ))
if (direct_handler != 0 && bytes_read >= 0)
{
TRACEF(Dump, "File::Read() waiting for direct requests ");
XrdSysCondVarHelper _lck(direct_handler->m_cond);

if (direct_handler->m_to_wait > 0)
while (direct_handler->m_to_wait > 0)
{
direct_handler->m_cond.Wait();
}
Expand Down Expand Up @@ -812,7 +828,6 @@ void File::free_block(Block* b)

void File::ProcessBlockResponse(Block* b, int res)
{

m_downloadCond.Lock();

TRACEF(Dump, "File::ProcessBlockResponse " << (void*)b << " " << b->m_offset/BufferSize());
Expand Down Expand Up @@ -890,30 +905,40 @@ void File::Prefetch()
if (m_prefetchState != kOn)
return;
}

// check index not on disk and not in RAM

// Check that block is not on disk and not in RAM.
// MT XXX: Could prefetch several blocks at once!

BlockList_t blks;

TRACEF(Dump, "File::Prefetch enter to check download status");
bool found = false;
for (int f=0; f < m_cfi.GetSizeInBits(); ++f)

m_downloadCond.Lock();

for (int f = 0; f < m_cfi.GetSizeInBits(); ++f)
{
XrdSysCondVarHelper _lck(m_downloadCond);
if (!m_cfi.TestBit(f))
if ( ! m_cfi.TestBit(f))
{
f += m_offset/m_cfi.GetBufferSize();
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
TRACEF(Dump, "File::Prefetch take block " << f);
cache()->RequestRAMBlock();
RequestBlock(f, true);
blks.push_back( PrepareBlockRequest(f, true) );
m_prefetchReadCnt++;
m_prefetchScore = float(m_prefetchHitCnt)/m_prefetchReadCnt;
found = true;
break;
}
}
}

if ( ! found)
m_downloadCond.UnLock();

if ( ! blks.empty())
{
ProcessBlockRequests(blks);
}
else
{
TRACEF(Dump, "File::Prefetch no free block found ");
m_stateCond.Lock();
Expand Down Expand Up @@ -966,7 +991,7 @@ XrdOucTrace* File::GetTrace()

void BlockResponseHandler::Done(int res)
{
m_block->m_file->ProcessBlockResponse(m_block, res);
m_block->m_file->ProcessBlockResponse(m_block, res);

delete this;
}
Expand Down
8 changes: 6 additions & 2 deletions src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -80,7 +80,9 @@ namespace XrdFileCache
m_buff.resize(size);
}

const char* get_buff(long long pos = 0) const { return &m_buff[pos]; }
char* get_buff(long long pos = 0) { return &m_buff[pos]; }
int get_size() { return (int) m_buff.size(); }
long long get_offset() { return m_offset; }

bool is_finished() { return m_downloaded || m_errno != 0; }
bool is_ok() { return m_downloaded; }
Expand All @@ -93,6 +95,7 @@ namespace XrdFileCache
}
};


class File
{
private:
Expand Down Expand Up @@ -213,7 +216,8 @@ namespace XrdFileCache
long long &blk_off, // offset in block
long long &size);
// Read
Block* RequestBlock(int i, bool prefetch);
Block* PrepareBlockRequest(int i, bool prefetch);
void ProcessBlockRequests(BlockList_t& blks);

int RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,
char* buff, long long req_off, long long req_size);
Expand Down
22 changes: 15 additions & 7 deletions src/XrdFileCache/XrdFileCacheVRead.cc
Expand Up @@ -139,7 +139,8 @@ int File::ReadV(const XrdOucIOVec *readV, int n)
if (bytesRead >= 0 && direct_handler != 0)
{
XrdSysCondVarHelper _lck(direct_handler->m_cond);
if (direct_handler->m_to_wait == 1)

while (direct_handler->m_to_wait > 0)
{
direct_handler->m_cond.Wait();
}
Expand Down Expand Up @@ -204,7 +205,9 @@ bool File::VReadPreProcess(const XrdOucIOVec *readV, int n,
ReadVBlockListDisk &blocks_on_disk,
std::vector<XrdOucIOVec> &chunkVec)
{
XrdSysCondVarHelper _lck(m_downloadCond);
BlockList_t blks_to_request;

m_downloadCond.Lock();

for (int iov_idx = 0; iov_idx < n; iov_idx++)
{
Expand Down Expand Up @@ -233,10 +236,12 @@ bool File::VReadPreProcess(const XrdOucIOVec *readV, int n,
{
if (Cache::GetInstance().RequestRAMBlock())
{
Block *b = RequestBlock(block_idx, false);
Block *b = PrepareBlockRequest(block_idx, false);
// MT XXX this can not fail (other than out of memory which we don't handle).
if (!b) return false;
blocks_to_process.AddEntry(b, iov_idx);
inc_ref_count(b);
blocks_to_process.AddEntry(b, iov_idx);
blks_to_request.push_back(b);

TRACEF(Dump, "VReadPreProcess request block " << block_idx);
}
Expand All @@ -255,6 +260,10 @@ bool File::VReadPreProcess(const XrdOucIOVec *readV, int n,
}
}

m_downloadCond.UnLock();

ProcessBlockRequests(blks_to_request);

return true;
}

Expand Down Expand Up @@ -323,15 +332,14 @@ int File::VReadProcessBlocks(const XrdOucIOVec *readV, int n,
++bi;
}
}

if (finished.empty())
{
m_downloadCond.Wait();
continue;
}
}



std::vector<ReadVChunkListRAM>::iterator bi = finished.begin();
while (bi != finished.end())
{
Expand Down

0 comments on commit 7979386

Please sign in to comment.