Skip to content

Commit 21ddc09

Browse files
authored
[core][fix] Persist parent update structure in an atomic way (#2196)
1 parent 6d11a8a commit 21ddc09

File tree

5 files changed

+114
-111
lines changed

5 files changed

+114
-111
lines changed

fixcore/fixcore/db/db_access.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def get_graph_db(self, name: GraphName, no_check: bool = False) -> GraphDB:
275275
else:
276276
if not no_check and not self.database.has_graph(name):
277277
raise NoSuchGraph(name)
278-
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph)
278+
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph, self.lock_db)
279279
event_db = EventGraphDB(graph_db, self.event_sender)
280280
self.graph_dbs[name] = event_db
281281
return event_db

fixcore/fixcore/db/graphdb.py

Lines changed: 49 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from numbers import Number
1010
from textwrap import dedent
1111
from typing import (
12-
DefaultDict,
1312
Optional,
1413
Callable,
1514
AsyncGenerator,
@@ -18,7 +17,6 @@
1817
Dict,
1918
List,
2019
Tuple,
21-
TypeVar,
2220
cast,
2321
AsyncIterator,
2422
Literal,
@@ -39,7 +37,8 @@
3937
from fixcore.db import arango_query, EstimatedSearchCost
4038
from fixcore.db.arango_query import fulltext_delimiter
4139
from fixcore.db.async_arangodb import AsyncArangoDB, AsyncArangoTransactionDB, AsyncArangoDBBase, AsyncCursorContext
42-
from fixcore.db.model import GraphUpdate, QueryModel
40+
from fixcore.db.lockdb import LockDB
41+
from fixcore.db.model import GraphUpdate, QueryModel, GraphChange
4342
from fixcore.db.usagedb import resource_usage_db
4443
from fixcore.error import InvalidBatchUpdate, ConflictingChangeInProgress, NoSuchChangeError, OptimisticLockingFailed
4544
from fixcore.ids import NodeId, GraphName
@@ -275,6 +274,7 @@ def __init__(
275274
name: GraphName,
276275
adjust_node: AdjustNode,
277276
config: GraphConfig,
277+
lock_db: LockDB,
278278
) -> None:
279279
super().__init__()
280280
self._name = name
@@ -283,6 +283,7 @@ def __init__(
283283
self.in_progress = f"{name}_in_progress"
284284
self.node_history = f"{name}_node_history"
285285
self.usage_db = resource_usage_db(db, f"{name}_usage")
286+
self.lock_db = lock_db
286287
self.db = db
287288
self.config = config
288289

@@ -309,8 +310,8 @@ async def create_node(self, model: Model, node_id: NodeId, data: Json, under_nod
309310
graph.add_node(node_id, data)
310311
graph.add_edge(under_node_id, node_id, EdgeTypes.default)
311312
access = GraphAccess(graph.graph, node_id, {under_node_id})
312-
_, node_inserts, _, _ = self.prepare_nodes(access, [], model)
313-
_, edge_inserts, _, _ = self.prepare_edges(access, [], EdgeTypes.default)
313+
node_inserts = self.prepare_nodes(access, [], model).node_inserts
314+
edge_inserts = self.prepare_edges(access, [], EdgeTypes.default).edge_inserts[EdgeTypes.default]
314315
assert len(node_inserts) == 1
315316
assert len(edge_inserts) == 1
316317
edge_collection = self.edge_collection(EdgeTypes.default)
@@ -974,7 +975,7 @@ async def move_temp_to_proper(self, change_id: str, temp_name: str, update_histo
974975
+ edge_updates
975976
+ edge_deletes
976977
+ usage_updates
977-
+ [f'remove {{_key: "{change_key}"}} in {self.in_progress}'],
978+
+ [f'remove {{_key: "{change_key}"}} in {self.in_progress} OPTIONS {{ ignoreErrors: true }}'],
978979
)
979980
)
980981
cmd = f'function () {{\nvar db=require("@arangodb").db;\n{updates}\n}}'
@@ -1034,15 +1035,9 @@ def adjust_node(
10341035
# adjuster has the option to manipulate the resulting json
10351036
return self.node_adjuster.adjust(json)
10361037

1037-
def prepare_nodes(
1038-
self, access: GraphAccess, node_cursor: Iterable[Json], model: Model
1039-
) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]:
1038+
def prepare_nodes(self, access: GraphAccess, node_cursor: Iterable[Json], model: Model) -> GraphChange:
10401039
log.info(f"Prepare nodes for subgraph {access.root()}")
1041-
info = GraphUpdate()
1042-
resource_inserts: List[Json] = []
1043-
resource_updates: List[Json] = []
1044-
resource_deletes: List[Json] = []
1045-
1040+
change = GraphChange()
10461041
optional_properties = [*Section.all_ordered, "refs", "kinds", "flat", "hash", "hist_hash"]
10471042

10481043
def insert_node(node: Json) -> None:
@@ -1052,17 +1047,15 @@ def insert_node(node: Json) -> None:
10521047
value = node.get(prop, None)
10531048
if value:
10541049
js_doc[prop] = value
1055-
resource_inserts.append(js_doc)
1056-
info.nodes_created += 1
1050+
change.node_inserts.append(js_doc)
10571051

10581052
def update_or_delete_node(node: Json) -> None:
10591053
key = node["_key"]
10601054
hash_string = node["hash"]
10611055
elem = access.node(key)
10621056
if elem is None:
10631057
# node is in db, but not in the graph any longer: delete node
1064-
resource_deletes.append({"_key": key, "deleted": access.at_json, "history": True})
1065-
info.nodes_deleted += 1
1058+
change.node_deletes.append({"_key": key, "deleted": access.at_json, "history": True})
10661059
elif elem["hash"] != hash_string:
10671060
# node is in db and in the graph, content is different
10681061
adjusted: Json = self.adjust_node(model, elem, node["created"], access.at_json)
@@ -1072,15 +1065,14 @@ def update_or_delete_node(node: Json) -> None:
10721065
value = adjusted.get(prop, None)
10731066
if value:
10741067
js[prop] = value
1075-
resource_updates.append(js)
1076-
info.nodes_updated += 1
1068+
change.node_updates.append(js)
10771069

10781070
for doc in node_cursor:
10791071
update_or_delete_node(doc)
10801072

10811073
for not_visited in access.not_visited_nodes():
10821074
insert_node(not_visited)
1083-
return info, resource_inserts, resource_updates, resource_deletes
1075+
return change
10841076

10851077
def _edge_to_json(
10861078
self, from_node: str, to_node: str, data: Optional[Json], refs: Optional[Dict[str, str]] = None, **kwargs: Any
@@ -1096,14 +1088,9 @@ def _edge_to_json(
10961088
js["_to"] = f"{self.vertex_name}/{to_node}"
10971089
return js
10981090

1099-
def prepare_edges(
1100-
self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: EdgeType
1101-
) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]:
1091+
def prepare_edges(self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: EdgeType) -> GraphChange:
11021092
log.info(f"Prepare edges of type {edge_type} for subgraph {access.root()}")
1103-
info = GraphUpdate()
1104-
edge_inserts: List[Json] = []
1105-
edge_updates: List[Json] = []
1106-
edge_deletes: List[Json] = []
1093+
change = GraphChange()
11071094

11081095
def edge_json(from_node: str, to_node: str, edge_data: Optional[Json]) -> Json:
11091096
# Take the refs with the lower number of entries (or none):
@@ -1117,29 +1104,26 @@ def edge_json(from_node: str, to_node: str, edge_data: Optional[Json]) -> Json:
11171104
return self._edge_to_json(from_node, to_node, edge_data, refs)
11181105

11191106
def insert_edge(from_node: str, to_node: str, edge_data: Optional[Json]) -> None:
1120-
edge_inserts.append(edge_json(from_node, to_node, edge_data))
1121-
info.edges_created += 1
1107+
change.edge_inserts[edge_type].append(edge_json(from_node, to_node, edge_data))
11221108

11231109
def update_edge(edge: Json) -> None:
11241110
from_node = edge["_from"].split("/")[1] # vertex/id
11251111
to_node = edge["_to"].split("/")[1] # vertex/id
11261112
has_edge, edge_data = access.has_edge(from_node, to_node, edge_type)
11271113
edge_hash = edge_data.get("hash") if edge_data else None
11281114
if not has_edge:
1129-
edge_deletes.append(edge)
1130-
info.edges_deleted += 1
1115+
change.edge_deletes[edge_type].append(edge)
11311116
elif edge_hash != edge.get("hash"):
11321117
js = edge_json(from_node, to_node, edge_data)
1133-
edge_updates.append(js)
1134-
info.edges_updated += 1
1118+
change.edge_updates[edge_type].append(js)
11351119

11361120
for doc in edge_cursor:
11371121
update_edge(doc)
11381122

11391123
for edge_from, edge_to, data in access.not_visited_edges(edge_type):
11401124
insert_edge(edge_from, edge_to, data)
11411125

1142-
return info, edge_inserts, edge_updates, edge_deletes
1126+
return change
11431127

11441128
async def merge_graph(
11451129
self,
@@ -1154,105 +1138,67 @@ async def merge_graph(
11541138

11551139
async def prepare_graph(
11561140
sub: GraphAccess, node_query: Tuple[str, Json], edge_query: Callable[[EdgeType], Tuple[str, Json]]
1157-
) -> Tuple[
1158-
GraphUpdate,
1159-
List[Json], # node insert
1160-
List[Json], # node update
1161-
List[Json], # node delete
1162-
Dict[EdgeType, List[Json]], # edge insert
1163-
Dict[EdgeType, List[Json]], # edge update
1164-
Dict[EdgeType, List[Json]], # edge delete
1165-
]:
1166-
graph_info = GraphUpdate()
1141+
) -> GraphChange:
1142+
graph_change = GraphChange()
11671143
# check all nodes for this subgraph
11681144
query, bind = node_query
11691145
log.debug(f"Query for nodes: {sub.root()}")
11701146
with await self.db.aql(query, bind_vars=bind, batch_size=50000) as node_cursor:
1171-
node_info, ni, nu, nd = self.prepare_nodes(sub, node_cursor, model)
1172-
graph_info += node_info
1147+
graph_change += self.prepare_nodes(sub, node_cursor, model)
11731148

11741149
# check all edges in all relevant edge-collections
1175-
edge_inserts: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
1176-
edge_updates: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
1177-
edge_deletes: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
11781150
for edge_type in EdgeTypes.all:
11791151
query, bind = edge_query(edge_type)
11801152
log.debug(f"Query for edges of type {edge_type}: {sub.root()}")
11811153
with await self.db.aql(query, bind_vars=bind, batch_size=50000) as ec:
1182-
edge_info, gei, geu, ged = self.prepare_edges(sub, ec, edge_type)
1183-
graph_info += edge_info
1184-
edge_inserts[edge_type] = gei
1185-
edge_updates[edge_type] = geu
1186-
edge_deletes[edge_type] = ged
1187-
return graph_info, ni, nu, nd, edge_inserts, edge_updates, edge_deletes
1154+
graph_change += self.prepare_edges(sub, ec, edge_type)
1155+
return graph_change
11881156

11891157
roots, parent, graphs = GraphAccess.merge_graphs(graph_to_merge)
11901158
log.info(f"merge_graph {len(roots)} merge nodes found. change_id={change_id}, is_batch={is_batch}.")
11911159

11921160
def merge_edges(merge_node: str, merge_node_kind: str, edge_type: EdgeType) -> Tuple[str, Json]:
11931161
return self.query_update_edges(edge_type, merge_node_kind), {"update_id": merge_node}
11941162

1195-
K = TypeVar("K") # noqa: N806
1196-
V = TypeVar("V") # noqa: N806
1197-
1198-
def combine_dict(left: Dict[K, List[V]], right: Dict[K, List[V]]) -> Dict[K, List[V]]:
1199-
result = dict(left)
1200-
for key, right_values in right.items():
1201-
left_values = left.get(key)
1202-
result[key] = left_values + right_values if left_values else right_values
1203-
return result
1163+
def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]:
1164+
edge_ids = [self.db_edge_key(f, t) for f, t, k in parent.g.edges(keys=True) if k.edge_type == edge_type]
1165+
return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids)
12041166

12051167
# this will throw an exception, in case of a conflicting update (--> outside try block)
12061168
log.debug("Mark all parent nodes for this update to avoid conflicting changes")
12071169
await self.mark_update(roots, list(parent.nodes), change_id, is_batch)
12081170
try:
1209-
1210-
def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]:
1211-
edge_ids = [self.db_edge_key(f, t) for f, t, k in parent.g.edges(keys=True) if k.edge_type == edge_type]
1212-
return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids)
1213-
1171+
# store parent nodes and edges with a mutex to avoid conflicts
12141172
parents_nodes = self.nodes_by_ids_and_until_replace_node(preserve_parent_structure, parent)
1215-
info, nis, nus, nds, eis, eus, eds = await prepare_graph(parent, parents_nodes, parent_edges)
1173+
async with self.lock_db.lock("merge_graph_parents"):
1174+
parent_change_id = change_id + "_parent"
1175+
parent_change = await prepare_graph(parent, parents_nodes, parent_edges)
1176+
if parent_change.change_count(): # only persist in case of changes
1177+
await self._persist_update(parent_change_id, False, parent_change, update_history)
1178+
1179+
change = GraphChange()
12161180
for num, (root, graph) in enumerate(graphs):
12171181
root_kind = GraphResolver.resolved_kind(graph_to_merge.nodes[root])
12181182
if root_kind:
12191183
# noinspection PyTypeChecker
12201184
log.info(f"Update subgraph: root={root} ({root_kind}, {num+1} of {len(roots)})")
12211185
node_query = self.query_update_nodes(root_kind), {"update_id": root}
12221186
edge_query = partial(merge_edges, root, root_kind)
1223-
1224-
i, ni, nu, nd, ei, eu, ed = await prepare_graph(graph, node_query, edge_query)
1225-
info += i
1226-
nis += ni
1227-
nus += nu
1228-
nds += nd
1229-
eis = combine_dict(eis, ei)
1230-
eus = combine_dict(eus, eu)
1231-
eds = combine_dict(eds, ed)
1187+
change += await prepare_graph(graph, node_query, edge_query)
12321188
else:
1233-
# Already checked in GraphAccess - only here as safeguard.
1189+
# Already checked in GraphAccess - only here as a safeguard.
12341190
raise AttributeError(f"Kind of update root {root} is not a pre-resolved and can not be used!")
12351191

