Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support extendr_polars #326

Merged
merged 14 commits into from
Sep 2, 2023
10 changes: 8 additions & 2 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ struct_ <- function(exprs, eager, schema) .Call(wrap__struct_, exprs, eager, sch

# Thin R-side wrappers: each function forwards its arguments unchanged to the
# native routine named `wrap__<function name>` through `.Call()`.

# Forwards `r_batches` and `names` to wrap__rb_list_to_df.
rb_list_to_df <- function(r_batches, names) .Call(wrap__rb_list_to_df, r_batches, names)

# Forwards `rbr` to wrap__arrow_stream_to_rust; the result is returned invisibly.
arrow_stream_to_rust <- function(rbr) invisible(.Call(wrap__arrow_stream_to_rust, rbr))

# Forwards `dtype` to wrap__dtype_str_repr.
dtype_str_repr <- function(dtype) .Call(wrap__dtype_str_repr, dtype)

# Takes no arguments; calls wrap__new_arrow_stream.
new_arrow_stream <- function() .Call(wrap__new_arrow_stream)

# Forwards `robj_str` to wrap__arrow_stream_to_df.
arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_str)

# Forwards `robj_str` to wrap__arrow_stream_to_s.
arrow_stream_to_s <- function(robj_str) .Call(wrap__arrow_stream_to_s, robj_str)

# Forwards `robj_df` and `robj_str` to wrap__export_df_to_arrow_stream.
export_df_to_arrow_stream <- function(robj_df, robj_str) .Call(wrap__export_df_to_arrow_stream, robj_df, robj_str)

# Forwards `robj` to wrap__mem_address.
mem_address <- function(robj) .Call(wrap__mem_address, robj)

# Forwards `robj` to wrap__clone_robj.
clone_robj <- function(robj) .Call(wrap__clone_robj, robj)
Expand Down
88 changes: 61 additions & 27 deletions src/rust/src/arrow_interop/to_rust.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::rpolarserr::*;
use extendr_api::prelude::*;
use polars::prelude as pl;
use polars_core::export::rayon::prelude::*;
Expand Down Expand Up @@ -39,33 +40,6 @@ unsafe fn wrap_make_external_ptr<T>(t: &mut T) -> Robj {
//use extendr_api::{Integers, Rinternals};
unsafe { <Integers>::make_external_ptr(t, r!(extendr_api::NULL)) }
}
// Experimental stub: asks an R-side arrow stream reader to export itself into a
// freshly created ArrowArrayStream, then reads the arrays back one by one.
//
// - `arrow_stream_reader`: R object passed as first argument to the export call.
// - `opt_f`: optional R function called as f(arrow_stream_reader, ext_stream);
//   when None, the R lambda `\(x,y) x$export_to_c(y)` is evaluated instead.
//
// Errors from the R call, from creating the reader, or from reading an array
// are returned as Err(String). NOTE: no array is ever returned; on the
// non-error path the function always ends in `todo!`, i.e. it panics.
//does not support chunked array
pub fn arrow_array_stream_to_rust(
arrow_stream_reader: Robj,
opt_f: Option<&Function>,
) -> Result<ArrayRef, String> {
// empty stream struct the producer is asked to export into
let mut stream = Box::new(ffi::ArrowArrayStream::empty());
//let mut schema = Box::new(ffi::ArrowSchema::empty());
// wrap a pointer to the boxed stream in an R external pointer so R code can receive it
let ext_stream = unsafe { wrap_make_external_ptr(&mut *stream) };

// let the R side perform the export, either via the supplied function or the default lambda
if let Some(f) = opt_f {
f.call(pairlist!(arrow_stream_reader, ext_stream))?;
} else {
call!(r"\(x,y) x$export_to_c(y)", arrow_stream_reader, ext_stream)?;
};
// debug trace, printed to stderr
dbg!("after export");

// the reader takes ownership of the boxed stream
let mut iter =
unsafe { ffi::ArrowArrayStreamReader::try_new(stream) }.map_err(|err| err.to_string())?;
dbg!("after reader");

// drain the stream; each array is only debug-printed, nothing is collected
while let Some(array_res) = unsafe { iter.next() } {
let array = array_res.map_err(|err| err.to_string())?;
dbg!(&array);
}

// not implemented beyond this point: always panics here
todo!("not more for now");
}

