Skip to content

Commit

Permalink
feat(nodejs): read row_count option (#2595)
Browse files Browse the repository at this point in the history
* feat(nodejs): read row_count option

* chore: remove printlns
  • Loading branch information
universalmind303 committed Feb 10, 2022
1 parent 34b75b7 commit 9f9223e
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 1 deletion.
8 changes: 8 additions & 0 deletions nodejs-polars/__tests__/io.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ describe("read:csv", () => {
expect(df.getColumn("a")[0]).toBeNull();
expect(df.getColumn("b")[1]).toBeNull();
});
// Checks that the `rowCount` option of `readCSV` adds a row-count column named "rc".
// With an offset of 11, the largest value in that column is expected to be
// `df.height + 10`.
test("csv with rowcount", () => {
  const df = pl.readCSV(csvpath, {rowCount: {name: "rc", offset: 11}});
  const expectedMaxRowCount = df.height + 10;
  const maxRowCount = df.getColumn("rc").max();

  // Jest convention: the received value goes into `expect(...)` and the expected
  // value into the matcher, so failure output labels the two sides correctly.
  expect(maxRowCount).toStrictEqual(expectedMaxRowCount);
});
it.todo("can read from a stream");
});

Expand Down
10 changes: 10 additions & 0 deletions nodejs-polars/polars/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ type ScanParquetOptions = {
parallel?: boolean;
cache?: boolean;
rechunk?: boolean;
rowCount?: RowCount
}
/**
 * Describes the row-count column that the read/scan functions can add via
 * their `rowCount` option.
 */
type RowCount = {
// Name of the added row-count column (the test reads it back with `getColumn(name)`).
name: string;
// NOTE(review): appears to be the value given to the first row — the test with
// `offset: 11` expects a maximum of `height + 10`. The native side parses it as a
// u32; confirm how an omitted offset is handled there.
offset?: number
}

