Skip to content

Commit f7d3dac

Browse files
authored
[core][feat] Allow edge properties (#2182)
1 parent f11d871 commit f7d3dac

22 files changed

+350
-230
lines changed

fixcore/fixcore/action_handlers/merge_deferred_edge_handler.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
1-
from attr import frozen
2-
3-
from fixcore.db.model import QueryModel
4-
from fixcore.message_bus import MessageBus, Action
5-
import logging
61
import asyncio
2+
import logging
73
from asyncio import Task, Future
8-
from typing import Optional, Tuple, List
4+
from collections import defaultdict
95
from contextlib import suppress
106
from datetime import timedelta
11-
from fixcore.model.graph_access import ByNodeId, NodeSelector
12-
from fixcore.service import Service
13-
from fixcore.task.model import Subscriber
7+
from typing import Optional, Tuple, List, Dict
8+
9+
from attr import frozen
10+
11+
from fixcore.db.db_access import DbAccess
12+
from fixcore.db.model import QueryModel
1413
from fixcore.ids import NodeId, SubscriberId
15-
from fixcore.task.task_handler import TaskHandlerService
1614
from fixcore.ids import TaskId
17-
from fixcore.task.subscribers import SubscriptionHandler
18-
from fixcore.db.db_access import DbAccess
15+
from fixcore.message_bus import MessageBus, Action
16+
from fixcore.model.graph_access import ByNodeId, NodeSelector, DeferredEdge
1917
from fixcore.model.model_handler import ModelHandler
2018
from fixcore.query.query_parser import parse_query
21-
19+
from fixcore.service import Service
20+
from fixcore.task.model import Subscriber
21+
from fixcore.task.subscribers import SubscriptionHandler
22+
from fixcore.task.task_handler import TaskHandlerService
23+
from fixcore.types import EdgeType
2224

2325
log = logging.getLogger(__name__)
2426

@@ -83,21 +85,21 @@ async def find_node_id(selector: NodeSelector) -> Optional[NodeId]:
8385
log.warning(f"task_id: {task_id}: Error {e} when finding node {selector}")
8486
return None
8587

86-
edges: List[Tuple[NodeId, NodeId, str]] = []
88+
edges: Dict[EdgeType, List[Tuple[NodeId, NodeId, DeferredEdge]]] = defaultdict(list)
8789
for pending_edge in pending_edges:
8890
for edge in pending_edge.edges:
8991
from_id = await find_node_id(edge.from_node)
9092
to_id = await find_node_id(edge.to_node)
9193
processed += 1
9294
if from_id and to_id:
93-
edges.append((from_id, to_id, edge.edge_type))
95+
edges[edge.edge_type].append((from_id, to_id, edge))
9496

9597
# apply edges in graph
9698
updated, deleted = await graph_db.update_deferred_edges(edges, first.created_at)
9799
# delete processed edge definitions
98100
for task_id in task_ids:
99101
await deferred_outer_edge_db.delete_for_task(task_id)
100-
log.info(f"DeferredEdges: {len(edges)} edges: {updated} updated, {deleted} deleted. ({task_ids})")
102+
log.info(f"DeferredEdges: {processed} edges: {updated} updated, {deleted} deleted. ({task_ids})")
101103
return DeferredMergeResult(processed, updated, deleted)
102104
else:
103105
log.info(f"MergeOuterEdgesHandler: no pending edges found. ({task_ids})")

fixcore/fixcore/cli/command.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2781,7 +2781,8 @@ def fmt_json(elem: Json) -> JsonElement:
27812781
first = False
27822782
return result
27832783
elif is_edge(elem):
2784-
return f'{elem.get("from")} -> {elem.get("to")}: {elem.get("edge_type")}'
2784+
ers = to_str("", er) if isinstance(er := elem.get("reported"), dict) else ""
2785+
return f'{elem.get("from")} -{elem.get("edge_type")}-> {elem.get("to")}: {ers}'
27852786
else:
27862787
return elem
27872788

fixcore/fixcore/db/arango_query.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -882,12 +882,13 @@ def inout(in_crsr: str, start: int, until: int, edge_type: str, direction: str)
882882
in_c = ctx.next_crs("io_in")
883883
out = ctx.next_crs("io_out")
884884
out_crsr = ctx.next_crs("io_crs")
885-
link = ctx.next_crs("io_link")
885+
e = ctx.next_crs("io_link")
886886
unique = "uniqueEdges: 'path'" if with_edges else "uniqueVertices: 'global'"
887-
link_str = f", {link}" if with_edges else ""
887+
link_str = f", {e}" if with_edges else ""
888888
dir_bound = "OUTBOUND" if direction == Direction.outbound else "INBOUND"
889889
inout_result = (
890-
f"MERGE({out_crsr}, {{_from:{link}._from, _to:{link}._to, _link_id:{link}._id}})"
890+
# merge edge and vertex properties - will be split in the output transformer
891+
f"MERGE({out_crsr}, {{_from:{e}._from, _to:{e}._to, _link_id:{e}._id, _link_reported:{e}.reported}})"
891892
if with_edges
892893
else out_crsr
893894
)

fixcore/fixcore/db/async_arangodb.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828

2929
from fixcore.async_extensions import run_async
3030
from fixcore.error import QueryTookToLongError
31-
from fixcore.util import identity
3231
from fixcore.ids import GraphName
32+
from fixcore.util import identity
3333

3434
log = logging.getLogger(__name__)
3535

@@ -124,6 +124,8 @@ async def next_filtered(self) -> Optional[Json]:
124124
# example: vertex_name_default/edge_id -> default
125125
"edge_type": re.sub("/.*$", "", link_id[self.vt_len :]), # noqa: E203
126126
}
127+
if reported := element.get("_link_reported"):
128+
edge["reported"] = reported
127129
# make sure that both nodes of the edge have been visited already
128130
if from_id not in self.visited_node or to_id not in self.visited_node:
129131
self.deferred_edges.append(edge)

