Skip to content

Commit

Permalink
Fix double read size from direct reads.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and osschar committed Mar 9, 2016
1 parent ca271c8 commit 47328dc
Showing 1 changed file with 46 additions and 19 deletions.
65 changes: 46 additions & 19 deletions src/XrdFileCache/XrdFileCacheFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <stdio.h>
#include <sstream>
#include <fcntl.h>
#include <assert.h>
#include "XrdCl/XrdClLog.hh"
#include "XrdCl/XrdClConstants.hh"
#include "XrdCl/XrdClFile.hh"
Expand Down Expand Up @@ -315,6 +316,7 @@ namespace
blk_off = ovlp_beg - beg;
size = ovlp_end - ovlp_beg;

assert(size <= blk_size);
return true;
}
else
Expand Down Expand Up @@ -363,8 +365,6 @@ Block* File::RequestBlock(int i, bool prefetch)
int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,
char* req_buf, long long req_off, long long req_size)
{

clLog()->Dump(XrdCl::AppMsg, "RequestBlockDirect %ld ", blocks.size());
XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile;

const long long BS = m_cfi.GetBufferSize();
Expand All @@ -386,9 +386,12 @@ int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,
if (!status.IsOK())
{
clLog()->Error(XrdCl::AppMsg, "File::RequestBlocksDirect error %s\n", lPath());
return total;
//return total;
return -1; // AMT all reads should be canceled in this case
}
else {
clLog()->Dump(XrdCl::AppMsg, "RequestBlockDirect success %d %ld %s", *ii, size, lPath());
}

total += size;
}

Expand All @@ -401,7 +404,7 @@ int File::ReadBlocksFromDisk(std::list<int>& blocks,
char* req_buf, long long req_off, long long req_size)
{

clLog()->Dump(XrdCl::AppMsg, "File::ReadBlocksFromDisk %ld ", blocks.size());
clLog()->Dump(XrdCl::AppMsg, "File::ReadBlocksFromDisk %ld %s", blocks.size(), lPath());
const long long BS = m_cfi.GetBufferSize();

long long total = 0;
Expand All @@ -418,12 +421,24 @@ int File::ReadBlocksFromDisk(std::list<int>& blocks,
overlap(*ii, BS, req_off, req_size, off, blk_off, size);

long long rs = m_output->Read(req_buf + off, *ii * BS + blk_off, size);
clLog()->Dump(XrdCl::AppMsg, "File::ReadBlocksFromDisk block %d size %d %s", *ii, size, lPath());


if (rs < 0)
if (rs < 0) {
clLog()->Error(XrdCl::AppMsg, "File::ReadBlocksFromDisk neg retval %ld (%ld@%d) %s", rs, *ii * BS + blk_off, lPath());
return rs;
}


// AMT I think we should exit in this case too
if (rs !=size) {
clLog()->Error(XrdCl::AppMsg, "File::ReadBlocksFromDisk incomplete %ld (%ld@%d) %s", rs, *ii * BS + blk_off, lPath());
return -1;
}

total += rs;


CheckPrefetchStatDisk(*ii);
}

Expand Down Expand Up @@ -457,6 +472,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
{
clLog()->Dump(XrdCl::AppMsg, "--- File::Read() idx %d %s \n", block_idx, lPath());
BlockMap_i bi = m_block_map.find(block_idx);

// In RAM or incoming?
Expand All @@ -466,13 +482,14 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// XXXX Or just push it and handle errors in one place later?

inc_ref_count(bi->second);
clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count for existing block %p %d %s", (void*)bi->second, block_idx, lPath());
clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p inc_ref_count for existing block %p %d %s", (void*)iUserBuff, (void*)bi->second, block_idx, lPath());
blks_to_process.push_front(bi->second);
m_stats.m_BytesRam++; // AMT what if block fails
}
// On disk?
else if (m_cfi.TestBit(block_idx))
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p read from disk %d %s", (void*)iUserBuff, block_idx, lPath());
blks_on_disk.push_back(block_idx);
m_stats.m_BytesDisk++;
}
Expand All @@ -482,7 +499,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// Is there room for one more RAM Block?
if ( cache()->HaveFreeWritingSlots() && cache()->RequestRAMBlock())
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() inc_ref_count new %d %s", block_idx, lPath());
clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p inc_ref_count new %d %s", (void*)iUserBuff, block_idx, lPath());
Block *b = RequestBlock(block_idx, false);
assert(b);
inc_ref_count(b);
Expand All @@ -492,7 +509,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// Nope ... read this directly without caching.
else
{
clLog()->Debug(XrdCl::AppMsg, "File::Read() direct block %d", block_idx);
clLog()->Debug(XrdCl::AppMsg, "File::Read() direct block %d %s", block_idx, lPath());
blks_direct.push_back(block_idx);
m_stats.m_BytesMissed++;
}
Expand All @@ -513,24 +530,26 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
direct_handler = new DirectResponseHandler(blks_direct.size());

