Skip to content

Commit ab1561b

Browse files
authored
[core][feat] TimeSeries: allow defining a factor for computing the average (#2126)
1 parent 32d82a5 commit ab1561b

File tree

5 files changed

+48
-12
lines changed

5 files changed

+48
-12
lines changed

fixcore/fixcore/cli/command.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6186,9 +6186,9 @@ def key_fn(node: Json) -> Union[str, Tuple[str, str]]:
61866186
class TimeSeriesCommand(CLICommand):
61876187
"""
61886188
```
6189-
timeseries snapshot --name <time series> <aggregate search>
6189+
timeseries snapshot --name <time series> [--avg-factor <factor>] <aggregate search>
61906190
timeseries get --name <time series> --start <time> --end <time> --granularity <duration|integer>
6191-
--group <group> --filter <var><op><value>
6191+
--group <group> --filter <var><op><value> --avg-factor <factor>
61926192
timeseries list
61936193
timeseries downsample
61946194
```
@@ -6219,7 +6219,8 @@ class TimeSeriesCommand(CLICommand):
62196219
- `--group` - The variable(s) to select in the group. Can be repeated multiple times.
62206220
- `--filter` - The variable(s) to filter. Can be repeated multiple times.
62216221
- `--granularity` - The granularity of the time series. Can be a duration or an integer.
6222-
6222+
- `--avg-factor` - Factor used to divide values to compute the average.
6223+
This is required for very big numbers to avoid overflow.
62236224
62246225
## Examples
62256226
@@ -6250,6 +6251,7 @@ def args_info(self) -> ArgsInfo:
62506251
"list": [],
62516252
"snapshot": [
62526253
ArgInfo("--name", help_text="<time series>."),
6254+
ArgInfo("--avg-factor", help_text="<avg factor>."),
62536255
ArgInfo(expects_value=True, value_hint="search"),
62546256
],
62556257
"get": [
@@ -6260,6 +6262,7 @@ def args_info(self) -> ArgsInfo:
62606262
ArgInfo("--filter", expects_value=True, can_occur_multiple_times=True, help_text="<var><op><value>"),
62616263
ArgInfo("--granularity", expects_value=True, help_text="<duration|integer>"),
62626264
ArgInfo("--aggregation", expects_value=True, possible_values=["avg", "sum", "min", "max"]),
6265+
ArgInfo("--avg-factor", help_text="<avg factor>."),
62636266
],
62646267
"downsample": [],
62656268
}
@@ -6268,13 +6271,16 @@ def parse(self, arg: Optional[str] = None, ctx: CLIContext = EmptyContext, **kwa
62686271
async def snapshot_time_series(part: str) -> AsyncIterator[str]:
62696272
parser = NoExitArgumentParser()
62706273
parser.add_argument("--name", type=str, required=True)
6274+
parser.add_argument("--avg-factor", type=int)
62716275
parsed, rest = parser.parse_known_args(args_parts_parser.parse(part))
62726276
graph_name = ctx.graph_name
62736277
graph_db = self.dependencies.db_access.get_graph_db(graph_name)
62746278
model = await self.dependencies.model_handler.load_model(graph_name)
62756279
query = await self.dependencies.template_expander.parse_query(" ".join(rest), ctx.section, env=ctx.env)
62766280
query_model = QueryModel(query, model, ctx.env)
6277-
res = await self.dependencies.db_access.time_series_db.add_entries(parsed.name, query_model, graph_db)
6281+
res = await self.dependencies.db_access.time_series_db.add_entries(
6282+
parsed.name, query_model, graph_db, avg_factor=parsed.avg_factor
6283+
)
62786284
yield f"{res} entries added to time series {parsed.name}."
62796285

62806286
async def load_time_series(part: str) -> Tuple[CLISourceContext, AsyncIterator[Json]]:
@@ -6292,6 +6298,7 @@ def parse_duration_or_int(s: str) -> Union[int, timedelta]:
62926298
parser.add_argument("--filter", type=predicate_term.parse, nargs="*", default=None)
62936299
parser.add_argument("--granularity", type=parse_duration_or_int, default=5)
62946300
parser.add_argument("--aggregation", choices=["avg", "sum", "min", "max"], default="avg")
6301+
parser.add_argument("--avg-factor", type=int)
62956302
p = parser.parse_args(args_parts_unquoted_parser.parse(part))
62966303
timeout = if_set(ctx.env.get("search_timeout"), duration)
62976304
cursor = await self.dependencies.db_access.time_series_db.load_time_series(
@@ -6303,6 +6310,7 @@ def parse_duration_or_int(s: str) -> Union[int, timedelta]:
63036310
granularity=p.granularity,
63046311
timeout=timeout,
63056312
aggregation=p.aggregation,
6313+
avg_factor=p.avg_factor,
63066314
)
63076315
return CLISourceContext(cursor.count(), cursor.full_count(), cursor.stats()), cursor
63086316

fixcore/fixcore/db/arango_query.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ def load_time_series(
871871
group_aggregation: Literal["avg", "sum", "min", "max"] = "avg",
872872
group_by: Optional[Collection[str]] = None,
873873
group_filter: Optional[List[Predicate]] = None,
874+
avg_factor: Optional[int] = None,
874875
) -> Tuple[str, Json]:
875876
ctx = ArangoQueryContext()
876877
bv_name = ctx.add_bind_var(time_series)
@@ -887,12 +888,18 @@ def load_time_series(
887888
time_slot = ctx.next_crs()
888889
slotter = int(granularity.total_seconds())
889890
gran = ctx.add_bind_var(slotter)
890-
offset = start.timestamp() - ((start.timestamp() // slotter) * slotter)
891+
offset = int(start.timestamp() - ((start.timestamp() // slotter) * slotter))
891892
# slot the time by averaging each single group
892893
query += f" LET {time_slot} = (FLOOR(d.at / @{gran}) * @{gran}) + @{ctx.add_bind_var(offset)}"
893894
query += f" COLLECT group_slot={time_slot}, complete_group=d.group"
894-
query += " AGGREGATE slot_avg = AVG(d.v)"
895-
query += " RETURN {at: group_slot, group: complete_group, v: slot_avg}"
895+
if avg_factor: # Required as long as https://github.com/arangodb/arangodb/issues/21096 is not fixed
896+
assert avg_factor > 0, "Given average factor must be greater than 0!"
897+
bvf = ctx.add_bind_var(avg_factor)
898+
query += f" AGGREGATE slot_avg = AVG(d.v / @{bvf})"
899+
query += f" RETURN {{at: group_slot, group: complete_group, v: slot_avg * @{bvf}}}"
900+
else:
901+
query += " AGGREGATE slot_avg = AVG(d.v)"
902+
query += " RETURN {at: group_slot, group: complete_group, v: slot_avg}"
896903

897904
# short circuit: no additional grouping and aggregation is avg
898905
if group_by is None and group_aggregation == "avg":

fixcore/fixcore/db/timeseriesdb.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,15 @@ async def list_time_series(self) -> List[TimeSeriesMeta]:
7777
for e in all_ts
7878
]
7979

80-
async def add_entries(self, name: str, query_model: QueryModel, graph_db: GraphDB, at: Optional[int] = None) -> int:
80+
async def add_entries(
81+
self,
82+
name: str,
83+
query_model: QueryModel,
84+
graph_db: GraphDB,
85+
*,
86+
avg_factor: Optional[int] = None,
87+
at: Optional[int] = None,
88+
) -> int:
8189
query = query_model.query
8290
model = query_model.model
8391
assert query.aggregate is not None, "Only aggregate queries are supported for time series."
@@ -97,9 +105,9 @@ async def add_entries(self, name: str, query_model: QueryModel, graph_db: GraphD
97105
# update meta information
98106
await self.__execute_aql(
99107
"UPSERT { _key: @key } "
100-
"INSERT { _key: @key, created_at: DATE_NOW(), last_updated: DATE_NOW(), count: @count } "
101-
f"UPDATE {{ last_updated: DATE_NOW(), count: @count }} IN `{self.names_db}`",
102-
dict(key=name, count=result),
108+
"INSERT { _key: @key, created_at: DATE_NOW(), last_updated: DATE_NOW(), count: @count, factor: @factor } " # noqa
109+
f"UPDATE {{ last_updated: DATE_NOW(), count: @count, factor: @factor }} IN `{self.names_db}`",
110+
dict(key=name, count=result, factor=avg_factor),
103111
)
104112
return result
105113

@@ -116,6 +124,7 @@ async def load_time_series(
116124
batch_size: Optional[int] = None,
117125
timeout: Optional[timedelta] = None,
118126
aggregation: Literal["avg", "sum", "min", "max"] = "avg",
127+
avg_factor: Optional[int] = None,
119128
) -> AsyncCursor:
120129
"""
121130
Load time series data.
@@ -131,6 +140,8 @@ async def load_time_series(
131140
:param batch_size: Optional batch size for the query.
132141
:param timeout: Timeout for the query to run.
133142
:param aggregation: The aggregation function to use.
143+
:param avg_factor: Apply this avg_factor to all values to compute the average in a slot.
144+
Required for big numbers to avoid overflows.
134145
:return: A cursor to iterate over the time series data.
135146
"""
136147
assert start < end, "start must be before end"
@@ -145,10 +156,14 @@ async def load_time_series(
145156
else:
146157
grl = duration / 20 # default to 20 datapoints if nothing is specified
147158

159+
# no avg_factor given, try to get it from the database
160+
if avg_factor is None and (nd := await self.db.get(self.names_db, name)) and (fc := nd.get("factor")):
161+
avg_factor = int(fc)
162+
148163
grl = max(grl, timedelta(hours=1))
149164

150165
qs, bv = arango_query.load_time_series(
151-
self.collection_name, name, start, end, grl, aggregation, group_by, filter_by
166+
self.collection_name, name, start, end, grl, aggregation, group_by, filter_by, avg_factor=avg_factor
152167
)
153168

154169
def result_trafo(js: Json) -> Json:

fixcore/tests/fixcore/db/arango_query_test.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,3 +367,7 @@ def test_load_time_series() -> None:
367367
"SORT group_slot RETURN {at: group_slot,group: { a: group_a, b: group_b }, v: agg_val}"
368368
)
369369
assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": "a", "b4": 3600, "b5": 800}
370+
# use avg-factor
371+
q, _ = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, avg_factor=1000)
372+
assert "slot_avg = AVG(d.v / @b" in q # factor divides average
373+
assert "v: slot_avg * @b" in q # factor multiplies result

fixcore/tests/fixcore/db/timeseriesdb_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ async def load_ts(**kwargs: Any) -> List[Json]:
4949
## check filter_by is working
5050
# some_int is the same for all entries: one every hour: 5 entries
5151
assert len(await load_ts(name="test", start=begin, end=after5h, filter_by=[(P("id").eq("1"))])) == 5
52+
# avg factor does not change the result size
53+
assert len(await load_ts(name="test", start=begin, end=after5h, filter_by=[(P("id").eq("1"))], avg_factor=10)) == 5
5254

5355

5456
async def test_compact_time_series(timeseries_db: TimeSeriesDB, foo_model: Model, filled_graph_db: GraphDB) -> None:

0 commit comments

Comments (0)