feat(nodejs): add ipc & parquet buffer & stream writers (#2463)
universalmind303 committed Jan 25, 2022
1 parent e72a38c commit ea383f6
Showing 5 changed files with 109 additions and 21 deletions.
7 changes: 3 additions & 4 deletions nodejs-polars/__tests__/io.test.ts
@@ -179,8 +179,9 @@ describe("ipc", () => {
const df = pl.readIPC(ipcpath);
expect(df.shape).toStrictEqual({height: 27, width: 4});
});
test("read:buffer", () => {
const buff = fs.readFileSync(ipcpath);
test("read/write:buffer", () => {

const buff = pl.readCSV(csvpath).toIPC();
const df = pl.readIPC(buff);
expect(df.shape).toStrictEqual({height: 27, width: 4});
});
@@ -191,7 +192,6 @@ describe("ipc", () => {
expect(ipcDF).toFrameEqual(csvDF);
});

-  // // https://github.com/pola-rs/polars/issues/2403
test.skip("read:options", () => {
const df = pl.readIPC(ipcpath, {numRows: 4});
expect(df.shape).toStrictEqual({height: 4, width: 4});
@@ -202,7 +202,6 @@ describe("ipc", () => {
expect(df.shape).toStrictEqual({height: 27, width: 4});
});

-  // https://github.com/pola-rs/polars/issues/2403
test.skip("scan:options", () => {
const df = pl.scanIPC(ipcpath, {numRows: 4}).collectSync();
expect(df.shape).toStrictEqual({height: 4, width: 4});
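The rewritten buffer test exercises the new writer end to end: it serializes a frame to an Arrow IPC buffer with the zero-argument toIPC() overload and reads it straight back. A minimal sketch of the same round trip outside the test harness (the import style follows the package README; the CSV path is hypothetical):

  import pl from "nodejs-polars";

  // Serialize to an in-memory Arrow IPC buffer, then deserialize it again
  // without ever touching the filesystem.
  const df = pl.readCSV("./foods.csv");   // any CSV will do; path is illustrative
  const buff: Buffer = df.toIPC();        // no destination -> Buffer overload
  const roundTripped = pl.readIPC(buff);
  console.log(roundTripped.shape);        // same shape as the original frame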
53 changes: 45 additions & 8 deletions nodejs-polars/polars/dataframe.ts
@@ -1167,6 +1167,7 @@ export interface DataFrame extends Arithmetic<DataFrame> {
*/
toObject(): object
toObject(options: {orient: "row" | "col" | "dataframe"}): object

/**
 * Write DataFrame to JSON string, file, or write stream
* @param destination file or write stream
@@ -1214,14 +1215,16 @@ export interface DataFrame extends Arithmetic<DataFrame> {
 * @param destination File path or write stream; if omitted, the data is returned as a Buffer.
* @param options.compression Compression method *defaults to "uncompressed"*
* */
-  toIPC(path: string, options?: WriteIPCOptions): void
+  toIPC(options?: WriteIPCOptions): Buffer
+  toIPC(destination: string | Writable, options?: WriteIPCOptions): void

/**
 * Write the DataFrame to a file, write stream, or buffer in parquet format.
 * @param destination File path or write stream; if omitted, the data is returned as a Buffer.
* @param options.compression Compression method *defaults to "uncompressed"*
* */
-  toParquet(path: string, options?: WriteParquetOptions): void
+  toParquet(options?: WriteParquetOptions): Buffer
+  toParquet(destination: string | Writable, options?: WriteParquetOptions): void
toSeries(index: number): Series<any>
toString(): string
/**
@@ -1353,6 +1356,32 @@ export const dfWrapper = (_df: JsDataFrame): DataFrame => {
return body;
}
};

  const writeToBufferOrStream = (dest, format: "ipc" | "parquet", options) => {
    if(dest instanceof Writable) {
      // pipe directly into the caller's stream
      unwrap(`write_${format}_stream`, {writeStream: dest, ...options});
      dest.end("");
    } else if (typeof dest === "string") {
      // write to a file on disk
      return unwrap(`write_${format}_path`, {path: dest, ...options});
    } else {
      // no destination: collect the chunks from an in-memory Writable
      // and hand them back as a single Buffer
      let buffers: Buffer[] = [];
      const writeStream = new Stream.Writable({
        write(chunk, _encoding, callback) {
          buffers.push(chunk);
          callback(null);
        }
      });
      unwrap(`write_${format}_stream`, {writeStream, ...options, ...dest});
      writeStream.end("");

      return Buffer.concat(buffers);
    }
  };
const df = {
/** @ignore */
_df,
@@ -1640,6 +1669,7 @@ export const dfWrapper = (_df: JsDataFrame): DataFrame => {
return wrap("slice", opts);
},
sort(arg, reverse=false) {

if(arg?.by !== undefined) {
return this.sort(arg.by, arg.reverse);
}
@@ -1650,7 +1680,7 @@

}

return wrap("sort", {by: arg, reverse});
return wrap("sort", {by: [arg].flat(), reverse});

},
std: noArgWrap("std"),
@@ -1669,7 +1699,6 @@
return writeToStreamOrString(null, "csv", {...options, ...dest});

} else {
// toCSV()
// toCSV("path/to/some/file", options)
// toCSV(writeStream, options)
@@ -1719,11 +1748,19 @@
return writeToStreamOrString(arg0, "json", options);
}
},
-    toParquet(path, {compression} = {compression: "uncompressed"}) {
-      return unwrap("writeParquet", {path, compression});
+    toParquet(dest?, options = {compression: "uncompressed"}) {
+      if(dest?.compression !== undefined) {
+        return writeToBufferOrStream(null, "parquet", dest);
+      } else {
+        return writeToBufferOrStream(dest, "parquet", options);
+      }
     },
-    toIPC(path, {compression} = {compression: "uncompressed"}) {
-      return unwrap("writeIPC", {path, compression});
+    toIPC(dest?, options = {compression: "uncompressed"}) {
+      if(dest?.compression !== undefined) {
+        return writeToBufferOrStream(null, "ipc", dest);
+      } else {
+        return writeToBufferOrStream(dest, "ipc", options);
+      }
     },
},
toSeries: (index) => seriesWrapper(unwrap("select_at_idx", {index})),
toString: () => noArgUnwrap<any>("as_str")().toString(),
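Taken together, toParquet and toIPC now dispatch on the first argument: a string is treated as a path, a Writable is piped into, and an options bag (detected by its compression key) or no argument at all yields a Buffer. A hedged usage sketch of all three forms (file names and data are illustrative; the constructor form matches the repo's tests):

  import fs from "fs";
  import pl from "nodejs-polars";

  const df = pl.DataFrame({a: [1, 2, 3], b: ["x", "y", "z"]});

  // 1. string destination -> routed to the native write_parquet_path
  df.toParquet("./out.parquet", {compression: "snappy"});

  // 2. Writable destination -> write_parquet_stream pushes chunks into it
  df.toIPC(fs.createWriteStream("./out.arrow"));

  // 3. options-only or zero-argument call -> chunks collected into a Buffer
  const ipcBuf: Buffer = df.toIPC({compression: "lz4"});
  const pqBuf: Buffer = df.toParquet();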
4 changes: 2 additions & 2 deletions nodejs-polars/src/dataframe/frame.rs
@@ -409,9 +409,9 @@ pub fn take_with_series(cx: CallContext) -> JsResult<JsExternal> {
pub fn sort(cx: CallContext) -> JsResult<JsExternal> {
let params = get_params(&cx)?;
let df = params.get_external::<DataFrame>(&cx, "_df")?;
-    let by_column = params.get_as::<&str>("by")?;
+    let by_columns = params.get_as::<Vec<String>>("by")?;
    let reverse = params.get_as::<bool>("reverse")?;
-    df.sort(by_column, reverse)
+    df.sort(by_columns, reverse)
.map_err(JsPolarsEr::from)?
.try_into_js(&cx)
}
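This widens the sort key from a single &str to Vec<String>; on the TypeScript side, the [arg].flat() normalization above wraps a lone column name into a one-element array, so single- and multi-column sorts share one code path. A short sketch (column names and data are illustrative):

  const df = pl.DataFrame({category: ["a", "a", "b"], num: [3, 1, 2]});
  df.sort("num");                                    // binding receives {by: ["num"], reverse: false}
  df.sort({by: ["category", "num"], reverse: true}); // multi-column descending sort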
59 changes: 54 additions & 5 deletions nodejs-polars/src/dataframe/io.rs
@@ -283,10 +283,35 @@ pub(crate) fn write_parquet_path(cx: CallContext) -> JsResult<JsUndefined> {

cx.env.get_undefined()
}

#[js_function(1)]
-pub(crate) fn write_parquet_stream(_cx: CallContext) -> JsResult<JsUndefined> {
-    // JSWriteStream needs to implement 'Seek'
-    todo!()
pub(crate) fn write_parquet_stream(cx: CallContext) -> JsResult<JsUndefined> {
    let params = get_params(&cx)?;
let df = params.get_external::<DataFrame>(&cx, "_df")?;
let stream = params.get::<JsObject>("writeStream")?;
let writeable = JsWriteStream {
inner: stream,
env: cx.env,
};
let compression = params.get_as::<String>("compression")?;
let compression = match compression.as_str() {
"uncompressed" => ParquetCompression::Uncompressed,
"snappy" => ParquetCompression::Snappy,
"gzip" => ParquetCompression::Gzip,
"lzo" => ParquetCompression::Lzo,
"brotli" => ParquetCompression::Brotli,
"lz4" => ParquetCompression::Lz4,
"zstd" => ParquetCompression::Zstd,
s => return Err(JsPolarsEr::Other(format!("compression {} not supported", s)).into()),
};

ParquetWriter::new(writeable)
.with_compression(compression)
.finish(df)
.map_err(JsPolarsEr::from)?;

cx.env.get_undefined()
}

// ------
@@ -335,10 +360,9 @@ pub(crate) fn read_ipc_buffer(cx: CallContext) -> JsResult<JsExternal> {
#[js_function(1)]
pub(crate) fn write_ipc_path(cx: CallContext) -> JsResult<JsUndefined> {
let params = get_params(&cx)?;
-    let compression = params.get_as::<String>("compression")?;
let df = params.get_external::<DataFrame>(&cx, "_df")?;
let path = params.get_as::<String>("path")?;

let compression = params.get_as::<String>("compression")?;
let compression = match compression.as_str() {
"uncompressed" => None,
"lz4" => Some(IpcCompression::LZ4),
@@ -354,6 +378,31 @@

cx.env.get_undefined()
}
#[js_function(1)]
pub(crate) fn write_ipc_stream(cx: CallContext) -> JsResult<JsUndefined> {
let params = get_params(&cx)?;
let df = params.get_external::<DataFrame>(&cx, "_df")?;
let stream = params.get::<JsObject>("writeStream")?;
let writeable = JsWriteStream {
inner: stream,
env: cx.env,
};
let compression = params.get_as::<String>("compression")?;

let compression = match compression.as_str() {
"uncompressed" => None,
"lz4" => Some(IpcCompression::LZ4),
"zstd" => Some(IpcCompression::ZSTD),
s => return Err(JsPolarsEr::Other(format!("compression {} not supported", s)).into()),
};

IpcWriter::new(writeable)
.with_compression(compression)
.finish(df)
.map_err(JsPolarsEr::from)?;

cx.env.get_undefined()
}

// ------
// JSON
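Both stream writers validate the compression string up front and reject anything their format does not support. Mapping that back to the JavaScript API (assuming, as elsewhere in these bindings, that a napi error surfaces as a thrown exception; data is illustrative):

  const df = pl.DataFrame({a: [1, 2, 3]});
  df.toIPC({compression: "zstd"});       // ok: IPC accepts "uncompressed", "lz4", "zstd"
  df.toParquet({compression: "brotli"}); // ok: parquet also allows snappy, gzip, lzo, lz4, zstd
  df.toIPC({compression: "snappy"});     // throws: compression snappy not supported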
7 changes: 5 additions & 2 deletions nodejs-polars/src/dataframe/mod.rs
@@ -106,11 +106,14 @@ impl JsDataFrame {
// parquet
napi::Property::new(env, "readParquetPath")?.with_method(io::read_parquet_path),
napi::Property::new(env, "readParquetBuffer")?.with_method(io::read_parquet_buffer),
napi::Property::new(env, "writeParquet")?.with_method(io::write_parquet_path),
napi::Property::new(env, "write_parquet_path")?.with_method(io::write_parquet_path),
napi::Property::new(env, "write_parquet_stream")?.with_method(io::write_parquet_stream),
// ipc
napi::Property::new(env, "readIPCPath")?.with_method(io::read_ipc_path),
napi::Property::new(env, "readIPCBuffer")?.with_method(io::read_ipc_buffer),
napi::Property::new(env, "writeIPC")?.with_method(io::write_ipc_path),
napi::Property::new(env, "write_ipc_path")?.with_method(io::write_ipc_path),
napi::Property::new(env, "write_ipc_stream")?.with_method(io::write_ipc_stream),

])?;
Ok(df_obj)
}
