Skip to content

Commit b2deb5e

Browse files
authored
[core][feat] Allow aggregation function for time series data (#2031)

* [core][feat] Allow aggregation function for time series data
* fix default value
1 parent 664992e commit b2deb5e

File tree

5 files changed

+26
-9
lines changed

5 files changed

+26
-9
lines changed

fixcore/fixcore/cli/command.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6242,6 +6242,7 @@ def args_info(self) -> ArgsInfo:
62426242
ArgInfo("--group", expects_value=True, can_occur_multiple_times=True, help_text="<group>"),
62436243
ArgInfo("--filter", expects_value=True, can_occur_multiple_times=True, help_text="<var><op><value>"),
62446244
ArgInfo("--granularity", expects_value=True, help_text="<duration|integer>"),
6245+
ArgInfo("--aggregation", expects_value=True, possible_values=["avg", "sum", "min", "max"]),
62456246
],
62466247
"downsample": [],
62476248
}
@@ -6273,10 +6274,18 @@ def parse_duration_or_int(s: str) -> Union[int, timedelta]:
62736274
parser.add_argument("--group", type=str, nargs="*", default=None)
62746275
parser.add_argument("--filter", type=predicate_term.parse, nargs="*", default=None)
62756276
parser.add_argument("--granularity", type=parse_duration_or_int, default=5)
6277+
parser.add_argument("--aggregation", choices=["avg", "sum", "min", "max"], default="avg")
62766278
p = parser.parse_args(args_parts_unquoted_parser.parse(part))
62776279
timeout = if_set(ctx.env.get("search_timeout"), duration)
62786280
cursor = await self.dependencies.db_access.time_series_db.load_time_series(
6279-
p.name, p.start, p.end, group_by=p.group, filter_by=p.filter, granularity=p.granularity, timeout=timeout
6281+
p.name,
6282+
p.start,
6283+
p.end,
6284+
group_by=p.group,
6285+
filter_by=p.filter,
6286+
granularity=p.granularity,
6287+
timeout=timeout,
6288+
aggregation=p.aggregation,
62806289
)
62816290
return CLISourceContext(cursor.count(), cursor.full_count()), cursor
62826291

fixcore/fixcore/db/arango_query.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,7 @@ def load_time_series(
875875
start: datetime,
876876
end: datetime,
877877
granularity: timedelta,
878+
group_aggregation: Literal["avg", "sum", "min", "max"] = "avg",
878879
group_by: Optional[Collection[str]] = None,
879880
group_filter: Optional[List[Predicate]] = None,
880881
) -> Tuple[str, Json]:
@@ -911,7 +912,7 @@ def load_time_series(
911912
group = f"group: {{ {', '.join(parts)} }},"
912913

913914
query += f" COLLECT {', '.join(collect)} INTO group"
914-
query += f" SORT group_slot RETURN {{at: group_slot, {group} v: AVG(group[*].d.v)}}"
915+
query += f" SORT group_slot RETURN {{at: group_slot, {group} v: {group_aggregation}(group[*].d.v)}}"
915916
return query, ctx.bind_vars
916917

917918

fixcore/fixcore/db/timeseriesdb.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import timedelta, datetime, timezone
77
from functools import partial
88
from numbers import Number
9-
from typing import Optional, List, Set, Union, cast, Callable, Dict, AsyncIterator
9+
from typing import Optional, List, Set, Union, cast, Callable, Dict, AsyncIterator, Literal
1010

1111
from attr import evolve, define
1212
from fixcore.core_config import CoreConfig
@@ -115,6 +115,7 @@ async def load_time_series(
115115
trafo: Optional[Callable[[Json], Json]] = None,
116116
batch_size: Optional[int] = None,
117117
timeout: Optional[timedelta] = None,
118+
aggregation: Literal["avg", "sum", "min", "max"] = "avg",
118119
) -> AsyncCursor:
119120
"""
120121
Load time series data.
@@ -129,6 +130,7 @@ async def load_time_series(
129130
:param trafo: Optional transformation function to apply to each result.
130131
:param batch_size: Optional batch size for the query.
131132
:param timeout: Timeout for the query to run.
133+
:param aggregation: The aggregation function to use.
132134
:return: A cursor to iterate over the time series data.
133135
"""
134136
assert start < end, "start must be before end"
@@ -145,7 +147,9 @@ async def load_time_series(
145147

146148
grl = max(grl, timedelta(hours=1))
147149

148-
qs, bv = arango_query.load_time_series(self.collection_name, name, start, end, grl, group_by, filter_by)
150+
qs, bv = arango_query.load_time_series(
151+
self.collection_name, name, start, end, grl, aggregation, group_by, filter_by
152+
)
149153

150154
def result_trafo(js: Json) -> Json:
151155
js["at"] = utc_str(datetime.fromtimestamp(js["at"], timezone.utc))
@@ -192,6 +196,7 @@ def ts_format(ts: str, js: Json) -> Json:
192196
trafo=partial(ts_format, ts.name),
193197
batch_size=100_000, # The values are tiny. Use a large batch size.
194198
timeout=timedelta(seconds=60),
199+
aggregation="avg",
195200
)
196201
]:
197202
log.info(f"Compact {ts.name} bucket {bucket} to {len(ts_data)} entries (last={ts_bucket_last})")

