[resotocore][fix] Do not create unnecessary collections when making snapshots (#1691)

* [resotocore][fix] Do not create unnecessary collections when making snapshots

* do not skip indexes

* refactor the create_node_indexes

* rename snapshot to to_snapshot

* fix the test
meln1k committed Jun 28, 2023
1 parent 1eea19f commit 17bd451
Showing 3 changed files with 57 additions and 34 deletions.
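
The change threads a new `to_snapshot` flag from `GraphManager.snapshot` through `copy_graph` down to `create_update_schema`: snapshot graphs are immutable, so the `in_progress` and `node_history` collections are skipped while the vertex indexes are still created. A minimal sketch of that pattern, using a hypothetical `FakeDb` stand-in rather than the actual resotocore classes:

    # Minimal sketch of the pattern this commit introduces; FakeDb is a
    # hypothetical stand-in, the real logic lives in resotocore/db/graphdb.py.
    from dataclasses import dataclass, field
    from typing import Set


    @dataclass
    class FakeDb:
        collections: Set[str] = field(default_factory=set)

        def create_collection(self, name: str) -> None:
            self.collections.add(name)


    def create_update_schema(db: FakeDb, graph: str, to_snapshot: bool = False) -> None:
        # The vertex collection and its indexes are always needed.
        db.create_collection(graph)
        if not to_snapshot:
            # Snapshots are immutable, so the collections that only support
            # in-place updates and change history can be skipped.
            db.create_collection(f"{graph}_in_progress")
            db.create_collection(f"{graph}_node_history")


    db = FakeDb()
    create_update_schema(db, "snapshot-example", to_snapshot=True)
    assert db.collections == {"snapshot-example"}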
37 changes: 22 additions & 15 deletions resotocore/resotocore/db/graphdb.py
@@ -155,7 +155,7 @@ async def create_update_schema(self) -> None:
pass

@abstractmethod
-async def copy_graph(self, to_graph: GraphName) -> "GraphDB":
+async def copy_graph(self, to_graph: GraphName, to_snapshot: bool = False) -> "GraphDB":
pass


@@ -911,7 +911,7 @@ async def insert_genesis_data(self) -> None:
# ignore if the root node is already created
return None

-async def create_update_schema(self, init_with_data: bool = True) -> None:
+async def create_update_schema(self, init_with_data: bool = True, to_snapshot: bool = False) -> None:
db = self.db

async def create_update_graph(
@@ -930,10 +930,7 @@ async def create_update_graph(
)
return graph, vertex_collection, edge_collection

-def create_update_collection_indexes(
-    nodes: VertexCollection, progress: StandardCollection, node_history: StandardCollection
-) -> None:
-    # node indexes ------
+def create_node_indexes(nodes: VertexCollection) -> None:
node_idxes = {idx["name"]: idx for idx in cast(List[Json], nodes.indexes())}
# this index will hold all the necessary data to query for an update (index only query)
if "update_nodes_ref_id" not in node_idxes:
@@ -952,6 +949,8 @@ def create_update_collection_indexes(
sparse=False,
name="kinds_id_name_ctime",
)

+def create_update_collection_indexes(progress: StandardCollection, node_history: StandardCollection) -> None:
# progress indexes ------
progress_idxes = {idx["name"]: idx for idx in cast(List[Json], progress.indexes())}
if "parent_nodes" not in progress_idxes:
@@ -1031,9 +1030,17 @@ async def create_update_views(nodes: VertexCollection) -> None:
await create_update_graph(self.name, self.vertex_name, edge_type_name)

vertex = db.graph(self.name).vertex_collection(self.vertex_name)
-in_progress = await create_collection(self.in_progress)
-node_history_collection = await create_collection(self.node_history)
-create_update_collection_indexes(vertex, in_progress, node_history_collection)
+
+if to_snapshot:
+    # since the snapshots are immutable, we don't need the in_progress or node_history collections;
+    # we only create the indexes on the vertex collection
+    create_node_indexes(vertex)
+else:
+    in_progress = await create_collection(self.in_progress)
+    node_history_collection = await create_collection(self.node_history)
+    create_node_indexes(vertex)
+    create_update_collection_indexes(in_progress, node_history_collection)

for edge_type in EdgeTypes.all:
edge_collection = db.graph(self.name).edge_collection(self.edge_collection(edge_type))
create_update_edge_indexes(edge_collection)
@@ -1042,16 +1049,16 @@ async def create_update_views(nodes: VertexCollection) -> None:
if init_with_data:
await self.insert_genesis_data()

-async def copy_graph(self, to_graph: GraphName) -> GraphDB:
+async def copy_graph(self, to_graph: GraphName, to_snapshot: bool = False) -> GraphDB:
if await self.db.has_graph(to_graph):
raise ValueError(f"Graph {to_graph} already exists")

new_graph_db = ArangoGraphDB(db=self.db, name=to_graph, adjust_node=self.node_adjuster, config=self.config)

# collection creation can't be a part of a transaction so we do that first
# we simply reuse the existing create_update_schema method but do not insert any genesis data
-async def create_new_collections(new_db: ArangoGraphDB) -> None:
-    await new_db.create_update_schema(init_with_data=False)
+async def create_new_collections(new_db: ArangoGraphDB, to_snapshot: bool) -> None:
+    await new_db.create_update_schema(init_with_data=False, to_snapshot=to_snapshot)

# we want to have a consistent snapshot view of the graph
async def copy_data() -> None:
@@ -1110,7 +1117,7 @@ async def copy_data() -> None:
write=[new_vertex, new_default_edge, new_delete_edge],
)

-await create_new_collections(new_graph_db)
+await create_new_collections(new_graph_db, to_snapshot=to_snapshot)
await copy_data()

return cast(GraphDB, new_graph_db)
@@ -1364,9 +1371,9 @@ async def to_query(self, query_model: QueryModel, with_edges: bool = False) -> T
async def create_update_schema(self) -> None:
await self.real.create_update_schema()

-async def copy_graph(self, to_graph: GraphName) -> GraphDB:
+async def copy_graph(self, to_graph: GraphName, to_snapshot: bool = False) -> GraphDB:
await self.event_sender.core_event(
CoreEvent.GraphCopied,
{"graph": self.graph_name, "to_graph": to_graph},
)
-return await self.real.copy_graph(to_graph)
+return await self.real.copy_graph(to_graph, to_snapshot=to_snapshot)
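
The `EventGraphDB` wrapper above simply forwards the flag to the real implementation. A hedged usage sketch of the extended API (the function and its arguments are illustrative, not from the repository):

    # Illustrative caller of the extended copy_graph API; make_immutable_copy
    # is made up, the real caller is GraphManager._copy_graph further below.
    async def make_immutable_copy(graph_db, name):
        # to_snapshot=True still builds the vertex indexes (create_node_indexes)
        # but skips the in_progress and node_history collections.
        return await graph_db.copy_graph(name, to_snapshot=True)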
17 changes: 12 additions & 5 deletions resotocore/resotocore/graph_manager/graph_manager.py
@@ -134,7 +134,12 @@ async def snapshot_at(self, *, time: datetime, graph_name: GraphName) -> Optiona
return None

async def copy(
-    self, source: GraphName, destination: GraphName, replace_existing: bool, validate_name: bool = True
+    self,
+    source: GraphName,
+    destination: GraphName,
+    replace_existing: bool,
+    validate_name: bool = True,
+    to_snapshot: bool = False,
) -> GraphName:
if not self.lock:
raise RuntimeError("GraphManager has not been started")
@@ -148,9 +153,11 @@ async def copy(
await self.delete(destination)
else:
raise ValueError(f"Destination graph {destination} already exists")
-return await self._copy_graph(source, destination, validate_name)
+return await self._copy_graph(source, destination, validate_name, to_snapshot)

-async def _copy_graph(self, source: GraphName, destination: GraphName, validate_name: bool = True) -> GraphName:
+async def _copy_graph(
+    self, source: GraphName, destination: GraphName, validate_name: bool = True, to_snapshot: bool = False
+) -> GraphName:
destination = GraphName(_compress_timestamps(destination))

if validate_name:
@@ -161,7 +168,7 @@ async def _copy_graph(self, source: GraphName, destination: GraphName, validate_

source_db = self.db_access.get_graph_db(source, no_check=True)

-await source_db.copy_graph(destination)
+await source_db.copy_graph(destination, to_snapshot)

source_model_db = await self.db_access.get_graph_model_db(source)
destination_model_db = await self.db_access.get_graph_model_db(destination)
@@ -181,7 +188,7 @@ async def snapshot(self, source: GraphName, label: str, timestamp: Optional[date
time = utc_str(timestamp, date_format=UTC_Date_Format_short)
check_graph_name(label)
snapshot_name = GraphName(f"snapshot-{source}-{label}-{time}")
-return await self.copy(source, snapshot_name, replace_existing=False, validate_name=False)
+return await self.copy(source, snapshot_name, replace_existing=False, validate_name=False, to_snapshot=True)

async def delete(self, graph_name: GraphName) -> None:
await self.db_access.delete_graph(graph_name)
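
Only the snapshot path passes `to_snapshot=True`; plain copies keep the default `False` and get the full schema. For illustration, a condensed sketch of the snapshot naming used in `snapshot()` above, where the exact timestamp pattern is an assumed stand-in for `UTC_Date_Format_short`:

    # Condensed sketch of the snapshot name construction; the strftime pattern
    # is an assumed stand-in for resotocore's UTC_Date_Format_short.
    from datetime import datetime, timezone

    def snapshot_graph_name(source: str, label: str, timestamp: datetime) -> str:
        time = timestamp.astimezone(timezone.utc).strftime("%Y%m%dT%H%M")
        return f"snapshot-{source}-{label}-{time}"

    name = snapshot_graph_name("resoto", "nightly", datetime(2023, 6, 28, tzinfo=timezone.utc))
    assert name == "snapshot-resoto-nightly-20230628T0000"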
37 changes: 23 additions & 14 deletions resotocore/tests/resotocore/db/graphdb_test.py
@@ -604,20 +604,29 @@ async def test_db_copy(graph_db: ArangoGraphDB, foo_model: Model, db_access: DbA
copy_db = await graph_db.copy_graph(copy_db_name)
assert copy_db.name == copy_db_name

-# validate the vertices
-existing_vertex_ids = {a["_key"] for a in await db.all(graph_db.name)}
-copy_vertex_ids = {a["_key"] for a in await db.all(copy_db_name)}
-assert existing_vertex_ids == copy_vertex_ids
-
-# validate the default edges
-existing_default_edge_ids = {a["_key"] for a in await db.all(f"{graph_db.name}_default")}
-copy_default_edge_ids = {a["_key"] for a in await db.all(f"{copy_db_name}_default")}
-assert existing_default_edge_ids == copy_default_edge_ids
-
-# validate the delete edges
-existing_delete_edge_ids = {a["_key"] for a in await db.all(f"{graph_db.name}_delete")}
-copy_delete_edge_ids = {a["_key"] for a in await db.all(f"{copy_db_name}_delete")}
-assert existing_delete_edge_ids == copy_delete_edge_ids
+async def validate(original_db_name: GraphName, copy_db_name: str) -> None:
+    # validate the vertices
+    existing_vertex_ids = {a["_key"] for a in await db.all(original_db_name)}
+    copy_vertex_ids = {a["_key"] for a in await db.all(copy_db_name)}
+    assert existing_vertex_ids == copy_vertex_ids
+
+    # validate the default edges
+    existing_default_edge_ids = {a["_key"] for a in await db.all(f"{original_db_name}_default")}
+    copy_default_edge_ids = {a["_key"] for a in await db.all(f"{copy_db_name}_default")}
+    assert existing_default_edge_ids == copy_default_edge_ids
+
+    # validate the delete edges
+    existing_delete_edge_ids = {a["_key"] for a in await db.all(f"{original_db_name}_delete")}
+    copy_delete_edge_ids = {a["_key"] for a in await db.all(f"{copy_db_name}_delete")}
+    assert existing_delete_edge_ids == copy_delete_edge_ids
+
+await validate(graph_db.name, copy_db.name)
+
+# check snapshots
+snapshot_db_name = GraphName("snapshot_" + graph_db.name)
+snapshot_db = await graph_db.copy_graph(snapshot_db_name, to_snapshot=True)
+assert snapshot_db.name == snapshot_db_name
+await validate(graph_db.name, snapshot_db.name)


def test_render_metadata_section(foo_model: Model) -> None:
