-
Notifications
You must be signed in to change notification settings - Fork 4
/
depgraph.py
375 lines (309 loc) · 11 KB
/
depgraph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
import os
import itertools
class Reason(object):
    """ Human-readable justification for performing a build step.

    Parameters
    ----------
    explanation : str
        Text describing why the step must run.
    """
    def __init__(self, explanation):
        self._text = explanation
    def __str__(self):
        return self._text
# Shared Reason singletons used to label why a dataset needs (re)building;
# compared by identity (e.g. ``reason is PARENTMISSING``) in buildnext.
PARENTMISSING = Reason("a parent doesn't exist")
PARENTNEWER = Reason("a parent is newer than the child")
MISSING = Reason("the dataset doesn't exist")
class Dataset(object):
    """ Dataset represents a dataset or a step along a dependency chain.

    Parameters
    ----------
    name : str
        imagined to be a filename

    Other keyword arguments are accessible as instance attributes.
    """

    # __eq__ is overridden below, which would otherwise make instances
    # unhashable; identity hashing is retained deliberately so that graph
    # bookkeeping (sets/dicts of nodes) distinguishes distinct node objects.
    __hash__ = object.__hash__

    def __init__(self, name, **kw):
        self.name = name
        self._parents = set()
        self._children = set()
        self._store = kw
        return

    def __str__(self):
        return self.name

    def __eq__(self, other):
        # BUG FIX: comparison with a non-Dataset previously raised
        # AttributeError; returning NotImplemented lets Python fall back to
        # its default comparison machinery.
        if not isinstance(other, Dataset):
            return NotImplemented
        return (self.name == other.name) and (self._store == other._store)

    def __ne__(self, other):
        # BUG FIX: this was misspelled ``__neq__`` and therefore was never
        # invoked by the ``!=`` operator.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __getattr__(self, name):
        # Expose keyword arguments given to __init__ as attributes.
        if name in object.__getattribute__(self, "_store"):
            return object.__getattribute__(self, "_store")[name]
        else:
            raise AttributeError("'{0}'".format(name))

    @property
    def min_age(self):
        # For a single file, min and max age coincide (cf. DatasetGroup).
        return os.stat(self.name).st_mtime

    @property
    def max_age(self):
        return os.stat(self.name).st_mtime

    @property
    def exists(self):
        return os.path.exists(self.name)

    def is_older_than(self, other):
        return self.max_age < other.min_age

    def dependson(self, *datasets):
        """ Declare that Dataset depends on one or more other Dataset instances.
        Does not affect previous declarations.
        """
        self._parents = self._parents.union(datasets)
        # Keep the reverse (parent -> child) links in sync.
        for parent in datasets:
            if self not in parent._children:
                parent._children = parent._children.union((self,))

    def parents(self, depth=-1):
        """ Return the Dataset instances that this Dataset depends on.

        Parameters
        ----------
        depth : int
            Recursion depth. 0 means no recursion, while -1 means infinite
            recursion.
        """
        yielded = set()
        for dataset in self._parents:
            if dataset not in yielded:
                yielded.add(dataset)
                yield dataset
            if depth != 0:
                for grandparent in dataset.parents(depth=depth-1):
                    if grandparent not in yielded:
                        yielded.add(grandparent)
                        yield grandparent

    def children(self, depth=-1):
        """ Return the Dataset instances that depend on this Dataset.

        Parameters
        ----------
        depth : int
            Recursion depth. 0 means no recursion, while -1 means infinite
            recursion.
        """
        yielded = set()
        for dataset in self._children:
            if dataset not in yielded:
                yielded.add(dataset)
                yield dataset
            if depth != 0:
                for grandchild in dataset.children(depth=depth-1):
                    if grandchild not in yielded:
                        yielded.add(grandchild)
                        yield grandchild

    def buildnext(self, ignore=None):
        """ Generator for datasets that require building/rebuilding in order to
        build this (target) Dataset, given the present state of the dependency
        graph.

        These targets are necessary but not always sufficient to build the
        objective Dataset after a single iteration. Intended use is to call
        `buildnext` repeatedly, building the targets after each call, until
        the objective Dataset can be built.

        Parameters
        ----------
        ignore : list, optional
            list of dependencies to ignore building (e.g., because unbuildable
            or otherwise broken for reasons not encoded in the dependency
            graph).

        Yields
        ------
        (Dataset, Reason)

        Raises
        ------
        CircularDependency
            if the graph above this dataset contains a cycle.
        """
        if not is_acyclic(self):
            raise CircularDependency()

        def needsbuild(child):
            # All direct parents must exist before the child can be built;
            # an existing child that is older than any parent is stale.
            if not all(p.exists for p in child.parents(0)):
                return False, PARENTMISSING
            elif child.exists and \
                    any(child.is_older_than(par) for par in child.parents(0)):
                return True, PARENTNEWER
            elif not child.exists:
                return True, MISSING
            else:
                return False, None

        def walkbranch(stem, ancestors, branches):
            """ Breadth-first search through branch for broken branches
            involving ancestors """
            for child in stem.children(0):
                if child not in ancestors:
                    continue
                build, reason = needsbuild(child)
                if build:
                    if reason in (PARENTNEWER, MISSING):
                        yield child, reason
                    else:
                        raise RuntimeError("unexpected reason")
                elif reason is PARENTMISSING:
                    # A parent derived from another stem is missing, so delay
                    # build until that branch is traversed
                    pass
                elif not build:
                    # Child is up to date, so append it as a new 'stem' to
                    # walk down
                    if child not in branches:
                        branches.append(child)

        ancestors = list(self.parents())
        branches = list(self.roots())
        if ignore is None:
            ignore = []
        # Treat ignored datasets as already built so they are never yielded.
        built = list(ignore)
        while branches:
            # walkbranch may append new stems to `branches` as it discovers
            # up-to-date children to descend through.
            for dep, reason in walkbranch(branches[0], ancestors, branches):
                if dep not in built:
                    built.append(dep)
                    yield dep, reason
            branches = branches[1:]
        return

    def roots(self):
        """ Generator for the roots (dependency-less parents) of this Dataset.

        Yields
        ------
        Dataset
        """
        yielded = set()
        for dataset in self._parents:
            if len(dataset._parents) == 0:
                if dataset not in yielded:
                    yield dataset
                    yielded.add(dataset)
            else:
                for gp in dataset.roots():
                    if gp not in yielded:
                        yield gp
                        yielded.add(gp)
