Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple IO objects working with the same file #835

Merged
merged 1 commit into from
Oct 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
114 changes: 67 additions & 47 deletions src/XrdFileCache/XrdFileCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ XrdOucCache2 *XrdOucGetCache2(XrdSysLogger *logger,
pthread_t tid1;
XrdSysThread::Run(&tid1, ProcessWriteTaskThread, (void*)(&factory), 0, "XrdFileCache WriteTasks ");

pthread_t tid2;
XrdSysThread::Run(&tid2, PrefetchThread, (void*)(&factory), 0, "XrdFileCache Prefetch ");
if (factory.RefConfiguration().m_prefetch_max_blocks > 0)
{
pthread_t tid2;
XrdSysThread::Run(&tid2, PrefetchThread, (void*)(&factory), 0, "XrdFileCache Prefetch ");
}

pthread_t tid;
XrdSysThread::Run(&tid, PurgeThread, NULL, 0, "XrdFileCache Purge");
Expand Down Expand Up @@ -166,6 +169,8 @@ Cache::Cache(XrdSysLogger *logger) :
{
// Default log level is Warning.
m_trace->What = 2;

m_prefetch_enabled = (m_configuration.m_prefetch_max_blocks > 0);
}


Expand Down Expand Up @@ -203,6 +208,7 @@ int Cache::isAttached()
void Cache::AddWriteTask(Block* b, bool fromRead)
{
TRACE(Dump, "Cache::AddWriteTask() bOff=%ld " << b->m_offset);

m_writeQ.condVar.Lock();
if (fromRead)
m_writeQ.queue.push_back(b);
Expand Down Expand Up @@ -281,11 +287,11 @@ void Cache::RAMBlockReleased()
}


File* Cache::GetFile(const std::string& path, IO* iIO, long long off, long long filesize)
File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
{
// Called from virtual IO::Attach

TRACE(Debug, "Cache::GetFile " << path);
TRACE(Debug, "Cache::GetFile " << path << ", io " << io);

{
XrdSysCondVarHelper lock(&m_active_cond);
Expand All @@ -304,15 +310,9 @@ File* Cache::GetFile(const std::string& path, IO* iIO, long long off, long long

if (it->second != 0)
{
IO* prevIO = it->second->SetIO(iIO);
if (prevIO)
{
prevIO->RelinquishFile(it->second);
}
else
{
inc_ref_cnt(it->second, false);
}
it->second->AddIO(io);
inc_ref_cnt(it->second, false, true);

return it->second;
}
else
Expand All @@ -326,7 +326,7 @@ File* Cache::GetFile(const std::string& path, IO* iIO, long long off, long long
if (filesize == 0)
{
struct stat st;
int res = iIO->Fstat(st);
int res = io->Fstat(st);
if (res) {
TRACE(Error, "Cache::GetFile, could not get valid stat");
return 0;
Expand All @@ -335,29 +335,35 @@ File* Cache::GetFile(const std::string& path, IO* iIO, long long off, long long
filesize = st.st_size;
}

File* file = new File(iIO, path, off, filesize);
File* file = new File(path, off, filesize);

{
XrdSysCondVarHelper lock(&m_active_cond);

inc_ref_cnt(file, false);
inc_ref_cnt(file, false, true);
m_active[file->GetLocalPath()] = file;

file->AddIO(io);

m_active_cond.Broadcast();
}

return file;
}


void Cache::ReleaseFile(File* f)
void Cache::ReleaseFile(File* f, IO* io)
{
// Called from virtual IO::Detach

TRACE(Debug, "Cache::ReleaseFile " << f->GetLocalPath());
TRACE(Debug, "Cache::ReleaseFile " << f->GetLocalPath() << ", io " << io);

f->ReleaseIO();
dec_ref_cnt(f);
{
XrdSysCondVarHelper lock(&m_active_cond);

f->RemoveIO(io);
}
dec_ref_cnt(f, true);
}


Expand All @@ -368,17 +374,19 @@ class DiskSyncer : public XrdJob
{
private:
File *m_file;
bool m_high_debug;

public:
DiskSyncer(File *f, const char *desc = "") :
DiskSyncer(File *f, bool high_debug, const char *desc = "") :
XrdJob(desc),
m_file(f)
m_file(f),
m_high_debug(high_debug)
{}

void DoIt()
{
m_file->Sync();
Cache::GetInstance().FileSyncDone(m_file);
Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
delete this;
}
};
Expand Down Expand Up @@ -413,10 +421,10 @@ void *callDoIt(void *pp)
}


void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set)
void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
{
DiskSyncer* ds = new DiskSyncer(f);
if ( ! ref_cnt_already_set) inc_ref_cnt(f, true);
DiskSyncer* ds = new DiskSyncer(f, high_debug);
if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
if (m_isClient) ds->DoIt();
else if (schedP) schedP->Schedule(ds);
else {pthread_t tid;
Expand All @@ -425,47 +433,51 @@ void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set)
}


void Cache::FileSyncDone(File* f)
void Cache::FileSyncDone(File* f, bool high_debug)
{
dec_ref_cnt(f);
dec_ref_cnt(f, high_debug);
}


void Cache::inc_ref_cnt(File* f, bool lock)
void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
{
// called from GetFile() or SheduleFileSync();

TRACE(Debug, "Cache::inc_ref_cnt " << f->GetLocalPath());

int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;

if (lock) m_active_cond.Lock();
f->inc_ref_cnt();
int rc = f->inc_ref_cnt();
if (lock) m_active_cond.UnLock();


TRACE_INT(tlvl, "Cache::inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
}


void Cache::dec_ref_cnt(File* f)
void Cache::dec_ref_cnt(File* f, bool high_debug)
{
// called from ReleaseFile() or DiskSync callback

// Called from ReleaseFile() or DiskSync callback.

int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;

m_active_cond.Lock();
int cnt = f->get_ref_cnt();
m_active_cond.UnLock();

TRACE(Debug, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
TRACE_INT(tlvl, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);

if (cnt == 1)
{
if (f->FinalizeSyncBeforeExit())
{
TRACE(Debug, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
schedule_file_sync(f, true);
schedule_file_sync(f, true, true);
return;
}
}

m_active_cond.Lock();
cnt = f->dec_ref_cnt();
TRACE(Debug, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
TRACE_INT(tlvl, "Cache::dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
if (cnt == 0)
{
ActiveMap_i it = m_active.find(f->GetLocalPath());
Expand All @@ -491,21 +503,28 @@ bool Cache::IsFileActiveOrPurgeProtected(const std::string& path)

void Cache::RegisterPrefetchFile(File* file)
{
// called from File::Open()
// Can be called with other locks held.

if (Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
if ( ! m_prefetch_enabled)
{
m_prefetch_condVar.Lock();
m_prefetchList.push_back(file);
m_prefetch_condVar.Signal();
m_prefetch_condVar.UnLock();
return;
}

m_prefetch_condVar.Lock();
m_prefetchList.push_back(file);
m_prefetch_condVar.Signal();
m_prefetch_condVar.UnLock();
}


void Cache::DeRegisterPrefetchFile(File* file)
{
// called from last line File::InitiateClose()
// Can be called with other locks held.

if ( ! m_prefetch_enabled)
{
return;
}

m_prefetch_condVar.Lock();
for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
Expand Down Expand Up @@ -541,7 +560,8 @@ File* Cache::GetNextFileToPrefetch()

void Cache::Prefetch()
{
int limitRAM = int( Cache::GetInstance().RefConfiguration().m_NRamBuffers * 0.7 );
const int limitRAM = int( Cache::GetInstance().RefConfiguration().m_NRamBuffers * 0.7 );

while (true)
{
m_RAMblock_mutex.Lock();
Expand Down
17 changes: 9 additions & 8 deletions src/XrdFileCache/XrdFileCache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,11 @@ public:

File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0);

void ReleaseFile(File*);
void ReleaseFile(File*, IO*);

void ScheduleFileSync(File* f) { schedule_file_sync(f, false); }
void ScheduleFileSync(File* f) { schedule_file_sync(f, false, false); }

void FileSyncDone(File*);
void FileSyncDone(File*, bool high_debug);

XrdSysTrace* GetTrace() { return m_trace; }

Expand Down Expand Up @@ -270,9 +270,10 @@ private:

Configuration m_configuration; //!< configurable parameters

XrdSysCondVar m_prefetch_condVar; //!< central lock for this class
XrdSysCondVar m_prefetch_condVar; //!< lock for vector of prefetching files
bool m_prefetch_enabled; //!< set to true when prefetching is enabled

XrdSysMutex m_RAMblock_mutex; //!< central lock for this class
XrdSysMutex m_RAMblock_mutex; //!< lock for allcoation of RAM blocks
int m_RAMblocks_used;
bool m_isClient; //!< True if running as client

Expand All @@ -298,10 +299,10 @@ private:
bool m_in_purge;
XrdSysCondVar m_active_cond;

void inc_ref_cnt(File*, bool lock);
void dec_ref_cnt(File*);
void inc_ref_cnt(File*, bool lock, bool high_debug);
void dec_ref_cnt(File*, bool high_debug);

void schedule_file_sync(File*, bool ref_cnt_already_set);
void schedule_file_sync(File*, bool ref_cnt_already_set, bool high_debug);

// prefetching
typedef std::vector<File*> PrefetchList;
Expand Down