Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: export/import DataFrame as raw vector #1072

Merged
merged 5 commits into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,5 @@ Collate:
'zzz.R'
Config/rextendr/version: 0.3.1
VignetteBuilder: knitr
Config/polars/LibVersion: 0.39.2
Config/polars/LibVersion: 0.39.3
Config/polars/RustToolchainVersion: nightly-2024-04-15
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Polars R Package (development version)

### New features

- `pl$read_ipc()` can read a raw vector of Apache Arrow IPC file (#1072).
- New method `<DataFrame>$to_raw_ipc()` to serialize a DataFrame to a raw vector
of Apache Arrow IPC file format (#1072).

## Polars R Package 0.16.3

### New features
Expand Down
2 changes: 2 additions & 0 deletions R/dataframe__frame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,8 @@ DataFrame_write_csv = function(
#' This functionality is considered **unstable**.
#' It may be changed at any point without it being considered a breaking change.
#' @rdname IO_write_ipc
#' @seealso
#' - [`<DataFrame>$to_raw_ipc()`][DataFrame_to_raw_ipc]
#' @examples
#' dat = pl$DataFrame(mtcars)
#'
Expand Down
6 changes: 4 additions & 2 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ test_rbackgroundhandler <- function(lambda, arg) .Call(wrap__test_rbackgroundhan

test_rthreadhandle <- function() .Call(wrap__test_rthreadhandle)

test_serde_df <- function(df) .Call(wrap__test_serde_df, df)

internal_wrap_e <- function(robj, str_to_lit) .Call(wrap__internal_wrap_e, robj, str_to_lit)

create_col <- function(name) .Call(wrap__create_col, name)
Expand Down Expand Up @@ -230,6 +228,10 @@ RPolarsDataFrame$write_csv <- function(file, include_bom, include_header, separa

RPolarsDataFrame$write_ipc <- function(file, compression, future) .Call(wrap__RPolarsDataFrame__write_ipc, self, file, compression, future)

RPolarsDataFrame$to_raw_ipc <- function(compression, future) .Call(wrap__RPolarsDataFrame__to_raw_ipc, self, compression, future)

RPolarsDataFrame$from_raw_ipc <- function(bits, n_rows, row_name, row_index, memory_map) .Call(wrap__RPolarsDataFrame__from_raw_ipc, bits, n_rows, row_name, row_index, memory_map)

RPolarsDataFrame$write_parquet <- function(file, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit) .Call(wrap__RPolarsDataFrame__write_parquet, self, file, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit)

RPolarsDataFrame$write_json <- function(file, pretty, row_oriented) .Call(wrap__RPolarsDataFrame__write_json, self, file, pretty, row_oriented)
Expand Down
62 changes: 57 additions & 5 deletions R/io_ipc.R
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pl_scan_ipc = function(
#'
#' @inherit pl_read_csv return
#' @inheritParams pl_scan_ipc
#' @param source A single character or a raw vector of Apache Arrow IPC file.
#' You can use globbing with `*` to scan/read multiple files in the same directory
#' (see examples).
#' @rdname IO_read_ipc
#' @examplesIf requireNamespace("arrow", quietly = TRUE) && arrow::arrow_with_dataset()
#' temp_dir = tempfile()
Expand All @@ -73,6 +76,15 @@ pl_scan_ipc = function(
#' pl$read_ipc(
#' file.path(temp_dir, "**/*.arrow")
#' )
#'
#' # Read a raw vector
#' arrow::arrow_table(
#' foo = 1:5,
#' bar = 6:10,
#' ham = letters[1:5]
#' ) |>
#' arrow::write_to_raw(format = "file") |>
#' pl$read_ipc()
pl_read_ipc = function(
source,
...,
Expand All @@ -82,9 +94,49 @@ pl_read_ipc = function(
row_index_offset = 0L,
rechunk = FALSE,
cache = TRUE) {
.args = as.list(environment())
result({
do.call(pl$scan_ipc, .args)$collect()
}) |>
unwrap("in pl$read_ipc():")
uw = function(res) unwrap(res, "in pl$read_ipc():")

if (isTRUE(is.raw(source))) {
.pr$DataFrame$from_raw_ipc(
source,
n_rows,
row_index_name,
row_index_offset,
memory_map
) |>
uw()
} else {
.args = as.list(environment())
result(do.call(pl$scan_ipc, .args)$collect()) |>
uw()
}
}


#' Write Arrow IPC data to a raw vector
#'
#' @inheritParams DataFrame_write_ipc
#' @return A raw vector
#' @seealso
#' - [`<DataFrame>$write_ipc()`][DataFrame_write_ipc]
#' @examples
#' df = pl$DataFrame(
#' foo = 1:5,
#' bar = 6:10,
#' ham = letters[1:5]
#' )
#'
#' raw_ipc = df$to_raw_ipc()
#'
#' pl$read_ipc(raw_ipc)
#'
#' if (require("arrow", quietly = TRUE)) {
#' arrow::read_ipc_file(raw_ipc, as_data_frame = FALSE)
#' }
DataFrame_to_raw_ipc = function(
compression = c("uncompressed", "zstd", "lz4"),
...,
future = FALSE) {
.pr$DataFrame$to_raw_ipc(self, compression, future) |>
unwrap("in $to_raw_ipc():")
}
51 changes: 51 additions & 0 deletions man/DataFrame_to_raw_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions man/IO_read_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions man/IO_write_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "r-polars"
version = "0.39.2"
version = "0.39.3"
edition = "2021"
rust-version = "1.76.0"
publish = false
Expand Down
42 changes: 24 additions & 18 deletions src/rust/src/rbackground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use extendr_api::{
use flume::{bounded, Sender};
use ipc_channel::ipc;
use once_cell::sync::Lazy;
use polars::prelude::Series as PSeries;
use polars::prelude as pl;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;
Expand Down Expand Up @@ -93,31 +93,45 @@ pub fn deserialize_robj(bits: Vec<u8>) -> RResult<Robj> {
.when("deserializing an R object")
}

pub fn serialize_dataframe(dataframe: &mut polars::prelude::DataFrame) -> RResult<Vec<u8>> {
pub fn serialize_dataframe(
dataframe: &mut polars::prelude::DataFrame,
compression: Option<pl::IpcCompression>,
future: bool,
) -> RResult<Vec<u8>> {
use polars::io::SerWriter;

let mut dump = Vec::new();
polars::io::ipc::IpcWriter::new(&mut dump)
.with_compression(compression)
.with_pl_flavor(future)
.finish(dataframe)
.map_err(polars_to_rpolars_err)?;
Ok(dump)
}

pub fn deserialize_dataframe(bits: &[u8]) -> RResult<polars::prelude::DataFrame> {
pub fn deserialize_dataframe(
bits: &[u8],
n_rows: Option<usize>,
row_index: Option<polars::io::RowIndex>,
memory_map: bool,
) -> RResult<polars::prelude::DataFrame> {
use polars::io::SerReader;

polars::io::ipc::IpcReader::new(std::io::Cursor::new(bits))
.with_n_rows(n_rows)
.with_row_index(row_index)
.memory_mapped(memory_map)
.finish()
.map_err(polars_to_rpolars_err)
}

pub fn serialize_series(series: PSeries) -> RResult<Vec<u8>> {
serialize_dataframe(&mut std::iter::once(series).collect())
pub fn serialize_series(series: pl::Series) -> RResult<Vec<u8>> {
serialize_dataframe(&mut std::iter::once(series).collect(), None, true)
}

pub fn deserialize_series(bits: &[u8]) -> RResult<PSeries> {
let tn = std::any::type_name::<PSeries>();
deserialize_dataframe(bits)?
pub fn deserialize_series(bits: &[u8]) -> RResult<pl::Series> {
let tn = std::any::type_name::<pl::Series>();
deserialize_dataframe(bits, None, None, true)?
.get_columns()
.split_first()
.ok_or(RPolarsErr::new())
Expand Down Expand Up @@ -480,8 +494,8 @@ impl RBackgroundPool {
pub fn rmap_series(
&self,
raw_func: Vec<u8>,
series: PSeries,
) -> RResult<impl FnOnce() -> RResult<PSeries> + '_> {
series: pl::Series,
) -> RResult<impl FnOnce() -> RResult<pl::Series> + '_> {
#[cfg(feature = "rpolars_debug_print")]
dbg!("rmap_series");
let handler = self.lease()?;
Expand Down Expand Up @@ -579,13 +593,6 @@ pub fn test_rthreadhandle() -> RPolarsRThreadHandle<RResult<RPolarsDataFrame>> {
})
}

#[extendr]
pub fn test_serde_df(df: &RPolarsDataFrame) -> RResult<RPolarsDataFrame> {
let x = serialize_dataframe(&mut df.0.clone())?;
let df2 = deserialize_dataframe(x.as_slice())?;
Ok(RPolarsDataFrame(df2))
}

extendr_module! {
mod rbackground;
impl RPolarsRThreadHandle<RResult<RPolarsDataFrame>>;
Expand All @@ -595,5 +602,4 @@ extendr_module! {
fn handle_background_request;
fn test_rbackgroundhandler;
fn test_rthreadhandle;
fn test_serde_df;
}
27 changes: 27 additions & 0 deletions src/rust/src/rdataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,33 @@ impl RPolarsDataFrame {
.map_err(polars_to_rpolars_err)
}

pub fn to_raw_ipc(&self, compression: Robj, future: Robj) -> RResult<Vec<u8>> {
let compression = rdatatype::new_ipc_compression(compression)?;
let future = robj_to!(bool, future)?;

crate::rbackground::serialize_dataframe(&mut self.0.clone(), compression, future)
}

pub fn from_raw_ipc(
bits: Robj,
n_rows: Robj,
row_name: Robj,
row_index: Robj,
memory_map: Robj,
) -> RResult<Self> {
let bits = robj_to!(Raw, bits)?;
let n_rows = robj_to!(Option, usize, n_rows)?;
let row_index = robj_to!(Option, String, row_name)?
.map(|name| {
robj_to!(u32, row_index).map(|offset| polars::io::RowIndex { name, offset })
})
.transpose()?;
let memory_map = robj_to!(bool, memory_map)?;
let df = crate::rbackground::deserialize_dataframe(&bits, n_rows, row_index, memory_map)?;

Ok(RPolarsDataFrame(df))
}

pub fn write_parquet(
&self,
file: Robj,
Expand Down
6 changes: 3 additions & 3 deletions src/rust/src/rdataframe/read_ipc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::lazy::dataframe::RPolarsLazyFrame as RLazyFrame;
use crate::lazy::dataframe::RPolarsLazyFrame;
use crate::robj_to;
use crate::rpolarserr::RResult;
use extendr_api::prelude::*;
Expand All @@ -14,7 +14,7 @@ pub fn import_arrow_ipc(
row_name: Robj,
row_index: Robj,
memory_map: Robj,
) -> RResult<RLazyFrame> {
) -> RResult<RPolarsLazyFrame> {
let args = ScanArgsIpc {
n_rows: robj_to!(Option, usize, n_rows)?,
cache: robj_to!(bool, cache)?,
Expand All @@ -27,7 +27,7 @@ pub fn import_arrow_ipc(
};
let lf = LazyFrame::scan_ipc(robj_to!(String, path)?, args)
.map_err(crate::rpolarserr::polars_to_rpolars_err)?;
Ok(RLazyFrame(lf))
Ok(RPolarsLazyFrame(lf))
}

extendr_module! {
Expand Down
Loading
Loading