class DatasetGroup(Dataset):
    """ DatasetGroup represents multiple Dataset instances that are built
    together. For example, these might be a dataset and associated metadata.
    These should be built together, and dependent files are sensitive to
    updates in any member of a DatasetGroup.
    """
    def __init__(self, name, datasets, **kw):
        self.name = name
        self.datasets = datasets
        self._store = kw

    def __iter__(self):
        return iter(self.datasets)

    @property
    def _parents(self):
        # Union of all member parents: the group depends on everything any
        # member depends on.
        combined = set()
        for member in self.datasets:
            combined |= member._parents
        return combined

    @property
    def _children(self):
        combined = set()
        for member in self.datasets:
            combined |= member._children
        return combined

    @property
    def min_age(self):
        return min(os.stat(member.name).st_mtime for member in self)

    @property
    def max_age(self):
        return max(os.stat(member.name).st_mtime for member in self)

    @property
    def exists(self):
        # A group exists only when every member exists.
        for member in self:
            if not member.exists:
                return False
        return True
class CircularDependency(Exception):
    """ Raised when the dependency graph contains a cycle. """
    def __init__(self, msg="graph not acyclic"):
        # BUG FIX: previously Exception.__init__ was never called, so
        # str(exc) and exc.args were empty.
        super(CircularDependency, self).__init__(msg)
        self.message = msg

