-
Notifications
You must be signed in to change notification settings - Fork 0
/
misc.py
261 lines (213 loc) · 8.7 KB
/
misc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
'''fitz.misc - various useful (but unrelated) one-off functions and classes
'''
import logging
import os
import os.path
import re

import numpy as np
# module-wide logger: the functions and classes in this file log through the
# logger named 'fitz'
log = logging.getLogger('fitz')
def assert_directory_exists(path):
    '''create the directory (and any missing parents) if it does not exist

    designed for *nix, but might be cross-platform. I haven't tested.

    path -- the directory that must exist when this function returns.
            '' and '/' are treated as "nothing to do".

    Raises AssertionError if path (or one of its parents) exists but is not
    a directory.  Any OSError from os.mkdir() that does not simply mean
    "the directory is already there" is propagated unchanged.
    '''
    if path == '' or path == '/':
        return
    if not os.path.exists(path):
        # make sure the parent is there first, then add the final component
        assert_directory_exists(os.path.dirname(path))
        try:
            os.mkdir(path)
        except OSError:
            # The directory may already be present at this point: either the
            # path ends with a separator (so the recursive call above already
            # created it) or somebody else created it between the exists()
            # check and the mkdir().  Both are fine; anything else is a real
            # error.
            if not os.path.isdir(path):
                raise
    elif not os.path.isdir(path):
        raise AssertionError(
            '%r is not a directory, but it should be' % path)
def findall(pattern, data):
    '''find all non-overlapping occurrences of "pattern" in the string "data"

    pattern -- a compiled regular expression object, e.g. the result of
               re.compile()
    data    -- the string to search

    The return value is a list of match objects in the order in which they
    occur in data; the list is empty when nothing matches.
    '''
    # finditer() steps past zero-length matches by itself.  A hand-written
    # "search again from match.end()" loop never advances when the pattern
    # matches the empty string, and so never terminates.
    return list(pattern.finditer(data))
class ShortCircuitCallbacks(Exception):
    '''raised from inside a callback to stop the remaining callbacks

    Callbacks.trigger_callback() catches this exception; when it was
    constructed with at least one argument, the first argument becomes the
    return value of trigger_callback().
    '''
class Callbacks:
    '''Base class for objects that support registering and triggering callbacks

    a bit like the signals on gobjects from glib, but written in python
    '''

    def __init__(self):
        # signal -> list of (callback, curried_args), in registration order
        self._callbacks = {}
        # one entry per suppress_callback() call that has not been resumed
        self._suppressed_callbacks = []

    def register_callback(self, signal, callback, *args):
        '''register a function to be called when signal is triggered

        callback should be a callable object. It will be called with
        the *args given here followed by the *args given by trigger_callback()
        The **kwargs given to trigger_callback() are also passed.

        The return value is a callable object that may be used to unregister
        the callback. There is no other way to unregister a callback.
        Calling it a second time raises ValueError, because the entry is
        no longer in the list.

        A callback may raise a ShortCircuitCallbacks instance to prevent
        further callbacks from getting called. The return value of
        .trigger_callback() will be the first arg passed to
        ShortCircuitCallbacks.
        '''
        callback_list = self._callbacks.setdefault(signal, [])
        entry = (callback, args)
        callback_list.append(entry)
        return lambda: callback_list.remove(entry)

    def trigger_callback(self, signal, *args, **kwargs):
        '''trigger the signal

        Calls every callback registered for signal, in registration order.
        Does nothing (and returns None) while the signal is suppressed.

        Returns the first argument of a ShortCircuitCallbacks raised by one
        of the callbacks, or None otherwise.  Any other exception raised by
        a callback propagates to the caller.
        '''
        if self.callback_suppressed(signal):
            return
        # arguments are handed to the logger separately so that a signal
        # which happens to be a tuple cannot break the %-formatting
        log.debug('callback triggered: %s %r %r', signal, args, kwargs)
        # Work on a snapshot: a callback is allowed to unregister itself (or
        # register new callbacks) while the signal is being delivered, and
        # mutating the list that is being iterated would skip handlers.
        callbacks = tuple(self._callbacks.get(signal, ()))
        if not callbacks:
            log.warning('%r was triggered, but it has no handlers! '
                        'maybe it\'s misspelled?', signal)
        try:
            for callback, curried_args in callbacks:
                callback(*(curried_args + args), **kwargs)
        except ShortCircuitCallbacks as e:
            if len(e.args) > 0:
                return e.args[0]

    def suppress_callback(self, signal):
        '''any calls to trigger_callback() using signal will be ignored

        Each call adds one entry, and resume_callback() removes one entry,
        so a signal suppressed twice must be resumed twice.
        '''
        self._suppressed_callbacks.append(signal)

    def callback_suppressed(self, signal):
        'query whether signal is being suppressed'
        return signal in self._suppressed_callbacks

    def resume_callback(self, signal):
        '''opposite of suppress_callback: trigger_callback() will work again

        Undoes one earlier suppress_callback() call; does nothing if the
        signal is not currently suppressed.
        '''
        if self.callback_suppressed(signal):
            self._suppressed_callbacks.remove(signal)