fixcore/fixcore/web/api.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,12 +772,14 @@ def parse_duration_or_int(s: str) -> Union[int, timedelta]:
772772
start = if_set(body.get("start"), parse_utc, utc() - timedelta(days=7))
773773
end = if_set(body.get("end"), parse_utc, utc())
774774
group_by: Optional[Set[str]] = if_set(body.get("group"), set)
775+
aggregation: Literal["avg", "sum", "min", "max"] = body.get("aggregation", "avg")
776+
assert aggregation in ["avg", "sum", "min", "max"], f"Invalid aggregation {aggregation}"
775777
filter_by: Optional[List[Predicate]] = if_set(
776778
body.get("filter"), lambda x: [predicate_term.parse(y) for y in x] # type: ignore
777779
)
778780
granularity: Optional[Union[int, timedelta]] = if_set(body.get("granularity"), parse_duration_or_int)
779781
cursor = await deps.db_access.time_series_db.load_time_series(
780-
name, start, end, group_by=group_by, filter_by=filter_by, granularity=granularity
782+
name, start, end, group_by=group_by, filter_by=filter_by, granularity=granularity, aggregation=aggregation
781783
)
782784
return await self.stream_response_from_gen(
783785
request, cursor, count=cursor.count(), total_count=cursor.full_count()

fixcore/tests/fixcore/db/arango_query_test.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ def test_load_time_series() -> None:
324324
"LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
325325
"COLLECT group_slot=m0 INTO group "
326326
"SORT group_slot "
327-
"RETURN {at: group_slot, v: AVG(group[*].d.v)}"
327+
"RETURN {at: group_slot, v: avg(group[*].d.v)}"
328328
)
329329
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
330330
# no group by defined --> group by all values
@@ -334,7 +334,7 @@ def test_load_time_series() -> None:
334334
"LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
335335
"COLLECT group_slot=m0, complete_group=d.group INTO group "
336336
"SORT group_slot "
337-
"RETURN {at: group_slot, group: complete_group, v: AVG(group[*].d.v)}"
337+
"RETURN {at: group_slot, group: complete_group, v: avg(group[*].d.v)}"
338338
)
339339
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
340340
# group by specific group variables
@@ -344,7 +344,7 @@ def test_load_time_series() -> None:
344344
"LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
345345
"COLLECT group_slot=m0, group_a=d.group.a, group_b=d.group.b INTO group "
346346
"SORT group_slot "
347-
"RETURN {at: group_slot, group: { a: group_a, b: group_b }, v: AVG(group[*].d.v)}"
347+
"RETURN {at: group_slot, group: { a: group_a, b: group_b }, v: avg(group[*].d.v)}"
348348
)
349349
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
350350
# group by specific group variables and filter by group variables
@@ -357,6 +357,6 @@ def test_load_time_series() -> None:
357357
"LET m0 = (FLOOR(d.at / @b4) * @b4) + @b5 "
358358
"COLLECT group_slot=m0, group_a=d.group.a, group_b=d.group.b INTO group "
359359
"SORT group_slot "
360-
"RETURN {at: group_slot, group: { a: group_a, b: group_b }, v: AVG(group[*].d.v)}"
360+
"RETURN {at: group_slot, group: { a: group_a, b: group_b }, v: avg(group[*].d.v)}"
361361
)
362362
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": "a", "b4": 3600, "b5": 800}

0 commit comments

Comments (0)