Skip to content

Commit

Permalink
Merge 17d9569 into 5e52d29
Browse files Browse the repository at this point in the history
  • Loading branch information
Scott Sanderson committed Sep 14, 2016
2 parents 5e52d29 + 17d9569 commit e04bf0d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
7 changes: 7 additions & 0 deletions zipline/pipeline/engine.py
Expand Up @@ -344,6 +344,8 @@ def compute_chunk(self, graph, dates, assets, initial_workspace):
loader_group_key = juxt(get_loader, getitem(graph.extra_rows))
loader_groups = groupby(loader_group_key, graph.loadable_terms)

refcounts = graph.initial_refcounts(workspace)

for term in graph.ordered():
# `term` may have been supplied in `initial_workspace`, and in the
# future we may pre-compute loadable terms coming from the same
Expand Down Expand Up @@ -380,6 +382,11 @@ def compute_chunk(self, graph, dates, assets, initial_workspace):
else:
assert workspace[term].shape == (mask.shape[0], 1)

# Decref dependencies of ``term``, and clear any terms whose
# refcounts hit 0.
for garbage_term in graph.decref_dependencies(term, refcounts):
del workspace[garbage_term]

out = {}
graph_extra_rows = graph.extra_rows
for name, term in iteritems(graph.outputs):
Expand Down
48 changes: 48 additions & 0 deletions zipline/pipeline/graph.py
Expand Up @@ -119,6 +119,54 @@ def svg(self):
def _repr_png_(self):
return self.png.data

def initial_refcounts(self, initial_terms):
    """
    Calculate initial refcounts for execution of this graph.

    Each node starts with a refcount equal to its outdegree, and output
    nodes get one extra reference to ensure that they're still in the graph
    at the end of execution.

    Parameters
    ----------
    initial_terms : iterable[Term]
        An iterable of terms that were pre-computed before graph execution.
        The dependencies of each of these terms are decref'ed up front.

    Returns
    -------
    refcounts : dict[Term -> int]
        Mutable mapping from each term to its initial reference count.
    """
    # Copy into a plain dict so the counts can be mutated below.  Depending
    # on the graph library version, ``out_degree()`` may return either a
    # dict or a read-only view of (node, degree) pairs; ``dict(...)``
    # accepts both.
    refcounts = dict(self.out_degree())

    # Outputs must outlive execution, so give each one an extra reference.
    for t in self.outputs.values():
        refcounts[t] += 1

    # Pre-computed terms never run, so release their claim on their
    # dependencies now.  The returned garbage sets are intentionally
    # ignored here; only the updated counts are needed.
    for t in initial_terms:
        self.decref_dependencies(t, refcounts)

    return refcounts

def decref_dependencies(self, term, refcounts):
    """
    Release one reference on every direct dependency of ``term``.

    Parameters
    ----------
    term : zipline.pipeline.Term
        The term whose parents should be decref'ed.
    refcounts : dict[Term -> int]
        Dictionary of refcounts.  Updated in place.

    Returns
    -------
    garbage : set[Term]
        Parents whose refcount reached exactly zero during this call.
    """
    released = set()
    # ``in_edges`` yields (source, target) pairs; the source is the
    # dependency that ``term`` was holding a reference to.
    for dependency, _target in self.in_edges([term]):
        remaining = refcounts[dependency] - 1
        refcounts[dependency] = remaining
        # Nothing else needs this dependency any more, so report it to the
        # caller as collectable.
        if remaining == 0:
            released.add(dependency)
    return released


class ExecutionPlan(TermGraph):
"""
Expand Down

0 comments on commit e04bf0d

Please sign in to comment.