Skip to content

Commit

Permalink
Fixes in destruction.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and osschar committed Mar 9, 2016
1 parent 01c0f96 commit 39adb0b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 45 deletions.
111 changes: 68 additions & 43 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -74,20 +74,18 @@ m_temp_filename(disk_file_path),
m_offset(iOffset),
m_fileSize(iFileSize),

m_started(false),
m_failed(false),
m_stopping(false),
m_stopped(false),
m_stateCond(0), // We will explicitly lock the condition before use.

m_syncer(new DiskSyncer(this, "XrdFileCache::DiskSyncer")),
m_non_flushed_cnt(0),
m_in_sync(false),

m_block_cond(0)
// m_num_reads(0)
{
clLog()->Debug(XrdCl::AppMsg, "File::File() %s", m_input.Path());
Open();
}

File::~File()
Expand All @@ -100,16 +98,20 @@ File::~File()
while (true)
{
m_stateCond.Lock();
bool isStopped = m_stopped;
bool isStopped = m_stopping;
m_stateCond.UnLock();
if (isStopped)
{
// TODO AMT: wait for map to clear
break;
printf("~FILE map size %ld \n", m_block_map.size());
if ( m_block_map.empty())
break;
}
XrdSysTimer::Wait(100);
}
clLog()->Debug(XrdCl::AppMsg, "File::~File finished with writing %s",lPath() );


// Wait disk sync
bool do_sync = false;
{
XrdSysMutexHelper _lck(&m_syncStatusMutex);
Expand Down Expand Up @@ -147,11 +149,10 @@ bool File::InitiateClose()
{
// Retruns true if delay is needed
clLog()->Debug(XrdCl::AppMsg, "File::Initiate close start", lPath());
if (m_cfi.IsComplete()) return false;
m_stateCond.Lock();
if (m_started == false) return false;
m_stopping = true;
m_stateCond.UnLock();
if (m_cfi.IsComplete()) return false; // AMT maybe map size is here more meaningfull
return true;
}

Expand Down Expand Up @@ -272,6 +273,7 @@ Block* File::RequestBlock(int i)
//
// Reference count is 0 so increase it in calling function if you want to
// catch the block while still in memory.
clLog()->Dump(XrdCl::AppMsg, "Request block %d ", i);

XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile;

Expand All @@ -295,6 +297,8 @@ Block* File::RequestBlock(int i)
int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,
char* req_buf, long long req_off, long long req_size)
{

clLog()->Dump(XrdCl::AppMsg, "RequestBlockDirect %ld ", blocks.size());
XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile;

const long long BS = m_cfi.GetBufferSize();
Expand Down Expand Up @@ -325,6 +329,8 @@ int File::RequestBlocksDirect(DirectResponseHandler *handler, IntList_t& blocks,
int File::ReadBlocksFromDisk(std::list<int>& blocks,
char* req_buf, long long req_off, long long req_size)
{

clLog()->Dump(XrdCl::AppMsg, "ReadBlocksFromDisk %ld ", blocks.size());
const long long BS = m_cfi.GetBufferSize();

long long total = 0;
Expand Down Expand Up @@ -427,7 +433,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
DirectResponseHandler *direct_handler = 0;
int direct_size = 0;

if ( ! blks_direct.empty())
if (!blks_direct.empty())
{
direct_handler = new DirectResponseHandler(blks_direct.size());

Expand All @@ -436,45 +442,54 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
}

// Second, read blocks from disk.
int rc = ReadBlocksFromDisk(blks_on_disk, iUserBuff, iUserOff, iUserSize);
if (rc >= 0)
{
bytes_read += rc;
if (!blks_on_disk.empty()) {
int rc = ReadBlocksFromDisk(blks_on_disk, iUserBuff, iUserOff, iUserSize);
if (rc >= 0)
{
bytes_read += rc;
}
else
{
bytes_read = rc; // AMT ?? should there be an immediate return
}
}
else
{
bytes_read = rc; // AMT ?? should there be an immediate return
}

// Third, loop over blocks that are available or incoming
while ( ! blks_to_process.empty() && bytes_read >= 0)
while ( ! blks_to_process.empty() && bytes_read >= 0) // AMT : do I need this loop ?
{
BlockList_t finished;
BlockList_t finished;

{
XrdSysCondVarHelper _lck(m_block_cond);
{
XrdSysCondVarHelper _lck(m_block_cond);

BlockList_i bi = blks_to_process.begin();
while (bi != blks_to_process.end())
{
if ((*bi)->is_finished())
{
finished.push_back(*bi);
BlockList_i bj = bi++;
blks_to_process.erase(bj);
}
else
{
++bi;
}
}
BlockList_i bi = blks_to_process.begin();
while (bi != blks_to_process.end())
{
clLog()->Dump(XrdCl::AppMsg, "searcing for blocks finished");
if ((*bi)->is_finished())
{
clLog()->Dump(XrdCl::AppMsg, "found finished block");
finished.push_back(*bi);
BlockList_i bj = bi++;
blks_to_process.erase(bj);
}
else
{
++bi;
}
}

if (finished.empty())
{
m_block_cond.Wait();
continue;
}
}
if (finished.empty())
{

clLog()->Dump(XrdCl::AppMsg, "wait block begin");

m_block_cond.Wait();

clLog()->Dump(XrdCl::AppMsg, "wait block end");

continue;
}
}

BlockList_i bi = finished.begin();
while (bi != finished.end())
Expand All @@ -486,23 +501,30 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
long long off_in_block; // offset in block
long long size_to_copy; // size to copy

clLog()->Dump(XrdCl::AppMsg, "Block finished ok.");
overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy);
memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy);
bytes_read += size_to_copy;
}
else // it has failed ... krap up.
{

clLog()->Error(XrdCl::AppMsg, "Block finished with eorror.");
bytes_read = -1;
errno = (*bi)->m_errno;
break;
}
++bi;
}

std::copy(finished.begin(), finished.end(), std::back_inserter(blks_processed));
finished.clear();
}

