Skip to content

Commit

Permalink
Add nelements
Browse files Browse the repository at this point in the history
  • Loading branch information
jishnub committed Jun 7, 2020
1 parent 271dd7b commit 3d323af
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "ParallelUtilities"
uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0"
authors = ["Jishnu Bhattacharya <jishnuonline@gmail.com>"]
version = "0.7.3"
version = "0.7.4"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
85 changes: 78 additions & 7 deletions src/productsplit.jl
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,73 @@ function _checknorollover(t,firstindchild,lastindchild)
end
_checknorollover(::Tuple{},::Tuple{},::Tuple{}) = true

function _nrollovers(ps::AbstractConstrainedProduct{<:Any,N},dim::Integer) where {N}
dim == N && return 0
nelements(ps,dim+1) - 1
end

"""
nelements(ps::AbstractConstrainedProduct; dim::Integer)
Compute the number of unique values in the element number `dim` of the tuples
that are returned when `ps` is iterated over.
# Examples
```jldoctest
julia> ps = ProductSplit((1:5,2:4,1:3),7,3);
julia> collect(ps)
7-element Array{Tuple{Int64,Int64,Int64},1}:
(5, 4, 1)
(1, 2, 2)
(2, 2, 2)
(3, 2, 2)
(4, 2, 2)
(5, 2, 2)
(1, 3, 2)
julia> ParallelUtilities.nelements(ps,3)
2
julia> ParallelUtilities.nelements(ps,2)
3
julia> ParallelUtilities.nelements(ps,1)
5
```
"""
nelements(ps::AbstractConstrainedProduct; dim::Integer) = nelements(ps,dim)
function nelements(ps::AbstractConstrainedProduct{<:Any,N},dim::Integer) where {N}
1 <= dim <= N || throw(ArgumentError("1 ⩽ dim ⩽ N=$N not satisfied for dim=$dim"))

iter = ps.iterators[dim]

if _nrollovers(ps,dim) == 0
st = first(ps)[dim]
en = last(ps)[dim]
stind = searchsortedfirst(iter,st)
enind = searchsortedfirst(iter,en)
nel = length(stind:enind)
elseif _nrollovers(ps,dim) > 1
nel = length(iter)
else
st = first(ps)[dim]
en = last(ps)[dim]
stind = searchsortedfirst(iter,st)
enind = searchsortedfirst(iter,en)
if stind > enind
# some elements are missed out
nel = length(stind:length(iter)) + length(1:enind)
else
nel = length(iter)
end
end
return nel
end


"""
maximum(ps::ProductSplit; dim::Integer)
maximum(ps::AbstractConstrainedProduct; dim::Integer)
Compute the maximum value of the range number `dim` that is
contained in `ps`.
Expand All @@ -355,7 +420,7 @@ julia> maximum(ps,dim=2)
4
```
"""
function Base.maximum(ps::AbstractConstrainedProduct{<:Any,N};dim::Integer) where {N}
function Base.maximum(ps::AbstractConstrainedProduct{<:Any,N},dim::Integer) where {N}

@boundscheck (1 <= dim <= N) || throw(BoundsError(ps.iterators,dim))

Expand Down Expand Up @@ -390,7 +455,7 @@ function Base.maximum(ps::AbstractConstrainedProduct{<:Any,1})
end

"""
minimum(ps::ProductSplit; dim::Integer)
minimum(ps::AbstractConstrainedProduct; dim::Integer)
Compute the minimum value of the range number `dim` that is
contained in `ps`.
Expand All @@ -411,7 +476,7 @@ julia> minimum(ps,dim=2)
4
```
"""
function Base.minimum(ps::AbstractConstrainedProduct{<:Any,N};dim::Integer) where {N}
function Base.minimum(ps::AbstractConstrainedProduct{<:Any,N},dim::Integer) where {N}

@boundscheck (1 <= dim <= N) || throw(BoundsError(ps.iterators,dim))

Expand Down Expand Up @@ -446,7 +511,7 @@ function Base.minimum(ps::AbstractConstrainedProduct{<:Any,1})
end

"""
extrema(ps::ProductSplit; dim::Integer)
extrema(ps::AbstractConstrainedProduct; dim::Integer)
Compute the minimum and maximum of the range number `dim` that is
contained in `ps`.
Expand All @@ -467,7 +532,7 @@ julia> extrema(ps,dim=2)
(4, 4)
```
"""
function Base.extrema(ps::AbstractConstrainedProduct{<:Any,N};dim::Integer) where {N}
function Base.extrema(ps::AbstractConstrainedProduct{<:Any,N},dim::Integer) where {N}

