Skip to content

Commit

Permalink
Improve reduction and add ProductSection
Browse files Browse the repository at this point in the history
Use Arrays instead of generators in pmapreduce_commutative
Add ProductSection and dropleading
  • Loading branch information
jishnub committed Jun 4, 2020
1 parent c398221 commit 271dd7b
Show file tree
Hide file tree
Showing 7 changed files with 1,048 additions and 686 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
Manifest.toml
*.cov
coverage
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ParallelUtilities"
uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0"
authors = ["Jishnu Bhattacharya <jishnuonline@gmail.com>"]
version = "0.7.2"
version = "0.7.3"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ julia> @btime last($ps_long)
(3000, 3000, 1200)
```

We may also compute the index of a particular set of parameters in the iterator. This is somewhat slower and is evaluated in `O(log(n))` time using a binary search. Whether or not the value exists in the list may however be evaluated in `O(1)` time.
We may evaluate whether or not a value exists in the list and its index in `O(1)` time.

```julia
julia> val = (3,3,4)
Expand All @@ -343,11 +343,11 @@ julia> localindex(ps,val)
julia> val=(10,2,901);

julia> @btime $val in $ps_long
67.824 ns (0 allocations: 0 bytes)
50.183 ns (0 allocations: 0 bytes)
true

julia> @btime localindex($ps_long,$val)
1.036 μs (0 allocations: 0 bytes)
julia> @btime localindex($ps_long, $val)
104.513 ns (0 allocations: 0 bytes)
3010
```

Expand Down
162 changes: 102 additions & 60 deletions src/mapreduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,92 +61,131 @@ abstract type Ordering end
struct Sorted <: Ordering end
struct Unsorted <: Ordering end

abstract type ReductionNode end
struct TopTreeNode <: ReductionNode
rank :: Int
end
struct SubTreeNode <: ReductionNode
rank :: Int
end

function reducedvalue(freduce::Function,rank,
pipe::BranchChannel,ifsorted::Ordering)

reducedvalue(freduce,
rank > 0 ? SubTreeNode(rank) : TopTreeNode(rank),
pipe,ifsorted)
end

function reducedvalue(freduce::Function,node::SubTreeNode,
pipe::BranchChannel{Tmap,Tred},::Unsorted) where {Tmap,Tred}

self = take!(pipe.selfchannels.out) :: Tmap
N = nchildren(pipe)
if rank > 0
self = take!(pipe.selfchannels.out) :: Tmap
if N > 0
reducechildren = freduce(take!(pipe.childrenchannels.out)::Tred for i=1:N)::Tred
res = freduce((reducechildren, self)) :: Tred
elseif N == 0
res = freduce((self,)) :: Tred
end
else
if N > 0
res = freduce(take!(pipe.childrenchannels.out)::Tred for i=1:N)::Tred
elseif N == 0
# N == 0 && rank <= 0
# shouldn't reach this
error("nodes with rank <=0 must have children")
vals = Vector{Tred}(undef,N+1)
@sync begin
@async vals[1] = freduce([self]) :: Tred
@async for i=1:N
vals[i+1] = take!(pipe.childrenchannels.out)::Tred
end
end
return res

freduce(vals)
end
function reducedvalue(freduce::Function,node::TopTreeNode,
pipe::BranchChannel{<:Any,Tred},::Unsorted) where {Tred}

function reducedvalue(freduce::Function,rank,
N = nchildren(pipe)
if N == 0
# shouldn't reach this
error("Nodes on the top tree must have children")
end
vals = Vector{Tred}(undef,N)
for i=1:N
vals[i] = take!(pipe.childrenchannels.out)::Tred
end

freduce(vals)
end

function reducedvalue(freduce::Function,node::SubTreeNode,
pipe::BranchChannel{Tmap,Tred},::Sorted) where {Tmap,Tred}

rank = node.rank
N = nchildren(pipe)
leftchild = N > 0
selfvalpresent = rank > 0
vals = Vector{Tred}(undef,N + selfvalpresent)
vals = Vector{Tred}(undef,N + 1)
@sync begin
@async begin
if selfvalpresent
selfval = take!(pipe.selfchannels.out)::Tmap
selfvalred = freduce((value(selfval),))
pv = pval(rank,selfvalred)
ind = selfvalpresent + leftchild
vals[ind] = pv
end
selfval = take!(pipe.selfchannels.out)::Tmap
selfvalred = freduce((value(selfval),))
pv = pval(rank,selfvalred)
ind = leftchild + 1
vals[ind] = pv
end
@async begin
if selfvalpresent
for i=1:N
pv = take!(pipe.childrenchannels.out) :: Tred
shift = pv.rank > rank ? 1 : -1
ind = shift + leftchild + 1
vals[ind] = pv
end
else
for i=1:N
pv = take!(pipe.childrenchannels.out) :: Tred
vals[i] = pv
end
sort!(vals,by=pv->pv.rank)
end
@async for i=1:N
pv = take!(pipe.childrenchannels.out) :: Tred
shift = pv.rank > rank ? 1 : -1
ind = shift + leftchild + 1
vals[ind] = pv
end
end

Tred(rank,freduce(value(v) for v in vals))
end
function reducedvalue(freduce::Function,node::TopTreeNode,
pipe::BranchChannel{<:Any,Tred},::Sorted) where {Tred}

rank = node.rank
N = nchildren(pipe)
leftchild = N > 0
@assert leftchild "Nodes on the top tree must have children"
vals = Vector{Tred}(undef,N)
for i=1:N
pv = take!(pipe.childrenchannels.out) :: Tred
vals[i] = pv
end
sort!(vals,by=pv->pv.rank)

Tred(rank,freduce(value(v) for v in vals))
end

function indicatereduceprogress!(::Nothing,rank) end
function indicatereduceprogress!(progress::RemoteChannel,rank)
put!(progress,(false,true,rank))
end

function reduceTreeNode(freduce::Function,rank,pipe::BranchChannel{Tmap,Tred},
ifsort::Ordering,progress::Union{Nothing,RemoteChannel}) where {Tmap,Tred}
function reduceTreeNode(freduce::Function,rank,pipe::BranchChannel,
ifsort::Ordering,progress)

reduceTreeNode(freduce,
rank > 0 ? SubTreeNode(rank) : TopTreeNode(rank),
pipe,ifsort,progress)
end

function checkerror(::SubTreeNode,pipe::BranchChannel)
selferr = take!(pipe.selfchannels.err)
childrenerr = any(take!(pipe.childrenchannels.err) for i=1:nchildren(pipe))
selferr || childrenerr
end
function checkerror(::TopTreeNode,pipe::BranchChannel)
any(take!(pipe.childrenchannels.err) for i=1:nchildren(pipe))
end

function reduceTreeNode(freduce::Function,node::ReductionNode,
pipe::BranchChannel{<:Any,Tred},
ifsort::Ordering,progress::Union{Nothing,RemoteChannel}) where {Tred}
# This function that communicates with the parent and children

# Start by checking if there is any error locally in the map,
# and if there's none then check if there are any errors on the children
if rank > 0
anyerr = take!(pipe.selfchannels.err)
else
anyerr = false
end
anyerr = anyerr ||
any(take!(pipe.childrenchannels.err) for i=1:nchildren(pipe))

anyerr = checkerror(node,pipe)
rank = node.rank
# Evaluate the reduction only if there's no error
# In either case push the error flag to the parent
if !anyerr
try
res = reducedvalue(freduce,rank,pipe,ifsort) :: Tred
res = reducedvalue(freduce,node,pipe,ifsort) :: Tred
put!(pipe.parentchannels.out,res)
put!(pipe.parentchannels.err,false)
indicatereduceprogress!(progress,rank)
Expand All @@ -170,7 +209,7 @@ function return_unless_error(r::RemoteChannelContainer)
end
end

@inline function return_unless_error(b::BranchChannel)
function return_unless_error(b::BranchChannel)
return_unless_error(b.parentchannels)
end

Expand All @@ -193,19 +232,22 @@ function pmapreduceworkers(fmap::Function,freduce::Function,iterators::Tuple,

for (ind,mypipe) in enumerate(branches)
p = mypipe.p
rank = ind - extrareducenodes
if rank > 0
ind_reduced = ind - extrareducenodes
rank = ind_reduced
if ind_reduced > 0
iterable_on_proc = ProductSplit(iterators,num_workers_active,rank)

@spawnat p mapTreeNode(fmap,iterable_on_proc,rank,mypipe,
ifelse(showprogress,progresschannel,nothing),
showprogress ? progresschannel : nothing,
args...;kwargs...)

@spawnat p reduceTreeNode(freduce,rank,mypipe,ord,
ifelse(showprogress,progresschannel,nothing))
@spawnat p reduceTreeNode(freduce,SubTreeNode(rank),
mypipe,ord,
showprogress ? progresschannel : nothing)
else
@spawnat p reduceTreeNode(freduce,rank,mypipe,ord,
ifelse(showprogress,progresschannel,nothing))
@spawnat p reduceTreeNode(freduce,TopTreeNode(rank),
mypipe,ord,
showprogress ? progresschannel : nothing)
end
end

Expand Down
Loading

2 comments on commit 271dd7b

@jishnub
Copy link
Owner Author

@jishnub jishnub commented on 271dd7b Jun 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/15853

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.7.3 -m "<description of version>" 271dd7b458cc47b837012b316e6d65320b24d20b
git push origin v0.7.3

Please sign in to comment.