Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions docs/src/man/byrow.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,32 +44,36 @@ In the above benchmark, `byrow` should be even more performant when the data set

Generally, `byrow` is efficient for any `fun` that returns a single value for each row; however, it is fine-tuned for the following functions:

* `all`
* `any`
* `argmax`
* `argmin`
* `coalesce`
* `count`
* `hash`
* `isequal`
* `maximum`
* `mean`
* `minimum`
* `nunique`
* `prod`
* `std`
* `sum`
* `var`

The common syntax of `byrow` for all of these functions except `nunique`, `coalesce`, and `isequal` is:
* `all` : Test whether all elements of a boolean collection are `true`
* `any` : Test whether any elements of a boolean collection are `true`
* `argmax` : Return the column name of the maximum element
* `argmin` : Return the column name of the minimum element
* `coalesce` : Return the first value which is not equal to `missing`
* `count` : Count the number of `true` values
* `findfirst` : Return the column name of the first true value
* `findlast` : Return the column name of the last true value
* `hash` : Compute an integer hash code
* `isequal` : Return `true` when all values are equal
* `issorted` : Check if the values are sorted
* `maximum` : Return the maximum value
* `mean` : Compute the mean value
* `minimum` : Return the minimum value
* `nunique` : Return the number of unique values
* `prod` : Return the product of values
* `std` : Compute the standard deviation of values
* `sum` : Return the sum of values
* `var` : Compute the variance of values

The common syntax of `byrow` for all of these functions except `nunique`, `coalesce`, `isequal`, and `issorted` is:

`byrow(ds, fun, cols; [by , threads = true])`

The `by` keyword argument is for specifying a function to call on each value before calling `fun` to aggregate the result, and `threads = true` causes `byrow` to exploit all cores available to Julia for performing the computations.

The `nunique` function does not accept the `threads` argument; however, it has an extra keyword argument, `count_missing`. `nunique` counts the number of unique values in each row, and `count_missing = true` counts `missing` as a unique value.

The `coalesce` and `isequal` functions don't accept `by` argument.
The `coalesce`, `isequal`, and `issorted` functions do not accept the `by` argument; however, `issorted` accepts an extra keyword argument, `rev`, which is set to `false` by default.


### Examples

