Skip to content

Commit

Permalink
Nodejs groupbyrolling (#3670)
Browse files Browse the repository at this point in the history
* wip node groupbyrolling

* feat(nodejs): groupby rolling

* rename groupby interface
  • Loading branch information
universalmind303 committed Jun 13, 2022
1 parent 2427ad4 commit 4154c6a
Show file tree
Hide file tree
Showing 6 changed files with 407 additions and 15 deletions.
107 changes: 107 additions & 0 deletions nodejs-polars/__tests__/groupby.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,110 @@ describe("groupby", () => {
test.todo("groups");

});
describe("groupby ops", () => {
test("rolling", () => {
let dates = [
"2020-01-01 13:45:48",
"2020-01-01 16:42:13",
"2020-01-01 16:45:09",
"2020-01-02 18:12:48",
"2020-01-03 19:45:32",
"2020-01-08 23:16:43",
];

const df = pl
.DataFrame({"dt": dates, "a": [3, 7, 5, 9, 2, 1]})
.withColumn(pl.col("dt").str.strptime(pl.Datetime));

const a = pl.col("a");
const out = df.groupByRolling({indexColumn:"dt", period:"2d"}).agg(
a.sum().as("sum_a"),
a.min().as("min_a"),
a.max().as("max_a"),
);
expect(out["sum_a"].toArray()).toEqual([3, 10, 15, 24, 11, 1]);
expect(out["max_a"].toArray()).toEqual([3, 7, 7, 9, 9, 1]);
expect(out["min_a"].toArray()).toEqual([3, 3, 3, 3, 2, 1]);
});
test("dynamic - 1", () => {
const df = pl.DataFrame({
"event_date": [
new Date("2021-04-11"),
new Date("2021-04-29"),
new Date("2021-05-29"),
],
"adm1_code": [1, 2, 1],
});
const out = df.groupByDynamic({
indexColumn: "event_date",
every: "1mo",
period: "2mo",
offset: "-1mo",
includeBoundaries: true
}).agg(pl.col("adm1_code"));
const expected = [
new Date("2021-04-01"),
new Date("2021-04-01"),
new Date("2021-05-01"),
];
const actual = out.getColumn("event_date").toArray();
expect(actual).toEqual(expected);

});
test("dynamic - 2", () => {
const df = pl.DataFrame(
{
"event_date": [
new Date("2021-04-11"),
new Date("2021-04-29"),
new Date("2021-05-29"),
],
"adm1_code": [1, 2, 1],
"five_type": ["a", "b", "a"],
"actor": ["a", "a", "a"],
"admin": ["a", "a", "a"],
"fatalities": [10, 20, 30],
}
);
const out = df.groupByDynamic({
indexColumn: "event_date",
every: "1mo",
by: ["admin", "five_type", "actor"]
}).agg(
pl.col("adm1_code").unique(),
pl.col("fatalities")
.gt(0)
.sum()
);
const expected = [
new Date("2021-04-01"),
new Date("2021-05-01"),
new Date("2021-04-01"),
];
const actual = out.getColumn("event_date").toArray();
expect(actual).toEqual(expected);
});
test("default negative every offset dynamic groupby", () => {
const dates = [
new Date("2020-01-01"),
new Date("2020-01-02"),
new Date("2020-02-01"),
new Date("2020-03-01"),
];
const df = pl.DataFrame({dt: dates, idx: Array.from({length: dates.length}, (_v, k) => k)});
const actual = df.groupByDynamic({
indexColumn: "dt",
every: "1mo",
closed: "right"
}).agg(pl.col("idx"));
const expected = pl.DataFrame({
"dt": [
new Date("2020-01-01"),
new Date("2020-01-01"),
new Date("2020-03-01"),
],
"idx": [[0], [1, 2], [3]],
});
expect(actual).toFrameEqual(expected);
});
});
35 changes: 32 additions & 3 deletions nodejs-polars/polars/dataframe.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pli from "./internals/polars_internal";
import { arrayToJsDataFrame } from "./internals/construction";
import {GroupBy} from "./groupby";
import {DynamicGroupBy, GroupBy, RollingGroupBy} from "./groupby";
import {LazyDataFrame, _LazyDataFrame} from "./lazy/dataframe";
import {concat} from "./functions";
import {Expr} from "./lazy/expr";
Expand All @@ -23,7 +23,7 @@ import {
ExprOrString
} from "./utils";

import {Arithmetic, Deserialize, Sample, Serialize} from "./shared_traits";
import {Arithmetic, Deserialize, GroupByOps, Sample, Serialize} from "./shared_traits";
import {col} from "./lazy/functions";

const inspect = Symbol.for("nodejs.util.inspect.custom");
Expand Down Expand Up @@ -222,7 +222,13 @@ interface WriteMethods {
╰─────┴─────┴─────╯
```
*/
export interface DataFrame extends Arithmetic<DataFrame>, Sample<DataFrame>, WriteMethods, Serialize {
export interface DataFrame extends
Arithmetic<DataFrame>,
Sample<DataFrame>,
WriteMethods,
Serialize,
GroupByOps<RollingGroupBy>
{
/** @ignore */
_df: any
dtypes: DataType[]
Expand Down Expand Up @@ -1609,6 +1615,29 @@ export const _DataFrame = (_df: any): DataFrame => {

return GroupBy(_df as any, columnOrColumnsStrict(by));
},
groupByRolling(opts) {
return RollingGroupBy(
_DataFrame(_df) as any,
opts.indexColumn,
opts.period,
opts.offset,
opts.closed,
opts.by
);
},
groupByDynamic({indexColumn, every, period, offset, truncate, includeBoundaries, closed, by}) {
return DynamicGroupBy(
_DataFrame(_df) as any,
indexColumn,
every,
period,
offset,
truncate,
includeBoundaries,
closed,
by
);
},
hashRows(obj: any = 0n, k1=1n, k2=2n, k3=3n) {
if (typeof obj === "number" || typeof obj === "bigint") {
return _Series(_df.hashRows(BigInt(obj), BigInt(k1), BigInt(k2), BigInt(k3)));
Expand Down
58 changes: 57 additions & 1 deletion nodejs-polars/polars/groupby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import util from "util";
import {Expr} from "./lazy/expr";
import {col, exclude} from "./lazy/functions";
import pli from "./internals/polars_internal";
import {selectionToExprList} from "./utils";
import {ColumnsOrExpr, selectionToExprList} from "./utils";


const inspect = Symbol.for("nodejs.util.inspect.custom");
Expand Down Expand Up @@ -259,3 +259,59 @@ function PivotOps(
median: pivot("median"),
};
}


export interface RollingGroupBy {
agg(column: ColumnsOrExpr, ...columns: ColumnsOrExpr[]): DataFrame
}

export function RollingGroupBy(
df: any,
indexColumn: string,
period: string,
offset?: string,
closed?,
by?: ColumnsOrExpr
): RollingGroupBy {

return {
agg(column: ColumnsOrExpr, ...columns: ColumnsOrExpr[]) {


return df
.lazy()
.groupByRolling({indexColumn, period, offset, closed, by} as any)
.agg(column as any, ...columns)
.collectSync();
}
};
}

export interface DynamicGroupBy {
agg(column: ColumnsOrExpr, ...columns: ColumnsOrExpr[]): DataFrame
}

export function DynamicGroupBy(
df: any,
indexColumn: string,
every: string,
period?: string,
offset?: string,
truncate?: boolean,
includeBoundaries?: boolean,
closed?: string,
by?: ColumnsOrExpr
): DynamicGroupBy {

return {
agg(column: ColumnsOrExpr, ...columns: ColumnsOrExpr[]) {


return df
.lazy()
.groupByDynamic({indexColumn, every, period, offset, truncate, includeBoundaries, closed, by} as any)
.agg(column as any, ...columns)
.collectSync({noOptimizations: true});
}
};
}
61 changes: 52 additions & 9 deletions nodejs-polars/polars/lazy/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
ValueOrArray
} from "../utils";
import {LazyGroupBy} from "./groupby";
import {Deserialize, Serialize} from "../shared_traits";
import {Deserialize, GroupByOps, Serialize} from "../shared_traits";


type LazyJoinOptions = {
Expand All @@ -34,7 +34,7 @@ type LazyOptions = {
/**
* Representation of a Lazy computation graph / query.
*/
export interface LazyDataFrame extends Serialize {
export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
/** @ignore */
_ldf: any;
get columns(): string[]
Expand Down Expand Up @@ -159,6 +159,7 @@ export interface LazyDataFrame extends Serialize {
*/
groupBy(by: ColumnsOrExpr, maintainOrder?: boolean): LazyGroupBy
groupBy(by: ColumnsOrExpr, opts: {maintainOrder: boolean}): LazyGroupBy

/**
* Gets the first `n` rows of the DataFrame. You probably don't want to use this!
*
Expand Down Expand Up @@ -287,6 +288,25 @@ export interface LazyDataFrame extends Serialize {
withRowCount()
}

const prepareGroupbyInputs = (by) => {
if(Array.isArray(by)) {
const newBy: any = [];
by.forEach(e => {
if(typeof e === "string") {
e = pli.col(e);
}
newBy.push(e);
});

return newBy;
} else if (typeof by === "string") {
return [pli.col(by)];
} else if (Expr.isExpr(by)) {
return [by._expr];
} else {
return [];
}
};

export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
const unwrap = (method: string, ...args: any[]) => {
Expand All @@ -295,13 +315,6 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
const wrap = (method, ...args): LazyDataFrame => {
return _LazyDataFrame(unwrap(method, ...args));
};
const wrapNullArgs = (method: string) => () => wrap(method);
const withOptimizationToggle = (method) => (lazyOptions?: LazyOptions) => {
const ldf = unwrap("optimizationToggle", lazyOptions);

return unwrap(method, {}, ldf);

};

return {
_ldf,
Expand Down Expand Up @@ -429,6 +442,36 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {

return LazyGroupBy(_ldf.groupby(by, maintainOrder));
},
groupByRolling({indexColumn, by, period, offset, closed}) {
offset = offset ?? `-${period}`;
closed = closed ?? "right";
by = prepareGroupbyInputs(by);
const lgb = _ldf.groupbyRolling(indexColumn, period, offset, closed, by);

return LazyGroupBy(lgb);
},
groupByDynamic({indexColumn, every, period, offset, truncate, includeBoundaries, closed, by}) {
period = period ?? every;
offset = offset ?? `-${period}`;
closed = closed ?? "right";
by = prepareGroupbyInputs(by);
truncate = truncate ?? true;
includeBoundaries = includeBoundaries ?? false;

const lgb = _ldf.groupbyDynamic(
indexColumn,
every,
period,
offset,
truncate,
includeBoundaries,
closed,
by
);

return LazyGroupBy(lgb);

},
head(len=5) {
return _LazyDataFrame(_ldf.slice(0, len));
},
Expand Down
6 changes: 4 additions & 2 deletions nodejs-polars/polars/lazy/groupby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ export interface LazyGroupBy {


export const LazyGroupBy = (_lgb: any): LazyGroupBy => {

return {
agg(...aggs: Expr[]) {
const agg = selectionToExprList(aggs.flat(), false);
aggs = selectionToExprList(aggs, false);
const ret = _lgb.agg(aggs.flat());

return _LazyDataFrame(_lgb.agg(agg));
return _LazyDataFrame(ret);
},
head(n=5) {
return _LazyDataFrame(_lgb.head(n));
Expand Down

0 comments on commit 4154c6a

Please sign in to comment.