Skip to content
This repository has been archived by the owner on Feb 11, 2021. It is now read-only.

Commit

Permalink
Merge pull request #38 from invenia/sm/1.0-compat
Browse files Browse the repository at this point in the history
Updates to work with julia v1.0
  • Loading branch information
iamed2 committed Sep 28, 2018
2 parents 23f4bb8 + b6bafae commit eed6fed
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 120 deletions.
19 changes: 15 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ os:
- osx
julia:
- 0.6
- 0.7
- 1.0
- nightly
matrix:
allow_failures:
Expand All @@ -23,7 +25,16 @@ notifications:
# - julia -e 'Pkg.clone(pwd()); Pkg.build("Dispatcher"); Pkg.test("Dispatcher"; coverage=true)'
after_success:
# push coverage results to Codecov
- julia -e 'cd(Pkg.dir("Dispatcher")); Pkg.add("Coverage"); using Coverage; Codecov.submit(Codecov.process_folder())'
# generate docs
- julia -e 'Pkg.add("Documenter")'
- julia -e 'cd(Pkg.dir("Dispatcher")); include(joinpath("docs", "make.jl"))'
- |
julia -e '
VERSION >= v"0.7.0-DEV.3656" && using Pkg
VERSION >= v"0.7.0-DEV.5183" || cd(Pkg.dir("Dispatcher"))
Pkg.add("Coverage"); using Coverage
Codecov.submit(process_folder())'
# generate docs
- |
julia -e '
VERSION >= v"0.7.0-DEV.3656" && using Pkg
VERSION >= v"0.7.0-DEV.5183" || cd(Pkg.dir("Dispatcher"))
Pkg.add("Documenter");
include(joinpath("docs", "make.jl"))'
6 changes: 3 additions & 3 deletions REQUIRE
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
julia 0.6
Compat 0.30.0
Compat 0.65.0
IterTools 0.1.0
LightGraphs 0.7.0
DeferredFutures 0.5.0
DeferredFutures 0.6.0
DataStructures 0.5.0
AutoHashEquals 0.1.2
Memento 0.5.0
Memento 0.7.0
ResultTypes 1.3.0
48 changes: 19 additions & 29 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,45 +1,35 @@
environment:
matrix:
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x86/0.6/julia-0.6-latest-win32.exe"
- JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.6/julia-0.6-latest-win64.exe"
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe"
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe"
- julia_version: 0.6
- julia_version: 0.7
- julia_version: 1
- julia_version: nightly

branches:
only:
- master
- /release-.*/
platform:
- x86 # 32-bit
- x64 # 64-bit

matrix:
allow_failures:
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x86/julia-latest-win32.exe"
- JULIA_URL: "https://julialangnightlies-s3.julialang.org/bin/winnt/x64/julia-latest-win64.exe"

notifications:
- provider: Email
on_build_success: false
on_build_failure: false
on_build_status_changed: false
- julia_version: nightly

