Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement sink stream for LazyFrame #343

Merged
merged 14 commits into from
Aug 11, 2023
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# polars (development version)

## BREAKING CHANGES

- `$rpow()` is removed. It should never have been translated. Use `^` and `$pow()`
instead (#346).
- `<LazyFrame>$collect_background()` renamed `<LazyFrame>$collect_in_background()`
and reworked. Likewise `PolarsBackgroundHandle` reworked and renamed to
`RThreadHandle` (#311).
- `pl$scan_arrow_ipc` is now called `pl$scan_ipc` (#343).

## What's changed

- Stream query to file with `pl$sink_ipc()` and `pl$sink_parquet()` (#343)
- New method `$explode()` for `DataFrame` and `LazyFrame` (#314).
- New method `$clone()` for `LazyFrame` (#347).
- New methods `$optimization_toggle()` and `$profile()` for `LazyFrame` (#323).
Expand Down
4 changes: 4 additions & 0 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,10 @@ LazyFrame$collect <- function() .Call(wrap__LazyFrame__collect, self)

LazyFrame$collect_in_background <- function() .Call(wrap__LazyFrame__collect_in_background, self)

LazyFrame$sink_parquet <- function(path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order) .Call(wrap__LazyFrame__sink_parquet, self, path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order)

LazyFrame$sink_ipc <- function(path, compression_method, maintain_order) .Call(wrap__LazyFrame__sink_ipc, self, path, compression_method, maintain_order)

LazyFrame$first <- function() .Call(wrap__LazyFrame__first, self)

LazyFrame$last <- function() .Call(wrap__LazyFrame__last, self)
Expand Down
4 changes: 2 additions & 2 deletions R/ipc.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
#' @param memmap bool, mapped memory
#'
#' @return LazyFrame
#' @name scan_arrow_ipc
pl$scan_arrow_ipc = function(
#' @name scan_ipc
pl$scan_ipc = function(
path, # : str | Path,
n_rows = NULL, # : int | None = None,
cache = TRUE, # : bool = True,
Expand Down
185 changes: 177 additions & 8 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -267,22 +267,22 @@ LazyFrame_filter = "use_extendr_wrapper"
#' @description collect DataFrame by lazy query
#' @param type_coercion Boolean. Coerce types such that operations succeed and
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible / at
#' @param predicate_pushdown Boolean. Applies filters as early as possible at
#' scan level.
#' @param projection_pushdown Boolean. Applies filters as early as possible / at
#' scan level.
#' @param simplify_expression Boolean. Cache subtrees/file scans that are used
#' by multiple subtrees in the query plan.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' @param projection_pushdown Boolean. Select only the columns that are needed at the scan level.
#' @param simplify_expression Boolean. Various optimizations, such as constant folding
#' and replacing expensive operations with faster alternatives.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' Don't materialize sliced outputs
#' level. Don't materialize sliced outputs (e.g. `join$head(10)`).
#' @param common_subplan_elimination Boolean. Cache subtrees/file scans that
#' @param common_subplan_elimination Boolean. Cache subtrees/file scans that
#' are used by multiple subtrees in the query plan.
#' @param no_optimization Boolean. Turn off the following optimizations:
#' predicate_pushdown = FALSE
#' projection_pushdown = FALSE
#' slice_pushdown = FALSE
#' common_subplan_elimination = FALSE
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' (this is in an alpha state).
#' @param collect_in_background Boolean. Detach this query from R session.
#' Computation will start in background. Get a handle which later can be converted
Expand All @@ -293,6 +293,9 @@ LazyFrame_filter = "use_extendr_wrapper"
#' @keywords LazyFrame DataFrame_new
#' @return A `DataFrame`
#' @examples pl$LazyFrame(iris)$filter(pl$col("Species") == "setosa")$collect()
#' @seealso
#' - [`$sink_parquet()`][LazyFrame_sink_parquet()] stream query to a parquet file.
#' - [`$sink_ipc()`][LazyFrame_sink_ipc()] stream query to a arrow file.
LazyFrame_collect = function(
type_coercion = TRUE,
predicate_pushdown = TRUE,
Expand Down Expand Up @@ -373,6 +376,172 @@ LazyFrame_collect_in_background = function() {
.pr$LazyFrame$collect_in_background(self)
}

#' @title LazyFrame stream output to parquet file
#' @description
#' Persists a LazyFrame at the provided path.
#' This allows streaming results that are larger than RAM to be written to disk.
#' @param path String. The path of the parquet file
#' @param compression String. The compression method. One of
#' `c('uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd')`
#' Choose “zstd” for good compression performance. Choose “lz4” for fast compression/decompression.
#' Choose “snappy” for more backwards compatibility guarantees when you deal with older parquet
#' readers.
#' @param compression_level NULL or Integer. Only used if method is one of
#' `c('gzip', 'brotli', 'zstd'`. The level of compression to use. Higher compression means smaller
#' files on disk. “gzip” : min-level: 0, max-level: 10. “brotli” : min-level: 0, max-level: 11.
#' “zstd” : min-level: 1, max-level: 22.
#' @param statistics Boolean. Whether compute and write column statistics.
#' This requires extra compute.
#' @param row_group_size NULL or Integer. Size of the row groups in number of rows. If NULL
#' (default), the chunks of the DataFrame are used. Writing in smaller chunks may reduce memory
#' pressure and improve writing speeds.
#' @param data_pagesize_limit NULL or Integer. If set NULL the limit will be ~1MB.
#' @param maintain_order Boolean. Whether maintain the order the data was processed.
#' Setting this to False will be slightly faster.
#' @param type_coercion Boolean. Coerce types such that operations succeed and
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible at
#' scan level.
#' @param projection_pushdown Boolean. Select only the columns that are needed at the scan level.
#' @param simplify_expression Boolean. Various optimizations, such as constant folding
#' and replacing expensive operations with faster alternatives.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' Don't materialize sliced outputs
#' level. Don't materialize sliced outputs (e.g. `join$head(10)`).
#' @param no_optimization Boolean. Turn off the following optimizations:
#' predicate_pushdown = FALSE
#' projection_pushdown = FALSE
#' slice_pushdown = FALSE
#' common_subplan_elimination = FALSE
#' @examples
#' # sink table 'mtcars' from mem to parquet
#' tmpf = tempfile()
#' pl$LazyFrame(mtcars)$sink_parquet(tmpf)
#'
#' # stream a query end-to-end
#' tmpf2 = tempfile()
#' pl$scan_parquet(tmpf)$select(pl$col("cyl") * 2)$sink_parquet(tmpf2)
#'
#' # load parquet directly into a DataFrame / memory
#' pl$scan_parquet(tmpf2)$collect()
LazyFrame_sink_parquet = function(
sorhawell marked this conversation as resolved.
Show resolved Hide resolved
path,
compression = "zstd",
compression_level = 3,
statistics = FALSE,
row_group_size = NULL,
data_pagesize_limit = NULL,
maintain_order = TRUE,
type_coercion = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
}

self |>
.pr$LazyFrame$optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
FALSE,
TRUE
) |>
unwrap("in $sink_parquet(...)") |>
.pr$LazyFrame$sink_parquet(
path,
compression,
compression_level,
statistics,
row_group_size,
data_pagesize_limit,
maintain_order
) |>
unwrap("in $sink_parquet(...)") |>
invisible()
}


#' @title LazyFrame stream output to arrow ipc file
#' @description
#' Persists a LazyFrame at the provided path.
#' This allows streaming results that are larger than RAM to be written to disk
#' @param path string, the path of the arrow ipc file
#' @param compression NULL or string, the compression method. One of `{'lz4', 'zstd'}` if not NULL.
#' Choose “zstd” for good compression performance. Choose “lz4” for fast compression/decompression.
#' @param maintain_order bool, whether maintain the order the data was processed.
#' Setting this to FALSE will be slightly faster.
#' @param type_coercion Boolean. Coerce types such that operations succeed and
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible at
#' scan level.
#' @param projection_pushdown Boolean. Select only the columns that are needed at the scan level.
#' @param simplify_expression Boolean. Various optimizations, such as constant folding
#' and replacing expensive operations with faster alternatives.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' Don't materialize sliced outputs
#' level. Don't materialize sliced outputs (e.g. `join$head(10)`).
#' @param no_optimization Boolean. Turn off the following optimizations:
#' predicate_pushdown = FALSE
#' projection_pushdown = FALSE
#' slice_pushdown = FALSE
#' common_subplan_elimination = FALSE
#' @examples
#' # sink table 'mtcars' from mem to ipc
#' tmpf = tempfile()
#' pl$LazyFrame(mtcars)$sink_ipc(tmpf)
#'
#' # stream a query end-to-end (not supported yet) https://github.com/pola-rs/polars/issues/10406
#' # tmpf2 = tempfile()
#' # pl$scan_ipc(tmpf)$select(pl$col("cyl") * 2)$collect()$lazy()$sink_ipc(tmpf2)
#'
#' # load ipc directly into a DataFrame / memory
#' # pl$scanipc(tmpf2)$collect()
#'
LazyFrame_sink_ipc = function(
path,
compression = "zstd",
maintain_order = TRUE,
type_coercion = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
}

self |>
.pr$LazyFrame$optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
FALSE,
TRUE
) |>
unwrap("in $sink_ipc(...)") |>
.pr$LazyFrame$sink_ipc(
path,
compression,
maintain_order
) |>
unwrap("in LazyFrame$sink_ipc(...)") |>
invisible()
}


#' @title Limits
#' @description take limit of n rows of query
#' @keywords LazyFrame
Expand Down
16 changes: 11 additions & 5 deletions man/LazyFrame_collect.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions man/LazyFrame_sink_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading