-
Notifications
You must be signed in to change notification settings - Fork 149
/
XrdPfc.hh
501 lines (402 loc) · 17.6 KB
/
XrdPfc.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
#ifndef __XRDPFC_CACHE_HH__
#define __XRDPFC_CACHE_HH__
//----------------------------------------------------------------------------------
// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
//----------------------------------------------------------------------------------
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//----------------------------------------------------------------------------------
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <list>
#include <map>
#include <set>
#include <string>
#include <vector>
#include "Xrd/XrdScheduler.hh"
#include "XrdVersion.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdOuc/XrdOucCache.hh"
#include "XrdOuc/XrdOucCallBack.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdPfcFile.hh"
#include "XrdPfcDecision.hh"
class XrdOucStream;
class XrdSysError;
class XrdSysTrace;
class XrdXrootdGStream;
namespace XrdCl
{
class Log;
}
namespace XrdPfc
{
class File;
class IO;
class DataFsState;
}
namespace XrdPfc
{
//----------------------------------------------------------------------------
//! Contains parameters configurable from the xrootd config file.
//!
//! All defaults are set in the constructor. Several size/age limits default
//! to -1; the predicates below treat non-positive values of the limits they
//! inspect as "not in effect".
//----------------------------------------------------------------------------
struct Configuration
{
   Configuration() :
      m_hdfsmode(false),
      m_allow_xrdpfc_command(false),
      m_data_space("public"),
      m_meta_space("public"),
      m_diskTotalSpace(-1),
      m_diskUsageLWM(-1),
      m_diskUsageHWM(-1),
      m_fileUsageBaseline(-1),
      m_fileUsageNominal(-1),
      m_fileUsageMax(-1),
      m_purgeInterval(300),
      m_purgeColdFilesAge(-1),
      m_purgeColdFilesPeriod(-1),
      m_accHistorySize(20),
      m_dirStatsMaxDepth(-1),
      m_dirStatsStoreDepth(-1),
      m_dirStats(false),
      m_bufferSize(1024*1024),
      m_RamAbsAvailable(0),
      m_RamKeepStdBlocks(0),
      m_wqueue_blocks(16),
      m_wqueue_threads(4),
      m_prefetch_max_blocks(10),
      m_hdfsbsize(128*1024*1024),
      m_flushCnt(2000)
   {}

   //! True when a positive maximum file-usage limit has been configured.
   bool are_file_usage_limits_set() const { return m_fileUsageMax > 0; }

   //! True when a positive cold-file age has been configured.
   bool is_age_based_purge_in_effect() const { return m_purgeColdFilesAge > 0; }

   //! Purge plugins are not supported by this configuration; always false.
   bool is_purge_plugin_set_up() const { return false; }

   //! Defined out of line. Presumably converts disk usage (du) and file usage
   //! (fu) into fractions relative to the configured limits, written into
   //! frac_du / frac_fu -- confirm against the implementation.
   void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu);

   // This might become more complicated with per-dir purge policy
   //! True when directory access / usage statistics are enabled.
   bool are_dirstats_enabled() const { return m_dirStats; }

   bool m_hdfsmode;                  //!< flag for enabling block-level operation
   bool m_allow_xrdpfc_command;      //!< flag for enabling access to /xrdpfc-command/ functionality.

   std::string m_username;           //!< username passed to oss plugin
   std::string m_data_space;         //!< oss space for data files
   std::string m_meta_space;         //!< oss space for metadata files (cinfo)

   long long m_diskTotalSpace;       //!< total disk space on configured partition or oss space
   long long m_diskUsageLWM;         //!< cache purge - disk usage low water mark
   long long m_diskUsageHWM;         //!< cache purge - disk usage high water mark
   long long m_fileUsageBaseline;    //!< cache purge - files usage baseline
   long long m_fileUsageNominal;     //!< cache purge - files usage nominal
   long long m_fileUsageMax;         //!< cache purge - files usage maximum
   int       m_purgeInterval;        //!< sleep interval between cache purges
   int       m_purgeColdFilesAge;    //!< purge files older than this age
   int       m_purgeColdFilesPeriod; //!< perform cold file purge every this many purge cycles
   int       m_accHistorySize;       //!< max number of entries in access history part of cinfo file

   std::set<std::string> m_dirStatsDirs;     //!< directories for which stat reporting was requested
   std::set<std::string> m_dirStatsDirGlobs; //!< directory globs for which stat reporting was requested
   int       m_dirStatsMaxDepth;     //!< maximum depth for statistics write out
   int       m_dirStatsStoreDepth;   //!< depth to which statistics should be collected
   bool      m_dirStats;             //!< is directory access / usage statistics enabled

