Commit 15435bb: Improvements
zgornel committed Feb 25, 2019
2 parents 5d71400 + 4d5aabf
Showing 9 changed files with 251 additions and 93 deletions.
5 changes: 5 additions & 0 deletions NEWS.md
@@ -1,5 +1,10 @@
## DispatcherCache Release Notes

v0.1.1
------
- Added caching support for all DispatchNodes
- Minor refactoring and bugfixes

v0.1.0
------
- Initial release
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "DispatcherCache"
uuid = "38708fd2-87cb-5685-8cf6-f81d134ed590"
authors = ["Corneliu Cofaru <cornel@oxoaresearch.com>"]
version = "0.1.0"
version = "0.1.1"

[deps]
CodecBzip2 = "523fee87-0ab8-5b00-afb7-3ecf72e48cfd"
6 changes: 5 additions & 1 deletion README.md
@@ -15,7 +15,11 @@ git clone "https://zgornel.github.com/DispatcherCache.jl"
```
or, from inside Julia,
```
] add https://zgornel.github.com/DispatcherCache.jl#master
] add DispatcherCache
```
and for the latest `master` branch,
```
] add https://github.com/zgornel/DispatcherCache.jl#master
```


8 changes: 6 additions & 2 deletions docs/src/index.md
@@ -4,12 +4,12 @@ CurrentModule=DispatcherCache

# Introduction

DispatcherCache is a task persistency mechanism for [Dispatcher.jl](https://github.com/invenia/Dispatcher.jl) computational task graphs. It is based on [graphchain](https://github.com/radix-ai/graphchain).
DispatcherCache is a task persistency mechanism for [Dispatcher.jl](https://github.com/invenia/Dispatcher.jl) computational task graphs. It is based on [graphchain](https://github.com/radix-ai/graphchain), which is a caching mechanism for [Dask](https://dask.org) task graphs.

## Motivation
[Dispatcher.jl](https://github.com/invenia/Dispatcher.jl) represents a convenient way of organizing, i.e. scheduling, complex computational workflows for asynchronous or parallel execution. Running the same workflow multiple times is not uncommon, albeit with varying input parameters or data. Depending on the type of tasks being executed, in many cases some of the tasks remain unchanged between distinct runs: the same function is being called on identical input arguments.

`DispatcherCache` provides a way of re-using the output of the nodes in the dispatch graph, as long as their state did not change to some unobserved one. By state it is understood the source code, arguments and the node dependencies associated with the nodes. If the state of some node does change between two consecutive executions of the graph, only it and the nodes impacted downstream (towards the leafs of the graph) are actually executed; the nodes whose state did not change are in effect (not practice) pruned from the graph, with the exception of the ones that are dependencies of nodes that have to be re-executed due to state change.
`DispatcherCache` provides a way of re-using the output of the nodes in the dispatch graph, as long as their state did not change to some unobserved one. By state is understood the source code, arguments and node dependencies associated with each node. If the state of some node does change between two consecutive executions of the graph, only that node and the nodes impacted downstream (towards the leaves of the graph) are actually executed. The nodes whose state did not change are in effect (though not in practice) pruned from the graph, with the exception of the ones that are dependencies of nodes that have to be re-executed due to the state change.
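The pruning logic described above can be sketched in a few lines of plain Julia. This is an illustrative reconstruction, not the package's actual implementation; `node_hash`, `must_run` and the hashchain set are hypothetical names:

```julia
# Sketch of hash-based pruning, assuming a node's state is fully
# captured by its source code, arguments and dependency hashes.
using SHA

# Combine the pieces of node state into a single hash.
node_hash(source::String, args, dep_hashes::Vector{String}) =
    bytes2hex(sha256(string(source, args, sort(dep_hashes))))

# A node must run again only if its state hash was not seen in a
# previous execution (i.e. it is absent from the stored hashchain).
must_run(h::String, hashchain::Set{String}) = !(h in hashchain)
```

Because dependency hashes feed into `node_hash`, a change in any upstream node automatically changes the hashes of all nodes downstream of it, which is what forces their re-execution.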

## Installation

@@ -19,5 +19,9 @@ $ git clone https://github.com/zgornel/DispatcherCache.jl
```
or, inside Julia
```
] add DispatcherCache
```
and for the latest `master` branch,
```
] add https://github.com/zgornel/DispatcherCache.jl#master
```
69 changes: 33 additions & 36 deletions src/core.jl
@@ -1,5 +1,6 @@
"""
add_hash_cache!(graph, endpoints, uncacheable, compression=DEFAULT_COMPRESSION, cachedir=DEFAULT_CACHE_DIR)
add_hash_cache!(graph, endpoints=[], uncacheable=[]
[; compression=DEFAULT_COMPRESSION, cachedir=DEFAULT_CACHE_DIR])
Optimizes a delayed execution graph `graph::DispatchGraph`
by wrapping individual nodes in load-from-disk or execute-and-store
@@ -15,12 +16,12 @@ or alternatively re-run and store the outputs of nodes which have new state.
# Arguments
* `exec::Executor` the `Dispatcher.jl` executor
* `graph::DispatchGraph` input dispatch graph
* `endpoints::Vector{Union{DispatchNode, AbstractString}}` leaf nodes for which
caching will occur; nodes that depend on these will not be cached. The nodes
can be specified either by label of by the node object itself
* uncacheable::Vector{Union{DispatchNode, AbstractString}}` nodes that will
never be cached and will always be executed (these nodes are still hashed and
their hashes influence upstream node hashes as well)
* `endpoints::AbstractVector` leaf nodes for which caching will occur;
nodes that depend on these will not be cached. The nodes can be specified
either by label or by the node object itself
* `uncacheable::AbstractVector` nodes that will never be cached and
will always be executed (these nodes are still hashed and their hashes
influence upstream node hashes as well)
# Keyword arguments
* `compression::String` enables compression of the node outputs.
@@ -34,41 +35,38 @@ Note: This function should be used with care as it modifies the input
the distinct, functionally identical graphs.
"""
function add_hash_cache!(graph::DispatchGraph,
endpoints::Vector{T}=T[],
uncacheable::Vector{T}=T[];
endpoints::AbstractVector=[],
uncacheable::AbstractVector=[];
compression::String=DEFAULT_COMPRESSION,
cachedir::String=DEFAULT_CACHE_DIR
) where T<:Union{<:DispatchNode, <:AbstractString}
cachedir::String=DEFAULT_CACHE_DIR)
# Checks
if isempty(endpoints)
@warn "No enpoints for graph, will not process dispatch graph."
return nothing
end

# Initializations
subgraph = Dispatcher.subgraph(
graph, map(n->get_node(graph, n), endpoints))

work = collect(T, get_keys(graph, T)) # keys to be traversed
solved = Set{T}() # keys of computable tasks
dependencies = get_dependencies(graph, T) # dependencies of all tasks
key2hash = Dict{T, String}() # key=>hash mapping
_endpoints = map(n->get_node(graph, n), endpoints)
_uncacheable = imap(n->get_node(graph, n), uncacheable)
subgraph = Dispatcher.subgraph(graph, _endpoints)
work = collect(nodes(graph)) # nodes to be traversed
solved = Set{DispatchNode}() # computable tasks
node2hash = Dict{DispatchNode, String}() # node => hash mapping
storable = Set{String}() # hashes of nodes with storable output
updates = Dict{DispatchNode, DispatchNode}()

# Load hashchain
hashchain = load_hashchain(cachedir, compression=compression)
# Traverse dispatch graph
while !isempty(work)
key = popfirst!(work)
deps = dependencies[key]
node = popfirst!(work)
deps = dependencies(node)
if isempty(deps) || issubset(deps, solved)
# Node is solvable
push!(solved, key)
node = get_node(graph, key) # The node is always a DispatchNode
_hash_node, _hash_comp = node_hash(node, key2hash)
key2hash[key] = _hash_node
skipcache = key in uncacheable || !(node in subgraph.nodes)
push!(solved, node)
_hash_node, _hash_comp = node_hash(node, node2hash)
node2hash[node] = _hash_node
skipcache = node in _uncacheable || !(node in subgraph.nodes)
# Wrap nodes
if _hash_node in keys(hashchain) && !skipcache &&
!(_hash_node in storable)
@@ -93,7 +91,7 @@ function add_hash_cache!(graph::DispatchGraph,
end
else
# Non-solvable node
push!(work, key)
push!(work, node)
end
end
# Update graph
@@ -117,12 +115,12 @@ outputs of the nodes in the subgraph whose leaf nodes are given by
# Arguments
* `exec::Executor` the `Dispatcher.jl` executor
* `graph::DispatchGraph` input dispatch graph
* `endpoints::Vector{Union{DispatchNode, AbstractString}}` leaf nodes for which
caching will occur; nodes that depend on these will not be cached. The nodes
can be specified either by label of by the node object itself
* uncacheable::Vector{Union{DispatchNode, AbstractString}}` nodes that will
never be cached and will always be executed (these nodes are still hashed and
their hashes influence upstream node hashes as well)
* `endpoints::AbstractVector` leaf nodes for which caching will occur;
nodes that depend on these will not be cached. The nodes can be specified
either by label or by the node object itself
* `uncacheable::AbstractVector` nodes that will never be cached and will
always be executed (these nodes are still hashed and their hashes influence
upstream node hashes as well)
# Keyword arguments
* `compression::String` enables compression of the node outputs.
@@ -187,11 +185,10 @@ julia> readdir(cachedir)
"""
function run!(exec::Executor,
graph::DispatchGraph,
endpoints::Vector{T},
uncacheable::Vector{T}=T[];
endpoints::AbstractVector,
uncacheable::AbstractVector=[];
compression::String=DEFAULT_COMPRESSION,
cachedir::String=DEFAULT_CACHE_DIR
) where T<:Union{<:DispatchNode, <:AbstractString}
cachedir::String=DEFAULT_CACHE_DIR)
# Make a copy of the input graph that will be modified
# and mappings from the original nodes to the copies
tmp_graph = Base.deepcopy(graph)
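The graph traversal in `add_hash_cache!` follows a simple work-queue pattern: pop a node, and either solve it (if all of its dependencies are already solved) or push it back for later. A stripped-down, self-contained version of that loop, with hypothetical names and symbols standing in for dispatch nodes:

```julia
# Work-queue traversal over a DAG given as node => dependencies.
# A node is processed only once all its dependencies are processed;
# otherwise it is deferred. Assumes the graph is acyclic, as
# dispatch graphs are; a cycle would make this loop spin forever.
function solve_order(deps::Dict{Symbol, Vector{Symbol}})
    work = collect(keys(deps))
    solved = Symbol[]
    while !isempty(work)
        node = popfirst!(work)
        if issubset(deps[node], solved)
            push!(solved, node)   # solvable: all dependencies done
        else
            push!(work, node)     # defer until dependencies solved
        end
    end
    return solved
end
```

The resulting order guarantees that when a node is hashed, the hashes of all of its dependencies are already available, which is exactly what the node-hash computation in the real loop relies on.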
8 changes: 7 additions & 1 deletion src/hash.jl
@@ -43,7 +43,13 @@ julia> using DispatcherCache: source_hash
"""
source_hash(node::Op) = begin
f = node.func
code = join(code_lowered(f)[1].code, "\n")
local code
try
code = join(code_lowered(f)[1].code, "\n")
catch
code = get_label(node)
@warn "Cannot hash code for node $(code) (using label)."
end
return __hash(code)
end

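The `try`/`catch` around `code_lowered` matters because not every callable exposes lowered code. A self-contained illustration of the same degrade-to-label pattern (the names here are illustrative, not the package's):

```julia
# Hash key for a callable: prefer its lowered source, fall back
# to a caller-supplied label when the code cannot be inspected.
function source_key(f, label::String)
    try
        lowered = code_lowered(f)
        isempty(lowered) && error("no lowered code available")
        return join(lowered[1].code, "\n")
    catch
        return label   # degrade gracefully to the label
    end
end
```

Falling back to the label keeps the hashing total, at the cost of precision: two nodes with the same label but different uninspectable callables would hash identically.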
41 changes: 9 additions & 32 deletions src/utils.jl
@@ -1,33 +1,3 @@
"""
get_keys(graph, ::Type{T})
Returns an iterator of the nodes of the dispatch graph `graph`.
The returned iterator generates elements of type `T`: if
`T<:AbstractString` the iterator is over the labels of the graph,
if `T<:DispatchNode` the iterator is over the nodes of the graph.
"""
get_keys(graph::DispatchGraph, ::Type{T}) where T<:DispatchNode =
keys(graph.nodes.node_dict)

get_keys(graph::DispatchGraph, ::Type{T}) where T<:AbstractString =
imap(x->x.label, get_keys(graph, DispatchNode))


"""
get_dependencies(graph, ::Type{T})
Returns a dictionary where the keys are, depending on `T`, the node labels
or nodes of `graph::DispatchGraph` and the values are iterators over either
the node labels or nodes corresponding to the dependencies of the key node.
"""
get_dependencies(graph::DispatchGraph, ::Type{T}) where T<:DispatchNode =
Dict(k => (dependencies(k)) for k in get_keys(graph, T))

get_dependencies(graph::DispatchGraph, ::Type{T}) where T<:AbstractString =
Dict(k.label => imap(x->x.label, dependencies(k))
for k in get_keys(graph, DispatchNode))


"""
get_node(graph, label)
@@ -37,14 +7,21 @@ get_node(graph::DispatchGraph, node::T) where T<:DispatchNode = node

get_node(graph::DispatchGraph, label::T) where T<:AbstractString = begin
found = Set{DispatchNode}()
for node in get_keys(graph, DispatchNode)
node.label == label && push!(found, node)
for node in nodes(graph)
if has_label(node)
get_label(node) == label && push!(found, node)
end
end
length(found) > 1 && throw(ErrorException("Labels in dispatch graph are not unique."))
length(found) < 1 && throw(ErrorException("No nodes with label $label found."))
return pop!(found)
end

get_node(graph::DispatchGraph, node::T) where T = begin
throw(ArgumentError("A node identifier can be either a " *
"::DispatchNode or ::AbstractString."))
end


"""
load_hashchain(cachedir [; compression=DEFAULT_COMPRESSION])
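The label lookup in `get_node` enforces that a label matches exactly one node. The same pattern in miniature, over plain strings rather than dispatch nodes (illustrative code, not the package's):

```julia
# Find the index of `label` in `labels`, erroring on duplicates
# or on a missing label, mirroring get_node's error handling.
function find_unique(labels::Vector{String}, label::String)
    found = findall(==(label), labels)
    length(found) > 1 && throw(ErrorException("Labels are not unique."))
    isempty(found) && throw(ErrorException("No label $label found."))
    return found[1]
end
```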
82 changes: 65 additions & 17 deletions src/wrappers.jl
@@ -1,5 +1,7 @@
"""
wrap_to_load!(updates, node, nodehash; cachedir=DEFAULT_CACHE_DIR, compression=DEFAULT_COMPRESSION)
wrap_to_load!(updates, node, nodehash;
cachedir=DEFAULT_CACHE_DIR,
compression=DEFAULT_COMPRESSION)
Generates a new dispatch node that corresponds to `node::DispatchNode`
and which loads a file from the `cachedir` cache directory whose name and extension
@@ -39,21 +41,25 @@ function wrap_to_load!(updates::Dict{DispatchNode, DispatchNode},

# Add wrapped node to updates (no arguments to update :)
newnode = Op(loading_wrapper)
newnode.label = node.label
newnode.label = _labelize(node, nodehash)
push!(updates, node => newnode)
return nothing
end


"""
wrap_to_store!(graph, node, nodehash; compression=DEFAULT_COMPRESSION)
wrap_to_store!(graph, node, nodehash;
cachedir=DEFAULT_CACHE_DIR,
compression=DEFAULT_COMPRESSION,
skipcache=false)
Generates a new dispatch node that corresponds to `node::DispatchNode`
Generates a new `Op` node that corresponds to `node::DispatchNode`
and which stores the output of the execution of `node` in a file whose
name and extension depend on `nodehash` and `compression`. The generated
node is added to `updates` which maps `node` to the generated node.
node is added to `updates` which maps `node` to the generated node. The
node output is stored in `cachedir`. The caching is skipped if `skipcache`
is `true`.
"""
# TODO(Corneliu) handle other types of DispatchNode
function wrap_to_store!(updates::Dict{DispatchNode, DispatchNode},
node::DispatchNode,
nodehash::String;
@@ -78,10 +84,11 @@ function wrap_to_store!(updates::Dict{DispatchNode, DispatchNode},
compressor = get_compressor(compression, "compress")

# Get calculation result
result = node.func(args...; kwargs...)
result = _run_node(node, args...; kwargs...)

# Store result
@debug "[$nodehash][$(node.label)] $operation (compression=$compression)"
label = _labelize(node, nodehash)
@debug "[$nodehash][$(label)] $operation (compression=$compression)"
if !skipcache
if !isfile(filepath)
open(compressor, filepath, "w") do fid
@@ -98,16 +105,57 @@
# the latter should contain at this point only solved nodes (so the
# dependencies of the current node should be good).
newnode = Op(exec_store_wrapper)
newnode.label = node.label
newnode.args = map(node.args) do arg
ifelse(arg isa DispatchNode, get(updates, arg, arg), arg)
end
newnode.kwargs = pairs(
NamedTuple{(node.kwargs.itr...,)}(
((map(node.kwargs.data) do kwarg
ifelse(kwarg isa DispatchNode, get(updates, kwarg, kwarg), kwarg)
end)...,)))
newnode.label = _labelize(node, nodehash)
newnode.args = _arguments(node, updates)
newnode.kwargs = _kwarguments(node, updates)
# Add wrapped node to updates
push!(updates, node=>newnode)
return nothing
end


# Small wrapper that executes a node
_run_node(node::Op, args...; kwargs...) = node.func(args...; kwargs...)

_run_node(node::IndexNode, args...; kwargs...) = getindex(args..., node.index)

_run_node(node::CollectNode, args...; kwargs...) = vcat(args...)

_run_node(node::DataNode, args...; kwargs...) = identity(args...)


# Small wrapper that gets the label for a wrapped node
_labelize(node::Op, args...) = get_label(node)

_labelize(node::CollectNode, args...) = get_label(node)

_labelize(node::IndexNode, nodehash::String) = "IndexNode_$nodehash"

_labelize(node::DataNode, nodehash::String) = "DataNode_$nodehash"


# Small wrapper that generates new arguments for a wrapped node
_arguments(node::Op, updates) = map(node.args) do arg
ifelse(arg isa DispatchNode, get(updates, arg, arg), arg)
end

_arguments(node::IndexNode, updates) =
tuple(get(updates, node.node, node.node))

_arguments(node::CollectNode, updates) =
Tuple(get(updates, n, n) for n in node.nodes)

_arguments(node::DataNode, updates) =
tuple(ifelse(node.data isa DispatchNode,
get(updates, node.data, node.data),
node.data))


# Small wrapper that generates new keyword arguments for a wrapped node
_kwarguments(node::Op, updates) = pairs(
NamedTuple{(node.kwargs.itr...,)}(
((map(node.kwargs.data) do kwarg
ifelse(kwarg isa DispatchNode, get(updates, kwarg, kwarg), kwarg)
end)...,)))

_kwarguments(node::DispatchNode, updates) = pairs(NamedTuple())

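The `_run_node`, `_labelize`, `_arguments` and `_kwarguments` helpers above all rely on the same idiom: one generic operation with a method per concrete `DispatchNode` subtype, selected by multiple dispatch. The idiom in miniature, with stand-in types (the real node types come from Dispatcher.jl):

```julia
# Stand-in node hierarchy; illustrative only.
abstract type Node end
struct OpNode <: Node
    func::Function
end
struct CollectNodeLike <: Node end

# One entry point, one method per node type: supporting a new
# node type means adding a method, not editing a conditional.
run_node(n::OpNode, args...) = n.func(args...)
run_node(n::CollectNodeLike, args...) = vcat(args...)
```

This is why the commit can extend caching to `IndexNode`, `CollectNode` and `DataNode` without touching the main wrapping logic: each helper just gains extra methods.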