-
Notifications
You must be signed in to change notification settings - Fork 5.5k
/
fscache.py
303 lines (251 loc) · 9.69 KB
/
fscache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# -*- coding: utf-8 -*-
'''
Classes for salts filesystem cache for larger installations.
'''
# Import Python libs
import logging
import multiprocessing
import os
import signal
import time
from threading import Thread, Event
# Import salt libs
import salt.config
import salt.log
import salt.payload
import salt.utils
from salt.caches.fsworker import FSWorker
# Import third party libs
try:
    import zmq
    HAS_ZMQ = True
except ImportError:
    HAS_ZMQ = False
log = logging.getLogger(__name__)
class FSTimer(Thread):
    '''
    A basic timer class the fires timer-events every second.
    '''

    def __init__(self, opts, event):
        '''
        Prepare the timer thread; ``event`` is the Event that stops it.
        '''
        Thread.__init__(self)
        self.opts = opts
        self.stopped = event
        # die together with the main process
        self.daemon = True
        self.serial = salt.payload.Serial(opts.get('serial', ''))
        self.timer_sock = os.path.join(self.opts['sock_dir'], 'fsc_timer.ipc')

    def run(self):
        '''
        main loop that fires the event every second
        '''
        zmq_ctx = zmq.Context()
        # the socket for outgoing timer events
        push_sock = zmq_ctx.socket(zmq.PUSH)
        push_sock.setsockopt(zmq.LINGER, 100)
        push_sock.bind('ipc:///' + self.timer_sock)
        log.debug('FSCache-Timer started')
        tick = 0
        # Event.wait(1) doubles as the one-second delay between events
        while not self.stopped.wait(1):
            push_sock.send(self.serial.dumps(tick))
            # counts seconds 0..59, wrapping every minute
            tick = (tick + 1) % 60