direct_size = RequestBlocksDirect(direct_handler, blks_direct, iUserBuff, iUserOff, iUserSize);
bytes_read += direct_size; // AMT added
clLog()->Dump(XrdCl::AppMsg, "File::Read() direct read %d. %s", direct_size, lPath());
}

// Second, read blocks from disk.
if (!blks_on_disk.empty()) {
if ((!blks_on_disk.empty()) && (bytes_read >= 0)) {
int rc = ReadBlocksFromDisk(blks_on_disk, iUserBuff, iUserOff, iUserSize);
clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p, from disk %d. %s", (void*)iUserBuff, rc, lPath());
if (rc >= 0)
{
bytes_read += rc;
}
else
{
bytes_read = rc;
clLog()->Error(XrdCl::AppMsg, "File::Read() failed to read from disk.");
clLog()->Error(XrdCl::AppMsg, "File::Read() failed to read from disk. %s", lPath());
// AMT commented line below should not be an immediate return, can have block refcount increased and map increased
// return rc;
}
}

// Third, loop over blocks that are available or incoming
while ( (! blks_to_process.empty()) && (bytes_read >= 0))
{
Expand All @@ -545,7 +564,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// clLog()->Dump(XrdCl::AppMsg, "File::Read() searcing for block %p finished", (void*)(*bi));
if ((*bi)->is_finished())
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() found finished block %p", (void*)(*bi));
clLog()->Dump(XrdCl::AppMsg, "File::Read() found finished block %p %s", (void*)(*bi), lPath());
finished.push_back(*bi);
BlockList_i bj = bi++;
blks_to_process.erase(bj);
Expand All @@ -559,16 +578,18 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
if (finished.empty())
{

clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block begin");
clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block begin %s", lPath());

m_downloadCond.Wait();

clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block end");
clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block end %s", lPath());

continue;
}
}

clLog()->Dump(XrdCl::AppMsg, "File::Read() bytes read before processing blocks %d %s\n", bytes_read, lPath());

BlockList_i bi = finished.begin();
while (bi != finished.end())
{
Expand All @@ -580,14 +601,16 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

// clLog()->Dump(XrdCl::AppMsg, "File::Read() Block finished ok.");
overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy);

clLog()->Dump(XrdCl::AppMsg, "File::Read() u=%p, from finished block %d , size %d end %s", (void*)iUserBuff, (*bi)->m_offset/BS, size_to_copy, lPath());
memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy);
bytes_read += size_to_copy;

CheckPrefetchStatRAM(*bi);
}
else // it has failed ... krap up.
{
clLog()->Error(XrdCl::AppMsg, "File::Read() Block finished with error.");
clLog()->Error(XrdCl::AppMsg, "File::Read() Block finished with error %s.", lPath());
bytes_read = -1;
errno = (*bi)->m_errno;
break;
Expand All @@ -599,10 +622,12 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
finished.clear();
}

clLog()->Dump(XrdCl::AppMsg, "File::Read() bytes read after processing blocks %d %s\n", bytes_read, lPath());

// Fourth, make sure all direct requests have arrived
if (direct_handler != 0)
if ((direct_handler != 0) && (bytes_read >= 0 ))
{
clLog()->Debug(XrdCl::AppMsg, "File::Read() waiting for direct requests.");
clLog()->Debug(XrdCl::AppMsg, "File::Read() waiting for direct requests %s.", lPath());
XrdSysCondVarHelper _lck(direct_handler->m_cond);

if (direct_handler->m_to_wait > 0)
Expand All @@ -622,12 +647,14 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)

delete direct_handler;
}
clLog()->Debug(XrdCl::AppMsg, "File::Read() before assert %s.", lPath());
assert(iUserSize >= bytes_read);

// Last, stamp and release blocks, release file.
{
XrdSysCondVarHelper _lck(m_downloadCond);

// XXXX stamp file ???
// AMT what is stamp block ???

// blks_to_process can be non-empty, if we're exiting with an error.
std::copy(blks_to_process.begin(), blks_to_process.end(), std::back_inserter(blks_processed));
Expand Down

0 comments on commit 47328dc

Please sign in to comment.