Skip to content

Commit e9ed544

Browse files
authored
[core][fix] Timeseries: apply aggregation on slotted values (#2033)
1 parent b2deb5e commit e9ed544

File tree

2 files changed

+39
-19
lines changed

2 files changed

+39
-19
lines changed

fixcore/fixcore/db/arango_query.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -895,9 +895,19 @@ def load_time_series(
895895
slotter = int(granularity.total_seconds())
896896
gran = ctx.add_bind_var(slotter)
897897
offset = start.timestamp() - ((start.timestamp() // slotter) * slotter)
898+
# slot the time by averaging each single group
898899
query += f" LET {time_slot} = (FLOOR(d.at / @{gran}) * @{gran}) + @{ctx.add_bind_var(offset)}"
900+
query += f" COLLECT group_slot={time_slot}, complete_group=d.group"
901+
query += " AGGREGATE slot_avg = AVG(d.v)"
902+
query += " RETURN {at: group_slot, group: complete_group, v: slot_avg}"
903+
904+
# short circuit: no additional grouping and aggregation is avg
905+
if group_by is None and group_aggregation == "avg":
906+
return query, ctx.bind_vars # already the correct query
907+
899908
# create the groups to collect
900-
collect = [f"group_slot={time_slot}"]
909+
slotted = ctx.next_crs()
910+
collect = ["group_slot=d.at"]
901911
group = ""
902912
if group_by is None:
903913
collect.append("complete_group=d.group")
@@ -911,8 +921,10 @@ def load_time_series(
911921
parts.append(f"{g}: group_{g}")
912922
group = f"group: {{ {', '.join(parts)} }},"
913923

914-
query += f" COLLECT {', '.join(collect)} INTO group"
915-
query += f" SORT group_slot RETURN {{at: group_slot, {group} v: {group_aggregation}(group[*].d.v)}}"
924+
query = f"LET {slotted} = ( {query} )\n"
925+
query += f" FOR d in {slotted} COLLECT {', '.join(collect)}"
926+
query += f" AGGREGATE agg_val={group_aggregation}(d.v)"
927+
query += f" SORT group_slot RETURN {{at: group_slot,{group} v: agg_val}}"
916928
return query, ctx.bind_vars
917929

918930

fixcore/tests/fixcore/db/arango_query_test.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -320,43 +320,51 @@ def test_load_time_series() -> None:
320320
# group_by=[] --> no group by any value
321321
q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, group_by=[])
322322
assert (
323-
q == "FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
323+
q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
324324
"LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
325-
"COLLECT group_slot=m0 INTO group "
326-
"SORT group_slot "
327-
"RETURN {at: group_slot, v: avg(group[*].d.v)}"
325+
"COLLECT group_slot=m0, complete_group=d.group "
326+
"AGGREGATE slot_avg = AVG(d.v) "
327+
"RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
328+
"FOR d in m1 COLLECT group_slot=d.at AGGREGATE agg_val=avg(d.v) "
329+
"SORT group_slot RETURN {at: group_slot, v: agg_val}"
328330
)
329331
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
330332
# no group by defined --> group by all values
331333
q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour)
332334
assert (
333335
q == "FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
334336
"LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
335-
"COLLECT group_slot=m0, complete_group=d.group INTO group "
336-
"SORT group_slot "
337-
"RETURN {at: group_slot, group: complete_group, v: avg(group[*].d.v)}"
337+
"COLLECT group_slot=m0, complete_group=d.group "
338+
"AGGREGATE slot_avg = AVG(d.v) "
339+
"RETURN {at: group_slot, group: complete_group, v: slot_avg}"
338340
)
339341
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
340342
# group by specific group variables
341343
q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, group_by=["a", "b"])
342344
assert (
343-
q == "FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
345+
q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
344346
"LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
345-
"COLLECT group_slot=m0, group_a=d.group.a, group_b=d.group.b INTO group "
346-
"SORT group_slot "
347-
"RETURN {at: group_slot, group: { a: group_a, b: group_b }, v: avg(group[*].d.v)}"
347+
"COLLECT group_slot=m0, complete_group=d.group "
348+
"AGGREGATE slot_avg = AVG(d.v) "
349+
"RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
350+
"FOR d in m1 "
351+
"COLLECT group_slot=d.at, group_a=d.group.a, group_b=d.group.b "
352+
"AGGREGATE agg_val=avg(d.v) "
353+
"SORT group_slot RETURN {at: group_slot,group: { a: group_a, b: group_b }, v: agg_val}"
348354
)
349355
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
350356
# group by specific group variables and filter by group variables
351357
q, bv = load_time_series(
352358
"ts", "foo", now - (24 * one_hour), now, one_hour, group_by=["a", "b"], group_filter=[P("a").eq("a")]
353359
)
354360
assert (
355-
q == "FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
356-
"FILTER d.group.a==@b3 "
361+
q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 FILTER d.group.a==@b3 "
357362
"LET m0 = (FLOOR(d.at / @b4) * @b4) + @b5 "
358-
"COLLECT group_slot=m0, group_a=d.group.a, group_b=d.group.b INTO group "
359-
"SORT group_slot "
360-
"RETURN {at: group_slot, group: { a: group_a, b: group_b }, v: avg(group[*].d.v)}"
363+
"COLLECT group_slot=m0, complete_group=d.group "
364+
"AGGREGATE slot_avg = AVG(d.v) RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
365+
"FOR d in m1 "
366+
"COLLECT group_slot=d.at, group_a=d.group.a, group_b=d.group.b "
367+
"AGGREGATE agg_val=avg(d.v) "
368+
"SORT group_slot RETURN {at: group_slot,group: { a: group_a, b: group_b }, v: agg_val}"
361369
)
362370
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": "a", "b4": 3600, "b5": 800}

0 commit comments

Comments (0)