Skip to content

Commit 4f24d5b

Browse files
authored
[core][feat] Add history timeline (#2152)
1 parent 0c4ee17 commit 4f24d5b

File tree

6 files changed

+186
-11
lines changed

6 files changed

+186
-11
lines changed

fixcore/fixcore/db/arango_query.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,23 @@ def history_query(db: Any, query_model: QueryModel) -> Tuple[str, Json]:
137137
return f"""{query_str} FOR result in {cursor}{last_limit} RETURN UNSET(result, {unset_props})""", ctx.bind_vars
138138

139139

140+
def history_query_histogram(db: Any, query_model: QueryModel, granularity: timedelta) -> Tuple[str, Json]:
    """
    Build the AQL query that counts history events per change type and time slot.

    The entries of the `<db.name>_node_history` collection that match the query are
    grouped by their `change` value and by the start of the time slot their `changed_at`
    falls into. Every group is returned as `{"at": ..., "group": {"change": ...}, "v": ...}`
    where `v` is the number of entries in the group. Results are sorted ascending by `at`.

    :param db: the graph database. `db.name` is used to derive the history collection name
               and the object is handed to `query_string`.
    :param query_model: the query and model used to filter the history entries.
    :param granularity: the width of one time slot. Must be a positive duration.
    :return: the AQL query string together with the bind variables it refers to.
    :raises ValueError: if granularity is zero or negative.
    """
    # The slot width is the divisor in the generated AQL: a zero width would divide by zero
    # and a negative width is meaningless for a timeline. Fail early with a clear message.
    if granularity <= timedelta(0):
        raise ValueError("The granularity of a history timeline must be a positive duration!")

    ctx = ArangoQueryContext()
    query = rewrite_query(query_model)
    in_cursor, query_str = query_string(
        db, query, query_model, f"`{db.name}_node_history`", False, ctx, id_column="id", use_fulltext_index=False
    )
    crs = ctx.next_crs()
    # Slot width in milliseconds, registered as bind variable and referenced as @<slot> below.
    # NOTE(review): assumes DATE_TIMESTAMP yields milliseconds - see the AQL date function reference.
    slot = ctx.add_bind_var(granularity.total_seconds() * 1000)
    query_str += (
        f" FOR {crs} IN {in_cursor} "
        # FLOOR(ts / width) * width snaps every timestamp to the beginning of its slot
        f"COLLECT change={crs}.change, at=DATE_ISO8601(FLOOR(DATE_TIMESTAMP({crs}.changed_at) / @{slot}) * @{slot}) "
        f"WITH COUNT INTO v SORT at ASC "
        'RETURN {"at": at, "group": {"change": change}, "v": v}'
    )
    return query_str, ctx.bind_vars
155+
156+
140157
def query_view_string(
141158
db: Any,
142159
query: Query,

fixcore/fixcore/db/async_arangodb.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ async def aql_cursor(
201201
stream: Optional[bool] = None,
202202
skip_inaccessible_cols: Optional[bool] = None,
203203
max_runtime: Optional[Number] = None,
204+
fill_block_cache: Optional[bool] = None,
205+
allow_dirty_read: bool = False,
206+
allow_retry: bool = False,
207+
force_one_shard_attribute_value: Optional[str] = None,
204208
) -> AsyncCursorContext:
205209
cursor: Cursor = await run_async(
206210
self.db.aql.execute, # type: ignore
@@ -224,6 +228,10 @@ async def aql_cursor(
224228
stream,
225229
skip_inaccessible_cols,
226230
max_runtime,
231+
fill_block_cache,
232+
allow_dirty_read,
233+
allow_retry,
234+
force_one_shard_attribute_value,
227235
)
228236
return AsyncCursorContext(
229237
AsyncCursor(

fixcore/fixcore/db/graphdb.py

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,19 @@ async def search_history(
185185
) -> AsyncCursorContext:
186186
pass
187187

188+
@abstractmethod
async def history_timeline(
    self,
    query: QueryModel,
    before: datetime,
    after: datetime,
    granularity: Optional[timedelta] = None,
    changes: Optional[List[HistoryChange]] = None,
    timeout: Optional[timedelta] = None,
    **kwargs: Any,
) -> AsyncCursorContext:
    """
    Provide a timeline of the history events that match the given query.

    :param query: the query and model that select the history events.
    :param before: end of the time range.
    :param after: start of the time range.
    :param granularity: optional width of one slot of the timeline.
    :param changes: optional list of change kinds to restrict the timeline to.
    :param timeout: optional timeout of the operation.
    :param kwargs: additional implementation specific arguments.
    :return: a cursor context over the slots of the timeline.
    """
    pass
200+
188201
@abstractmethod
189202
async def list_possible_values(
190203
self,
@@ -731,21 +744,21 @@ async def search_list(
731744
ttl=cast(Number, int(timeout.total_seconds())) if timeout else None,
732745
)
733746

734-
async def search_history(
747+
def _history_query_model(
735748
self,
736749
query: QueryModel,
737750
changes: Optional[List[HistoryChange]] = None,
738751
before: Optional[datetime] = None,
739752
after: Optional[datetime] = None,
740-
with_count: bool = False,
741-
timeout: Optional[timedelta] = None,
742-
**kwargs: Any,
743-
) -> AsyncCursorContext:
753+
) -> QueryModel:
744754
more_than_one = len(query.query.parts) > 1
745755
has_invalid_terms = any(query.query.find_terms(lambda t: isinstance(t, (FulltextTerm, MergeTerm))))
746756
has_navigation = any(p.navigation for p in query.query.parts)
747757
if more_than_one or has_invalid_terms or has_navigation:
748758
raise AttributeError("Fulltext, merge terms and navigation is not supported in history queries!")
759+
if before and after and before < after:
760+
raise AttributeError("Before marks the end and should be greater than after!")
761+
749762
# adjust query
750763
term = query.query.current_part.term
751764
if changes:
@@ -754,16 +767,25 @@ async def search_history(
754767
term = term.and_term(P.single("changed_at").gt(utc_str(after)))
755768
if before:
756769
term = term.and_term(P.single("changed_at").lt(utc_str(before)))
757-
query = QueryModel(evolve(query.query, parts=[evolve(query.query.current_part, term=term)]), query.model)
770+
return QueryModel(evolve(query.query, parts=[evolve(query.query.current_part, term=term)]), query.model)
771+
772+
async def search_history(
773+
self,
774+
query: QueryModel,
775+
changes: Optional[List[HistoryChange]] = None,
776+
before: Optional[datetime] = None,
777+
after: Optional[datetime] = None,
778+
with_count: bool = False,
779+
timeout: Optional[timedelta] = None,
780+
**kwargs: Any,
781+
) -> AsyncCursorContext:
782+
query = self._history_query_model(query, changes, before, after)
758783
q_string, bind = arango_query.history_query(self, query)
759784
trafo = (
760785
None
761786
if query.query.aggregate
762787
else self.document_to_instance_fn(
763-
query.model,
764-
query,
765-
["change", "changed_at", "before", "diff"],
766-
id_column="id",
788+
query.model, query, ["change", "changed_at", "before", "diff"], id_column="id"
767789
)
768790
)
769791
ttl = cast(Number, int(timeout.total_seconds())) if timeout else None
@@ -777,6 +799,26 @@ async def search_history(
777799
ttl=ttl,
778800
)
779801

802+
async def history_timeline(
    self,
    query: QueryModel,
    before: datetime,
    after: datetime,
    granularity: Optional[timedelta] = None,
    changes: Optional[List[HistoryChange]] = None,
    timeout: Optional[timedelta] = None,
    **kwargs: Any,
) -> AsyncCursorContext:
    """
    Count the history events matching the query per change kind and time slot.

    Aggregate functions of the query are dropped. The slot width is the given granularity
    or 1/25 of the time range if none is given, and is never smaller than one hour.
    The query is restricted to the given changes and time range via `_history_query_model`
    and translated by `arango_query.history_query_histogram`.

    :param query: the query and model that select the history events.
    :param before: end of the time range.
    :param after: start of the time range.
    :param granularity: optional width of one time slot.
    :param changes: optional list of change kinds to restrict the timeline to.
    :param timeout: optional timeout, handed to the cursor as ttl in whole seconds.
    :param kwargs: not used by this implementation.
    :return: the cursor context over the resulting slots.
    """
    # Aggregate functions are ignored for timelines: strip them from the query.
    if query.query.aggregate is not None:
        query = evolve(query, query=evolve(query.query, aggregate=None))

    # No granularity provided: derive it from the time range (1/25 of the range).
    slot_width = granularity if granularity else abs(before - after) / 25
    # A slot is at least one hour wide.
    one_hour = timedelta(hours=1)
    if slot_width < one_hour:
        slot_width = one_hour

    history_model = self._history_query_model(query, changes, before, after)
    aql, bind_vars = arango_query.history_query_histogram(self, history_model, slot_width)

    ttl = None
    if timeout:
        ttl = cast(Number, int(timeout.total_seconds()))
    return await self.db.aql_cursor(query=aql, bind_vars=bind_vars, batch_size=10000, ttl=ttl)
821+
780822
async def search_graph_gen(
781823
self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None, **kwargs: Any
782824
) -> AsyncCursorContext:
@@ -1849,6 +1891,18 @@ async def search_history(
18491891
await self.event_sender.core_event(CoreEvent.HistoryQuery, context, **counters)
18501892
return await self.real.search_history(query, changes, before, after, with_count, timeout, **kwargs)
18511893

1894+
async def history_timeline(
    self,
    query: QueryModel,
    before: datetime,
    after: datetime,
    granularity: Optional[timedelta] = None,
    changes: Optional[List[HistoryChange]] = None,
    timeout: Optional[timedelta] = None,
    **kwargs: Any,
) -> AsyncCursorContext:
    """
    Forward the timeline request unchanged to the wrapped graph database (`self.real`).

    NOTE(review): no core event is emitted here - confirm this is intended.
    """
    delegate = self.real
    return await delegate.history_timeline(query, before, after, granularity, changes, timeout, **kwargs)
1905+
18521906
async def search_graph_gen(
18531907
self, query: QueryModel, with_count: bool = False, timeout: Optional[timedelta] = None, **kwargs: Any
18541908
) -> AsyncCursorContext:

fixcore/fixcore/static/api-doc.yaml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,67 @@ paths:
13101310
application/json:
13111311
schema:
13121312
$ref: "#/components/schemas/EstimatedSearchCost"
1313+
/graph/{graph_id}/search/history/timeline:
1314+
post:
1315+
summary: "Search history events and provide the number of events over time."
1316+
description: |
1317+
Search all history events and return a histogram of the number of events over time.
1318+
A section can be defined (defaults to `/` == root) to interpret relative property paths.
1319+
Example: is(volume) and (reported.age>23d or desired.clean==true or metadata.version==2)
1320+
tags:
1321+
- graph_search
1322+
parameters:
1323+
- $ref: "#/components/parameters/graph_id"
1324+
- $ref: "#/components/parameters/section"
1325+
- name: before
1326+
in: query
1327+
description: "Count all history events before the given timestamp"
1328+
required: true
1329+
schema:
1330+
type: string
1331+
format: date-time
1332+
- name: after
1333+
in: query
1334+
description: "Count all history events after the given timestamp"
1335+
required: true
1336+
schema:
1337+
type: string
1338+
format: date-time
1339+
- name: granularity
1340+
in: query
1341+
description: "Optional parameter to define the granularity of the timeline"
1342+
required: false
1343+
schema:
1344+
type: string
1345+
format: duration
1346+
- name: change
1347+
in: query
1348+
description: "Optional parameter to get all history events with the given change type"
1349+
required: false
1350+
schema:
1351+
type: string
1352+
enum:
1353+
- node_created
1354+
- node_updated
1355+
- node_deleted
1356+
requestBody:
1357+
description: "The search to perform"
1358+
content:
1359+
text/plain:
1360+
schema:
1361+
type: string
1362+
example: is(volume) and reported.volume_size>100
1363+
responses:
1364+
"200":
1365+
description: "The result of this search in the defined format"
1366+
content:
1367+
application/json:
1368+
example: |
1369+
[
1370+
{ "at": "2024-07-14T00:00:00.000Z", "group": { "change": "node_created" }, "v": 170 },
1371+
{ "at": "2024-07-15T00:00:00.000Z", "group": { "change": "node_updated" }, "v": 833 },
1372+
{ "at": "2024-07-15T00:00:00.000Z", "group": { "change": "node_created" }, "v": 1166 }
1373+
]
13131374
/graph/{graph_id}/search/history/list:
13141375
post:
13151376
summary: "Search all history events and return them."

fixcore/fixcore/web/api.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
WorkerTaskInProgress,
134134
)
135135
from fixlib.asynchronous.web.ws_handler import accept_websocket, clean_ws_handler
136+
from fixlib.durations import parse_duration
136137
from fixlib.jwt import encode_jwt
137138
from fixlib.x509 import cert_to_bytes
138139

@@ -246,6 +247,7 @@ def __add_routes(self, prefix: str) -> None:
246247
web.post(prefix + "/graph/{graph_id}/search/aggregate", require(self.query_aggregation, r)),
247248
web.post(prefix + "/graph/{graph_id}/search/history/list", require(self.query_history, r)),
248249
web.post(prefix + "/graph/{graph_id}/search/history/aggregate", require(self.query_history, r)),
250+
web.post(prefix + "/graph/{graph_id}/search/history/timeline", require(self.history_timeline, r)),
249251
web.post(prefix + "/graph/{graph_id}/property/attributes", require(self.possible_values, r)),
250252
web.post(prefix + "/graph/{graph_id}/property/values", require(self.possible_values, r)),
251253
web.post(prefix + "/graph/{graph_id}/property/path/complete", require(self.property_path_complete, r)),
@@ -1238,6 +1240,23 @@ async def query_aggregation(self, request: Request, deps: TenantDependencies) ->
12381240
request, cursor, count=cursor.count(), total_count=cursor.full_count(), query_stats=cursor.stats()
12391241
)
12401242

1243+
async def history_timeline(self, request: Request, deps: TenantDependencies) -> StreamResponse:
    """
    Handle a history timeline request and stream the resulting slots.

    Query parameters read from the request: `before` and `after` (fetched with `getone`),
    `granularity` (optional duration) and `change` (optional, comma separated change names).
    """
    graph_db, query_model = await self.graph_query_model_from_request(request, deps)

    # Fetch all raw parameters first, then parse them.
    params = request.query
    before_raw = params.getone("before")
    after_raw = params.getone("after")
    granularity_raw = params.get("granularity")
    change_names = if_set(params.get("change"), lambda x: x.split(","))

    before = parse_utc(before_raw)
    after = parse_utc(after_raw)
    granularity = parse_duration(granularity_raw) if granularity_raw else None
    # Change names are looked up by name in the HistoryChange enumeration.
    changes = [HistoryChange[name] for name in change_names] if change_names else None

    async with await graph_db.history_timeline(
        query=query_model, before=before, after=after, granularity=granularity, changes=changes
    ) as cursor:
        return await self.stream_response_from_gen(
            request, cursor, count=cursor.count(), total_count=cursor.full_count(), query_stats=cursor.stats()
        )
1259+
12411260
async def query_history(self, request: Request, deps: TenantDependencies) -> StreamResponse:
12421261
graph_db, query_model = await self.graph_query_model_from_request(request, deps)
12431262
before = request.query.get("before")

fixcore/tests/fixcore/db/graphdb_test.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
from pytest import mark, raises
1313

1414
from fixcore.analytics import CoreEvent, InMemoryEventSender
15+
from fixcore.db.db_access import DbAccess
1516
from fixcore.db.graphdb import ArangoGraphDB, GraphDB, EventGraphDB, HistoryChange
1617
from fixcore.db.model import QueryModel, GraphUpdate
17-
from fixcore.db.db_access import DbAccess
1818
from fixcore.error import ConflictingChangeInProgress, NoSuchChangeError, InvalidBatchUpdate
1919
from fixcore.ids import NodeId, GraphName
2020
from fixcore.model.graph_access import GraphAccess, EdgeTypes, Section
@@ -434,6 +434,22 @@ async def nodes(query: Query, **args: Any) -> List[Json]:
434434
assert len(await nodes(Query.by("foo"), after=five_min_ago, changes=[HistoryChange.node_deleted])) == 0
435435

436436

437+
@mark.asyncio
async def test_query_history_timeline(filled_graph_db: ArangoGraphDB, foo_model: Model) -> None:
    """
    The timeline of a two hour window around now is expected to consist of
    exactly one slot: with 10 events for kind foo and 100 events for kind bla.
    """

    async def timeline(query: Query, **args: Any) -> List[Json]:
        # collect all slots the timeline cursor provides
        collected: List[Json] = []
        async with await filled_graph_db.history_timeline(QueryModel(query, foo_model), **args) as cursor:
            async for entry in cursor:
                collected.append(entry)
        return collected

    upper = utc() + timedelta(minutes=60)
    lower = upper - timedelta(minutes=120)
    for kind, expected_count in (("foo", 10), ("bla", 100)):
        slices = await timeline(Query.by(kind), after=lower, before=upper)
        assert len(slices) == 1
        assert slices[0]["v"] == expected_count
451+
452+
437453
@mark.asyncio
438454
async def test_query_graph(filled_graph_db: ArangoGraphDB, foo_model: Model) -> None:
439455
graph = await load_graph(filled_graph_db, foo_model)

0 commit comments

Comments
 (0)