Skip to content

Commit

Permalink
[Pfc] Implement async read and readV from the perspective of XrdOucCa…
Browse files Browse the repository at this point in the history
…cheIO. (#1717)

Implement async Read, pgRead, and ReadV.

Mark file as complete when final block is successfully written to disk.

Perform sync when file is completely downloaded, do not wait for final detach.

Adjust default block size down to 128 kB to be closer to async default of 64 kB.

Rename class IOEntireFile to IOFile.

IO: Use override for virtual methods.

File: Cache some frequently used variables in adjacent locations.

Info: Coalesce functions SetBufferSize/FileSizeAndCreationTime() into a single function as this all needs to happen at the same time.

Put pfc.blocksize into env XRDPFC.SEGSIZE.
  • Loading branch information
osschar committed Jun 7, 2022
1 parent a43982a commit 8caf77f
Show file tree
Hide file tree
Showing 17 changed files with 1,098 additions and 1,132 deletions.
3 changes: 1 addition & 2 deletions src/XrdPfc.cmake
Expand Up @@ -23,11 +23,10 @@ add_library(
XrdPfc/XrdPfcPurge.cc
XrdPfc/XrdPfcCommand.cc
XrdPfc/XrdPfcFile.cc XrdPfc/XrdPfcFile.hh
XrdPfc/XrdPfcVRead.cc
XrdPfc/XrdPfcStats.hh
XrdPfc/XrdPfcInfo.cc XrdPfc/XrdPfcInfo.hh
XrdPfc/XrdPfcIO.cc XrdPfc/XrdPfcIO.hh
XrdPfc/XrdPfcIOEntireFile.cc XrdPfc/XrdPfcIOEntireFile.hh
XrdPfc/XrdPfcIOFile.cc XrdPfc/XrdPfcIOFile.hh
XrdPfc/XrdPfcIOFileBlock.cc XrdPfc/XrdPfcIOFileBlock.hh
XrdPfc/XrdPfcDecision.hh)

Expand Down
12 changes: 6 additions & 6 deletions src/XrdPfc/XrdPfc.cc
Expand Up @@ -38,7 +38,7 @@
#include "XrdPfc.hh"
#include "XrdPfcTrace.hh"
#include "XrdPfcInfo.hh"
#include "XrdPfcIOEntireFile.hh"
#include "XrdPfcIOFile.hh"
#include "XrdPfcIOFileBlock.hh"

using namespace XrdPfc;
Expand Down Expand Up @@ -219,18 +219,18 @@ XrdOucCacheIO *Cache::Attach(XrdOucCacheIO *io, int Options)
}
else
{
IOEntireFile *ioef = new IOEntireFile(io, *this);
IOFile *iof = new IOFile(io, *this);

if ( ! ioef->HasFile())
if ( ! iof->HasFile())
{
delete ioef;
delete iof;
// TODO - redirect instead. But this is kind of an awkward place for it.
// errno is set during IOEntireFile construction.
// errno is set during IOFile construction.
TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
return io;
}

cio = ioef;
cio = iof;
}

TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
Expand Down
3 changes: 1 addition & 2 deletions src/XrdPfc/XrdPfcCommand.cc
Expand Up @@ -233,8 +233,7 @@ void Cache::ExecuteCommandUrl(const std::string& command_url)
// Fill up cinfo.

Info myInfo(m_trace, false);
myInfo.SetBufferSize(block_size);
myInfo.SetFileSizeAndCreationTime(file_size);
myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
myInfo.SetAllBitsSynced();

for (int i = 0; i < at_count; ++i)
Expand Down
6 changes: 4 additions & 2 deletions src/XrdPfc/XrdPfcConfiguration.cc
Expand Up @@ -36,7 +36,7 @@ Configuration::Configuration() :
m_accHistorySize(20),
m_dirStatsMaxDepth(-1),
m_dirStatsStoreDepth(0),
m_bufferSize(256*1024),
m_bufferSize(128*1024),
m_RamAbsAvailable(0),
m_RamKeepStdBlocks(0),
m_wqueue_blocks(16),
Expand Down Expand Up @@ -303,7 +303,7 @@ bool Cache::Config(const char *config_filename, const char *parameters)
// Adjust default parameters for client/serverless caching
if (m_isClient)
{
m_configuration.m_bufferSize = 256 * 1024;
m_configuration.m_bufferSize = 128 * 1024; // same as normal.
m_configuration.m_wqueue_blocks = 8;
m_configuration.m_wqueue_threads = 1;
}
Expand Down Expand Up @@ -532,6 +532,8 @@ bool Cache::Config(const char *config_filename, const char *parameters)
}

m_log.Say(buff);

m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
}

// Derived settings
Expand Down

0 comments on commit 8caf77f

Please sign in to comment.