Skip to content

Commit 61e97ea

Browse files
authored
[core][fix] Traverse the graph by walking all possible paths when an edge filter is present (#2197)
1 parent 1735b14 commit 61e97ea

File tree

3 files changed: 50 additions and 44 deletions.

fixcore/fixcore/db/arango_query.py

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -881,36 +881,56 @@ def inout(
881881
in_crsr: str, start: int, until: int, edge_type: str, direction: str, edge_filter: Optional[Term]
882882
) -> str:
883883
nonlocal query_part
884-
in_c = ctx.next_crs("io_in")
884+
start_c = ctx.next_crs("graph_start")
885+
in_c = ctx.next_crs("gc")
886+
in_edge = f"{in_c}_edge"
887+
in_path = f"{in_c}_path"
888+
in_r = f"{in_c}_result"
885889
out = ctx.next_crs("io_out")
886-
out_crsr = ctx.next_crs("io_crs")
887-
e = ctx.next_crs("io_link")
888890
unique = "uniqueEdges: 'path'" if with_edges else "uniqueVertices: 'global'"
889891
dir_bound = "OUTBOUND" if direction == Direction.outbound else "INBOUND"
890-
inout_result = (
891-
# merge edge and vertex properties - will be split in the output transformer
892-
f"MERGE({out_crsr}, {{_from:{e}._from, _to:{e}._to, _link_id:{e}._id, _link_reported:{e}.reported}})"
893-
if with_edges
894-
else out_crsr
895-
)
892+
893+
# the path array contains the whole path from the start node.
894+
# in the case of start > 0, we need to slice the array to get the correct part
895+
def slice_or_all(in_p_part: str) -> str:
896+
return f"SLICE({in_path}.{in_p_part}, {start})" if start > 0 else f"{in_path}.{in_p_part}"
897+
898+
# Edge filter: decision to include the source element is not possible while traversing it.
899+
# When the target node is reached and edge properties are available, the decision can be made.
900+
# In case the filter succeeds, we need to select all vertices and edges on the path.
901+
# No filter but with_edges: another nested for loop required to return the node and edge
902+
# No filter and no with_edges: only the node is returned
903+
if edge_filter:
904+
# walk the path and return all vertices (and possibly edges)
905+
# this means intermediate nodes are returned multiple times and have to be made distinct
906+
# since we return nodes first, the edges can always be resolved
907+
walk_array = slice_or_all("vertices")
908+
walk_array = f'APPEND({walk_array}, {slice_or_all("edges")})' if with_edges else walk_array
909+
inout_result = f"FOR {in_r} in {walk_array} RETURN DISTINCT({in_r})"
910+
elif with_edges:
911+
# return the node and edge via a nested for loop
912+
inout_result = f"FOR {in_r} in [{in_c}, {in_edge}] FILTER {in_r}!=null RETURN DISTINCT({in_r})"
913+
else:
914+
# return only the node
915+
inout_result = f"RETURN DISTINCT {in_c}"
916+
896917
if outer_merge and part_idx == 0:
897918
graph_cursor = in_crsr
898919
outer_for = ""
899920
else:
900-
graph_cursor = in_c
901-
outer_for = f"FOR {in_c} in {in_crsr} "
921+
graph_cursor = start_c
922+
outer_for = f"FOR {start_c} in {in_crsr} "
902923

903924
# optional: add the edge filter to the query
904-
pre, fltr, post = term(e, edge_filter) if edge_filter else (None, None, None)
925+
pre, fltr, post = term(in_edge, edge_filter) if edge_filter else (None, None, None)
905926
pre_string = " " + pre if pre else ""
906927
post_string = f" AND ({post})" if post else ""
907928
filter_string = "" if not fltr and not post_string else f"{pre_string} FILTER {fltr}{post_string}"
908929
query_part += (
909930
f"LET {out} =({outer_for}"
910931
# suggested by jsteemann: use crs._id instead of crs (stored in the view and more efficient)
911-
f"FOR {out_crsr}, {e} IN {start}..{until} {dir_bound} {graph_cursor}._id "
912-
f"`{db.edge_collection(edge_type)}` OPTIONS {{ bfs: true, {unique} }}{filter_string} "
913-
f"RETURN DISTINCT {inout_result})"
932+
f"FOR {in_c}, {in_edge}, {in_path} IN {start}..{until} {dir_bound} {graph_cursor}._id "
933+
f"`{db.edge_collection(edge_type)}` OPTIONS {{ bfs: true, {unique} }}{filter_string} {inout_result})"
914934
)
915935
return out
916936

fixcore/fixcore/db/async_arangodb.py

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ def __init__(
5353
self.cursor_exhausted = False
5454
self.trafo: Callable[[Json], Optional[Any]] = trafo if trafo else identity # type: ignore
5555
self.vt_len: Optional[int] = None
56-
self.on_hold: Optional[Json] = None
5756
self.get_next: Callable[[], Awaitable[Optional[Json]]] = (
5857
self.next_filtered if flatten_nodes_and_edges else self.next_element
5958
)
@@ -62,11 +61,7 @@ async def __anext__(self) -> Any:
6261
# if there is an on-hold element: unset and return it
6362
# background: a graph node contains vertex and edge information.
6463
# since this method can only return one element at a time, the edge is put on-hold for vertex+edge data.
65-
if self.on_hold:
66-
res = self.on_hold
67-
self.on_hold = None
68-
return res
69-
elif self.cursor_exhausted:
64+
if self.cursor_exhausted:
7065
return await self.next_deferred_edge()
7166
else:
7267
try:
@@ -99,20 +94,10 @@ async def next_element(self) -> Optional[Json]:
9994

10095
async def next_filtered(self) -> Optional[Json]:
10196
element = await self.next_from_db()
102-
vertex: Optional[Json] = None
103-
edge = None
10497
try:
105-
_key = element["_key"]
106-
if _key not in self.visited_node:
107-
self.visited_node.add(_key)
108-
vertex = self.trafo(element)
109-
110-
from_id = element.get("_from")
111-
to_id = element.get("_to")
112-
link_id = element.get("_link_id")
113-
if from_id is not None and to_id is not None and link_id is not None:
114-
if link_id not in self.visited_edge:
115-
self.visited_edge.add(link_id)
98+
if (from_id := element.get("_from")) and (to_id := element.get("_to")) and (node_id := element.get("_id")):
99+
if node_id not in self.visited_edge:
100+
self.visited_edge.add(node_id)
116101
if not self.vt_len:
117102
self.vt_len = len(re.sub("/.*$", "", from_id)) + 1
118103
edge = {
@@ -122,21 +107,22 @@ async def next_filtered(self) -> Optional[Json]:
122107
# example: vertex_name/node_id -> node_id
123108
"to": to_id[self.vt_len :], # noqa: E203
124109
# example: vertex_name_default/edge_id -> default
125-
"edge_type": re.sub("/.*$", "", link_id[self.vt_len :]), # noqa: E203
110+
"edge_type": re.sub("/.*$", "", node_id[self.vt_len :]), # noqa: E203
126111
}
127-
if reported := element.get("_link_reported"):
112+
if reported := element.get("reported"):
128113
edge["reported"] = reported
129114
# make sure that both nodes of the edge have been visited already
130115
if from_id not in self.visited_node or to_id not in self.visited_node:
131116
self.deferred_edges.append(edge)
132-
edge = None
133-
# if the vertex is not returned: return the edge
134-
# otherwise return the vertex and remember the edge
135-
if vertex:
136-
self.on_hold = edge
137-
return vertex
117+
return None
118+
else:
119+
return edge
120+
elif key := element.get("_key"):
121+
if key not in self.visited_node:
122+
self.visited_node.add(key)
123+
return self.trafo(element)
138124
else:
139-
return edge
125+
return element
140126
except Exception as ex:
141127
log.warning(f"Could not read element {element}: {ex}. Ignore.")
142128
return None

fixcore/tests/fixcore/cli/command_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ async def test_search_source(cli: CLIService) -> None:
156156
assert len(result3[0]) == 3
157157

158158
result4 = await cli.execute_cli_command("search --explain --with-edges is(graph_root) -[0:1]->", list_sink)
159-
assert result4[0][0]["rating"] in ["simple", "complex"]
159+
assert result4[0][0]["rating"] in ["simple", "bad", "complex"]
160160

161161
# use absolute path syntax
162162
result5 = await cli.execute_cli_command(

0 commit comments

Comments
 (0)