install:
- ps: "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls12"
- ps: iex ((new-object net.webclient).DownloadString("https://raw.githubusercontent.com/JuliaCI/Appveyor.jl/version-1/bin/install.ps1"))
# If there's a newer build queued for the same PR, cancel this one
- ps: if ($env:APPVEYOR_PULL_REQUEST_NUMBER -and $env:APPVEYOR_BUILD_NUMBER -ne ((Invoke-RestMethod `
https://ci.appveyor.com/api/projects/$env:APPVEYOR_ACCOUNT_NAME/$env:APPVEYOR_PROJECT_SLUG/history?recordsNumber=50).builds | `
Where-Object pullRequestId -eq $env:APPVEYOR_PULL_REQUEST_NUMBER)[0].buildNumber) { `
throw "There are newer queued builds for this pull request, failing early." }
# Download most recent Julia Windows binary
- ps: (new-object net.webclient).DownloadFile(
$env:JULIA_URL,
"C:\projects\julia-binary.exe")
# Run installer silently, output to C:\projects\julia
- C:\projects\julia-binary.exe /S /D=C:\projects\julia

build_script:
# Need to convert from shallow to complete for Pkg.clone to work
- IF EXIST .git\shallow (git fetch --unshallow)
- C:\projects\julia\bin\julia -e "versioninfo();
Pkg.clone(pwd(), \"Dispatcher\"); Pkg.build(\"Dispatcher\")"
- echo "%JL_BUILD_SCRIPT%"
- C:\julia\bin\julia -e "%JL_BUILD_SCRIPT%"

test_script:
- C:\projects\julia\bin\julia -e "Pkg.test(\"Dispatcher\")"
- echo "%JL_TEST_SCRIPT%"
- C:\julia\bin\julia -e "%JL_TEST_SCRIPT%"

# # Uncomment to support code coverage upload. Should only be enabled for packages
# # which would have coverage gaps without running on Windows
# on_success:
# - echo "%JL_CODECOV_SCRIPT%"
# - C:\julia\bin\julia -e "%JL_CODECOV_SCRIPT%"
2 changes: 1 addition & 1 deletion docs/src/pages/manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Note that code in the argument list gets evaluated immediately; only the functio

An `Executor` runs a `DispatchGraph`.
This package currently provides two `Executor`s: `AsyncExecutor` and `ParallelExecutor`.
They work the same way, except `AsyncExecutor` runs nodes using `@schedule` and `ParallelExecutor` uses `@spawn`.
They work the same way, except `AsyncExecutor` runs nodes using `@async` and `ParallelExecutor` uses `@spawn`.

This call:

Expand Down
3 changes: 3 additions & 0 deletions src/Dispatcher.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ using IterTools
using LightGraphs
using Memento
using ResultTypes
using Compat.Distributed

abstract type DispatcherError <: Exception end

const _IdDict = VERSION < v"0.7" ? ObjectIdDict : IdDict{Any, Any}
typed_stack(t) = VERSION < v"0.7" ? Stack(t) : Stack{t}()
const logger = getlogger(@__MODULE__)
const reset! = DeferredFutures.reset! # DataStructures also exports this.

Expand Down
20 changes: 10 additions & 10 deletions src/executors.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Base.Distributed: wrap_on_error, wrap_retry
import Compat.Distributed: wrap_on_error, wrap_retry

"""
An `Executor` handles execution of [`DispatchGraph`](@ref)s.
Expand Down Expand Up @@ -82,10 +82,10 @@ function run!(
exec::Executor,
output_nodes::AbstractArray{T},
input_nodes::AbstractArray{S}=DispatchNode[];
input_map::Associative=Dict{DispatchNode, Any}(),
input_map::AbstractDict=Dict{DispatchNode, Any}(),
throw_error=true
) where {T<:DispatchNode, S<:DispatchNode}
graph = DispatchGraph(output_nodes, collect(chain(input_nodes, keys(input_map))))
graph = DispatchGraph(output_nodes, collect(DispatchNode, Iterators.flatten((input_nodes, keys(input_map)))))

if is_cyclic(graph.graph)
throw(ExecutorError(
Expand All @@ -94,7 +94,7 @@ function run!(
end

# replace nodes in input_map with their values
for (node, val) in chain(zip(input_nodes, imap(fetch, input_nodes)), input_map)
for (node, val) in Iterators.flatten((zip(input_nodes, imap(fetch, input_nodes)), input_map))
node_id = graph.nodes[node]
graph.nodes[node_id] = DataNode(val)
end
Expand Down Expand Up @@ -377,7 +377,7 @@ function run_inner_node!(exec::Executor, node::DispatchNode, id::Int)
err.captured.ex, err.captured.processed_bt, id
)
else
DependencyError(err, catch_stacktrace(), id)
DependencyError(err, stacktrace(catch_backtrace()), id)
end

debug(logger, "Node $id: throwing $dep_err)")
Expand Down Expand Up @@ -414,11 +414,11 @@ end
dispatch!(exec::AsyncExecutor, node::DispatchNode) -> Task
`dispatch!` takes the `AsyncExecutor` and a `DispatchNode` to run.
The [`run!(::DispatchNode)`](@ref) method on the node is called within a `@schedule` block
The [`run!(::DispatchNode)`](@ref) method on the node is called within a `@async` block
and the resulting `Task` is returned.
This is the defining method of `AsyncExecutor`.
"""
dispatch!(exec::AsyncExecutor, node::DispatchNode) = @schedule run!(node)
dispatch!(exec::AsyncExecutor, node::DispatchNode) = @async run!(node)

"""
`ParallelExecutor` is an [`Executor`](@ref) which creates a Julia `Task` for each
Expand Down Expand Up @@ -453,20 +453,20 @@ mutable struct ParallelExecutor <: Executor
# If we are in the middle of fetching data and the process is killed we
# could get an ArgumentError saying that the stream was closed or unusable.
(e) -> begin
isa(e, ArgumentError) && contains(e.msg, "stream is closed or unusable")
isa(e, ArgumentError) && occursin("stream is closed or unusable", e.msg)
end,

# Julia appears to have a race condition where the worker process is removed at the
# same time as `@spawn` is selecting a pid which results in a negative pid.
# This is extremely hard to reproduce, but has happened a few times.
(e) -> begin
isa(e, ArgumentError) && contains(e.msg, "IntSet elements cannot be negative")
isa(e, ArgumentError) && occursin("IntSet elements cannot be negative", e.msg)
end,

# Similar to the "stream is closed or unusable" error, we can get an error
# attempting to write to the unknown socket (of a process that has been killed)
(e) -> begin
isa(e, ErrorException) && contains(e.msg, "attempt to send to unknown socket")
isa(e, ErrorException) && occursin("attempt to send to unknown socket", e.msg)
end
]

Expand Down
32 changes: 16 additions & 16 deletions src/graph.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ function DispatchGraph(
input_nodes::Union{AbstractArray{S}, Base.AbstractSet{S}}=DispatchNode[],
) where {T<:DispatchNode, S<:DispatchNode}
graph = DispatchGraph()
to_visit = Stack(DispatchNode)
to_visit = typed_stack(DispatchNode)

# this is an ObjectIdDict to avoid a hashing stack overflow when there are cycles
visited = ObjectIdDict()
visited = _IdDict()
for node in output_nodes
push!(graph, node)
push!(to_visit, node)
Expand Down Expand Up @@ -114,21 +114,21 @@ Return an iterable of all nodes stored in the `DispatchGraph`.
nodes(graph::DispatchGraph) = nodes(graph.nodes)

"""
in_neighbors(graph::DispatchGraph, node::DispatchNode) ->
inneighbors(graph::DispatchGraph, node::DispatchNode) ->
Return an iterable of all nodes in the graph with edges from themselves to `node`.
"""
function LightGraphs.in_neighbors(graph::DispatchGraph, node::DispatchNode)
imap(n->graph.nodes[n], in_neighbors(graph.graph, graph.nodes[node]))
function LightGraphs.inneighbors(graph::DispatchGraph, node::DispatchNode)
imap(n->graph.nodes[n], inneighbors(graph.graph, graph.nodes[node]))
end

"""
out_neighbors(graph::DispatchGraph, node::DispatchNode) ->
outneighbors(graph::DispatchGraph, node::DispatchNode) ->
Return an iterable of all nodes in the graph with edges from `node` to themselves.
"""
function LightGraphs.out_neighbors(graph::DispatchGraph, node::DispatchNode)
imap(n->graph.nodes[n], out_neighbors(graph.graph, graph.nodes[node]))
function LightGraphs.outneighbors(graph::DispatchGraph, node::DispatchNode)
imap(n->graph.nodes[n], outneighbors(graph.graph, graph.nodes[node]))
end

"""
Expand All @@ -152,7 +152,7 @@ function LightGraphs.induced_subgraph(graph::DispatchGraph, vs)
end

for keep_id in vs
for vc in out_neighbors(graph.graph, keep_id)
for vc in outneighbors(graph.graph, keep_id)
if vc in vs
add_edge!(
new_graph.graph,
Expand Down Expand Up @@ -184,8 +184,8 @@ function Base.:(==)(graph1::DispatchGraph, graph2::DispatchGraph)
end

for node in nodes1
if Set{DispatchNode}(out_neighbors(graph1, node)) !=
Set{DispatchNode}(out_neighbors(graph2, node))
if Set{DispatchNode}(outneighbors(graph1, node)) !=
Set{DispatchNode}(outneighbors(graph2, node))
return false
end
end
Expand Down Expand Up @@ -220,25 +220,25 @@ function subgraph(
endpoints::AbstractArray{Int},
roots::AbstractArray{Int}=Int[],
)
to_visit = Stack(Int)
to_visit = typed_stack(Int)

if isempty(endpoints)
rootset = Set{Int}(roots)
discards = Set{Int}()

for v in roots
for vp in in_neighbors(graph.graph, v)
for vp in inneighbors(graph.graph, v)
push!(to_visit, vp)
end
end

while length(to_visit) > 0
v = pop!(to_visit)

if all((vc in rootset || vc in discards) for vc in out_neighbors(graph.graph, v))
if all((vc in rootset || vc in discards) for vc in outneighbors(graph.graph, v))
push!(discards, v)

for vp in in_neighbors(graph.graph, v)
for vp in inneighbors(graph.graph, v)
push!(to_visit, vp)
end
end
Expand All @@ -259,7 +259,7 @@ function subgraph(
while length(to_visit) > 0
v = pop!(to_visit)

for vp in in_neighbors(graph.graph, v)
for vp in inneighbors(graph.graph, v)
if !(vp in keeps)
push!(to_visit, vp)
end
Expand Down
Loading

0 comments on commit eed6fed

Please sign in to comment.