Skip to content

Commit

Permalink
Add dump log messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
alja authored and osschar committed Mar 9, 2016
1 parent 9703af0 commit a1fab5a
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 30 deletions.
12 changes: 9 additions & 3 deletions src/XrdFileCache/XrdFileCache.cc
Expand Up @@ -179,7 +179,7 @@ bool
Cache::RequestRAMBlock()
{
XrdSysMutexHelper lock(&m_RAMblock_mutex);
if ( m_RAMblocks_used > Factory::GetInstance().RefConfiguration().m_NRamBuffers )
if ( m_RAMblocks_used < Factory::GetInstance().RefConfiguration().m_NRamBuffers )
{
m_RAMblocks_used++;
return true;
Expand Down Expand Up @@ -249,8 +249,10 @@ File*
Cache::GetNextFileToPrefetch()
{
XrdSysMutexHelper lock(&m_prefetch_mutex);
if (m_files.empty())
if (m_files.empty()) {
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::GetNextFileToPrefetch ... no open files");
return 0;
}

std::sort(m_files.begin(), m_files.end(), myobject);
File* f = m_files.back();
Expand All @@ -264,14 +266,18 @@ Cache::GetNextFileToPrefetch()
void
Cache::Prefetch()
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Prefetch thread start");

while (true) {
File* f = GetNextFileToPrefetch();
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg, "Cache::Prefetch got file (%p)", f);

if (f) {
f->Prefetch();
}
else {
// wait for new file, AMT should I wait for the signal instead ???
XrdSysTimer::Wait(10);
XrdSysTimer::Wait(1);
}
}
}
3 changes: 3 additions & 0 deletions src/XrdFileCache/XrdFileCacheFactory.cc
Expand Up @@ -310,6 +310,7 @@ bool Factory::Config(XrdSysLogger *logger, const char *config_filename, const ch

bool Factory::ConfigParameters(std::string part, XrdOucStream& config )
{
printf("part %s \n", part.c_str());
XrdSysError err(0, "");
if ( part == "user" )
{
Expand Down Expand Up @@ -366,7 +367,9 @@ bool Factory::ConfigParameters(std::string part, XrdOucStream& config )
}
else if (part == "prefetch" )
{
printf("prefetch enabled !!!!\n");
m_configuration.m_prefetch = true;
config.GetWord();
}
else if (part == "nram" )
{
Expand Down
63 changes: 41 additions & 22 deletions src/XrdFileCache/XrdFileCacheFile.cc
Expand Up @@ -225,7 +225,7 @@ bool File::Open()
return false;
}

if (m_cfi.Read(m_infoFile) <= 0)
if (m_cfi.Read(m_infoFile, Factory::GetInstance().RefConfiguration().m_prefetch) <= 0)
{
int ss = (m_fileSize - 1)/m_cfi.GetBufferSize() + 1;
clLog()->Info(XrdCl::AppMsg, "Creating new file info with size %lld. Reserve space for %d blocks %s", m_fileSize, ss, m_input.Path());
Expand Down Expand Up @@ -291,7 +291,7 @@ Block* File::RequestBlock(int i, bool prefetch)
//
// Reference count is 0 so increase it in calling function if you want to
// catch the block while still in memory.
clLog()->Dump(XrdCl::AppMsg, "Request block %d ", i);
clLog()->Debug(XrdCl::AppMsg, "RequestBlock() %d pOn=(%d)", i, prefetch);

XrdCl::File &client = ((XrdPosixFile*)(&m_input))->clFile;

Expand All @@ -306,6 +306,7 @@ Block* File::RequestBlock(int i, bool prefetch)

client.Read(off, this_bs, (void*)b->get_buff(), new BlockResponseHandler(b));

clLog()->Debug(XrdCl::AppMsg, "RequestBlock() %d END", i);
return b;
}

Expand Down Expand Up @@ -347,7 +348,7 @@ int File::ReadBlocksFromDisk(std::list<int>& blocks,
char* req_buf, long long req_off, long long req_size)
{

clLog()->Dump(XrdCl::AppMsg, "ReadBlocksFromDisk %ld ", blocks.size());
clLog()->Dump(XrdCl::AppMsg, "File::ReadBlocksFromDisk %ld ", blocks.size());
const long long BS = m_cfi.GetBufferSize();

long long total = 0;
Expand Down Expand Up @@ -380,6 +381,8 @@ int File::ReadBlocksFromDisk(std::list<int>& blocks,

int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
{
clLog()->Dump(XrdCl::AppMsg, "File::Read() begin ");

const long long BS = m_cfi.GetBufferSize();

// lock
Expand Down Expand Up @@ -426,6 +429,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// Is there room for one more RAM Block?
if ( cache()->RequestRAMBlock())
{
clLog()->Debug(XrdCl::AppMsg, "File::Read() request block to fetch %d", block_idx);
Block *b = RequestBlock(block_idx, false);
inc_ref_count(b);
blks_to_process.push_back(b);
Expand All @@ -434,6 +438,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// Nope ... read this directly without caching.
else
{
clLog()->Debug(XrdCl::AppMsg, "File::Read() direct block %d", block_idx);
blks_direct.push_back(block_idx);
}
}
Expand Down Expand Up @@ -479,10 +484,10 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
BlockList_i bi = blks_to_process.begin();
while (bi != blks_to_process.end())
{
clLog()->Dump(XrdCl::AppMsg, "searcing for blocks finished");
clLog()->Dump(XrdCl::AppMsg, "File::Read() searcing for blocks finished");
if ((*bi)->is_finished())
{
clLog()->Dump(XrdCl::AppMsg, "found finished block");
clLog()->Dump(XrdCl::AppMsg, "File::Read() found finished block");
finished.push_back(*bi);
BlockList_i bj = bi++;
blks_to_process.erase(bj);
Expand All @@ -496,11 +501,11 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
if (finished.empty())
{

clLog()->Dump(XrdCl::AppMsg, "wait block begin");
clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block begin");

m_downloadCond.Wait();

clLog()->Dump(XrdCl::AppMsg, "wait block end");
clLog()->Dump(XrdCl::AppMsg, "File::Read() wait block end");

continue;
}
Expand All @@ -515,7 +520,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
long long off_in_block; // offset in block
long long size_to_copy; // size to copy

clLog()->Dump(XrdCl::AppMsg, "Block finished ok.");
clLog()->Dump(XrdCl::AppMsg, "File::Read() Block finished ok.");
overlap((*bi)->m_offset/BS, BS, iUserOff, iUserSize, user_off, off_in_block, size_to_copy);
memcpy(&iUserBuff[user_off], &((*bi)->m_buff[off_in_block]), size_to_copy);
bytes_read += size_to_copy;
Expand All @@ -524,7 +529,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
}
else // it has failed ... krap up.
{
clLog()->Error(XrdCl::AppMsg, "Block finished with eorror.");
clLog()->Error(XrdCl::AppMsg, "File::Read() Block finished with eorror.");
bytes_read = -1;
errno = (*bi)->m_errno;
break;
Expand All @@ -539,7 +544,7 @@ int File::Read(char* iUserBuff, long long iUserOff, int iUserSize)
// Fourth, make sure all direct requests have arrived
if (direct_handler != 0)
{
clLog()->Error(XrdCl::AppMsg, "Read() waiting for direct requests.");
clLog()->Error(XrdCl::AppMsg, "File::Read() waiting for direct requests.");
XrdSysCondVarHelper _lck(direct_handler->m_cond);

if (direct_handler->m_to_wait > 0)
Expand Down Expand Up @@ -785,29 +790,40 @@ void File::Prefetch()


if (!stopping) {

clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch enter to check download status \n");
XrdSysCondVarHelper _lck(m_downloadCond);
if (m_cfi.IsComplete() == false)
clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch enter to check download status BEGIN \n");
if (m_cfi.IsComplete() == false && m_block_map.size() < 1600)
{
int block_idx = -1;
clLog()->Debug(XrdCl::AppMsg, "Prefetch::Prefetch begin, block size %ld", m_block_map.size());

// check index not on disk and not in RAM
for (int f = 0; f < m_cfi.GetSizeInBits(); ++f)
bool found = false;
for (int f=0; f < m_cfi.GetSizeInBits(); ++f)
{
clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch test bit %d", f);
if (!m_cfi.TestBit(f))
{
BlockMap_i bi = m_block_map.find(block_idx);
BlockMap_i bi = m_block_map.find(f);
if (bi == m_block_map.end()) {
block_idx = f;
clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch take block %d", f);
Block *b = RequestBlock(f, true);
inc_ref_count(b);
m_prefetchReadCnt++;
m_prefetchScore = m_prefetchHitCnt/m_prefetchReadCnt;
found = true;
break;
}
}
}

if (cache()->RequestRAMBlock()) {
m_prefetchReadCnt++;
m_prefetchScore = m_prefetchHitCnt/m_prefetchReadCnt;
Block *b = RequestBlock(block_idx, true);
inc_ref_count(b);
if (!found) {
clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch no free blcok found ");
m_cfi.CheckComplete();
if (m_cfi.IsComplete() == false)
clLog()->Dump(XrdCl::AppMsg, "Prefetch::Prefetch This shoulf not happedn !!!");
}
clLog()->Debug(XrdCl::AppMsg, "Prefetch::Prefetch end");
}
}

Expand Down Expand Up @@ -844,6 +860,8 @@ float File::GetPrefetchScore() const
//______________________________________________________________________________
void File::MarkPrefetch()
{

XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"File::MarkPrefetch()");
m_stateCond.Lock();
m_prefetchCurrentCnt++;
m_stateCond.UnLock();
Expand All @@ -865,6 +883,7 @@ void File::UnMarkPrefetch()
void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"BlockResponseHandler::HandleResponse()");
m_block->m_file->ProcessBlockResponse(m_block, status);

delete status;
Expand All @@ -878,7 +897,7 @@ void BlockResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
void DirectResponseHandler::HandleResponse(XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response)
{
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"Direct block response");
XrdCl::DefaultEnv::GetLog()->Dump(XrdCl::AppMsg,"DirectResponseHandler::HandleRespons()");
XrdSysCondVarHelper _lck(m_cond);

--m_to_wait;
Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheIOEntireFile.cc
Expand Up @@ -49,7 +49,7 @@ IOEntireFile::~IOEntireFile()

bool IOEntireFile::ioActive()
{
printf("called ioActive ...\n");
printf("called ioActive ...\n");
return m_file->InitiateClose();
}

Expand Down
13 changes: 10 additions & 3 deletions src/XrdFileCache/XrdFileCacheInfo.cc
Expand Up @@ -56,14 +56,14 @@ Info::~Info()
//______________________________________________________________________________


void Info::ResizeBits(int s, bool prefetch_stat)
void Info::ResizeBits(int s, bool init_prefetch_buff)
{
m_sizeInBits = s;
m_buff_fetched = (unsigned char*)malloc(GetSizeInBytes());
m_buff_write_called = (unsigned char*)malloc(GetSizeInBytes());
memset(m_buff_fetched, 0, GetSizeInBytes());
memset(m_buff_write_called, 0, GetSizeInBytes());
if (prefetch_stat) {
if (init_prefetch_buff) {
m_buff_prefetch = (unsigned char*)malloc(GetSizeInBytes());
memset(m_buff_prefetch, 0, GetSizeInBytes());
}
Expand All @@ -72,7 +72,7 @@ void Info::ResizeBits(int s, bool prefetch_stat)
//______________________________________________________________________________


int Info::Read(XrdOssDF* fp)
int Info::Read(XrdOssDF* fp, bool init_prefetch_buff )
{
// does not need lock, called only in File::Open
// before File::Run() starts
Expand All @@ -95,6 +95,13 @@ int Info::Read(XrdOssDF* fp)

off += fp->Read(&m_accessCnt, off, sizeof(int));
clLog()->Dump(XrdCl::AppMsg, "Info:::Read() complete %d access_cnt %d", m_complete, m_accessCnt);


if (init_prefetch_buff) {
m_buff_prefetch = (unsigned char*)malloc(GetSizeInBytes());
memset(m_buff_prefetch, 0, GetSizeInBytes());
}

return off;
}

Expand Down
2 changes: 1 addition & 1 deletion src/XrdFileCache/XrdFileCacheInfo.hh
Expand Up @@ -100,7 +100,7 @@ namespace XrdFileCache
//!
//! @return number of bytes read
//---------------------------------------------------------------------
int Read(XrdOssDF* fp);
int Read(XrdOssDF* fp, bool init_prefetch = false);

//---------------------------------------------------------------------
//! Write number of blocks and read buffer size
Expand Down

0 comments on commit a1fab5a

Please sign in to comment.