Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: <DataFrame>$write_ipc() and update the argument name of <LazyFrame>$scan_ipc() #1032

Merged
merged 10 commits into from
Apr 13, 2024
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@
- In `pl$Series()`, the argument `x` is renamed `values` (#933).
- In `<DataFrame>$write_*` functions, the first argument is now `file` (#935).
- In `<LazyFrame>$sink_*` functions, the first argument is now `path` (#935).
- In `pl$scan_ipc()`, the argument `memmap` is renamed to `memory_map` (#1032).
- In `<DataFrame>$rolling()`, `<LazyFrame>$rolling()`, `<DataFrame>$group_by_dynamic()`
and `<LazyFrame>$group_by_dynamic()`, the `by` argument is renamed to
`group_by` (#983).
- In `$dt$convert_time_zone()` and `$dt$replace_time_zone()`, the `tz`
argument is renamed to `time_zone` (#944).
- In `$str$strptime()`, the argument `datatype` is renamed `dtype` (#939).
- In `$str$strptime()`, the argument `datatype` is renamed to `dtype` (#939).

2. Change in the way arguments are passed:

Expand Down Expand Up @@ -169,6 +170,7 @@
- `$arr$to_list()` (#1018).
- `$str$extract_groups()` (#979).
- `$str$find()` (#985).
- `<DataFrame>$write_ipc()` (#1032).
etiennebacher marked this conversation as resolved.
Show resolved Hide resolved

- New arguments or argument values:

Expand Down
73 changes: 56 additions & 17 deletions R/dataframe__frame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1936,7 +1936,7 @@ DataFrame_transpose = function(
#' invalid CSV data (e.g. by not quoting strings containing the separator).
#'
#' @return
#' This doesn't return anything but creates a CSV file.
#' This doesn't return anything.
#'
#' @rdname IO_write_csv
#'
Expand Down Expand Up @@ -1968,13 +1968,54 @@ DataFrame_write_csv = function(
batch_size, datetime_format, date_format, time_format, float_precision,
null_values, quote_style
) |>
unwrap("in $write_csv():") |>
invisible()
unwrap("in $write_csv():")

invisible(NULL)
}


#' Write to Arrow IPC file (a.k.a Feather file)
#'
#' @inherit DataFrame_write_csv params return
#' @inheritParams LazyFrame_sink_ipc
#' @param future If `TRUE`, write Polars' internal data structures, which
#' might not be supported by other Arrow implementations.
#' This functionality is considered **unstable**: it may be changed at any
#' point without it being considered a breaking change.
#' @rdname IO_write_ipc
#' @examples
#' dat = pl$DataFrame(mtcars)
#'
#' destination = tempfile(fileext = ".arrow")
#' dat$write_ipc(destination)
#'
#' if (require("arrow", quietly = TRUE)) {
#'   arrow::read_ipc_file(destination, as_data_frame = FALSE)
#' }
DataFrame_write_ipc = function(
    file,
    compression = c("uncompressed", "zstd", "lz4"),
    ...,
    future = FALSE) {
  # `future = TRUE` opts into an unstable feature, so tell the user up front.
  if (isTRUE(future)) {
    warning("The `future` parameter of `$write_ipc()` is considered unstable.")
  }

  # A `NULL` compression is treated the same as "uncompressed".
  compression_choice = compression %||% "uncompressed"

  write_result = .pr$DataFrame$write_ipc(self, file, compression_choice, future)
  unwrap(write_result, "in $write_ipc():")

  # Called for its side effect only: nothing useful is returned.
  invisible(NULL)
}


#' Write to parquet file
#' @inherit DataFrame_write_csv params return
#' @inheritParams LazyFrame_sink_parquet
#' @inheritParams DataFrame_write_csv
#'
#' @rdname IO_write_parquet
#'
Expand All @@ -2001,20 +2042,18 @@ DataFrame_write_parquet = function(
row_group_size,
data_pagesize_limit
) |>
unwrap("in $write_parquet():") |>
invisible()
unwrap("in $write_parquet():")

invisible(NULL)
}

#' Write to JSON file
#'
#' @inheritParams DataFrame_write_csv
#' @inherit DataFrame_write_csv params return
#' @param pretty Pretty serialize JSON.
#' @param row_oriented Write to row-oriented JSON. This is slower, but more
#' common.
#'
#' @return
#' This doesn't return anything.
#'
#' @rdname IO_write_json
#'
#' @examples
Expand All @@ -2034,17 +2073,16 @@ DataFrame_write_json = function(
pretty = FALSE,
row_oriented = FALSE) {
.pr$DataFrame$write_json(self, file, pretty, row_oriented) |>
unwrap("in $write_json():") |>
invisible()
unwrap("in $write_json():")

invisible(NULL)
}

#' Write to NDJSON file
#'
#' @inherit DataFrame_write_csv return
#' @inheritParams DataFrame_write_json
#'
#' @return
#' This doesn't return anything.
#'
#' @rdname IO_write_ndjson
#'
#' @examples
Expand All @@ -2056,8 +2094,9 @@ DataFrame_write_json = function(
#' pl$read_ndjson(destination)
DataFrame_write_ndjson = function(file) {
.pr$DataFrame$write_ndjson(self, file) |>
unwrap("in $write_ndjson():") |>
invisible()
unwrap("in $write_ndjson():")

invisible(NULL)
}

#' @inherit LazyFrame_rolling title description params details
Expand Down
4 changes: 3 additions & 1 deletion R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ RPolarsDataFrame$clear <- function() .Call(wrap__RPolarsDataFrame__clear, self)

RPolarsDataFrame$write_csv <- function(file, include_bom, include_header, separator, line_terminator, quote, batch_size, datetime_format, date_format, time_format, float_precision, null_value, quote_style) .Call(wrap__RPolarsDataFrame__write_csv, self, file, include_bom, include_header, separator, line_terminator, quote, batch_size, datetime_format, date_format, time_format, float_precision, null_value, quote_style)

RPolarsDataFrame$write_ipc <- function(file, compression, future) .Call(wrap__RPolarsDataFrame__write_ipc, self, file, compression, future)

RPolarsDataFrame$write_parquet <- function(file, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit) .Call(wrap__RPolarsDataFrame__write_parquet, self, file, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit)

RPolarsDataFrame$write_json <- function(file, pretty, row_oriented) .Call(wrap__RPolarsDataFrame__write_json, self, file, pretty, row_oriented)
Expand Down Expand Up @@ -1108,7 +1110,7 @@ RPolarsLazyFrame$collect_in_background <- function() .Call(wrap__RPolarsLazyFram

RPolarsLazyFrame$sink_parquet <- function(path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order) .Call(wrap__RPolarsLazyFrame__sink_parquet, self, path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order)

RPolarsLazyFrame$sink_ipc <- function(path, compression_method, maintain_order) .Call(wrap__RPolarsLazyFrame__sink_ipc, self, path, compression_method, maintain_order)
RPolarsLazyFrame$sink_ipc <- function(path, compression, maintain_order) .Call(wrap__RPolarsLazyFrame__sink_ipc, self, path, compression, maintain_order)

RPolarsLazyFrame$sink_csv <- function(path, include_bom, include_header, separator, line_terminator, quote, batch_size, datetime_format, date_format, time_format, float_precision, null_value, quote_style, maintain_order) .Call(wrap__RPolarsLazyFrame__sink_csv, self, path, include_bom, include_header, separator, line_terminator, quote, batch_size, datetime_format, date_format, time_format, float_precision, null_value, quote_style, maintain_order)

Expand Down
10 changes: 6 additions & 4 deletions R/io_ipc.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#' @keywords LazyFrame_new
#'
#' @inheritParams pl_scan_csv
#' @param memmap bool, mapped memory
#' @param memory_map A logical. If `TRUE`, try to memory map the file.
#' This can greatly improve performance on repeated queries as the OS may cache pages.
#' Only uncompressed Arrow IPC files can be memory mapped.
#'
#' @return LazyFrame
#' @rdname IO_scan_ipc
Expand All @@ -13,18 +15,18 @@ pl_scan_ipc = function(
...,
n_rows = NULL,
cache = TRUE,
rechunk = TRUE,
rechunk = FALSE,
row_index_name = NULL,
row_index_offset = 0L,
memmap = TRUE) {
memory_map = TRUE) {
import_arrow_ipc(
source,
n_rows,
cache,
rechunk,
row_index_name,
row_index_offset,
memmap
memory_map
) |>
unwrap("in pl$scan_ipc:")
}
13 changes: 7 additions & 6 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -660,13 +660,14 @@ LazyFrame_sink_parquet = function(
}


#' @title Stream the output of a query to an Arrow IPC file
#' @description
#' Stream the output of a query to an Arrow IPC file
#'
#' This writes the output of a query directly to an Arrow IPC file without collecting
#' it in the R session first. This is useful if the output of the query is still
#' larger than RAM as it would crash the R session if it was collected into R.
#' @param compression `NULL` or string, the compression method. One of `NULL`,
#' "lz4" or "zstd". Choose "zstd" for good compression performance. Choose "lz4"
#' @param compression `NULL` or a string giving the compression method: one of
#' `"uncompressed"`, `"lz4"` or `"zstd"`. `NULL` is equivalent to `"uncompressed"`.
#' Choose `"zstd"` for good compression performance. Choose `"lz4"`
#' for fast compression/decompression.
#' @inheritParams LazyFrame_sink_parquet
#' @inheritParams LazyFrame_collect
Expand All @@ -689,7 +690,7 @@ LazyFrame_sink_parquet = function(
LazyFrame_sink_ipc = function(
path,
...,
compression = "zstd",
compression = c("zstd", "lz4", "uncompressed"),
maintain_order = TRUE,
type_coercion = TRUE,
predicate_pushdown = TRUE,
Expand Down Expand Up @@ -722,7 +723,7 @@ LazyFrame_sink_ipc = function(
lf |>
.pr$LazyFrame$sink_ipc(
path,
compression,
compression %||% "uncompressed",
maintain_order
) |>
unwrap("in $sink_ipc()") |>
Expand Down
8 changes: 5 additions & 3 deletions man/IO_scan_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions man/IO_sink_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/IO_write_csv.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions man/IO_write_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions man/IO_write_parquet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/rust/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ impl RPolarsLazyFrame {
.map_err(polars_to_rpolars_err)
}

fn sink_ipc(&self, path: Robj, compression_method: Robj, maintain_order: Robj) -> RResult<()> {
fn sink_ipc(&self, path: Robj, compression: Robj, maintain_order: Robj) -> RResult<()> {
let ipcwo = polars::prelude::IpcWriterOptions {
compression: new_ipc_compression(compression_method)?,
compression: new_ipc_compression(compression)?,
maintain_order: robj_to!(bool, maintain_order)?,
};
self.0
Expand Down
9 changes: 9 additions & 0 deletions src/rust/src/rdataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,15 @@ impl RPolarsDataFrame {
.map_err(polars_to_rpolars_err)
}

// Write this DataFrame to an Arrow IPC file.
//
// * `file`: R string giving the destination path.
// * `compression`: R choice resolved by `rdatatype::new_ipc_compression`.
// * `future`: R logical forwarded to `IpcWriter::with_pl_flavor`.
//
// Returns an error if any argument fails to convert, if the destination file
// cannot be created, or if the writer fails.
pub fn write_ipc(&self, file: Robj, compression: Robj, future: Robj) -> RResult<()> {
    // Convert and validate every argument BEFORE touching the filesystem:
    // `File::create` truncates an existing file, so an invalid `compression`
    // or `future` value must not be able to wipe the destination first.
    let path = robj_to!(str, file)?;
    let compression = rdatatype::new_ipc_compression(compression)?;
    let pl_flavor = robj_to!(bool, future)?;

    let file = std::fs::File::create(path)?;
    pl::IpcWriter::new(file)
        .with_compression(compression)
        .with_pl_flavor(pl_flavor)
        // `finish` needs `&mut DataFrame` while we only hold `&self`, so it
        // operates on a clone.
        .finish(&mut self.0.clone())
        .map_err(polars_to_rpolars_err)
}

pub fn write_parquet(
&self,
file: Robj,
Expand Down
18 changes: 9 additions & 9 deletions src/rust/src/rdatatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,15 +468,15 @@ pub fn new_parquet_compression(
.misvalued("must be one of 'uncompressed', 'snappy', 'gzip', 'brotli', 'zstd'")
}

pub fn new_ipc_compression(compression_method: Robj) -> RResult<Option<pl::IpcCompression>> {
use pl::IpcCompression::*;
robj_to!(Option, String, compression_method)?
.map(|cm| match cm.as_str() {
"lz4" => Ok(LZ4),
"zstd" => Ok(ZSTD),
m => rerr().bad_val(m).misvalued("must be one of 'lz4', 'zstd'"),
})
.transpose()
// Map an R-side compression choice onto polars' optional IPC compression.
//
// "uncompressed" yields `None`; "lz4" and "zstd" yield the matching
// `pl::IpcCompression` variant. Any other choice string is reported as a
// bad value.
pub fn new_ipc_compression(robj: Robj) -> RResult<Option<pl::IpcCompression>> {
    let choice = robj_to_rchoice(robj)?;

    let compression = match choice.as_str() {
        "uncompressed" => None,
        "lz4" => Some(pl::IpcCompression::LZ4),
        "zstd" => Some(pl::IpcCompression::ZSTD),
        // Unknown choice: bail out with the offending value in the message.
        s => {
            return rerr().bad_val(format!(
                "IpcCompression choice ('{s}') must be one of 'uncompressed', 'lz4', 'zstd'"
            ));
        }
    };

    Ok(compression)
}

pub fn new_rolling_cov_options(
Expand Down
Loading
Loading