// Fourth, make sure all direct requests have arrived
if (direct_handler != 0)
{
clLog()->Error(XrdCl::AppMsg, "Read() waiting for direct requests.");
XrdSysCondVarHelper _lck(direct_handler->m_cond);

if (direct_handler->m_to_wait > 0)
Expand Down Expand Up @@ -572,7 +594,7 @@ void File::WriteBlockToDisk(Block* b)
}
if (cnt > PREFETCH_MAX_ATTEMPTS)
{
clLog()->Error(XrdCl::AppMsg, "File::WriteToDisk() write failes too manny attempts %s", lPath());
clLog()->Error(XrdCl::AppMsg, "File::WriteToDisk() write failed too manny attempts %s", lPath());
return;
}
}
Expand Down Expand Up @@ -663,6 +685,7 @@ void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
void DirectResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"Direct block response");
XrdSysCondVarHelper _lck(m_cond);

--m_to_wait;
Expand Down Expand Up @@ -709,6 +732,8 @@ void File::dec_ref_count(Block* b)

void File::ProcessBlockResponse(Block* b, XrdCl::XRootDStatus *status)
{
clLog()->Dump(XrdCl::AppMsg, "File::ProcessBlockResponse %d ",(int)(b->m_offset/BufferSize()));

m_block_cond.Lock();

if (status->IsOK())
Expand Down
3 changes: 1 addition & 2 deletions src/XrdFileCache/XrdFileCacheFile.hh
Expand Up @@ -96,10 +96,9 @@ namespace XrdFileCache
long long m_offset; //!< offset of cached file for block-based operation
long long m_fileSize; //!< size of cached disk file for block-based operation

bool m_started; //!< state of run thread
bool m_failed; //!< reading from original source or writing to disk has failed
bool m_stopping; //!< run thread should be stopped
bool m_stopped; //!< prefetch is stopped

XrdSysCondVar m_stateCond; //!< state condition variable

XrdSysMutex m_downloadStatusMutex; //!< mutex locking access to m_cfi object
Expand Down

0 comments on commit 39adb0b

Please sign in to comment.