type ReadCsvOptions = {
Expand All @@ -38,6 +43,7 @@ type ReadCsvOptions = {
rechunk?: boolean;
sep?: string;
startRows?: number;
rowCount?: RowCount
};

type ReadJsonOptions = {
Expand All @@ -51,12 +57,15 @@ type ReadParquetOptions = {
numRows?: number;
parallel?: boolean;
rechunk?: boolean;
rowCount?: RowCount

}

/**
 * Options for reading IPC data. The native IPC readers look these keys up by
 * name ("columns", "projection", "numRows", "rowCount"); all of them are optional.
 */
type ReadIPCOptions = {
// Column names to read (passed to the reader's `with_columns`).
columns?: string[];
// Column indices to read (passed to the reader's `with_projection`).
projection?: number[];
// Row limit (passed to the reader's `with_n_rows`).
numRows?: number;
// Optional row-count column to add (passed to the reader's `with_row_count`).
rowCount?: RowCount
}


Expand Down Expand Up @@ -130,6 +139,7 @@ function readCSVBuffer(buff, options) {
return dfWrapper(pli.df.readCSVBuffer({...readCsvDefaultOptions, ...options, buff}));
}
// Reads a CSV file from `path` through the native binding and wraps the result.
// Caller-supplied `options` are layered over `readCsvDefaultOptions`; `path` is
// spread last so it always wins over any `path` key inside `options`.
function readCSVPath(path, options) {
  const nativeOptions = {...readCsvDefaultOptions, ...options, path};
  const nativeFrame = pli.df.readCSVPath(nativeOptions);

  return dfWrapper(nativeFrame);
}
function readJSONBuffer(buff, options) {
Expand Down
2 changes: 1 addition & 1 deletion nodejs-polars/src/conversion/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl FromJsUnknown for RowCount {
let name: JsUnknown = obj.get_named_property("name")?;
let name: String = String::from_js(name)?;

let offset: JsUnknown = obj.get_named_property("name")?;
let offset: JsUnknown = obj.get_named_property("offset")?;
let offset: u32 = u32::from_js(offset)?;

Ok(RowCount { name, offset })
Expand Down
15 changes: 15 additions & 0 deletions nodejs-polars/src/dataframe/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use polars::prelude::*;
use std::fs::File;
use std::io::{BufReader, Cursor};
use std::path::{Path, PathBuf};
use polars::io::RowCount;

#[js_function(1)]
pub(crate) fn read_columns(cx: CallContext) -> JsResult<JsExternal> {
Expand Down Expand Up @@ -58,6 +59,7 @@ pub(crate) fn read_csv_buffer(cx: CallContext) -> JsResult<JsExternal> {
let comment_char = comment_char.map(|s| s.as_bytes()[0]);
let buff = params.get::<napi::JsBuffer>("buff")?;
let buffer_value = buff.into_value()?;
let row_count = params.get_as::<Option<RowCount>>("rowCount")?;

let cursor = Cursor::new(buffer_value.as_ref());

Expand Down Expand Up @@ -94,6 +96,7 @@ pub(crate) fn read_csv_buffer(cx: CallContext) -> JsResult<JsExternal> {
.with_null_values(null_values)
.with_parse_dates(parse_dates)
.with_quote_char(quote_char)
.with_row_count(row_count)
.finish()
.map_err(JsPolarsEr::from)?
.try_into_js(&cx)
Expand Down Expand Up @@ -123,6 +126,7 @@ pub(crate) fn read_csv_path(cx: CallContext) -> JsResult<JsExternal> {
let stop_after_n_rows: Option<usize> = params.get_as("endRows")?;
let null_values = null_values.map(|w| w.0);
let comment_char = comment_char.map(|s| s.as_bytes()[0]);
let row_count = params.get_as::<Option<RowCount>>("rowCount")?;

let quote_char = if let Some(s) = quote_char {
if s.is_empty() {
Expand Down Expand Up @@ -158,6 +162,7 @@ pub(crate) fn read_csv_path(cx: CallContext) -> JsResult<JsExternal> {
.with_null_values(null_values)
.with_parse_dates(parse_dates)
.with_quote_char(quote_char)
.with_row_count(row_count)
.finish()
.map_err(JsPolarsEr::from)?
.try_into_js(&cx)
Expand Down Expand Up @@ -220,6 +225,7 @@ pub(crate) fn read_parquet_path(cx: CallContext) -> JsResult<JsExternal> {
let n_rows: Option<usize> = params.get_as("numRows")?;
let parallel: bool = params.get_or("parallel", true)?;
let rechunk: bool = params.get_or("rechunk", true)?;
let row_count = params.get_as::<Option<RowCount>>("rowCount")?;

let f = File::open(&path)?;

Expand All @@ -228,6 +234,7 @@ pub(crate) fn read_parquet_path(cx: CallContext) -> JsResult<JsExternal> {
.with_columns(columns)
.read_parallel(parallel)
.with_n_rows(n_rows)
.with_row_count(row_count)
.set_rechunk(rechunk)
.finish()
.map_err(JsPolarsEr::from)?
Expand All @@ -242,6 +249,8 @@ pub(crate) fn read_parquet_buffer(cx: CallContext) -> JsResult<JsExternal> {
let n_rows: Option<usize> = params.get_as("numRows")?;
let parallel: bool = params.get_or("parallel", true)?;
let rechunk: bool = params.get_or("rechunk", true)?;
let row_count = params.get_as::<Option<RowCount>>("rowCount")?;

let buff = params.get::<napi::JsBuffer>("buff")?;
let buffer_value = buff.into_value()?;

Expand All @@ -252,6 +261,7 @@ pub(crate) fn read_parquet_buffer(cx: CallContext) -> JsResult<JsExternal> {
.with_columns(columns)
.read_parallel(parallel)
.with_n_rows(n_rows)
.with_row_count(row_count)
.set_rechunk(rechunk)
.finish()
.map_err(JsPolarsEr::from)?
Expand Down Expand Up @@ -324,13 +334,15 @@ pub(crate) fn read_ipc_path(cx: CallContext) -> JsResult<JsExternal> {
let columns: Option<Vec<String>> = params.get_as("columns")?;
let projection: Option<Vec<usize>> = params.get_as("projection")?;
let n_rows: Option<usize> = params.get_as("numRows")?;
let row_count = params.get_as::<Option<RowCount>>("rowCount")?;

let f = File::open(&path)?;

IpcReader::new(f)
.with_projection(projection)
.with_columns(columns)
.with_n_rows(n_rows)
.with_row_count(row_count)
.finish()
.map_err(JsPolarsEr::from)?
.try_into_js(&cx)
Expand All @@ -342,6 +354,8 @@ pub(crate) fn read_ipc_buffer(cx: CallContext) -> JsResult<JsExternal> {
let columns: Option<Vec<String>> = params.get_as("columns")?;
let projection: Option<Vec<usize>> = params.get_as("projection")?;
let n_rows: Option<usize> = params.get_as("numRows")?;
let row_count = params.get_as::<Option<RowCount>>("rowCount")?;

let buff = params.get::<napi::JsBuffer>("buff")?;
let buffer_value = buff.into_value()?;

Expand All @@ -351,6 +365,7 @@ pub(crate) fn read_ipc_buffer(cx: CallContext) -> JsResult<JsExternal> {
.with_projection(projection)
.with_columns(columns)
.with_n_rows(n_rows)
.with_row_count(row_count)
.finish()
.map_err(JsPolarsEr::from)?
.try_into_js(&cx)
Expand Down
2 changes: 2 additions & 0 deletions nodejs-polars/src/lazy/lazyframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn scan_csv(cx: CallContext) -> JsResult<JsExternal> {
let comment_char = comment_char.map(|s| s.as_bytes()[0]);
let quote_char = quote_char.as_bytes()[0];
let delimiter = sep.as_bytes()[0];
let row_count = params.get_as::<Option<RowCount>>("rowCount")?;

LazyCsvReader::new(path)
.with_infer_schema_length(Some(infer_schema_length))
Expand All @@ -52,6 +53,7 @@ pub fn scan_csv(cx: CallContext) -> JsResult<JsExternal> {
.with_comment_char(comment_char)
.with_quote_char(Some(quote_char))
.with_null_values(null_values)
.with_row_count(row_count)
.finish()
.map_err(JsPolarsEr::from)?
.try_into_js(&cx)
Expand Down

0 comments on commit 9f9223e

Please sign in to comment.