def cached_property(producer_func):
    '''decorator for caching the results of (mostly) static calculations

    If the calculations are not actually static, you can force a
    recalculation on next attribute access by using del on the attribute
    to clear the cache. See usage below for more.

    Usage:

        from fitz.misc import cached_property

        class k(object):
            @cached_property
            def blerg(self):
                [time-consuming calculation]
                return value

        i = k()
        i.blerg # does the calculation and returns the value
        i.blerg # retrieves value from cache without running the code
        i.blerg # still in the cache, so no calculation
        del i.blerg # clears the cached value
        i.blerg # does the calculation again since the cache was cleared

    (On python 2 the class must be new-style, i.e. derive from object;
    property deleters are not honoured on old-style classes.)

    The returned property carries the docstring of producer_func and has no
    setter, so assigning to the attribute raises AttributeError.

    implementation detail: the attribute "_property_cache" will be added to
    the instance.  It is a dict keyed by the name of the decorated function,
    so two cached properties on one object whose producer functions share a
    __name__ also share a cache slot.
    '''
    name = producer_func.__name__

    def cached_property_getter(self):
        # the per-instance cache is created lazily on first access
        try:
            cache = self._property_cache
        except AttributeError:
            cache = self._property_cache = {}
        try:
            return cache[name]
        except KeyError:
            rval = cache[name] = producer_func(self)
            return rval

    def cached_property_deleter(self):
        try:
            cache = self._property_cache
        except AttributeError:
            cache = self._property_cache = {}
        # deleting a value that was never computed is not an error
        cache.pop(name, None)

    # pass the producer's docstring along so that help() and documentation
    # tools describe the property instead of showing nothing
    return property(cached_property_getter, None, cached_property_deleter,
                    producer_func.__doc__)
class AtomicFileWriter(file):
    '''subclass of file that makes overwriting files an atomic operation

    That is, either the whole file is committed to disk, or else the old
    version of the file is still in place. This is accomplished by writing
    the data to x.tmp, and then renaming x.tmp to x when .close() is called.
    On many filesystems (eg ext3 with the default data=ordered option) this
    ensures that when the file x already exists, it is "overwritten"
    atomically.

    This is very useful for program state and config files.

    NOTE: the first close() always performs the rename, and __del__ calls
    close(), so a writer that is abandoned half-way through still replaces
    the final file with whatever had been written so far.

    This class relies on the python 2 builtin "file" type.
    '''

    def __init__(self, path, mode='w'):
        '''open path+'.tmp' for writing; path itself is replaced on close()

        path -- the final location of the file
        mode -- a file mode containing 'w'
        '''
        # Until the temporary file has actually been opened there is nothing
        # to commit.  Starting in the "already closed" state means that a
        # failure below (bad mode, unwritable directory, ...) does not make
        # the __del__ -> close() path trip over a missing attribute or try
        # to rename a temporary file this object never created.
        self._close_has_been_called = True
        assert 'w' in mode
        self.final_path = path
        self.initial_path = path + '.tmp'
        file.__init__(self, self.initial_path, mode)
        self._close_has_been_called = False

    def __del__(self):
        # make sure the data gets committed even without an explicit close()
        self.close()

    def close(self):
        '''close the temporary file and move it over the final path

        Only the first call renames; later calls just close the (already
        closed) file again.  An OSError from os.rename() propagates.
        '''
        file.close(self)
        if not self._close_has_been_called:
            os.rename(self.initial_path, self.final_path)
            self._close_has_been_called = True
