From 271dd7b458cc47b837012b316e6d65320b24d20b Mon Sep 17 00:00:00 2001 From: jishnub Date: Sun, 31 May 2020 22:37:38 +0530 Subject: [PATCH] Improve reduction and add ProductSection Use Arrays instead of generators in pmapreduce_commutative Add ProductSection and dropleading --- .gitignore | 2 + Project.toml | 2 +- README.md | 8 +- src/mapreduce.jl | 162 +++--- src/productsplit.jl | 380 ++++++++++---- src/trees.jl | 3 +- test/tests.jl | 1177 ++++++++++++++++++++++++------------------- 7 files changed, 1048 insertions(+), 686 deletions(-) diff --git a/.gitignore b/.gitignore index ba39cc5..a8ff89a 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ Manifest.toml +*.cov +coverage diff --git a/Project.toml b/Project.toml index 4090b50..c0bc97f 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ParallelUtilities" uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0" authors = ["Jishnu Bhattacharya "] -version = "0.7.2" +version = "0.7.3" [deps] DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" diff --git a/README.md b/README.md index a3f4cf6..058bc37 100644 --- a/README.md +++ b/README.md @@ -328,7 +328,7 @@ julia> @btime last($ps_long) (3000, 3000, 1200) ``` -We may also compute the index of a particular set of parameters in the iterator. This is somewhat slower and is evaluated in `O(log(n))` time using a binary search. Whether or not the value exists in the list may however be evaluated in `O(1)` time. +We may evaluate whether or not a value exists in the list and its index in `O(1)` time. ```julia julia> val = (3,3,4) @@ -343,11 +343,11 @@ julia> localindex(ps,val) julia> val=(10,2,901); julia> @btime $val in $ps_long - 67.824 ns (0 allocations: 0 bytes) + 50.183 ns (0 allocations: 0 bytes) true -julia> @btime localindex($ps_long,$val) - 1.036 μs (0 allocations: 0 bytes) +julia> @btime localindex($ps_long, $val) + 104.513 ns (0 allocations: 0 bytes) 3010 ``` diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 80074de..e80b333 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -61,92 +61,131 @@ abstract type Ordering end struct Sorted <: Ordering end struct Unsorted <: Ordering end +abstract type ReductionNode end +struct TopTreeNode <: ReductionNode + rank :: Int +end +struct SubTreeNode <: ReductionNode + rank :: Int +end + function reducedvalue(freduce::Function,rank, + pipe::BranchChannel,ifsorted::Ordering) + + reducedvalue(freduce, + rank > 0 ? SubTreeNode(rank) : TopTreeNode(rank), + pipe,ifsorted) +end + +function reducedvalue(freduce::Function,node::SubTreeNode, pipe::BranchChannel{Tmap,Tred},::Unsorted) where {Tmap,Tred} + self = take!(pipe.selfchannels.out) :: Tmap N = nchildren(pipe) - if rank > 0 - self = take!(pipe.selfchannels.out) :: Tmap - if N > 0 - reducechildren = freduce(take!(pipe.childrenchannels.out)::Tred for i=1:N)::Tred - res = freduce((reducechildren, self)) :: Tred - elseif N == 0 - res = freduce((self,)) :: Tred - end - else - if N > 0 - res = freduce(take!(pipe.childrenchannels.out)::Tred for i=1:N)::Tred - elseif N == 0 - # N == 0 && rank <= 0 - # shouldn't reach this - error("nodes with rank <=0 must have children") + vals = Vector{Tred}(undef,N+1) + @sync begin + @async vals[1] = freduce([self]) :: Tred + @async for i=1:N + vals[i+1] = take!(pipe.childrenchannels.out)::Tred end end - return res + + freduce(vals) end +function reducedvalue(freduce::Function,node::TopTreeNode, + pipe::BranchChannel{<:Any,Tred},::Unsorted) where {Tred} -function reducedvalue(freduce::Function,rank, + N = nchildren(pipe) + if N == 0 + # shouldn't reach this + error("Nodes on the top tree must have children") + end + vals = Vector{Tred}(undef,N) + for i=1:N + vals[i] = take!(pipe.childrenchannels.out)::Tred + end + + freduce(vals) +end + +function reducedvalue(freduce::Function,node::SubTreeNode, pipe::BranchChannel{Tmap,Tred},::Sorted) where {Tmap,Tred} + rank = node.rank N = nchildren(pipe) leftchild = N > 0 - selfvalpresent = rank > 0 - vals = Vector{Tred}(undef,N + selfvalpresent) + vals = Vector{Tred}(undef,N + 1) @sync begin @async begin - if selfvalpresent - selfval = take!(pipe.selfchannels.out)::Tmap - selfvalred = freduce((value(selfval),)) - pv = pval(rank,selfvalred) - ind = selfvalpresent + leftchild - vals[ind] = pv - end + selfval = take!(pipe.selfchannels.out)::Tmap + selfvalred = freduce((value(selfval),)) + pv = pval(rank,selfvalred) + ind = leftchild + 1 + vals[ind] = pv end - @async begin - if selfvalpresent - for i=1:N - pv = take!(pipe.childrenchannels.out) :: Tred - shift = pv.rank > rank ? 1 : -1 - ind = shift + leftchild + 1 - vals[ind] = pv - end - else - for i=1:N - pv = take!(pipe.childrenchannels.out) :: Tred - vals[i] = pv - end - sort!(vals,by=pv->pv.rank) - end + @async for i=1:N + pv = take!(pipe.childrenchannels.out) :: Tred + shift = pv.rank > rank ? 1 : -1 + ind = shift + leftchild + 1 + vals[ind] = pv end end Tred(rank,freduce(value(v) for v in vals)) end +function reducedvalue(freduce::Function,node::TopTreeNode, + pipe::BranchChannel{<:Any,Tred},::Sorted) where {Tred} + + rank = node.rank + N = nchildren(pipe) + leftchild = N > 0 + @assert leftchild "Nodes on the top tree must have children" + vals = Vector{Tred}(undef,N) + for i=1:N + pv = take!(pipe.childrenchannels.out) :: Tred + vals[i] = pv + end + sort!(vals,by=pv->pv.rank) + + Tred(rank,freduce(value(v) for v in vals)) +end function indicatereduceprogress!(::Nothing,rank) end function indicatereduceprogress!(progress::RemoteChannel,rank) put!(progress,(false,true,rank)) end -function reduceTreeNode(freduce::Function,rank,pipe::BranchChannel{Tmap,Tred}, - ifsort::Ordering,progress::Union{Nothing,RemoteChannel}) where {Tmap,Tred} +function reduceTreeNode(freduce::Function,rank,pipe::BranchChannel, + ifsort::Ordering,progress) + + reduceTreeNode(freduce, + rank > 0 ? SubTreeNode(rank) : TopTreeNode(rank), + pipe,ifsort,progress) +end + +function checkerror(::SubTreeNode,pipe::BranchChannel) + selferr = take!(pipe.selfchannels.err) + childrenerr = any(take!(pipe.childrenchannels.err) for i=1:nchildren(pipe)) + selferr || childrenerr +end +function checkerror(::TopTreeNode,pipe::BranchChannel) + any(take!(pipe.childrenchannels.err) for i=1:nchildren(pipe)) +end + +function reduceTreeNode(freduce::Function,node::ReductionNode, + pipe::BranchChannel{<:Any,Tred}, + ifsort::Ordering,progress::Union{Nothing,RemoteChannel}) where {Tred} # This function that communicates with the parent and children # Start by checking if there is any error locally in the map, # and if there's none then check if there are any errors on the children - if rank > 0 - anyerr = take!(pipe.selfchannels.err) - else - anyerr = false - end - anyerr = anyerr || - any(take!(pipe.childrenchannels.err) for i=1:nchildren(pipe)) - + anyerr = checkerror(node,pipe) + rank = node.rank # Evaluate the reduction only if there's no error # In either case push the error flag to the parent if !anyerr try - res = reducedvalue(freduce,rank,pipe,ifsort) :: Tred + res = reducedvalue(freduce,node,pipe,ifsort) :: Tred put!(pipe.parentchannels.out,res) put!(pipe.parentchannels.err,false) indicatereduceprogress!(progress,rank) @@ -170,7 +209,7 @@ function return_unless_error(r::RemoteChannelContainer) end end -@inline function return_unless_error(b::BranchChannel) +function return_unless_error(b::BranchChannel) return_unless_error(b.parentchannels) end @@ -193,19 +232,22 @@ function pmapreduceworkers(fmap::Function,freduce::Function,iterators::Tuple, for (ind,mypipe) in enumerate(branches) p = mypipe.p - rank = ind - extrareducenodes - if rank > 0 + ind_reduced = ind - extrareducenodes + rank = ind_reduced + if ind_reduced > 0 iterable_on_proc = ProductSplit(iterators,num_workers_active,rank) @spawnat p mapTreeNode(fmap,iterable_on_proc,rank,mypipe, - ifelse(showprogress,progresschannel,nothing), + showprogress ? progresschannel : nothing, args...;kwargs...) - @spawnat p reduceTreeNode(freduce,rank,mypipe,ord, - ifelse(showprogress,progresschannel,nothing)) + @spawnat p reduceTreeNode(freduce,SubTreeNode(rank), + mypipe,ord, + showprogress ? progresschannel : nothing) else - @spawnat p reduceTreeNode(freduce,rank,mypipe,ord, - ifelse(showprogress,progresschannel,nothing)) + @spawnat p reduceTreeNode(freduce,TopTreeNode(rank), + mypipe,ord, + showprogress ? progresschannel : nothing) end end diff --git a/src/productsplit.jl b/src/productsplit.jl index faba8e8..182f83f 100644 --- a/src/productsplit.jl +++ b/src/productsplit.jl @@ -1,3 +1,6 @@ +abstract type AbstractConstrainedProduct{T,N} end +Base.eltype(::AbstractConstrainedProduct{T}) where {T} = T + """ ProductSplit{T,N,Q} @@ -6,8 +9,10 @@ reverse-lexicographic order. The ranges need to be strictly increasing. Given `N` ranges, each element returned by the iterator will be a tuple of length `N` with one element from each range. + +See also: [`ProductSection`](@ref) """ -struct ProductSplit{T,N,Q} +struct ProductSplit{T,N,Q} <: AbstractConstrainedProduct{T,N} iterators :: Q togglelevels :: NTuple{N,Int} np :: Int @@ -23,12 +28,44 @@ struct ProductSplit{T,N,Q} Q = typeof(iterators) # Ensure that all the iterators are strictly increasing - all(x->step(x)>0,iterators) || throw(DecreasingIteratorError()) + all(x->step(x)>0,iterators) || + throw(ArgumentError("all the iterators need to be strictly increasing")) new{T,N,Q}(iterators,togglelevels,np,p,firstind,lastind) end end -Base.eltype(::ProductSplit{T}) where {T} = T + +""" + ProductSection{T,N,Q} + +Iterator that loops over a specified section of the +outer product of the ranges provided in +reverse-lexicographic order. The ranges need to be strictly +increasing. Given `N` ranges, +each element returned by the iterator will be +a tuple of length `N` with one element from each range. + +See also: [`ProductSplit`](@ref) +""" +struct ProductSection{T,N,Q} <: AbstractConstrainedProduct{T,N} + iterators :: Q + togglelevels :: NTuple{N,Int} + firstind :: Int + lastind :: Int + + function ProductSection(iterators::Tuple{Vararg{AbstractRange,N}},togglelevels::NTuple{N,Int}, + firstind::Int,lastind::Int) where {N} + + T = Tuple{eltype.(iterators)...} + Q = typeof(iterators) + + # Ensure that all the iterators are strictly increasing + all(x->step(x)>0,iterators) || + throw(ArgumentError("all the iterators need to be strictly increasing")) + + new{T,N,Q}(iterators,togglelevels,firstind,lastind) + end +end function mwerepr(ps::ProductSplit) "ProductSplit("*repr(ps.iterators)*","*repr(ps.np)*","*repr(ps.p)*")" @@ -41,8 +78,8 @@ function _cumprod(len::Tuple) (0,_cumprod(first(len),Base.tail(len))...) end -@inline _cumprod(::Int,::Tuple{}) = () -function _cumprod(n::Int,tl::Tuple) +@inline _cumprod(::Integer,::Tuple{}) = () +function _cumprod(n::Integer, tl::Tuple) (n,_cumprod(n*first(tl),Base.tail(tl))...) end @@ -53,10 +90,10 @@ The total number of elements in the outer product of the ranges contained in `iterators`, equal to `prod(length.(iterators))` """ @inline ntasks(iterators::Tuple) = mapreduce(length,*,iterators) -@inline ntasks(ps::ProductSplit) = ntasks(ps.iterators) +@inline ntasks(ps::AbstractConstrainedProduct) = ntasks(ps.iterators) """ - ProductSplit(iterators, np::Int, p::Int) + ProductSplit(iterators::Tuple{Vararg{AbstractRange}}, np::Integer, p::Integer) Construct a `ProductSplit` iterator that represents the outer product of the iterators split over `np` workers, with this instance reprsenting @@ -75,7 +112,7 @@ julia> ProductSplit((1:2,4:5), 2, 2) |> collect (2, 5) ``` """ -function ProductSplit(iterators::Tuple{Vararg{AbstractRange}},np::Int,p::Int) +function ProductSplit(iterators::Tuple{Vararg{AbstractRange}},np::Integer,p::Integer) len = size.(iterators,1) Nel = prod(len) togglelevels = _cumprod(len) @@ -84,52 +121,134 @@ function ProductSplit(iterators::Tuple{Vararg{AbstractRange}},np::Int,p::Int) lastind = d*p + min(r,p) ProductSplit(iterators,togglelevels,np,p,firstind,lastind) end -ProductSplit(::Tuple{},::Int,::Int) = throw(ArgumentError("Need at least one iterator")) +ProductSplit(::Tuple{},::Integer,::Integer) = throw(ArgumentError("Need at least one iterator")) + +""" + ProductSection(iterators::Tuple{Vararg{AbstractRange}}, inds::AbstractUnitRange) + +Construct a `ProductSection` iterator that represents a view of the outer product +of the ranges provided in `iterators`, with the range of indices in the view being +specified by `inds`. + +# Examples +```jldoctest +julia> ProductSection((1:3,4:6), 5:8) |> collect +4-element Array{Tuple{Int64,Int64},1}: + (2, 5) + (3, 5) + (1, 6) + (2, 6) -Base.isempty(ps::ProductSplit) = (ps.firstind > ps.lastind) +julia> collect(ProductSection((1:3,4:6), 5:8)) == collect(Iterators.product(1:3,4:6))[5:8] +true +``` +""" +function ProductSection(iterators::Tuple{Vararg{AbstractRange}}, + inds::AbstractUnitRange) -@inline Base.@propagate_inbounds function Base.first(ps::ProductSplit) + isempty(inds) && throw(ArgumentError("range of indices must not be empty")) + firstind,lastind = extrema(inds) + + len = size.(iterators,1) + Nel = prod(len) + 1 <= firstind || throw( + ArgumentError("the range of indices must start from a number ≥ 1")) + lastind <= Nel || throw( + ArgumentError("the maximum index must be less than or equal to the total number of elements = $Nel")) + togglelevels = _cumprod(len) + ProductSection(iterators,togglelevels,firstind,lastind) +end +function ProductSection(::Tuple{},::AbstractUnitRange) + throw(ArgumentError("Need at least one iterator")) +end + +Base.isempty(ps::AbstractConstrainedProduct) = (ps.firstind > ps.lastind) + +@inline Base.@propagate_inbounds function Base.first(ps::AbstractConstrainedProduct) isempty(ps) ? nothing : _first(ps.iterators,childindex(ps,ps.firstind)...) end -@inline Base.@propagate_inbounds function _first(t::Tuple,ind::Int,rest::Int...) +@inline Base.@propagate_inbounds function _first(t::Tuple,ind::Integer,rest::Integer...) @boundscheck (1 <= ind <= length(first(t))) || throw(BoundsError(first(t),ind)) (@inbounds first(t)[ind],_first(Base.tail(t),rest...)...) end @inline _first(::Tuple{}) = () -@inline Base.@propagate_inbounds function Base.last(ps::ProductSplit) +@inline Base.@propagate_inbounds function Base.last(ps::AbstractConstrainedProduct) isempty(ps) ? nothing : _last(ps.iterators,childindex(ps,ps.lastind)...) end -@inline Base.@propagate_inbounds function _last(t::Tuple,ind::Int,rest::Int...) +@inline Base.@propagate_inbounds function _last(t::Tuple,ind::Integer,rest::Integer...) @boundscheck (1 <= ind <= length(first(t))) || throw(BoundsError(first(t),ind)) (@inbounds first(t)[ind],_last(Base.tail(t),rest...)...) end @inline _last(::Tuple{}) = () -@inline Base.length(ps::ProductSplit) = ps.lastind - ps.firstind + 1 +@inline Base.length(ps::AbstractConstrainedProduct) = ps.lastind - ps.firstind + 1 + +@inline Base.firstindex(ps::AbstractConstrainedProduct) = 1 +@inline Base.lastindex(ps::AbstractConstrainedProduct) = ps.lastind - ps.firstind + 1 + +""" + childindex(ps::AbstractConstrainedProduct, ind) + +Return a tuple containing the indices of the individual iterators +corresponding to the element that is present at index `ind` in the +outer product of the iterators. + +# Examples +```jldoctest +julia> ps = ProductSplit((1:5,2:4,1:3),7,1); + +julia> childindex(ps, 6) +(1, 2, 1) -@inline Base.firstindex(ps::ProductSplit) = 1 -@inline Base.lastindex(ps::ProductSplit) = ps.lastind - ps.firstind + 1 +julia> v = collect(Iterators.product(1:5, 2:4, 1:3)); + +julia> getindex.(ps.iterators, childindex(ps,6)) == v[6] +true +``` -@inline function childindex(ps::ProductSplit,ind::Int) +See also: [`childindexshifted`](@ref) +""" +@inline function childindex(ps::AbstractConstrainedProduct, ind) tl = reverse(Base.tail(ps.togglelevels)) reverse(childindex(tl,ind)) end -@inline function childindex(tl::Tuple,ind::Int) +@inline function childindex(tl::Tuple, ind) t = first(tl) k = div(ind-1,t) (k+1,childindex(Base.tail(tl),ind-k*t)...) end # First iterator gets the final remainder -@inline childindex(::Tuple{},ind::Int) = (ind,) +@inline childindex(::Tuple{}, ind) = (ind,) + +""" + childindexshifted(ps::AbstractConstrainedProduct, ind) + +Return a tuple containing the indices in the individual iterators +given an index of a `AbstractConstrainedProduct`. -@inline childindexshifted(ps::ProductSplit,ind::Int) = childindex(ps, (ind - 1) + ps.firstind) +# Examples +```jldoctest +julia> ps = ProductSplit((1:5,2:4,1:3), 7, 3); + +julia> childindexshifted(ps,3) +(2, 1, 2) + +julia> getindex.(ps.iterators,childindexshifted(ps,3)) == ps[3] +true +``` -@inline Base.@propagate_inbounds function Base.getindex(ps::ProductSplit,ind::Int) +See also: [`childindex`](@ref) +""" +@inline function childindexshifted(ps::AbstractConstrainedProduct, ind) + childindex(ps, (ind - 1) + ps.firstind) +end + +@inline Base.@propagate_inbounds function Base.getindex(ps::AbstractConstrainedProduct, ind) @boundscheck 1 <= ind <= length(ps) || throw(BoundsError(ps,ind)) _getindex(ps,childindexshifted(ps, ind)...) end @@ -137,19 +256,19 @@ end # it's not clear if the single index is for the ProductSplit or the child iterator # This method asserts that the number of indices is correct -@inline Base.@propagate_inbounds function _getindex(ps::ProductSplit{<:Any,N}, - inds::Vararg{Int,N}) where {N} +@inline Base.@propagate_inbounds function _getindex(ps::AbstractConstrainedProduct{<:Any,N}, + inds::Vararg{Integer,N}) where {N} _getindex(ps.iterators,inds...) end -@inline function _getindex(t::Tuple,ind::Int,rest::Int...) +@inline function _getindex(t::Tuple,ind::Integer,rest::Integer...) @boundscheck (1 <= ind <= length(first(t))) || throw(BoundsError(first(t),ind)) (@inbounds first(t)[ind],_getindex(Base.tail(t),rest...)...) end -@inline _getindex(::Tuple{},rest::Int...) = () +@inline _getindex(::Tuple{},::Integer...) = () -function Base.iterate(ps::ProductSplit{T},state=(first(ps),1)) where {T} +function Base.iterate(ps::AbstractConstrainedProduct{T},state=(first(ps),1)) where {T} el,n = state if n > length(ps) @@ -164,14 +283,14 @@ function Base.iterate(ps::ProductSplit{T},state=(first(ps),1)) where {T} (el::T,next_state) end -@inline Base.@propagate_inbounds function _firstlastalongdim(ps::ProductSplit{<:Any,N},dim::Int, +@inline Base.@propagate_inbounds function _firstlastalongdim(ps::AbstractConstrainedProduct{<:Any,N},dim, firstindchild::Tuple=childindex(ps,ps.firstind), lastindchild::Tuple=childindex(ps,ps.lastind)) where {N} _firstlastalongdim(ps.iterators,dim,firstindchild,lastindchild) end -@inline Base.@propagate_inbounds function _firstlastalongdim(iterators::Tuple{Vararg{Any,N}},dim::Int, +@inline Base.@propagate_inbounds function _firstlastalongdim(iterators::Tuple{Vararg{Any,N}},dim, firstindchild::Tuple,lastindchild::Tuple) where {N} @boundscheck (1 <= dim <= N) || throw(BoundsError(iterators,dim)) @@ -187,14 +306,14 @@ end (first_iter,last_iter) end -function _checkrollover(ps::ProductSplit{<:Any,N},dim::Int, +function _checkrollover(ps::AbstractConstrainedProduct{<:Any,N},dim, firstindchild::Tuple=childindex(ps,ps.firstind), lastindchild::Tuple=childindex(ps,ps.lastind)) where {N} _checkrollover(ps.iterators,dim,firstindchild,lastindchild) end -function _checkrollover(t::Tuple{Vararg{Any,N}},dim::Int, +function _checkrollover(t::Tuple{Vararg{Any,N}},dim, firstindchild::Tuple,lastindchild::Tuple) where {N} if dim > 0 @@ -214,16 +333,8 @@ function _checknorollover(t,firstindchild,lastindchild) end _checknorollover(::Tuple{},::Tuple{},::Tuple{}) = true -@inline function Base.maximum(ps::ProductSplit{<:Any,1}) - isempty(ps) && return nothing - lastindchild = childindex(ps,ps.lastind) - @inbounds lic_dim = lastindchild[1] - @inbounds iter = ps.iterators[1] - iter[lic_dim] -end - """ - maximum(ps::ProductSplit; dim::Int) + maximum(ps::ProductSplit; dim::Integer) Compute the maximum value of the range number `dim` that is contained in `ps`. @@ -244,7 +355,7 @@ julia> maximum(ps,dim=2) 4 ``` """ -@inline function Base.maximum(ps::ProductSplit{<:Any,N};dim::Int) where {N} +function Base.maximum(ps::AbstractConstrainedProduct{<:Any,N};dim::Integer) where {N} @boundscheck (1 <= dim <= N) || throw(BoundsError(ps.iterators,dim)) @@ -270,16 +381,16 @@ julia> maximum(ps,dim=2) return v end -@inline function Base.minimum(ps::ProductSplit{<:Any,1}) +function Base.maximum(ps::AbstractConstrainedProduct{<:Any,1}) isempty(ps) && return nothing - firstindchild = childindex(ps,ps.firstind) - @inbounds fic_dim = firstindchild[1] + lastindchild = childindex(ps,ps.lastind) + @inbounds lic_dim = lastindchild[1] @inbounds iter = ps.iterators[1] - iter[fic_dim] + iter[lic_dim] end """ - minimum(ps::ProductSplit; dim::Int) + minimum(ps::ProductSplit; dim::Integer) Compute the minimum value of the range number `dim` that is contained in `ps`. @@ -300,7 +411,7 @@ julia> minimum(ps,dim=2) 4 ``` """ -@inline function Base.minimum(ps::ProductSplit{<:Any,N};dim::Int) where {N} +function Base.minimum(ps::AbstractConstrainedProduct{<:Any,N};dim::Integer) where {N} @boundscheck (1 <= dim <= N) || throw(BoundsError(ps.iterators,dim)) @@ -326,19 +437,16 @@ julia> minimum(ps,dim=2) return v end -@inline function Base.extrema(ps::ProductSplit{<:Any,1}) +function Base.minimum(ps::AbstractConstrainedProduct{<:Any,1}) isempty(ps) && return nothing firstindchild = childindex(ps,ps.firstind) - lastindchild = childindex(ps,ps.lastind) @inbounds fic_dim = firstindchild[1] - @inbounds lic_dim = lastindchild[1] @inbounds iter = ps.iterators[1] - - (iter[fic_dim],iter[lic_dim]) + iter[fic_dim] end """ - extrema(ps::ProductSplit; dim::Int) + extrema(ps::ProductSplit; dim::Integer) Compute the minimum and maximum of the range number `dim` that is contained in `ps`. @@ -359,7 +467,7 @@ julia> extrema(ps,dim=2) (4, 4) ``` """ -@inline function Base.extrema(ps::ProductSplit{<:Any,N};dim::Int) where {N} +function Base.extrema(ps::AbstractConstrainedProduct{<:Any,N};dim::Integer) where {N} @boundscheck (1 <= dim <= N) || throw(BoundsError(ps.iterators,dim)) @@ -384,8 +492,19 @@ julia> extrema(ps,dim=2) return v end +function Base.extrema(ps::AbstractConstrainedProduct{<:Any,1}) + isempty(ps) && return nothing + firstindchild = childindex(ps,ps.firstind) + lastindchild = childindex(ps,ps.lastind) + @inbounds fic_dim = firstindchild[1] + @inbounds lic_dim = lastindchild[1] + @inbounds iter = ps.iterators[1] + + (iter[fic_dim],iter[lic_dim]) +end + """ - extremadims(ps::ProductSplit) + extremadims(ps::AbstractConstrainedProduct) Compute the extrema of all the ranges contained in `ps`. @@ -402,15 +521,15 @@ julia> extremadims(ps) ((1, 2), (4, 4)) ``` """ -extremadims(ps::ProductSplit) = _extremadims(ps,1,ps.iterators) +extremadims(ps::AbstractConstrainedProduct) = _extremadims(ps,1,ps.iterators) -function _extremadims(ps::ProductSplit,dim::Int,iterators::Tuple) +function _extremadims(ps::AbstractConstrainedProduct,dim::Integer,iterators::Tuple) (extrema(ps;dim=dim),_extremadims(ps,dim+1,Base.tail(iterators))...) end -_extremadims(::ProductSplit,::Int,::Tuple{}) = () +_extremadims(::AbstractConstrainedProduct,::Integer,::Tuple{}) = () """ - extrema_commonlastdim(ps::ProductSplit) + extrema_commonlastdim(ps::AbstractConstrainedProduct) Return the reverse-lexicographic extrema of values taken from ranges contained in `ps`, where the pairs of ranges are constructed @@ -437,7 +556,7 @@ julia> extrema_commonlastdim(ps) ([(1, 2), (6, 2)], [(3, 3), (5, 3)]) ``` """ -function extrema_commonlastdim(ps::ProductSplit{<:Any,N}) where {N} +function extrema_commonlastdim(ps::AbstractConstrainedProduct{<:Any,N}) where {N} isempty(ps) && return nothing @@ -468,31 +587,72 @@ function extrema_commonlastdim(ps::ProductSplit{<:Any,N}) where {N} [(m,lastvar_min) for m in min_vals],[(m,lastvar_max) for m in max_vals] end -_infullrange(val::T,ps::ProductSplit{T}) where {T} = _infullrange(val,ps.iterators) +_infullrange(val::T,ps::AbstractConstrainedProduct{T}) where {T} = _infullrange(val,ps.iterators) function _infullrange(val,t::Tuple) first(val) in first(t) && _infullrange(Base.tail(val),Base.tail(t)) end @inline _infullrange(::Tuple{},::Tuple{}) = true -# This struct is just a wrapper to flip the tuples before comparing -struct ReverseLexicographicTuple{T} - t :: T +function c2l_rec(iprev, nprev, ax, inds) + i = searchsortedfirst(ax[1],inds[1]) + inew = iprev + (i-1)*nprev + n = nprev*length(ax[1]) + c2l_rec(inew, n, Base.tail(ax), Base.tail(inds)) end -Base.isless(a::ReverseLexicographicTuple{T},b::ReverseLexicographicTuple{T}) where {T} = reverse(a.t) < reverse(b.t) -Base.isequal(a::ReverseLexicographicTuple{T},b::ReverseLexicographicTuple{T}) where {T} = a.t == b.t +c2l_rec(i, n, ::Tuple{}, ::Tuple{}) = i + +_cartesiantolinear(ax, inds) = c2l_rec(1,1,ax,inds) + +""" + indexinproduct(iterators::Tuple{Vararg{AbstractRange,N}}, val::Tuple{Any,N}) where {N} + +Return the index of `val` in the outer product of `iterators`, +where `iterators` is a `Tuple` of increasing `AbstractRange`s. +Return nothing if `val` is not present. + +# Examples +```jldoctest +julia> iterators = (1:4, 1:3, 3:5); + +julia> val = (2, 2, 4); + +julia> ind = indexinproduct(iterators,val) +18 + +julia> collect(Iterators.product(iterators...))[ind] == val +true +``` +""" +function indexinproduct(iterators::Tuple{Vararg{AbstractRange,N}}, + val::Tuple{Vararg{Any,N}}) where {N} + + all(in.(val,iterators)) || return nothing -function Base.in(val::T,ps::ProductSplit{T}) where {T} + ax = axes.(iterators,1) + individual_inds = searchsortedfirst.(iterators,val) + + _cartesiantolinear(ax, individual_inds) +end + +indexinproduct(::Tuple{},::Tuple) = throw(ArgumentError("need at least one iterator")) + +function Base.in(val::T, ps::AbstractConstrainedProduct{T}) where {T} _infullrange(val,ps) || return false - val_lt = ReverseLexicographicTuple(val) - first_iter = ReverseLexicographicTuple(ps[1]) - last_iter = ReverseLexicographicTuple(ps[end]) + ind = indexinproduct(ps.iterators, val) + ps.firstind <= ind <= ps.lastind +end - first_iter <= val_lt <= last_iter +# This struct is just a wrapper to flip the tuples before comparing +struct ReverseLexicographicTuple{T} + t :: T end +Base.isless(a::ReverseLexicographicTuple{T},b::ReverseLexicographicTuple{T}) where {T} = reverse(a.t) < reverse(b.t) +Base.isequal(a::ReverseLexicographicTuple{T},b::ReverseLexicographicTuple{T}) where {T} = a.t == b.t + """ whichproc(iterators::Tuple, val::Tuple, np::Integer ) @@ -527,7 +687,7 @@ function whichproc(iterators, val, np::Integer) val_t = ReverseLexicographicTuple(val) while left <= right - mid = floor(Int,(left+right)/2) + mid = div(left+right, 2) ps = ProductSplit(iterators,np,mid) # If np is greater than the number of ntasks then it's possible @@ -571,7 +731,7 @@ julia> procrange_recast(iters, ps, 10) 3:4 ``` """ -function procrange_recast(iterators::Tuple, ps::ProductSplit, np_new::Integer) +function procrange_recast(iterators::Tuple, ps::AbstractConstrainedProduct, np_new::Integer) if isempty(ps) return 0:-1 # empty range @@ -593,7 +753,7 @@ function procrange_recast(iterators::Tuple, ps::ProductSplit, np_new::Integer) return procid_start:procid_end end -function procrange_recast(ps::ProductSplit, np_new::Integer) +function procrange_recast(ps::AbstractConstrainedProduct, np_new::Integer) procrange_recast(ps.iterators,ps,np_new) end @@ -618,37 +778,19 @@ julia> localindex(ps,(3,9)) 2 ``` """ -function localindex(ps::ProductSplit{T},val::T) where {T} - # Can carry out a binary search +function localindex(ps::AbstractConstrainedProduct{T},val::T) where {T} (isempty(ps) || val ∉ ps) && return nothing - left,right = 1,length(ps) - - val == first(ps) && return left - val == last(ps) && return right - - val_t = ReverseLexicographicTuple(val) - - while left <= right - mid = floor(Int,(left+right)/2) - val_mid = @inbounds ps[mid] - - if val_t < ReverseLexicographicTuple(val_mid) - right = mid - 1 - elseif val_t > ReverseLexicographicTuple(val_mid) - left = mid + 1 - else - return mid - end - end + indflat = indexinproduct(ps.iterators, val) + indflat - ps.firstind + 1 end -localindex(::ProductSplit, ::Nothing) = nothing +localindex(::AbstractConstrainedProduct, ::Nothing) = nothing -function localindex(iterators::Tuple, val::Tuple, np::Integer,procid::Integer) - ps = ProductSplit(iterators,np,procid) - localindex(ps,val) +function localindex(iterators::Tuple, val::Tuple, np::Integer, p::Integer) + ps = ProductSplit(iterators, np, p) + localindex(ps, val) end """ @@ -679,4 +821,44 @@ function whichproc_localindex(iterators::Tuple, val::Tuple, np::Integer) procid = whichproc(iterators,val,np) index = localindex(iterators,val,np,procid) return procid,index +end + +################################################################# + +""" + dropleading(ps::AbstractConstrainedProduct) + +Return a `ProductSection` leaving out the first iterator contained in `ps`. +The range of values of the remaining iterators in the +resulting `ProductSection` will be the same as in `ps`. + +# Examples +```jldoctest +julia> ps = ProductSplit((1:5,2:4,1:3),7,3); + +julia> collect(ps) +7-element Array{Tuple{Int64,Int64,Int64},1}: + (5, 4, 1) + (1, 2, 2) + (2, 2, 2) + (3, 2, 2) + (4, 2, 2) + (5, 2, 2) + (1, 3, 2) + +julia> dropleading(ps) |> collect +3-element Array{Tuple{Int64,Int64},1}: + (4, 1) + (2, 2) + (3, 2) +``` +""" +function dropleading(ps::AbstractConstrainedProduct) + isempty(ps) && throw(ArgumentError("need at least one iterator")) + iterators = Base.tail(ps.iterators) + first_element = Base.tail(first(ps)) + last_element = Base.tail(last(ps)) + firstind = indexinproduct(iterators, first_element) + lastind = indexinproduct(iterators, last_element) + ProductSection(iterators,firstind:lastind) end \ No newline at end of file diff --git a/src/trees.jl b/src/trees.jl index 4ee703b..81c7bcd 100644 --- a/src/trees.jl +++ b/src/trees.jl @@ -32,8 +32,7 @@ struct SequentialBinaryTree{T<:AbstractVector{<:Integer}} <: BinaryTree (N >=1) || throw(DomainError(N, "need at least one node to create a binary tree")) - h = floor(Int,log2(N)) # Number of levels of the tree (starting from zero) - Ninternalnodes = (1 << h) - 1 + Ninternalnodes = prevpow(2,N) - 1 Nleaf = N - Ninternalnodes Nonechildinternalnodes = (Ninternalnodes > 0) ? rem(Nleaf,2) : 0 twochildendind = div(N-1, 2) diff --git a/test/tests.jl b/test/tests.jl index 68b2459..a43755a 100644 --- a/test/tests.jl +++ b/test/tests.jl @@ -1,7 +1,6 @@ -using DataStructures -using Test - @everywhere begin + using DataStructures + using Test using ParallelUtilities import ParallelUtilities: BinaryTreeNode, RemoteChannelContainer, BranchChannel, Sorted, Unsorted, Ordering, pval, value, reducedvalue, reduceTreeNode, mapTreeNode, @@ -9,7 +8,8 @@ using Test SegmentedOrderedBinaryTree, parentnoderank, nchildren, maybepvalput!, createbranchchannels, nworkersactive, workersactive, - procs_node, leafrankfoldedtree + procs_node, leafrankfoldedtree, + TopTreeNode, SubTreeNode, ProductSection, indexinproduct, dropleading end macro testsetwithinfo(str,ex) @@ -28,356 +28,393 @@ function showworkernumber(ind,nw) print(endchar) end -@testsetwithinfo "ProductSplit" begin - - various_iters = [(1:10,),(1:10,4:6),(1:10,4:6,1:4),(1:2:10,4:1:6), - (1:2,Base.OneTo(4),1:3:10)] +@testsetwithinfo "AbstractConstrainedProduct" begin - function split_across_processors_iterators(arr::Iterators.ProductIterator,num_procs,proc_id) + various_iters = [(1:10,),(1:10,4:6),(1:10,4:6,1:4),(1:2:10,4:1:6), + (1:2,Base.OneTo(4),1:3:10),(1:0.5:3,2:4)] - num_tasks = length(arr); + @testsetwithinfo "ProductSplit" begin - num_tasks_per_process,num_tasks_leftover = divrem(num_tasks,num_procs) + function split_across_processors_iterators(arr::Iterators.ProductIterator,num_procs,proc_id) - num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 ); - task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover,proc_id-1) + 1; + num_tasks = length(arr); - Iterators.take(Iterators.drop(arr,task_start-1),num_tasks_on_proc) - end + num_tasks_per_process,num_tasks_leftover = divrem(num_tasks,num_procs) - function split_product_across_processors_iterators(arrs_tuple,num_procs,proc_id) - split_across_processors_iterators(Iterators.product(arrs_tuple...),num_procs,proc_id) - end + num_tasks_on_proc = num_tasks_per_process + (proc_id <= mod(num_tasks,num_procs) ? 1 : 0 ); + task_start = num_tasks_per_process*(proc_id-1) + min(num_tasks_leftover,proc_id-1) + 1; - @testset "Constructor" begin - - function checkPSconstructor(iters,npmax=10) - ntasks_total = prod(length.(iters)) - for np = 1:npmax, p = 1:np - ps = ProductSplit(iters,np,p) - @test eltype(ps) == Tuple{eltype.(iters)...} - @test collect(ps) == collect(split_product_across_processors_iterators(iters,np,p)) - @test ntasks(ps) == ntasks_total - @test ntasks(ps.iterators) == ntasks_total - @test eltype(ps) == Tuple{map(eltype,iters)...} - end - - @test_throws ParallelUtilities.ProcessorNumberError ProductSplit(iters,npmax,npmax+1) - end - - @testset "0D" begin - @test_throws ArgumentError ProductSplit((),2,1) - end - - @testset "cumprod" begin - @test ParallelUtilities._cumprod(1,()) == () - @test ParallelUtilities._cumprod(1,(2,)) == (1,) - @test ParallelUtilities._cumprod(1,(2,3)) == (1,2) - @test ParallelUtilities._cumprod(1,(2,3,4)) == (1,2,6) - end - - @testset "1D" begin - iters = (1:10,) - checkPSconstructor(iters) - end - @testset "2D" begin - iters = (1:10,4:6) - checkPSconstructor(iters) - end - @testset "3D" begin - iters = (1:10,4:6,1:4) - checkPSconstructor(iters) - end - @testset "steps" begin - iters = (1:2:10,4:1:6) - checkPSconstructor(iters) - iters = (10:-1:10,6:-2:0) - @test_throws ParallelUtilities.DecreasingIteratorError ProductSplit(iters,3,2) - end - @testset "mixed" begin - for iters in [(1:2,4:2:6),(1:2,Base.OneTo(4),1:3:10)] - checkPSconstructor(iters) - end + Iterators.take(Iterators.drop(arr,task_start-1),num_tasks_on_proc) end - @testset "empty" begin - iters = (1:1,) - ps = ProductSplit(iters,10,2) - @test isempty(ps) - @test length(ps) == 0 + function split_product_across_processors_iterators(arrs_tuple,num_procs,proc_id) + split_across_processors_iterators(Iterators.product(arrs_tuple...),num_procs,proc_id) end - @testset "first and last ind" begin - for iters in [(1:10,),(1:2,Base.OneTo(4),1:3:10)] - ps = ProductSplit(iters,2,1) - @test firstindex(ps) == 1 - @test ps.firstind == 1 - @test ps.lastind == div(ntasks(iters),2) - @test lastindex(ps) == div(ntasks(iters),2) - @test lastindex(ps) == length(ps) - ps = ProductSplit(iters,2,2) - @test ps.firstind == div(ntasks(iters),2) + 1 - @test firstindex(ps) == 1 - @test ps.lastind == ntasks(iters) - @test lastindex(ps) == length(ps) - - for np in ntasks(iters)+1:ntasks(iters)+10, - p in ntasks(iters)+1:np - - ps = ProductSplit(iters,np,p) - @test ps.firstind == ntasks(iters) + 1 - @test ps.lastind == ntasks(iters) - end - end - end + @testset "Constructor" begin + + function checkPSconstructor(iters,npmax=10) + ntasks_total = prod(length.(iters)) + for np = 1:npmax, p = 1:np + ps = ProductSplit(iters,np,p) + @test eltype(ps) == Tuple{eltype.(iters)...} + @test collect(ps) == collect(split_product_across_processors_iterators(iters,np,p)) + @test ntasks(ps) == ntasks_total + @test ntasks(ps.iterators) == ntasks_total + @test eltype(ps) == Tuple{map(eltype,iters)...} + end + + @test_throws ParallelUtilities.ProcessorNumberError ProductSplit(iters,npmax,npmax+1) + end + + @testset "0D" begin + @test_throws ArgumentError ProductSplit((),2,1) + end + + @testset "cumprod" begin + @test ParallelUtilities._cumprod(1,()) == () + @test ParallelUtilities._cumprod(1,(2,)) == (1,) + @test ParallelUtilities._cumprod(1,(2,3)) == (1,2) + @test ParallelUtilities._cumprod(1,(2,3,4)) == (1,2,6) + end + + @testset "1D" begin + iters = (1:10,) + checkPSconstructor(iters) + end + @testset "2D" begin + iters = (1:10,4:6) + checkPSconstructor(iters) + end + @testset "3D" begin + iters = (1:10,4:6,1:4) + checkPSconstructor(iters) + end + @testset "steps" begin + iters = (1:2:10,4:1:6) + checkPSconstructor(iters) + iters = (10:-1:10,6:-2:0) + @test_throws ArgumentError ProductSplit(iters,3,2) + end + @testset "mixed" begin + for iters in [(1:2,4:2:6),(1:2,Base.OneTo(4),1:3:10)] + checkPSconstructor(iters) + end + end + + @testset "empty" begin + iters = (1:1,) + ps = ProductSplit(iters,10,2) + @test isempty(ps) + @test length(ps) == 0 + end + + @testset "first and last ind" begin + for iters in [(1:10,),(1:2,Base.OneTo(4),1:3:10)] + ps = ProductSplit(iters,2,1) + @test firstindex(ps) == 1 + @test ps.firstind == 1 + @test ps.lastind == div(ntasks(iters),2) + @test lastindex(ps) == div(ntasks(iters),2) + @test lastindex(ps) == length(ps) + ps = ProductSplit(iters,2,2) + @test ps.firstind == div(ntasks(iters),2) + 1 + @test firstindex(ps) == 1 + @test ps.lastind == ntasks(iters) + @test lastindex(ps) == length(ps) + + for np in ntasks(iters)+1:ntasks(iters)+10, + p in ntasks(iters)+1:np + + ps = ProductSplit(iters,np,p) + @test ps.firstind == ntasks(iters) + 1 + @test ps.lastind == ntasks(iters) + end + end + end - @testset "summary" begin - ps = ProductSplit((1:3, 4:5:19),3,2) - reprstr = "ProductSplit("*repr((1:3, 4:5:19))*",3,2)" - @test ParallelUtilities.mwerepr(ps) == reprstr + @testset "summary" begin + ps = ProductSplit((1:3, 4:5:19),3,2) + reprstr = "ProductSplit("*repr((1:3, 4:5:19))*",3,2)" + @test ParallelUtilities.mwerepr(ps) == reprstr - summarystr = "$(length(ps))-element "*reprstr - @test ParallelUtilities.summary(ps) == summarystr + summarystr = "$(length(ps))-element "*reprstr + @test ParallelUtilities.summary(ps) == summarystr - io = IOBuffer() - summary(io,ps) - @test String(take!(io)) == summarystr + io = IOBuffer() + summary(io,ps) + @test String(take!(io)) == summarystr + end end - end - @testset "firstlast" begin - @testset "first" begin + @testset "firstlast" begin + @testset "first" begin - @test ParallelUtilities._first(()) == () + @test ParallelUtilities._first(()) == () - for iters in various_iters,np=1:5ntasks(iters) + for iters in various_iters,np=1:5ntasks(iters) - ps = ProductSplit(iters,np,1) - @test first(ps) == ( isempty(ps) ? nothing : map(first,iters) ) - end + ps = ProductSplit(iters,np,1) + @test first(ps) == ( isempty(ps) ? nothing : map(first,iters) ) + end - iters = (1:1,) - ps = ProductSplit(iters,2ntasks(iters),ntasks(iters)+1) # must be empty - @test first(ps) === nothing - end - @testset "last" begin + iters = (1:1,) + ps = ProductSplit(iters,2ntasks(iters),ntasks(iters)+1) # must be empty + @test first(ps) === nothing + end + @testset "last" begin - @test ParallelUtilities._last(()) == () + @test ParallelUtilities._last(()) == () - for iters in various_iters,np=1:5ntasks(iters) + for iters in various_iters,np=1:5ntasks(iters) - ps = ProductSplit(iters,np,np) - @test last(ps) == ( isempty(ps) ? nothing : map(last,iters) ) - end + ps = ProductSplit(iters,np,np) + @test last(ps) == ( isempty(ps) ? nothing : map(last,iters) ) + end - iters = (1:1,) - ps = ProductSplit(iters,2length(iters[1]),length(iters[1])+1) # must be empty - @test last(ps) === nothing + iters = (1:1,) + ps = ProductSplit(iters,2length(iters[1]),length(iters[1])+1) # must be empty + @test last(ps) === nothing + end end - end - - @testset "extrema" begin - - @testset "min max extrema" begin - function checkPSextrema(iters,fn::Function,npmax=10) - for np = 1:npmax, p = 1:np - ps = ProductSplit(iters,np,p) - pcol = collect(ps) - for dim in 1:length(iters) - @test begin - res = fn(ps,dim=dim) == fn(x[dim] for x in pcol) - if !res - println(summary(ps)) - end - res - end - end - end - end - - for iters in various_iters, fn in [maximum,minimum,extrema] - checkPSextrema(iters,fn) - end - @test minimum(ProductSplit((1:5,),2,1)) == 1 - @test maximum(ProductSplit((1:5,),2,1)) == 3 - @test extrema(ProductSplit((1:5,),2,1)) == (1,3) - - @test minimum(ProductSplit((1:5,),2,2)) == 4 - @test maximum(ProductSplit((1:5,),2,2)) == 5 - @test extrema(ProductSplit((1:5,),2,2)) == (4,5) - end - - @testset "extremadims" begin - ps = ProductSplit((1:10,),2,1) - @test ParallelUtilities._extremadims(ps,1,()) == () - for iters in various_iters - - dims = length(iters) - for np = 1:5ntasks(iters), proc_id = 1:np - ps = ProductSplit(iters,np,proc_id) - if isempty(ps) - @test extremadims(ps) == Tuple(nothing for i=1:dims) - else - ext = Tuple(map(extrema,zip(collect(ps)...))) - @test extremadims(ps) == ext - end - end - end - end + @testset "extrema" begin + + @testset "min max extrema" begin + function checkPSextrema(iters,fn::Function,npmax=10) + for np = 1:npmax, p = 1:np + ps = ProductSplit(iters,np,p) + pcol = collect(ps) + for dim in 1:length(iters) + @test begin + res = fn(ps,dim=dim) == fn(x[dim] for x in pcol) + if !res + println(summary(ps)) + end + res + end + end + end + end + + for iters in various_iters, fn in [maximum,minimum,extrema] + checkPSextrema(iters,fn) + end + + @test minimum(ProductSplit((1:5,),2,1)) == 1 + @test maximum(ProductSplit((1:5,),2,1)) == 3 + @test extrema(ProductSplit((1:5,),2,1)) == (1,3) + + @test minimum(ProductSplit((1:5,),2,2)) == 4 + @test maximum(ProductSplit((1:5,),2,2)) == 5 + @test extrema(ProductSplit((1:5,),2,2)) == (4,5) + end + + @testset "extremadims" begin + ps = ProductSplit((1:10,),2,1) + @test ParallelUtilities._extremadims(ps,1,()) == () + for iters in various_iters + + dims = length(iters) + for np = 1:5ntasks(iters), proc_id = 1:np + ps = ProductSplit(iters,np,proc_id) + if isempty(ps) + @test extremadims(ps) == Tuple(nothing for i=1:dims) + else + ext = Tuple(map(extrema,zip(collect(ps)...))) + @test extremadims(ps) == ext + end + end + end + end + + @testset "extrema_commonlastdim" begin + iters = (1:10,4:6,1:4) + ps = ProductSplit(iters,37,8) + @test extrema_commonlastdim(ps) == ([(9,1),(6,1)],[(2,2),(4,2)]) + ps = ProductSplit(iters,ntasks(iters)+1,ntasks(iters)+1) + @test extrema_commonlastdim(ps) === nothing + end + end - @testset "extrema_commonlastdim" begin - iters = (1:10,4:6,1:4) - ps = ProductSplit(iters,37,8) - @test extrema_commonlastdim(ps) == ([(9,1),(6,1)],[(2,2),(4,2)]) - ps = ProductSplit(iters,ntasks(iters)+1,ntasks(iters)+1) - @test extrema_commonlastdim(ps) === nothing - end - end + @testset "in" begin + + function checkifpresent(iters,npmax=10) + for np = 1:npmax, p = 1:np + ps = ProductSplit(iters,np,p) + pcol = collect(ps) + + for el in pcol + # It should be contained in this iterator + @test el in ps + for p2 in 1:np + # It should not be contained anywhere else + p2 == p && continue + ps2 = ProductSplit(iters,np,p2) + @test !(el in ps2) + end + end + end + end + + for iters in various_iters + checkifpresent(iters) + end - @testset "in" begin - - function checkifpresent(iters,npmax=10) - for np = 1:npmax, p = 1:np - ps = ProductSplit(iters,np,p) - pcol = collect(ps) - - for el in pcol - # It should be contained in this iterator - @test el in ps - for p2 in 1:np - # It should not be contained anywhere else - p2 == p && continue - ps2 = ProductSplit(iters,np,p2) - @test !(el in ps2) - end - end - end - end + @test ParallelUtilities._infullrange((),()) + end - for iters in various_iters - checkifpresent(iters) - end + @testset "whichproc + procrange_recast" begin + np,proc_id = 5,5 + iters = (1:10,4:6,1:4) + ps = ProductSplit(iters,np,proc_id) + @test whichproc(iters,first(ps),1) == 1 + @test whichproc(iters,(100,100,100),1) === nothing + @test procrange_recast(iters,ps,1) == 1:1 + @test procrange_recast(ps,1) == 1:1 + + smalleriter = (1:1,1:1,1:1) + err = ParallelUtilities.TaskNotPresentError(smalleriter,first(ps)) + @test_throws err procrange_recast(smalleriter,ps,1) + smalleriter = (7:9,4:6,1:4) + err = ParallelUtilities.TaskNotPresentError(smalleriter,last(ps)) + @test_throws err procrange_recast(smalleriter,ps,1) + + iters = (1:1,2:2) + ps = ProductSplit(iters,np,proc_id) + @test whichproc(iters,first(ps),np) === nothing + @test whichproc(iters,nothing,np) === nothing + @test procrange_recast(iters,ps,2) == (0:-1) + @test procrange_recast(ps,2) == (0:-1) + + iters = (1:1,2:2) + ps = ProductSplit(iters,1,1) + @test procrange_recast(iters,ps,2) == 1:1 + @test procrange_recast(ps,2) == 1:1 + + iters = (Base.OneTo(2),2:4) + ps = ProductSplit(iters,2,1) + @test procrange_recast(iters,ps,1) == 1:1 + @test procrange_recast(iters,ps,2) == 1:1 + @test procrange_recast(iters,ps,ntasks(iters)) == 1:length(ps) + + for np_new in 1:5ntasks(iters) + for proc_id_new=1:np_new + ps_new = ProductSplit(iters,np_new,proc_id_new) + + for val in ps_new + # Should loop only if ps_new is non-empty + @test whichproc(iters,val,np_new) == proc_id_new + end + end + procid_new_first = whichproc(iters,first(ps),np_new) + proc_new_last = whichproc(iters,last(ps),np_new) + @test procrange_recast(iters,ps,np_new) == (isempty(ps) ? (0:-1) : (procid_new_first:proc_new_last)) + @test procrange_recast(ps,np_new) == (isempty(ps) ? (0:-1) : (procid_new_first:proc_new_last)) + end - @test ParallelUtilities._infullrange((),()) - end + @testset "different set" begin + iters = (1:100,1:4000) + ps = ProductSplit((20:30,1:1),2,1) + @test procrange_recast(iters,ps,700) == 1:1 + ps = ProductSplit((20:30,1:1),2,2) + @test procrange_recast(iters,ps,700) == 1:1 + + iters = (1:1,2:2) + ps = ProductSplit((20:30,2:2),2,1) + @test_throws ParallelUtilities.TaskNotPresentError procrange_recast(iters,ps,3) + ps = ProductSplit((1:30,2:2),2,1) + @test_throws ParallelUtilities.TaskNotPresentError procrange_recast(iters,ps,3) + end + end - @testset "whichproc + procrange_recast" begin - np,proc_id = 5,5 - iters = (1:10,4:6,1:4) - ps = ProductSplit(iters,np,proc_id) - @test whichproc(iters,first(ps),1) == 1 - @test whichproc(iters,(100,100,100),1) === nothing - @test procrange_recast(iters,ps,1) == 1:1 - @test procrange_recast(ps,1) == 1:1 - - smalleriter = (1:1,1:1,1:1) - err = ParallelUtilities.TaskNotPresentError(smalleriter,first(ps)) - @test_throws err procrange_recast(smalleriter,ps,1) - smalleriter = (7:9,4:6,1:4) - err = ParallelUtilities.TaskNotPresentError(smalleriter,last(ps)) - @test_throws err procrange_recast(smalleriter,ps,1) - - iters = (1:1,2:2) - ps = ProductSplit(iters,np,proc_id) - @test whichproc(iters,first(ps),np) === nothing - @test whichproc(iters,nothing,np) === nothing - @test procrange_recast(iters,ps,2) == (0:-1) - @test procrange_recast(ps,2) == (0:-1) - - iters = (1:1,2:2) - ps = ProductSplit(iters,1,1) - @test procrange_recast(iters,ps,2) == 1:1 - @test procrange_recast(ps,2) == 1:1 - - iters = (Base.OneTo(2),2:4) - ps = ProductSplit(iters,2,1) - @test procrange_recast(iters,ps,1) == 1:1 - @test procrange_recast(iters,ps,2) == 1:1 - @test procrange_recast(iters,ps,ntasks(iters)) == 1:length(ps) - - for np_new in 1:5ntasks(iters) - for proc_id_new=1:np_new - ps_new = ProductSplit(iters,np_new,proc_id_new) - - for val in ps_new - # Should loop only if ps_new is non-empty - @test whichproc(iters,val,np_new) == proc_id_new - end - end - procid_new_first = whichproc(iters,first(ps),np_new) - proc_new_last = whichproc(iters,last(ps),np_new) - @test procrange_recast(iters,ps,np_new) == (isempty(ps) ? (0:-1) : (procid_new_first:proc_new_last)) - @test procrange_recast(ps,np_new) == (isempty(ps) ? (0:-1) : (procid_new_first:proc_new_last)) + @testset "indexinproduct" begin + @test indexinproduct((1:4,2:3:8),(3,5)) == 7 + @test indexinproduct((1:4,2:3:8),(3,6)) === nothing + @test_throws ArgumentError indexinproduct((),()) end - @testset "different set" begin - iters = (1:100,1:4000) - ps = ProductSplit((20:30,1:1),2,1) - @test procrange_recast(iters,ps,700) == 1:1 - ps = ProductSplit((20:30,1:1),2,2) - @test procrange_recast(iters,ps,700) == 1:1 - - iters = (1:1,2:2) - ps = ProductSplit((20:30,2:2),2,1) - @test_throws ParallelUtilities.TaskNotPresentError procrange_recast(iters,ps,3) - ps = ProductSplit((1:30,2:2),2,1) - @test_throws ParallelUtilities.TaskNotPresentError procrange_recast(iters,ps,3) + @testset "localindex" begin + + for iters in various_iters + for np=1:5ntasks(iters),proc_id=1:np + ps = ProductSplit(iters,np,proc_id) + for (ind,val) in enumerate(ps) + @test localindex(ps,val) == ind + @test localindex(iters,val,np,proc_id) == ind + end + if isempty(ps) + @test localindex(ps,first(ps)) === nothing + end + end + end end - end - @testset "localindex" begin - - for iters in various_iters - for np=1:5ntasks(iters),proc_id=1:np - ps = ProductSplit(iters,np,proc_id) - for (ind,val) in enumerate(ps) - @test localindex(ps,val) == ind - @test localindex(iters,val,np,proc_id) == ind - end - if isempty(ps) - @test localindex(ps,first(ps)) === nothing - end - end - end - end + @testset "whichproc_localindex" begin + for iters in various_iters + for np=1:ntasks(iters),proc_id=1:np + ps_col = collect(ProductSplit(iters,np,proc_id)) + ps_col_rev = [reverse(t) for t in ps_col] + for val in ps_col + p,ind = whichproc_localindex(iters,val,np) + @test p == proc_id + ind_in_arr = searchsortedfirst(ps_col_rev,reverse(val)) + @test ind == ind_in_arr + end + end + end + end - @testset "whichproc_localindex" begin - for iters in various_iters - for np=1:ntasks(iters),proc_id=1:np - ps_col = collect(ProductSplit(iters,np,proc_id)) - ps_col_rev = [reverse(t) for t in ps_col] - for val in ps_col - p,ind = whichproc_localindex(iters,val,np) - @test p == proc_id - ind_in_arr = searchsortedfirst(ps_col_rev,reverse(val)) - @test ind == ind_in_arr - end - end - end + @testset "getindex" begin + + @test ParallelUtilities._getindex((),1) == () + @test ParallelUtilities._getindex((),1,2) == () + + @test ParallelUtilities.childindex((),1) == (1,) + + for iters in various_iters + for np=1:ntasks(iters),p=1:np + ps = ProductSplit(iters,np,p) + ps_col = collect(ps) + for i in 1:length(ps) + @test ps[i] == ps_col[i] + end + @test ps[end] == ps[length(ps)] + for ind in [0,length(ps)+1] + @test_throws ParallelUtilities.BoundsError(ps,ind) ps[ind] + end + end + end + end end - - @testset "getindex" begin - - @test ParallelUtilities._getindex((),1) == () - @test ParallelUtilities._getindex((),1,2) == () - - @test ParallelUtilities.childindex((),1) == (1,) - - for iters in various_iters - for np=1:ntasks(iters),p=1:np - ps = ProductSplit(iters,np,p) - ps_col = collect(ps) - for i in 1:length(ps) - @test ps[i] == ps_col[i] - end - @test ps[end] == ps[length(ps)] - for ind in [0,length(ps)+1] - @test_throws ParallelUtilities.BoundsError(ps,ind) ps[ind] + @testsetwithinfo "ProductSection" begin + @testset "Constructor" begin + function testPS(iterators) + itp = collect(Iterators.product(iterators...)) + l = length(itp) + for startind in 1:l, endind in startind:l + ps = ProductSection(iterators, startind:endind) + @test eltype(ps) == Tuple{eltype.(iterators)...} + for (psind,ind) in enumerate(startind:endind) + @test ps[psind] == itp[ind] + end end end + + for iter in various_iters + testPS(iter) + end + + @test_throws ArgumentError ProductSection((),2:3) + @test_throws ArgumentError ProductSection((1:3,),1:0) end end + @testset "dropleading" begin + ps = ProductSplit((1:5,2:4,1:3),7,3); + @test dropleading(ps) isa ProductSection + @test collect(dropleading(ps)) == [(4,1),(2,2),(3,2)] + @test collect(dropleading(dropleading(ps))) == [(1,),(2,)] + end end; @testset "ReverseLexicographicTuple" begin @@ -1385,41 +1422,35 @@ end; end end; -@testset "pmap and reduce" begin - - exceptiontype = RemoteException - if VERSION >= v"1.3" - exceptiontype = CompositeException - end - - @testset "Sorted and Unsorted" begin - @test Sorted() isa Ordering - @test Unsorted() isa Ordering - end; +@testset "map reduce" begin + @testset "Sorted and Unsorted" begin + @test Sorted() isa Ordering + @test Unsorted() isa Ordering + end; - @testset "pval" begin - p = pval(2,3) - @test value(p) == 3 - @test value(3) == 3 - @test value(p) == value(value(p)) + @testset "pval" begin + p = pval(2,3) + @test value(p) == 3 + @test value(3) == 3 + @test value(p) == value(value(p)) @test convert(pval{Any},p) == pval{Any}(2,3) @test convert(pval{Float64},p) == pval{Any}(2,3.0) - end; + end; - @testset "mapTreeNode" begin + @testset "mapTreeNode" begin - @testset "maybepvalput!" begin - pipe = BranchChannel{Int,Int}(myid(),0) + @testset "maybepvalput!" begin + pipe = BranchChannel{Int,Int}(myid(),0) rank = 1 - maybepvalput!(pipe,rank,0) - @test isready(pipe.selfchannels.out) - @test take!(pipe.selfchannels.out) == 0 + maybepvalput!(pipe,rank,0) + @test isready(pipe.selfchannels.out) + @test take!(pipe.selfchannels.out) == 0 - pipe = BranchChannel{pval,pval}(myid(),0) - maybepvalput!(pipe,rank,0) - @test isready(pipe.selfchannels.out) - @test take!(pipe.selfchannels.out) == pval(rank,0) + pipe = BranchChannel{pval,pval}(myid(),0) + maybepvalput!(pipe,rank,0) + @test isready(pipe.selfchannels.out) + @test take!(pipe.selfchannels.out) == pval(rank,0) pipe = BranchChannel{pval{Int},pval{Int}}(myid(),0) maybepvalput!(pipe,rank,0) @@ -1463,54 +1494,54 @@ end; maybepvalput!(pipe,rank,val) @test isready(pipe.selfchannels.out) @test take!(pipe.selfchannels.out) == pval(rank,Int[1]) - end + end - function test_on_pipe(fn,iterator,pipe,result_expected) + function test_on_pipe(fn,iterator,pipe,result_expected) progressrc = nothing rank = 1 - @test_throws ErrorException mapTreeNode(x->error(""),iterator,rank,pipe,progressrc) - @test !isready(pipe.selfchannels.out) # should not have any result as there was an error - @test isready(pipe.selfchannels.err) - @test take!(pipe.selfchannels.err) # error flag should be true - @test !isready(pipe.selfchannels.err) # should not hold anything now - @test !isready(pipe.parentchannels.out) - @test !isready(pipe.parentchannels.err) - @test !isready(pipe.childrenchannels.out) - @test !isready(pipe.childrenchannels.err) - - mapTreeNode(fn,iterator,rank,pipe,progressrc) - @test isready(pipe.selfchannels.err) - @test !take!(pipe.selfchannels.err) # error flag should be false - @test !isready(pipe.selfchannels.err) - @test isready(pipe.selfchannels.out) - @test take!(pipe.selfchannels.out) == result_expected - @test !isready(pipe.selfchannels.out) - @test !isready(pipe.parentchannels.out) - @test !isready(pipe.parentchannels.err) - @test !isready(pipe.childrenchannels.out) - @test !isready(pipe.childrenchannels.err) - end + @test_throws ErrorException mapTreeNode(x->error(""),iterator,rank,pipe,progressrc) + @test !isready(pipe.selfchannels.out) # should not have any result as there was an error + @test isready(pipe.selfchannels.err) + @test take!(pipe.selfchannels.err) # error flag should be true + @test !isready(pipe.selfchannels.err) # should not hold anything now + @test !isready(pipe.parentchannels.out) + @test !isready(pipe.parentchannels.err) + @test !isready(pipe.childrenchannels.out) + @test !isready(pipe.childrenchannels.err) + + mapTreeNode(fn,iterator,rank,pipe,progressrc) + @test isready(pipe.selfchannels.err) + @test !take!(pipe.selfchannels.err) # error flag should be false + @test !isready(pipe.selfchannels.err) + @test isready(pipe.selfchannels.out) + @test take!(pipe.selfchannels.out) == result_expected + @test !isready(pipe.selfchannels.out) + @test !isready(pipe.parentchannels.out) + @test !isready(pipe.parentchannels.err) + @test !isready(pipe.childrenchannels.out) + @test !isready(pipe.childrenchannels.err) + end - @testset "range" begin - iterator = 1:10 - - pipe = BranchChannel{Int,Int}(myid(),0) - test_on_pipe(sum,iterator,pipe,sum(iterator)) - end - - @testset "ProductSplit" begin - iterators = (1:10,) - ps = ProductSplit(iterators,1,1) + @testset "range" begin + iterator = 1:10 + + pipe = BranchChannel{Int,Int}(myid(),0) + test_on_pipe(sum,iterator,pipe,sum(iterator)) + end + + @testset "ProductSplit" begin + iterators = (1:10,) + ps = ProductSplit(iterators,1,1) - pipe = BranchChannel{Int,Int}(myid(),0) - test_on_pipe(x->sum(y[1] for y in x),ps,pipe,sum(iterators[1])) + pipe = BranchChannel{Int,Int}(myid(),0) + test_on_pipe(x->sum(y[1] for y in x),ps,pipe,sum(iterators[1])) - pipe = BranchChannel{Int,Int}(myid(),1) - test_on_pipe(x->sum(y[1] for y in x),ps,pipe,sum(iterators[1])) + pipe = BranchChannel{Int,Int}(myid(),1) + test_on_pipe(x->sum(y[1] for y in x),ps,pipe,sum(iterators[1])) - pipe = BranchChannel{Int,Int}(myid(),2) - test_on_pipe(x->sum(y[1] for y in x),ps,pipe,sum(iterators[1])) - end + pipe = BranchChannel{Int,Int}(myid(),2) + test_on_pipe(x->sum(y[1] for y in x),ps,pipe,sum(iterators[1])) + end @testset "progress" begin @test isnothing(ParallelUtilities.indicatemapprogress!(nothing,1)) @@ -1519,14 +1550,14 @@ end; ParallelUtilities.indicatemapprogress!(progress,10) @test take!(progress) == (true,false,10) end - end; + end; - @testset "reduce" begin + @testset "reduce" begin - # Leaves just push results to the parent - # reduced value at a leaf is simply whatever is stored in the local output channel - @testset "at a leaf" begin - # These do not check for errors + # Leaves just push results to the parent + # reduced value at a leaf is simply whatever is stored in the local output channel + @testset "at a leaf" begin + # These do not check for errors result = 1 rank = 1 val = pval(rank,result) @@ -1538,34 +1569,46 @@ end; pipe = BranchChannel{typeof(result),typeof(result)}(myid(),0) put!(pipe.selfchannels.out,result) @test ParallelUtilities.reducedvalue(sum,rank,pipe,Unsorted()) == result - end; + end; - # # Values are collected at the intermediate nodes - @testset "at parent nodes" begin + # # Values are collected at the intermediate nodes + @testset "at parent nodes" begin - # Put some known values on the self and children channels - function putselfchildren!(pipe::BranchChannel,::Unsorted,rank=1) - if rank >= 1 - put!(pipe.selfchannels.out,0) - put!(pipe.selfchannels.err,false) + # Put some known values on the self and children channels + @everywhere begin + + function putselfchildren!(pipe::BranchChannel,ord::Ordering,rank=1, + args...) + putselfchildren!(pipe,ord, + rank > 0 ? SubTreeNode(rank) : TopTreeNode(rank), + args...) + end + function putselfchildren!(pipe::BranchChannel,::Unsorted,::TopTreeNode) + for i=1:nchildren(pipe) + put!(pipe.childrenchannels.out,i) + put!(pipe.childrenchannels.err,false) end - for i=1:nchildren(pipe) - put!(pipe.childrenchannels.out,i) - put!(pipe.childrenchannels.err,false) - end - end - function putselfchildren!(pipe::BranchChannel{<:pval},::Sorted, - selfrank=2,leftchildrank=1,rightchildrank=3) - if selfrank >= 1 - put!(pipe.selfchannels.out,pval(selfrank,2)) - put!(pipe.selfchannels.err,false) + end + function putselfchildren!(pipe::BranchChannel,::Unsorted,::SubTreeNode) + put!(pipe.selfchannels.out,0) + put!(pipe.selfchannels.err,false) + for i=1:nchildren(pipe) + put!(pipe.childrenchannels.out,i) + put!(pipe.childrenchannels.err,false) end + end + function putselfchildren!(pipe::BranchChannel{<:pval},::Sorted, + node::SubTreeNode,leftchildrank=1,rightchildrank=3) + + selfrank = node.rank + put!(pipe.selfchannels.out,pval(selfrank,2)) + put!(pipe.selfchannels.err,false) N = nchildren(pipe) - + if N > 0 # left child - put!(pipe.childrenchannels.out,pval(leftchildrank,1)) - put!(pipe.childrenchannels.err,false) + put!(pipe.childrenchannels.out,pval(leftchildrank,1)) + put!(pipe.childrenchannels.err,false) end if N > 1 @@ -1573,44 +1616,71 @@ end; put!(pipe.childrenchannels.out,pval(rightchildrank,3)) put!(pipe.childrenchannels.err,false) end - end + end + function putselfchildren!(pipe::BranchChannel{<:pval},::Sorted, + node::TopTreeNode,leftchildrank=1,rightchildrank=3) - function clearerrors!(pipe::BranchChannel,rank=1) - if rank >= 1 - take!(pipe.selfchannels.err) + selfrank = node.rank + N = nchildren(pipe) + + if N > 0 + # left child + put!(pipe.childrenchannels.out,pval(leftchildrank,1)) + put!(pipe.childrenchannels.err,false) end - for i=1:nchildren(pipe) - take!(pipe.childrenchannels.err) - end - end - @testset "reducedvalue" begin - - function testreduction(freduce::Function,pipe::BranchChannel, - ifsorted::Unsorted,res_exp,rank=2) - - p = pipe.p - - try - putselfchildren!(pipe,ifsorted,rank) - @test value(reducedvalue(freduce,rank,pipe,ifsorted)) == res_exp - clearerrors!(pipe,rank) - - @fetchfrom p putselfchildren!(pipe,ifsorted,rank) - @test value(@fetchfrom p reducedvalue(freduce,rank,pipe,ifsorted)) == res_exp - clearerrors!(pipe,rank) - - @fetchfrom p putselfchildren!(pipe,ifsorted,rank) - @test value(reducedvalue(freduce,rank,pipe,ifsorted)) == res_exp - clearerrors!(pipe,rank) - - putselfchildren!(pipe,ifsorted,rank) - @test value(@fetchfrom p reducedvalue(freduce,rank,pipe,ifsorted)) == res_exp - clearerrors!(pipe,rank) - catch - rethrow() - end - end + if N > 1 + # right child + put!(pipe.childrenchannels.out,pval(rightchildrank,3)) + put!(pipe.childrenchannels.err,false) + end + end + + function clearerrors!(pipe::BranchChannel,rank=1) + clearerrors!(pipe, + rank > 0 ? SubTreeNode(rank) : TopTreeNode(rank)) + end + function clearerrors!(pipe::BranchChannel,node::SubTreeNode) + take!(pipe.selfchannels.err) + for i=1:nchildren(pipe) + take!(pipe.childrenchannels.err) + end + end + function clearerrors!(pipe::BranchChannel,node::TopTreeNode) + for i=1:nchildren(pipe) + take!(pipe.childrenchannels.err) + end + end + + end + + @testset "reducedvalue" begin + + function testreduction(freduce::Function,pipe::BranchChannel, + ifsorted::Unsorted,res_exp,rank=2) + + p = pipe.p + + try + putselfchildren!(pipe,ifsorted,rank) + @test value(reducedvalue(freduce,rank,pipe,ifsorted)) == res_exp + clearerrors!(pipe,rank) + + @fetchfrom p putselfchildren!(pipe,ifsorted,rank) + @test value(@fetchfrom p reducedvalue(freduce,rank,pipe,ifsorted)) == res_exp + clearerrors!(pipe,rank) + + @fetchfrom p putselfchildren!(pipe,ifsorted,rank) + @test value(reducedvalue(freduce,rank,pipe,ifsorted)) == res_exp + clearerrors!(pipe,rank) + + putselfchildren!(pipe,ifsorted,rank) + @test value(@fetchfrom p reducedvalue(freduce,rank,pipe,ifsorted)) == res_exp + clearerrors!(pipe,rank) + catch + rethrow() + end + end function testreduction(freduce::Function,pipe::BranchChannel, ifsorted::Sorted,res_exp, @@ -1640,107 +1710,166 @@ end; end end - for n = 1:2 - @testset "Unsorted" begin - pipe = BranchChannel{Int,Int}(myid(),n) - res_exp = sum(0:n) - testreduction(sum,pipe,Unsorted(),res_exp,2) + for n = 1:2 + @testset "Unsorted" begin + pipe = BranchChannel{Int,Int}(myid(),n) + res_exp = sum(0:n) + testreduction(sum,pipe,Unsorted(),res_exp,2) + testreduction(sum,pipe,Unsorted(),res_exp,SubTreeNode(2)) @testset "toptree" begin testreduction(sum,pipe,Unsorted(),res_exp,0) + testreduction(sum,pipe,Unsorted(),res_exp,TopTreeNode(0)) end - end - @testset "Sorted" begin - pipe = BranchChannel{pval,pval}(myid(),n) - res_exp = collect(1:n+1) - testreduction(x->vcat(x...),pipe,Sorted(),res_exp) - - pipe = BranchChannel{pval,pval}(myid(),n) - res_exp = sum(1:n+1) - testreduction(sum,pipe,Sorted(),res_exp) + end + @testset "Sorted" begin + pipe = BranchChannel{pval,pval}(myid(),n) + res_exp = collect(1:n+1) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp,SubTreeNode(2)) + + pipe = BranchChannel{pval,pval}(myid(),n) + res_exp = sum(1:n+1) + testreduction(sum,pipe,Sorted(),res_exp) + testreduction(sum,pipe,Sorted(),res_exp,SubTreeNode(2)) @testset "toptree" begin pipe = BranchChannel{pval,pval}(myid(),n) res_exp = n == 1 ? [1] : [1,3] testreduction(x->vcat(x...),pipe,Sorted(),res_exp,0,1,2) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp,TopTreeNode(0),1,2) pipe = BranchChannel{pval,pval}(myid(),n) res_exp = n == 1 ? 1 : 1+3 testreduction(sum,pipe,Sorted(),res_exp,0,1,2) + testreduction(sum,pipe,Sorted(),res_exp,TopTreeNode(0),1,2) end - end - end - end + end + end + end - @testset "reduceTreeNode" begin + @testset "reduceTreeNode" begin - function testreduction(freduce::Function,pipe::BranchChannel, - ifsorted::Ordering,res_exp) + @everywhere begin + + function testfinalized(rank,pipe) + rank > 0 ? + testfinalized(SubTreeNode(rank),pipe) : + testfinalized(TopTreeNode(rank),pipe) + end - @test !isready(pipe.parentchannels.out) - @test !isready(pipe.parentchannels.err) + function testfinalized(::SubTreeNode,pipe) + @test pipe.selfchannels.out.where == 0 + @test pipe.selfchannels.err.where == 0 + @test pipe.childrenchannels.out.where == 0 + @test pipe.childrenchannels.err.where == 0 + end + function testfinalized(::TopTreeNode,pipe) + @test pipe.childrenchannels.out.where == 0 + @test pipe.childrenchannels.err.where == 0 + end + + function testreduction(freduce::Function,pipe::BranchChannel, + ifsorted::Ordering,res_exp,rank,args...) + + @test !isready(pipe.parentchannels.out) + @test !isready(pipe.parentchannels.err) progressrc = nothing - rank = 2 - - try - wait(@spawnat pipe.p putselfchildren!(pipe,ifsorted)) - reduceTreeNode(freduce,rank,pipe,ifsorted,progressrc) - catch - rethrow() - end - @test isready(pipe.parentchannels.out) - @test isready(pipe.parentchannels.err) - @test !take!(pipe.parentchannels.err) # there should be no error - @test value(take!(pipe.parentchannels.out)) == res_exp - - # The pipe should be finalized at this point - @test pipe.selfchannels.out.where == 0 - @test pipe.selfchannels.err.where == 0 - @test pipe.childrenchannels.out.where == 0 - @test pipe.childrenchannels.err.where == 0 - end - - for nchildren = 1:2 - @testset "Unsorted" begin - pipe = BranchChannel{Int,Int}(myid(),nchildren) - res_exp = sum(0:nchildren) - testreduction(sum,pipe,Unsorted(),res_exp) - - rc_parent = RemoteChannelContainer{Int}(1) - p = workers()[1] - pipe = BranchChannel(p,Int,rc_parent,nchildren) - testreduction(sum,pipe,Unsorted(),res_exp) - end - @testset "Sorted" begin - pipe = BranchChannel{pval,pval}(myid(),nchildren) - res_exp = collect(1:nchildren+1) - testreduction(x->vcat(x...),pipe,Sorted(),res_exp) - - rc_parent = RemoteChannelContainer{pval}(myid(),1) - p = workers()[1] - pipe = BranchChannel(p,pval,rc_parent,nchildren) - testreduction(x->vcat(x...),pipe,Sorted(),res_exp) - - pipe = BranchChannel{pval,pval}(myid(),nchildren) - res_exp = sum(1:nchildren+1) - testreduction(sum,pipe,Sorted(),res_exp) - rc_parent = RemoteChannelContainer{pval}(1) - p = workers()[1] - pipe = BranchChannel(p,pval,rc_parent,nchildren) - testreduction(sum,pipe,Sorted(),res_exp) - end - end + try + wait(@spawnat(pipe.p, + putselfchildren!(pipe,ifsorted,rank,args...) ) ) + reduceTreeNode(freduce,rank,pipe,ifsorted,progressrc) + catch + rethrow() + end + @test isready(pipe.parentchannels.out) + @test isready(pipe.parentchannels.err) + @test !take!(pipe.parentchannels.err) # there should be no error + @test value(take!(pipe.parentchannels.out)) == res_exp + + # The pipe should be finalized at this point + testfinalized(rank,pipe) + end + + end + + for n = 1:2 + @testset "Unsorted" begin + pipe = BranchChannel{Int,Int}(myid(),n) + res_exp = sum(0:n) + testreduction(sum,pipe,Unsorted(),res_exp,2) + + pipe = BranchChannel{Int,Int}(myid(),n) + testreduction(sum,pipe,Unsorted(),res_exp,SubTreeNode(2)) + + pipe = BranchChannel{Int,Int}(myid(),n) + testreduction(sum,pipe,Unsorted(),res_exp,TopTreeNode(0)) + + rc_parent = RemoteChannelContainer{Int}(1) + p = workers()[1] + + pipe = BranchChannel(p,Int,rc_parent,n) + testreduction(sum,pipe,Unsorted(),res_exp,2) + + pipe = BranchChannel(p,Int,rc_parent,n) + testreduction(sum,pipe,Unsorted(),res_exp,SubTreeNode(2)) + end + @testset "Sorted" begin + @testset "SubTreeNode" begin + res_exp = collect(1:n+1) + pipe = BranchChannel{pval,pval}(myid(),n) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp,2) + + pipe = BranchChannel{pval,pval}(myid(),n) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp,SubTreeNode(2)) + + rc_parent = RemoteChannelContainer{pval}(myid(),1) + p = workers()[1] + + pipe = BranchChannel(p,pval,rc_parent,n) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp,2) + + pipe = BranchChannel(p,pval,rc_parent,n) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp,SubTreeNode(2)) + + res_exp = sum(1:n+1) + pipe = BranchChannel{pval,pval}(myid(),n) + testreduction(sum,pipe,Sorted(),res_exp,2) + + pipe = BranchChannel{pval,pval}(myid(),n) + testreduction(sum,pipe,Sorted(),res_exp,SubTreeNode(2)) + + rc_parent = RemoteChannelContainer{pval}(1) + p = workers()[1] + + pipe = BranchChannel(p,pval,rc_parent,n) + testreduction(sum,pipe,Sorted(),res_exp,2) + + pipe = BranchChannel(p,pval,rc_parent,n) + testreduction(sum,pipe,Sorted(),res_exp,SubTreeNode(2)) + end + + @testset "TopTreeNode" begin + res_exp = n == 1 ? [1] : [1,3] + + pipe = BranchChannel{pval,pval}(myid(),n) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp,0) + + pipe = BranchChannel{pval,pval}(myid(),n) + testreduction(x->vcat(x...),pipe,Sorted(),res_exp,TopTreeNode(0)) + end + end + end # The top tree must have children by definition pipe = BranchChannel{Int,Int}(myid(),0) putselfchildren!(pipe,Unsorted(),0) - err = ErrorException("nodes with rank <=0 must have children") - @test_throws err reducedvalue(sum,0,pipe,Unsorted()) + @test_throws ErrorException reducedvalue(sum,0,pipe,Unsorted()) clearerrors!(pipe,0) - end - end; + end + end; @testset "progress" begin @test isnothing(ParallelUtilities.indicatereduceprogress!(nothing,1)) @@ -1753,7 +1882,15 @@ end; ParallelUtilities.indicatefailure!(progress,10) @test take!(progress) == (false,false,10) end - end; + end; +end + +@testset "pmapbatch and pmapreduce" begin + + exceptiontype = RemoteException + if VERSION >= v"1.3" + exceptiontype = CompositeException + end @testsetwithinfo "pmapbatch" begin @testsetwithinfo "batch" begin