Implement $read_ndjson() and $scan_ndjson() (#471)
Co-authored-by: sorhawell <sorhawell@gmail.com>
etiennebacher and sorhawell committed Nov 8, 2023
1 parent 07c10af commit cb53e75
Showing 12 changed files with 360 additions and 3 deletions.
2 changes: 2 additions & 0 deletions DESCRIPTION
@@ -28,6 +28,7 @@ Suggests:
curl,
data.table,
ggplot2,
jsonlite,
knitr,
lubridate,
nanoarrow,
@@ -92,6 +93,7 @@ Collate:
'group_by.R'
'info.R'
'ipc.R'
'json.R'
'lazyframe__group_by.R'
'lazyframe__lazy.R'
'namespace.R'
3 changes: 2 additions & 1 deletion NEWS.md
@@ -36,7 +36,8 @@
like objects. To use this feature, the Rust library must be built with full
features (#457).
- New methods `$peak_min()` and `$peak_max()` to find local minima and maxima in
-  a Series (#462).
+  an Expr (#462).
+- New methods `$read_ndjson()` and `$scan_ndjson()` (#471).

# polars 0.9.0

6 changes: 5 additions & 1 deletion R/csv.R
@@ -247,7 +247,7 @@ pl$read_csv = function(
}


-check_is_link = function(path, reuse_downloaded) {
+check_is_link = function(path, reuse_downloaded, raise_error = FALSE) {
if (!file.exists(path)) {
con = NULL

@@ -277,6 +277,10 @@ check_is_link = function(path, reuse_downloaded) {

path = tmp_file # redirect path to tmp downloaded file
} else {

if (raise_error) {
stop("failed to locate file at path/url: ", path)
}
# do nothing; let the path fail on the Rust side
path = NULL
}
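
A side note on the new `raise_error` flag: with `raise_error = TRUE`, a path that is neither a local file nor a reachable URL now fails on the R side instead of being handed to Rust. A minimal sketch of how this surfaces through the new reader (the path below is hypothetical, and the exact wording comes from `check_is_link()`):

library(polars)

# Hypothetical path that exists neither locally nor as a URL; inside
# pl$scan_ndjson(), check_is_link() is called with raise_error = TRUE,
# so this should error with "failed to locate file at path/url: ...".
tryCatch(
  pl$scan_ndjson("not/a/real/file.ndjson"),
  error = function(e) conditionMessage(e)
)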
2 changes: 2 additions & 0 deletions R/extendr-wrappers.R
@@ -81,6 +81,8 @@ new_from_csv <- function(path, paths, has_header, separator, comment_char, quote

import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_count, memmap) .Call(wrap__import_arrow_ipc, path, n_rows, cache, rechunk, row_name, row_count, memmap)

new_from_ndjson <- function(path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_count_name, row_count_offset) .Call(wrap__new_from_ndjson, path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_count_name, row_count_offset)

new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory, hive_partitioning) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory, hive_partitioning)

test_rpolarserr <- function() .Call(wrap__test_rpolarserr)
109 changes: 109 additions & 0 deletions R/json.R
@@ -0,0 +1,109 @@
#' New LazyFrame from NDJSON
#'
#' @description
#' Read a file from path into a polars LazyFrame.
#' @name scan_ndjson
#' @rdname IO_scan_ndjson
#'
#' @param path Path to a file or URL. Multiple paths can be provided, as long
#' as all the NDJSON files have the same schema. Multiple URLs are not
#' supported.
#' @param infer_schema_length Maximum number of rows to read to infer the column
#' types. If set to 0, all columns will be read as UTF-8. If `NULL`, a full
#' table scan will be done (slow).
#' @param batch_size Number of rows that will be processed per thread.
#' @param n_rows Maximum number of rows to read.
#' @param low_memory Reduce memory usage (at the cost of performance).
#' @param rechunk Reallocate to contiguous memory when all chunks / files are
#' parsed.
#' @param row_count_name If not `NULL`, this will insert a row count column with
#' the given name into the DataFrame.
#' @param row_count_offset Offset to start the row_count column (only used if
#' the name is set).
#' @param reuse_downloaded If `TRUE` (default) and a URL was provided, cache
#' the downloaded file in the current session so it can easily be reused.
#' @return A LazyFrame
#'
# we should use @examplesIf but altdoc doesn't know how to parse it yet
#' @examples
#' if (require("jsonlite", quietly = TRUE)) {
#' ndjson_filename = tempfile()
#' jsonlite::stream_out(iris, file(ndjson_filename), verbose = FALSE)
#' pl$scan_ndjson(ndjson_filename)$collect()
#' }

pl$scan_ndjson = function(
path,
infer_schema_length = 100,
batch_size = NULL,
n_rows = NULL,
low_memory = FALSE,
rechunk = TRUE,
row_count_name = NULL,
row_count_offset = 0,
reuse_downloaded = TRUE
) {

# capture all args and modify some to match lower level function
args = as.list(environment())

# check if path is a URL and pre-download it; wrap in result() since robj_to! can unpack an R result
args[['path']] = lapply(
path, check_is_link, reuse_downloaded = reuse_downloaded, raise_error = TRUE
) |>
result()

args[['reuse_downloaded']] = NULL

## call low level function with args
check_no_missing_args(new_from_ndjson, args)
do.call(new_from_ndjson, args) |>
unwrap("in pl$scan_ndjson")
}

#' New DataFrame from NDJSON
#'
#' @description
#' Read a file from path into a polars DataFrame.
#' @name read_ndjson
#' @rdname IO_read_ndjson
#'
#' @param path Path to a file or URL. Multiple paths can be provided, as long
#' as all the NDJSON files have the same schema. Multiple URLs are not
#' supported.
#' @param infer_schema_length Maximum number of rows to read to infer the column
#' types. If set to 0, all columns will be read as UTF-8. If `NULL`, a full
#' table scan will be done (slow).
#' @param batch_size Number of rows that will be processed per thread.
#' @param n_rows Maximum number of rows to read.
#' @param low_memory Reduce memory usage (at the cost of performance).
#' @param rechunk Reallocate to contiguous memory when all chunks / files are
#' parsed.
#' @param row_count_name If not `NULL`, this will insert a row count column with
#' the given name into the DataFrame.
#' @param row_count_offset Offset to start the row_count column (only used if
#' the name is set).
#'
#' @return A DataFrame
#'
# we should use @examplesIf but altdoc doesn't know how to parse it yet
#' @examples
#' if (require("jsonlite", quietly = TRUE)) {
#' ndjson_filename = tempfile()
#' jsonlite::stream_out(iris, file(ndjson_filename), verbose = FALSE)
#' pl$read_ndjson(ndjson_filename)
#' }
pl$read_ndjson = function(
path,
infer_schema_length = 100,
batch_size = NULL,
n_rows = NULL,
low_memory = FALSE,
rechunk = TRUE,
row_count_name = NULL,
row_count_offset = 0) {
mc = match.call()
mc[[1]] = get("pl", envir = asNamespace("polars"))$scan_ndjson
eval.parent(mc)$collect()
}
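
Taken together, a usage sketch of the new readers (assuming jsonlite is installed, mirroring the roxygen examples above; "row_nr" is an arbitrary column name):

library(polars)

# Write iris out as newline-delimited JSON.
ndjson_file = tempfile(fileext = ".ndjson")
jsonlite::stream_out(iris, file(ndjson_file), verbose = FALSE)

# Lazy scan: cap the rows read and add a row-count column starting at 1.
lf = pl$scan_ndjson(
  ndjson_file,
  n_rows = 5,
  row_count_name = "row_nr",
  row_count_offset = 1
)
lf$collect()

# Eager counterpart: pl$read_ndjson() forwards its call to $scan_ndjson()
# via match.call() and collects the result.
pl$read_ndjson(ndjson_file, n_rows = 5)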

42 changes: 42 additions & 0 deletions man/IO_read_ndjson.Rd

45 changes: 45 additions & 0 deletions man/IO_scan_ndjson.Rd

2 changes: 1 addition & 1 deletion man/pl_pl.Rd

2 changes: 2 additions & 0 deletions src/rust/src/rdataframe/mod.rs
@@ -3,6 +3,7 @@ use polars::prelude::{self as pl, IntoLazy, SerWriter};
use std::result::Result;
pub mod read_csv;
pub mod read_ipc;
pub mod read_ndjson;
pub mod read_parquet;
use crate::conversion_r_to_s::robjname2series;
use crate::lazy;
@@ -528,6 +529,7 @@ extendr_module! {
mod rdataframe;
use read_csv;
use read_ipc;
use read_ndjson;
use read_parquet;
use rdatatype;

52 changes: 52 additions & 0 deletions src/rust/src/rdataframe/read_ndjson.rs
@@ -0,0 +1,52 @@
// Read NDJSON

use crate::lazy::dataframe::LazyFrame;
use crate::robj_to;
use crate::rpolarserr::*;
use polars::io::RowCount;

//use crate::utils::wrappers::*;
use extendr_api::{extendr, prelude::*, Rinternals};
use polars::prelude as pl;
use polars::prelude::LazyFileListReader;
use std::result::Result;

#[allow(clippy::too_many_arguments)]
#[extendr]
pub fn new_from_ndjson(
path: Robj,
infer_schema_length: Robj,
batch_size: Robj,
n_rows: Robj,
low_memory: Robj,
rechunk: Robj,
row_count_name: Robj,
row_count_offset: Robj,
) -> RResult<LazyFrame> {
let offset = robj_to!(Option, u32, row_count_offset)?.unwrap_or(0);
let opt_rowcount =
robj_to!(Option, String, row_count_name)?.map(|name| RowCount { name, offset });

let vec_pathbuf = robj_to!(Vec, PathBuf, path)?;
let linereader = match vec_pathbuf.len() {
2.. => Ok(pl::LazyJsonLineReader::new_paths(vec_pathbuf.into())),
1 => Ok(pl::LazyJsonLineReader::new(&vec_pathbuf[0])),
_ => rerr().plain("path cannot have zero length").bad_arg("path"),
}?;

linereader
.with_infer_schema_length(robj_to!(Option, usize, infer_schema_length)?)
.with_batch_size(robj_to!(Option, usize, batch_size)?)
.with_n_rows(robj_to!(Option, usize, n_rows)?)
.low_memory(robj_to!(bool, low_memory)?)
.with_row_count(opt_rowcount)
.with_rechunk(robj_to!(bool, rechunk)?)
.finish()
.map_err(polars_to_rpolars_err)
.map(LazyFrame)
}

extendr_module! {
mod read_ndjson;
fn new_from_ndjson;
}
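
On the R side, the `2..` arm above routes several files through `pl::LazyJsonLineReader::new_paths()`. A sketch of what that enables (assuming, as the roxygen docs require, that all files share one schema):

library(polars)

# Two NDJSON files with an identical schema scan as a single LazyFrame.
f1 = tempfile(fileext = ".ndjson")
f2 = tempfile(fileext = ".ndjson")
jsonlite::stream_out(head(iris, 75), file(f1), verbose = FALSE)
jsonlite::stream_out(tail(iris, 75), file(f2), verbose = FALSE)
pl$scan_ndjson(c(f1, f2))$collect()

# An empty path vector hits the final match arm and errors with
# "path cannot have zero length".
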
8 changes: 8 additions & 0 deletions src/rust/src/utils/mod.rs
@@ -525,6 +525,10 @@ pub fn robj_to_string(robj: extendr_api::Robj) -> RResult<String> {
}
}

pub fn robj_to_pathbuf(robj: extendr_api::Robj) -> RResult<std::path::PathBuf> {
Ok(std::path::PathBuf::from(robj_to_string(robj)?))
}

pub fn robj_to_str<'a>(robj: extendr_api::Robj) -> RResult<&'a str> {
let robj = unpack_r_result_list(robj)?;
use extendr_api::Length;
@@ -1054,6 +1058,10 @@ macro_rules! robj_to_inner {
$crate::rdatatype::robj_to_join_type($a)
};

(PathBuf, $a:ident) => {
$crate::utils::robj_to_pathbuf($a)
};

(RArrow_schema, $a:ident) => {
$crate::utils::robj_to_rarrow_schema($a)
};