Expand Down
7 changes: 7 additions & 0 deletions src/byrow/byrow.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ function byrow(ds::AbstractDataset, ::typeof(all), cols::MultiColumnIndex = :; b
end
# Single-column convenience method for `all`: wraps `col` in a one-element vector and
# forwards to the multi-column method, passing `by` and `threads` through unchanged.
function byrow(ds::AbstractDataset, ::typeof(all), col::ColumnIndex; by = x->isequal(true, x), threads = nrow(ds)>1000)
    return byrow(ds, all, [col]; by = by, threads = threads)
end

# Row-wise `isequal`: delegates to `row_isequal`, forwarding the `threads` flag.
# `threads` defaults to `true` only when the data set has more than 1000 rows.
# NOTE(review): another `byrow(ds, ::typeof(isequal), cols::MultiColumnIndex; threads)`
# method with the same signature appears a few lines further down in this file; if both
# are kept, the later definition replaces this one — confirm which is intended.
function byrow(ds::AbstractDataset, ::typeof(isequal), cols::MultiColumnIndex; threads = nrow(ds)>1000)
    return row_isequal(ds, cols, threads = threads)
end
# Row-wise `findfirst`: delegates to `row_findfirst`, passing `by` as the predicate
# applied to each value. `threads` defaults to `true` only for more than 1000 rows.
function byrow(ds::AbstractDataset, ::typeof(findfirst), cols::MultiColumnIndex; by = identity, threads = nrow(ds)> 1000)
    return row_findfirst(ds, by, cols; threads = threads)
end
# Row-wise `findlast`: delegates to `row_findlast`, passing `by` as the predicate
# applied to each value. `threads` defaults to `true` only for more than 1000 rows.
function byrow(ds::AbstractDataset, ::typeof(findlast), cols::MultiColumnIndex; by = identity, threads = nrow(ds)> 1000)
    return row_findlast(ds, by, cols; threads = threads)
end


# Row-wise `coalesce`: picks the threaded implementation (`hp_row_coalesce`) when
# `threads` is true and the plain one (`row_coalesce`) otherwise.
function byrow(ds::AbstractDataset, ::typeof(coalesce), cols::MultiColumnIndex; threads = nrow(ds)>1000)
    if threads
        return hp_row_coalesce(ds, cols)
    else
        return row_coalesce(ds, cols)
    end
end

# Row-wise `isequal`: picks `hp_row_isequal` when `threads` is true and `row_isequal`
# otherwise.
# NOTE(review): a `byrow(ds, ::typeof(isequal), cols::MultiColumnIndex; threads)` method
# with the same signature is also defined a few lines above in this file; being the later
# definition, this one takes effect — confirm which of the two is intended.
function byrow(ds::AbstractDataset, ::typeof(isequal), cols::MultiColumnIndex; threads = nrow(ds)>1000)
    if threads
        return hp_row_isequal(ds, cols)
    else
        return row_isequal(ds, cols)
    end
end
Expand Down Expand Up @@ -103,6 +108,8 @@ byrow(ds::AbstractDataset, ::typeof(sort), col::ColumnIndex; threads = true, kwa
# Row-wise in-place sort over `cols` (by default the columns selected by
# `names(ds, Union{Missing, Number})`). Extra keyword arguments are forwarded untouched
# to the threaded (`hp_row_sort!`) or non-threaded (`row_sort!`) implementation.
function byrow(ds::AbstractDataset, ::typeof(sort!), cols::MultiColumnIndex = names(ds, Union{Missing, Number}); threads = true, kwargs...)
    if threads
        return hp_row_sort!(ds, cols; kwargs...)
    else
        return row_sort!(ds, cols; kwargs...)
    end
end
# Single-column convenience method for `sort!`: wraps `col` in a one-element vector and
# forwards to the multi-column method together with `threads` and any extra keywords.
function byrow(ds::AbstractDataset, ::typeof(sort!), col::ColumnIndex; threads = true, kwargs...)
    return byrow(ds, sort!, [col]; threads = threads, kwargs...)
end

# Row-wise `issorted`: picks the threaded implementation (`hp_row_issorted`) when
# `threads` is true and `row_issorted` otherwise. `rev` is forwarded to either one and
# is `false` by default; `threads` defaults to `true` only for more than 1000 rows.
function byrow(ds::AbstractDataset, ::typeof(issorted), cols::MultiColumnIndex; threads = nrow(ds)>1000, rev = false)
    if threads
        return hp_row_issorted(ds, cols; rev = rev)
    else
        return row_issorted(ds, cols; rev = rev)
    end
end

# Row-wise `stdze`: delegates to `row_stdze` for `cols` (by default the columns selected
# by `names(ds, Union{Missing, Number})`).
function byrow(ds::AbstractDataset, ::typeof(stdze), cols::MultiColumnIndex = names(ds, Union{Missing, Number}))
    return row_stdze(ds, cols)
end

# Row-wise `stdze!`: delegates to `row_stdze!` for `cols` (by default the columns
# selected by `names(ds, Union{Missing, Number})`).
function byrow(ds::AbstractDataset, ::typeof(stdze!), cols::MultiColumnIndex = names(ds, Union{Missing, Number}))
    return row_stdze!(ds, cols)
end
Expand Down
24 changes: 24 additions & 0 deletions src/byrow/hp_row_functions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,30 @@ function hp_row_sort(ds::AbstractDataset, cols = names(ds, Union{Missing, Number
dscopy
end

# Threaded ascending-order check between two columns.
# For every position, `res` is switched to `false` when the value in `y` is strictly
# less (per `isless`) than the value in `x`; entries of `res` are never switched back
# to `true`. Returns `y` so the caller can use it as the left operand of the next
# comparison.
function hp_op_for_issorted!(x, y, res)
    n = length(x)
    Threads.@threads for row in 1:n
        if isless(y[row], x[row])
            res[row] = false
        end
    end
    return y
end
# Threaded descending-order check between two columns.
# For every position, `res` is switched to `false` when the value in `x` is strictly
# less (per `isless`) than the value in `y`; entries of `res` are never switched back
# to `true`. Returns `y` so the caller can use it as the left operand of the next
# comparison.
function hp_op_for_issorted_rev!(x, y, res)
    n = length(x)
    Threads.@threads for row in 1:n
        if isless(x[row], y[row])
            res[row] = false
        end
    end
    return y
end

# Threaded row-wise sortedness check.
# Returns a `Vector{Bool}` with one entry per row of `ds`: `true` when the values of the
# selected columns, read in the order given by `cols`, are non-decreasing
# (non-increasing when `rev = true`) according to `isless`.
#
# Each adjacent pair of selected columns is compared explicitly, strictly left to right.
# A plain `mapreduce` is deliberately avoided: its associativity is
# implementation-dependent, while the pairwise helpers return their right operand to
# chain comparisons, so a re-associated reduction could skip an adjacent pair.
# With zero or one selected column the loop body never runs and every row is reported
# as sorted.
function hp_row_issorted(ds::AbstractDataset, cols; rev = false)
    colsidx = index(ds)[cols]
    res = ones(Bool, nrow(ds))
    allcols = _columns(ds)
    op! = rev ? hp_op_for_issorted_rev! : hp_op_for_issorted!
    for j in 2:length(colsidx)
        op!(allcols[colsidx[j-1]], allcols[colsidx[j]], res)
    end
    return res
end


function hp_row_generic(ds::AbstractDataset, f::Function, cols::MultiColumnIndex)
colsidx = index(ds)[cols]
Expand Down
110 changes: 110 additions & 0 deletions src/byrow/row_functions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,96 @@ function row_all(ds::AbstractDataset, f::Function, cols = :)
end
row_all(ds::AbstractDataset, cols = :) = row_all(ds, isequal(true), cols)

# Accumulate row-wise equality into `x`: each entry of `x` stays `true` only if it was
# already `true` and the matching entries of `y` and `x1` are `isequal`.
# Mutates and returns `x`.
function _op_for_isequal!(x, y, x1)
    @. x = x & isequal(y, x1)
    return x
end
# Threaded variant of the row-wise equality accumulator: an entry of `x` is switched to
# `false` when the matching entries of `y` and `x1` are not `isequal`; entries are never
# switched back to `true`. Mutates and returns `x`.
function hp_op_for_isequal!(x, y, x1)
    n = length(x)
    Threads.@threads for row in 1:n
        if !isequal(y[row], x1[row])
            x[row] = false
        end
    end
    return x
end

# Row-wise equality across the selected columns.
# Returns a `Vector{Bool}` with one entry per row that is `true` when every selected
# column holds a value `isequal` to the value in the first selected column. A single
# selected column yields all `true` without any comparison.
# `threads` chooses between the threaded and the broadcast-based accumulator.
function row_isequal(ds::AbstractDataset, cols = :; threads = true)
    colsidx = index(ds)[cols]
    acc = ones(Bool, nrow(ds))
    if length(colsidx) == 1
        return acc
    end
    allcols = _columns(ds)
    # Every column is compared against the first selected one.
    refcol = allcols[colsidx[1]]
    selected = view(allcols, colsidx)
    if threads
        return mapreduce(identity, (x, y) -> hp_op_for_isequal!(x, y, refcol), selected, init = acc)
    else
        return mapreduce(identity, (x, y) -> _op_for_isequal!(x, y, refcol), selected, init = acc)
    end
end




# TODO probably we should use this approach instead of mapreduce_indexed
# One step of the row-wise `findfirst` accumulation.
# Increments the counter held in `idx`, then, for every entry of `x` that still equals
# `missref` and whose matching value in `y` satisfies `isequal(true, f(value))`, stores
# the new counter value. Entries that were already set are left alone.
# Mutates `idx` and `x`; returns `x`.
function _op_for_findfirst!(x, y, f, idx, missref)
    idx[] += 1
    pos = idx[]
    @. x = ifelse(isequal(missref, x) & isequal(true, f(y)), pos, x)
    return x
end

# Threaded step of the row-wise `findfirst` accumulation.
# Increments the counter held in `idx` once, before the threaded loop starts; inside the
# loop, every entry of `x` that still equals `missref` and whose matching value in `y`
# satisfies `isequal(true, f(value))` receives the new counter value.
# Mutates `idx` and `x`; returns `x`.
function hp_op_for_findfirst!(x, y, f, idx, missref)
    idx[] += 1
    pos = idx[]
    Threads.@threads for row in 1:length(x)
        # Non-short-circuit `&` keeps `f` evaluated for every row, as before.
        if isequal(missref, x[row]) & isequal(true, f(y[row]))
            x[row] = pos
        end
    end
    return x
end

# One step of the row-wise `findlast` accumulation.
# Increments the counter held in `idx`, then overwrites every entry of `x` whose
# matching value in `y` satisfies `isequal(true, f(value))` with the new counter value,
# so later columns win over earlier ones. `missref` is accepted for signature parity
# with the `findfirst` helper and is not used here.
# Mutates `idx` and `x`; returns `x`.
function _op_for_findlast!(x, y, f, idx, missref)
    idx[] += 1
    pos = idx[]
    @. x = ifelse(isequal(true, f(y)), pos, x)
    return x
end

# Threaded step of the row-wise `findlast` accumulation.
# Increments the counter held in `idx` once, before the threaded loop starts; inside the
# loop, every entry of `x` whose matching value in `y` satisfies
# `isequal(true, f(value))` is overwritten with the new counter value. `missref` is
# accepted for signature parity with the `findfirst` helper and is not used here.
# Mutates `idx` and `x`; returns `x`.
function hp_op_for_findlast!(x, y, f, idx, missref)
    idx[] += 1
    pos = idx[]
    Threads.@threads for row in 1:length(x)
        if isequal(true, f(y[row]))
            x[row] = pos
        end
    end
    return x
end

# TODO probably we should use threads argument instead of separate functions for hp version
# Row-wise `findfirst`: for each row, report the name of the first selected column
# whose value `v` satisfies `isequal(true, f(v))`, or `missing` when none does.
#
# The result is a pooled array built from the selected column names with `missing`
# pushed onto it, so that `missing` has its own reference code (`missref`). The
# per-row codes start as `missref` and are filled in column by column; `counter` is
# incremented once per processed column and its value is what gets stored as the code.
# NOTE(review): this presumably relies on the pool assigning codes to the column names
# in their order of appearance, so that code `j` maps to the `j`-th selected column —
# confirm against PooledArrays.
function row_findfirst(ds::AbstractDataset, f, cols = names(ds, Union{Missing, Number}); threads = true)
    colsidx = index(ds)[cols]
    counter = Ref{Int}(0)
    colnames_pa = allowmissing(PooledArray(names(ds, colsidx)))
    push!(colnames_pa, missing)
    missref = get(colnames_pa.invpool, missing, 0)
    codes = fill(missref, nrow(ds))
    # Pick the threaded or broadcast-based step once, then run a single reduction.
    step! = threads ? hp_op_for_findfirst! : _op_for_findfirst!
    mapreduce(identity, (x, y) -> step!(x, y, f, counter, missref), view(_columns(ds), colsidx), init = codes)
    # Swap the name-list codes for the per-row codes computed above.
    colnames_pa.refs = codes
    return colnames_pa
end

# Row-wise `findlast`: for each row, report the name of the last selected column whose
# value `v` satisfies `isequal(true, f(v))`, or `missing` when none does.
#
# The result is a pooled array built from the selected column names with `missing`
# pushed onto it, so that `missing` has its own reference code (`missref`). The
# per-row codes start as `missref` and are overwritten column by column; `counter` is
# incremented once per processed column and its value is what gets stored as the code.
# NOTE(review): this presumably relies on the pool assigning codes to the column names
# in their order of appearance, so that code `j` maps to the `j`-th selected column —
# confirm against PooledArrays.
function row_findlast(ds::AbstractDataset, f, cols = names(ds, Union{Missing, Number}); threads = true)
    colsidx = index(ds)[cols]
    counter = Ref{Int}(0)
    colnames_pa = allowmissing(PooledArray(names(ds, colsidx)))
    push!(colnames_pa, missing)
    missref = get(colnames_pa.invpool, missing, 0)
    codes = fill(missref, nrow(ds))
    # Pick the threaded or broadcast-based step once, then run a single reduction.
    step! = threads ? hp_op_for_findlast! : _op_for_findlast!
    mapreduce(identity, (x, y) -> step!(x, y, f, counter, missref), view(_columns(ds), colsidx), init = codes)
    # Swap the name-list codes for the per-row codes computed above.
    colnames_pa.refs = codes
    return colnames_pa
end



function _op_for_coalesce!(x, y)
if all(!ismissing, x)
x
Expand Down Expand Up @@ -399,6 +489,26 @@ function row_sort(ds::AbstractDataset, cols = names(ds, Union{Missing, Number});
dscopy
end

# Ascending-order check between two columns.
# Each entry of `res` stays `true` only if it was already `true` and the matching value
# in `y` is not strictly less (per `isless`) than the one in `x`.
# Returns `y` so the caller can use it as the left operand of the next comparison.
function _op_for_issorted!(x, y, res)
    @. res = res & !isless(y, x)
    return y
end
# Descending-order check between two columns.
# Each entry of `res` stays `true` only if it was already `true` and the matching value
# in `x` is not strictly less (per `isless`) than the one in `y`.
# Returns `y` so the caller can use it as the left operand of the next comparison.
function _op_for_issorted_rev!(x, y, res)
    @. res = res & !isless(x, y)
    return y
end

# Row-wise sortedness check (non-threaded).
# Returns a `Vector{Bool}` with one entry per row of `ds`: `true` when the values of the
# selected columns, read in the order given by `cols`, are non-decreasing
# (non-increasing when `rev = true`) according to `isless`.
#
# Each adjacent pair of selected columns is compared explicitly, strictly left to right.
# A plain `mapreduce` is deliberately avoided: its associativity is
# implementation-dependent, while the pairwise helpers return their right operand to
# chain comparisons, so a re-associated reduction could skip an adjacent pair.
# With zero or one selected column the loop body never runs and every row is reported
# as sorted.
function row_issorted(ds::AbstractDataset, cols; rev = false)
    colsidx = index(ds)[cols]
    res = ones(Bool, nrow(ds))
    allcols = _columns(ds)
    op! = rev ? _op_for_issorted_rev! : _op_for_issorted!
    for j in 2:length(colsidx)
        op!(allcols[colsidx[j-1]], allcols[colsidx[j]], res)
    end
    return res
end

# TODO is it possible to have a faster row_count_unique??
function _fill_prehashed!(prehashed, y, f, n, j)
@views copy!(prehashed[:, j] , _Prehashed.(hash.(f.(y))))
Expand Down
22 changes: 15 additions & 7 deletions src/sort/sort.jl
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,9 @@ function _issorted(ds, cols::MultiColumnIndex, ::Val{T}; rev = false, mapformats
inbits[1] = true
for j in 1:length(colsidx)
v = _columns(ds)[colsidx[j]]
for rng in 1:lastvalid
lo = starts[rng]
rng == lastvalid ? hi = nrow(ds) : hi = starts[rng+1] - 1
part_res = _issorted_barrier(v, Base.Order.ord(isless, by[j], revs[j]), lo, hi)
!part_res && return false, starts, lastvalid, colsidx, revs, mapformats
end
_ord = Base.Order.ord(isless, by[j], revs[j])
part_res = _issorted_check_for_each_range(v, starts, lastvalid, _ord, nrow(ds))
!part_res && return false, starts, lastvalid, colsidx, revs, mapformats
_find_starts_of_groups!(_columns(ds)[colsidx[j]], 1:nrow(ds), by[j], inbits)
lastvalid = _fill_starts_from_inbits!(starts, inbits)
lastvalid == nrow(ds) && return true, starts, lastvalid, colsidx, revs, mapformats
Expand All @@ -196,6 +193,17 @@ function _issorted(ds, cols::MultiColumnIndex, ::Val{T}; rev = false, mapformats
res, starts, lastvalid, colsidx, revs, mapformats
end

# Check, in parallel, that `v` is sorted (under ordering `_ord`) within every range.
#
# Ranges are described by `starts[1:lastvalid]`: range `rng` begins at `starts[rng]` and
# ends just before `starts[rng + 1]`, except the last one, which extends to `nrows`.
# Returns `true` only when `_issorted_barrier` reports every range as sorted.
#
# The outcome is kept in a single atomic flag that can only move from `true` to
# `false`. Storing per-thread results in a buffer indexed by `Threads.threadid()` is
# avoided on purpose: a later range handled under the same thread id could overwrite an
# earlier `false` with `true`, and `threadid()` is neither guaranteed to stay fixed for
# a task nor to be bounded by `Threads.nthreads()`.
function _issorted_check_for_each_range(v, starts, lastvalid, _ord, nrows)
    allsorted = Threads.Atomic{Bool}(true)
    Threads.@threads for rng in 1:lastvalid
        # Once any range is known to be unsorted, the remaining ranges are skipped.
        if allsorted[]
            lo = starts[rng]
            hi = rng == lastvalid ? nrows : starts[rng+1] - 1
            if !_issorted_barrier(v, _ord, lo, hi)
                allsorted[] = false
            end
        end
    end
    return allsorted[]
end

function _fill_starts_from_inbits!(starts, inbits)
lastvalid = 1
@inbounds for i in 1:length(inbits)
Expand All @@ -209,7 +217,7 @@ end

function _issorted_barrier(v, _ord, lo, hi)
lo >= hi && return true
for i in lo+1:hi
@inbounds for i in lo+1:hi
Base.Order.lt(_ord, v[i], v[i-1]) && return false
end
true
Expand Down
50 changes: 50 additions & 0 deletions test/byrow.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,56 @@
sds = view(ds, [1,2,2,1,3,4,5,5,5], [2,1])
@test byrow(sds, isequal, :, threads = true) == [0,1,1,0,0,0, 1,1,1]
@test byrow(sds, isequal, [1], threads = true) == ones(9)

ds = Dataset(x1 = [1,2,3,4,missing], x2 = [3,2,4,5, missing])
@test byrow(ds, issorted, :) == [true, true, true, true, true]
@test byrow(ds, issorted, :, rev = true) == [false, true, false, false, true]

ds = Dataset(randn(10000, 3), :auto)
map!(ds, x->rand()<.1 ? missing : x, :)
dsm = Matrix(ds)
@test byrow(ds, issorted, :) == issorted.(eachrow(dsm))
@test byrow(ds, issorted, :, rev = true) == issorted.(eachrow(dsm), rev = true)
insertcols!(ds, 1, :y=>rand(-1:1, nrow(ds)))
dsm = Matrix(ds)
@test byrow(ds, issorted, :) == byrow(ds, issorted, :, threads = false) == issorted.(eachrow(dsm))
@test byrow(ds, issorted, :, rev = true) == byrow(ds, issorted, :, rev = true, threads = false) == issorted.(eachrow(dsm), rev = true)

ds = Dataset(g = [1, 1, 1, 2, 2],
x1_int = [0, 0, 1, missing, 2],
x2_int = [3, 2, 1, 3, -2],
x1_float = [1.2, missing, -1.0, 2.3, 10],
x2_float = [missing, missing, 3.0, missing, missing],
x3_float = [missing, missing, -1.4, 3.0, -100.0])
@test isequal(byrow(ds, findfirst, :, by = ismissing), ["x2_float", "x1_float", missing, "x1_int", "x2_float"])
@test isequal(byrow(ds, findlast, :, by = ismissing), ["x3_float", "x3_float", missing, "x2_float", "x2_float"])
@test isequal(byrow(ds, findfirst, :, by = x->isless(x,0)), [missing, missing, "x1_float", missing, "x2_int"])
@test isequal(byrow(ds, findlast, :, by = x->isless(x,0)), [missing, missing, "x3_float", missing, "x3_float"])
@test isequal(byrow(ds, findfirst, :, by = x->1), ["g","g","g", "g","g"])
@test isequal(byrow(ds, findfirst, :), ["g","g","g", missing, missing])
@test isequal(byrow(ds, findlast, :), ["g","g","x2_int", missing, missing])
@test isequal(byrow(ds, findfirst, [3,2,1], by = isequal(2)) ,byrow(ds, findlast, 1:3, by = isequal(2)))
@test isequal(byrow(ds, findfirst, 1:3, by = isequal(2)) ,byrow(ds, findlast, [3,2,1], by = isequal(2)))


sds = view(ds, rand(1:5, 100), [2,1,6,5,3,4])
@test isequal(byrow(sds, findfirst,:, by = x->isless(x,0)), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0)))
@test isequal(byrow(sds, findlast,:, by = x->isless(x,0)), byrow(Dataset(sds), findlast, :, by = x->isless(x,0)))
@test isequal(byrow(sds, findfirst,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0)))
@test isequal(byrow(sds, findlast,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, :, by = x->isless(x,0)))
sds = view(ds, rand(1:5, 100), [2,1,6,5,3,4])
@test isequal(byrow(sds, findfirst,:, by = x->isless(x,0)), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0)))
@test isequal(byrow(sds, findlast,:, by = x->isless(x,0)), byrow(Dataset(sds), findlast, :, by = x->isless(x,0)))
@test isequal(byrow(sds, findfirst,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, :, by = x->isless(x,0)))
@test isequal(byrow(sds, findlast,:, by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, :, by = x->isless(x,0)))

sds = view(ds, rand(1:5, 100), [2,1,3,4])
@test isequal(byrow(sds, findfirst,[1,4,3,2], by = x->isless(x,0)), byrow(Dataset(sds), findfirst, [1,4,3,2], by = x->isless(x,0)))
@test isequal(byrow(sds, findlast,[1,4,3,2], by = x->isless(x,0)), byrow(Dataset(sds), findlast, [1,4,3,2], by = x->isless(x,0)))
@test isequal(byrow(sds, findfirst,[1,4,3,2], by = x->isless(x,0), threads = true), byrow(Dataset(sds), findfirst, [1,4,3,2], by = x->isless(x,0)))
@test isequal(byrow(sds, findlast,[1,4,3,2], by = x->isless(x,0), threads = true), byrow(Dataset(sds), findlast, [1,4,3,2], by = x->isless(x,0)))


end

@testset "cum*/!" begin
Expand Down