Skip to content

Commit 526b549

Browse files
authored
[core][feat] Merge deferred edges via API (#2136)
1 parent 327e6a0 commit 526b549

File tree

8 files changed

+116
-60
lines changed

8 files changed

+116
-60
lines changed

fixcore/fixcore/__main__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from urllib3.exceptions import HTTPWarning
1919

2020
from fixcore import version
21-
from fixcore.action_handlers.merge_outer_edge_handler import MergeOuterEdgesHandler
21+
from fixcore.action_handlers.merge_deferred_edge_handler import MergeDeferredEdgesHandler
2222
from fixcore.analytics import CoreEvent, NoEventSender
2323
from fixcore.analytics.posthog import PostHogEventSender
2424
from fixcore.analytics.recurrent_events import emit_recurrent_events
@@ -228,8 +228,8 @@ async def direct_tenant(deps: TenantDependencies) -> None:
228228
)
229229
deps.add(ServiceNames.graph_manager, GraphManager(db, config, core_config_handler, task_handler))
230230
deps.add(
231-
ServiceNames.merge_outer_edges_handler,
232-
MergeOuterEdgesHandler(message_bus, subscriptions, task_handler, db, model),
231+
ServiceNames.merge_deferred_edges_handler,
232+
MergeDeferredEdgesHandler(message_bus, subscriptions, task_handler, db, model),
233233
)
234234
deps.add(
235235
ServiceNames.event_emitter_periodic,

fixcore/fixcore/action_handlers/merge_outer_edge_handler.py renamed to fixcore/fixcore/action_handlers/merge_deferred_edge_handler.py

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from attr import frozen
2+
13
from fixcore.db.model import QueryModel
24
from fixcore.message_bus import MessageBus, Action
35
import logging
@@ -21,10 +23,17 @@
2123
log = logging.getLogger(__name__)
2224

2325
subscriber_id = SubscriberId("fixcore")
24-
merge_outer_edges = "merge_outer_edges"
26+
merge_deferred_edges = "merge_deferred_edges"
27+
2528

29+
@frozen
30+
class DeferredMergeResult:
31+
processed: int
32+
updated: int
33+
deleted: int
2634

27-
class MergeOuterEdgesHandler(Service):
35+
36+
class MergeDeferredEdgesHandler(Service):
2837
def __init__(
2938
self,
3039
message_bus: MessageBus,
@@ -35,18 +44,21 @@ def __init__(
3544
):
3645
super().__init__()
3746
self.message_bus = message_bus
38-
self.merge_outer_edges_listener: Optional[Task[None]] = None
47+
self.merge_deferred_edges_listener: Optional[Task[None]] = None
3948
self.subscription_handler = subscription_handler
4049
self.subscriber: Optional[Subscriber] = None
4150
self.task_handler_service = task_handler_service
4251
self.db_access = db_access
4352
self.model_handler = model_handler
4453

45-
async def merge_outer_edges(self, task_id: TaskId) -> Tuple[int, int]:
54+
async def merge_deferred_edges(self, task_ids: List[TaskId]) -> DeferredMergeResult:
4655
deferred_outer_edge_db = self.db_access.deferred_outer_edge_db
47-
pending_edges = await deferred_outer_edge_db.all_for_task(task_id)
56+
pending_edges = []
57+
for task_id in task_ids:
58+
pending_edges.extend(await deferred_outer_edge_db.all_for_task(task_id))
4859
if pending_edges:
49-
first = pending_edges[0]
60+
processed = 0
61+
first = min(pending_edges, key=lambda x: x.created_at)
5062
graph_db = self.db_access.get_graph_db(first.graph)
5163
model = await self.model_handler.load_model(first.graph)
5264

@@ -76,49 +88,43 @@ async def find_node_id(selector: NodeSelector) -> Optional[NodeId]:
7688
for edge in pending_edge.edges:
7789
from_id = await find_node_id(edge.from_node)
7890
to_id = await find_node_id(edge.to_node)
91+
processed += 1
7992
if from_id and to_id:
8093
edges.append((from_id, to_id, edge.edge_type))
8194

95+
# apply edges in graph
8296
updated, deleted = await graph_db.update_deferred_edges(edges, first.created_at)
83-
84-
log.info(
85-
f"MergeOuterEdgesHandler: updated {updated}/{len(edges)},"
86-
f" deleted {deleted} edges in task id {task_id}"
87-
)
88-
89-
return updated, deleted
97+
# delete processed edge definitions
98+
for task_id in task_ids:
99+
await deferred_outer_edge_db.delete_for_task(task_id)
100+
log.info(f"DeferredEdges: {len(edges)} edges: {updated} updated, {deleted} deleted. ({task_ids})")
101+
return DeferredMergeResult(processed, updated, deleted)
90102
else:
91-
log.info(f"MergeOuterEdgesHandler: no pending edges for task id {task_id} found.")
92-
93-
return 0, 0
94-
95-
async def mark_done(self, task_id: TaskId) -> None:
96-
deferred_outer_edge_db = self.db_access.deferred_outer_edge_db
97-
await deferred_outer_edge_db.delete_for_task(task_id)
103+
log.info(f"MergeOuterEdgesHandler: no pending edges found. ({task_ids})")
104+
return DeferredMergeResult(0, 0, 0)
98105

99106
async def __handle_events(self, subscription_done: Future[None]) -> None:
100-
async with self.message_bus.subscribe(subscriber_id, [merge_outer_edges]) as events:
107+
async with self.message_bus.subscribe(subscriber_id, [merge_deferred_edges]) as events:
101108
subscription_done.set_result(None)
102109
while True:
103110
event = await events.get()
104-
if isinstance(event, Action) and event.message_type == merge_outer_edges:
105-
await self.merge_outer_edges(event.task_id)
106-
await self.mark_done(event.task_id)
111+
if isinstance(event, Action) and event.message_type == merge_deferred_edges:
112+
await self.merge_deferred_edges([event.task_id])
107113
await self.task_handler_service.handle_action_done(event.done(subscriber_id))
108114

109115
async def start(self) -> None:
110116
subscription_done = asyncio.get_event_loop().create_future()
111117
self.subscriber = await self.subscription_handler.add_subscription(
112-
subscriber_id, merge_outer_edges, True, timedelta(seconds=30)
118+
subscriber_id, merge_deferred_edges, True, timedelta(seconds=30)
113119
)
114-
self.merge_outer_edges_listener = asyncio.create_task(
120+
self.merge_deferred_edges_listener = asyncio.create_task(
115121
self.__handle_events(subscription_done), name=subscriber_id
116122
)
117123
await subscription_done
118124

119125
async def stop(self) -> None:
120-
if self.merge_outer_edges_listener:
126+
if self.merge_deferred_edges_listener:
121127
with suppress(Exception):
122-
self.merge_outer_edges_listener.cancel()
128+
self.merge_deferred_edges_listener.cancel()
123129
if self.subscriber:
124-
await self.subscription_handler.remove_subscription(subscriber_id, merge_outer_edges)
130+
await self.subscription_handler.remove_subscription(subscriber_id, merge_deferred_edges)

fixcore/fixcore/dependencies.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class ServiceNames:
7878
infra_apps_runtime = "infra_apps_runtime"
7979
inspector = "inspector"
8080
jwt_signing_key_holder = "jwt_signing_key_holder"
81-
merge_outer_edges_handler = "merge_outer_edges_handler"
81+
merge_deferred_edges_handler = "merge_deferred_edges_handler"
8282
message_bus = "message_bus"
8383
model_handler = "model_handler"
8484
scheduler = "scheduler"

fixcore/fixcore/static/api-doc.yaml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,42 @@ paths:
334334
application/json:
335335
schema:
336336
$ref: "#/components/schemas/GraphUpdate"
337+
/graph/{graph_id}/merge/deferred_edges:
338+
post:
339+
summary: "Merge deferred edges by a given task id"
340+
description: |
341+
All existing deferred edges will be replaced by the definition of all deferred edges of the given task ids.
342+
We might be able to track deferred edges more specifically in the future.
343+
tags:
344+
- graph_management
345+
parameters:
346+
- $ref: "#/components/parameters/graph_id"
347+
requestBody:
348+
description: "The task ids to merge."
349+
required: true
350+
content:
351+
application/json:
352+
schema:
353+
type: array
354+
items:
355+
type: string
356+
responses:
357+
"200":
358+
description: "Return a summary of actions that has been applied."
359+
content:
360+
application/json:
361+
schema:
362+
type: object
363+
properties:
364+
processed:
365+
type: integer
366+
description: Number of processed edges
367+
updated:
368+
type: integer
369+
description: Number of updated edges
370+
deleted:
371+
type: integer
372+
description: Number of deleted edges
337373
/graph/{graph_id}/batch/merge:
338374
post:
339375
summary: "Merge a given graph with the existing graph under marked merge nodes as batch update."

fixcore/fixcore/task/task_handler.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -610,12 +610,17 @@ def workflow(name: TaskDescriptorId, steps: List[Step]) -> Workflow:
610610
trigger.append(TimeTrigger(wf_config.schedule))
611611
return Workflow(uid=name, name=name, steps=steps, triggers=trigger, on_surpass=TaskSurpassBehaviour.Wait)
612612

613-
collect_steps = [
613+
pre_collect = [
614614
Step("pre_collect", PerformAction("pre_collect"), timedelta(seconds=10)),
615615
Step("collect", PerformAction("collect"), timedelta(seconds=10)),
616616
Step("wait_for_graph_merged", WaitForCollectDone(), timedelta(minutes=10)),
617-
Step("merge_outer_edges", PerformAction("merge_outer_edges"), timedelta(seconds=10)),
618-
Step("post_collect", PerformAction("post_collect"), timedelta(seconds=10)),
617+
]
618+
post_collect = [Step("post_collect", PerformAction("post_collect"), timedelta(seconds=10))]
619+
collect_steps = pre_collect + post_collect
620+
collect_with_merge = [
621+
*pre_collect,
622+
Step("merge_deferred_edges", PerformAction("merge_deferred_edges"), timedelta(seconds=10)),
623+
*post_collect, # deferred edges are merged before post_collect
619624
]
620625
cleanup_steps = [
621626
Step("pre_cleanup_plan", PerformAction("pre_cleanup_plan"), timedelta(seconds=10)),
@@ -634,7 +639,7 @@ def workflow(name: TaskDescriptorId, steps: List[Step]) -> Workflow:
634639
workflow(TaskDescriptorId("collect"), collect_steps + metrics_steps),
635640
workflow(TaskDescriptorId("cleanup"), cleanup_steps + metrics_steps),
636641
workflow(TaskDescriptorId("metrics"), metrics_steps),
637-
workflow(TaskDescriptorId("collect_and_cleanup"), collect_steps + cleanup_steps + metrics_steps),
642+
workflow(TaskDescriptorId("collect_and_cleanup"), collect_with_merge + cleanup_steps + metrics_steps),
638643
]
639644

640645
# endregion

fixcore/fixcore/web/api.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
from multidict import MultiDict
5959
from networkx.readwrite import cytoscape_data
6060

61+
from fixcore.action_handlers.merge_deferred_edge_handler import MergeDeferredEdgesHandler
6162
from fixcore.analytics import AnalyticsEvent
6263
from fixcore.cli.command import alias_names
6364
from fixcore.cli.model import (
@@ -74,7 +75,7 @@
7475
from fixcore.console_renderer import ConsoleColorSystem, ConsoleRenderer
7576
from fixcore.db.graphdb import GraphDB, HistoryChange
7677
from fixcore.db.model import QueryModel
77-
from fixcore.dependencies import Dependencies, TenantDependencies
78+
from fixcore.dependencies import Dependencies, TenantDependencies, ServiceNames
7879
from fixcore.dependencies import TenantDependencyProvider
7980
from fixcore.error import NotFoundError, NotEnoughPermissions
8081
from fixcore.ids import (
@@ -251,6 +252,7 @@ def __add_routes(self, prefix: str) -> None:
251252
# maintain the graph
252253
web.patch(prefix + "/graph/{graph_id}/nodes", require(self.update_nodes, r, w)),
253254
web.post(prefix + "/graph/{graph_id}/merge", require(self.merge_graph, r, w)),
255+
web.post(prefix + "/graph/{graph_id}/merge/deferred_edges", require(self.merge_deferred_edges, r, w)),
254256
web.post(prefix + "/graph/{graph_id}/batch/merge", require(self.update_merge_graph_batch, r, w)),
255257
web.get(prefix + "/graph/{graph_id}/batch", require(self.list_batches, r, w)),
256258
web.post(prefix + "/graph/{graph_id}/batch/{batch_id}", require(self.commit_batch, r, w)),
@@ -1074,6 +1076,13 @@ async def create_graph(self, request: Request, deps: TenantDependencies) -> Stre
10741076
root = await graph.get_node(model, NodeId("root"))
10751077
return web.json_response(root)
10761078

1079+
async def merge_deferred_edges(self, request: Request, deps: TenantDependencies) -> StreamResponse:
1080+
task_ids = await request.json()
1081+
assert isinstance(task_ids, list), "Expected a list of task ids"
1082+
deferred_edges_handler = deps.service(ServiceNames.merge_deferred_edges_handler, MergeDeferredEdgesHandler)
1083+
r = await deferred_edges_handler.merge_deferred_edges(task_ids)
1084+
return await single_result(request, {"processed": r.processed, "updated": r.updated, "deleted": r.deleted})
1085+
10771086
async def merge_graph(self, request: Request, deps: TenantDependencies) -> StreamResponse:
10781087
graph_id = GraphName(request.match_info.get("graph_id", "fix"))
10791088
wait_for_result = request.query.get("wait_for_result", "true").lower() == "true"

fixcore/tests/fixcore/action_handlers/merge_outer_edge_handler_test.py renamed to fixcore/tests/fixcore/action_handlers/merge_deferred_edge_handler_test.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import asyncio
22
from datetime import timedelta
3+
from typing import List
34

45
import pytest
56

6-
from fixcore.action_handlers.merge_outer_edge_handler import MergeOuterEdgesHandler
7+
from fixcore.action_handlers.merge_deferred_edge_handler import MergeDeferredEdgesHandler, merge_deferred_edges
78
from fixcore.db.db_access import DbAccess
89
from fixcore.db.deferredouteredgedb import DeferredOuterEdges
910
from fixcore.db.graphdb import ArangoGraphDB
@@ -19,38 +20,36 @@
1920
from fixcore.util import utc
2021
from tests.fixcore.db.graphdb_test import Foo, Bla, BaseResource
2122

22-
merge_outer_edges = "merge_outer_edges"
23-
2423

2524
@pytest.mark.asyncio
2625
async def test_handler_invocation(
27-
merge_handler: MergeOuterEdgesHandler,
26+
merge_handler: MergeDeferredEdgesHandler,
2827
subscription_handler: SubscriptionHandler,
2928
message_bus: MessageBus,
3029
) -> None:
31-
merge_called: asyncio.Future[TaskId] = asyncio.get_event_loop().create_future()
30+
merge_called: asyncio.Future[List[TaskId]] = asyncio.get_event_loop().create_future()
3231

33-
def mocked_merge(task_id: TaskId) -> None:
34-
merge_called.set_result(task_id)
32+
def mocked_merge(task_ids: List[TaskId]) -> None:
33+
merge_called.set_result(task_ids)
3534

36-
# monkey patching the merge_outer_edges method
35+
# monkey patching the merge_deferred_edges method
3736
# use setattr here, since assignment does not work in mypy https://github.com/python/mypy/issues/2427
38-
setattr(merge_handler, "merge_outer_edges", mocked_merge)
37+
setattr(merge_handler, "merge_deferred_edges", mocked_merge)
3938

40-
subscribers = await subscription_handler.list_subscriber_for(merge_outer_edges)
39+
subscribers = await subscription_handler.list_subscriber_for(merge_deferred_edges)
4140

4241
assert subscribers[0].id == "fixcore"
4342

4443
task_id = TaskId("test_task_1")
4544

46-
await message_bus.emit(Action(merge_outer_edges, task_id, merge_outer_edges))
45+
await message_bus.emit(Action(merge_deferred_edges, task_id, merge_deferred_edges))
4746

48-
assert await merge_called == task_id
47+
assert await merge_called == [task_id]
4948

5049

5150
@pytest.mark.asyncio
52-
async def test_merge_outer_edges(
53-
merge_handler: MergeOuterEdgesHandler, graph_db: ArangoGraphDB, foo_model: Model, db_access: DbAccess
51+
async def test_merge_deferred_edges(
52+
merge_handler: MergeDeferredEdgesHandler, graph_db: ArangoGraphDB, foo_model: Model, db_access: DbAccess
5453
) -> None:
5554
now = utc()
5655

@@ -68,7 +67,7 @@ async def test_merge_outer_edges(
6867
await db_access.deferred_outer_edge_db.update(
6968
DeferredOuterEdges("t0", "c0", TaskId("task123"), now, graph_db.name, [e1])
7069
)
71-
await merge_handler.merge_outer_edges(TaskId("task123"))
70+
await merge_handler.merge_deferred_edges([TaskId("task123")])
7271

7372
graph = await graph_db.search_graph(QueryModel(parse_query("is(graph_root) -default[0:]->"), foo_model))
7473
assert graph.has_edge("id1", "id2")
@@ -82,7 +81,7 @@ async def test_merge_outer_edges(
8281
await db_access.deferred_outer_edge_db.update(
8382
DeferredOuterEdges("t1", "c1", TaskId("task456"), new_now, graph_db.name, [e2])
8483
)
85-
await merge_handler.merge_outer_edges(TaskId("task456"))
84+
await merge_handler.merge_deferred_edges([TaskId("task456")])
8685

8786
graph = await graph_db.search_graph(QueryModel(parse_query("is(graph_root) -default[0:]->"), foo_model))
8887
assert not graph.has_edge("id1", "id2")
@@ -96,11 +95,12 @@ async def test_merge_outer_edges(
9695
await db_access.deferred_outer_edge_db.update(
9796
DeferredOuterEdges("t2", "c4", TaskId("task789"), new_now_2, graph_db.name, [e2])
9897
)
99-
updated, deleted = await merge_handler.merge_outer_edges(TaskId("task789"))
98+
r = await merge_handler.merge_deferred_edges([TaskId("task789")])
99+
assert r.processed == 1
100100
# here we also implicitly test that the timestamp was updated, because otherwise the edge
101101
# would have an old timestamp and would be deleted
102-
assert updated == 1
103-
assert deleted == 0
102+
assert r.updated == 1
103+
assert r.deleted == 0
104104
graph = await graph_db.search_graph(QueryModel(parse_query("is(graph_root) -default[0:]->"), foo_model))
105105
assert not graph.has_edge("id1", "id2")
106106
assert graph.has_edge("id2", "id1")

fixcore/tests/fixcore/conftest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from pytest import fixture
2323
from rich.console import Console
2424

25-
from fixcore.action_handlers.merge_outer_edge_handler import MergeOuterEdgesHandler
25+
from fixcore.action_handlers.merge_deferred_edge_handler import MergeDeferredEdgesHandler
2626
from fixcore.analytics import AnalyticsEventSender, InMemoryEventSender, NoEventSender
2727
from fixcore.cli.cli import CLIService
2828
from fixcore.cli.command import (
@@ -843,9 +843,9 @@ async def merge_handler(
843843
task_handler: TaskHandlerService,
844844
db_access: DbAccess,
845845
foo_model: Model,
846-
) -> AsyncGenerator[MergeOuterEdgesHandler, None]:
846+
) -> AsyncGenerator[MergeDeferredEdgesHandler, None]:
847847
model_handler = ModelHandlerStatic(foo_model)
848-
handler = MergeOuterEdgesHandler(message_bus, subscription_handler, task_handler, db_access, model_handler)
848+
handler = MergeDeferredEdgesHandler(message_bus, subscription_handler, task_handler, db_access, model_handler)
849849
await handler.start()
850850
yield handler
851851
await handler.stop()

0 commit comments

Comments
 (0)