
Implement sink stream for LazyFrame #343

Merged
merged 14 commits into from
Aug 11, 2023
5 changes: 3 additions & 2 deletions DESCRIPTION
@@ -34,7 +34,8 @@ Suggests:
rmarkdown,
testthat (>= 3.0.0),
tibble,
withr
withr,
callr
Config/Needs/website:
etiennebacher/altdoc,
here,
@@ -97,5 +98,5 @@ Collate:
'translation.R'
'vctrs.R'
'zzz.R'
Config/rextendr/version: 0.3.1
Config/rextendr/version: 0.3.1.9000
VignetteBuilder: knitr
6 changes: 5 additions & 1 deletion NEWS.md
@@ -1,8 +1,12 @@
# polars (development version)

## BREAKING CHANGES
- `pl$scan_arrow_ipc` is now called `pl$scan_ipc` (#343).

## What's changed
- New method `$explode()` for `DataFrame` and `LazyFrame` (#314).
- Stream query to file with `pl$sink_ipc()` and `pl$sink_parquet()` (#343).

# polars 0.7.0

4 changes: 4 additions & 0 deletions R/extendr-wrappers.R
@@ -883,6 +883,10 @@ LazyFrame$collect <- function() .Call(wrap__LazyFrame__collect, self)

LazyFrame$collect_handled <- function() .Call(wrap__LazyFrame__collect_handled, self)

LazyFrame$sink_parquet <- function(path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order) .Call(wrap__LazyFrame__sink_parquet, self, path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order)

LazyFrame$sink_ipc <- function(path, compression_method, maintain_order) .Call(wrap__LazyFrame__sink_ipc, self, path, compression_method, maintain_order)

LazyFrame$first <- function() .Call(wrap__LazyFrame__first, self)

LazyFrame$last <- function() .Call(wrap__LazyFrame__last, self)
4 changes: 2 additions & 2 deletions R/ipc.R
@@ -12,8 +12,8 @@
#' @param memmap bool, mapped memory
#'
#' @return LazyFrame
#' @name scan_arrow_ipc
pl$scan_arrow_ipc = function(
#' @name scan_ipc
pl$scan_ipc = function(
path, # : str | Path,
n_rows = NULL, # : int | None = None,
cache = TRUE, # : bool = True,
56 changes: 56 additions & 0 deletions R/lazyframe__lazy.R
@@ -281,6 +281,62 @@ LazyFrame_collect_background = function() {
.pr$LazyFrame$collect_background(self)
}

#' @title LazyFrame stream output to parquet file
#' @name sink_parquet
#' @description Stream the content of a LazyFrame into a parquet file.
#' @param path string, the path of the parquet file
#' @param compression string, the compression method. One of {'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'}.
#' @param compression_level NULL or int, only used if the method is one of {'gzip', 'brotli', 'zstd'}.
#' @param statistics bool, whether to compute and write column statistics.
#' @param row_group_size NULL or positive integer. If NULL, a single row group will be created.
#' @param data_pagesize_limit NULL or positive integer. If NULL, the limit will be 2^20 bytes.
#' @param maintain_order bool, whether to maintain the order in which the data was processed.
LazyFrame_sink_parquet = function(
path,
compression = "zstd",
compression_level = 3,
statistics = FALSE,
row_group_size = NULL,
data_pagesize_limit = NULL,
maintain_order = TRUE
) {
.pr$LazyFrame$sink_parquet(
self,
path,
compression,
compression_level,
statistics,
row_group_size,
data_pagesize_limit,
maintain_order
) |>
unwrap("in LazyFrame$sink_parquet(...)") |>
invisible()
}
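Assuming the wrapper above is exposed as a `$sink_parquet()` method on `LazyFrame` objects (as the `.pr$LazyFrame$sink_parquet` call suggests), a minimal usage sketch could look like the following; the data and file path are illustrative, not part of this diff:

```r
library(polars)

# Build a lazy query and stream its result straight to disk,
# without materializing the full result in memory.
lf = pl$DataFrame(mtcars)$lazy()$filter(pl$col("cyl") >= 6)

tmpf = tempfile(fileext = ".parquet")
lf$sink_parquet(
  path = tmpf,
  compression = "zstd",
  compression_level = 3
)
file.exists(tmpf)
```

The streaming engine writes row groups incrementally, which is the point of `sink_*` over `$collect()` followed by a write.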


#' @title LazyFrame stream output to arrow ipc file
#' @name sink_ipc
#' @description Stream the content of a LazyFrame into an arrow ipc file.
#' @param path string, the path of the arrow ipc file
#' @param compression NULL or string, the compression method. One of {'lz4', 'zstd'} if not NULL.
#' @param maintain_order bool, whether to maintain the order in which the data was processed.
LazyFrame_sink_ipc = function(
path,
compression = "zstd",
maintain_order = TRUE
) {
.pr$LazyFrame$sink_ipc(
self,
path,
compression,
maintain_order
) |>
unwrap("in LazyFrame$sink_ipc(...)") |>
invisible()
}
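`$sink_ipc()` pairs naturally with `pl$scan_ipc()`, renamed in this same PR, for an out-of-memory round trip. A hedged sketch (the data and path are illustrative):

```r
library(polars)

# Stream a lazy selection to an Arrow IPC file ...
lf = pl$DataFrame(iris)$lazy()$select(pl$col("Species"))
tmpf = tempfile(fileext = ".arrow")
lf$sink_ipc(tmpf, compression = "lz4")

# ... and scan it back lazily with the method renamed in this PR.
pl$scan_ipc(tmpf)$collect()
```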


#' @title Limits
#' @description take limit of n rows of query
#' @keywords LazyFrame
4 changes: 2 additions & 2 deletions man/scan_arrow_ipc.Rd → man/scan_ipc.Rd


19 changes: 19 additions & 0 deletions man/sink_ipc.Rd


35 changes: 35 additions & 0 deletions man/sink_parquet.Rd


51 changes: 41 additions & 10 deletions src/rust/src/lazy/dataframe.rs
@@ -1,16 +1,13 @@
use crate::concurrent::{handle_thread_r_requests, PolarsBackgroundHandle};
use crate::conversion::strings_to_smartstrings;
use crate::lazy::dsl::*;
use crate::rdatatype::new_join_type;
use crate::rdatatype::new_quantile_interpolation_option;
use crate::rdatatype::new_unique_keep_strategy;
use crate::rdatatype::{new_asof_strategy, RPolarsDataType};
use crate::rdatatype::{
new_asof_strategy, new_ipc_compression, new_join_type, new_parquet_compression,
new_quantile_interpolation_option, new_unique_keep_strategy, RPolarsDataType,
};
use crate::robj_to;
use crate::rpolarserr::rerr;
use crate::rpolarserr::RResult;
use crate::rpolarserr::{Rctx, WithRctx};
use crate::utils::wrappers::null_to_opt;
use crate::utils::{r_result_list, try_f64_into_usize};
use crate::rpolarserr::{polars_to_rpolars_err, rerr, RResult, Rctx, WithRctx};
use crate::utils::{r_result_list, try_f64_into_usize, wrappers::null_to_opt};
use extendr_api::prelude::*;
use polars::chunked_array::object::AsOfOptions;
use polars::frame::explode::MeltArgs;
@@ -80,11 +77,45 @@ impl LazyFrame {
})
}

pub fn collect_handled(&self) -> crate::rpolarserr::RResult<crate::rdataframe::DataFrame> {
pub fn collect_handled(&self) -> RResult<crate::rdataframe::DataFrame> {
use crate::rpolarserr::WithRctx;
handle_thread_r_requests(self.clone().0).when("calling $collect() on LazyFrame")
}

pub fn sink_parquet(
&self,
path: Robj,
compression_method: Robj,
compression_level: Robj,
statistics: Robj,
row_group_size: Robj,
data_pagesize_limit: Robj,
maintain_order: Robj,
) -> RResult<()> {
let pqwo = polars::prelude::ParquetWriteOptions {
compression: new_parquet_compression(compression_method, compression_level)?,
statistics: robj_to!(bool, statistics)?,
row_group_size: robj_to!(Option, usize, row_group_size)?,
data_pagesize_limit: robj_to!(Option, usize, data_pagesize_limit)?,
maintain_order: robj_to!(bool, maintain_order)?,
};
self.0
.clone()
.sink_parquet(robj_to!(String, path)?.into(), pqwo)
.map_err(polars_to_rpolars_err)
}

fn sink_ipc(&self, path: Robj, compression_method: Robj, maintain_order: Robj) -> RResult<()> {
let ipcwo = polars::prelude::IpcWriterOptions {
compression: new_ipc_compression(compression_method)?,
maintain_order: robj_to!(bool, maintain_order)?,
};
self.0
.clone()
.sink_ipc(robj_to!(String, path)?.into(), ipcwo)
.map_err(polars_to_rpolars_err)
}

fn first(&self) -> Self {
self.0.clone().first().into()
}
46 changes: 45 additions & 1 deletion src/rust/src/rdatatype.rs
@@ -1,9 +1,11 @@
use crate::utils::r_result_list;
use crate::utils::wrappers::Wrap;
use extendr_api::prelude::*;
use polars::prelude::{self as pl};
use polars::prelude as pl;
use polars_core::prelude::QuantileInterpolOptions;
//expose polars DateType in R
use crate::robj_to;
use crate::rpolarserr::{polars_to_rpolars_err, rerr, RResult, WithRctx};
use crate::utils::collect_hinted_result;
use crate::utils::wrappers::null_to_opt;
use std::result::Result;
@@ -497,6 +499,48 @@ pub fn new_categorical_ordering(s: &str) -> Result<pl::CategoricalOrdering, Stri
}
}

pub fn new_parquet_compression(
compression_method: Robj,
compression_level: Robj,
) -> RResult<pl::ParquetCompression> {
use pl::ParquetCompression::*;
match robj_to!(String, compression_method)?.as_str() {
"uncompressed" => Ok(Uncompressed),
"snappy" => Ok(Snappy),
"gzip" => robj_to!(Option, u8, compression_level)?
.map(polars::prelude::GzipLevel::try_new)
.transpose()
.map(Gzip),
"lzo" => Ok(Lzo),
"brotli" => robj_to!(Option, u32, compression_level)?
.map(polars::prelude::BrotliLevel::try_new)
.transpose()
.map(Brotli),
"zstd" => robj_to!(Option, i64, compression_level)?
.map(|cl| polars::prelude::ZstdLevel::try_new(cl as i32))
.transpose()
.map(Zstd),
m => Err(polars::prelude::PolarsError::ComputeError(
format!("Failed to set parquet compression method as [{m}]").into(),
)),
}
.map_err(polars_to_rpolars_err)
.misvalued("should be one of ['uncompressed', 'snappy', 'gzip', 'brotli', 'zstd']")
}

pub fn new_ipc_compression(compression_method: Robj) -> RResult<Option<pl::IpcCompression>> {
use pl::IpcCompression::*;
robj_to!(Option, String, compression_method)?
.map(|cm| match cm.as_str() {
"lz4" => Ok(LZ4),
"zstd" => Ok(ZSTD),
m => rerr()
.bad_val(m)
.misvalued("should be one of ['lz4', 'zstd']"),
})
.transpose()
}

extendr_module! {
mod rdatatype;
impl RPolarsDataType;
20 changes: 10 additions & 10 deletions tests/testthat/test-ipc.R
@@ -10,15 +10,15 @@ test_that("Test reading data from Apache Arrow IPC", {
# Collect data from Apache Arrow IPC
read_limit = 27
testthat::expect_equal(
pl$scan_arrow_ipc(tmpf)$collect()$to_data_frame(),
pl$scan_ipc(tmpf)$collect()$to_data_frame(),
iris
)
testthat::expect_equal(
pl$scan_arrow_ipc(tmpf, n_rows = read_limit)$collect()$to_data_frame(),
pl$scan_ipc(tmpf, n_rows = read_limit)$collect()$to_data_frame(),
droplevels(head(iris, read_limit))
)
testthat::expect_equal(
as.integer(pl$scan_arrow_ipc(
as.integer(pl$scan_ipc(
tmpf,
n_rows = read_limit,
row_count_name = "rc",
@@ -28,11 +28,11 @@
)

# Test error handling
testthat::expect_error(pl$scan_arrow_ipc(0))
testthat::expect_error(pl$scan_arrow_ipc(tmpf, n_rows = "?"))
testthat::expect_error(pl$scan_arrow_ipc(tmpf, cache = 0L))
testthat::expect_error(pl$scan_arrow_ipc(tmpf, rechunk = list()))
testthat::expect_error(pl$scan_arrow_ipc(tmpf, row_count_name = c("x", "y")))
testthat::expect_error(pl$scan_arrow_ipc(tmpf, row_count_name = "name", row_count_offset = data.frame()))
testthat::expect_error(pl$scan_arrow_ipc(tmpf, memmap = NULL))
testthat::expect_error(pl$scan_ipc(0))
testthat::expect_error(pl$scan_ipc(tmpf, n_rows = "?"))
testthat::expect_error(pl$scan_ipc(tmpf, cache = 0L))
testthat::expect_error(pl$scan_ipc(tmpf, rechunk = list()))
testthat::expect_error(pl$scan_ipc(tmpf, row_count_name = c("x", "y")))
testthat::expect_error(pl$scan_ipc(tmpf, row_count_name = "name", row_count_offset = data.frame()))
testthat::expect_error(pl$scan_ipc(tmpf, memmap = NULL))
})
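A sketch of how the new sink methods could be exercised in the same testthat style as above. The expectations are illustrative and not part of this diff; note the PR also adds `callr` to Suggests, so the actual tests may run the sink in a separate R process rather than inline as shown here:

```r
test_that("sink_ipc round trip", {
  tmpf = tempfile(fileext = ".arrow")
  pl$DataFrame(iris)$lazy()$sink_ipc(tmpf)
  testthat::expect_equal(
    pl$scan_ipc(tmpf)$collect()$to_data_frame(),
    iris
  )
  # An unsupported IPC compression method should error.
  testthat::expect_error(
    pl$DataFrame(iris)$lazy()$sink_ipc(tmpf, compression = "gzip")
  )
})
```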