class FSCache(multiprocessing.Process):
    '''
    Provides access to the cache-system and manages the subprocesses
    that do the cache updates in the background.

    Access to the cache is available to any module that connects
    to the FSCaches IPC-socket.
    '''

    def __init__(self, opts):
        '''
        starts the timer and inits the cache itself

        :param opts: the salt master configuration dict; must contain
                     'sock_dir' and may contain 'serial'
        '''
        super(FSCache, self).__init__()
        log.debug('FSCache initializing...')
        # the possible settings for the cache
        self.opts = opts
        # all jobs the FSCache should run in intervals
        self.jobs = {}
        # the actual cached data
        self.path_data = {}
        # the timer provides 1-second intervals to the loop in run()
        # to make the cache system most responsive, we do not use a loop-
        # delay which makes it hard to get 1-second intervals without a timer
        self.timer_stop = Event()
        self.timer = FSTimer(self.opts, self.timer_stop)
        self.timer.start()
        self.running = True
        # IPC socket paths: cache requests, worker updates, timer events
        self.cache_sock = os.path.join(self.opts['sock_dir'], 'fsc_cache.ipc')
        self.update_sock = os.path.join(self.opts['sock_dir'], 'fsc_upd.ipc')
        self.upd_t_sock = os.path.join(self.opts['sock_dir'], 'fsc_timer.ipc')
        self.cleanup()

    def signal_handler(self, sig, frame):
        '''
        handle signals and shutdown
        '''
        self.stop()

    def cleanup(self):
        '''
        remove any stale IPC socket files left over from a previous run
        '''
        log.debug('cleaning up')
        for sock_file in (self.cache_sock, self.update_sock, self.upd_t_sock):
            if os.path.exists(sock_file):
                os.remove(sock_file)

    def secure(self):
        '''
        restrict the IPC sockets to the owning user
        '''
        for sock_file in (self.cache_sock, self.update_sock, self.upd_t_sock):
            if os.path.exists(sock_file):
                # 0o600 is valid syntax on python 2.6+ and python 3
                os.chmod(sock_file, 0o600)

    def add_job(self, **kwargs):
        '''
        adds a new job to the FSCache

        Required keyword arguments: name, path, ival, patt.

        :raises AttributeError: if any required variable is missing
        '''
        req_vars = ['name', 'path', 'ival', 'patt']
        # make sure new jobs have all variables set
        for var in req_vars:
            if var not in kwargs:
                raise AttributeError('missing variable {0}'.format(var))
        job_name = kwargs.pop('name')
        self.jobs[job_name] = {}
        self.jobs[job_name].update(kwargs)

    def run_job(self, name):
        '''
        Creates a new subprocess to execute the given job in
        '''
        log.debug('Starting worker \'{0}\''.format(name))
        sub_p = FSWorker(self.opts, name, **self.jobs[name])
        sub_p.start()

    def stop(self):
        '''
        shutdown cache process
        '''
        self.cleanup()
        # avoid getting called twice
        if self.running:
            self.running = False
            self.timer_stop.set()
            self.timer.join()

    def run(self):
        '''
        Main loop of the FSCache, checks schedule, retrieves result-data
        from the workers and answer requests with data from the cache
        '''
        context = zmq.Context()
        # the socket for incoming cache requests
        creq_in = context.socket(zmq.REP)
        creq_in.setsockopt(zmq.LINGER, 100)
        creq_in.bind('ipc:///' + self.cache_sock)
        # the socket for incoming cache-updates from workers
        cupd_in = context.socket(zmq.REP)
        cupd_in.setsockopt(zmq.LINGER, 100)
        cupd_in.bind('ipc:///' + self.update_sock)
        # wait for the timer to bind to its socket
        log.debug('wait 2 secs for the timer')
        time.sleep(2)
        # the socket for the timer-event
        timer_in = context.socket(zmq.PULL)
        timer_in.setsockopt(zmq.LINGER, 100)
        timer_in.connect('ipc:///' + self.upd_t_sock)
        poller = zmq.Poller()
        poller.register(creq_in, zmq.POLLIN)
        poller.register(cupd_in, zmq.POLLIN)
        poller.register(timer_in, zmq.POLLIN)
        # our serializer
        serial = salt.payload.Serial(self.opts.get('serial', ''))
        # register a signal handler
        signal.signal(signal.SIGINT, self.signal_handler)
        # secure the sockets from the world
        self.secure()
        log.info('FSCache started')
        while self.running:
            # we check for new events with the poller
            try:
                socks = dict(poller.poll())
            except KeyboardInterrupt:
                # re-check self.running instead of touching the (unset)
                # poll result below
                self.stop()
                continue
            except zmq.ZMQError:
                self.stop()
                continue
            # check for next cache-request
            if socks.get(creq_in) == zmq.POLLIN:
                msg = serial.loads(creq_in.recv())
                log.debug('Received request: {0}'.format(msg))
                # we only accept requests as lists [req_id, <path>]
                if isinstance(msg, list) and len(msg) == 2:
                    # for now only one item is assumed to be requested
                    msgid, file_n = msg
                    log.debug('Looking for {0}:{1}'.format(msgid, file_n))
                    fdata = self.path_data.get(file_n, None)
                    if fdata is not None:
                        log.debug('Cache HIT')
                    else:
                        log.debug('Cache MISS')
                    # Send reply back to client
                    reply = serial.dumps([msgid, fdata])
                    creq_in.send(reply)
                # wrong format, item not cached; the request id is unknown
                # here, so reply with a null id instead of an undefined name
                else:
                    reply = serial.dumps([None, None])
                    creq_in.send(reply)
            # check for next cache-update from workers
            elif socks.get(cupd_in) == zmq.POLLIN:
                new_c_data = serial.loads(cupd_in.recv())
                # tell the worker to exit
                cupd_in.send(serial.dumps('OK'))
                # check if the returned data is usable
                if not isinstance(new_c_data, dict):
                    log.error('Worker returned unusable result')
                    del new_c_data
                    continue
                # the workers will return differing data:
                # 1. '{'file1': <data1>, 'file2': <data2>,...}' - a cache update
                # 2. '{search-path: None}' - job was not run, pre-checks failed
                # 3. '{}' - no files found, check the pattern if defined?
                # 4. anything else is considered malformed
                if len(new_c_data) == 0:
                    log.debug('Got empty update from worker')
                elif next(iter(new_c_data.values())) is not None:
                    log.debug('Got cache update with {0} item(s)'.format(len(new_c_data)))
                    self.path_data.update(new_c_data)
                else:
                    log.debug('Got malformed result dict from worker')
                log.info('{0} entries in cache'.format(len(self.path_data)))
            # check for next timer-event to start new jobs
            elif socks.get(timer_in) == zmq.POLLIN:
                sec_event = serial.loads(timer_in.recv())
                log.debug('Timer event: #{0}'.format(sec_event))
                # loop through the jobs and start if a jobs ival matches
                for item in self.jobs:
                    if sec_event in self.jobs[item]['ival']:
                        self.run_job(item)
        self.stop()
        creq_in.close()
        cupd_in.close()
        timer_in.close()
        context.term()
        log.debug('Shutting down')
if __name__ == '__main__':

    def run_test():
        '''
        Ad-hoc manual test: start an FSCache with two example jobs.
        '''
        opts = salt.config.master_config('./master')
        cache = FSCache(opts)
        # add two jobs for jobs and cache-files
        cache.add_job(
            name='grains',
            path='/var/cache/salt/master/minions',
            ival=[2, 12, 22],
            patt='^.*$'
        )
        cache.add_job(
            name='mine',
            path='/var/cache/salt/master/jobs/',
            ival=[4, 14, 24, 34, 44, 54],
            patt='^.*$'
        )
        cache.start()

    run_test()