Skip to content

Commit

Permalink
[resotocore][fix] Deferred Outer Edge handling (#1744)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Jul 21, 2023
1 parent 9171a14 commit b49e238
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 106 deletions.
32 changes: 17 additions & 15 deletions resotocore/resotocore/action_handlers/merge_outer_edge_handler.py
Expand Up @@ -42,11 +42,12 @@ def __init__(
self.model_handler = model_handler

async def merge_outer_edges(self, task_id: TaskId) -> Tuple[int, int]:
pending_outer_edge_db = self.db_access.pending_deferred_edge_db
pending_edges = await pending_outer_edge_db.get(task_id)
deferred_outer_edge_db = self.db_access.deferred_outer_edge_db
pending_edges = await deferred_outer_edge_db.all_for_task(task_id)
if pending_edges:
graph_db = self.db_access.get_graph_db(pending_edges.graph)
model = await self.model_handler.load_model(pending_edges.graph)
first = pending_edges[0]
graph_db = self.db_access.get_graph_db(first.graph)
model = await self.model_handler.load_model(first.graph)

async def find_node_id(selector: NodeSelector) -> Optional[NodeId]:
try:
Expand All @@ -70,28 +71,29 @@ async def find_node_id(selector: NodeSelector) -> Optional[NodeId]:
return None

edges: List[Tuple[NodeId, NodeId, str]] = []
for edge in pending_edges.edges:
from_id = await find_node_id(edge.from_node)
to_id = await find_node_id(edge.to_node)
if from_id and to_id:
edges.append((from_id, to_id, edge.edge_type))
for pending_edge in pending_edges:
for edge in pending_edge.edges:
from_id = await find_node_id(edge.from_node)
to_id = await find_node_id(edge.to_node)
if from_id and to_id:
edges.append((from_id, to_id, edge.edge_type))

updated, deleted = await graph_db.update_deferred_edges(edges, pending_edges.created_at)
updated, deleted = await graph_db.update_deferred_edges(edges, first.created_at)

log.info(
f"MergeOuterEdgesHandler: updated {updated}/{len(pending_edges.edges)},"
f"MergeOuterEdgesHandler: updated {updated}/{len(edges)},"
f" deleted {deleted} edges in task id {task_id}"
)

return (updated, deleted)
return updated, deleted
else:
log.info(f"MergeOuterEdgesHandler: no pending edges for task id {task_id} found.")

return (0, 0)
return 0, 0

async def mark_done(self, task_id: TaskId) -> None:
pending_outer_edge_db = self.db_access.pending_deferred_edge_db
await pending_outer_edge_db.delete(task_id)
deferred_outer_edge_db = self.db_access.deferred_outer_edge_db
await deferred_outer_edge_db.delete_for_task(task_id)

async def __handle_events(self, subscription_done: Future[None]) -> None:
async with self.message_bus.subscribe(subscriber_id, [merge_outer_edges]) as events:
Expand Down
8 changes: 4 additions & 4 deletions resotocore/resotocore/db/db_access.py
Expand Up @@ -17,7 +17,7 @@
from resotocore.db.arangodb_extensions import ArangoHTTPClient
from resotocore.db.async_arangodb import AsyncArangoDB
from resotocore.db.configdb import config_entity_db, config_validation_entity_db
from resotocore.db.deferred_edge_db import pending_deferred_edge_db
from resotocore.db.deferredouteredgedb import deferred_outer_edge_db
from resotocore.db.entitydb import EventEntityDb
from resotocore.db.graphdb import ArangoGraphDB, GraphDB, EventGraphDB
from resotocore.db.jobdb import job_db
Expand Down Expand Up @@ -49,7 +49,7 @@ def __init__(
subscriber_name: str = "subscribers",
running_task_name: str = "running_tasks",
job_name: str = "jobs",
deferred_edge_name: str = "deferred_outer_edges",
deferred_outer_edge_name: str = "deferred_outer_edges",
config_entity: str = "configs",
config_validation_entity: str = "config_validation",
configs_model: str = "configs_model",
Expand All @@ -64,7 +64,7 @@ def __init__(
self.subscribers_db = EventEntityDb(subscriber_db(self.db, subscriber_name), event_sender, subscriber_name)
self.system_data_db = SystemDataDb(self.db)
self.running_task_db = running_task_db(self.db, running_task_name)
self.pending_deferred_edge_db = pending_deferred_edge_db(self.db, deferred_edge_name)
self.deferred_outer_edge_db = deferred_outer_edge_db(self.db, deferred_outer_edge_name)
self.job_db = job_db(self.db, job_name)
self.config_entity_db = config_entity_db(self.db, config_entity)
self.config_validation_entity_db = config_validation_entity_db(self.db, config_validation_entity)
Expand All @@ -83,7 +83,7 @@ async def start(self) -> None:
await self.config_validation_entity_db.create_update_schema()
await self.configs_model_db.create_update_schema()
await self.template_entity_db.create_update_schema()
await self.pending_deferred_edge_db.create_update_schema()
await self.deferred_outer_edge_db.create_update_schema()
await self.package_entity_db.create_update_schema()
for graph in cast(List[Json], self.database.graphs()):
graph_name = GraphName(graph["name"])
Expand Down
39 changes: 0 additions & 39 deletions resotocore/resotocore/db/deferred_edge_db.py

This file was deleted.

60 changes: 60 additions & 0 deletions resotocore/resotocore/db/deferredouteredgedb.py
@@ -0,0 +1,60 @@
from attrs import define
from datetime import datetime
from resotocore.db.async_arangodb import AsyncArangoDB
from resotocore.db.entitydb import ArangoEntityDb
from resotocore.model.graph_access import DeferredEdge
from resotocore.ids import TaskId
from typing import List, cast
import logging

from resotocore.model.typed_model import from_js
from resotocore.types import Json
from resotocore.ids import GraphName


@define
class DeferredOuterEdges:
    # One stored batch of deferred outer edges, tagged with the task, change and graph it belongs to.
    # Instances are persisted through DeferredOuterEdgeDb, which uses `id` as the entity key
    # and looks entries up / removes them by `task_id`.
    id: str  # entity key (see the key function passed in deferred_outer_edge_db)
    change_id: str  # NOTE(review): presumably the id of the graph change that produced the edges - confirm with caller
    task_id: TaskId  # task this batch belongs to; used as the filter in all_for_task / delete_for_task
    created_at: datetime  # update the corresponding TTL index when changing this name
    graph: GraphName  # name of the graph the edges refer to
    edges: List[DeferredEdge]  # the deferred edges of this batch


# Expiry passed to the TTL index on `created_at` (7200 seconds = 2 * 60 * 60).
TWO_HOURS = 7200


# Module level logger, named after this module.
log = logging.getLogger(__name__)


class DeferredOuterEdgeDb(ArangoEntityDb[str, DeferredOuterEdges]):
    """Entity db for DeferredOuterEdges with task scoped lookup and removal."""

    async def all_for_task(self, task_id: TaskId) -> List[DeferredOuterEdges]:
        """Return all stored entries whose ``task_id`` equals the given task id."""
        query = f"FOR e IN `{self.collection_name}` FILTER e.task_id == @task_id RETURN e"
        async with await self.db.aql_cursor(query, bind_vars={"task_id": task_id}) as cursor:
            return [from_js(doc, DeferredOuterEdges) async for doc in cursor]

    async def delete_for_task(self, task_id: TaskId) -> None:
        """Remove all stored entries whose ``task_id`` equals the given task id."""
        name = self.collection_name
        query = f"FOR e IN `{name}` FILTER e.task_id == @task_id REMOVE e IN `{name}`"
        async with await self.db.aql_cursor(query, bind_vars={"task_id": task_id}) as cursor:
            # The statement returns nothing of interest: only drain the cursor.
            async for _ in cursor:
                pass

    async def create_update_schema(self) -> None:
        """Create the base schema and make sure the TTL index on ``created_at`` exists."""
        await super().create_update_schema()
        ttl_index_name = "deferred_edges_expiration_index"
        collection = self.db.collection(self.collection_name)
        existing_names = {idx["name"] for idx in cast(List[Json], collection.indexes())}
        if ttl_index_name in existing_names:
            # Index is already in place: nothing to do.
            return
        log.info(f"Add index {ttl_index_name} on {collection.name}")
        collection.add_ttl_index(["created_at"], TWO_HOURS, ttl_index_name)


def deferred_outer_edge_db(db: AsyncArangoDB, collection: str) -> DeferredOuterEdgeDb:
    """Create a DeferredOuterEdgeDb on the given collection, keyed by the entry id."""

    def key_of(entry: DeferredOuterEdges) -> str:
        return entry.id

    return DeferredOuterEdgeDb(db, collection, DeferredOuterEdges, key_of)
8 changes: 5 additions & 3 deletions resotocore/resotocore/model/db_updater.py
Expand Up @@ -24,7 +24,7 @@
from resotocore.async_extensions import run_async
from resotocore.core_config import CoreConfig
from resotocore.db.db_access import DbAccess
from resotocore.db.deferred_edge_db import PendingDeferredEdges
from resotocore.db.deferredouteredgedb import DeferredOuterEdges
from resotocore.db.graphdb import GraphDB
from resotocore.db.model import GraphUpdate
from resotocore.error import ImportAborted
Expand Down Expand Up @@ -184,13 +184,15 @@ async def merge_graph(self, db: DbAccess) -> GraphUpdate: # type: ignore
log.debug("Graph read into memory")
builder.check_complete()
graphdb = db.get_graph_db(nxt.graph)
outer_edge_db = db.pending_deferred_edge_db
outer_edge_db = db.deferred_outer_edge_db
await graphdb.insert_usage_data(builder.usage)
_, result = await graphdb.merge_graph(builder.graph, model, nxt.change_id, nxt.is_batch)
# sizes of model entries have been adjusted during the merge. Update the model in the db.
await model_handler.update_model(graphdb.name, list(model.kinds.values()))
if nxt.task_id and builder.deferred_edges:
await outer_edge_db.update(PendingDeferredEdges(nxt.task_id, utc(), nxt.graph, builder.deferred_edges))
await outer_edge_db.update(
DeferredOuterEdges(uuid_str(), nxt.change_id, nxt.task_id, utc(), nxt.graph, builder.deferred_edges)
)
log.debug(f"Updated {len(builder.deferred_edges)} pending outer edges for collect task {nxt.task_id}")
return result

Expand Down
Expand Up @@ -5,7 +5,7 @@

from resotocore.action_handlers.merge_outer_edge_handler import MergeOuterEdgesHandler
from resotocore.db.db_access import DbAccess
from resotocore.db.deferred_edge_db import PendingDeferredEdges
from resotocore.db.deferredouteredgedb import DeferredOuterEdges
from resotocore.db.graphdb import ArangoGraphDB
from resotocore.db.model import QueryModel
from resotocore.ids import TaskId, NodeId
Expand Down Expand Up @@ -62,17 +62,11 @@ async def test_merge_outer_edges(
await graph_db.create_node(foo_model, id1, to_json(Foo("id1", "foo")), NodeId("root"))
await graph_db.create_node(foo_model, id3, to_json(Foo("id3", "foo")), NodeId("root"))
await graph_db.create_node(foo_model, id2, to_json(Bla("id2", "bla")), NodeId("root"))
await db_access.pending_deferred_edge_db.create_update_schema()

await db_access.pending_deferred_edge_db.update(
PendingDeferredEdges(
TaskId("task123"),
now,
graph_db.name,
[
DeferredEdge(ByNodeId(id1), BySearchCriteria("is(bla)"), EdgeTypes.default),
],
)
await db_access.deferred_outer_edge_db.create_update_schema()

e1 = DeferredEdge(ByNodeId(id1), BySearchCriteria("is(bla)"), EdgeTypes.default)
await db_access.deferred_outer_edge_db.update(
DeferredOuterEdges("t0", "c0", TaskId("task123"), now, graph_db.name, [e1])
)
await merge_handler.merge_outer_edges(TaskId("task123"))

Expand All @@ -84,15 +78,9 @@ async def test_merge_outer_edges(

new_now = now + timedelta(minutes=10)

await db_access.pending_deferred_edge_db.update(
PendingDeferredEdges(
TaskId("task456"),
new_now,
graph_db.name,
[
DeferredEdge(ByNodeId(id2), ByNodeId(id1), EdgeTypes.default),
],
)
e2 = DeferredEdge(ByNodeId(id2), ByNodeId(id1), EdgeTypes.default)
await db_access.deferred_outer_edge_db.update(
DeferredOuterEdges("t1", "c1", TaskId("task456"), new_now, graph_db.name, [e2])
)
await merge_handler.merge_outer_edges(TaskId("task456"))

Expand All @@ -105,15 +93,8 @@ async def test_merge_outer_edges(

new_now_2 = now + timedelta(minutes=10)

await db_access.pending_deferred_edge_db.update(
PendingDeferredEdges(
TaskId("task789"),
new_now_2,
graph_db.name,
[
DeferredEdge(ByNodeId(id2), ByNodeId(id1), EdgeTypes.default),
],
)
await db_access.deferred_outer_edge_db.update(
DeferredOuterEdges("t2", "c4", TaskId("task789"), new_now_2, graph_db.name, [e2])
)
updated, deleted = await merge_handler.merge_outer_edges(TaskId("task789"))
# here we also implicitly test that the timestamp was updated, because otherwise the edge
Expand Down
11 changes: 10 additions & 1 deletion resotocore/tests/resotocore/conftest.py
Expand Up @@ -29,6 +29,7 @@
alias_names,
all_commands,
)
from resotocore.db.deferredouteredgedb import DeferredOuterEdgeDb
from resotocore.dependencies import Dependencies
from resotocore.config import ConfigHandler, ConfigEntity, ConfigValidation, ConfigOverride
from resotocore.config.config_handler_service import ConfigHandlerService
Expand All @@ -44,7 +45,7 @@
SnapshotsScheduleConfig,
RunConfig,
)
from resotocore.db import runningtaskdb, SystemData
from resotocore.db import runningtaskdb, SystemData, deferredouteredgedb
from resotocore.db.async_arangodb import AsyncArangoDB
from resotocore.db.db_access import DbAccess
from resotocore.db.graphdb import ArangoGraphDB, EventGraphDB
Expand Down Expand Up @@ -200,6 +201,14 @@ async def running_task_db(async_db: AsyncArangoDB) -> RunningTaskDb:
return task_db


@fixture
async def pending_deferred_edge_db(async_db: AsyncArangoDB) -> DeferredOuterEdgeDb:
    """Provide an empty DeferredOuterEdgeDb on the ``pending_deferred_edge`` collection."""
    collection_name = "pending_deferred_edge"
    store = deferredouteredgedb.deferred_outer_edge_db(async_db, collection_name)
    # Make sure the schema exists before wiping, so every test starts from an empty collection.
    await store.create_update_schema()
    await store.wipe()
    return store


@fixture()
def db_access(graph_db: ArangoGraphDB) -> DbAccess:
access = DbAccess(graph_db.db.db, NoEventSender(), NoAdjust(), empty_config())
Expand Down
64 changes: 64 additions & 0 deletions resotocore/tests/resotocore/db/deferredouteredgedb_test.py
@@ -0,0 +1,64 @@
from datetime import datetime

import pytest
from pytest import fixture
from typing import List

from resotocore.db.deferredouteredgedb import DeferredOuterEdges, DeferredOuterEdgeDb
from resotocore.ids import TaskId, GraphName, NodeId
from resotocore.model.graph_access import DeferredEdge, ByNodeId, EdgeTypes


@fixture
def instances() -> List[DeferredOuterEdges]:
    """Three sample entries: two for task ``t1`` (change ``c1``) and one for task ``t2`` (change ``c2``)."""
    created = datetime(2021, 1, 1)
    graph_name = GraphName("test")

    def entry(key: str, change: str, task: str, source: str, target: str) -> DeferredOuterEdges:
        # Every entry holds exactly one default edge from source to target.
        edge = DeferredEdge(ByNodeId(NodeId(source)), ByNodeId(NodeId(target)), EdgeTypes.default)
        return DeferredOuterEdges(
            id=key,
            change_id=change,
            task_id=TaskId(task),
            created_at=created,
            graph=graph_name,
            edges=[edge],
        )

    return [
        entry("1", "c1", "t1", "e1", "e2"),
        entry("2", "c1", "t1", "e2", "e3"),
        entry("3", "c2", "t2", "e2", "e3"),
    ]


@pytest.mark.asyncio
async def test_all_by_task_id(
    pending_deferred_edge_db: DeferredOuterEdgeDb, instances: List[DeferredOuterEdges]
) -> None:
    """all_for_task only returns the entries stored for the requested task."""
    await pending_deferred_edge_db.update_many(instances)
    expected_counts = (("t1", 2), ("t2", 1))
    for task, expected in expected_counts:
        found = await pending_deferred_edge_db.all_for_task(TaskId(task))
        assert len(found) == expected


@pytest.mark.asyncio
async def test_remove_by_task_id(
    pending_deferred_edge_db: DeferredOuterEdgeDb, instances: List[DeferredOuterEdges]
) -> None:
    """delete_for_task removes the entries of the given task and leaves other tasks untouched."""
    store = pending_deferred_edge_db
    first_task, second_task = TaskId("t1"), TaskId("t2")

    # start from a clean collection holding all sample entries
    await store.wipe()
    await store.update_many(instances)

    # removing t1 must not touch t2
    await store.delete_for_task(first_task)
    left_for_first = await store.all_for_task(first_task)
    assert len(left_for_first) == 0
    left_for_second = await store.all_for_task(second_task)
    assert len(left_for_second) == 1

    # removing t2 leaves the collection empty
    await store.delete_for_task(second_task)
    leftovers = [entry async for entry in store.all()]
    assert not leftovers

0 comments on commit b49e238

Please sign in to comment.