@@ -53,6 +53,7 @@ def __init__(
5353 self .cursor_exhausted = False
5454 self .trafo : Callable [[Json ], Optional [Any ]] = trafo if trafo else identity # type: ignore
5555 self .vt_len : Optional [int ] = None
56+ self .on_hold : Optional [Json ] = None
5657 self .get_next : Callable [[], Awaitable [Optional [Json ]]] = (
5758 self .next_filtered if flatten_nodes_and_edges else self .next_element
5859 )
@@ -61,7 +62,11 @@ async def __anext__(self) -> Any:
6162 # if there is an on-hold element: unset and return it
6263 # background: a graph node contains vertex and edge information.
6364 # since this method can only return one element at a time, the edge is put on-hold for vertex+edge data.
64- if self .cursor_exhausted :
65+ if self .on_hold :
66+ res = self .on_hold
67+ self .on_hold = None
68+ return res
69+ elif self .cursor_exhausted :
6570 return await self .next_deferred_edge ()
6671 else :
6772 try :
@@ -94,35 +99,44 @@ async def next_element(self) -> Optional[Json]:
9499
95100 async def next_filtered (self ) -> Optional [Json ]:
96101 element = await self .next_from_db ()
102+ vertex : Optional [Json ] = None
103+ edge : Optional [Json ] = None
97104 try :
98- if (from_id := element .get ("_from" )) and (to_id := element .get ("_to" )) and (node_id := element .get ("_id" )):
99- if node_id not in self .visited_edge :
100- self .visited_edge .add (node_id )
101- if not self .vt_len :
102- self .vt_len = len (re .sub ("/.*$" , "" , from_id )) + 1
103- edge = {
104- "type" : "edge" ,
105- # example: vertex_name/node_id -> node_id
106- "from" : from_id [self .vt_len :], # noqa: E203
107- # example: vertex_name/node_id -> node_id
108- "to" : to_id [self .vt_len :], # noqa: E203
109- # example: vertex_name_default/edge_id -> default
110- "edge_type" : re .sub ("/.*$" , "" , node_id [self .vt_len :]), # noqa: E203
111- }
112- if reported := element .get ("reported" ):
113- edge ["reported" ] = reported
114- # make sure that both nodes of the edge have been visited already
115- if from_id not in self .visited_node or to_id not in self .visited_node :
116- self .deferred_edges .append (edge )
117- return None
118- else :
119- return edge
120- elif key := element .get ("_key" ):
105+ if ep := element .get ("_edge" ):
106+ if (from_id := ep .get ("_from" )) and (to_id := ep .get ("_to" )) and (node_id := ep .get ("_id" )):
107+ if node_id not in self .visited_edge :
108+ self .visited_edge .add (node_id )
109+ if not self .vt_len :
110+ self .vt_len = len (re .sub ("/.*$" , "" , from_id )) + 1
111+ edge = {
112+ "type" : "edge" ,
113+ # example: vertex_name/node_id -> node_id
114+ "from" : from_id [self .vt_len :], # noqa: E203
115+ # example: vertex_name/node_id -> node_id
116+ "to" : to_id [self .vt_len :], # noqa: E203
117+ # example: vertex_name_default/edge_id -> default
118+ "edge_type" : re .sub ("/.*$" , "" , node_id [self .vt_len :]), # noqa: E203
119+ }
120+ if reported := ep .get ("reported" ):
121+ edge ["reported" ] = reported
122+ # make sure that both nodes of the edge have been visited already
123+ if from_id not in self .visited_node or to_id not in self .visited_node :
124+ self .deferred_edges .append (edge )
125+ edge = None
126+ if key := element .get ("_key" ):
121127 if key not in self .visited_node :
122128 self .visited_node .add (key )
123- return self .trafo (element )
129+ vertex = self .trafo (element )
130+ else :
131+ vertex = element
132+ # if the vertex is not returned: return the edge
133+ # otherwise return the vertex and remember the edge
134+ if vertex :
135+ self .on_hold = edge
136+ return vertex
124137 else :
125- return element
138+ return edge
139+
126140 except Exception as ex :
127141 log .warning (f"Could not read element { element } : { ex } . Ignore." )
128142 return None
0 commit comments