   long long m_bufferSize;           //!< prefetch buffer size, default 1MB
   long long m_RamAbsAvailable;      //!< available from configuration
   int       m_RamKeepStdBlocks;     //!< number of standard-sized blocks kept after release
   int       m_wqueue_blocks;        //!< maximum number of blocks written per write-queue loop
   int       m_wqueue_threads;       //!< number of threads writing blocks to disk
   int       m_prefetch_max_blocks;  //!< maximum number of blocks to prefetch per file

   long long m_hdfsbsize;            //!< used with m_hdfsmode, default 128MB
   long long m_flushCnt;             //!< number of unsynced blocks on disk before flush is called
};
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
//! Raw (still textual) values of some configuration parameters.
//!
//! Passed by reference to Cache::ConfigParameters(); presumably filled while
//! the config file is parsed and converted to numbers afterwards -- confirm
//! in the implementation. The disk-usage water marks default to "0.90" and
//! "0.95"; every other field defaults to an empty string.
//------------------------------------------------------------------------------
struct TmpConfiguration
{
   TmpConfiguration() {}

   std::string m_diskUsageLWM = "0.90";  //!< raw disk-usage low water mark
   std::string m_diskUsageHWM = "0.95";  //!< raw disk-usage high water mark
   std::string m_fileUsageBaseline;      //!< raw files-usage baseline
   std::string m_fileUsageNominal;       //!< raw files-usage nominal
   std::string m_fileUsageMax;           //!< raw files-usage maximum
   std::string m_flushRaw;               //!< raw flush setting (empty unless configured)
};
//==============================================================================
//------------------------------------------------------------------------------
//! Thin wrapper around strtok_r() that tokenizes a private strdup()-ed copy
//! of the input string. Returned token pointers point into that copy and are
//! valid only while the parser is alive.
//!
//! The parser owns its buffer and frees it in the destructor, so it is
//! non-copyable: a shallow copy would free the same buffer twice.
//------------------------------------------------------------------------------
struct SplitParser
{
   char       *str;    //!< owned, mutable copy of the input (modified by strtok_r)
   const char *delim;  //!< delimiter set; not copied, must outlive the parser
   char       *state;  //!< strtok_r save pointer
   bool        first;  //!< true until the first tokenizing call

   SplitParser(const std::string &s, const char *d) :
      str(strdup(s.c_str())), delim(d), state(0), first(true)
   {}
   ~SplitParser() { free(str); }

   SplitParser(const SplitParser&)            = delete;
   SplitParser& operator=(const SplitParser&) = delete;

   //! Return the next token, or 0 when no tokens remain.
   char* get_token()
   {
      if (first) { first = false; return strtok_r(str, delim, &state); }
      else       { return strtok_r(0, delim, &state); }
   }

   //! Return the not-yet-tokenized rest of the string, prefixed by the
   //! delimiter that ended the last returned token. strtok_r replaced that
   //! delimiter with '\0'; it is restored here as delim[0].
   //! If there is no such delimiter (the last token ran to the end of the
   //! string, or no token was found), the rest is returned as-is and the
   //! buffer is not modified. May return 0 if the strtok_r implementation
   //! clears its save pointer once the string is exhausted.
   char* get_reminder_with_delim()
   {
      if (first) { return str; }

      // Nothing precedes the save pointer that could be a delimiter: writing
      // to state - 1 would go out of bounds (or through a null pointer).
      if (state == 0 || state == str) { return state; }

      char *prev = state - 1;
      // Only a terminator written by strtok_r, or a delimiter restored by an
      // earlier call, may be (re)set; anything else is part of a token.
      if (*prev == '\0' || strchr(delim, *prev) != 0)
      {
         *prev = delim[0];
         return prev;
      }
      return state;
   }

   //! Return the not-yet-tokenized rest of the string (without delimiter).
   char *get_reminder()
   {
      return first ? str : state;
   }

