Skip to content

Commit

Permalink
feat(js): add extend & horizontal concat (#2177)
Browse files Browse the repository at this point in the history
* feat(js): add extend

* feat(js): add horizontal concat
  • Loading branch information
universalmind303 committed Dec 27, 2021
1 parent 8cedd5e commit 263754a
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 26 deletions.
2 changes: 2 additions & 0 deletions nodejs-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ features = [
"reinterpret", # "decompress-fast",
"mode",
"extract_jsonpath",
"dot_diagram",
"json",
"lazy_regex",
"cum_agg",
Expand All @@ -69,6 +70,7 @@ features = [
"serde", # "asof_join", # "cross_join",
"lazy",
"repeat_by",
"horizontal_concat",
]
path = "../polars"

Expand Down
14 changes: 14 additions & 0 deletions nodejs-polars/__tests__/expr.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ describe("expr", () => {
expect(actual).toFrameEqual(expected);

});
// Expr.extend (options-object form): a 3-element string Series is wrapped in `lit`,
// extended with two nulls so it reaches the 5-row height of `df`, and then added
// as column "c".
test("extend", () => {
const df = pl.DataFrame({
a: [1, 2, 3, 4, 5],
b: [2, 3, 4, 5, 6]
});
const other = pl.Series("c", ["a", "b", "c"]);
const expected = pl.DataFrame({
a: [1, 2, 3, 4, 5],
b: [2, 3, 4, 5, 6],
c: ["a", "b", "c", null, null]
});
const actual = df.withColumn(lit(other).extend({value: null, n: 2}));
expect(actual).toFrameEqual(expected);
});
test.each`
replacement | filled
${lit(1)} | ${1}
Expand Down
22 changes: 16 additions & 6 deletions nodejs-polars/__tests__/functions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,26 @@ describe("concat", () => {
const fn = () => pl.concat([]);
expect(fn).toThrowError();
});
it("only supports vertical concats", () => {
const s1 = pl.Series("a", [1, 2, 3]);
const s2 = pl.Series("a", [4, 5, 6]);
const fn = () => pl.concat([s1, s2], {rechunk: true, how: "diagonal" as any});
expect(fn).toThrowError();
});

it("can only concat series and df", () => {
const fn = () => pl.concat([[1] as any, [2] as any]);
expect(fn).toThrowError();
});
// Horizontal concat of frames with different heights (2 rows vs 4 rows):
// the expected frame has the columns of the shorter frame padded with nulls.
test("horizontal concat", () => {
const a = pl.DataFrame({"a": ["a", "b"], "b": [1, 2]});
const b = pl.DataFrame({"c": [5, 7, 8, 9], "d": [1, 2, 1, 2], "e": [1, 2, 1, 2]});
const actual = pl.concat([a, b], {how:"horizontal"});
const expected = pl.DataFrame(
{
"a": ["a", "b", null, null],
"b": [1, 2, null, null],
"c": [5, 7, 8, 9],
"d": [1, 2, 1, 2],
"e": [1, 2, 1, 2],
}
);
expect(actual).toFrameEqual(expected);
});
});
describe("repeat", () => {
it("repeats a value n number of times into a series", () => {
Expand Down
6 changes: 6 additions & 0 deletions nodejs-polars/__tests__/series.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,12 @@ describe("series", () => {
`("$# $name throws an error ", ({fn, errorType}) => {
expect(fn).toThrow(errorType);
});
// Series.extend (positional form): appending two nulls to a one-element UInt16
// Series is expected to compare strictly equal to a UInt16 Series [1, null, null].
test("extend", () => {
const s = pl.Series("extended", [1], pl.UInt16);
const expected = pl.Series("extended", [1, null, null], pl.UInt16);
const actual = s.extend(null, 2);
expect(actual).toSeriesStrictEqual(expected);
});
});

