/
XrdFileCachePurge.cc
214 lines (184 loc) · 7.16 KB
/
XrdFileCachePurge.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#include "XrdFileCache.hh"
#include "XrdFileCacheTrace.hh"
using namespace XrdFileCache;
#include <fcntl.h>
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucTrace.hh"
namespace
{
//------------------------------------------------------------------------------
// Accumulates purge candidates, keyed by access time, until their combined
// size covers the requested number of bytes.
//
// Files are offered one at a time through checkFile(). The map keeps the
// entries with the smallest (oldest) time stamps: once the requested volume
// is covered, entries with the largest time stamps are dropped again as long
// as the remaining ones still cover the request.
//------------------------------------------------------------------------------
class FPurgeState
{
public:
   // A single purge candidate: the path handed to checkFile() and the number
   // of bytes attributed to it.
   struct FS
   {
      std::string path;
      long long   nByte;

      FS(const char* p, long long n) : path(p), nByte(n) {}
   };

   typedef std::multimap<time_t, FS> map_t;
   typedef map_t::iterator           map_i;

   // iNByteReq - number of bytes the collected candidates should add up to.
   FPurgeState(long long iNByteReq) : nByteReq(iNByteReq), nByteAccum(0) {}

   // Candidates ordered by ascending time stamp (oldest first).
   map_t fmap;

