Skip to content

Commit

Permalink
Refactor lit, col, DataFrame, Series (#369)
Browse files Browse the repository at this point in the history
Co-authored-by: Etienne Bacher <52219252+etiennebacher@users.noreply.github.com>
  • Loading branch information
sorhawell and etiennebacher committed Aug 31, 2023
1 parent afb7650 commit 21524b0
Show file tree
Hide file tree
Showing 32 changed files with 417 additions and 412 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Collate:
'rust_result.R'
's3_methods.R'
'series__series.R'
'series__trait.R'
'translation.R'
'vctrs.R'
'zzz.R'
Expand Down
4 changes: 4 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ S3method(as.list,Expr)
S3method(as.matrix,DataFrame)
S3method(as.matrix,LazyFrame)
S3method(as.vector,Series)
S3method(as_polars_series,POSIXlt)
S3method(as_polars_series,default)
S3method(as_polars_series,vctrs_rcrd)
S3method(c,Series)
S3method(dim,DataFrame)
S3method(dim,LazyFrame)
Expand Down Expand Up @@ -152,6 +155,7 @@ S3method(tail,LazyFrame)
S3method(unique,DataFrame)
S3method(unique,LazyFrame)
export(.pr)
export(as_polars_series)
export(knit_print.DataFrame)
export(pl)
importFrom(stats,median)
Expand Down
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ features. Unrelated breaking changes and new features are put in separate sectio
and allowed as default (#357).
- Added methods `pl$enable_string_cache()`, `pl$with_string_cache()` and `pl$using_string_cache()`
for joining/comparing Categorical series/columns (#361).
- Added an S3 generic `as_polars_series()` where users or developers of extensions
can define a custom way to convert their format to Polars format. This generic
must return a Polars series. See #368 for an example (#369).

# polars 0.7.0

Expand Down
122 changes: 26 additions & 96 deletions R/dataframe__frame.R
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ DataFrame
#' placeholder name.
#'
#' @param make_names_unique default TRUE, any duplicated names will be prefixed with a running number
#' @param parallel bool default FALSE, experimental multithreaded interpretation of R vectors
#' into a polars DataFrame. This is experimental as multiple threads read from R memory simultaneously.
#' So far no issues with parallel reads from R have been found.
#'
#' @return DataFrame
#' @keywords DataFrame_new
Expand All @@ -128,8 +125,10 @@ DataFrame
#' d = list(1L, 1:2, 1:3, 1:4, 1:5)
#' ))
#'
pl$DataFrame = function(..., make_names_unique = TRUE, parallel = FALSE) {
largs = list2(...)
pl$DataFrame = function(..., make_names_unique = TRUE, parallel = FALSE, via_select =TRUE) {


largs = unpack_list(...)

# no args: create an empty DataFrame
if (length(largs) == 0L) {
Expand All @@ -141,16 +140,6 @@ pl$DataFrame = function(..., make_names_unique = TRUE, parallel = FALSE) {
return(largs[[1L]])
}

# if input is one list of expression unpack this one
Data = if (length(largs) == 1L && is.list(largs[[1]])) {
largs = largs[[1L]]
if (length(largs) == 0) {
return(.pr$DataFrame$default())
}
largs
}



# input guard
if (!is_DataFrame_data_input(largs)) {
Expand All @@ -163,8 +152,8 @@ pl$DataFrame = function(..., make_names_unique = TRUE, parallel = FALSE) {


## step 00 get max length to allow cycle 1-length inputs
largs_lengths = sapply(largs, length)
largs_lengths_max = if (is.integer(largs_lengths)) max(largs_lengths) else NULL
# largs_lengths = sapply(largs, length)
# largs_lengths_max = if (is.integer(largs_lengths)) max(largs_lengths) else NULL

## step1 handle column names
# keys are tentative new column names
Expand Down Expand Up @@ -200,64 +189,14 @@ pl$DataFrame = function(..., make_names_unique = TRUE, parallel = FALSE) {
}
}

## step 4

if (parallel) {
# interpret R vectors into series in DataFrame in parallel
aux_df = NULL # save Series temp to here
l = mapply(largs, keys, SIMPLIFY = FALSE, FUN = function(column, key) {
if (inherits(column, "Series")) {
if (is.null(aux_df)) {
aux_df <<- .pr$DataFrame$new_with_capacity(length(largs))
}
.pr$Series$rename_mut(column, key)
unwrap(.pr$DataFrame$set_column_from_series(aux_df, column))
column = NULL
} else {
if (length(column) == 1L && isTRUE(largs_lengths_max > 1L)) {
column = rep(column, largs_lengths_max)
}
column = convert_to_fewer_types(column) # type conversions on R side
}
column
})
names(l) = keys
# drop series from converted columns
l = l |> (\(x) if (length(x)) x[!sapply(x, is.null)] else x)()



if (length(l)) {
self = unwrap(.pr$DataFrame$new_par_from_list(l))
} else {
self = aux_df
}

# combine DataFrame if both defined and reorder columns
if (!is.null(aux_df) && length(l)) {
self = pl$concat(list(self, aux_df), rechunk = FALSE, how = "horizontal")
self = do.call(self$select, unname(lapply(keys, pl$col))) # reorder columns by keys
}
} else {
# build the DataFrame one column at a time
self = .pr$DataFrame$new_with_capacity(length(largs))
mapply(largs, keys, FUN = function(column, key) {
if (inherits(column, "Series")) {
.pr$Series$rename_mut(column, key)

unwrap(.pr$DataFrame$set_column_from_series(self, column))
} else {
if (length(column) == 1L && isTRUE(largs_lengths_max > 1L)) {
column = rep(column, largs_lengths_max)
}
column = convert_to_fewer_types(column) # type conversions on R side
unwrap(.pr$DataFrame$set_column_from_robj(self, column, key))
}
return(NULL)
})
}
## pass to pl$
names(largs) = keys
result(
lapply(largs, pl$lit) |>
do.call(what = pl$select)
) |>
unwrap("in pl$DataFrame()")

return(self)
}


Expand Down Expand Up @@ -716,19 +655,7 @@ DataFrame_sort = function(
#' (pl$col("Sepal.Length") + 2)$alias("add_2_SL")
#' )
DataFrame_select = function(...) {
# NOTE(review): this listing comes from a rendered diff and appears to interleave the
# removed and the added implementation without +/- markers -- confirm against the
# actual file before relying on the statement order shown here.
#
# Presumably the removed variant: collect the arguments, run the select binding, then
# overwrite output column names with the names of any named arguments.
args = unpack_list(...)
.pr$DataFrame$select(self, args) |>
and_then(\(df) result(msg = "internal error while renaming columns", {
expr_names = names(args)
if (!is.null(expr_names)) {
old_names = df$columns
new_names = old_names
# only arguments carrying a non-empty name replace the matching column name
has_expr_name = nchar(expr_names) >= 1L
new_names[has_expr_name] = expr_names[has_expr_name]
df$columns = new_names
}
df
})) |>
# Presumably the added variant: hand the unpacked arguments straight to the select
# binding and pass its result to unwrap() with the context string "in $select()".
.pr$DataFrame$select(self, unpack_list(...)) |>
unwrap("in $select()")
}

Expand Down Expand Up @@ -791,7 +718,8 @@ DataFrame_shift_and_fill = function(fill_value, periods = 1) {
#' @description Add or modify columns with expressions
#' @name DataFrame_with_columns
#' @aliases with_columns
#' @param ... any expressions or string column name, or same wrapped in a list
#' @param ... any expressions or string column name, or same wrapped in a list. If the first and
#' only element is a list, it is unpacked as a list of args.
#' @keywords DataFrame
#' @return DataFrame
#' @details Like dplyr `mutate()` as it keeps unmentioned columns unlike $select().
Expand All @@ -801,20 +729,22 @@ DataFrame_shift_and_fill = function(fill_value, periods = 1) {
#' (pl$col("Sepal.Length") + 2)$alias("add_2_SL")
#' )
#'
#' # same query
#' l_expr = list(
#' pl$col("Sepal.Length")$abs()$alias("abs_SL"),
#' (pl$col("Sepal.Length") + 2)$alias("add_2_SL")
#' )
#' pl$DataFrame(iris)$with_columns(l_expr)
#'
#'
#' # renaming columns by naming expressions is considered experimental
#' pl$DataFrame(iris)$with_columns(
#' pl$col("Sepal.Length")$abs(), # not named expr will keep name "Sepal.Length"
#' SW_add_2 = (pl$col("Sepal.Width") + 2)
#' )
DataFrame_with_columns = function(...) {
# NOTE(review): this listing comes from a rendered diff and appears to show the removed
# body (list2 + lazy()$with_columns()$collect()) followed by the added body
# (.pr$DataFrame$with_columns) -- confirm against the actual file.
largs = list2(...)

# unpack a single list
if (length(largs) == 1 && is.list(largs[[1]])) {
largs = largs[[1]]
}

# presumably removed: route through the lazy API and collect the result
do.call(self$lazy()$with_columns, largs)$collect()
# presumably added: call the DataFrame binding directly with the unpacked arguments and
# pass its result to unwrap() with the context string "in $with_columns()"
.pr$DataFrame$with_columns(self, unpack_list(...)) |>
unwrap("in $with_columns()")
}

#' modify/append one column
Expand Down
15 changes: 2 additions & 13 deletions R/expr__expr.R
Original file line number Diff line number Diff line change
Expand Up @@ -921,19 +921,8 @@ Expr_apply = function(f, return_type = NULL, strict_return_type = TRUE, allow_fa
#' # vectors to literal implicitly
#' (pl$lit(2) + 1:4) / 4:1
Expr_lit = function(x) {
# NOTE(review): this listing comes from a rendered diff and appears to show the removed
# R-side dispatch (pcase) followed by the added direct .Call -- confirm against the
# actual file.
#
# Presumably removed: choose the conversion on the R side. NULL, Expr, Series,
# non-scalar input and list/POSIXct/PTime/Date input each get their own branch; anything
# else is passed to .pr$Expr$lit() as is.
pcase(
is.null(x),
.pr$Expr$lit(NULL),
inherits(x, "Expr"),
Ok(x),
inherits(x, "Series"),
.pr$Expr$lit(x),
length(x) != 1L || inherits(x, c("list", "POSIXct", "PTime", "Date")),
{
result(pl$Series(x)) |> and_then(.pr$Expr$lit)
},
or_else = .pr$Expr$lit(x)
) |> unwrap("in lit()")
# Presumably added: pass x directly to the wrap__Expr__lit entry point.
.Call(wrap__Expr__lit, x) |> # using .Call directly reduces eval from 22us to 15us, not a bottleneck anyway
unwrap("in $lit()")
}

#' polars suffix
Expand Down
20 changes: 12 additions & 8 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ concat_list <- function(exprs) .Call(wrap__concat_list, exprs)

concat_str <- function(dotdotdot, separator) .Call(wrap__concat_str, dotdotdot, separator)

r_date_range_lazy <- function(start, end, every, closed, time_unit, time_zone) .Call(wrap__r_date_range_lazy, start, end, every, closed, time_unit, time_zone)
r_date_range_lazy <- function(start, end, every, closed, time_unit, time_zone, explode) .Call(wrap__r_date_range_lazy, start, end, every, closed, time_unit, time_zone, explode)

as_struct <- function(exprs) .Call(wrap__as_struct, exprs)

Expand All @@ -47,8 +47,6 @@ arrow_stream_to_rust <- function(rbr) invisible(.Call(wrap__arrow_stream_to_rust

dtype_str_repr <- function(dtype) .Call(wrap__dtype_str_repr, dtype)

internal_wrap_e <- function(robj, str_to_lit) .Call(wrap__internal_wrap_e, robj, str_to_lit)

mem_address <- function(robj) .Call(wrap__mem_address, robj)

clone_robj <- function(robj) .Call(wrap__clone_robj, robj)
Expand Down Expand Up @@ -85,6 +83,10 @@ test_rthreadhandle <- function() .Call(wrap__test_rthreadhandle)

test_serde_df <- function(df) .Call(wrap__test_serde_df, df)

internal_wrap_e <- function(robj, str_to_lit) .Call(wrap__internal_wrap_e, robj, str_to_lit)

robj_to_col <- function(name, dotdotdot) .Call(wrap__robj_to_col, name, dotdotdot)

enable_string_cache <- function(toggle) .Call(wrap__enable_string_cache, toggle)

using_string_cache <- function() .Call(wrap__using_string_cache)
Expand All @@ -109,8 +111,6 @@ DataFrame$set_column_from_robj <- function(robj, name) .Call(wrap__DataFrame__se

DataFrame$set_column_from_series <- function(x) .Call(wrap__DataFrame__set_column_from_series, self, x)

DataFrame$new_par_from_list <- function(robj_list) .Call(wrap__DataFrame__new_par_from_list, robj_list)

DataFrame$with_row_count <- function(name, offset) .Call(wrap__DataFrame__with_row_count, self, name, offset)

DataFrame$print <- function() .Call(wrap__DataFrame__print, self)
Expand Down Expand Up @@ -143,6 +143,8 @@ DataFrame$drop_in_place <- function(names) .Call(wrap__DataFrame__drop_in_place,

DataFrame$select <- function(exprs) .Call(wrap__DataFrame__select, self, exprs)

DataFrame$with_columns <- function(exprs) .Call(wrap__DataFrame__with_columns, self, exprs)

DataFrame$by_agg <- function(group_exprs, agg_exprs, maintain_order) .Call(wrap__DataFrame__by_agg, self, group_exprs, agg_exprs, maintain_order)

DataFrame$to_struct <- function(name) .Call(wrap__DataFrame__to_struct, self, name)
Expand Down Expand Up @@ -979,8 +981,12 @@ LazyFrame$fill_null <- function(fill_value) .Call(wrap__LazyFrame__fill_null, se

LazyFrame$slice <- function(offset, length) .Call(wrap__LazyFrame__slice, self, offset, length)

LazyFrame$with_columns <- function(exprs) .Call(wrap__LazyFrame__with_columns, self, exprs)

LazyFrame$select <- function(exprs) .Call(wrap__LazyFrame__select, self, exprs)

LazyFrame$select_str_as_lit <- function(exprs) .Call(wrap__LazyFrame__select_str_as_lit, self, exprs)

LazyFrame$limit <- function(n) .Call(wrap__LazyFrame__limit, self, n)

LazyFrame$tail <- function(n) .Call(wrap__LazyFrame__tail, self, n)
Expand All @@ -993,8 +999,6 @@ LazyFrame$unique <- function(subset, keep, maintain_order) .Call(wrap__LazyFrame

LazyFrame$groupby <- function(exprs, maintain_order) .Call(wrap__LazyFrame__groupby, self, exprs, maintain_order)

LazyFrame$with_columns <- function(exprs) .Call(wrap__LazyFrame__with_columns, self, exprs)

LazyFrame$with_column <- function(expr) .Call(wrap__LazyFrame__with_column, self, expr)

LazyFrame$with_row_count <- function(name, offset) .Call(wrap__LazyFrame__with_row_count, self, name, offset)
Expand All @@ -1017,7 +1021,7 @@ LazyFrame$optimization_toggle <- function(type_coercion, predicate_pushdown, pro

LazyFrame$profile <- function() .Call(wrap__LazyFrame__profile, self)

LazyFrame$explode <- function(dotdotdot_args) .Call(wrap__LazyFrame__explode, self, dotdotdot_args)
LazyFrame$explode <- function(dotdotdot) .Call(wrap__LazyFrame__explode, self, dotdotdot)

LazyFrame$clone_see_me_macro <- function() .Call(wrap__LazyFrame__clone_see_me_macro, self)

Expand Down
7 changes: 5 additions & 2 deletions R/functions__eager.R
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pl$concat = function(
#' @param name name of series
#' @param time_unit option string ("ns" "us" "ms") duration of one int64 value on polars side
#' @param time_zone optional string describing a timezone.
#' @param explode if TRUE (default) all created ranges will be "unlisted" into one column, if FALSE
#' output will be a list of ranges.
#'
#' @details
#' If param time_zone is not defined the Series will have no time zone.
Expand Down Expand Up @@ -133,7 +135,8 @@ pl$date_range = function(
closed = "both", # : ClosedInterval = "both",
name = NULL, # : str | None = None,
time_unit = "us",
time_zone = NULL # : str | None = None
time_zone = NULL, # : str | None = None
explode = TRUE
) {
if (missing(end)) {
end = start
Expand All @@ -154,7 +157,7 @@ pl$date_range = function(
start = cast_naive_value_to_datetime_expr(start)
end = cast_naive_value_to_datetime_expr(end)

r_date_range_lazy(start, end, interval, closed, time_unit, time_zone) |>
r_date_range_lazy(start, end, interval, closed, time_unit, time_zone, explode) |>
and_then(f_eager_eval) |>
unwrap("in pl$date_range()")
}
Expand Down
Loading

0 comments on commit 21524b0

Please sign in to comment.