pub fn rb_to_rust_df(r_rb_columns: List, names: &[String]) -> Result<pl::DataFrame, String> {
let n_col = r_rb_columns.len();
Expand Down Expand Up @@ -160,3 +134,63 @@ pub fn to_rust_df(rb: Robj) -> Result<pl::DataFrame, String> {
let dfs = crate::utils::collect_hinted_result(rb_len, dfs_iter)?;
Ok(accumulate_dataframes_vertical_unchecked(dfs))
}

// r-polars as consumer 1: allocate a new, empty arrow array stream on the heap
// and return its address to R, encoded as a string Robj.
//
// The allocation is deliberately not freed here: ownership of the box is given
// up so the address stays valid after this function returns. It is reclaimed by
// `arrow_stream_to_s_internal`, which rebuilds the box from the same address.
pub fn new_arrow_stream_internal() -> Robj {
    let empty_stream = Box::new(ffi::ArrowArrayStream::empty());
    // hand out the raw pointer without running the destructor
    let stream_ptr: *mut ffi::ArrowArrayStream = Box::into_raw(empty_stream);
    crate::utils::usize_to_robj_str(stream_ptr as usize)
}

// r-polars as consumer 2: recieve to pointer to own stream, which producer has exported to. Consume it. Return Series.
pub fn arrow_stream_to_s_internal(robj_str: Robj) -> RResult<pl::Series> {
// reclaim ownership of leaked box, and then drop/release it when consumed.
let us = crate::utils::robj_str_ptr_to_usize(&robj_str)?;
let boxed_stream = unsafe { Box::from_raw(us as *mut ffi::ArrowArrayStream) };

//consume stream and produce a r-polars Series return as Robj
let s = consume_arrow_stream_to_s(boxed_stream)?;
Ok(s)
}

// implementation of consuming stream to Series. Stream is drop/released hereafter.
fn consume_arrow_stream_to_s(boxed_stream: Box<ffi::ArrowArrayStream>) -> RResult<pl::Series> {
sorhawell marked this conversation as resolved.
Show resolved Hide resolved
let mut iter = unsafe { ffi::ArrowArrayStreamReader::try_new(boxed_stream) }?;

//import first array into pl::Series
let mut s = if let Some(array_res) = unsafe { iter.next() } {
let array = array_res?;
let series_res: pl::PolarsResult<pl::Series> =
std::convert::TryFrom::try_from(("df", array));
let series = series_res.map_err(polars_to_rpolars_err)?;
series
} else {
rerr()
.plain("Arrow array stream was empty")
.hint("producer did not export to stream")
.when("consuming arrow array stream")?;
unreachable!();
};

// append any other arrays to Series
while let Some(array_res) = unsafe { iter.next() } {
let array = array_res?;
let series_res: pl::PolarsResult<pl::Series> =
std::convert::TryFrom::try_from(("df", array));
let series = series_res.map_err(polars_to_rpolars_err)?;
s.append(&series).map_err(polars_to_rpolars_err)?;
}
Ok(s)
}

pub unsafe fn export_df_as_stream(df: pl::DataFrame, robj_str_ref: &Robj) -> RResult<()> {
let stream_ptr =
crate::utils::robj_str_ptr_to_usize(robj_str_ref)? as *mut ffi::ArrowArrayStream;
let schema = df.schema().to_arrow();
let data_type = pl::ArrowDataType::Struct(schema.fields);
let field = pl::ArrowField::new("", data_type, false);
let iter_boxed = Box::new(crate::rdataframe::OwnedDataFrameIterator::new(df));
unsafe { *stream_ptr = ffi::export_iterator(iter_boxed, field) };
Ok(())
}
17 changes: 0 additions & 17 deletions src/rust/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1987,23 +1987,6 @@ impl Expr {
let infer_schema_len = robj_to!(Option, usize, infer_schema_len)?;
Ok(self
.0
// =======
// pub fn str_json_extract(&self, dtype: Nullable<&RPolarsDataType>) -> Self {
// let dtype = null_to_opt(dtype).map(|dt| dt.0.clone());
// use pl::*;
// let output_type = match dtype.clone() {
// Some(dtype) => pl::GetOutput::from_type(dtype),
// None => pl::GetOutput::from_type(DataType::Unknown),
// };
// let function = move |s: Series| {
// let ca = s.utf8()?;
// match ca.json_extract(dtype.clone()) {
// Ok(ca) => Ok(Some(ca.into_series())),
// Err(e) => Err(PolarsError::ComputeError(format!("{e:?}").into())),
// }
// };
// self.0
// >>>>>>> origin/main
.clone()
.str()
.json_extract(dtype, infer_schema_len)
Expand Down
2 changes: 1 addition & 1 deletion src/rust/src/rdataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct OwnedDataFrameIterator {
}

impl OwnedDataFrameIterator {
fn new(df: polars::frame::DataFrame) -> Self {
pub fn new(df: polars::frame::DataFrame) -> Self {
let schema = df.schema().to_arrow();
let data_type = DataType::Struct(schema.fields);
let vs = df.get_columns().to_vec();
Expand Down
43 changes: 39 additions & 4 deletions src/rust/src/rlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::rdataframe::DataFrame;
use crate::robj_to;

use crate::rpolarserr::{rdbg, RResult};
use crate::series::Series;
use crate::{rdataframe::VecDataFrame, utils::r_result_list};
use extendr_api::prelude::*;
use polars::prelude as pl;
Expand Down Expand Up @@ -154,9 +155,37 @@ fn struct_(exprs: Robj, eager: Robj, schema: Robj) -> Result<Robj, String> {
}

#[extendr]
fn arrow_stream_to_rust(rbr: Robj) {
let x = crate::arrow_interop::to_rust::arrow_array_stream_to_rust(rbr, None).unwrap();
dbg!(x);
// Thin wrapper: returns the Robj produced by `new_arrow_stream_internal`
// in `arrow_interop::to_rust` unchanged.
fn new_arrow_stream() -> Robj {
crate::arrow_interop::to_rust::new_arrow_stream_internal()
}
use crate::rpolarserr::*;
// Consume the arrow stream referenced by `robj_str` and return it to R as a
// DataFrame. The stream is first imported as a Series; that Series must be a
// Struct series, whose fields become the columns of the DataFrame.
#[extendr]
fn arrow_stream_to_df(robj_str: Robj) -> RResult<Robj> {
    let imported = crate::arrow_interop::to_rust::arrow_stream_to_s_internal(robj_str)?;

    // a DataFrame travels through the stream as one Struct series
    let struct_res = imported.struct_().map_err(polars_to_rpolars_err);
    let struct_ca = struct_res
        .when("unpack struct from producer")
        .hint("producer exported a plain Series not a Struct series")?;

    let frame: pl::DataFrame = struct_ca.clone().into();
    let wrapped = DataFrame(frame);
    Ok(wrapped.into_robj())
}

// Consume the arrow stream referenced by `robj_str` and return it to R as a Series.
#[extendr]
fn arrow_stream_to_s(robj_str: Robj) -> RResult<Robj> {
    crate::arrow_interop::to_rust::arrow_stream_to_s_internal(robj_str)
        .map(|imported| Series(imported).into_robj())
}

#[extendr]
unsafe fn export_df_to_arrow_stream(robj_df: Robj, robj_str: Robj) -> RResult<Robj> {
let res: ExternalPtr<DataFrame> = robj_df.try_into()?;
let pl_df = DataFrame(res.0.clone()).0;
//safety robj_str must be ptr to a arrow2 stream ready to export into
unsafe {
crate::arrow_interop::to_rust::export_df_as_stream(pl_df, &robj_str)?;
}
Ok(robj_str)
}

#[extendr]
Expand Down Expand Up @@ -256,9 +285,15 @@ extendr_module! {
//fn series_from_arrow;
//fn rb_to_df;
fn rb_list_to_df;
fn arrow_stream_to_rust;

fn dtype_str_repr;

// arrow conversions
fn new_arrow_stream;
fn arrow_stream_to_df;
fn arrow_stream_to_s;
fn export_df_to_arrow_stream;

//robj meta
fn mem_address;
fn clone_robj;
Expand Down
16 changes: 16 additions & 0 deletions src/rust/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,3 +1006,19 @@ pub fn collect_hinted_result_rerr<T>(
}
Ok(new_vec)
}

// Read a pointer address back from an Robj string: the Robj must be a string
// whose content parses as a usize.
//
// Errors if the Robj is not a string or the string does not parse as usize;
// either error is given the context "converting robj str pointer to usize".
//keep error simple to interface with other libs
pub fn robj_str_ptr_to_usize(robj: &Robj) -> RResult<usize> {
    robj.as_str()
        .ok_or_else(|| RPolarsErr::new().plain("robj str ptr not a str".into()))
        .and_then(|text| Ok(text.parse::<usize>()?))
        .when("converting robj str pointer to usize")
}

// Encode a usize (e.g. a pointer address) as its decimal string and wrap it in an Robj.
pub fn usize_to_robj_str(us: usize) -> Robj {
    let decimal: String = us.to_string();
    decimal.into()
}