   // Offer a file as purge candidate.
   //   iTime  - time stamp used for ordering (smaller == older)
   //   iPath  - path stored with the candidate
   //   iNByte - size attributed to the candidate
   // The file is accepted when the request is not covered yet, or when it is
   // older than the newest candidate currently held.
   void checkFile (time_t iTime, const char* iPath, long long iNByte)
   {
      const bool needMoreBytes   = nByteAccum < nByteReq;
      // Guard against an empty map: rbegin() must not be dereferenced then.
      const bool olderThanNewest = ! fmap.empty() && iTime < fmap.rbegin()->first;

      if ( ! needMoreBytes && ! olderThanNewest)
         return;

      fmap.insert(std::pair<const time_t, FS>(iTime, FS(iPath, iNByte)));
      nByteAccum += iNByte;

      // Drop the newest candidates, but only while the remaining ones still
      // cover the request. This keeps the oldest files in the map and never
      // trims the total below nByteReq.
      while ( ! fmap.empty())
      {
         map_i newest = fmap.end();
         --newest;

         if (nByteAccum - newest->second.nByte < nByteReq)
            break;

         nByteAccum -= newest->second.nByte;
         fmap.erase(newest);
      }
   }

private:
   long long nByteReq;     // bytes requested at construction
   long long nByteAccum;   // sum of nByte over all entries currently in fmap
};
// File-local accessor for the cache's trace object; it exists so that the
// logging macros used by the free functions in this file can find a trace.
XrdOucTrace* GetTrace()
{
   Cache& cache = Cache::GetInstance();
   return cache.GetTrace();
}
//------------------------------------------------------------------------------
// Walk the directory opened on iOssDF and offer every cache info file found
// below it to purgeState.
//
//   iOssDF     - directory handle that has already been opened on 'path'
//   path       - path of that directory; entry names are appended to it
//   purgeState - collector receiving (access time, info-file path, size)
//
// Entries whose name ends with the info-file extension are opened and read;
// every other entry is tried as a sub-directory and descended into when it
// can be opened as one.
//------------------------------------------------------------------------------
void FillFileMapRecurse( XrdOssDF* iOssDF, const std::string& path, FPurgeState& purgeState)
{
   const int buffLen = 256;
   char      buff[buffLen];
   XrdOucEnv env;
   const size_t InfoExtLen = strlen(XrdFileCache::Info::m_infoExtension);  // cached var

   // Not referenced directly below; presumably picked up by the TRACE macro.
   static const char* m_traceID = "Purge";

   Cache&  factory = Cache::GetInstance();
   XrdOss* oss     = factory.GetOss();

   while (iOssDF->Readdir(&buff[0], buffLen) >= 0)
   {
      const size_t fname_len = strlen(&buff[0]);
      if (fname_len == 0)
      {
         // An empty name ends the directory scan.
         break;
      }

      // Skip exactly "." and "..". A prefix comparison would also skip every
      // other entry whose name merely starts with a dot.
      if (strcmp(&buff[0], ".") == 0 || strcmp(&buff[0], "..") == 0)
      {
         continue;
      }

      std::string np     = path + "/" + std::string(buff);
      const char* tident = factory.RefConfiguration().m_username.c_str();

      const bool isInfoFile =
         fname_len > InfoExtLen &&
         strncmp(&buff[fname_len - InfoExtLen], XrdFileCache::Info::m_infoExtension, InfoExtLen) == 0;

      if (isInfoFile)
      {
         // XXXX MT - shouldn't we also check if it is currently opened?
         XrdOssDF* fh     = oss->newFile(tident);
         bool      opened = false;
         {
            Info cinfo(Cache::GetInstance().GetTrace());
            opened = (fh->Open(np.c_str(), O_RDONLY, 0600, env) == XrdOssOK);

            if (opened && cinfo.Read(fh, np))
            {
               time_t accessTime;
               if (cinfo.GetLatestDetachTime(accessTime, fh))
               {
                  TRACE(Dump, "FillFileMapRecurse() checking " << buff << " accessTime  " << accessTime);
                  purgeState.checkFile(accessTime, np.c_str(), cinfo.GetNDownloadedBytes());
               }
               else
               {
                  // cinfo file does not contain any known accesses, use stat.mtime instead.
                  TRACE(Warning, "FillFileMapRecurse() could not get access time for " << np << ", trying stat");

                  struct stat fstat;
                  if (oss->Stat(np.c_str(), &fstat) == XrdOssOK)
                  {
                     accessTime = fstat.st_mtime;
                     TRACE(Dump, "FillFileMapRecurse() have access time for " << np << " via stat: " << accessTime);
                     purgeState.checkFile(accessTime, np.c_str(), cinfo.GetNDownloadedBytes());
                  }
                  else
                  {
                     // This really shouldn't happen ... but if it does remove cinfo and the data file right away.
                     TRACE(Warning, "FillFileMapRecurse() could not get access time for " << np);
                     oss->Unlink(np.c_str());

                     // The data file carries the same name without the info extension.
                     const std::string dataPath = np.substr(0, np.size() - InfoExtLen);
                     oss->Unlink(dataPath.c_str());
                  }
               }
            }
            else
            {
               TRACE(Warning, "FillFileMapRecurse() can't open or read " << np << " " << strerror(errno));
               // XXXX Purge it!
            }
         }

         // Close explicitly, as CacheDirCleanup() does for its directory
         // handle, instead of leaving it to the handle's destructor.
         if (opened)
         {
            fh->Close();
         }
         delete fh;
      }
      else
      {
         // Not an info file: descend if the entry can be opened as a directory.
         XrdOssDF* dh = oss->newDir(tident);
         if (dh->Opendir(np.c_str(), env) == XrdOssOK)
         {
            FillFileMapRecurse(dh, np, purgeState);
            dh->Close();
         }
         delete dh;
      }
   }
}
}
void Cache::CacheDirCleanup()
{
// check state every sleep seconds
const static int sleept = 300;
struct stat fstat;
XrdOucEnv env;
XrdOss* oss = Cache::GetInstance().GetOss();
XrdOssVSInfo sP;
while (1)
{
// get amount of space to erase
long long bytesToRemove = 0;
if (oss->StatVS(&sP, "public", 1) < 0)
{
TRACE(Error, "Cache::CacheDirCleanup() can't get statvs for dir " << m_configuration.m_cache_dir.c_str());
exit(1);
}
else
{
long long ausage = sP.Total - sP.Free;
TRACE(Debug, "Cache::CacheDirCleanup() occupates disk space == " << ausage);
if (ausage > m_configuration.m_diskUsageHWM)
{
bytesToRemove = ausage - m_configuration.m_diskUsageLWM;
TRACE(Debug, "Cache::CacheDirCleanup() need space for " << bytesToRemove << " bytes");
}
}
if (bytesToRemove > 0)
{
// make a sorted map of file patch by access time
XrdOssDF* dh = oss->newDir(m_configuration.m_username.c_str());
if (dh->Opendir(m_configuration.m_cache_dir.c_str(), env) == XrdOssOK)
{
FPurgeState purgeState(bytesToRemove * 5 / 4); // prepare 20% more volume than required
FillFileMapRecurse(dh, m_configuration.m_cache_dir, purgeState);
// loop over map and remove files with highest value of access time
for (FPurgeState::map_i it = purgeState.fmap.begin(); it != purgeState.fmap.end(); ++it)
{
// XXXX MT - shouldn't we re-check if the file is currently opened?
std::string path = it->second.path;
// remove info file
if (oss->Stat(path.c_str(), &fstat) == XrdOssOK)
{
bytesToRemove -= fstat.st_size;
oss->Unlink(path.c_str());
TRACE(Info, "Cache::CacheDirCleanup() removed file:" << path << " size: " << fstat.st_size);
}
// remove data file
path = path.substr(0, path.size() - strlen(XrdFileCache::Info::m_infoExtension));
if (oss->Stat(path.c_str(), &fstat) == XrdOssOK)
{
bytesToRemove -= it->second.nByte;
oss->Unlink(path.c_str());
TRACE(Info, "Cache::CacheDirCleanup() removed file: %s " << path << " size " << it->second.nByte);
}
if (bytesToRemove <= 0)
break;
}
}
dh->Close();
delete dh; dh =0;
}
sleep(sleept);
}
}