describe("StringFunctions", () => {
Expand Down
48 changes: 36 additions & 12 deletions nodejs-polars/polars/functions.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/* eslint-disable no-redeclare */
import {jsTypeToPolarsType} from "./internals/construction";
import {Series, seriesWrapper} from "./series";
import {DataFrame} from "./dataframe";
import {DataFrame, dfWrapper} from "./dataframe";
import pli from "./internals/polars_internal";
import {isDataFrameArray, isSeriesArray} from "./utils";

type ConcatOptions = {rechunk: boolean, how?: "vertical"}
type ConcatOptions = {rechunk?: boolean, how?: "vertical" | "horizontal"}

/**
* _Repeat a single value n times and collect into a Series._
Expand All @@ -29,22 +29,46 @@ export function repeat<V>(value: V, n: number, name= ""): Series<V>{
return seriesWrapper(s);
}

export function concat(item: Array<DataFrame>): DataFrame;
export function concat<T>(item: Array<Series<T>>): Series<T>;
export function concat(item: Array<DataFrame>, options: ConcatOptions): DataFrame;
export function concat<T>(item: Array<Series<T>>, options: ConcatOptions): Series<T>;
export function concat<T>(items, options: ConcatOptions = {rechunk: true, how: "vertical"}): DataFrame | Series<T> {
/**
* Aggregate all the Dataframes/Series in a List of DataFrames/Series to a single DataFrame/Series.
* @param items DataFrames/Series/LazyFrames to concatenate.
* @param options.rechunk rechunk the final DataFrame/Series.
* @param options.how Only used if the items are DataFrames. *Defaults to 'vertical'*
* - Vertical: Applies multiple `vstack` operations.
* - Horizontal: Stacks Series horizontally and fills with nulls if the lengths don't match.
*
* @example
* >>> const df1 = pl.DataFrame({"a": [1], "b": [3]})
* >>> const df2 = pl.DataFrame({"a": [2], "b": [4]})
* >>> pl.concat([df1, df2])
* shape: (2, 2)
* ┌─────┬─────┐
* │ a ┆ b │
* │ --- ┆ --- │
* │ i64 ┆ i64 │
* ╞═════╪═════╡
* │ 1 ┆ 3 │
* ├╌╌╌╌╌┼╌╌╌╌╌┤
* │ 2 ┆ 4 │
* └─────┴─────┘
*/
export function concat(items: Array<DataFrame>, options?: ConcatOptions): DataFrame;
export function concat<T>(items: Array<Series<T>>, options?: {rechunk: boolean}): Series<T>;
export function concat<T>(items, options: ConcatOptions = {rechunk: true, how: "vertical"}) {
const {rechunk, how} = options;

if(!items.length) {
throw new RangeError("cannot concat empty list");
}
if(how !== "vertical") {
throw new Error("unsupported operation. only 'vertical' is supported at this time");
}

if(isDataFrameArray(items)) {
const df = items.reduce((acc, curr) => acc.vstack(curr));
let df;
if(how === "vertical") {
df = items.reduce((acc, curr) => acc.vstack(curr));

} else {
df = dfWrapper(pli.horizontalConcatDF({items: items.map(i => i._df)}));
}

return rechunk ? df.rechunk() : df;
}
Expand All @@ -54,5 +78,5 @@ export function concat<T>(items, options: ConcatOptions = {rechunk: true, how:

return rechunk ? s.rechunk() : s;
}
throw new Error("can only concat series and dataframes");
throw new TypeError("can only concat series and dataframes");
}
14 changes: 14 additions & 0 deletions nodejs-polars/polars/lazy/expr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ export interface Expr {
* This means that every item is expanded to a new row.
*/
explode(): Expr
/**
* Extend the Series with given number of values.
* @param value The value to extend the Series with. This value may be null to fill with nulls.
* @param n The number of values to extend.
*/
extend(value: any, n: number): Expr
extend(opt: {value: any, n: number}): Expr
/** Fill nan value with a fill value */
fillNan(other: any): Expr
/** Fill null value with a fill value or strategy */
Expand Down Expand Up @@ -1132,6 +1139,13 @@ const _Expr = (_expr: JsExpr): Expr => {
eq: wrapExprArg("eq"),
exclude,
explode: wrapNullArgs("explode"),
extend(o, n?) {
  // Two call shapes are accepted:
  //   extend(value, n)   -> positional; packed into {value, n}
  //   extend({value, n}) -> options object; forwarded unchanged
  // `typeof n === "number"` already excludes null and undefined.
  const args = typeof n === "number" ? {value: o, n} : o;

  return wrap("extend", args);
},
fillNan: wrapExprArg("fillNan", true),
fillNull,
fillNullWithStrategy: wrapUnary("fillNullWithStrategy", "strategy"),
Expand Down
15 changes: 15 additions & 0 deletions nodejs-polars/polars/series.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,13 @@ export interface Series<T> extends ArrayLike<T> {
* ```
*/
explode(): any
/**
* Extend the Series with given number of values.
* @param value The value to extend the Series with. This value may be null to fill with nulls.
* @param n The number of values to extend.
*/
extend(value: any, n: number): Series<T>
extend(opt: {value: any, n: number}): Series<T>
/**
* __Fill null values with a filling strategy.__
* ___
Expand Down Expand Up @@ -1239,6 +1246,7 @@ export interface Series<T> extends ArrayLike<T> {

export const seriesWrapper = <T>(_s: JsSeries): Series<T> => {
const unwrap = <U>(method: string, args?: object, _series = _s): U => {

return pli.series[method]({_series, ...args });
};
const wrap = <U>(method, args?, _series = _s): Series<U> => {
Expand Down Expand Up @@ -1457,6 +1465,13 @@ export const seriesWrapper = <T>(_s: JsSeries): Series<T> => {
return this.eq(field);
},
explode: noArgWrap("explode"),
extend(o, n?) {
  // Two call shapes are accepted:
  //   extend(value, n)   -> positional; packed into {value, n}
  //   extend({value, n}) -> options object; forwarded unchanged
  // `typeof n === "number"` already excludes null and undefined.
  const args = typeof n === "number" ? {value: o, n} : o;

  return wrap("extend", args);
},
fillNull(strategy) {
return typeof strategy === "string" ?
wrap("fill_null", {strategy}) :
Expand Down
4 changes: 3 additions & 1 deletion nodejs-polars/src/conversion/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ impl FromJsUnknown for AnyValue<'_> {
if val.is_date()? {
let d: JsDate = unsafe { val.cast() };
let d = d.value_of()?;
Ok(AnyValue::Datetime(d as i64))
let d = d as i64 * 1000000;
Ok(AnyValue::Datetime(d))
} else {
Err(JsPolarsEr::Other("Unsupported Data type".to_owned()).into())
}
Expand All @@ -65,6 +66,7 @@ impl FromJsUnknown for AnyValue<'_> {
}
}
}

impl FromJsUnknown for Wrap<Utf8Chunked> {
fn from_js(val: JsUnknown) -> Result<Self> {
if val.is_array()? {
Expand Down
4 changes: 2 additions & 2 deletions nodejs-polars/src/conversion/into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ impl IntoJs<JsUnknown> for Wrap<AnyValue<'_>> {
AnyValue::Int64(v) => cx.env.create_bigint_from_i64(v).map(|v| v.into_unknown())?,
AnyValue::Float32(v) => cx.env.create_double(v as f64).map(|v| v.into_unknown()),
AnyValue::Float64(v) => cx.env.create_double(v).map(|v| v.into_unknown()),
AnyValue::Date(v) => cx.env.create_date(v as f64).map(|v| v.into_unknown()),
AnyValue::Datetime(v) => cx.env.create_date(v as f64).map(|v| v.into_unknown()),
AnyValue::Date(v) => cx.env.create_date((v/ 1000000) as f64).map(|v| v.into_unknown()),
AnyValue::Datetime(v) => cx.env.create_date((v / 1000000) as f64).map(|v| v.into_unknown()),
AnyValue::List(v) => cx.env.to_js_value(&v).map(|v| v.into_unknown()),
_ => cx.env.get_null().map(|v| v.into_unknown()),
}
Expand Down
75 changes: 74 additions & 1 deletion nodejs-polars/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use napi::*;

use polars::lazy::dsl;
use polars::prelude::*;

use crate::error::JsPolarsEr;
pub struct JsExpr {}
pub struct JsWhen {}
pub struct JsWhenThen {}
Expand Down Expand Up @@ -474,6 +474,79 @@ pub fn quantile(cx: CallContext) -> JsResult<JsExternal> {
.quantile(quantile, QuantileInterpolOptions::default())
.try_into_js(&cx)
}

/// Builds an expression that appends `n` copies of a JS-supplied fill value.
///
/// Reads `_expr`, `value` and `n` from the single params object and maps the JS type
/// of `value` onto an `AnyValue`:
///
/// - `undefined` / `null` -> `AnyValue::Null`
/// - boolean -> `AnyValue::Boolean`
/// - number -> `AnyValue::Int32` when integral and inside the inclusive `i32` range,
///   `AnyValue::Int64` for any other integral value, `AnyValue::Float64` otherwise
/// - string -> `AnyValue::Utf8`
/// - bigint -> `AnyValue::UInt64`
/// - `Date` -> `AnyValue::Datetime`, using the date's `value_of()` scaled by 1_000_000
///
/// # Errors
/// Returns an "Unsupported Data type" error for every other JS value, and propagates
/// any failure while reading or converting the parameters.
#[js_function(1)]
pub fn extend(cx: CallContext) -> JsResult<JsExternal> {
    let params = get_params(&cx)?;
    let expr = params.get_external::<Expr>(&cx, "_expr")?.clone();
    let val = params.get::<JsUnknown>("value")?;
    let n = params.get_as::<usize>("n")?;

    let extended = match val.get_type()? {
        ValueType::Undefined | ValueType::Null => {
            expr.apply(move |s| s.extend(AnyValue::Null, n), GetOutput::same_type())
        }
        ValueType::Boolean => {
            let val = bool::from_js(val)?;
            expr.apply(
                move |s| s.extend(AnyValue::Boolean(val), n),
                GetOutput::same_type(),
            )
        }
        ValueType::Number => {
            let f_val = f64::from_js(val)?;
            if f_val.round() == f_val {
                // Integral JS number: use the narrowest signed integer that holds it.
                let val = f_val as i64;
                // Inclusive bounds, so 0, i32::MIN and i32::MAX are treated as i32 too.
                let fits_i32 = val >= i64::from(i32::MIN) && val <= i64::from(i32::MAX);
                if fits_i32 {
                    expr.apply(
                        move |s| s.extend(AnyValue::Int32(val as i32), n),
                        GetOutput::same_type(),
                    )
                } else {
                    expr.apply(
                        move |s| s.extend(AnyValue::Int64(val), n),
                        GetOutput::same_type(),
                    )
                }
            } else {
                expr.apply(
                    move |s| s.extend(AnyValue::Float64(f_val), n),
                    GetOutput::same_type(),
                )
            }
        }
        ValueType::String => {
            let val = String::from_js(val)?;
            // The closure owns the String and lends it out on every invocation.
            expr.apply(
                move |s| s.extend(AnyValue::Utf8(&val), n),
                GetOutput::same_type(),
            )
        }
        ValueType::Bigint => {
            let val = u64::from_js(val)?;
            expr.apply(
                move |s| s.extend(AnyValue::UInt64(val), n),
                GetOutput::same_type(),
            )
        }
        ValueType::Object => {
            if val.is_date()? {
                // SAFETY: `is_date` has just confirmed that this value is a JS Date.
                let d: JsDate = unsafe { val.cast() };
                let d = d.value_of()?;
                // Same scaling as the Date -> AnyValue::Datetime conversion (x 1_000_000).
                let d = d as i64 * 1_000_000;
                expr.apply(
                    move |s| s.extend(AnyValue::Datetime(d), n),
                    GetOutput::same_type(),
                )
            } else {
                return Err(JsPolarsEr::Other("Unsupported Data type".to_owned()).into());
            }
        }
        _ => return Err(JsPolarsEr::Other("Unsupported Data type".to_owned()).into()),
    };

    extended.try_into_js(&cx)
}
//
macro_rules! impl_expr {
($name:ident) => {
#[js_function(1)]
Expand Down
1 change: 1 addition & 0 deletions nodejs-polars/src/lazy/dsl_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl dsl::JsExpr {
napi::Property::new(env, "eq")?.with_method(dsl::eq),
napi::Property::new(env, "exclude")?.with_method(dsl::exclude),
napi::Property::new(env, "explode")?.with_method(dsl::explode),
napi::Property::new(env, "extend")?.with_method(dsl::extend),
napi::Property::new(env, "fillNan")?.with_method(dsl::fill_nan),
napi::Property::new(env, "fillNullWithStrategy")?
.with_method(dsl::fill_null_with_strategy),
Expand Down
18 changes: 16 additions & 2 deletions nodejs-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,26 @@ use crate::lazy::dsl;
use crate::lazy::functions;
use crate::lazy::lazyframe_object::JsLazyFrame;
use crate::series::{repeat, JsSeries};
use napi::{JsObject, Result};
use napi::{JsObject, Result as JsResult, CallContext, JsExternal};
use polars_core::prelude::DataFrame;
use polars_core::functions as pl_functions;
use crate::conversion::prelude::*;

#[macro_use]
extern crate napi_derive;

#[js_function(1)]
pub fn hor_concat_df(cx: CallContext) -> JsResult<JsExternal> {
let params = get_params(&cx)?;

let dfs = params.get_external_vec::<DataFrame>(&cx, "items")?;
let df = pl_functions::hor_concat_df(&dfs).map_err(crate::error::JsPolarsEr::from)?;
df.try_into_js(&cx)

}

#[module_exports]
pub fn init(mut exports: JsObject, env: napi::Env) -> Result<()> {
pub fn init(mut exports: JsObject, env: napi::Env) -> JsResult<()> {
let ldf = JsLazyFrame::to_object(&env)?;
let series = JsSeries::to_object(&env)?;
let df = JsDataFrame::to_object(&env)?;
Expand Down Expand Up @@ -48,5 +61,6 @@ pub fn init(mut exports: JsObject, env: napi::Env) -> Result<()> {
exports.create_named_method("cov", functions::cov)?;
exports.create_named_method("pearsonCorr", functions::pearson_corr)?;
exports.create_named_method("spearmanRankCorr", functions::spearman_rank_corr)?;
exports.create_named_method("horizontalConcatDF", hor_concat_df)?;
Ok(())
}

0 comments on commit 263754a

Please sign in to comment.