1236-
log.debug(f"Update prepared: {info}. Going to persist the changes.")
1192+
graph_update = parent_change.to_update() + change.to_update()
1193+
log.debug(f"Update prepared: {graph_update}. Going to persist the changes.")
12371194
await self._refresh_marked_update(change_id)
1238-
await self._persist_update(change_id, is_batch, nis, nus, nds, eis, eus, eds, update_history)
1239-
return roots, info
1195+
await self._persist_update(change_id, is_batch, change, update_history)
1196+
return roots, graph_update
12401197
except Exception as ex:
12411198
await self.delete_marked_update(change_id)
12421199
raise ex
12431200

1244-
async def _persist_update(
1245-
self,
1246-
change_id: str,
1247-
is_batch: bool,
1248-
resource_inserts: List[Json],
1249-
resource_updates: List[Json],
1250-
resource_deletes: List[Json],
1251-
edge_inserts: Dict[EdgeType, List[Json]],
1252-
edge_updates: Dict[EdgeType, List[Json]],
1253-
edge_deletes: Dict[EdgeType, List[Json]],
1254-
update_history: bool,
1255-
) -> None:
1201+
async def _persist_update(self, change_id: str, is_batch: bool, change: GraphChange, update_history: bool) -> None:
12561202
async def execute_many_async(
12571203
async_fn: Callable[[str, List[Json]], Any], name: str, array: List[Json], **kwargs: Any
12581204
) -> None:
@@ -1275,20 +1221,20 @@ async def trafo_many(
12751221

12761222
async def store_to_tmp_collection(temp: StandardCollection) -> None:
12771223
tmp = temp.name
1278-
ri = trafo_many(self.db.insert_many, tmp, resource_inserts, {"action": "node_created"})
1279-
ru = trafo_many(self.db.insert_many, tmp, resource_updates, {"action": "node_updated"})
1280-
rd = trafo_many(self.db.insert_many, tmp, resource_deletes, {"action": "node_deleted"})
1224+
ri = trafo_many(self.db.insert_many, tmp, change.node_inserts, {"action": "node_created"})
1225+
ru = trafo_many(self.db.insert_many, tmp, change.node_updates, {"action": "node_updated"})
1226+
rd = trafo_many(self.db.insert_many, tmp, change.node_deletes, {"action": "node_deleted"})
12811227
edge_i = [
12821228
trafo_many(self.db.insert_many, tmp, inserts, {"action": "edge_insert", "edge_type": tpe})
1283-
for tpe, inserts in edge_inserts.items()
1229+
for tpe, inserts in change.edge_inserts.items()
12841230
]
12851231
edge_u = [
12861232
trafo_many(self.db.insert_many, tmp, updates, {"action": "edge_update", "edge_type": tpe})
1287-
for tpe, updates in edge_updates.items()
1233+
for tpe, updates in change.edge_updates.items()
12881234
]
12891235
edge_d = [
12901236
trafo_many(self.db.insert_many, tmp, deletes, {"action": "edge_delete", "edge_type": tpe})
1291-
for tpe, deletes in edge_deletes.items()
1237+
for tpe, deletes in change.edge_deletes.items()
12921238
]
12931239
await asyncio.gather(*([ri, ru, rd] + edge_i + edge_u + edge_d))
12941240

@@ -1562,7 +1508,7 @@ async def copy_graph(self, to_graph: GraphName, to_snapshot: bool = False) -> Gr
15621508
if await self.db.has_graph(to_graph):
15631509
raise ValueError(f"Graph {to_graph} already exists")
15641510

1565-
new_graph_db = ArangoGraphDB(db=self.db, name=to_graph, adjust_node=self.node_adjuster, config=self.config)
1511+
new_graph_db = ArangoGraphDB(self.db, to_graph, self.node_adjuster, self.config, self.lock_db)
15661512

15671513
# collection creation can't be a part of a transaction so we do that first
15681514
# we simply reuse the existing create_update_schema method but do not insert any genesis data

0 commit comments

Comments
 (0)