diff --git a/docs/src/man/byrow.md b/docs/src/man/byrow.md index 79d5218e..42a79eec 100644 --- a/docs/src/man/byrow.md +++ b/docs/src/man/byrow.md @@ -44,24 +44,27 @@ In the above benchmark, `byrow` should be even more performant when the data set Generally, `byrow` is efficient for any `fun` which returns a single value for each row, however, it is fine tuned for the following functions: -* `all` -* `any` -* `argmax` -* `argmin` -* `coalesce` -* `count` -* `hash` -* `isequal` -* `maximum` -* `mean` -* `minimum` -* `nunique` -* `prod` -* `std` -* `sum` -* `var` - -The common syntax of `byrow` for all of these functions except `nunique`, `coalesce`, and `isequal` is: +* `all` : Test whether all elements of a boolean collection are `true` +* `any` : Test whether any element of a boolean collection is `true` +* `argmax` : Return the column name of the maximum element +* `argmin` : Return the column name of the minimum element +* `coalesce` : Return the first value which is not equal to `missing` +* `count` : Count the number of `true` values +* `findfirst` : Return the column name of the first `true` value +* `findlast` : Return the column name of the last `true` value +* `hash` : Compute an integer hash code +* `isequal` : Return `true` when all values are equal +* `issorted` : Check if the values are sorted +* `maximum` : Return the maximum value +* `mean` : Compute the mean value +* `minimum` : Return the minimum value +* `nunique` : Return the number of unique values +* `prod` : Return the product of values +* `std` : Compute the standard deviation of values +* `sum` : Return the sum of values +* `var` : Compute the variance of values + +The common syntax of `byrow` for all of these functions except `nunique`, `coalesce`, `isequal`, and `issorted` is: `byrow(ds, fun, cols; [by , threads = true])` @@ -69,7 +72,8 @@ The `by` keyword argument is for specifying a function to call on each value bef The `nunique` function doesn't accept `threads` argument, however, it has an 
extra keyword argument `count_missing`. `nunique` counts the number of unique values of each row, and `count_missing = true` counts missings as a unique value.

-The `coalesce` and `isequal` functions don't accept `by` argument.
+The `coalesce`, `isequal`, and `issorted` functions don't accept the `by` argument; however, `issorted` accepts an extra keyword argument `rev`, which is set to `false` by default.
+

 ### Examples

