Skip to content

Commit

Permalink
Merge pull request #1211 from osschar/pfc-monitor-rb1
Browse files Browse the repository at this point in the history
[pfc] g-stream file_close & write queue mem handling
  • Loading branch information
osschar committed Jun 2, 2020
2 parents 572ca9c + 454f588 commit 0aa6169
Show file tree
Hide file tree
Showing 12 changed files with 582 additions and 291 deletions.
254 changes: 180 additions & 74 deletions src/XrdPfc/XrdPfc.cc

Large diffs are not rendered by default.

60 changes: 41 additions & 19 deletions src/XrdPfc/XrdPfc.hh
Expand Up @@ -35,6 +35,7 @@
class XrdOucStream;
class XrdSysError;
class XrdSysTrace;
class XrdXrootdGStream;

namespace XrdCl
{
Expand Down Expand Up @@ -78,7 +79,7 @@ struct Configuration
m_dirStats(false),
m_bufferSize(1024*1024),
m_RamAbsAvailable(0),
m_NRamBuffers(-1),
m_RamKeepStdBlocks(0),
m_wqueue_blocks(16),
m_wqueue_threads(4),
m_prefetch_max_blocks(10),
Expand Down Expand Up @@ -121,7 +122,7 @@ struct Configuration

long long m_bufferSize; //!< prefetch buffer size, default 1MB
long long m_RamAbsAvailable; //!< available from configuration
int m_NRamBuffers; //!< number of total in-memory cache blocks, cached
int m_RamKeepStdBlocks; //!< number of standard-sized blocks kept after release
int m_wqueue_blocks; //!< maximum number of blocks written per write-queue loop
int m_wqueue_threads; //!< number of threads writing blocks to disk
int m_prefetch_max_blocks; //!< maximum number of blocks to prefetch per file
Expand Down Expand Up @@ -282,7 +283,7 @@ public:
//---------------------------------------------------------------------
//! Constructor
//---------------------------------------------------------------------
Cache(XrdSysLogger *logger);
Cache(XrdSysLogger *logger, XrdOucEnv *env);

//---------------------------------------------------------------------
//! Obtain a new IO object that fronts existing XrdOucCacheIO.
Expand Down Expand Up @@ -334,7 +335,7 @@ public:
//---------------------------------------------------------------------
//! Singleton creation.
//---------------------------------------------------------------------
static Cache &CreateInstance(XrdSysLogger *logger);
static Cache &CreateInstance(XrdSysLogger *logger, XrdOucEnv *env);

//---------------------------------------------------------------------
//! Singleton access.
Expand All @@ -347,7 +348,12 @@ public:
static bool VCheck(XrdVersionInfo &urVersion) { return true; }

//---------------------------------------------------------------------
//! Thread function running disk cache purge periodically.
//! Thread function checking resource usage periodically.
//---------------------------------------------------------------------
void ResourceMonitorHeartBeat();

//---------------------------------------------------------------------
//! Thread function invoked to scan and purge files from disk when needed.
//---------------------------------------------------------------------
void Purge();

Expand All @@ -372,9 +378,8 @@ public:
//---------------------------------------------------------------------
void ProcessWriteTasks();

bool RequestRAMBlock();

void RAMBlockReleased();
char* RequestRAM(long long size);
void ReleaseRAM(char* buf, long long size);

void RegisterPrefetchFile(File*);
void DeRegisterPrefetchFile(File*);
Expand All @@ -383,7 +388,7 @@ public:

void Prefetch();

XrdOss* GetOss() const { return m_output_fs; }
XrdOss* GetOss() const { return m_oss; }

bool IsFileActiveOrPurgeProtected(const std::string&);

Expand All @@ -398,6 +403,8 @@ public:
XrdSysError* GetLog() { return &m_log; }
XrdSysTrace* GetTrace() { return m_trace; }

XrdXrootdGStream* GetGStream() { return m_gstream; }

void ExecuteCommandUrl(const std::string& command_url);

static XrdScheduler *schedP;
Expand All @@ -412,26 +419,31 @@ private:

int UnlinkCommon(const std::string& f_name, bool fail_if_open);

static Cache *m_factory; //!< this object
static Cache *m_instance; //!< this object

XrdOucEnv *m_env; //!< environment passed in at creation
XrdSysError m_log; //!< XrdPfc namespace logger
XrdSysTrace *m_trace;
const char *m_traceID;

XrdOucCacheStats m_ouc_stats; //!<
XrdOss *m_output_fs; //!< disk cache file system
XrdOss *m_oss; //!< disk cache file system

std::vector<XrdPfc::Decision*> m_decisionpoints; //!< decision plugins
XrdXrootdGStream *m_gstream;

std::map<std::string, long long> m_filesInQueue;
std::vector<XrdPfc::Decision*> m_decisionpoints; //!< decision plugins

Configuration m_configuration; //!< configurable parameters

XrdSysCondVar m_prefetch_condVar; //!< lock for vector of prefetching files
bool m_prefetch_enabled; //!< set to true when prefetching is enabled

XrdSysMutex m_RAMblock_mutex; //!< lock for allcoation of RAM blocks
int m_RAMblocks_used;
XrdSysMutex m_RAM_mutex; //!< lock for allcoation of RAM blocks
long long m_RAM_used;
long long m_RAM_write_queue;
std::list<char*> m_RAM_std_blocks; //!< A list of blocks of standard size, to be reused.
int m_RAM_std_size;

bool m_isClient; //!< True if running as client

struct WriteQ
Expand All @@ -453,11 +465,11 @@ private:
typedef StatsMMap_t::iterator StatsMMap_i;
typedef std::set<std::string> FNameSet_t;

ActiveMap_t m_active;
ActiveMap_t m_active; //!< Map of currently active / open files.
StatsMMap_t m_closed_files_stats;
FNameSet_t m_purge_delay_set;
bool m_in_purge;
XrdSysCondVar m_active_cond;
XrdSysCondVar m_active_cond; //!< Cond-var protecting active file data structures.

void inc_ref_cnt(File*, bool lock, bool high_debug);
void dec_ref_cnt(File*, bool high_debug);
Expand All @@ -468,8 +480,18 @@ private:
typedef std::vector<File*> PrefetchList;
PrefetchList m_prefetchList;

// directory state for access / usage info and quotas
DataFsState *m_fs_state;
//---------------------------------------------------------------------------
// Statistics, heart-beat, scan-and-purge

enum ScanAndPurgeThreadState_e { SPTS_Idle, SPTS_Scan, SPTS_Purge, SPTS_Done };

XrdSysCondVar m_stats_n_purge_cond; //!< communication between heart-beat and scan-purge threads

DataFsState *m_fs_state; //!< directory state for access / usage info and quotas

int m_last_scan_duration;
int m_last_purge_duration;
ScanAndPurgeThreadState_e m_spt_state;

void copy_out_active_stats_and_update_data_fs_state();
};
Expand Down
22 changes: 14 additions & 8 deletions src/XrdPfc/XrdPfcConfiguration.cc
Expand Up @@ -227,19 +227,19 @@ bool Cache::Config(const char *config_filename, const char *parameters)
// Load OSS plugin.
if (ofsCfg->Load(XrdOfsConfigPI::theOssLib))
{
ofsCfg->Plugin(m_output_fs);
ofsCfg->Plugin(m_oss);
}
else
{
TRACE(Error, "Cache::Config() Unable to create an OSS object");
m_output_fs = 0;
m_oss = 0;
return false;
}

