Commit 7f41f80 — [core][fix] move slotted data to previous slot, not next (#2226)
1 parent: e0f8435
File tree: 3 files changed, +16 additions, −16 deletions

fixcore/fixcore/db/arango_query.py (2 additions & 2 deletions)
```diff
@@ -1157,9 +1157,9 @@ def load_time_series(
     time_slot = ctx.next_crs()
     slotter = int(granularity.total_seconds())
     gran = ctx.add_bind_var(slotter)
-    offset = int(start.timestamp() - ((start.timestamp() // slotter) * slotter))
+    offset = ctx.add_bind_var(slotter - int(start.timestamp() - ((start.timestamp() // slotter) * slotter)))
     # slot the time by averaging each single group
-    query += f" LET {time_slot} = (FLOOR(d.at / @{gran}) * @{gran}) + @{ctx.add_bind_var(offset)}"
+    query += f" LET {time_slot} = (FLOOR((d.at + @{offset}) / @{gran}) * @{gran}) - @{offset}"
     query += f" COLLECT group_slot={time_slot}, complete_group=d.group"
     if avg_factor:  # Required as long as https://github.com/arangodb/arangodb/issues/21096 is not fixed
         assert avg_factor > 0, "Given average factor must be greater than 0!"
```

fixcore/tests/fixcore/db/arango_query_test.py (8 additions & 8 deletions)
```diff
@@ -331,29 +331,29 @@ def test_load_time_series() -> None:
     q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, group_by=[])
     assert (
         q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
-        "LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
+        "LET m0 = (FLOOR((d.at + @b4) / @b3) * @b3) - @b4 "
         "COLLECT group_slot=m0, complete_group=d.group "
         "AGGREGATE slot_avg = AVG(d.v) "
         "RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
         "FOR d in m1 COLLECT group_slot=d.at AGGREGATE agg_val=avg(d.v) "
         "SORT group_slot RETURN {at: group_slot, v: agg_val}"
     )
-    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
+    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 2800}
     # no group by defined --> group by all values
     q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour)
     assert (
         q == "FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
-        "LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
+        "LET m0 = (FLOOR((d.at + @b4) / @b3) * @b3) - @b4 "
         "COLLECT group_slot=m0, complete_group=d.group "
         "AGGREGATE slot_avg = AVG(d.v) "
         "RETURN {at: group_slot, group: complete_group, v: slot_avg}"
     )
-    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
+    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 2800}
     # group by specific group variables
     q, bv = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, group_by=["a", "b"])
     assert (
         q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 "
-        "LET m0 = (FLOOR(d.at / @b3) * @b3) + @b4 "
+        "LET m0 = (FLOOR((d.at + @b4) / @b3) * @b3) - @b4 "
         "COLLECT group_slot=m0, complete_group=d.group "
         "AGGREGATE slot_avg = AVG(d.v) "
         "RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
```
```diff
@@ -362,22 +362,22 @@ def test_load_time_series() -> None:
         "AGGREGATE agg_val=avg(d.v) "
         "SORT group_slot RETURN {at: group_slot,group: { a: group_a, b: group_b }, v: agg_val}"
     )
-    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 800}
+    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": 3600, "b4": 2800}
     # group by specific group variables and filter by group variables
     q, bv = load_time_series(
         "ts", "foo", now - (24 * one_hour), now, one_hour, group_by=["a", "b"], group_filter=[P("a").eq("a")]
     )
     assert (
         q == "LET m1 = ( FOR d in `ts` FILTER d.ts==@b0 AND d.at>=@b1 AND d.at<@b2 FILTER d.group.a == @b3 "
-        "LET m0 = (FLOOR(d.at / @b4) * @b4) + @b5 "
+        "LET m0 = (FLOOR((d.at + @b5) / @b4) * @b4) - @b5 "
         "COLLECT group_slot=m0, complete_group=d.group "
         "AGGREGATE slot_avg = AVG(d.v) RETURN {at: group_slot, group: complete_group, v: slot_avg} )\n "
         "FOR d in m1 "
         "COLLECT group_slot=d.at, group_a=d.group.a, group_b=d.group.b "
         "AGGREGATE agg_val=avg(d.v) "
         "SORT group_slot RETURN {at: group_slot,group: { a: group_a, b: group_b }, v: agg_val}"
     )
-    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": "a", "b4": 3600, "b5": 800}
+    assert bv == {"b0": "foo", "b1": 1699913600, "b2": 1700000000, "b3": "a", "b4": 3600, "b5": 2800}
     # use avg-factor
     q, _ = load_time_series("ts", "foo", now - (24 * one_hour), now, one_hour, avg_factor=1000)
     assert "slot_avg = AVG(d.v / @b" in q  # factor divides average
```

fixcore/tests/fixcore/db/timeseriesdb_test.py (6 additions & 6 deletions)
```diff
@@ -91,13 +91,13 @@ async def create_ts(before_now: timedelta, granularity: timedelta, number_of_ent
                 "bucket": "Bucket(start=5mo25d, end=2yr, resolution=3d)",
                 "start": "2021-12-01T00:00:00Z",
                 "end": "2023-06-04T00:00:00Z",
-                "data_points": 90,  # 24 days (1d->3d) --> 0,3,6,9,12,15,18,21,24 ==> 9 with 10 entries each ==> 90
+                "data_points": 80,  # 24 days (1d->3d) --> 0,3,6,9,12,15,18,21 ==> 8 with 10 entries each ==> 80
             },
         ]
     }
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 450
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 440
     assert await timeseries_db.downsample(now) == "No changes since last downsample run"
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 450
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 440
     assert await timeseries_db.downsample(now=now + timedelta(days=27)) == {
         "test": [
             {
@@ -114,7 +114,7 @@ async def create_ts(before_now: timedelta, granularity: timedelta, number_of_ent
             },
         ]
     }
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 230
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 220
     assert await timeseries_db.downsample(now=now + timedelta(days=200)) == {
         "test": [
             {
@@ -125,9 +125,9 @@ async def create_ts(before_now: timedelta, granularity: timedelta, number_of_ent
             }
         ]
     }
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 140
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 130
     assert await timeseries_db.downsample(now=now + timedelta(days=400)) == {}
-    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 140
+    assert timeseries_db.db.collection(timeseries_db.collection_name).count() == 130


 async def test_acquire_lock(timeseries_db: TimeSeriesDB) -> None:
```

Comments (0)