/
graph_access.py
627 lines (540 loc) · 26.6 KB
/
graph_access.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
from __future__ import annotations
import hashlib
import json
import logging
import re
from collections import namedtuple, defaultdict
from functools import reduce
from typing import Optional, Generator, Any, Dict, List, Set, Tuple, Union
from attrs import define
from networkx import DiGraph, MultiDiGraph, all_shortest_paths, is_directed_acyclic_graph
from resotocore.ids import NodeId
from resotocore.model.model import (
Model,
Kind,
AnyKind,
ComplexKind,
ArrayKind,
DateTimeKind,
DictionaryKind,
StringKind,
Property,
SimpleKind,
DateKind,
DurationKind,
UsageDatapoint,
)
from resotocore.model.resolve_in_graph import GraphResolver, NodePath, ResolveProp
from resotocore.model.typed_model import from_js
from resotocore.types import Json, EdgeType, JsonElement
from resotocore.util import utc, utc_str, value_in_path, set_value_in_path, value_in_path_get
log = logging.getLogger(__name__)
# This version is used when the content hash of a node is computed.
# All computed hashes will be invalidated, by incrementing the version.
# This can be used, if computed values should be recomputed for all imported data.
ContentHashVersion = 3
class Section:
# The reported section contains the data gathered by the collector.
# This data is usually not changed by the user directly, but implicitly via changes on the
# infrastructure, so the next collect run will change this state.
reported = "reported"
# This section holds changes that should be reflected by the given node.
# The desired section can be queried the same way as the reported section
# and allows to query commands of the graph with a common desired state.
# For example the clean flag is manifested in the desired section.
# The separate clean step would query all nodes that should be cleaned
# and can compute the correct order of action by walking the graph structure.
desired = "desired"
# This section holds information about this node that are gathered during the import process.
# Example: This section resolves common graph attributes like cloud, account, region, zone to make
# querying the graph easy.
metadata = "metadata"
# This section holds information about security issues detected for this node.
security = "security"
# Following sections are used to lookup special kinds in the graph hierarchy to simplify access.
# See GraphResolver for details.
# All resolved ancestors are written to this section.
ancestors = "ancestors"
# Resolved descendants would be written to this section.
# Only here for completeness - currently not used.
descendants = "descendants"
# Usage of the resource
usage = "usage"
# The set of all content sections
content_ordered = [reported, security, desired, metadata]
content = set(content_ordered)
# The list of all lookup sections
lookup_sections_ordered = [ancestors, descendants, usage]
lookup_sections = set(lookup_sections_ordered)
# The list of all sections
all_ordered = [*content_ordered, *lookup_sections_ordered]
all = set(all_ordered)
# remove the section plus dot if it exists in the string: reported.foo => foo
__no_section = re.compile("^/?(" + "|".join(f"({s})" for s in content_ordered) + ")[.]")
@classmethod
def without_section(cls, path: str) -> str:
return cls.__no_section.sub("", path, 1)
class EdgeTypes:
# This edge type defines the default relationship between resources.
# It is the main edge type and is assumed, if no edge type is given.
# The related graph is also used as source of truth for graph updates.
default: EdgeType = "default"
# This edge type defines the order of delete operations.
# A resource can be deleted, if all outgoing resources are deleted.
delete: EdgeType = "delete"
# The set of all allowed edge types.
# Note: the database schema has to be adapted to support additional edge types.
all: Set[EdgeType] = {default, delete}
class Direction:
# Opposite direction as the edge direction.
inbound = "in"
# Same direction as the edge direction
outbound = "out"
# Ignore the direction of the edge and traverse in any direction.
any = "any"
# The set of all allowed directions.
all: Set[str] = {inbound, outbound, any}
EdgeKey = namedtuple("EdgeKey", ["from_node", "to_node", "edge_type"])
@define
class BySearchCriteria:
query: str
@define
class ByNodeId:
value: NodeId
NodeSelector = Union[ByNodeId, BySearchCriteria]
@define
class DeferredEdge:
from_node: NodeSelector
to_node: NodeSelector
edge_type: str
class GraphBuilder:
def __init__(self, model: Model, change_id: str):
self.model = model
self.graph = MultiDiGraph()
self.nodes = 0
self.edges = 0
self.deferred_edges: List[DeferredEdge] = []
self.usage: List[UsageDatapoint] = []
self.at = int(utc().timestamp())
self.change_id = change_id
def add_from_json(self, js: Json) -> None:
if "id" in js and Section.reported in js:
usage_json = js.get(Section.usage, {})
if len(usage_json) == 0:
usage_json = None
self.add_node(
node_id=js["id"],
reported=js[Section.reported],
desired=js.get(Section.desired, None),
metadata=js.get(Section.metadata, None),
search=js.get("search", None),
replace=js.get("replace", False) is True,
)
if usage_json:
usage = UsageDatapoint(
id=js["id"],
change_id=self.change_id,
at=self.at,
v=usage_json,
)
self.usage.append(usage)
elif "from" in js and "to" in js:
self.add_edge(js["from"], js["to"], js.get("edge_type", EdgeTypes.default))
elif "from_selector" in js and "to_selector" in js:
def parse_selector(js: Json) -> NodeSelector:
if "node_id" in js:
return ByNodeId(NodeId(from_js(js["node_id"], str)))
elif "search_criteria" in js:
return BySearchCriteria(from_js(js["search_criteria"], str))
else:
raise AttributeError(f"can't parse edge selector! Got {json.dumps(js)}")
self.store_deferred_connection(
parse_selector(js["from_selector"]),
parse_selector(js["to_selector"]),
js.get("edge_type", EdgeTypes.default),
)
else:
raise AttributeError(f"Format not understood! Got {json.dumps(js)} which is neither vertex nor edge.")
def __update_property_size(self, kind: Kind, element: JsonElement) -> None:
def prop_size(prop: Property, pk: Kind, part: JsonElement) -> None:
if part is None:
pass
elif isinstance(pk, (StringKind, DateTimeKind, DateKind, DurationKind)) and isinstance(part, str):
str_len = len(part)
if prop.metadata is None:
prop.metadata = {}
size = prop.metadata.get("len", 0)
prop.metadata["len"] = max(size, str_len)
elif isinstance(pk, SimpleKind):
pass
elif isinstance(pk, ArrayKind) and isinstance(part, list) and isinstance(pk.inner, StringKind):
for elem in part:
prop_size(prop, pk.inner, elem)
elif isinstance(pk, ArrayKind) and isinstance(part, list):
for elem in part:
self.__update_property_size(pk.inner, elem)
elif isinstance(pk, DictionaryKind) and isinstance(part, dict) and isinstance(pk.value_kind, StringKind):
for elem in part.values():
prop_size(prop, pk.value_kind, elem)
elif isinstance(pk, DictionaryKind) and isinstance(part, dict):
for k, v in part.items():
self.__update_property_size(pk.key_kind, k)
self.__update_property_size(pk.value_kind, v)
elif isinstance(pk, ComplexKind) and isinstance(part, dict):
for cp, cpk in pk.all_props_with_kind():
prop_size(cp, cpk, part.get(cp.name, None))
if isinstance(kind, ComplexKind) and isinstance(element, dict):
prop_size(Property("root", kind.fqn), kind, element)
def add_node(
self,
node_id: NodeId,
reported: Json,
desired: Optional[Json] = None,
metadata: Optional[Json] = None,
search: Optional[str] = None,
replace: bool = False,
) -> None:
self.nodes += 1
# validate kind of this reported json
coerced = self.model.check_valid(reported)
reported = reported if coerced is None else coerced
kind = self.model[reported]
# if replace is defined, make it part of metadata
if replace:
metadata = metadata or {}
metadata["replace"] = True
# create content hash
sha = GraphBuilder.content_hash(reported, desired, metadata)
# flat all properties into a single string for search
flat = search if isinstance(search, str) else (GraphBuilder.flatten(reported, kind))
self.graph.add_node(
node_id,
id=node_id,
reported=reported,
desired=desired,
metadata=metadata,
hash=sha,
kind=kind,
kinds=list(kind.kind_hierarchy()),
kinds_set=kind.kind_hierarchy(),
flat=flat,
replace=metadata.get("replace", False) is True if metadata else False,
)
# update property sizes
self.__update_property_size(kind, reported)
def add_edge(self, from_node: str, to_node: str, edge_type: EdgeType) -> None:
self.edges += 1
key = GraphAccess.edge_key(from_node, to_node, edge_type)
self.graph.add_edge(from_node, to_node, key, edge_type=edge_type)
def store_deferred_connection(self, from_selector: NodeSelector, to_selector: NodeSelector, edge_type: str) -> None:
self.deferred_edges.append(DeferredEdge(from_selector, to_selector, edge_type))
@staticmethod
def content_hash(js: Json, desired: Optional[Json] = None, metadata: Optional[Json] = None) -> str:
sha256 = hashlib.sha256()
# all content hashes will be different, when the version changes
sha256.update(ContentHashVersion.to_bytes(2, "big"))
sha256.update(json.dumps(js, sort_keys=True).encode("utf-8"))
if desired:
sha256.update(json.dumps(desired, sort_keys=True).encode("utf-8"))
if metadata:
sha256.update(json.dumps(metadata, sort_keys=True).encode("utf-8"))
return sha256.hexdigest()
@staticmethod
def flatten(js: Json, kind: Kind) -> str:
result = ""
def dispatch(value: Any, k: Kind) -> None:
nonlocal result
if isinstance(value, dict):
for prop, elem in value.items():
sub = (
k.property_kind_of(prop, AnyKind())
if isinstance(k, ComplexKind)
else (k.value_kind if isinstance(k, DictionaryKind) else AnyKind())
)
dispatch(elem, sub)
elif isinstance(value, list):
sub = k.inner if isinstance(k, ArrayKind) else AnyKind()
for elem in value:
dispatch(elem, sub)
elif value is None or isinstance(value, bool):
pass
else:
# in case of date time: "2017-05-30T22:04:34Z" -> "2017-05-30 22:04:34"
if isinstance(k, DateTimeKind):
value = re.sub("[ZT]", " ", value)
if result:
result += " "
result += str(value).strip()
dispatch(js, kind)
return result
def check_complete(self) -> None:
# check that all vertices are given, that were defined in any edge definition
# note: DiGraph will create an empty vertex node automatically
for node_id, node in self.graph.nodes(data=True):
assert node.get(Section.reported), f"{node_id} was used in an edge definition but not provided as vertex!"
edge_types: Set[str] = {edge[2] for edge in self.graph.edges(data="edge_type")}
al = EdgeTypes.all
assert not edge_types.difference(al), f"Graph contains unknown edge types! Given: {edge_types}. Known: {al}"
# make sure there is only one root node
rid = GraphAccess.root_id(self.graph)
root_node = self.graph.nodes[rid]
# make sure the root
if value_in_path(root_node, NodePath.reported_kind) == "graph_root" and rid != "root":
# remove node with wrong id +
root_node = self.graph.nodes[rid]
root_node["id"] = "root"
self.graph.add_node("root", **root_node)
for succ in list(self.graph.successors(rid)):
for edge_type in EdgeTypes.all:
key = GraphAccess.edge_key(rid, succ, edge_type)
if self.graph.has_edge(rid, succ, key):
self.graph.remove_edge(rid, succ, key)
self.add_edge("root", succ, edge_type)
self.graph.remove_node(rid)
NodeData = Tuple[str, Json, Optional[Json], Optional[Json], Optional[Json], str, List[str], str]
class GraphAccess:
def __init__(
self,
sub: MultiDiGraph,
maybe_root_id: Optional[str] = None,
visited_nodes: Optional[Set[NodeId]] = None,
visited_edges: Optional[Set[EdgeKey]] = None,
):
super().__init__()
self.g = sub
self.nodes = sub.nodes()
self.visited_nodes: Set[NodeId] = visited_nodes if visited_nodes else set()
self.visited_edges: Set[EdgeKey] = visited_edges if visited_edges else set()
self.at = utc()
self.at_json = utc_str(self.at)
self.maybe_root_id = maybe_root_id
self.resolved = False
def root(self) -> str:
return self.maybe_root_id if self.maybe_root_id else GraphAccess.root_id(self.g)
def node(self, node_id: NodeId) -> Optional[Json]:
self.visited_nodes.add(node_id)
if self.g.has_node(node_id):
n = self.nodes[node_id]
return self.dump(node_id, n)
else:
return None
def has_edge(self, from_id: object, to_id: object, edge_type: EdgeType) -> bool:
key = self.edge_key(from_id, to_id, edge_type)
result: bool = self.g.has_edge(from_id, to_id, key)
if result:
self.visited_edges.add(key)
return result
def resolve(self) -> None:
if not self.resolved:
self.resolved = True
log.info("Resolve attributes in graph")
for node_id in self.nodes:
self.__resolve(node_id, self.nodes[node_id])
self.__resolve_count_descendants()
log.info("Resolve attributes finished.")
def __resolve_count_descendants(self) -> None:
visited: Set[str] = set()
def count_successors_by(node_id: NodeId, edge_type: EdgeType, path: List[str]) -> Dict[str, int]:
result: Dict[str, int] = {}
to_visit = list(self.successors(node_id, edge_type))
while to_visit:
visit_next: List[NodeId] = []
for elem_id in to_visit:
if elem_id not in visited:
visited.add(elem_id)
elem = self.nodes[elem_id]
if not value_in_path_get(elem, NodePath.is_phantom, False):
extracted = value_in_path(elem, path)
if isinstance(extracted, str):
result[extracted] = result.get(extracted, 0) + 1
# check if there is already a successor summary: stop the traversal and take the result.
existing = value_in_path(elem, NodePath.descendant_summary)
if existing and isinstance(existing, dict):
for summary_item, count in existing.items():
result[summary_item] = result.get(summary_item, 0) + count
else:
visit_next.extend(a for a in self.successors(elem_id, edge_type) if a not in visited)
to_visit = visit_next
return result
for on_kind, prop in GraphResolver.count_successors.items():
for node_id, node in self.g.nodes(data=True):
kinds = node.get("kinds_set")
if kinds and on_kind in kinds:
summary = count_successors_by(node_id, EdgeTypes.default, prop.extract_path)
set_value_in_path(summary, prop.to_path, node)
total = reduce(lambda left, right: left + right, summary.values(), 0)
set_value_in_path(total, NodePath.descendant_count, node)
def __resolve(self, node_id: NodeId, node: Json) -> Json:
def with_ancestor(ancestor: Json, prop: ResolveProp) -> None:
extracted = value_in_path(ancestor, prop.extract_path)
if extracted:
set_value_in_path(extracted, prop.to_path, node)
for resolver in GraphResolver.to_resolve:
# search for ancestor that matches filter criteria
anc = self.ancestor_of(node_id, EdgeTypes.default, resolver.kind)
if anc:
on_self = anc.get("id") == node_id
for res in resolver.resolve:
if not on_self or res.apply_on_self:
with_ancestor(anc, res)
return node
def dump(self, node_id: NodeId, node: Json) -> Json:
kind = node.get("kind", AnyKind())
return self.dump_direct(node_id, node, kind)
def predecessors(self, node_id: NodeId, edge_type: EdgeType) -> Generator[NodeId, Any, None]:
for pred_id in self.g.predecessors(node_id):
# direction from parent node to provided node
if self.g.has_edge(pred_id, node_id, self.edge_key(pred_id, node_id, edge_type)):
yield pred_id
def successors(self, node_id: NodeId, edge_type: EdgeType) -> Generator[NodeId, Any, None]:
for succ_id in self.g.successors(node_id):
# direction from provided node to successor node
if self.g.has_edge(node_id, succ_id, self.edge_key(node_id, succ_id, edge_type)):
yield succ_id
def ancestor_of(self, node_id: NodeId, edge_type: EdgeType, kind: str) -> Optional[Json]:
# note: we are using breadth first search here on purpose.
# if there is an ancestor with less distance to this node, we should use this one
next_level = [node_id]
while next_level:
parents: List[NodeId] = []
for p_id in next_level:
p: Json = self.nodes[p_id]
kinds: Optional[List[str]] = value_in_path(p, NodePath.kinds)
if kinds and kind in kinds:
return p
else:
parents.extend(self.predecessors(p_id, edge_type))
next_level = parents
return None
def is_acyclic_per_edge_type(self) -> bool:
"""
Checks if the graph is acyclic with respect to a specific edge type.
This means it is valid if there are cycles in the graph but not for the same edge type.
:return: True if the graph is acyclic for all edge types, otherwise False.
"""
edges_per_type = defaultdict(list)
# edge is a tuple: (from_node, to_node, edge_key)
for edge in self.g.edges(keys=True):
key: EdgeKey = edge[2]
edges_per_type[key.edge_type].append(edge)
for edges in edges_per_type.values():
typed_graph = self.g.edge_subgraph(edges)
acyclic = is_directed_acyclic_graph(typed_graph)
if not acyclic:
return False
return True
@staticmethod
def dump_direct(node_id: NodeId, node: Json, kind: Kind, recompute: bool = False) -> Json:
reported = node[Section.reported]
desired: Optional[Json] = node.get(Section.desired, None)
metadata: Optional[Json] = node.get(Section.metadata, None)
if "id" not in node:
node["id"] = node_id
if recompute or "hash" not in node:
node["hash"] = GraphBuilder.content_hash(reported, desired, metadata)
if recompute or "flat" not in node:
node["flat"] = GraphBuilder.flatten(reported, kind)
if "kinds" not in node:
node["kinds"] = [reported["kind"]]
return node
def not_visited_nodes(self) -> Generator[Json, None, None]:
return (self.dump(nid, self.nodes[nid]) for nid in self.g.nodes if nid not in self.visited_nodes)
def not_visited_edges(self, edge_type: EdgeType) -> Generator[Tuple[str, str], None, None]:
# edge collection with (from, to, type): filter and drop type -> (from, to)
edges = self.g.edges(data="edge_type")
return (edge[:2] for edge in edges if edge[2] == edge_type and edge not in self.visited_edges)
@staticmethod
def edge_key(from_node: object, to_node: object, edge_type: EdgeType) -> EdgeKey:
return EdgeKey(from_node, to_node, edge_type)
@staticmethod
def root_id(graph: DiGraph) -> NodeId:
# noinspection PyTypeChecker
roots: List[NodeId] = [n for n, d in graph.in_degree if d == 0]
assert len(roots) == 1, f"Given subgraph has more than one root: {roots}"
return roots[0]
@staticmethod
def merge_graphs(
graph: MultiDiGraph,
) -> Tuple[List[str], GraphAccess, Generator[Tuple[str, GraphAccess], None, None]]:
"""
Find all merge graphs in the provided graph.
A merge graph is a self contained graph under a node which is marked with replace=true.
Such nodes are replaced with the replace node in the database.
Example:
A -> B -> C(replace=true) -> E -> E1 -> E2
-> F -> F1
-> D(replace=true) -> G -> G1 -> G2 -> G3 -> G4
This will result in 2 merge roots:
C: [A, B]
D: [A, B]
Note that all successors of a merge node that is also a predecessors of the merge node is sorted out.
:param graph: the incoming multi graph update.
:return: the list of all merge roots, the expected parent graph and all merge root graphs.
"""
# Find replace nodes: all nodes that are marked as replace node.
# This method returns all replace roots as key, with the respective predecessors nodes as value.
def replace_roots() -> Dict[NodeId, Set[NodeId]]:
graph_root = GraphAccess.root_id(graph)
replace_nodes: Dict[NodeId, Json] = {
node_id: data for node_id, data in graph.nodes(data=True) if data.get("replace", False)
}
assert (
len(replace_nodes) > 0
), "No replace nodes provided in the graph. Mark at least one node with replace=true!"
result: Dict[NodeId, Set[NodeId]] = {}
for node, data in replace_nodes.items():
kind = GraphResolver.resolved_kind(data)
assert (
kind is not None
), f"Node {node} is marked as replace node, but the kind is not resolved during import!"
# compute the shortest path from root to here
pres: Set[NodeId] = reduce(
lambda res, p: {*res, *p}, all_shortest_paths(graph, graph_root, node), set[NodeId]()
)
result[node] = pres
# make sure there is no replace node beyond another replace node
rs = result.copy()
for node in rs:
for nid, parent_nodes in rs.items():
if nid != node and node in parent_nodes:
log.info(f"Node {nid} marked as replace, but is child of another replace node {node}. Ignore.")
result.pop(nid, None)
return result
# Walk the graph from given starting node and return all successors.
# A successor which is also a predecessors is not followed.
def sub_graph_nodes(from_node: NodeId, parent_ids: Set[NodeId]) -> Set[NodeId]:
to_visit = [from_node]
visited: Set[NodeId] = {from_node}
def successors(node: NodeId) -> List[NodeId]:
return [a for a in graph.successors(node) if a not in visited and a not in parent_ids]
while to_visit:
to_visit = reduce(lambda li, node: li + successors(node), to_visit, list[NodeId]())
visited.update(to_visit)
return visited
# Create a generator for all given merge roots by:
# - creating the set of all successors
# - creating a subgraph which contains all predecessors and all succors
# - all predecessors are marked as visited
# - all predecessors edges are marked as visited
# This way it is possible to have nodes in the graph that will not be touched by the update
# while edges will be created from successors of the merge node to predecessors of the merge node.
def merge_sub_graphs(
root_nodes: Dict[NodeId, Set[NodeId]], parent_nodes: Set[NodeId], parent_edges: Set[EdgeKey]
) -> Generator[Tuple[NodeId, GraphAccess], None, None]:
all_successors: Set[NodeId] = set()
for root, predecessors in root_nodes.items():
successors: Set[NodeId] = sub_graph_nodes(root, predecessors)
# make sure nodes are not "mixed" between different merge nodes
overlap = successors & all_successors
if overlap:
raise AttributeError(f"Nodes are referenced in more than one merge node: {overlap}")
all_successors.update(successors)
# create subgraph with all successors and all parents, where all parents are already marked as visited
sub = GraphAccess(graph.subgraph(successors), root, parent_nodes, parent_edges)
yield root, sub
GraphAccess(graph).resolve() # resolve graph references
roots = replace_roots()
parents: Set[NodeId] = reduce(lambda res, ps: {*res, *ps}, roots.values(), set[NodeId]())
parent_graph = graph.subgraph(parents)
graphs = merge_sub_graphs(roots, parents, set(parent_graph.edges(data="edge_type")))
return list(roots.keys()), GraphAccess(parent_graph, GraphAccess.root_id(graph)), graphs