@boundscheck (1 <= dim <= N) || throw(BoundsError(ps.iterators,dim))

Expand Down Expand Up @@ -503,6 +568,12 @@ function Base.extrema(ps::AbstractConstrainedProduct{<:Any,1})
(iter[fic_dim],iter[lic_dim])
end

for f in [:maximum,:minimum,:extrema]
@eval function Base.$f(ps::AbstractConstrainedProduct;dim::Integer)
$f(ps,dim)
end
end

"""
extremadims(ps::AbstractConstrainedProduct)
Expand Down Expand Up @@ -693,7 +764,7 @@ function whichproc(iterators, val, np::Integer)
# If np is greater than the number of ntasks then it's possible
# that ps is empty. In this case the value must be somewhere in
# the previous workers. Otherwise each worker has some tasks and
# these are sorted, so carry out a binary seaarch
# these are sorted, so carry out a binary search

if isempty(ps) || val_t < ReverseLexicographicTuple(first(ps))
right = mid - 1
Expand Down
107 changes: 92 additions & 15 deletions test/tests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
parentnoderank, nchildren,
maybepvalput!, createbranchchannels, nworkersactive, workersactive,
procs_node, leafrankfoldedtree,
TopTreeNode, SubTreeNode, ProductSection, indexinproduct, dropleading
TopTreeNode, SubTreeNode, ProductSection, indexinproduct, dropleading,
nelements
end

macro testsetwithinfo(str,ex)
Expand Down Expand Up @@ -414,7 +415,43 @@ end
@test dropleading(ps) isa ProductSection
@test collect(dropleading(ps)) == [(4,1),(2,2),(3,2)]
@test collect(dropleading(dropleading(ps))) == [(1,),(2,)]

ps = ProductSection((1:5,2:4,1:3),5:8);
@test dropleading(ps) isa ProductSection
@test collect(dropleading(ps)) == [(2,1),(3,1)]
@test collect(dropleading(dropleading(ps))) == [(1,)]
end
@testset "nelements" begin
ps = ProductSplit((1:5,2:4,1:3),7,3);
@test nelements(ps,dim=1) == 5
@test nelements(ps,dim=2) == 3
@test nelements(ps,dim=3) == 2
@test_throws ArgumentError nelements(ps,dim=0)
@test_throws ArgumentError nelements(ps,dim=4)

ps = ProductSection((1:5,2:4,1:3),5:8);
@test nelements(ps,1) == 4
@test nelements(ps,2) == 2
@test nelements(ps,3) == 1

ps = ProductSection((1:5,2:4,1:3),5:11);
@test nelements(ps,1) == 5
@test nelements(ps,2) == 3
@test nelements(ps,3) == 1

ps = ProductSection((1:5,2:4,1:3),4:8);
@test nelements(ps,1) == 5
@test nelements(ps,2) == 2
@test nelements(ps,3) == 1

ps = ProductSection((1:5,2:4,1:3),4:9);
@test nelements(ps,1) == 5
@test nelements(ps,2) == 2
@test nelements(ps,3) == 1
end

@test ParallelUtilities._checknorollover((),(),())
@test ParallelUtilities.c2l_rec(3,1,(),()) == 3
end;

@testset "ReverseLexicographicTuple" begin
Expand Down Expand Up @@ -856,6 +893,13 @@ end;
@test_throws BoundsError(tree,16) nchildren(tree,16)
end;
end;

@testset "fulltree-toptree indexing" begin
procs = 1:5
tree = SequentialBinaryTree(procs)
@test ParallelUtilities.toptree_to_fulltree_index(tree,3) == 3
@test ParallelUtilities.fulltree_to_toptree_index(tree,3) == 3
end
end

@testsetwithinfo "SegmentedOrderedBinaryTree" begin
Expand Down Expand Up @@ -1061,6 +1105,11 @@ end;
end;
end;
end

@testsetwithinfo "unsegmentedtree" begin
@test ParallelUtilities.unsegmentedtree(SegmentedSequentialBinaryTree) == SequentialBinaryTree
@test ParallelUtilities.unsegmentedtree(SegmentedOrderedBinaryTree) == OrderedBinaryTree
end
end

@testsetwithinfo "RemoteChannelContainer" begin
Expand Down Expand Up @@ -1496,10 +1545,9 @@ end;
@test take!(pipe.selfchannels.out) == pval(rank,Int[1])
end

function test_on_pipe(fn,iterator,pipe,result_expected)
progressrc = nothing
function test_on_pipe(fn,iterator,pipe,result_expected,progressrc=nothing)
rank = 1
@test_throws ErrorException mapTreeNode(x->error(""),iterator,rank,pipe,progressrc)
@test_throws ErrorException mapTreeNode(x->error("fmap"),iterator,rank,pipe,progressrc)
@test !isready(pipe.selfchannels.out) # should not have any result as there was an error
@test isready(pipe.selfchannels.err)
@test take!(pipe.selfchannels.err) # error flag should be true
Expand All @@ -1508,6 +1556,10 @@ end;
@test !isready(pipe.parentchannels.err)
@test !isready(pipe.childrenchannels.out)
@test !isready(pipe.childrenchannels.err)
if progressrc isa RemoteChannel
@test isready(progressrc)
@test take!(progressrc) == (false,false,rank)
end

mapTreeNode(fn,iterator,rank,pipe,progressrc)
@test isready(pipe.selfchannels.err)
Expand All @@ -1520,13 +1572,21 @@ end;
@test !isready(pipe.parentchannels.err)
@test !isready(pipe.childrenchannels.out)
@test !isready(pipe.childrenchannels.err)
if progressrc isa RemoteChannel
@test isready(progressrc)
@test take!(progressrc) == (true,false,rank)
end
end

@testset "range" begin
iterator = 1:10

pipe = BranchChannel{Int,Int}(myid(),0)
test_on_pipe(sum,iterator,pipe,sum(iterator))

pipe = BranchChannel{Int,Int}(myid(),0)
progress = RemoteChannel(()->Channel{Tuple{Bool,Bool,Int}}(1))
test_on_pipe(sum,iterator,pipe,sum(iterator),progress)
end

@testset "ProductSplit" begin
Expand Down Expand Up @@ -1652,7 +1712,7 @@ end;
end
end

end
end # everwhere

@testset "reducedvalue" begin

Expand Down Expand Up @@ -1769,31 +1829,35 @@ end;
@test pipe.childrenchannels.err.where == 0
end

strippedrank(t::ParallelUtilities.ReductionNode) = t.rank
strippedrank(t::Integer) = t

function testreduction(freduce::Function,pipe::BranchChannel,
ifsorted::Ordering,res_exp,rank,args...)
ifsorted::Ordering,res_exp,rank,
progressrc=nothing,args...)

@test !isready(pipe.parentchannels.out)
@test !isready(pipe.parentchannels.err)

progressrc = nothing
wait(@spawnat(pipe.p,
putselfchildren!(pipe,ifsorted,rank,args...) ) )
reduceTreeNode(freduce,rank,pipe,ifsorted,progressrc)

try
wait(@spawnat(pipe.p,
putselfchildren!(pipe,ifsorted,rank,args...) ) )
reduceTreeNode(freduce,rank,pipe,ifsorted,progressrc)
catch
rethrow()
end
@test isready(pipe.parentchannels.out)
@test isready(pipe.parentchannels.err)
@test !take!(pipe.parentchannels.err) # there should be no error
@test value(take!(pipe.parentchannels.out)) == res_exp

if progressrc isa RemoteChannel
@test isready(progressrc)
@test take!(progressrc) == (false,true,strippedrank(rank))
end

# The pipe should be finalized at this point
testfinalized(rank,pipe)
end

end
end # everywhere

for n = 1:2
@testset "Unsorted" begin
Expand All @@ -1807,6 +1871,19 @@ end;
pipe = BranchChannel{Int,Int}(myid(),n)
testreduction(sum,pipe,Unsorted(),res_exp,TopTreeNode(0))

pipe = BranchChannel{Int,Int}(myid(),n)
progress = RemoteChannel(()->Channel{Tuple{Bool,Bool,Int}}(1))
@test_throws ErrorException testreduction(
x->error("fred"),pipe,Unsorted(),
res_exp,TopTreeNode(0),progress)
@test isready(progress)
@test take!(progress) == (false,false,0)

pipe = BranchChannel{Int,Int}(myid(),n)
progress = RemoteChannel(()->Channel{Tuple{Bool,Bool,Int}}(1))
testreduction(sum,pipe,Unsorted(),
res_exp,TopTreeNode(0),progress)

rc_parent = RemoteChannelContainer{Int}(1)
p = workers()[1]

Expand Down

2 comments on commit 3d323af

@jishnub
Copy link
Owner Author

@jishnub jishnub commented on 3d323af Jun 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/16029

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.7.4 -m "<description of version>" 3d323af69bf4c9bb3ad71a6838482a5ff50fa1c5
git push origin v0.7.4

Please sign in to comment.