diff --git a/src/byrow/byrow.jl b/src/byrow/byrow.jl
index b1648b22..915f4dae 100644
--- a/src/byrow/byrow.jl
+++ b/src/byrow/byrow.jl
@@ -44,6 +44,11 @@ function byrow(ds::AbstractDataset, ::typeof(all), cols::MultiColumnIndex = :; b
 end
 byrow(ds::AbstractDataset, ::typeof(all), col::ColumnIndex; by = x->isequal(true, x), threads = nrow(ds)>1000) = byrow(ds, all, [col]; by = by, threads = threads)

+# Row-wise `isequal`, `findfirst` and `findlast`: each forwards `threads` (and `by`, where accepted) to a single row_* kernel.
+byrow(ds::AbstractDataset, ::typeof(isequal), cols::MultiColumnIndex; threads = nrow(ds)>1000) = row_isequal(ds, cols, threads = threads)
+byrow(ds::AbstractDataset, ::typeof(findfirst), cols::MultiColumnIndex; by = identity, threads = nrow(ds)> 1000) = row_findfirst(ds, by, cols; threads = threads)
+byrow(ds::AbstractDataset, ::typeof(findlast), cols::MultiColumnIndex; by = identity, threads = nrow(ds)> 1000) = row_findlast(ds, by, cols; threads = threads)
+
+
 byrow(ds::AbstractDataset, ::typeof(coalesce), cols::MultiColumnIndex; threads = nrow(ds)>1000) = threads ? hp_row_coalesce(ds, cols) : row_coalesce(ds, cols)
-byrow(ds::AbstractDataset, ::typeof(isequal), cols::MultiColumnIndex; threads = nrow(ds)>1000) = threads ? hp_row_isequal(ds, cols) : row_isequal(ds, cols)

@@ -103,6 +108,8 @@ byrow(ds::AbstractDataset, ::typeof(sort), col::ColumnIndex; threads = true, kwa

 byrow(ds::AbstractDataset, ::typeof(sort!), cols::MultiColumnIndex = names(ds, Union{Missing, Number}); threads = true, kwargs...) = threads ? hp_row_sort!(ds, cols; kwargs...) : row_sort!(ds, cols; kwargs...) 
byrow(ds::AbstractDataset, ::typeof(sort!), col::ColumnIndex; threads = true, kwargs...) = byrow(ds, sort!, [col]; threads = threads, kwargs...)

+byrow(ds::AbstractDataset, ::typeof(issorted), cols::MultiColumnIndex; threads = nrow(ds)>1000, rev = false) = threads ? hp_row_issorted(ds, cols; rev = rev) : row_issorted(ds, cols; rev = rev)
+
 byrow(ds::AbstractDataset, ::typeof(stdze), cols::MultiColumnIndex = names(ds, Union{Missing, Number})) = row_stdze(ds, cols)
 byrow(ds::AbstractDataset, ::typeof(stdze!), cols::MultiColumnIndex = names(ds, Union{Missing, Number})) = row_stdze!(ds, cols)
diff --git a/src/byrow/hp_row_functions.jl b/src/byrow/hp_row_functions.jl
index 16b76c09..84145e3a 100644
--- a/src/byrow/hp_row_functions.jl
+++ b/src/byrow/hp_row_functions.jl
@@ -257,6 +257,37 @@ function hp_row_sort(ds::AbstractDataset, cols = names(ds, Union{Missing, Number
     dscopy
 end

+# Threaded fold step for the ascending check: clear `res[i]` when `y[i]` sorts before
+# `x[i]`, then return `y` so that the next step compares against the current column.
+function hp_op_for_issorted!(x, y, res)
+    Threads.@threads for i in eachindex(x)
+        res[i] &= !isless(y[i], x[i])
+    end
+    y
+end
+# Threaded fold step for the descending check (`rev = true`): clear `res[i]` when `x[i]`
+# sorts before `y[i]`, then return `y`.
+function hp_op_for_issorted_rev!(x, y, res)
+    Threads.@threads for i in eachindex(x)
+        res[i] &= !isless(x[i], y[i])
+    end
+    y
+end
+
+# Threaded row-wise `issorted` over `cols`; returns a `Vector{Bool}` with one flag per row.
+# `foldl` is used instead of `mapreduce` because every step must receive the previous
+# column on its left, and only `foldl` guarantees left-to-right association.
+function hp_row_issorted(ds::AbstractDataset, cols; rev = false)
+    colsidx = index(ds)[cols]
+    init0 = ones(Bool, nrow(ds))
+    if rev
+        foldl((x, y)->hp_op_for_issorted_rev!(x, y, init0), view(_columns(ds),colsidx))
+    else
+        foldl((x, y)->hp_op_for_issorted!(x, y, init0), view(_columns(ds),colsidx))
+    end
+    init0
+end
+
 function hp_row_generic(ds::AbstractDataset, f::Function, cols::MultiColumnIndex)
     colsidx = index(ds)[cols]

diff --git a/src/byrow/row_functions.jl b/src/byrow/row_functions.jl
index 4c6d7b86..89dcee80 100644
--- a/src/byrow/row_functions.jl
+++ b/src/byrow/row_functions.jl
@@ -69,6 +69,114 @@ function row_all(ds::AbstractDataset, f::Function, cols = :)
 end
 row_all(ds::AbstractDataset, cols = :) = row_all(ds, isequal(true), cols)

+# Fold step for row-wise `isequal`: keep `x[i]` true only while `y[i]` equals `x1[i]`.
+function _op_for_isequal!(x,y, x1)
+    x .&= isequal.(y, x1)
+    x
+end
+# Threaded version of `_op_for_isequal!`.
+function hp_op_for_isequal!(x,y, x1)
+    Threads.@threads for i in eachindex(x)
+        x[i] &= isequal(y[i], x1[i])
+    end
+    x
+end
+
+# Return a `Vector{Bool}` telling, for every row, whether each value in `cols` is equal to
+# the value in the first selected column. A single column is trivially all-equal.
+# `foldl` keeps the Bool accumulator as the left argument of every step.
+function row_isequal(ds::AbstractDataset, cols = :; threads = true)
+    colsidx = index(ds)[cols]
+    init0 = ones(Bool, nrow(ds))
+    length(colsidx) == 1 && return init0
+    x1 = _columns(ds)[colsidx[1]]
+    if threads
+        foldl((x,y)->hp_op_for_isequal!(x,y,x1), view(_columns(ds),colsidx), init = init0)
+    else
+        foldl((x,y)->_op_for_isequal!(x,y,x1), view(_columns(ds),colsidx), init = init0)
+    end
+end
+
+
+
+
+# TODO probably we should use this approach instead of mapreduce_indexed
+# Fold step for `findfirst`: advance the column counter `idx`, then store its value in
+# every `x[i]` that still holds `missref` and for which `f(y[i])` is `true`.
+function _op_for_findfirst!(x, y, f, idx, missref)
+    idx[] += 1
+    x .= ifelse.(isequal.(missref, x) .& isequal.(true, f.(y)), idx, x)
+    x
+end
+
+# Threaded version of `_op_for_findfirst!`.
+function hp_op_for_findfirst!(x, y, f, idx, missref)
+    idx[] += 1
+    Threads.@threads for i in eachindex(x)
+        x[i] = ifelse(isequal(missref, x[i]) & isequal(true, f(y[i])), idx[], x[i])
+    end
+    x
+end
+
+# Fold step for `findlast`: advance `idx`, then overwrite `x[i]` with its value wherever
+# `f(y[i])` is `true`, so the last matching column wins. `missref` is not used here.
+function _op_for_findlast!(x, y, f, idx, missref)
+    idx[] += 1
+    x .= ifelse.(isequal.(true, f.(y)), idx, x)
+    x
+end
+
+# Threaded version of `_op_for_findlast!`.
+function hp_op_for_findlast!(x, y, f, idx, missref)
+    idx[] += 1
+    Threads.@threads for i in eachindex(x)
+        x[i] = ifelse(isequal(true, f(y[i])), idx[], x[i])
+    end
+    x
+end
+
+# TODO probably we should use threads argument instead of separate functions for hp version
+# For every row, return the name of the first column in `cols` for which `f` gives `true`
+# (`missing` when there is none). The result is the pooled array of column names whose
+# refs are replaced by the per-row column counters collected during the fold.
+# `idx` counts visited columns, so the fold must run strictly left to right (`foldl`).
+function row_findfirst(ds::AbstractDataset, f, cols = names(ds, Union{Missing, Number}); threads = true)
+    colsidx = index(ds)[cols]
+    idx = Ref{Int}(0)
+    colnames_pa = allowmissing(PooledArray(names(ds, colsidx)))
+    push!(colnames_pa, missing)
+    missref = get(colnames_pa.invpool, missing, 0)
+    init0 = fill(missref, nrow(ds))
+    if threads
+        foldl((x,y)->hp_op_for_findfirst!(x,y,f,idx, missref), view(_columns(ds),colsidx), init = init0)
+    else
+        foldl((x,y)->_op_for_findfirst!(x,y,f,idx, missref), view(_columns(ds),colsidx), init = init0)
+    end
+    colnames_pa.refs = init0
+    colnames_pa
+end
+
+# For every row, return the name of the last column in `cols` for which `f` gives `true`
+# (`missing` when there is none), built the same way as in `row_findfirst`.
+# `idx` counts visited columns, so the fold must run strictly left to right (`foldl`).
+function row_findlast(ds::AbstractDataset, f, cols = names(ds, Union{Missing, Number}); threads = true)
+    colsidx = index(ds)[cols]
+    idx = Ref{Int}(0)
+    colnames_pa = allowmissing(PooledArray(names(ds, colsidx)))
+    push!(colnames_pa, missing)
+    missref = get(colnames_pa.invpool, missing, 0)
+    init0 = fill(missref, nrow(ds))
+    if threads
+        foldl((x,y)->hp_op_for_findlast!(x,y,f,idx, missref), view(_columns(ds),colsidx), init = init0)
+    else
+        foldl((x,y)->_op_for_findlast!(x,y,f,idx, missref), view(_columns(ds),colsidx), init = init0)
+    end
+    colnames_pa.refs = init0
+    colnames_pa
+end
+
+
+
 function _op_for_coalesce!(x, y)
     if all(!ismissing, x)
         x
@@ -399,6 +507,31 @@ function row_sort(ds::AbstractDataset, cols = names(ds, Union{Missing, Number});
     dscopy
 end

+# Fold step for the ascending check: clear `res[i]` when `y[i]` sorts before `x[i]`,
+# then return `y` so that the next step compares against the current column.
+function _op_for_issorted!(x, y, res)
+    res .&= .!isless.(y, x)
+    y
+end
+# Fold step for the descending check (`rev = true`); it returns `y` as well.
+function _op_for_issorted_rev!(x, y, res)
+    res .&= .!isless.(x, y)
+    y
+end
+
+# Row-wise `issorted` over `cols`; returns a `Vector{Bool}` with one flag per row.
+# Consecutive columns must be compared in order, hence `foldl` rather than `mapreduce`.
+function row_issorted(ds::AbstractDataset, cols; rev = false)
+    colsidx = index(ds)[cols]
+    init0 = ones(Bool, nrow(ds))
+    if rev
+        foldl((x, y)->_op_for_issorted_rev!(x, y, init0), view(_columns(ds),colsidx))
+    else
+        foldl((x, y)->_op_for_issorted!(x, y, init0), view(_columns(ds),colsidx))
+    end
+    init0
+end
+
 # TODO is it possible to have a faster row_count_unique??
 function _fill_prehashed!(prehashed, y, f, n, j)
     @views copy!(prehashed[:, j] , _Prehashed.(hash.(f.(y))))
diff --git a/src/sort/sort.jl b/src/sort/sort.jl
index fbb9b091..f4af9f35 100644
--- a/src/sort/sort.jl
+++ b/src/sort/sort.jl
@@ -182,12 +182,9 @@ function _issorted(ds, cols::MultiColumnIndex, ::Val{T}; rev = false, mapformats
     inbits[1] = true
     for j in 1:length(colsidx)
         v = _columns(ds)[colsidx[j]]
-        for rng in 1:lastvalid
-            lo = starts[rng]
-            rng == lastvalid ? 
hi = nrow(ds) : hi = starts[rng+1] - 1
-            part_res = _issorted_barrier(v, Base.Order.ord(isless, by[j], revs[j]), lo, hi)
-            !part_res && return false, starts, lastvalid, colsidx, revs, mapformats
-        end
+        _ord = Base.Order.ord(isless, by[j], revs[j])
+        part_res = _issorted_check_for_each_range(v, starts, lastvalid, _ord, nrow(ds))
+        !part_res && return false, starts, lastvalid, colsidx, revs, mapformats
         _find_starts_of_groups!(_columns(ds)[colsidx[j]], 1:nrow(ds), by[j], inbits)
         lastvalid = _fill_starts_from_inbits!(starts, inbits)
         lastvalid == nrow(ds) && return true, starts, lastvalid, colsidx, revs, mapformats
@@ -196,6 +193,23 @@ function _issorted(ds, cols::MultiColumnIndex, ::Val{T}; rev = false, mapformats
     res, starts, lastvalid, colsidx, revs, mapformats
 end

+# Check, in parallel, that `v` is sorted under `_ord` inside every range that starts at
+# `starts[rng]` for `rng in 1:lastvalid`; the last range ends at `nrows`.
+# One atomic flag is shared by all tasks. Per-thread slots indexed by
+# `Threads.threadid()` are unsafe: several tasks may run on the same thread, so a later
+# `true` could overwrite an earlier `false`.
+function _issorted_check_for_each_range(v, starts, lastvalid, _ord, nrows)
+    sorted = Threads.Atomic{Bool}(true)
+    Threads.@threads for rng in 1:lastvalid
+        # a violation was already found in another range: skip the remaining work
+        sorted[] || continue
+        lo = starts[rng]
+        hi = rng == lastvalid ? nrows : starts[rng+1] - 1
+        _issorted_barrier(v, _ord, lo, hi) || (sorted[] = false)
+    end
+    sorted[]
+end
+
 function _fill_starts_from_inbits!(starts, inbits)
     lastvalid = 1
     @inbounds for i in 1:length(inbits)
@@ -209,7 +223,7 @@ end

 function _issorted_barrier(v, _ord, lo, hi)
     lo >= hi && return true
-    for i in lo+1:hi
+    @inbounds for i in lo+1:hi
         Base.Order.lt(_ord, v[i], v[i-1]) && return false
     end
     true
diff --git a/test/byrow.jl b/test/byrow.jl
index 217c8156..b1fa7d92 100644
--- a/test/byrow.jl
+++ b/test/byrow.jl
@@ -44,6 +44,56 @@
     sds = view(ds, [1,2,2,1,3,4,5,5,5], [2,1])
     @test byrow(sds, isequal, :, threads = true) == [0,1,1,0,0,0, 1,1,1]
     @test byrow(sds, isequal, [1], threads = true) == ones(9)
+
+    ds = Dataset(x1 = [1,2,3,4,missing], x2 = [3,2,4,5, missing])
+    @test byrow(ds, issorted, :) == [true, true, true, true, true]
+    @test byrow(ds, issorted, :, rev = true) == [false, true, false, false, true]
+
+    ds = Dataset(randn(10000, 3), 
:auto) + map!(ds, x->rand()<.1 ? missing : x, :) + dsm = Matrix(ds) + @test byrow(ds, issorted, :) == issorted.(eachrow(dsm)) + @test byrow(ds, issorted, :, rev = true) == issorted.(eachrow(dsm), rev = true) + insertcols!(ds, 1, :y=>rand(-1:1, nrow(ds))) + dsm = Matrix(ds) + @test byrow(ds, issorted, :) == byrow(ds, issorted, :, threads = false) == issorted.(eachrow(dsm)) + @test byrow(ds, issorted, :, rev = true) == byrow(ds, issorted, :, rev = true, threads = false) == issorted.(eachrow(dsm), rev = true) + + ds = Dataset(g = [1, 1, 1, 2, 2], + x1_int = [0, 0, 1, missing, 2], + x2_int = [3, 2, 1, 3, -2], + x1_float = [1.2, missing, -1.0, 2.3, 10], + x2_float = [missing, missing, 3.0, missing, missing], + x3_float = [missing, missing, -1.4, 3.0, -100.0]) + @test isequal(byrow(ds, findfirst, :, by = ismissing), ["x2_float", "x1_float", missing, "x1_int", "x2_float"]) + @test isequal(byrow(ds, findlast, :, by = ismissing), ["x3_float", "x3_float", missing, "x2_float", "x2_float"]) + @test isequal(byrow(ds, findfirst, :, by = x->isless(x,0)), [missing, missing, "x1_float", missing, "x2_int"]) + @test isequal(byrow(ds, findlast, :, by = x->isless(x,0)), [missing, missing, "x3_float", missing, "x3_float"]) + @test isequal(byrow(ds, findfirst, :, by = x->1), ["g","g","g", "g","g"]) + @test isequal(byrow(ds, findfirst, :), ["g","g","g", missing, missing]) + @test isequal(byrow(ds, findlast, :), ["g","g","x2_int", missing, missing]) + @test isequal(byrow(ds, findfirst, [3,2,1], by = isequal(2)) ,byrow(ds, findlast, 1:3, by = isequal(2))) + @test isequal(byrow(ds, findfirst, 1:3, by = isequal(2)) ,byrow(ds, findlast, [3,2,1], by = isequal(2))) + + + sds = view(ds, rand(1:5, 100), [2,1,6,5,3,4]) + @test isequal(byrow(sds, findfirst,:, by = x->isless(x,0)), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0))) + @test isequal(byrow(sds, findlast,:, by = x->isless(x,0)), byrow(Dataset(sds), findlast, :, by = x->isless(x,0))) + @test isequal(byrow(sds, findfirst,:, by = 
x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0))) + @test isequal(byrow(sds, findlast,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, :, by = x->isless(x,0))) + sds = view(ds, rand(1:5, 100), [2,1,6,5,3,4]) + @test isequal(byrow(sds, findfirst,:, by = x->isless(x,0)), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0))) + @test isequal(byrow(sds, findlast,:, by = x->isless(x,0)), byrow(Dataset(sds), findlast, :, by = x->isless(x,0))) + @test isequal(byrow(sds, findfirst,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0))) + @test isequal(byrow(sds, findlast,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, :, by = x->isless(x,0))) + + sds = view(ds, rand(1:5, 100), [2,1,3,4]) + @test isequal(byrow(sds, findfirst,[1,4,3,2], by = x->isless(x,0)), byrow(Dataset(sds), findfirst, [1,4,3,2], by = x->isless(x,0))) + @test isequal(byrow(sds, findlast,[1,4,3,2], by = x->isless(x,0)), byrow(Dataset(sds), findlast, [1,4,3,2], by = x->isless(x,0))) + @test isequal(byrow(sds, findfirst,[1,4,3,2], by = x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, [1,4,3,2], by = x->isless(x,0))) + @test isequal(byrow(sds, findlast,[1,4,3,2], by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, [1,4,3,2], by = x->isless(x,0))) + + end @testset "cum*/!" begin