fixcore/fixcore/db/db_access.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from fixcore.db.arangodb_extensions import ArangoHTTPClient
2222
from fixcore.db.async_arangodb import AsyncArangoDB, AsyncCursor
2323
from fixcore.db.configdb import config_entity_db, config_validation_entity_db
24-
from fixcore.db.deferredouteredgedb import deferred_outer_edge_db
24+
from fixcore.db.deferrededgesdb import deferred_outer_edge_db
2525
from fixcore.db.entitydb import EventEntityDb
2626
from fixcore.db.graphdb import ArangoGraphDB, GraphDB, EventGraphDB
2727
from fixcore.db.jobdb import job_db

fixcore/fixcore/db/deferredouteredgedb.py renamed to fixcore/fixcore/db/deferrededgesdb.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
from typing import List, cast
88
import logging
99

10-
from fixcore.model.typed_model import from_js
1110
from fixcore.types import Json
1211
from fixcore.ids import GraphName
12+
from fixlib.json import from_json
1313

1414

1515
@define
16-
class DeferredOuterEdges:
16+
class DeferredEdges:
1717
id: str
1818
change_id: str
1919
task_id: TaskId
@@ -28,14 +28,14 @@ class DeferredOuterEdges:
2828
log = logging.getLogger(__name__)
2929

3030

31-
class DeferredOuterEdgeDb(ArangoEntityDb[str, DeferredOuterEdges]):
32-
async def all_for_task(self, task_id: TaskId) -> List[DeferredOuterEdges]:
31+
class DeferredEdgesDb(ArangoEntityDb[str, DeferredEdges]):
32+
async def all_for_task(self, task_id: TaskId) -> List[DeferredEdges]:
3333
result = []
3434
async with await self.db.aql_cursor(
3535
f"FOR e IN `{self.collection_name}` FILTER e.task_id == @task_id RETURN e", bind_vars={"task_id": task_id}
3636
) as cursor:
3737
async for doc in cursor:
38-
edges = from_js(doc, DeferredOuterEdges)
38+
edges = from_json(doc, DeferredEdges)
3939
result.append(edges)
4040
return result
4141

@@ -53,8 +53,10 @@ async def create_update_schema(self) -> None:
5353
collection = self.db.collection(self.collection_name)
5454
if ttl_index_name not in {idx["name"] for idx in cast(List[Json], collection.indexes())}:
5555
log.info(f"Add index {ttl_index_name} on {collection.name}")
56-
collection.add_ttl_index(["created_at"], TWO_HOURS, "deferred_edges_expiration_index")
56+
collection.add_index(
57+
dict(type="ttl", fields=["created_at"], expireAfter=TWO_HOURS, name="deferred_edges_expiration_index")
58+
)
5759

5860

59-
def deferred_outer_edge_db(db: AsyncArangoDB, collection: str) -> DeferredOuterEdgeDb:
60-
return DeferredOuterEdgeDb(db, collection, DeferredOuterEdges, lambda k: k.id)
61+
def deferred_outer_edge_db(db: AsyncArangoDB, collection: str) -> DeferredEdgesDb:
62+
return DeferredEdgesDb(db, collection, DeferredEdges, lambda k: k.id)

0 commit comments

Comments
 (0)