Skip to content

Commit

Permalink
[resotocore][fix] Use the right model during the graph merge (#1720)
Browse files Browse the repository at this point in the history
  • Loading branch information
meln1k committed Jul 14, 2023
1 parent 03f40d8 commit f2cee16
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 30 deletions.
10 changes: 2 additions & 8 deletions resotocore/resotocore/db/db_access.py
Expand Up @@ -46,7 +46,6 @@ def __init__(
event_sender: AnalyticsEventSender,
adjust_node: AdjustNode,
config: CoreConfig,
model_name: str = "model",
subscriber_name: str = "subscribers",
running_task_name: str = "running_tasks",
job_name: str = "jobs",
Expand All @@ -61,7 +60,6 @@ def __init__(
self.database = arango_database
self.db = AsyncArangoDB(arango_database)
self.adjust_node = adjust_node
self.model_db = EventEntityDb(model_db(self.db, model_name), event_sender, model_name)
self.graph_model_dbs: Dict[GraphName, ModelDb] = {}
self.subscribers_db = EventEntityDb(subscriber_db(self.db, subscriber_name), event_sender, subscriber_name)
self.system_data_db = SystemDataDb(self.db)
Expand All @@ -78,7 +76,6 @@ def __init__(
self.cleaner = Periodic("outdated_updates_cleaner", self.check_outdated_updates, timedelta(seconds=60))

async def start(self) -> None:
await self.model_db.create_update_schema()
await self.subscribers_db.create_update_schema()
await self.running_task_db.create_update_schema()
await self.job_db.create_update_schema()
Expand Down Expand Up @@ -117,7 +114,7 @@ async def create_graph(self, name: GraphName, validate_name: bool = True) -> Gra
return db

async def delete_graph(self, name: GraphName) -> None:
def delete(name: GraphName) -> None:
def delete() -> None:
db = self.database
if db.has_graph(name):
# delete arrangodb graph
Expand All @@ -136,7 +133,7 @@ def delete(name: GraphName) -> None:
db.delete_collection(coll["name"], ignore_missing=True)
self.graph_dbs.pop(name, None)

return await run_async(delete, name)
return await run_async(delete)

async def delete_graph_model(self, graph_name: GraphName) -> None:
await self.db.delete_collection(self.graph_model_name(graph_name), ignore_missing=True)
Expand All @@ -156,9 +153,6 @@ def get_graph_db(self, name: GraphName, no_check: bool = False) -> GraphDB:
self.graph_dbs[name] = event_db
return event_db

def get_model_db(self) -> ModelDb:
return self.model_db

async def get_graph_model_db(self, graph_name: GraphName) -> ModelDb:
if db := self.graph_model_dbs.get(graph_name):
return db
Expand Down
18 changes: 12 additions & 6 deletions resotocore/resotocore/model/db_updater.py
Expand Up @@ -27,14 +27,13 @@
from resotocore.db.deferred_edge_db import PendingDeferredEdges
from resotocore.db.graphdb import GraphDB
from resotocore.db.model import GraphUpdate
from resotocore.system_start import db_access, setup_process, reset_process_start_method
from resotocore.error import ImportAborted
from resotocore.ids import TaskId, GraphName
from resotocore.message_bus import MessageBus, CoreMessage
from resotocore.model.graph_access import GraphBuilder
from resotocore.model.model import Model
from resotocore.model.model_handler import ModelHandlerDB, ModelHandler
from resotocore.service import Service
from resotocore.system_start import db_access, setup_process, reset_process_start_method
from resotocore.types import Json
from resotocore.util import utc, uuid_str, shutdown_process

Expand Down Expand Up @@ -140,11 +139,18 @@ class DbUpdaterProcess(Process):
The result is either an exception in case of failure or a graph update in success case.
"""

def __init__(self, read_queue: Queue[ProcessAction], write_queue: Queue[ProcessAction], config: CoreConfig) -> None:
def __init__(
self,
read_queue: Queue[ProcessAction],
write_queue: Queue[ProcessAction],
config: CoreConfig,
graph_name: GraphName,
) -> None:
super().__init__(name="merge_update")
self.read_queue = read_queue
self.write_queue = write_queue
self.config = config
self.graph_name = graph_name

def next_action(self) -> ProcessAction:
try:
Expand All @@ -155,7 +161,8 @@ def next_action(self) -> ProcessAction:
raise ImportAborted("Merge process did not receive any data for more than 90 seconds. Abort.") from ex

async def merge_graph(self, db: DbAccess) -> GraphUpdate: # type: ignore
model = Model.from_kinds([kind async for kind in db.model_db.all()])
model_handler = ModelHandlerDB(db)
model = await model_handler.load_model(self.graph_name)
builder = GraphBuilder(model)
nxt = self.next_action()
if isinstance(nxt, ReadFile):
Expand All @@ -179,7 +186,6 @@ async def merge_graph(self, db: DbAccess) -> GraphUpdate: # type: ignore
await graphdb.insert_usage_data(builder.usage)
_, result = await graphdb.merge_graph(builder.graph, model, builder.at, nxt.change_id, nxt.is_batch)
# sizes of model entries have been adjusted during the merge. Update the model in the db.
model_handler = ModelHandlerDB(db, "")
await model_handler.update_model(graphdb.name, list(model.kinds.values()))
if nxt.task_id and builder.deferred_edges:
await outer_edge_db.update(PendingDeferredEdges(nxt.task_id, utc(), nxt.graph, builder.deferred_edges))
Expand Down Expand Up @@ -317,7 +323,7 @@ async def __merge_graph_process(
change_id = maybe_batch if maybe_batch else uuid_str()
write: Queue[ProcessAction] = Queue()
read: Queue[ProcessAction] = Queue()
updater = DbUpdaterProcess(write, read, self.config) # the process communication queue
updater = DbUpdaterProcess(write, read, self.config, db.name) # the process communication queue
stale = timedelta(seconds=5).total_seconds() # consider dead communication after this amount of time
dead_adjusted = False

Expand Down
12 changes: 2 additions & 10 deletions resotocore/resotocore/model/model_handler.py
Expand Up @@ -92,7 +92,7 @@ async def update_model(self, graph_name: GraphName, kinds: List[Kind]) -> Model:


class ModelHandlerDB(ModelHandler, Service):
def __init__(self, db_access: DbAccess, plantuml_server: str):
def __init__(self, db_access: DbAccess, plantuml_server: str = "https://plantuml.resoto.org"):
self.db_access = db_access
self.plantuml_server = plantuml_server
self.__loaded_model: Dict[GraphName, Model] = {}
Expand All @@ -103,13 +103,7 @@ async def load_model(self, graph_name: GraphName, *, force: bool = False) -> Mod
return model
else:
graph_model_db = await self.db_access.get_graph_model_db(graph_name)
model_db = self.db_access.get_model_db()
# check the new implementation for the kinds
model_kinds = [kind async for kind in graph_model_db.all()]
# if nothing found and the graph is the legacy default one, look in the legacy implementation
if not model_kinds and graph_name == self.default_legacy_graph_name:
model_kinds = [kind async for kind in model_db.all()]
model = Model.from_kinds(model_kinds)
model = Model.from_kinds([kind async for kind in graph_model_db.all()])
self.__loaded_model[graph_name] = model
return model

Expand Down Expand Up @@ -238,8 +232,6 @@ async def update_model(self, graph_name: GraphName, kinds: List[Kind]) -> Model:
# store all updated kinds
db = await self.db_access.get_graph_model_db(graph_name)
await db.update_many(kinds)
if graph_name == self.default_legacy_graph_name:
await self.db_access.get_model_db().update_many(kinds)
# unset loaded model
self.__loaded_model[graph_name] = updated
return updated
2 changes: 1 addition & 1 deletion resotocore/tests/resotocore/model/db_updater_test.py
Expand Up @@ -31,7 +31,7 @@ async def test_merge_process(
# wipe any existing data
await graph_db.wipe()
# store the model in db, so it can be loaded by the sub process
graph_db.db.collection("model").insert_many([to_js(a) for a in foo_kinds])
graph_db.db.collection(f"{graph_db.name}_model").insert_many([to_js(a) for a in foo_kinds])
# define args to parse for the sub process
config = empty_config(["--graphdb-username", "test", "--graphdb-password", "test", "--graphdb-database", "test"])
# create sample graph data to insert
Expand Down
6 changes: 1 addition & 5 deletions resotocore/tests/resotocore/web/api_test.py
Expand Up @@ -66,11 +66,7 @@ async def create_core_client(
additional_args = ["--psk", psk] if psk else []

# wipe and cleanly import the test model
await db_access.model_db.create_update_schema()
await db_access.model_db.wipe()
await db_access.model_db.update_many(foo_kinds)

for graph_name in [g, "test", "hello", "bonjour", "foo"]:
for graph_name in [g, "test", "hello", "bonjour", "foo", "resoto"]:
db = await db_access.get_graph_model_db(GraphName(graph_name))
await db.create_update_schema()
await db.wipe()
Expand Down

0 comments on commit f2cee16

Please sign in to comment.