Outer edge collection support - Part 2/4 (#892)
meln1k committed Jun 1, 2022
1 parent e2f7a33 commit 941eaca
Showing 16 changed files with 391 additions and 177 deletions.
6 changes: 3 additions & 3 deletions resotocore/resotocore/async_extensions.py
@@ -1,17 +1,17 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Optional
from typing import Any, Callable, Optional, cast

# Global bounded thread pool to bridge sync io with asyncio.
GlobalAsyncPool: Optional[ThreadPoolExecutor] = None


async def run_async(sync_func, *args: Any, **kwargs: Any) -> Any: # type: ignore
async def run_async(sync_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
global GlobalAsyncPool # pylint: disable=global-statement
if GlobalAsyncPool is None:
# The maximum number of threads is defined explicitly here, since the default is very limited.
GlobalAsyncPool = ThreadPoolExecutor(1024, "async") # pylint: disable=consider-using-with
# run_in_executor does not allow passing kwargs; apply them partially here if defined
fn_with_args = sync_func if not kwargs else partial(sync_func, **kwargs)
fn_with_args = cast(Callable[..., Any], sync_func if not kwargs else partial(sync_func, **kwargs))
return await asyncio.get_event_loop().run_in_executor(GlobalAsyncPool, fn_with_args, *args)
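
For context, run_async dispatches a blocking callable onto the shared pool so callers can await it. A minimal usage sketch, assuming a hypothetical blocking function (slow_lookup is not part of this commit):

import asyncio
import time

from resotocore.async_extensions import run_async


def slow_lookup(host: str, *, timeout: int = 1) -> str:
    time.sleep(timeout)  # stands in for blocking I/O
    return f"resolved {host}"


async def main() -> None:
    # keyword arguments are bound via functools.partial before dispatch to the pool
    result = await run_async(slow_lookup, "example.com", timeout=2)
    print(result)


asyncio.run(main())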
7 changes: 7 additions & 0 deletions resotocore/resotocore/db/db_access.py
@@ -21,6 +21,7 @@
from resotocore.db.graphdb import ArangoGraphDB, GraphDB, EventGraphDB
from resotocore.db.jobdb import job_db
from resotocore.db.modeldb import ModelDb, model_db
from resotocore.db.deferred_edge_db import OuterEdgeDb, outer_edge_db
from resotocore.db.runningtaskdb import running_task_db
from resotocore.db.subscriberdb import subscriber_db
from resotocore.db.templatedb import template_entity_db
@@ -43,6 +44,7 @@ def __init__(
subscriber_name: str = "subscribers",
running_task_name: str = "running_tasks",
job_name: str = "jobs",
deferred_edge_name: str = "deferred_outer_edges",
config_entity: str = "configs",
config_validation_entity: str = "config_validation",
configs_model: str = "configs_model",
@@ -55,6 +57,7 @@ def __init__(
self.model_db = EventEntityDb(model_db(self.db, model_name), event_sender, model_name)
self.subscribers_db = EventEntityDb(subscriber_db(self.db, subscriber_name), event_sender, subscriber_name)
self.running_task_db = running_task_db(self.db, running_task_name)
self.pending_outer_edge_db = outer_edge_db(self.db, deferred_edge_name)
self.job_db = job_db(self.db, job_name)
self.config_entity_db = config_entity_db(self.db, config_entity)
self.config_validation_entity_db = config_validation_entity_db(self.db, config_validation_entity)
@@ -73,6 +76,7 @@ async def start(self) -> None:
await self.config_validation_entity_db.create_update_schema()
await self.configs_model_db.create_update_schema()
await self.template_entity_db.create_update_schema()
await self.pending_outer_edge_db.create_update_schema()
for graph in self.database.graphs():
log.info(f'Found graph: {graph["name"]}')
db = self.get_graph_db(graph["name"])
@@ -116,6 +120,9 @@ def get_graph_db(self, name: str, no_check: bool = False) -> GraphDB:
def get_model_db(self) -> ModelDb:
return self.model_db

def get_pending_outer_edge_db(self) -> OuterEdgeDb:
return self.pending_outer_edge_db

async def check_outdated_updates(self) -> None:
now = datetime.now(timezone.utc)
for db in self.graph_dbs.values():
20 changes: 20 additions & 0 deletions resotocore/resotocore/db/deferred_edge_db.py
@@ -0,0 +1,20 @@
from dataclasses import dataclass
from resotocore.db.async_arangodb import AsyncArangoDB
from resotocore.db.entitydb import EntityDb, ArangoEntityDb
from resotocore.model.graph_access import DeferredEdge
from resotocore.ids import TaskId
from typing import List


@dataclass
class PendingDeferredEdges:
task_id: TaskId
graph: str
edges: List[DeferredEdge]


OuterEdgeDb = EntityDb[TaskId, PendingDeferredEdges]


def outer_edge_db(db: AsyncArangoDB, collection: str) -> ArangoEntityDb[TaskId, PendingDeferredEdges]:
return ArangoEntityDb(db, collection, PendingDeferredEdges, lambda k: k.task_id)
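
A hedged usage sketch of the new entity db (the surrounding setup is assumed; the collection name matches the default wired up in db_access.py):

from resotocore.db.async_arangodb import AsyncArangoDB
from resotocore.db.deferred_edge_db import PendingDeferredEdges, outer_edge_db
from resotocore.ids import TaskId
from resotocore.model.graph_access import DeferredEdge


async def store_pending_edges(adb: AsyncArangoDB) -> None:
    # assumes `adb` is an already connected AsyncArangoDB handle
    edge_db = outer_edge_db(adb, "deferred_outer_edges")
    await edge_db.create_update_schema()  # make sure the collection exists
    pending = PendingDeferredEdges(
        task_id=TaskId("task_1"),
        graph="resoto",
        edges=[DeferredEdge("id_123", "id_456", "delete")],
    )
    await edge_db.update(pending)  # stored under _key == task_id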
24 changes: 17 additions & 7 deletions resotocore/resotocore/model/db_updater.py
@@ -1,3 +1,4 @@
from __future__ import annotations
import asyncio
import json
import logging
@@ -19,11 +20,13 @@
from resotocore.db.db_access import DbAccess
from resotocore.db.graphdb import GraphDB
from resotocore.db.model import GraphUpdate
from resotocore.db.deferred_edge_db import PendingDeferredEdges
from resotocore.dependencies import db_access, setup_process, reset_process_start_method
from resotocore.error import ImportAborted
from resotocore.model.graph_access import GraphBuilder
from resotocore.model.model import Model
from resotocore.types import Json
from resotocore.ids import TaskId
from resotocore.util import utc, uuid_str, shutdown_process

log = logging.getLogger(__name__)
@@ -46,6 +49,7 @@ class ReadElement(ProcessAction):
"""

elements: List[Union[bytes, Json]]
task_id: Optional[str]

def jsons(self) -> Generator[Json, Any, None]:
return (e if isinstance(e, dict) else json.loads(e) for e in self.elements)
@@ -61,6 +65,7 @@ class MergeGraph(ProcessAction):
graph: str
change_id: str
is_batch: bool = False
task_id: Optional[TaskId] = None


@dataclass
@@ -110,7 +115,7 @@ class DbUpdaterProcess(Process):
The result is either an exception in case of failure or a graph update in success case.
"""

def __init__(self, read_queue: Queue, write_queue: Queue, config: CoreConfig) -> None: # type: ignore
def __init__(self, read_queue: Queue[ProcessAction], write_queue: Queue[ProcessAction], config: CoreConfig) -> None:
super().__init__(name="merge_update")
self.read_queue = read_queue
self.write_queue = write_queue
@@ -120,7 +125,7 @@ def next_action(self) -> ProcessAction:
try:
# graph is read into memory. If the sender does not send data in a given amount of time,
# we raise an exception and abort the update.
return self.read_queue.get(True, 90) # type: ignore
return self.read_queue.get(True, 90)
except Empty as ex:
raise ImportAborted("Merge process did not receive any data for more than 90 seconds. Abort.") from ex

@@ -140,7 +145,11 @@ async def merge_graph(self, db: DbAccess) -> GraphUpdate:  # type: ignore
log.debug("Graph read into memory")
builder.check_complete()
graphdb = db.get_graph_db(nxt.graph)
outer_edge_db = db.get_pending_outer_edge_db()
_, result = await graphdb.merge_graph(builder.graph, model, nxt.change_id, nxt.is_batch)
if nxt.task_id and builder.deferred_edges:
await outer_edge_db.update(PendingDeferredEdges(nxt.task_id, nxt.graph, builder.deferred_edges))
log.debug(f"Updated {len(builder.deferred_edges)} pending outer edges for collect task {nxt.task_id}")
return result

async def setup_and_merge(self) -> GraphUpdate:
@@ -175,10 +184,11 @@ async def merge_graph_process(
content: AsyncGenerator[Union[bytes, Json], None],
max_wait: timedelta,
maybe_batch: Optional[str],
task_id: Optional[TaskId],
) -> GraphUpdate:
change_id = maybe_batch if maybe_batch else uuid_str()
write = Queue() # type: ignore
read = Queue() # type: ignore
write: Queue[ProcessAction] = Queue()
read: Queue[ProcessAction] = Queue()
updater = DbUpdaterProcess(write, read, config) # the process reads from our write queue and vice versa
stale = timedelta(seconds=5).total_seconds() # consider dead communication after this amount of time
deadline = utc() + max_wait
@@ -220,12 +230,12 @@ async def read_forever() -> GraphUpdate:
updater.start()
task = read_results() # concurrently read result queue
chunked: Stream = stream.chunks(content, BatchSize)
async with chunked.stream() as streamer: # pylint: disable=no-member
async with chunked.stream() as streamer:
async for lines in streamer:
if not await send_to_child(ReadElement(lines)):
if not await send_to_child(ReadElement(lines, task_id)):
# in case the child is dead, we should stop
break
await send_to_child(MergeGraph(db.name, change_id, maybe_batch is not None))
await send_to_child(MergeGraph(db.name, change_id, maybe_batch is not None, task_id))
result = await task # wait for final result
return result
finally:
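Condensed, the parent process now tags every message with the originating task id: chunks of the stream arrive as ReadElement, and the final MergeGraph triggers the merge plus the write of any deferred edges. An illustrative sketch of the message sequence (queue wiring and payloads are simplified, not the full process setup):

from multiprocessing import Queue

from resotocore.ids import TaskId
from resotocore.model.db_updater import MergeGraph, ReadElement

write = Queue()  # the child reads ProcessAction messages from this queue
task_id = TaskId("collect_run_1")  # hypothetical task id

# stream elements are sent in chunks, each tagged with the task id
write.put(ReadElement([b'{"id": "root", "reported": {"kind": "graph_root"}}'], task_id))
# the final message starts the merge; the same task id keys the deferred edges
write.put(MergeGraph("resoto", "change_1", is_batch=False, task_id=task_id))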
37 changes: 35 additions & 2 deletions resotocore/resotocore/model/graph_access.py
@@ -6,14 +6,17 @@
import re
from collections import namedtuple, defaultdict
from functools import reduce
from typing import Optional, Generator, Any, Dict, List, Set, Tuple
from typing import Optional, Generator, Any, Dict, List, Set, Tuple, Union
from dataclasses import dataclass

from networkx import DiGraph, MultiDiGraph, all_shortest_paths, is_directed_acyclic_graph

from resotocore.model.model import Model, Kind, AnyKind, ComplexKind, ArrayKind, DateTimeKind, DictionaryKind
from resotocore.model.resolve_in_graph import GraphResolver, NodePath, ResolveProp
from resotocore.types import Json
from resotocore.util import utc, utc_str, value_in_path, set_value_in_path, value_in_path_get
from resotocore.model.typed_model import from_js


log = logging.getLogger(__name__)

@@ -100,12 +103,24 @@ class Direction:
EdgeKey = namedtuple("EdgeKey", ["from_node", "to_node", "edge_type"])


SearchCriteria = str
NodeSelector = Union[str, SearchCriteria]


@dataclass
class DeferredEdge:
from_node: NodeSelector
to_node: NodeSelector
edge_type: str


class GraphBuilder:
def __init__(self, model: Model):
self.model = model
self.graph = MultiDiGraph()
self.nodes = 0
self.edges = 0
self.deferred_edges: List[DeferredEdge] = []

def add_from_json(self, js: Json) -> None:
if "id" in js and Section.reported in js:
@@ -119,6 +134,21 @@ def add_from_json(self, js: Json) -> None:
)
elif "from" in js and "to" in js:
self.add_edge(js["from"], js["to"], js.get("edge_type", EdgeType.default))
elif "from_selector" in js and "to_selector" in js:

def parse_selector(js: Json) -> NodeSelector:
if "node_id" in js:
return from_js(js["node_id"], str)
elif "search_criterea" in js:
return from_js(js["search_criteria"], str)
else:
raise AttributeError(f"can't parse edge selector! Got {json.dumps(js)}")

self.add_deferred_connection(
parse_selector(js["from_selector"]),
parse_selector(js["to_selector"]),
js.get("edge_type", EdgeType.default),
)
else:
raise AttributeError(f"Format not understood! Got {json.dumps(js)} which is neither vertex nor edge.")

@@ -159,6 +189,9 @@ def add_edge(self, from_node: str, to_node: str, edge_type: str) -> None:
key = GraphAccess.edge_key(from_node, to_node, edge_type)
self.graph.add_edge(from_node, to_node, key, edge_type=edge_type)

def add_deferred_connection(self, from_selector: NodeSelector, to_selector: NodeSelector, edge_type: str) -> None:
self.deferred_edges.append(DeferredEdge(from_selector, to_selector, edge_type))

@staticmethod
def content_hash(js: Json, desired: Optional[Json] = None, metadata: Optional[Json] = None) -> str:
sha256 = hashlib.sha256()
@@ -208,7 +241,7 @@ def check_complete(self) -> None:
for node_id, node in self.graph.nodes(data=True):
assert node.get(Section.reported), f"{node_id} was used in an edge definition but not provided as vertex!"

edge_types = {edge[2] for edge in self.graph.edges(data="edge_type")}
edge_types: Set[str] = {edge[2] for edge in self.graph.edges(data="edge_type")}
al = EdgeType.all
assert not edge_types.difference(al), f"Graph contains unknown edge types! Given: {edge_types}. Known: {al}"
# make sure there is only one root node
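For reference, the payload shapes the new add_from_json branch accepts in the newline-delimited JSON import stream, next to a regular edge. Values are hypothetical, and the search criteria string syntax is an assumption; the code only requires a string:

# regular edge between two known node ids
{"from": "id_123", "to": "id_456", "edge_type": "default"}

# deferred edge, both endpoints given as node-id selectors
{"from_selector": {"node_id": "id_123"}, "to_selector": {"node_id": "id_456"}, "edge_type": "delete"}

# deferred edge, one endpoint resolved later via a search criteria string
{"from_selector": {"search_criteria": "is(instance) and reported.name==vm-1"}, "to_selector": {"node_id": "id_456"}, "edge_type": "default"}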
10 changes: 8 additions & 2 deletions resotocore/resotocore/web/api.py
@@ -603,22 +603,28 @@ async def create_graph(self, request: Request) -> StreamResponse:
async def merge_graph(self, request: Request) -> StreamResponse:
log.info("Received merge_graph request")
graph_id = request.match_info.get("graph_id", "resoto")
task_id: Optional[TaskId] = None
if tid := request.headers.get("Resoto-Worker-Task-Id"):
task_id = TaskId(tid)
db = self.db.get_graph_db(graph_id)
it = self.to_line_generator(request)
info = await merge_graph_process(
db, self.event_sender, self.config, it, self.config.graph_update.merge_max_wait_time(), None
db, self.event_sender, self.config, it, self.config.graph_update.merge_max_wait_time(), None, task_id
)
return web.json_response(to_js(info))

async def update_merge_graph_batch(self, request: Request) -> StreamResponse:
log.info("Received put_sub_graph_batch request")
graph_id = request.match_info.get("graph_id", "resoto")
task_id: Optional[TaskId] = None
if tid := request.headers.get("Resoto-Worker-Task-Id"):
task_id = TaskId(tid)
db = self.db.get_graph_db(graph_id)
rnd = "".join(SystemRandom().choice(string.ascii_letters) for _ in range(12))
batch_id = request.query.get("batch_id", rnd)
it = self.to_line_generator(request)
info = await merge_graph_process(
db, self.event_sender, self.config, it, self.config.graph_update.merge_max_wait_time(), batch_id
db, self.event_sender, self.config, it, self.config.graph_update.merge_max_wait_time(), batch_id, task_id
)
return web.json_response(to_json(info), headers={"BatchId": batch_id})

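A client opts in by sending the new header with its import request. A hedged sketch using the requests library; the host, port, and route are assumptions inferred from the handler names, only the header name and its semantics come from this commit:

import requests

body = b"\n".join([
    b'{"id": "root", "reported": {"kind": "graph_root"}}',
])
resp = requests.post(
    "http://localhost:8900/graph/resoto/merge",  # assumed merge_graph route
    data=body,
    headers={"Resoto-Worker-Task-Id": "test_task_123"},  # ties deferred edges to the task
)
print(resp.json())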
4 changes: 2 additions & 2 deletions resotocore/tests/resotocore/cli/command_test.py
@@ -657,8 +657,8 @@ async def check_backup(res: Stream) -> None:
async for s in streamer:
assert isinstance(s, str)
assert os.path.exists(s)
# backup should have size between 30k and 200k (adjust size if necessary)
assert 30000 < os.path.getsize(s) < 200000
# backup should have size between 30k and 250k (adjust size if necessary)
assert 30000 < os.path.getsize(s) < 250000
assert only_one
only_one = False

20 changes: 18 additions & 2 deletions resotocore/tests/resotocore/model/db_updater_test.py
@@ -9,9 +9,11 @@
from resotocore.db.graphdb import ArangoGraphDB
from resotocore.db.model import GraphUpdate
from resotocore.dependencies import empty_config
from resotocore.ids import TaskId
from resotocore.model.db_updater import merge_graph_process
from resotocore.model.model import Kind
from resotocore.model.typed_model import to_js
from resotocore.db.deferred_edge_db import outer_edge_db

# noinspection PyUnresolvedReferences
from tests.resotocore.analytics import event_sender
@@ -37,11 +39,25 @@ async def test_merge_process(
# create sample graph data to insert
graph = create_graph("test")

await outer_edge_db(graph_db.db, "deferred_outer_edges").create_update_schema()

async def iterator() -> AsyncGenerator[bytes, None]:
for node in graph.nodes():
yield bytes(json.dumps(graph.nodes[node]), "utf-8")
for from_node, to_node, data in graph.edges(data=True):
yield bytes(json.dumps({"from": from_node, "to": to_node, "edge_type": data["edge_type"]}), "utf-8")

result = await merge_graph_process(graph_db, event_sender, config, iterator(), timedelta(seconds=30), None)
yield bytes(
json.dumps(
{"from_selector": {"node_id": "id_123"}, "to_selector": {"node_id": "id_456"}, "edge_type": "delete"}
),
"utf-8",
)

result = await merge_graph_process(
graph_db, event_sender, config, iterator(), timedelta(seconds=30), None, TaskId("test_task_123")
)
assert result == GraphUpdate(112, 1, 0, 212, 0, 0)
elem = graph_db.db.collection("deferred_outer_edges").all().next()
assert elem["_key"] == "test_task_123"
assert elem["task_id"] == "test_task_123"
assert elem["edges"][0] == {"from_node": "id_123", "to_node": "id_456", "edge_type": "delete"}
9 changes: 5 additions & 4 deletions resotolib/resotolib/graph/__init__.py
@@ -30,13 +30,14 @@
remove_event_listener,
)
from prometheus_client import Summary
from typing import Dict, List, Tuple
from typing import Dict, Iterator, List, Tuple, Any, Optional
from io import BytesIO
from dataclasses import fields
from typeguard import check_type
from time import time
from collections import defaultdict, namedtuple

Json = Dict[str, Any]

metrics_graph_search = Summary("resoto_graph_search_seconds", "Time it took the Graph search() method")
metrics_graph_searchall = Summary("resoto_graph_searchall_seconds", "Time it took the Graph searchall() method")
@@ -323,7 +324,7 @@ def resolve_deferred_connections(self):
if isinstance(node, BaseResource):
node.resolve_deferred_connections(self)

def export_model(self) -> List:
def export_model(self) -> List[Json]:
"""Return the graph node dataclass model in resotocore format"""
classes = set()
for node in self.nodes:
@@ -708,7 +709,7 @@ def __init__(
self,
graph: Graph,
delete_tempfile: bool = True,
tempdir: str = None,
tempdir: Optional[str] = None,
graph_merge_kind: GraphMergeKind = GraphMergeKind.cloud,
):
self.graph = graph
@@ -738,7 +739,7 @@ def __del__(self):
except Exception:
pass

def __iter__(self):
def __iter__(self) -> Iterator[bytes]:
if not self.graph_exported:
self.export_graph()
start_time = time()
(Diffs for the remaining 7 changed files were not loaded.)