def dependency(target, *prereqs):
    """decorator for creating makefile-like lazily-evaluated rules

    Example:

        from fitz.misc import dependency

        @dependency('index.html', 'blog-entries.xml', 'blog-entries-index.xsl',
                    'blog-entries-template.xsl')
        def _(outfile, xmlfile, xsltfile, *rest):
            spawn_process('xsltproc', '--xinclude', '--output', outfile,
                          xsltfile, xmlfile)

    target is the filename that the decorated function creates or updates;
    the remaining arguments (prereqs) are the files it is built from.  The
    decorated function is called, at decoration time, with
    (target, *prereqs) when the target does not exist or when at least one
    prerequisite has a newer mtime than the target.  Otherwise the target
    is considered up-to-date and the function is not called at all.

    The decorator itself returns None, so the decorated name ends up bound
    to None (hence the throw-away name "_" in the example).
    """
    def check_dependency(creator_func):
        if os.path.exists(target):
            built_at = os.stat(target).st_mtime
            # any() stops at the first prerequisite that is newer
            out_of_date = any(built_at < os.stat(source).st_mtime
                              for source in prereqs)
            if not out_of_date:
                log.debug('target is up-to-date: %s' % target)
                return
        log.info('creating target %s' % target)
        creator_func(target, *prereqs)

    return check_dependency
def grepfile(filename, pattern, outfilename=None):
    """'grep' the file for the given pattern.

    This is a simple function to emulate the shell utility 'grep'.

    filename    -- path of the text file to search
    pattern     -- a regular expression in the 're' module syntax; it is
                   compiled with re.compile() and applied to each line with
                   .search()
    outfilename -- optional path of a file that receives the matching lines

    Returns the list of matching lines (line endings included) when no
    outfilename is given.  When outfilename is given the matching lines are
    written to that file instead and an empty list is returned.
    """
    # compile first, so that an invalid pattern fails before any file is
    # opened (and before the output file would be truncated)
    search = re.compile(pattern).search
    with open(filename) as infile:
        matching = (line for line in infile if search(line))
        if not outfilename:
            return list(matching)
        # the generator is consumed line by line, so a large input is never
        # held in memory as a whole
        with open(outfilename, 'w') as fout:
            fout.writelines(matching)
    return []
def find(arg, dirname, files):
    """callback function for os.path.walk() to emulate Unix's 'find' utility.

    arg     -- substring to look for in each file name
    dirname -- the directory currently being visited
    files   -- the names of the entries in dirname

    Prints the path (dirname joined with the name) of every entry whose name
    contains arg, one per line.  Returns None.
    """
    for filename in files:
        if arg in filename:
            # os.path.join() inserts the separator that plain string
            # concatenation leaves out when dirname has no trailing slash
            print(os.path.join(dirname, filename))
def load_column(fname, col_num, comment_symbols=('#', '@')):
    """Load the data from the column 'col_num' of the data file 'fname' into
    a 1d array and return it.

    fname           -- path of a text file with whitespace-separated columns
    col_num         -- index of the column to read (0 is the first column)
    comment_symbols -- characters that mark a line as a comment when they
                       are its first non-blank character

    Blank lines and comment lines are skipped.  Lines that have no column
    'col_num', or whose entry in that column is not a number, are skipped
    as well.  Returns a numpy array of floats (empty if nothing was read).
    """
    values = []
    with open(fname) as infile:
        for raw_line in infile:
            line = raw_line.strip()
            # the emptiness check must come first: line[0] on a blank line
            # (or on the single '' produced by an empty file) would raise
            if not line or line[0] in comment_symbols:
                continue
            try:
                values.append(float(line.split()[col_num]))
            except (ValueError, IndexError):
                # not a number / not enough columns: deliberately ignored
                continue
    return np.array(values)