Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

<LazyFrame>$fetch() #319

Merged
merged 21 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ features. Unrelated breaking changes and new features are put in separate sectio
- Stream query to file with `pl$sink_ipc()` and `pl$sink_parquet()` (#343)
- New method `$explode()` for `DataFrame` and `LazyFrame` (#314).
- New method `$clone()` for `LazyFrame` (#347).
- New method `$fetch()` for `LazyFrame` (#319).
- New methods `$optimization_toggle()` and `$profile()` for `LazyFrame` (#323).
- `$with_column()` is now deprecated (following upstream `polars`). It will be
removed in 0.9.0. It should be replaced with `$with_columns()` (#313).
Expand Down
1 change: 1 addition & 0 deletions R/error__trait.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ call_to_string = function(call) paste(capture.output(print(call)), collapse = "\
#' @param err any type which impl as.character
#' @param context calling context
#' @keywords internal
#' @noRd
#' @return err as string
#' @examples
#' #
Expand Down
2 changes: 2 additions & 0 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,8 @@ LazyFrame$rename <- function(existing, new) .Call(wrap__LazyFrame__rename, self,

LazyFrame$schema <- function() .Call(wrap__LazyFrame__schema, self)

LazyFrame$fetch <- function(n_rows) .Call(wrap__LazyFrame__fetch, self, n_rows)

LazyFrame$optimization_toggle <- function(type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming) .Call(wrap__LazyFrame__optimization_toggle, self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming)

LazyFrame$profile <- function() .Call(wrap__LazyFrame__profile, self)
Expand Down
117 changes: 109 additions & 8 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,17 @@ LazyFrame_filter = "use_extendr_wrapper"
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible at
#' scan level.
#' @param projection_pushdown Boolean. Select only the columns that are needed at the scan level.
#' @param simplify_expression Boolean. Various optimizations, such as constant folding
#' and replacing expensive operations with faster alternatives.
#' @param projection_pushdown Boolean. Select only the columns that are needed
#' at the scan level.
#' @param simplify_expression Boolean. Various optimizations, such as constant
#' folding and replacing expensive operations with faster alternatives.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' Don't materialize sliced outputs
#' level. Don't materialize sliced outputs (e.g. `join$head(10)`).
#' @param comm_subplan_elim Boolean. Will try to cache branching subplans that occur on self-joins
#' or unions.
#' @param comm_subexpr_elim Boolean. Common subexpressions will be cached and reused.
#' or unions.
#' @param comm_subplan_elim Boolean. Will try to cache branching subplans that
#' occur on self-joins or unions.
#' @param comm_subexpr_elim Boolean. Common subexpressions will be cached and
#' reused.
#' @param no_optimization Boolean. Turn off the following optimizations:
#' predicate_pushdown = FALSE
#' projection_pushdown = FALSE
Expand All @@ -297,6 +298,11 @@ LazyFrame_filter = "use_extendr_wrapper"
#' @return A `DataFrame`
#' @examples pl$LazyFrame(iris)$filter(pl$col("Species") == "setosa")$collect()
#' @seealso
#' - [`$fetch()`][LazyFrame_fetch] - fast limited query check
#' - [`$profile()`][LazyFrame_profile] - returns as `$collect()` but also table with each operation
#' profiled.
#' - [`$collect_in_background()`][LazyFrame_collect_in_background] - non-blocking collect returns
#' a future handle. Can also just be used via `$collect(collect_in_background = TRUE)`.
#' - [`$sink_parquet()`][LazyFrame_sink_parquet()] stream query to a parquet file.
#' - [`$sink_ipc()`][LazyFrame_sink_ipc()] stream query to a arrow file.
LazyFrame_collect = function(
Expand Down Expand Up @@ -1183,13 +1189,108 @@ LazyFrame_dtypes = method_as_property(function() {
unwrap("in $dtypes()")
})


#' Fetch `n` rows of a LazyFrame
#'
#' This is similar to `$collect()` but limit the number of rows to collect. It
#' is mostly useful to check that a query works as expected.
#'
#' @keywords LazyFrame
#' @details
#' `$fetch()` does not guarantee the final number of rows in the DataFrame output.
#' It only guarantees that `n` rows are used at the beginning of the query.
#' Filters, join operations and a lower number of rows available in the scanned
#' file influence the final number of rows.
#'
#' @param n_rows Integer. Maximum number of rows to fetch.
#' @param type_coercion Boolean. Coerce types such that operations succeed and
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible / at
#' scan level.
#' @param projection_pushdown Boolean. Applies filters as early as possible / at
#' scan level.
#' @param simplify_expression Boolean. Cache subtrees/file scans that are used
#' by multiple subtrees in the query plan.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' level. Don't materialize sliced outputs (e.g. `join$head(10)`).
#' @param comm_subplan_elim Boolean. Will try to cache branching subplans that
#' occur on self-joins or unions.
#' @param comm_subexpr_elim Boolean. Common subexpressions will be cached and
#' reused.
#' @param no_optimization Boolean. Turn off the following optimizations:
#' predicate_pushdown = FALSE
#' projection_pushdown = FALSE
#' slice_pushdown = FALSE
#' common_subplan_elimination = FALSE
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' (this is in an alpha state).
#' @return A DataFrame of maximum n_rows
#' @seealso
#' - [`$collect()`][LazyFrame_collect] - regular collect.
#' - [`$profile()`][LazyFrame_profile] - returns as `$collect()` but also table with each operation
#' profiled.
#' - [`$collect_in_background()`][LazyFrame_collect_in_background] - non-blocking collect returns
#' a future handle. Can also just be used via `$collect(collect_in_background = TRUE)`.
#' @examples
#'
#' # fetch 3 rows
#' pl$LazyFrame(iris)$fetch(3)
#'
#' # this fetch-query returns 4 rows, because we started with 3 and appended one
#' # row in the query (see section 'Details')
#' pl$LazyFrame(iris)$select(pl$col("Species")$append("flora gigantica, alien"))$fetch(3)
LazyFrame_fetch = function(
n_rows = 500,
type_coercion = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
no_optimization = FALSE,
streaming = FALSE) {

if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
comm_subplan_elim = FALSE
comm_subexpr_elim = FALSE
}

if (isTRUE(streaming)) {
comm_subplan_elim = FALSE
}

self |>
.pr$LazyFrame$optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
streaming
) |>
and_then(\(self) .pr$LazyFrame$fetch(self, n_rows)) |>
unwrap("in $fetch()")
}

#' @title Collect and profile a lazy query.
#' @description This will run the query and return a list containing the materialized DataFrame and
#' a DataFrame that contains profiling information of each node that is executed.
#' @details The units of the timings are microseconds.
#'
#' @keywords LazyFrame
#' @return List of two `DataFrame`s: one with the collected result, the other with the timings of each step.
#' @return List of two `DataFrame`s: one with the collected result, the other with the timings of
#' each step.
#' @seealso
#' - [`$collect()`][LazyFrame_collect] - regular collect.
#' - [`$fetch()`][LazyFrame_fetch] - fast limited query check
#' - [`$collect_in_background()`][LazyFrame_collect_in_background] - non-blocking collect returns
#' a future handle. Can also just be used via `$collect(collect_in_background = TRUE)`.
#' @examples
#'
#' ## Simplest use case
Expand Down
20 changes: 13 additions & 7 deletions man/LazyFrame_collect.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 84 additions & 0 deletions man/LazyFrame_fetch.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion man/LazyFrame_profile.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 0 additions & 23 deletions man/where_in.Rd

This file was deleted.

16 changes: 16 additions & 0 deletions src/rust/src/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,19 @@ pub fn profile_with_r_func_support(lazy_df: pl::LazyFrame) -> RResult<(DataFrame
.map_err(polars_to_rpolars_err)
.map(|(result_df, profile_df)| (DataFrame(result_df), DataFrame(profile_df)))
}

pub fn fetch_with_r_func_support(lazy_df: pl::LazyFrame, n_rows: usize) -> RResult<DataFrame> {
concurrent_handler(
move |tc| {
let retval = lazy_df.fetch(n_rows);
ThreadCom::kill_global(&CONFIG);
drop(tc);
retval
},
serve_r,
&CONFIG,
)
.map_err(|err| RPolarsErr::new().plain(err.to_string()))?
.map_err(polars_to_rpolars_err)
.map(DataFrame)
}
9 changes: 8 additions & 1 deletion src/rust/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::concurrent::{collect_with_r_func_support, profile_with_r_func_support};
use crate::concurrent::{
collect_with_r_func_support, fetch_with_r_func_support, profile_with_r_func_support,
};
use crate::conversion::strings_to_smartstrings;

use crate::lazy::dsl::*;

use crate::rdataframe::DataFrame as RDF;
Expand Down Expand Up @@ -428,6 +431,10 @@ impl LazyFrame {
))
}

fn fetch(&self, n_rows: Robj) -> RResult<RDF> {
fetch_with_r_func_support(self.0.clone(), robj_to!(usize, n_rows)?)
}

#[allow(clippy::too_many_arguments)]
fn optimization_toggle(
&self,
Expand Down
Loading