   //! Split the whole string, appending each token to argv, and return the
   //! number of tokens appended. Must be the first tokenizing call; returns 0
   //! (and appends nothing) if tokenizing has already started.
   int fill_argv(std::vector<char*> &argv)
   {
      if (!first) return 0;
      // The buffer is consumed below; mark it so a repeated call does not
      // re-tokenize the already-mutated string and append duplicates.
      first = false;

      int dcnt = 0;
      for (const char *p = str; *p; ++p)
      {
         if (*p == delim[0]) ++dcnt;
      }
      argv.reserve(argv.size() + dcnt + 1);

      int argc = 0;
      for (char *tok = strtok_r(str, delim, &state); tok != 0; tok = strtok_r(0, delim, &state))
      {
         argv.push_back(tok);
         ++argc;
      }
      return argc;
   }
};

//------------------------------------------------------------------------------
//! Splits a '/'-separated path into at most max_depth leading components
//! (m_dirs) plus the untokenized rest of the path (m_reminder).
//!
//! When parse_as_lfn is true and the path has no more than max_depth
//! components, the last component is taken to be a file name and is moved
//! from m_dirs into m_reminder. A non-positive max_depth extracts no
//! components. All stored pointers point into the buffer owned by the
//! SplitParser base and are valid only while this object is alive.
//------------------------------------------------------------------------------
struct PathTokenizer : private SplitParser
{
   std::vector<const char*> m_dirs;      //!< leading path components
   const char              *m_reminder;  //!< rest of the path / file name; may be 0
   int                      m_n_dirs;    //!< number of entries in m_dirs

   PathTokenizer(const std::string &path, int max_depth, bool parse_as_lfn) :
      SplitParser(path, "/"),
      m_reminder (0),
      m_n_dirs   (0)
   {
      if (max_depth > 0) m_dirs.reserve(max_depth);

      char *t = 0;
      for (int i = 0; i < max_depth; ++i)
      {
         t = get_token();
         if (t == 0) break;
         m_dirs.emplace_back(t);
      }

      // Nothing left after the extracted components: either tokens ran out
      // before max_depth, or the remainder is empty (or the save pointer was
      // cleared by the strtok_r implementation).
      const char *rest         = get_reminder();
      const bool  nothing_left = (t == 0 || rest == 0 || *rest == 0);

      if (parse_as_lfn && nothing_left && ! m_dirs.empty())
      {
         m_reminder = m_dirs.back();
         m_dirs.pop_back();
      }
      else
      {
         m_reminder = rest;
      }
      m_n_dirs = (int) m_dirs.size();
   }

   //! Number of extracted directory components.
   int get_n_dirs() const
   {
      return m_n_dirs;
   }

   //! Directory component at pos, or 0 when pos is out of range.
   const char *get_dir(int pos) const
   {
      if (pos < 0 || pos >= m_n_dirs) return 0;
      return m_dirs[pos];
   }

   //! Reassemble "/dir0/dir1/.../reminder" (the reminder part only if set).
   std::string make_path() const
   {
      std::string res;
      for (const char *d : m_dirs)
      {
         res += "/";
         res += d;
      }
      if (m_reminder != 0)
      {
         res += "/";
         res += m_reminder;
      }
      return res;
   }

   //! Debug dump of the parsed components to stdout.
   void deboog() const
   {
      printf("PathTokenizer::deboog size=%d\n", m_n_dirs);
      for (int i = 0; i < m_n_dirs; ++i)
      {
         printf(" %2d: %s\n", i, m_dirs[i]);
      }
      // Passing a null pointer to %s is undefined behavior.
      printf(" rem: %s\n", m_reminder ? m_reminder : "(null)");
   }
};
//==============================================================================
// Cache
//==============================================================================
//----------------------------------------------------------------------------
//! Attaches/creates and detaches/deletes cache-io objects for disk based cache.
//!
//! Implements the XrdOucCache plugin interface (Attach, LocalFilePath,
//! Prepare, Stat, Unlink) and holds the cache-wide state: the map of active
//! files, the RAM block pool, the disk write queue, the list of files being
//! prefetched and the scan/purge bookkeeping. Used as a singleton, see
//! CreateInstance() / GetInstance().
//----------------------------------------------------------------------------
class Cache : public XrdOucCache
{
public:
//---------------------------------------------------------------------
//! Constructor
//!
//! @param logger  logger to be used by the cache
//! @param env     environment passed in at creation (kept in m_env)
//---------------------------------------------------------------------
Cache(XrdSysLogger *logger, XrdOucEnv *env);
//---------------------------------------------------------------------
//! Obtain a new IO object that fronts existing XrdOucCacheIO.
//!
//! The using-declaration keeps the other XrdOucCache::Attach overloads
//! visible; otherwise this declaration would hide them.
//---------------------------------------------------------------------
using XrdOucCache::Attach;
virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options = 0);
//---------------------------------------------------------------------
// Virtual function of XrdOucCache. Used for redirection to a local
// file on a distributed FS.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0,
LFP_Reason why=ForAccess, bool forall=false);
//---------------------------------------------------------------------
// Virtual function of XrdOucCache. Used for deferred open.
virtual int Prepare(const char *url, int oflags, mode_t mode);
// Virtual function of XrdOucCache. Fills sbuff for the file given by url.
virtual int Stat(const char *url, struct stat &sbuff);
// Virtual function of XrdOucCache. Removes the file given by url from the
// cache; NOTE(review): confirm whether open files are handled via
// UnlinkCommon().
virtual int Unlink(const char *url);
//--------------------------------------------------------------------
//! \brief Makes decision if the original XrdOucCacheIO should be cached.
//!
//! @param (unnamed) XrdOucCacheIO of the file being considered.
//!
//! @return decision if IO object will be cached.
//!
//! Presumably consults the decision plugins in m_decisionpoints --
//! confirm in the implementation.
//--------------------------------------------------------------------
bool Decide(XrdOucCacheIO*);
//------------------------------------------------------------------------
//! Reference XrdPfc configuration
//------------------------------------------------------------------------
const Configuration& RefConfiguration() const { return m_configuration; }
//---------------------------------------------------------------------
//! \brief Parse configuration file
//!
//! @param config_filename path to configuration file
//! @param parameters optional parameters to be passed
//!
//! @return parse status
//---------------------------------------------------------------------
bool Config(const char *config_filename, const char *parameters);
//---------------------------------------------------------------------
//! Singleton creation.
//---------------------------------------------------------------------
static Cache &CreateInstance(XrdSysLogger *logger, XrdOucEnv *env);
//---------------------------------------------------------------------
//! Singleton access.
//---------------------------------------------------------------------
static Cache &GetInstance();
//---------------------------------------------------------------------
//! Version check. Always returns true; urVersion is not inspected.
//---------------------------------------------------------------------
static bool VCheck(XrdVersionInfo &urVersion) { return true; }
//---------------------------------------------------------------------
//! Thread function checking resource usage periodically.
//---------------------------------------------------------------------
void ResourceMonitorHeartBeat();
//---------------------------------------------------------------------
//! Thread function invoked to scan and purge files from disk when needed.
//---------------------------------------------------------------------
void Purge();
//---------------------------------------------------------------------
//! Remove file from cache unless it is currently open.
//---------------------------------------------------------------------
int UnlinkUnlessOpen(const std::string& f_name);
//---------------------------------------------------------------------
//! Add downloaded block in write queue.
//!
//! @param b          block to be written to disk
//! @param from_read  NOTE(review): presumably true when the block was
//!                   fetched for a client read rather than by prefetch
//---------------------------------------------------------------------
void AddWriteTask(Block* b, bool from_read);
//---------------------------------------------------------------------
//! \brief Remove blocks from write queue which belong to given prefetch.
//! This method is used at the time of File destruction.
//---------------------------------------------------------------------
void RemoveWriteQEntriesFor(File *f);
//---------------------------------------------------------------------
//! Separate task which writes blocks from ram to disk.
//---------------------------------------------------------------------
void ProcessWriteTasks();
// RAM block pool: obtain / return a buffer of the given size. Presumably
// accounted in m_RAM_used under m_RAM_mutex -- confirm in the implementation.
char* RequestRAM(long long size);
void ReleaseRAM(char* buf, long long size);
// Prefetching: files registered here are presumably kept in m_prefetchList
// and handed out by GetNextFileToPrefetch() to the Prefetch() loop.
void RegisterPrefetchFile(File*);
void DeRegisterPrefetchFile(File*);
File* GetNextFileToPrefetch();
void Prefetch();
// Disk cache file system (m_oss).
XrdOss* GetOss() const { return m_oss; }
// NOTE(review): presumably checks m_active and m_purge_delay_set.
bool IsFileActiveOrPurgeProtected(const std::string&);
// Obtain the File object for a path on behalf of an IO object; ReleaseFile()
// is the counterpart. off / filesize default to 0.
File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0);
void ReleaseFile(File*, IO*);
// Schedules a sync of f: forwards to schedule_file_sync(f, false, false).
void ScheduleFileSync(File* f) { schedule_file_sync(f, false, false); }
void FileSyncDone(File*, bool high_debug);
// Accessors for the logger, tracer and g-stream members.
XrdSysError* GetLog() { return &m_log; }
XrdSysTrace* GetTrace() { return m_trace; }
XrdXrootdGStream* GetGStream() { return m_gstream; }
// NOTE(review): presumably implements the /xrdpfc-command/ functionality
// enabled by Configuration::m_allow_xrdpfc_command.
void ExecuteCommandUrl(const std::string& command_url);
// Scheduler shared by the cache; set outside this header.
static XrdScheduler *schedP;
private:
// Configuration parsing helpers (implemented out of line).
bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
bool ConfigXeq(char *, XrdOucStream &);
bool xdlib(XrdOucStream &);
bool xtrace(XrdOucStream &);
// Converts a size specification into bytes stored in store; totalSpace is
// presumably the reference for relative values, name is used in messages.
bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name);
// Shared implementation of unlinking; fail_if_open selects whether an open
// file prevents removal.
int UnlinkCommon(const std::string& f_name, bool fail_if_open);
static Cache *m_instance; //!< this object
XrdOucEnv *m_env; //!< environment passed in at creation
XrdSysError m_log; //!< XrdPfc namespace logger
XrdSysTrace *m_trace; //!< tracer, returned by GetTrace()
const char *m_traceID; //!< trace identifier
XrdOucCacheStats m_ouc_stats; //!< cache statistics (XrdOucCacheStats)
XrdOss *m_oss; //!< disk cache file system
XrdXrootdGStream *m_gstream; //!< g-stream, returned by GetGStream()
std::vector<XrdPfc::Decision*> m_decisionpoints; //!< decision plugins
Configuration m_configuration; //!< configurable parameters
XrdSysCondVar m_prefetch_condVar; //!< lock for vector of prefetching files
bool m_prefetch_enabled; //!< set to true when prefetching is enabled
XrdSysMutex m_RAM_mutex; //!< lock for allocation of RAM blocks
long long m_RAM_used; //!< RAM currently handed out (presumably bytes)
long long m_RAM_write_queue; //!< RAM held by blocks waiting in the write queue (presumably bytes)
std::list<char*> m_RAM_std_blocks; //!< A list of blocks of standard size, to be reused.
int m_RAM_std_size; //!< size of a standard block
bool m_isClient; //!< True if running as client
// Queue of blocks waiting to be written to disk.
struct WriteQ
{
WriteQ() : condVar(0), writes_between_purges(0), size(0) {}
XrdSysCondVar condVar; //!< write list condVar
std::list<Block*> queue; //!< container
long long writes_between_purges; //!< upper bound on amount of bytes written between two purge passes
int size; //!< current size of write queue
};
WriteQ m_writeQ;
// active map, purge delay set
typedef std::map<std::string, File*> ActiveMap_t;
typedef ActiveMap_t::iterator ActiveMap_i;
typedef std::multimap<std::string, XrdPfc::Stats> StatsMMap_t;
typedef StatsMMap_t::iterator StatsMMap_i;
typedef std::set<std::string> FNameSet_t;
ActiveMap_t m_active; //!< Map of currently active / open files.
StatsMMap_t m_closed_files_stats; //!< Stats of closed files, keyed by file name (multiple entries allowed).
FNameSet_t m_purge_delay_set; //!< Set of file names; NOTE(review): presumably files whose purge is delayed.
bool m_in_purge; //!< NOTE(review): presumably true while a purge pass is running.
XrdSysCondVar m_active_cond; //!< Cond-var protecting active file data structures.
// Reference counting and sync scheduling for File objects.
void inc_ref_cnt(File*, bool lock, bool high_debug);
void dec_ref_cnt(File*, bool high_debug);
void schedule_file_sync(File*, bool ref_cnt_already_set, bool high_debug);
// prefetching
typedef std::vector<File*> PrefetchList;
PrefetchList m_prefetchList;
//---------------------------------------------------------------------------
// Statistics, heart-beat, scan-and-purge
enum ScanAndPurgeThreadState_e { SPTS_Idle, SPTS_Scan, SPTS_Purge, SPTS_Done };
XrdSysCondVar m_stats_n_purge_cond; //!< communication between heart-beat and scan-purge threads
DataFsState *m_fs_state; //!< directory state for access / usage info and quotas
int m_last_scan_duration; //!< duration of the last scan (units not visible here)
int m_last_purge_duration; //!< duration of the last purge (units not visible here)
ScanAndPurgeThreadState_e m_spt_state; //!< current state of the scan-and-purge thread
void copy_out_active_stats_and_update_data_fs_state();
};
}
#endif