Skip to content

Commit

Permalink
support extendr_polars (#326)
Browse files Browse the repository at this point in the history
  • Loading branch information
sorhawell committed Sep 2, 2023
1 parent 21524b0 commit cc124b3
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 51 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ features. Unrelated breaking changes and new features are put in separate sectio
- Added an S3 generic `as_polars_series()` where users or developers of extensions
can define a custom way to convert their format to Polars format. This generic
must return a Polars series. See #368 for an example (#369).
- Private API Support for Arrow Stream import/export of DataFrame between two R packages that uses
rust-polars. [See R package example here](https://github.com/rpolars/extendrpolarsexamples)
(#326).

# polars 0.7.0

Expand Down
10 changes: 8 additions & 2 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ struct_ <- function(exprs, eager, schema) .Call(wrap__struct_, exprs, eager, sch

rb_list_to_df <- function(r_batches, names) .Call(wrap__rb_list_to_df, r_batches, names)

arrow_stream_to_rust <- function(rbr) invisible(.Call(wrap__arrow_stream_to_rust, rbr))

dtype_str_repr <- function(dtype) .Call(wrap__dtype_str_repr, dtype)

new_arrow_stream <- function() .Call(wrap__new_arrow_stream)

arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_str)

arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str)

export_df_to_arrow_stream <- function(robj_df, robj_str) .Call(wrap__export_df_to_arrow_stream, robj_df, robj_str)

mem_address <- function(robj) .Call(wrap__mem_address, robj)

clone_robj <- function(robj) .Call(wrap__clone_robj, robj)
Expand Down
88 changes: 61 additions & 27 deletions src/rust/src/arrow_interop/to_rust.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::rpolarserr::*;
use extendr_api::prelude::*;
use polars::prelude as pl;
use polars_core::export::rayon::prelude::*;
Expand Down Expand Up @@ -39,33 +40,6 @@ unsafe fn wrap_make_external_ptr<T>(t: &mut T) -> Robj {
//use extendr_api::{Integers, Rinternals};
unsafe { <Integers>::make_external_ptr(t, r!(extendr_api::NULL)) }
}
//does not support chunked array
pub fn arrow_array_stream_to_rust(
arrow_stream_reader: Robj,
opt_f: Option<&Function>,
) -> Result<ArrayRef, String> {
let mut stream = Box::new(ffi::ArrowArrayStream::empty());
//let mut schema = Box::new(ffi::ArrowSchema::empty());
let ext_stream = unsafe { wrap_make_external_ptr(&mut *stream) };

if let Some(f) = opt_f {
f.call(pairlist!(arrow_stream_reader, ext_stream))?;
} else {
call!(r"\(x,y) x$export_to_c(y)", arrow_stream_reader, ext_stream)?;
};
dbg!("after export");

let mut iter =
unsafe { ffi::ArrowArrayStreamReader::try_new(stream) }.map_err(|err| err.to_string())?;
dbg!("after reader");

while let Some(array_res) = unsafe { iter.next() } {
let array = array_res.map_err(|err| err.to_string())?;
dbg!(&array);
}

todo!("not more for now");
}

pub fn rb_to_rust_df(r_rb_columns: List, names: &[String]) -> Result<pl::DataFrame, String> {
let n_col = r_rb_columns.len();
Expand Down Expand Up @@ -160,3 +134,63 @@ pub fn to_rust_df(rb: Robj) -> Result<pl::DataFrame, String> {
let dfs = crate::utils::collect_hinted_result(rb_len, dfs_iter)?;
Ok(accumulate_dataframes_vertical_unchecked(dfs))
}

// r-polars as consumer 1: create a new stream and wrap pointer in Robj as str.
pub fn new_arrow_stream_internal() -> Robj {
let aas = Box::new(ffi::ArrowArrayStream::empty());
let x = Box::leak(aas); // leak box to make lifetime static
let x = x as *mut ffi::ArrowArrayStream;
crate::utils::usize_to_robj_str(x as usize)
}

// r-polars as consumer 2: recieve to pointer to own stream, which producer has exported to. Consume it. Return Series.
pub fn arrow_stream_to_series_internal(robj_str: Robj) -> RResult<pl::Series> {
// reclaim ownership of leaked box, and then drop/release it when consumed.
let us = crate::utils::robj_str_ptr_to_usize(&robj_str)?;
let boxed_stream = unsafe { Box::from_raw(us as *mut ffi::ArrowArrayStream) };

//consume stream and produce a r-polars Series return as Robj
let s = consume_arrow_stream_to_series(boxed_stream)?;
Ok(s)
}

// implementation of consuming stream to Series. Stream is drop/released hereafter.
fn consume_arrow_stream_to_series(boxed_stream: Box<ffi::ArrowArrayStream>) -> RResult<pl::Series> {
let mut iter = unsafe { ffi::ArrowArrayStreamReader::try_new(boxed_stream) }?;

//import first array into pl::Series
let mut s = if let Some(array_res) = unsafe { iter.next() } {
let array = array_res?;
let series_res: pl::PolarsResult<pl::Series> =
std::convert::TryFrom::try_from(("df", array));
let series = series_res.map_err(polars_to_rpolars_err)?;
series
} else {
rerr()
.plain("Arrow array stream was empty")
.hint("producer did not export to stream")
.when("consuming arrow array stream")?;
unreachable!();
};

// append any other arrays to Series
while let Some(array_res) = unsafe { iter.next() } {
let array = array_res?;
let series_res: pl::PolarsResult<pl::Series> =
std::convert::TryFrom::try_from(("df", array));
let series = series_res.map_err(polars_to_rpolars_err)?;
s.append(&series).map_err(polars_to_rpolars_err)?;
}
Ok(s)
}

pub unsafe fn export_df_as_stream(df: pl::DataFrame, robj_str_ref: &Robj) -> RResult<()> {
let stream_ptr =
crate::utils::robj_str_ptr_to_usize(robj_str_ref)? as *mut ffi::ArrowArrayStream;
let schema = df.schema().to_arrow();
let data_type = pl::ArrowDataType::Struct(schema.fields);
let field = pl::ArrowField::new("", data_type, false);
let iter_boxed = Box::new(crate::rdataframe::OwnedDataFrameIterator::new(df));
unsafe { *stream_ptr = ffi::export_iterator(iter_boxed, field) };
Ok(())
}
17 changes: 0 additions & 17 deletions src/rust/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1987,23 +1987,6 @@ impl Expr {
let infer_schema_len = robj_to!(Option, usize, infer_schema_len)?;
Ok(self
.0
// =======
// pub fn str_json_extract(&self, dtype: Nullable<&RPolarsDataType>) -> Self {
// let dtype = null_to_opt(dtype).map(|dt| dt.0.clone());
// use pl::*;
// let output_type = match dtype.clone() {
// Some(dtype) => pl::GetOutput::from_type(dtype),
// None => pl::GetOutput::from_type(DataType::Unknown),
// };
// let function = move |s: Series| {
// let ca = s.utf8()?;
// match ca.json_extract(dtype.clone()) {
// Ok(ca) => Ok(Some(ca.into_series())),
// Err(e) => Err(PolarsError::ComputeError(format!("{e:?}").into())),
// }
// };
// self.0
// >>>>>>> origin/main
.clone()
.str()
.json_extract(dtype, infer_schema_len)
Expand Down
2 changes: 1 addition & 1 deletion src/rust/src/rdataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct OwnedDataFrameIterator {
}

impl OwnedDataFrameIterator {
fn new(df: polars::frame::DataFrame) -> Self {
pub fn new(df: polars::frame::DataFrame) -> Self {
let schema = df.schema().to_arrow();
let data_type = DataType::Struct(schema.fields);
let vs = df.get_columns().to_vec();
Expand Down
43 changes: 39 additions & 4 deletions src/rust/src/rlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::rdataframe::DataFrame;
use crate::robj_to;

use crate::rpolarserr::{rdbg, RResult};
use crate::series::Series;
use crate::{rdataframe::VecDataFrame, utils::r_result_list};
use extendr_api::prelude::*;
use polars::prelude as pl;
Expand Down Expand Up @@ -154,9 +155,37 @@ fn struct_(exprs: Robj, eager: Robj, schema: Robj) -> Result<Robj, String> {
}

#[extendr]
fn arrow_stream_to_rust(rbr: Robj) {
let x = crate::arrow_interop::to_rust::arrow_array_stream_to_rust(rbr, None).unwrap();
dbg!(x);
fn new_arrow_stream() -> Robj {
crate::arrow_interop::to_rust::new_arrow_stream_internal()
}
use crate::rpolarserr::*;
#[extendr]
fn arrow_stream_to_df(robj_str: Robj) -> RResult<Robj> {
let s = crate::arrow_interop::to_rust::arrow_stream_to_series_internal(robj_str)?;
let ca = s
.struct_()
.map_err(polars_to_rpolars_err)
.when("unpack struct from producer")
.hint("producer exported a plain Series not a Struct series")?;
let df: pl::DataFrame = ca.clone().into();
Ok(DataFrame(df).into_robj())
}

#[extendr]
fn arrow_stream_to_series(robj_str: Robj) -> RResult<Robj> {
let s = crate::arrow_interop::to_rust::arrow_stream_to_series_internal(robj_str)?;
Ok(Series(s).into_robj())
}

#[extendr]
unsafe fn export_df_to_arrow_stream(robj_df: Robj, robj_str: Robj) -> RResult<Robj> {
let res: ExternalPtr<DataFrame> = robj_df.try_into()?;
let pl_df = DataFrame(res.0.clone()).0;
//safety robj_str must be ptr to a arrow2 stream ready to export into
unsafe {
crate::arrow_interop::to_rust::export_df_as_stream(pl_df, &robj_str)?;
}
Ok(robj_str)
}

#[extendr]
Expand Down Expand Up @@ -256,9 +285,15 @@ extendr_module! {
//fn series_from_arrow;
//fn rb_to_df;
fn rb_list_to_df;
fn arrow_stream_to_rust;

fn dtype_str_repr;

// arrow conversions
fn new_arrow_stream;
fn arrow_stream_to_df;
fn arrow_stream_to_series;
fn export_df_to_arrow_stream;

//robj meta
fn mem_address;
fn clone_robj;
Expand Down
16 changes: 16 additions & 0 deletions src/rust/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,3 +1006,19 @@ pub fn collect_hinted_result_rerr<T>(
}
Ok(new_vec)
}

//keep error simple to interface with other libs
pub fn robj_str_ptr_to_usize(robj: &Robj) -> RResult<usize> {
|| -> RResult<usize> {
let str: &str = robj
.as_str()
.ok_or(RPolarsErr::new().plain("robj str ptr not a str".into()))?;
let us: usize = str.parse()?;
Ok(us)
}()
.when("converting robj str pointer to usize")
}

pub fn usize_to_robj_str(us: usize) -> Robj {
format!("{us}").into()
}
71 changes: 71 additions & 0 deletions tests/testthat/test-arrow_extendr_polars.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
test_that("rust-polars DataFrame import/export via arrow stream", {
# this round trip conversion is only a unit test, not an integration test.
# Arrow export/import of DataFrame is mainly useful to interface with other R packages using
# rust-polars

# see https://github.com/rpolars/extendrpolarsexamples/blob/main/src/rust/src/lib.rs
# for simple example of use to import/export polars DataFrames to another rust-polars
# compilation unit in another R package. Version of rust-polars does not have to match.

# These function are not a part of the public user API. But package developer can use them to
# import/export df's.

# ARROW STREAM HAS AN CONTRACT TO UPHOLD BY PRODUCER AND CONSUMER. WRONG BEHAVOIR CAUSES SEGFAULT.
# SEE OUTCOMMENTED EXAMPLES OF ILLEGAL BEHAVIOR LEADING TO SEGFAULT BELOW.

# PRODUCER has some df which could be chunked as here. Categoricals with global string cache
# are also ok.
pl$with_string_cache({
df_export = pl$concat(lapply(1:3, \(i) pl$DataFrame(iris)))
})

# CONSUMER creates a new arrow stream and return ptr which is passed to PRODUCER
str_ptr = new_arrow_stream()

# PRODUCER exports the df into CONSUMERs stream
export_df_to_arrow_stream(df_export, str_ptr) |> unwrap()

# CONSUMER can now import the df from stream
pl$with_string_cache({
df_import = arrow_stream_to_df(str_ptr) |> unwrap()
})

# check imported/exported df's are identical
expect_identical(df_import$to_list(), df_export$to_list())

##UNSAFE / Undefined behavior / will blow up eventually / STUFF NOT TO DO
# examples below of did segfault ~every 5-10th time, during development

# 1: DO NOT EXPORT TO STREAM MORE THAN ONCE
# new DataFrame can be exported to stream, but only the latest # BUT THIS SEGFAULTs sometimes
# export_df_to_arrow_stream(df_export, str_ptr) |> unwrap()
# export_df_to_arrow_stream(pl$DataFrame(mtcars), str_ptr) |> unwrap()
# mtcars_import = arrow_stream_to_df(str_ptr) |> unwrap()

# 2: DO NOT IMPORT FROM STREAM MORE THAN ONCE
# reading from released(exhuasted) stream results in error most times
# BUT THIS SEGFAULTs sometimes
#ctx = arrow_stream_to_df(str_ptr)$err$contexts()
#expect_equal(
# ctx$PlainErrorMessage,
# r"{InvalidArgumentError("The C stream was already released")}"
# )

# 3: DO NOT IMPORT/EXPORT ARROW STREAM ACROSS PROCESSES (use IPC for that, see <Expr>$map() docs)
# background process willSEGFAULT HERE
# str_ptr = new_arrow_stream()
# rsess = callr::r_bg(func = \(str_ptr) {
# library(polars)
# pl$with_string_cache({
# df_export = pl$concat(lapply(1:3, \(i) pl$DataFrame(iris)))
# })
# polars:::export_df_to_arrow_stream(df_export, str_ptr)
# },args = list(str_ptr=str_ptr))
#
# Sys.sleep(3)
# df_import = arrow_stream_to_df(str_ptr)
# print(df_import)
# str_ptr = new_arrow_stream()
# rsess$get_result()

})

0 comments on commit cc124b3

Please sign in to comment.