// sets default value for disk usage
XrdOssVSInfo sP;
{
if (m_output_fs->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
{
m_log.Emsg("Cache::ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
return false;
Expand Down Expand Up @@ -312,7 +312,8 @@ bool Cache::Config(const char *config_filename, const char *parameters)
snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
m_log.Say("Config info: ", buff);
}
m_configuration.m_NRamBuffers = static_cast<int>(m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize);
// Setup number of standard-size blocks not released back to the system to 5% of total RAM.
m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;


// Set tracing to debug if this is set in environment
Expand Down Expand Up @@ -383,12 +384,17 @@ bool Cache::Config(const char *config_filename, const char *parameters)
m_log.Say(buff);
}

m_log.Say("------ File Caching Proxy interface initialization ", retval ? "completed" : "failed");
// Derived settings
m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
Info::s_maxNumAccess = m_configuration.m_accHistorySize;

if (ofsCfg) delete ofsCfg;
m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");

// Broadcast settings as needed:
Info::s_maxNumAccess = m_configuration.m_accHistorySize;
m_log.Say("Config Proxy File Cache g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive");

m_log.Say("------ Proxy File Cache configuration parsing ", retval ? "completed" : "failed");

if (ofsCfg) delete ofsCfg;

return retval;
}
Expand Down

0 comments on commit 0aa6169

Please sign in to comment.