diff --git a/src/ThreadsX.jl b/src/ThreadsX.jl index c07baddc..99408c86 100644 --- a/src/ThreadsX.jl +++ b/src/ThreadsX.jl @@ -67,6 +67,13 @@ else @eval const $(Symbol("@spawn")) = $(Symbol("@async")) end +if isdefined(Transducers, :foldxt) + using Transducers: foldxt +else + foldxt(rf, xf, xs; kw...) = reduce(rf, xf, xs; kw...) + foldxt(rf, xs; kw...) = reduce(rf, Map(identity), xs; kw...) +end + include("utils.jl") include("basesizes.jl") include("reduce.jl") diff --git a/src/foreach.jl b/src/foreach.jl index 5c2ba6b0..cdca7a15 100644 --- a/src/foreach.jl +++ b/src/foreach.jl @@ -115,7 +115,7 @@ ThreadsX.foreach(f, array::AbstractArray, arrays::AbstractArray; kw...) = @inline return_nothing(_...) = nothing -ThreadsX.foreach(f::F, itr; kw...) where {F} = reduce( +ThreadsX.foreach(f::F, itr; kw...) where {F} = foldxt( return_nothing, Map(f), itr; @@ -124,7 +124,7 @@ ThreadsX.foreach(f::F, itr; kw...) where {F} = reduce( kw..., ) -ThreadsX.foreach(f::F, itr, itrs...; kw...) where {F} = reduce( +ThreadsX.foreach(f::F, itr, itrs...; kw...) where {F} = foldxt( return_nothing, MapSplat(f), zip(itr, itrs...); diff --git a/src/reduce.jl b/src/reduce.jl index 7344fb0f..f8bf5759 100644 --- a/src/reduce.jl +++ b/src/reduce.jl @@ -2,7 +2,7 @@ without_basesize(; basesize = nothing, kw...) = kw function ThreadsX.reduce(op, itr; kw...) xf, reducible = extract_transducer(itr) - result = reduce( + result = foldxt( op, xf, reducible; @@ -16,7 +16,7 @@ end function ThreadsX.mapreduce(f, op, itr; kw...) xf, reducible = extract_transducer(itr) - result = reduce( + result = foldxt( op, opcompose(xf, Map(f)), reducible; @@ -33,7 +33,7 @@ function ThreadsX.mapreduce(f, op, itr, itrs...; kw...) # `Base` just does `reduce(op, map(f, ...))`: return mapreduce(f, op, itr, itrs...; without_basesize(; kw...)...) end - return reduce( + return foldxt( op, MapSplat(f), zip(itr, itrs...); @@ -64,7 +64,7 @@ asbool(f) = x -> f(x)::Bool # TODO: `any` and `all` should be done with "unordered" version ThreadsX.any(itr; kw...) = ThreadsX.any(identity, itr; kw...) -ThreadsX.any(f, itr; kw...) = reduce( +ThreadsX.any(f, itr; kw...) = foldxt( right, # no need to use `|` opcompose(Map(asbool(f)), ReduceIf(identity)), simd = Val(true), @@ -75,7 +75,7 @@ ThreadsX.any(f, itr; kw...) = reduce( ) ThreadsX.all(itr; kw...) = ThreadsX.all(identity, itr; kw...) -ThreadsX.all(f, itr; kw...) = reduce( +ThreadsX.all(f, itr; kw...) = foldxt( right, # no need to use `&` opcompose(Map(asbool(f)), ReduceIf(!)), itr; @@ -86,7 +86,7 @@ ThreadsX.all(f, itr; kw...) = reduce( ) ThreadsX.findfirst(itr; kw...) = ThreadsX.findfirst(identity, itr; kw...) -ThreadsX.findfirst(f, array::AbstractArray; kw...) = reduce( +ThreadsX.findfirst(f, array::AbstractArray; kw...) = foldxt( right, ReduceIf(i -> f(@inbounds array[i])), keys(array); @@ -99,7 +99,7 @@ ThreadsX.findfirst(f, array::AbstractArray; kw...) = reduce( ThreadsX.findlast(itr; kw...) = ThreadsX.findlast(identity, itr; kw...) function ThreadsX.findlast(f, array::AbstractArray; kw...) idx = keys(array) - return reduce( + return foldxt( right, opcompose(Map(i -> (@inbounds idx[i])), ReduceIf(i -> f(@inbounds array[i]))), lastindex(idx):-1:firstindex(idx); @@ -125,7 +125,7 @@ end _minmax((min0, max0), (min1, max1)) = (min(min0, min1), max(max0, max1)) ThreadsX.extrema(itr; kw...) = ThreadsX.extrema(identity, itr; kw...) -ThreadsX.extrema(f, itr; kw...) = reduce( +ThreadsX.extrema(f, itr; kw...) = foldxt( asmonoid(_minmax), Map(x -> (y = f(x); (y, y))), itr; @@ -189,7 +189,7 @@ function ThreadsX.unique(f::F, itr::AbstractVector{X}; kw...) where {F,X} # Using inference as an optimization. The result of this inference # does not affect the result: Y = Core.Compiler.return_type(f, Tuple{X}) - ys, = reduce( + ys, = foldxt( PushUnique(f), Map(identity), itr;