Skip to content

Commit

Permalink
Implement group_by_dynamic() for DataFrame and LazyFrame (#691)
Browse files Browse the repository at this point in the history
Co-authored-by: eitsupi <ts1s1andn@gmail.com>
  • Loading branch information
etiennebacher and eitsupi committed Jan 14, 2024
1 parent 3545ee2 commit b0a70e9
Show file tree
Hide file tree
Showing 27 changed files with 1,313 additions and 111 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Collate:
'functions__lazy.R'
'functions__whenthen.R'
'group_by.R'
'group_by_dynamic.R'
'group_by_rolling.R'
'info.R'
'ipc.R'
Expand Down
5 changes: 5 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ S3method("$",RPolarsChainedWhen)
S3method("$",RPolarsDataFrame)
S3method("$",RPolarsDataType)
S3method("$",RPolarsDataTypeVector)
S3method("$",RPolarsDynamicGroupBy)
S3method("$",RPolarsErr)
S3method("$",RPolarsExpr)
S3method("$",RPolarsExprBinNameSpace)
Expand Down Expand Up @@ -69,6 +70,7 @@ S3method("[[",RPolarsChainedWhen)
S3method("[[",RPolarsDataFrame)
S3method("[[",RPolarsDataType)
S3method("[[",RPolarsDataTypeVector)
S3method("[[",RPolarsDynamicGroupBy)
S3method("[[",RPolarsErr)
S3method("[[",RPolarsExpr)
S3method("[[",RPolarsGroupBy)
Expand All @@ -90,6 +92,7 @@ S3method("|",RPolarsExpr)
S3method(.DollarNames,RPolarsChainedThen)
S3method(.DollarNames,RPolarsChainedWhen)
S3method(.DollarNames,RPolarsDataFrame)
S3method(.DollarNames,RPolarsDynamicGroupBy)
S3method(.DollarNames,RPolarsErr)
S3method(.DollarNames,RPolarsExpr)
S3method(.DollarNames,RPolarsGroupBy)
Expand All @@ -115,6 +118,7 @@ S3method(as.matrix,RPolarsLazyFrame)
S3method(as.vector,RPolarsSeries)
S3method(as_polars_df,ArrowTabular)
S3method(as_polars_df,RPolarsDataFrame)
S3method(as_polars_df,RPolarsDynamicGroupBy)
S3method(as_polars_df,RPolarsGroupBy)
S3method(as_polars_df,RPolarsLazyFrame)
S3method(as_polars_df,RPolarsLazyGroupBy)
Expand Down Expand Up @@ -162,6 +166,7 @@ S3method(print,RPolarsChainedThen)
S3method(print,RPolarsChainedWhen)
S3method(print,RPolarsDataFrame)
S3method(print,RPolarsDataType)
S3method(print,RPolarsDynamicGroupBy)
S3method(print,RPolarsErr)
S3method(print,RPolarsExpr)
S3method(print,RPolarsGroupBy)
Expand Down
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

- New method `$rolling()` for `DataFrame` and `LazyFrame`. When this is
applied, it creates an object of class `RPolarsRollingGroupBy` (#682, #694).
- New method `$group_by_dynamic()` for `DataFrame` and `LazyFrame`. When this
is applied, it creates an object of class `RPolarsDynamicGroupBy` (#691).
- New method `$sink_ndjson()` for LazyFrame (#681).
- New function `pl$duration()` to create a duration by components (week, day,
hour, etc.), and use them with date(time) variables (#692).
Expand Down
4 changes: 4 additions & 0 deletions R/as_polars.R
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ as_polars_df.RPolarsGroupBy = function(x, ...) {
#' @export
as_polars_df.RPolarsRollingGroupBy = as_polars_df.RPolarsGroupBy

# Alias: the RPolarsDynamicGroupBy method is the very same function object as
# the RPolarsGroupBy method, so both classes are converted identically.
#' @rdname as_polars_df
#' @export
as_polars_df.RPolarsDynamicGroupBy = as_polars_df.RPolarsGroupBy

#' @rdname as_polars_df
#' @export
as_polars_df.RPolarsSeries = function(x, ...) {
Expand Down
88 changes: 88 additions & 0 deletions R/dataframe__frame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1833,3 +1833,91 @@ DataFrame_rolling = function(index_column, period, offset = NULL, closed = "righ
}
construct_rolling_group_by(self, index_column, period, offset, closed, by, check_sorted)
}

#' @inherit LazyFrame_group_by_dynamic title description details params
#' @return A [DynamicGroupBy][DynamicGroupBy_class] object
#'
#' @examples
#' df = pl$DataFrame(
#' time = pl$date_range(
#' start = strptime("2021-12-16 00:00:00", format = "%Y-%m-%d %H:%M:%S", tz = "UTC"),
#' end = strptime("2021-12-16 03:00:00", format = "%Y-%m-%d %H:%M:%S", tz = "UTC"),
#' interval = "30m",
#' eager = TRUE,
#' ),
#' n = 0:6
#' )
#'
#' # get the sum in the following hour relative to the "time" column
#' df$group_by_dynamic("time", every = "1h")$agg(
#' vals = pl$col("n"),
#' sum = pl$col("n")$sum()
#' )
#'
#' # using "include_boundaries = TRUE" is helpful to see the period considered
#' df$group_by_dynamic("time", every = "1h", include_boundaries = TRUE)$agg(
#' vals = pl$col("n")
#' )
#'
#' # in the example above, the values didn't include the one *exactly* 1h after
#' # the start because "closed = 'left'" by default.
#' # Changing it to "right" includes values that are exactly 1h after. Note that
#' # the value at 00:00:00 now becomes included in the interval [23:00:00 - 00:00:00],
#' # even if this interval wasn't there originally
#' df$group_by_dynamic("time", every = "1h", closed = "right")$agg(
#' vals = pl$col("n")
#' )
#' # To keep both boundaries, we use "closed = 'both'". Some values now belong to
#' # several groups:
#' df$group_by_dynamic("time", every = "1h", closed = "both")$agg(
#' vals = pl$col("n")
#' )
#'
#' # Dynamic group bys can also be combined with grouping on normal keys
#' df = df$with_columns(groups = pl$Series(c("a", "a", "a", "b", "b", "a", "a")))
#' df
#'
#' df$group_by_dynamic(
#' "time",
#' every = "1h",
#' closed = "both",
#' by = "groups",
#' include_boundaries = TRUE
#' )$agg(pl$col("n"))
#'
#' # We can also create a dynamic group by based on an index column
#' df = pl$LazyFrame(
#' idx = 0:5,
#' A = c("A", "A", "B", "B", "B", "C")
#' )$with_columns(pl$col("idx")$set_sorted())
#' df
#'
#' df$group_by_dynamic(
#' "idx",
#' every = "2i",
#' period = "3i",
#' include_boundaries = TRUE,
#' closed = "right"
#' )$agg(A_agg_list = pl$col("A"))
DataFrame_group_by_dynamic = function(
    index_column,
    every,
    period = NULL,
    offset = NULL,
    include_boundaries = FALSE,
    closed = "left",
    label = "left",
    by = NULL,
    start_by = "window",
    check_sorted = TRUE) {
  # Both optional settings fall back to values derived from `every`:
  # `period` becomes `every` itself, `offset` becomes "-" followed by `every`.
  period = if (is.null(period)) every else period
  offset = if (is.null(offset)) paste0("-", every) else offset

  # Nothing is computed here; the settings are only recorded on the returned
  # grouped object.
  construct_group_by_dynamic(
    self,
    index_column,
    every,
    period,
    offset,
    include_boundaries,
    closed,
    label,
    by,
    start_by,
    check_sorted
  )
}
8 changes: 6 additions & 2 deletions R/expr__expr.R
Original file line number Diff line number Diff line change
Expand Up @@ -3507,6 +3507,7 @@ Expr_peak_max = function() {
#' column represents an index, it has to be either Int32 or Int64. Note that
#' Int32 gets temporarily cast to Int64, so if performance matters use an Int64
#' column.
#' @param ... Ignored.
#' @param period Length of the window, must be non-negative.
#' @param offset Offset of the window. Default is `-period`.
#' @param closed Define which sides of the temporal interval are closed
Expand Down Expand Up @@ -3569,8 +3570,11 @@ Expr_peak_max = function() {
#' df$with_columns(
#' sum_a_offset1 = pl$sum("a")$rolling(index_column = "dt", period = "2d", offset = "1d")
#' )
Expr_rolling = function(index_column, period, offset = NULL,
closed = "right", check_sorted = TRUE) {
Expr_rolling = function(
index_column,
...,
period, offset = NULL,
closed = "right", check_sorted = TRUE) {
if (is.null(offset)) {
offset = paste0("-", period)
}
Expand Down
2 changes: 2 additions & 0 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,8 @@ RPolarsLazyFrame$with_context <- function(contexts) .Call(wrap__RPolarsLazyFrame

# Thin binding: forwards `self` and the arguments, in the order listed, to the
# native routine through .Call().
RPolarsLazyFrame$rolling <- function(index_column, period, offset, closed, by, check_sorted) .Call(wrap__RPolarsLazyFrame__rolling, self, index_column, period, offset, closed, by, check_sorted)

# Same forwarding pattern for dynamic grouping. Note that in this signature
# `label` comes before `include_boundaries`.
RPolarsLazyFrame$group_by_dynamic <- function(index_column, every, period, offset, label, include_boundaries, closed, by, start_by, check_sorted) .Call(wrap__RPolarsLazyFrame__group_by_dynamic, self, index_column, every, period, offset, label, include_boundaries, closed, by, start_by, check_sorted)

# `$` accessor: looks up `name` in RPolarsLazyFrame and rebinds the function's
# environment to this call frame, which is where `self` is defined.
#' @export
`$.RPolarsLazyFrame` <- function (self, name) { func <- RPolarsLazyFrame[[name]]; environment(func) <- environment(); func }

Expand Down
133 changes: 133 additions & 0 deletions R/group_by_dynamic.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#' Operations on Polars DataFrame grouped on time or integer values
#'
#' @return not applicable
#' @name DynamicGroupBy_class
NULL

# Registry holding the methods that `$` / `[[` can reach on a
# RPolarsDynamicGroupBy object.
RPolarsDynamicGroupBy = new.env(parent = emptyenv())

#' @export
`$.RPolarsDynamicGroupBy` = function(self, name) {
  # Fetch the registered method and re-home it in this call frame, so that the
  # method body can see `self`.
  method = RPolarsDynamicGroupBy[[name]]
  environment(method) = environment()
  method
}

# `[[` shares the exact same accessor function as `$`.
#' @export
`[[.RPolarsDynamicGroupBy` = `$.RPolarsDynamicGroupBy`

#' @export
#' @noRd
.DollarNames.RPolarsDynamicGroupBy = function(x, pattern = "") {
  # Names registered for this class that match the completion pattern.
  method_names = ls(RPolarsDynamicGroupBy, pattern = pattern)

  # paste0() treats a zero-length vector as "", which would offer a bogus "()"
  # completion when nothing matches; return no candidates instead.
  if (length(method_names) == 0L) {
    return(character(0))
  }

  # Append "()" so that a completed name is ready to be called.
  paste0(method_names, "()")
}

#' The internal DynamicGroupBy constructor
#'
#' Stores a clone of the DataFrame together with all grouping settings in the
#' "private" attribute of a placeholder string of class
#' `RPolarsDynamicGroupBy`.
#'
#' @param df Object inheriting from `RPolarsDataFrame`; anything else is an
#' internal error.
#' @param index_column,every,period,offset,include_boundaries,closed,label,by,start_by,check_sorted
#' Grouping settings, stored unchanged.
#' @return An object of class `RPolarsDynamicGroupBy`
#' @noRd
construct_group_by_dynamic = function(
    df, index_column, every, period, offset, include_boundaries, closed, label,
    by, start_by, check_sorted) {
  if (!inherits(df, "RPolarsDataFrame")) {
    # Name this constructor in the message so the failure can be traced.
    stop("internal error: construct_group_by_dynamic called not on DataFrame")
  }
  # Make an empty object. Store everything (including data) in attributes, so
  # that we can keep the RPolarsDataFrame class on the data but still return
  # a RPolarsDynamicGroupBy object here.
  out = c(" ")
  attr(out, "private") = list(
    dat = df$clone(),
    index_column = index_column,
    every = every,
    period = period,
    offset = offset,
    include_boundaries = include_boundaries,
    closed = closed,
    label = label,
    by = by,
    start_by = start_by,
    check_sorted = check_sorted
  )
  class(out) = "RPolarsDynamicGroupBy"
  out
}

#' print DynamicGroupBy
#'
#' Prints the DataFrame stored in the object's "private" attribute.
#'
#' @param x DynamicGroupBy object, as created by `$group_by_dynamic()`
#' @param ... not used
#' @noRd
#' @return self, invisibly
#' @export
#'
#' @examples
#' df = pl$DataFrame(
#'   time = pl$date_range(
#'     start = strptime("2021-12-16 00:00:00", format = "%Y-%m-%d %H:%M:%S", tz = "UTC"),
#'     end = strptime("2021-12-16 03:00:00", format = "%Y-%m-%d %H:%M:%S", tz = "UTC"),
#'     interval = "30m",
#'     eager = TRUE,
#'   ),
#'   n = 0:6
#' )
#'
#' # get the sum in the following hour relative to the "time" column
#' df$group_by_dynamic("time", every = "1h")
print.RPolarsDynamicGroupBy = function(x, ...) {
  .pr$DataFrame$print(attr(x, "private")$dat)
  # Return the printed object itself, invisibly, so that an explicit print(x)
  # at top level does not trigger auto-printing of the printer's return value.
  invisible(x)
}


#' Aggregate over a DynamicGroupBy
#'
#' Run aggregations on a DataFrame over the time or integer windows that were
#' set up with `$group_by_dynamic()`.
#'
#' @param ... Exprs to aggregate over. Those can also be passed wrapped in a
#' list, e.g `$agg(list(e1,e2,e3))`.
#'
#' @return An aggregated [DataFrame][DataFrame_class]
#' @inherit DataFrame_group_by_dynamic examples
DynamicGroupBy_agg = function(...) {
  settings = attr(self, "private")

  # Re-create the dynamic grouping on the lazy version of the stored data,
  # using the settings recorded when the grouped object was built.
  lazy_groups = settings$dat$lazy()$group_by_dynamic(
    index_column = settings$index_column,
    every = settings$every,
    period = settings$period,
    offset = settings$offset,
    include_boundaries = settings$include_boundaries,
    closed = settings$closed,
    label = settings$label,
    by = settings$by,
    start_by = settings$start_by,
    check_sorted = settings$check_sorted
  )

  # Apply the aggregations, then collect with `no_optimization = TRUE`.
  aggregated = lazy_groups$agg(unpack_list(..., .context = "in $agg():"))
  aggregated$collect(no_optimization = TRUE)
}

#' Ungroup a DynamicGroupBy object
#'
#' Undo the `$group_by_dynamic()` operation: calling
#' `<DataFrame>$group_by_dynamic(...)$ungroup()` gives back the `DataFrame`
#' held by the grouped object.
#'
#' @return [DataFrame][DataFrame_class]
#' @examples
#' df = pl$DataFrame(
#'   time = pl$date_range(
#'     start = strptime("2021-12-16 00:00:00", format = "%Y-%m-%d %H:%M:%S", tz = "UTC"),
#'     end = strptime("2021-12-16 03:00:00", format = "%Y-%m-%d %H:%M:%S", tz = "UTC"),
#'     interval = "30m",
#'     eager = TRUE,
#'   ),
#'   n = 0:6
#' )
#' df
#'
#' df$group_by_dynamic("time", every = "1h")$ungroup()
DynamicGroupBy_ungroup = function() {
  # The data lives in the `dat` slot of the "private" attribute.
  attr(self, "private")$dat
}
8 changes: 1 addition & 7 deletions R/group_by_rolling.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,7 @@ construct_rolling_group_by = function(df, index_column, period, offset, closed,
#'
#' df$rolling(index_column = "dt", period = "2d")
print.RPolarsRollingGroupBy = function(x, ...) {
# Reads the list stored in the "private" attribute of `x` and prints its `dat`
# element with the internal DataFrame printer.
# NOTE(review): as the lines stand here, `dat` is printed twice (first and last
# statements) with the cat() summary of the grouping settings in between. This
# looks like removed and added diff lines shown together; confirm which
# variant is intended before editing.
prv = attr(x, "private")
.pr$DataFrame$print(prv$dat)
cat(paste("index column:", prv$index))
cat(paste("\nother groups:", toString(prv$by)))
cat(paste("\nperiod:", prv$period))
cat(paste("\noffset:", prv$offset))
cat(paste("\nclosed:", prv$closed))
.pr$DataFrame$print(attr(x, "private")$dat)
}


Expand Down
Loading

0 comments on commit b0a70e9

Please sign in to comment.