def is_acyclic(dataset):
    """ Verifies that the portion of the dependency graph *above* a particular
    dataset is acyclic, i.e. it contains no circular dependencies.

    Parameters
    ----------
    dataset : Dataset

    Returns
    -------
    bool
    """
    def visit(dataset, temp_marks, perm_marks):
        # Depth-first traversal with temporary/permanent marks: revisiting a
        # node that is still temporarily marked means it lies on a cycle.
        if dataset in temp_marks:
            raise CircularDependency()
        if dataset not in perm_marks:
            temp_marks.add(dataset)
            for parent in dataset.parents(0):
                visit(parent, temp_marks, perm_marks)
            perm_marks.add(dataset)
            temp_marks.discard(dataset)
        return True

    # Sets instead of lists: membership tests and removal were O(n) per node.
    try:
        return visit(dataset, set(), set())
    except CircularDependency:
        return False
def buildall(target):
    """ Yield groups of dependencies in the order they should be built to
    satisfy a target dataset.

    Parameters
    ----------
    target : Dataset
        dataset to be built

    Yields
    ------
    lists of (Dataset, Reason) tuples
        datasets that must be built (potentially in parallel) before the next
        group of datasets

    Raises
    ------
    CircularDependency if graph is not acyclic

    Notes
    -----
    Compared to Dataset.buildnext, this obviates the need to traverse the
    entire graph at every step.
    """
    from collections import deque

    if not is_acyclic(target):
        raise CircularDependency()

    def needsbuild(dataset):
        # Stale (exists but older than, or missing, an ancestor) or missing.
        if dataset.exists and \
                any(not par.exists or dataset.is_older_than(par)
                    for par in dataset.parents()):
            return True, PARENTNEWER
        elif not dataset.exists:
            return True, MISSING
        else:
            return False, None

    def mark_children_breadthfirst(roots, parents):
        """ Mark build order for all ancestors, beginning with the roots. """
        marks = {}
        # deque.popleft() is O(1); list.pop(0) was O(n) per dequeue.
        queue = deque((0, root) for root in roots)
        while queue:
            i, dep = queue.popleft()
            for child in filter(lambda d: d in parents, dep.children(0)):
                iold = marks.get(child, -1)
                if i > iold:
                    marks[child] = i
                    queue.append((i+1, child))
        return marks

    # Map of Dataset -> integer, where the integer indicates the build step
    marks = mark_children_breadthfirst(target.roots(), set(target.parents()))
    groups = []
    maxi = 0
    for dep, i in marks.items():
        nb, reason = needsbuild(dep)
        if nb:
            while i >= maxi:
                groups.append([])
                maxi += 1
            groups[i].append((dep, reason))
    for group in groups:
        yield group
    # BUG FIX: the target was previously always reported with reason MISSING,
    # even when it existed and merely had a newer/missing parent.
    nb, reason = needsbuild(target)
    if nb:
        yield [(target, reason)]
def get_ancestor_edges(dataset):
    """ Return the unique (parent, child) edges upstream of *dataset*,
    discovered depth-first and kept in first-encounter order. """
    edges = []
    for parent in dataset.parents(0):
        direct = (parent, dataset)
        if direct not in edges:
            edges.append(direct)
        # List-based dedup is intentional: node equality is value-based
        # while hashing is identity-based, so a set could dedup differently.
        for upstream in get_ancestor_edges(parent):
            if upstream not in edges:
                edges.append(upstream)
    return edges
def get_descendent_edges(dataset):
    """ Return the unique (parent, child) edges downstream of *dataset*,
    discovered depth-first and kept in first-encounter order. """
    edges = []
    for child in dataset.children(0):
        direct = (dataset, child)
        if direct not in edges:
            edges.append(direct)
        # List-based dedup is intentional: node equality is value-based
        # while hashing is identity-based, so a set could dedup differently.
        for downstream in get_descendent_edges(child):
            if downstream not in edges:
                edges.append(downstream)
    return edges