From 1b7fa97a7ffdb2d6e43c58dd0ffd1593e03b9a55 Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Wed, 20 May 2026 14:27:58 +0800 Subject: [PATCH] fix(storage): preserve semantic lock ownership Introduce typed lock leases for semantic queue handoff so resource, memory, and reindex flows can share or transfer lock ownership without releasing caller-owned locks prematurely. --- openviking/service/reindex_executor.py | 266 +++++++---- openviking/storage/content_write.py | 6 - openviking/storage/queuefs/semantic_dag.py | 62 +-- openviking/storage/queuefs/semantic_lock.py | 37 ++ openviking/storage/queuefs/semantic_msg.py | 10 +- .../storage/queuefs/semantic_processor.py | 424 ++++++++---------- .../storage/queuefs/semantic_sidecar.py | 27 +- openviking/storage/transaction/__init__.py | 12 + .../storage/transaction/lock_context.py | 9 +- openviking/storage/transaction/lock_lease.py | 225 ++++++++++ .../storage/transaction/lock_manager.py | 16 + openviking/utils/resource_processor.py | 114 +++-- openviking/utils/summarizer.py | 8 +- tests/client/test_file_operations.py | 2 +- tests/server/test_admin_rebuild_api.py | 83 ++-- tests/server/test_content_write_service.py | 3 +- tests/storage/test_memory_semantic_stall.py | 1 - ...emantic_dag_incremental_missing_summary.py | 2 +- ...test_semantic_processor_lock_ownership.py} | 12 +- 19 files changed, 792 insertions(+), 527 deletions(-) create mode 100644 openviking/storage/queuefs/semantic_lock.py create mode 100644 openviking/storage/transaction/lock_lease.py rename tests/storage/{test_semantic_processor_lifecycle_lock.py => test_semantic_processor_lock_ownership.py} (85%) diff --git a/openviking/service/reindex_executor.py b/openviking/service/reindex_executor.py index 6de627e76..42ebd1de8 100644 --- a/openviking/service/reindex_executor.py +++ b/openviking/service/reindex_executor.py @@ -25,12 +25,19 @@ from openviking.server.identity import RequestContext from openviking.service.task_tracker import get_task_tracker from openviking.session.memory.utils.messages import parse_memory_file_with_fields -from openviking.storage.collection_schemas import TextEmbeddingHandler from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter from openviking.storage.queuefs.semantic_msg import SemanticMsg from openviking.storage.queuefs.semantic_processor import SemanticProcessor -from openviking.storage.transaction import LockContext, get_lock_manager +from openviking.storage.transaction import ( + NO_LOCK, + BorrowedLockLease, + LockContext, + LockLease, + get_lock_manager, +) from openviking.storage.viking_fs import get_viking_fs +from openviking.telemetry import get_current_telemetry +from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking.utils.embedding_utils import get_resource_content_type from openviking.utils.skill_processor import SkillProcessor from openviking_cli.exceptions import NotFoundError, OpenVikingError @@ -60,6 +67,13 @@ class _ReindexCounters: warnings: list[str] = field(default_factory=list) +@dataclass +class _ReindexRunContext: + ctx: RequestContext + counters: _ReindexCounters + lock: LockLease = NO_LOCK + + class ReindexExecutor: """Non-destructive reindex orchestration for admin maintenance flows.""" @@ -196,9 +210,10 @@ async def _refresh_namespace_resource_semantics( target_root: str, directories: list[str], files: list[str], - counters: _ReindexCounters, - ctx: RequestContext, + run: _ReindexRunContext, ) -> tuple[list[str], list[str]]: + counters = run.counters + ctx = run.ctx prefix = self._child_prefix(target_root) semantic_roots = sorted( { @@ -235,6 +250,7 @@ async def _refresh_namespace_resource_semantics( uri=semantic_root, context_type="resource", ctx=ctx, + lock=run.lock, ) return filtered_directories, filtered_files @@ -244,6 +260,22 @@ def _child_prefix(root: str) -> str: return "viking://" return root.rstrip("/") + "/" + @staticmethod + def _apply_embedding_wait_status( + counters: _ReindexCounters, + queue_status: dict[str, Any], + ) -> None: + embedding_status = queue_status.get("Embedding") or {} + error_count = int(embedding_status.get("error_count", 0) or 0) + if error_count <= 0: + return + counters.failed_records += error_count + counters.rebuilt_records = max(0, counters.rebuilt_records - error_count) + for error in embedding_status.get("errors", []) or []: + message = error.get("message") if isinstance(error, dict) else str(error) + if message: + counters.warnings.append(f"Embedding queue failed during reindex: {message}") + def _is_resource_entry_for_namespace(self, uri: str, target_root: str) -> bool: if not uri.startswith(self._child_prefix(target_root)): return False @@ -260,9 +292,10 @@ async def _reindex_skill_namespace( *, uri: str, mode: str, - counters: _ReindexCounters, - ctx: RequestContext, + run: _ReindexRunContext, ) -> None: + counters = run.counters + ctx = run.ctx viking_fs = get_viking_fs() try: entries = await self._tree_all(viking_fs, uri, show_all_hidden=True, ctx=ctx) @@ -276,7 +309,11 @@ async def _reindex_skill_namespace( skill_roots.append(entry_uri) for skill_root in sorted(set(skill_roots)): - await self._reindex_skill(uri=skill_root, mode=mode, counters=counters, ctx=ctx) + await self._reindex_skill( + uri=skill_root, + mode=mode, + run=run, + ) if not skill_roots: counters.unsupported_records += 1 @@ -306,52 +343,86 @@ async def _run( service = get_service() if service.viking_fs is None or service.vikingdb_manager is None: raise RuntimeError("OpenVikingService not initialized") + if not service.vikingdb_manager.has_queue_manager: + raise OpenVikingError( + "Reindex requires embedding queue", + code="FAILED_PRECONDITION", + details={"uri": uri}, + ) path = service.viking_fs._uri_to_path(uri, ctx=ctx) started_at = time.perf_counter() counters = _ReindexCounters() + telemetry_id = get_current_telemetry().telemetry_id + wait_tracker = get_request_wait_tracker() + if telemetry_id: + wait_tracker.register_request(telemetry_id) - async with LockContext(get_lock_manager(), [path], lock_mode="tree"): - if object_type == "global_namespace": - await self._reindex_global_namespace( - uri=uri, - mode=mode, - counters=counters, - ctx=ctx, - ) - elif object_type == "agent_namespace": - await self._reindex_agent_namespace( - uri=uri, - mode=mode, - counters=counters, - ctx=ctx, - ) - elif object_type == "user_namespace": - await self._reindex_user_namespace( - uri=uri, - mode=mode, - counters=counters, + try: + async with LockContext(get_lock_manager(), [path], lock_mode="tree") as lock_handle: + run = _ReindexRunContext( ctx=ctx, - ) - elif object_type == "skill_namespace": - await self._reindex_skill_namespace( - uri=uri, - mode=mode, counters=counters, - ctx=ctx, - ) - elif object_type == "resource": - await self._reindex_resource(uri=uri, mode=mode, counters=counters, ctx=ctx) - elif object_type == "skill": - await self._reindex_skill(uri=uri, mode=mode, counters=counters, ctx=ctx) - elif object_type == "memory": - await self._reindex_memory(uri=uri, mode=mode, counters=counters, ctx=ctx) - else: - raise OpenVikingError( - f"Unsupported reindex type: {object_type}", - code="UNSUPPORTED_URI", - details={"uri": uri}, + lock=BorrowedLockLease.from_handle(get_lock_manager(), lock_handle), ) + if object_type == "global_namespace": + await self._reindex_global_namespace( + uri=uri, + mode=mode, + run=run, + ) + elif object_type == "agent_namespace": + await self._reindex_agent_namespace( + uri=uri, + mode=mode, + run=run, + ) + elif object_type == "user_namespace": + await self._reindex_user_namespace( + uri=uri, + mode=mode, + run=run, + ) + elif object_type == "skill_namespace": + await self._reindex_skill_namespace( + uri=uri, + mode=mode, + run=run, + ) + elif object_type == "resource": + await self._reindex_resource( + uri=uri, + mode=mode, + run=run, + ) + elif object_type == "skill": + await self._reindex_skill( + uri=uri, + mode=mode, + run=run, + ) + elif object_type == "memory": + await self._reindex_memory( + uri=uri, + mode=mode, + run=run, + ) + else: + raise OpenVikingError( + f"Unsupported reindex type: {object_type}", + code="UNSUPPORTED_URI", + details={"uri": uri}, + ) + + if telemetry_id: + await wait_tracker.wait_for_request(telemetry_id) + self._apply_embedding_wait_status( + counters, + wait_tracker.build_queue_status(telemetry_id), + ) + finally: + if telemetry_id: + wait_tracker.cleanup(telemetry_id) return { "status": "completed", @@ -393,11 +464,17 @@ async def _reindex_resource( *, uri: str, mode: str, - counters: _ReindexCounters, - ctx: RequestContext, + run: _ReindexRunContext, ) -> None: + counters = run.counters + ctx = run.ctx if mode == "semantic_and_vectors": - await self._run_semantic_processor(uri=uri, context_type="resource", ctx=ctx) + await self._run_semantic_processor( + uri=uri, + context_type="resource", + ctx=ctx, + lock=run.lock, + ) await self._reindex_resource_vectors(uri=uri, counters=counters, ctx=ctx) return await self._reindex_resource_vectors(uri=uri, counters=counters, ctx=ctx) @@ -407,9 +484,10 @@ async def _reindex_skill( *, uri: str, mode: str, - counters: _ReindexCounters, - ctx: RequestContext, + run: _ReindexRunContext, ) -> None: + counters = run.counters + ctx = run.ctx if mode == "semantic_and_vectors": await self._regenerate_skill_semantics(uri=uri, ctx=ctx) await self._reindex_skill_vectors(uri=uri, counters=counters, ctx=ctx) @@ -419,17 +497,28 @@ async def _reindex_memory( *, uri: str, mode: str, - counters: _ReindexCounters, - ctx: RequestContext, + run: _ReindexRunContext, ) -> None: + counters = run.counters + ctx = run.ctx if mode == "semantic_and_vectors": - await self._run_semantic_processor(uri=uri, context_type="memory", ctx=ctx) + await self._run_semantic_processor( + uri=uri, + context_type="memory", + ctx=ctx, + lock=run.lock, + ) await self._reindex_memory_vectors(uri=uri, counters=counters, ctx=ctx) return await self._reindex_memory_vectors(uri=uri, counters=counters, ctx=ctx) async def _run_semantic_processor( - self, *, uri: str, context_type: str, ctx: RequestContext + self, + *, + uri: str, + context_type: str, + ctx: RequestContext, + lock: LockLease = NO_LOCK, ) -> None: processor = SemanticProcessor() msg = SemanticMsg( @@ -442,7 +531,7 @@ async def _run_semantic_processor( role=ctx.role.value, skip_vectorization=True, ) - await processor.on_dequeue({"data": msg.to_json()}) + await processor.on_dequeue({"data": msg.to_json()}, lock=lock.as_borrowed()) async def _reindex_resource_vectors( self, @@ -586,9 +675,10 @@ async def _reindex_user_namespace( *, uri: str, mode: str, - counters: _ReindexCounters, - ctx: RequestContext, + run: _ReindexRunContext, ) -> None: + counters = run.counters + ctx = run.ctx normalized_uri = uri.rstrip("/") target_root = normalized_uri if normalized_uri else uri viking_fs = get_viking_fs() @@ -608,8 +698,7 @@ async def _reindex_user_namespace( await self._reindex_user_namespace( uri=user_root, mode=mode, - counters=counters, - ctx=ctx, + run=run, ) return @@ -638,7 +727,9 @@ async def _reindex_user_namespace( "semantic_and_vectors" if mode == "semantic_and_vectors" else "vectors_only" ) await self._reindex_memory( - uri=memory_root, mode=memory_mode, counters=counters, ctx=ctx + uri=memory_root, + mode=memory_mode, + run=run, ) if mode == "semantic_and_vectors": @@ -649,8 +740,7 @@ async def _reindex_user_namespace( target_root=target_root, directories=resource_directories, files=resource_files, - counters=counters, - ctx=ctx, + run=run, ) await self._reindex_resource_vectors_from_entries( @@ -666,9 +756,10 @@ async def _reindex_agent_namespace( *, uri: str, mode: str, - counters: _ReindexCounters, - ctx: RequestContext, + run: _ReindexRunContext, ) -> None: + counters = run.counters + ctx = run.ctx normalized_uri = uri.rstrip("/") target_root = normalized_uri if normalized_uri else uri viking_fs = get_viking_fs() @@ -688,8 +779,7 @@ async def _reindex_agent_namespace( await self._reindex_agent_namespace( uri=agent_root, mode=mode, - counters=counters, - ctx=ctx, + run=run, ) return @@ -723,14 +813,20 @@ async def _reindex_agent_namespace( "semantic_and_vectors" if mode == "semantic_and_vectors" else "vectors_only" ) await self._reindex_memory( - uri=memory_root, mode=memory_mode, counters=counters, ctx=ctx + uri=memory_root, + mode=memory_mode, + run=run, ) for skill_root in sorted(set(skill_roots)): skill_mode = ( "semantic_and_vectors" if mode == "semantic_and_vectors" else "vectors_only" ) - await self._reindex_skill(uri=skill_root, mode=skill_mode, counters=counters, ctx=ctx) + await self._reindex_skill( + uri=skill_root, + mode=skill_mode, + run=run, + ) if mode == "semantic_and_vectors": ( @@ -740,8 +836,7 @@ async def _reindex_agent_namespace( target_root=target_root, directories=resource_directories, files=resource_files, - counters=counters, - ctx=ctx, + run=run, ) await self._reindex_resource_vectors_from_entries( @@ -757,9 +852,10 @@ async def _reindex_global_namespace( *, uri: str, mode: str, - counters: _ReindexCounters, - ctx: RequestContext, + run: _ReindexRunContext, ) -> None: + counters = run.counters + ctx = run.ctx target_root = "viking://" viking_fs = get_viking_fs() try: @@ -801,16 +897,14 @@ async def _reindex_global_namespace( await self._reindex_user_namespace( uri=user_root, mode=mode, - counters=counters, - ctx=ctx, + run=run, ) for agent_root in sorted(set(agent_roots)): await self._reindex_agent_namespace( uri=agent_root, mode=mode, - counters=counters, - ctx=ctx, + run=run, ) if mode == "semantic_and_vectors": @@ -821,8 +915,7 @@ async def _reindex_global_namespace( target_root=target_root, directories=resource_directories, files=resource_files, - counters=counters, - ctx=ctx, + run=run, ) await self._reindex_resource_vectors_from_entries( @@ -1229,17 +1322,20 @@ async def _upsert_context( code="FAILED_PRECONDITION", details={"uri": uri}, ) - - result = await TextEmbeddingHandler(service.vikingdb_manager).on_dequeue( - {"data": msg.to_json()} - ) - if result is None: + wait_tracker = get_request_wait_tracker() + wait_tracker.register_embedding_root(msg.telemetry_id, msg.id) + enqueued = await service.vikingdb_manager.enqueue_embedding_msg(msg) + if not enqueued: + wait_tracker.mark_embedding_failed( + msg.telemetry_id, + msg.id, + f"Failed to enqueue reindex vector for {uri}", + ) raise OpenVikingError( - f"Failed to reindex vector for {uri}", + f"Failed to enqueue reindex vector for {uri}", code="PROCESSING_ERROR", details={"uri": uri, "level": int(level)}, ) - logger.debug("Reindexed vector for %s level=%s", uri, int(level)) async def _fetch_existing_record( self, diff --git a/openviking/storage/content_write.py b/openviking/storage/content_write.py index 82b2a79d0..53074ed64 100644 --- a/openviking/storage/content_write.py +++ b/openviking/storage/content_write.py @@ -201,7 +201,6 @@ async def _write_direct_with_refresh( changed_uri=uri, context_type=context_type, ctx=ctx, - lifecycle_lock_handle_id="", change_type="added" if mode == "create" else "modified", ) semantic_enqueued = True @@ -386,7 +385,6 @@ async def _enqueue_semantic_refresh( changed_uri: str, context_type: str, ctx: RequestContext, - lifecycle_lock_handle_id: str, change_type: str = "modified", target_uri: str = "", ) -> None: @@ -403,7 +401,6 @@ async def _enqueue_semantic_refresh( role=ctx.role.value, skip_vectorization=False, telemetry_id=telemetry.telemetry_id, - lifecycle_lock_handle_id=lifecycle_lock_handle_id, coalesce_key=( build_semantic_coalesce_key( context_type=context_type, @@ -432,7 +429,6 @@ async def _enqueue_memory_refresh( root_uri: str, modified_uri: str, ctx: RequestContext, - lifecycle_lock_handle_id: str, ) -> None: queue_manager = get_queue_manager() semantic_queue = queue_manager.get_queue(queue_manager.SEMANTIC, allow_create=True) @@ -446,7 +442,6 @@ async def _enqueue_memory_refresh( role=ctx.role.value, skip_vectorization=False, telemetry_id=telemetry.telemetry_id, - lifecycle_lock_handle_id=lifecycle_lock_handle_id, coalesce_key=build_semantic_coalesce_key( context_type="memory", uri=root_uri, @@ -563,7 +558,6 @@ async def _write_memory_with_refresh( root_uri=root_uri, modified_uri=uri, ctx=ctx, - lifecycle_lock_handle_id="", ) await lock_manager.release(handle) released = True diff --git a/openviking/storage/queuefs/semantic_dag.py b/openviking/storage/queuefs/semantic_dag.py index d849c5208..8452d9b5b 100644 --- a/openviking/storage/queuefs/semantic_dag.py +++ b/openviking/storage/queuefs/semantic_dag.py @@ -8,6 +8,7 @@ from openviking.server.identity import RequestContext from openviking.storage.queuefs.semantic_sidecar import write_semantic_sidecars +from openviking.storage.transaction import NO_LOCK, LockLease from openviking.storage.viking_fs import get_viking_fs from openviking.telemetry.request_wait_tracker import get_request_wait_tracker from openviking_cli.utils import VikingURI @@ -79,7 +80,7 @@ def __init__( semantic_msg_id: Optional[str] = None, telemetry_id: str = "", recursive: bool = True, - lifecycle_lock_handle_id: str = "", + lock: LockLease = NO_LOCK, is_code_repo: bool = False, changes: Optional[Dict[str, List[str]]] = None, skip_vectorization: bool = False, @@ -95,7 +96,7 @@ def __init__( self._semantic_msg_id = semantic_msg_id self._telemetry_id = telemetry_id self._recursive = recursive - self._lifecycle_lock_handle_id = lifecycle_lock_handle_id + self._lock = lock self._is_code_repo = is_code_repo self._changes = changes or {} self._skip_vectorization = skip_vectorization @@ -119,7 +120,6 @@ def __init__( self._dir_change_status: Dict[str, bool] = {} self._overview_cache: Dict[str, Dict[str, str]] = {} self._overview_cache_lock = asyncio.Lock() - self._refresh_task: Optional[asyncio.Task] = None def _create_on_complete_callback(self) -> Callable[[], Awaitable[None]]: """Create on_complete callback for incremental update or full update.""" @@ -144,7 +144,7 @@ async def sync_diff_callback() -> None: self._target_uri, ctx=self._ctx, file_change_status=self._file_change_status, - lifecycle_lock_handle_id=self._lifecycle_lock_handle_id, + lock=self._lock, ) logger.info( f"[SyncDiff] Diff computed: " @@ -169,20 +169,16 @@ async def run(self, root_uri: str) -> None: self._root_uri = root_uri self._root_done = asyncio.Event() - # Start lifecycle lock refresh loop if we hold a lock - if self._lifecycle_lock_handle_id: - self._refresh_task = asyncio.create_task(self._lock_refresh_loop()) - try: await self._dispatch_dir(root_uri, parent_uri=None) await self._root_done.wait() except Exception: - await self._release_lifecycle_lock() + await self._lock.close() raise original_on_complete = self._create_on_complete_callback() - # Wrap on_complete to release lifecycle lock after all processing + # Release owned semantic locks after downstream vectorization finishes. async def wrapped_on_complete() -> None: try: if original_on_complete: @@ -192,7 +188,7 @@ async def wrapped_on_complete() -> None: self._telemetry_id, self._semantic_msg_id ) finally: - await self._release_lifecycle_lock() + await self._lock.close() async with self._vectorize_lock: task_count = self._vectorize_task_count @@ -611,7 +607,7 @@ async def _write_directory_semantics( abstract=abstract, ctx=self._ctx, is_stale=self._is_stale, - lifecycle_lock_handle_id=self._lifecycle_lock_handle_id, + lock=self._lock, log_prefix="[SemanticDag]", ) if not wrote: @@ -647,7 +643,7 @@ async def _overview_task(self, dir_uri: str) -> None: abstract = self._processor._extract_abstract_from_overview(overview) overview, abstract = self._processor._enforce_size_limits(overview, abstract) - # Write directly — protected by the outer lifecycle tree lock + # Write directly, protected by the outer semantic lock. try: wrote = await self._write_directory_semantics(dir_uri, overview, abstract) if not wrote: @@ -701,46 +697,6 @@ async def _add_vectorize_task(self, task: VectorizeTask) -> None: else: # directory self._vectorize_task_count += 2 - async def _lock_refresh_loop(self) -> None: - """Periodically refresh lifecycle lock to prevent stale expiry.""" - from openviking.storage.transaction import get_lock_manager - - try: - interval = get_lock_manager()._path_lock._lock_expire / 2 - except Exception: - interval = 150.0 - - while True: - try: - await asyncio.sleep(interval) - handle = get_lock_manager().get_handle(self._lifecycle_lock_handle_id) - if handle: - await get_lock_manager().refresh_lock(handle) - else: - break - except asyncio.CancelledError: - break - except Exception as e: - logger.warning(f"[SemanticDag] Lock refresh failed: {e}") - - async def _release_lifecycle_lock(self) -> None: - """Stop refresh loop and release lifecycle lock.""" - if self._refresh_task and not self._refresh_task.done(): - self._refresh_task.cancel() - self._refresh_task = None - if not self._lifecycle_lock_handle_id: - return - handle_id = self._lifecycle_lock_handle_id - self._lifecycle_lock_handle_id = "" - try: - from openviking.storage.transaction import get_lock_manager - - handle = get_lock_manager().get_handle(handle_id) - if handle: - await get_lock_manager().release(handle) - except Exception as e: - logger.warning(f"[SemanticDag] Failed to release lifecycle lock {handle_id}: {e}") - def get_stats(self) -> DagStats: return DagStats( total_nodes=self._stats.total_nodes, diff --git a/openviking/storage/queuefs/semantic_lock.py b/openviking/storage/queuefs/semantic_lock.py new file mode 100644 index 000000000..9e00165e5 --- /dev/null +++ b/openviking/storage/queuefs/semantic_lock.py @@ -0,0 +1,37 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Semantic queue lock resolution.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from openviking.storage.transaction import NO_LOCK, LockHandoffRef, LockLease, OwnedLockLease + + +@dataclass +class SemanticLockScope: + """Resolved lock scope for one semantic message.""" + + lock: LockLease + + @classmethod + async def resolve( + cls, + lock_handoff: Optional[LockHandoffRef], + *, + caller_lock: LockLease = NO_LOCK, + ) -> "SemanticLockScope": + if lock_handoff and caller_lock.active: + raise ValueError("semantic lock must come from either message or caller, not both") + if caller_lock is not NO_LOCK and not caller_lock.active: + raise ValueError("caller semantic lock is inactive") + if caller_lock.active: + return cls(caller_lock.as_borrowed()) + if lock_handoff: + return cls(await OwnedLockLease.from_handoff(lock_handoff)) + return cls(NO_LOCK) + + async def close(self) -> None: + await self.lock.close() diff --git a/openviking/storage/queuefs/semantic_msg.py b/openviking/storage/queuefs/semantic_msg.py index bc46dc7d3..aecb0bbf7 100644 --- a/openviking/storage/queuefs/semantic_msg.py +++ b/openviking/storage/queuefs/semantic_msg.py @@ -8,6 +8,8 @@ from typing import Any, Dict, List, Optional from uuid import uuid4 +from openviking.storage.transaction import LockHandoffRef + def build_semantic_coalesce_key( *, @@ -50,7 +52,7 @@ class SemanticMsg: skip_vectorization: bool = False telemetry_id: str = "" target_uri: str = "" - lifecycle_lock_handle_id: str = "" + lock_handoff: Optional[LockHandoffRef] = None is_code_repo: bool = False coalesce_key: str = "" coalesce_version: int = 0 @@ -70,7 +72,7 @@ def __init__( skip_vectorization: bool = False, telemetry_id: str = "", target_uri: str = "", - lifecycle_lock_handle_id: str = "", + lock_handoff: Optional[LockHandoffRef] = None, is_code_repo: bool = False, coalesce_key: str = "", coalesce_version: int = 0, @@ -87,7 +89,7 @@ def __init__( self.skip_vectorization = skip_vectorization self.telemetry_id = telemetry_id self.target_uri = target_uri - self.lifecycle_lock_handle_id = lifecycle_lock_handle_id + self.lock_handoff = lock_handoff self.is_code_repo = is_code_repo self.coalesce_key = coalesce_key self.coalesce_version = coalesce_version @@ -129,7 +131,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "SemanticMsg": skip_vectorization=data.get("skip_vectorization", False), telemetry_id=data.get("telemetry_id", ""), target_uri=data.get("target_uri", ""), - lifecycle_lock_handle_id=data.get("lifecycle_lock_handle_id", ""), + lock_handoff=LockHandoffRef.from_value(data.get("lock_handoff")), is_code_repo=data.get("is_code_repo", False), coalesce_key=data.get("coalesce_key", ""), coalesce_version=data.get("coalesce_version", 0), diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 6057405ab..f41bbb158 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -31,7 +31,9 @@ from openviking.storage.queuefs.semantic_dag import DagStats, SemanticDagExecutor from openviking.storage.queuefs.semantic_msg import SemanticMsg, build_semantic_coalesce_key from openviking.storage.queuefs.semantic_queue import is_semantic_msg_stale +from openviking.storage.queuefs.semantic_lock import SemanticLockScope from openviking.storage.queuefs.semantic_sidecar import write_semantic_sidecars +from openviking.storage.transaction import NO_LOCK, LockLease from openviking.storage.viking_fs import get_viking_fs from openviking.telemetry import bind_telemetry, bind_telemetry_stage, resolve_telemetry from openviking.telemetry.request_wait_tracker import get_request_wait_tracker @@ -271,11 +273,14 @@ async def _enqueue_parent_refresh(self, msg: SemanticMsg, uri: str) -> None: await semantic_queue.enqueue(parent_msg) logger.info("Enqueued parent semantic refresh: %s", parent_uri) - async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + async def on_dequeue( + self, + data: Optional[Dict[str, Any]], + lock: LockLease = NO_LOCK, + ) -> Optional[Dict[str, Any]]: """Process dequeued SemanticMsg, recursively process all subdirectories.""" msg: Optional[SemanticMsg] = None collector = None - release_lock_in_finally = True try: import json @@ -332,69 +337,74 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, logger.info(f"Processing semantic generation for: {msg})") - if msg.context_type == "memory": - await self._process_memory_directory(msg) - else: - is_incremental = False - target_uri = msg.target_uri - viking_fs = get_viking_fs() - if msg.target_uri: - target_exists = await viking_fs.exists( - msg.target_uri, ctx=self._current_ctx + semantic_lock = await SemanticLockScope.resolve( + msg.lock_handoff, + caller_lock=lock, + ) + lock_transferred = False + try: + if msg.context_type == "memory": + lock_transferred = True + await self._process_memory_directory( + msg, + lock=semantic_lock.lock, ) - # Check if target URI exists and is not the same as the source URI(避免重复处理) - if target_exists and msg.uri != msg.target_uri: - is_incremental = True - logger.info( - f"Target URI exists, using incremental update: {msg.target_uri}" + else: + is_incremental = False + target_uri = msg.target_uri + viking_fs = get_viking_fs() + if msg.target_uri: + target_exists = await viking_fs.exists( + msg.target_uri, ctx=self._current_ctx ) - elif target_exists and msg.changes and msg.uri == msg.target_uri: + # Check if target URI exists and is not the same as the source URI(避免重复处理) + if target_exists and msg.uri != msg.target_uri: + is_incremental = True + logger.info( + f"Target URI exists, using incremental update: {msg.target_uri}" + ) + elif target_exists and msg.changes and msg.uri == msg.target_uri: + is_incremental = True + logger.info( + f"Using direct incremental semantic update for: {msg.uri}" + ) + elif msg.changes: is_incremental = True + target_uri = msg.uri logger.info( f"Using direct incremental semantic update for: {msg.uri}" ) - elif msg.changes: - is_incremental = True - target_uri = msg.uri - logger.info(f"Using direct incremental semantic update for: {msg.uri}") - - # Re-acquire lifecycle lock if handle was lost (e.g. server restart) - if msg.lifecycle_lock_handle_id: - lock_uri = target_uri or msg.uri - msg.lifecycle_lock_handle_id = await self._ensure_lifecycle_lock( - msg.lifecycle_lock_handle_id, - viking_fs._uri_to_path(lock_uri, ctx=self._current_ctx), - ) - executor = SemanticDagExecutor( - processor=self, - context_type=msg.context_type, - max_concurrent_llm=self.max_concurrent_llm, - ctx=self._current_ctx, - incremental_update=is_incremental, - target_uri=target_uri, - semantic_msg_id=msg.id, - telemetry_id=msg.telemetry_id, - recursive=msg.recursive, - lifecycle_lock_handle_id=msg.lifecycle_lock_handle_id, - is_code_repo=msg.is_code_repo, - changes=msg.changes, - skip_vectorization=msg.skip_vectorization, - coalesce_key=msg.coalesce_key, - coalesce_version=msg.coalesce_version, - ) - self._dag_executor = executor - if msg.lifecycle_lock_handle_id: - # The DAG owns lifecycle lock release after this point. - release_lock_in_finally = False - await executor.run(msg.uri) - self._cache_dag_stats( - msg.telemetry_id, - msg.uri, - executor.get_stats(), - ) - if not executor.stale: - await self._enqueue_parent_refresh(msg, target_uri or msg.uri) + executor = SemanticDagExecutor( + processor=self, + context_type=msg.context_type, + max_concurrent_llm=self.max_concurrent_llm, + ctx=self._current_ctx, + incremental_update=is_incremental, + target_uri=target_uri, + semantic_msg_id=msg.id, + telemetry_id=msg.telemetry_id, + recursive=msg.recursive, + lock=semantic_lock.lock, + is_code_repo=msg.is_code_repo, + changes=msg.changes, + skip_vectorization=msg.skip_vectorization, + coalesce_key=msg.coalesce_key, + coalesce_version=msg.coalesce_version, + ) + self._dag_executor = executor + lock_transferred = True + await executor.run(msg.uri) + self._cache_dag_stats( + msg.telemetry_id, + msg.uri, + executor.get_stats(), + ) + if not executor.stale: + await self._enqueue_parent_refresh(msg, target_uri or msg.uri) + finally: + if not lock_transferred: + await semantic_lock.close() self._merge_request_stats(msg.telemetry_id, processed=1) logger.info(f"Completed semantic generation for: {msg.uri}") self.report_success() @@ -443,22 +453,6 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, self.report_error(str(e), data) return None finally: - # Safety net: release lifecycle lock if still held (e.g. on exception - # before the DAG executor took ownership) - if release_lock_in_finally and msg and msg.lifecycle_lock_handle_id: - try: - from openviking.storage.transaction import get_lock_manager - - lm = get_lock_manager() - handle = lm.get_handle(msg.lifecycle_lock_handle_id) - if handle: - await lm.release(handle) - logger.info( - f"[SemanticProcessor] Safety-net released lifecycle lock " - f"{msg.lifecycle_lock_handle_id}" - ) - except Exception: - pass self._current_msg = None self._current_ctx = None @@ -467,36 +461,10 @@ def get_dag_stats(self) -> Optional["DagStats"]: return None return self._dag_executor.get_stats() - @staticmethod - async def _ensure_lifecycle_lock(handle_id: str, lock_path: str) -> str: - """If the handle is missing (server restart), re-acquire a TreeLock. - - Returns the (possibly new) handle ID, or "" on failure. - """ - from openviking.storage.transaction import get_lock_manager - - lm = get_lock_manager() - if lm.get_handle(handle_id): - return handle_id - new_handle = lm.create_handle() - if await lm.acquire_tree(new_handle, lock_path): - logger.info(f"Re-acquired lifecycle lock on {lock_path} (handle {new_handle.id})") - return new_handle.id - logger.warning(f"Failed to re-acquire lifecycle lock on {lock_path}") - await lm.release(new_handle) - return "" - - async def _process_memory_directory(self, msg: SemanticMsg) -> None: - """Process a memory directory with special handling. - - For memory directories: - - Memory files are already vectorized via embedding queue - - Only generate abstract.md and overview.md - - Vectorize the generated abstract.md and overview.md - - Args: - msg: The semantic message containing directory info and changes - """ + async def _process_memory_directory( + self, msg: SemanticMsg, lock: LockLease = NO_LOCK + ) -> None: + """Process a memory directory with special handling.""" viking_fs = get_viking_fs() dir_uri = msg.uri ctx = self._current_ctx @@ -508,140 +476,133 @@ def _mark_done() -> None: request_wait_tracker.mark_semantic_done(msg.telemetry_id, msg.id) try: - entries = await viking_fs.ls(dir_uri, ctx=ctx) - except Exception as e: - if msg.lifecycle_lock_handle_id: - await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) - raise RuntimeError(f"Failed to list memory directory {dir_uri}: {e}") from e - - file_paths: List[str] = [] - for entry in entries: - name = entry.get("name", "") - if not name or name.startswith(".") or name in [".", ".."]: - continue - if not entry.get("isDir", False): - item_uri = VikingURI(dir_uri).join(name).uri - file_paths.append(item_uri) - - if not file_paths: - logger.info(f"No memory files found in {dir_uri}") - _mark_done() - if msg.lifecycle_lock_handle_id: - await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) - return - - file_summaries: List[Dict[str, str]] = [] - existing_summaries: Dict[str, str] = {} - - if msg.changes: try: - old_overview = await viking_fs.read_file(f"{dir_uri}/.overview.md", ctx=ctx) - if old_overview: - existing_summaries = self._parse_overview_md(old_overview) - logger.info( - f"Parsed {len(existing_summaries)} existing summaries from overview.md" - ) + entries = await viking_fs.ls(dir_uri, ctx=ctx) except Exception as e: - logger.debug(f"No existing overview.md found for {dir_uri}: {e}") + raise RuntimeError(f"Failed to list memory directory {dir_uri}: {e}") from e - changed_files: Set[str] = set() - if msg.changes: - changed_files = set(msg.changes.get("added", []) + msg.changes.get("modified", [])) - deleted_files = set(msg.changes.get("deleted", [])) - logger.info( - f"Processing memory directory {dir_uri} with changes: " - f"added={len(msg.changes.get('added', []))}, " - f"modified={len(msg.changes.get('modified', []))}, " - f"deleted={len(deleted_files)}" - ) + file_paths: List[str] = [] + for entry in entries: + name = entry.get("name", "") + if not name or name.startswith(".") or name in [".", ".."]: + continue + if not entry.get("isDir", False): + item_uri = VikingURI(dir_uri).join(name).uri + file_paths.append(item_uri) - # Separate cached from changed files to allow concurrent VLM calls - pending_indices: List[Tuple[int, str]] = [] - file_summaries: List[Optional[Dict[str, str]]] = [None] * len(file_paths) + if not file_paths: + logger.info(f"No memory files found in {dir_uri}") + _mark_done() + return + + existing_summaries: Dict[str, str] = {} + if msg.changes: + try: + old_overview = await viking_fs.read_file(f"{dir_uri}/.overview.md", ctx=ctx) + if old_overview: + existing_summaries = self._parse_overview_md(old_overview) + logger.info( + f"Parsed {len(existing_summaries)} existing summaries from overview.md" + ) + except Exception as e: + logger.debug(f"No existing overview.md found for {dir_uri}: {e}") - for idx, file_path in enumerate(file_paths): - file_name = file_path.split("/")[-1] + changed_files: Set[str] = set() + if msg.changes: + changed_files = set(msg.changes.get("added", []) + msg.changes.get("modified", [])) + deleted_files = set(msg.changes.get("deleted", [])) + logger.info( + f"Processing memory directory {dir_uri} with changes: " + f"added={len(msg.changes.get('added', []))}, " + f"modified={len(msg.changes.get('modified', []))}, " + f"deleted={len(deleted_files)}" + ) - if file_path not in changed_files and file_name in existing_summaries: - file_summaries[idx] = {"name": file_name, "summary": existing_summaries[file_name]} - logger.debug(f"Reused existing summary for {file_name}") - else: - pending_indices.append((idx, file_path)) + pending_indices: List[Tuple[int, str]] = [] + file_summaries: List[Optional[Dict[str, str]]] = [None] * len(file_paths) - if file_paths and not pending_indices: - try: - from openviking.metrics.datasources.cache import CacheEventDataSource + for idx, file_path in enumerate(file_paths): + file_name = file_path.split("/")[-1] + if file_path not in changed_files and file_name in existing_summaries: + file_summaries[idx] = { + "name": file_name, + "summary": existing_summaries[file_name], + } + logger.debug(f"Reused existing summary for {file_name}") + else: + pending_indices.append((idx, file_path)) - CacheEventDataSource.record_hit("L1") - except Exception: - pass - elif file_paths and pending_indices: - try: - from openviking.metrics.datasources.cache import CacheEventDataSource + if file_paths and not pending_indices: + try: + from openviking.metrics.datasources.cache import CacheEventDataSource - if len(file_paths) > len(pending_indices): CacheEventDataSource.record_hit("L1") - CacheEventDataSource.record_miss("L1") - except Exception: - pass + except Exception: + pass + elif file_paths and pending_indices: + try: + from openviking.metrics.datasources.cache import CacheEventDataSource - if pending_indices: - logger.info( - f"Generating summaries for {len(pending_indices)} changed files " - f"(reused {len(file_paths) - len(pending_indices)} cached)" - ) + if len(file_paths) > len(pending_indices): + CacheEventDataSource.record_hit("L1") + CacheEventDataSource.record_miss("L1") + except Exception: + pass - async def _gen(idx: int, file_path: str) -> None: - file_name = file_path.split("/")[-1] - try: - summary_dict = await self._generate_single_file_summary( - file_path, llm_sem=llm_sem, ctx=ctx - ) - file_summaries[idx] = summary_dict - logger.debug(f"Generated summary for {file_name}") - except Exception as e: - logger.warning(f"Failed to generate summary for {file_path}: {e}") - file_summaries[idx] = {"name": file_name, "summary": ""} - - # Fix for Issue #1245: Batch processing to prevent coroutine scheduling storms - # Use a reasonable batch size (min of semaphore and 10) to keep event loop responsive - batch_size = max(1, min(self.max_concurrent_llm, 10)) - for batch_start in range(0, len(pending_indices), batch_size): - batch = pending_indices[batch_start : batch_start + batch_size] + if pending_indices: logger.info( - f"[MemorySemantic] Processing batch {batch_start // batch_size + 1}/" - f"{(len(pending_indices) + batch_size - 1) // batch_size} " - f"({len(batch)} files)" + f"Generating summaries for {len(pending_indices)} changed files " + f"(reused {len(file_paths) - len(pending_indices)} cached)" ) - await asyncio.gather(*[_gen(i, fp) for i, fp in batch]) - file_summaries = [s for s in file_summaries if s is not None] + async def _gen(idx: int, file_path: str) -> None: + file_name = file_path.split("/")[-1] + try: + summary_dict = await self._generate_single_file_summary( + file_path, llm_sem=llm_sem, ctx=ctx + ) + file_summaries[idx] = summary_dict + logger.debug(f"Generated summary for {file_name}") + except Exception as e: + logger.warning(f"Failed to generate summary for {file_path}: {e}") + file_summaries[idx] = {"name": file_name, "summary": ""} - overview = await self._generate_overview(dir_uri, file_summaries, [], llm_sem=llm_sem) - abstract = self._extract_abstract_from_overview(overview) - overview, abstract = self._enforce_size_limits(overview, abstract) + batch_size = max(1, min(self.max_concurrent_llm, 10)) + for batch_start in range(0, len(pending_indices), batch_size): + batch = pending_indices[batch_start : batch_start + batch_size] + logger.info( + f"[MemorySemantic] Processing batch {batch_start // batch_size + 1}/" + f"{(len(pending_indices) + batch_size - 1) // batch_size} " + f"({len(batch)} files)" + ) + await asyncio.gather(*[_gen(i, fp) for i, fp in batch]) - try: - wrote_semantics = await self._write_memory_directory_semantics( - msg=msg, - viking_fs=viking_fs, - dir_uri=dir_uri, - overview=overview, - abstract=abstract, - ctx=ctx, + completed_summaries = [s for s in file_summaries if s is not None] + overview = await self._generate_overview( + dir_uri, completed_summaries, [], llm_sem=llm_sem ) - except Exception as e: - if msg.lifecycle_lock_handle_id: - await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) - raise RuntimeError(f"Failed to write abstract/overview for {dir_uri}: {e}") from e - if not wrote_semantics: - _mark_done() - if msg.lifecycle_lock_handle_id: - await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) - return - logger.info(f"Generated abstract.md and overview.md for {dir_uri}") + abstract = self._extract_abstract_from_overview(overview) + overview, abstract = self._enforce_size_limits(overview, abstract) + + try: + wrote_semantics = await self._write_memory_directory_semantics( + msg=msg, + viking_fs=viking_fs, + dir_uri=dir_uri, + overview=overview, + abstract=abstract, + ctx=ctx, + lock=lock, + ) + except Exception as e: + raise RuntimeError( + f"Failed to write abstract/overview for {dir_uri}: {e}" + ) from e + if not wrote_semantics: + _mark_done() + return + logger.info(f"Generated abstract.md and overview.md for {dir_uri}") - try: if msg.skip_vectorization: logger.info(f"Skipping vectorization for {dir_uri} (requested via SemanticMsg)") _mark_done() @@ -669,8 +630,7 @@ async def _on_complete() -> None: ) logger.info(f"Vectorized abstract.md and overview.md for {dir_uri}") finally: - if msg.lifecycle_lock_handle_id: - await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) + await lock.close() async def _write_memory_directory_semantics( self, @@ -681,6 +641,7 @@ async def _write_memory_directory_semantics( overview: str, abstract: str, ctx: Optional[RequestContext], + lock: LockLease = NO_LOCK, ) -> bool: return await write_semantic_sidecars( viking_fs=viking_fs, @@ -689,36 +650,21 @@ async def _write_memory_directory_semantics( abstract=abstract, ctx=ctx, is_stale=lambda: is_semantic_msg_stale(msg), - lifecycle_lock_handle_id=msg.lifecycle_lock_handle_id, + lock=lock, log_prefix="[MemorySemantic]", ) - async def _release_memory_lifecycle_lock(self, handle_id: str) -> None: - """Release a lifecycle lock held by in-place memory refresh.""" - try: - from openviking.storage.transaction import get_lock_manager - - handle = get_lock_manager().get_handle(handle_id) - if handle: - await get_lock_manager().release(handle) - except Exception as e: - logger.warning(f"[SemanticProcessor] Failed to release memory lifecycle lock: {e}") - async def _sync_topdown_recursive( self, root_uri: str, target_uri: str, ctx: Optional[RequestContext] = None, file_change_status: Optional[Dict[str, bool]] = None, - lifecycle_lock_handle_id: str = "", + lock: LockLease = NO_LOCK, ) -> DiffResult: viking_fs = get_viking_fs() diff = DiffResult() - lock_handle = None - if lifecycle_lock_handle_id: - from openviking.storage.transaction import get_lock_manager - - lock_handle = get_lock_manager().get_handle(lifecycle_lock_handle_id) + lock_handle = lock.handle async def list_children(dir_uri: str) -> Tuple[Dict[str, str], Dict[str, str]]: files: Dict[str, str] = {} diff --git a/openviking/storage/queuefs/semantic_sidecar.py b/openviking/storage/queuefs/semantic_sidecar.py index 58db8bb58..7e512f821 100644 --- a/openviking/storage/queuefs/semantic_sidecar.py +++ b/openviking/storage/queuefs/semantic_sidecar.py @@ -5,6 +5,7 @@ from typing import Any, Callable, Optional from openviking.server.identity import RequestContext +from openviking.storage.transaction import NO_LOCK, LockLease from openviking_cli.utils.logger import get_logger logger = get_logger(__name__) @@ -18,7 +19,7 @@ async def write_semantic_sidecars( abstract: str, ctx: Optional[RequestContext], is_stale: Callable[[], bool], - lifecycle_lock_handle_id: str = "", + lock: LockLease = NO_LOCK, log_prefix: str = "[Semantic]", ) -> bool: if is_stale(): @@ -33,28 +34,16 @@ async def write_semantic_sidecars( await _write_sidecars(viking_fs, dir_uri, overview, abstract, ctx) return True - handle = None - owns_handle = False - if lifecycle_lock_handle_id: - handle = lock_manager.get_handle(lifecycle_lock_handle_id) - if handle is None: - handle = lock_manager.create_handle() - owns_handle = True - lock_paths = [ viking_fs._uri_to_path(f"{dir_uri}/.overview.md", ctx=ctx), viking_fs._uri_to_path(f"{dir_uri}/.abstract.md", ctx=ctx), ] - try: - async with LockContext(lock_manager, lock_paths, lock_mode="exact", handle=handle): - if is_stale(): - logger.info("%s Skipping stale semantic write for %s", log_prefix, dir_uri) - return False - await _write_sidecars(viking_fs, dir_uri, overview, abstract, ctx) - return True - finally: - if owns_handle: - await lock_manager.release(handle) + async with LockContext(lock_manager, lock_paths, lock_mode="exact", handle=lock.handle): + if is_stale(): + logger.info("%s Skipping stale semantic write for %s", log_prefix, dir_uri) + return False + await _write_sidecars(viking_fs, dir_uri, overview, abstract, ctx) + return True async def _write_sidecars( diff --git a/openviking/storage/transaction/__init__.py b/openviking/storage/transaction/__init__.py index 6d16e6e37..2aa5a40dc 100644 --- a/openviking/storage/transaction/__init__.py +++ b/openviking/storage/transaction/__init__.py @@ -8,6 +8,13 @@ from openviking.storage.transaction.lock_context import LockContext from openviking.storage.transaction.lock_handle import LockHandle, LockOwner +from openviking.storage.transaction.lock_lease import ( + NO_LOCK, + BorrowedLockLease, + LockHandoffRef, + LockLease, + OwnedLockLease, +) from openviking.storage.transaction.lock_manager import ( LockManager, get_lock_manager, @@ -19,10 +26,15 @@ from openviking.storage.transaction.redo_log import RedoLog __all__ = [ + "BorrowedLockLease", "LockContext", "LockHandle", + "LockHandoffRef", + "LockLease", "LockManager", "LockOwner", + "NO_LOCK", + "OwnedLockLease", "PathLockEngine", "RedoLog", "get_lock_manager", diff --git a/openviking/storage/transaction/lock_context.py b/openviking/storage/transaction/lock_context.py index ec4a561fa..05212ddf0 100644 --- a/openviking/storage/transaction/lock_context.py +++ b/openviking/storage/transaction/lock_context.py @@ -6,6 +6,7 @@ from openviking.storage.errors import LockAcquisitionError from openviking.storage.transaction.lock_handle import LockHandle +from openviking.storage.transaction.lock_lease import OwnedLockLease from openviking.storage.transaction.lock_manager import LockManager @@ -34,6 +35,7 @@ def __init__( self._owns_handle = handle is None self._locks_before: list[str] = [] self._acquired_lock_paths: list[str] = [] + self._owned_lease: Optional[OwnedLockLease] = None async def __aenter__(self) -> LockHandle: if self._handle is None: @@ -72,12 +74,17 @@ async def __aenter__(self) -> LockHandle: raise LockAcquisitionError( f"Failed to acquire {self._lock_mode} lock for {self._paths}" ) + if self._owns_handle and self._handle.locks: + self._owned_lease = OwnedLockLease.from_handle(self._manager, self._handle) return self._handle async def __aexit__(self, exc_type, exc_val, exc_tb): if self._handle: if self._owns_handle: - await self._manager.release(self._handle) + if self._owned_lease: + await self._owned_lease.close() + else: + await self._manager.release(self._handle) else: await self._manager.release_selected(self._handle, self._acquired_lock_paths) return False diff --git a/openviking/storage/transaction/lock_lease.py b/openviking/storage/transaction/lock_lease.py new file mode 100644 index 000000000..021ba9af5 --- /dev/null +++ b/openviking/storage/transaction/lock_lease.py @@ -0,0 +1,225 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Typed lock ownership helpers for path-lock users.""" + +from __future__ import annotations + +import asyncio +from contextlib import suppress +from dataclasses import dataclass +from typing import Any, Iterable, Optional + +from openviking.storage.errors import LockAcquisitionError +from openviking.storage.transaction.lock_handle import LockHandle +from openviking.storage.transaction.lock_manager import LockManager, get_lock_manager +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +@dataclass(frozen=True) +class LockHandoffRef: + """Serializable reference used to hand lock ownership through a queue.""" + + handle_id: str + lock_paths: tuple[str, ...] = () + + @classmethod + def from_value(cls, value: Any) -> Optional["LockHandoffRef"]: + if value is None: + return None + if isinstance(value, LockHandoffRef): + return value + if not isinstance(value, dict): + raise ValueError("lock_handoff must be an object") + + handle_id = value.get("handle_id") + if not handle_id: + return None + lock_paths = tuple(str(path) for path in value.get("lock_paths", []) if path) + return cls(handle_id=str(handle_id), lock_paths=lock_paths) + + def to_dict(self) -> dict[str, Any]: + return {"handle_id": self.handle_id, "lock_paths": list(self.lock_paths)} + + +class LockLease: + """Base lock lease interface.""" + + @property + def handle(self) -> Optional[LockHandle]: + return None + + @property + def handle_id(self) -> str: + handle = self.handle + return handle.id if handle else "" + + @property + def active(self) -> bool: + handle = self.handle + return bool(handle and handle.locks) + + def as_borrowed(self) -> "LockLease": + return self + + def to_handoff(self) -> Optional[LockHandoffRef]: + return None + + async def close(self) -> None: + return + + async def handoff(self) -> None: + return + + +class _NoLockLease(LockLease): + pass + + +NO_LOCK: LockLease = _NoLockLease() + + +@dataclass +class BorrowedLockLease(LockLease): + """A lock lease borrowed from an outer owner.""" + + manager: LockManager + _handle: LockHandle + + @classmethod + def from_handle(cls, manager: LockManager, handle: LockHandle) -> "BorrowedLockLease": + return cls(manager=manager, _handle=handle) + + @property + def handle(self) -> Optional[LockHandle]: + return self.manager.get_handle(self._handle.id) + + def as_borrowed(self) -> "LockLease": + return self + + +class OwnedLockLease(LockLease): + """An owned lock lease that refreshes and releases its handle.""" + + def __init__( + self, + manager: LockManager, + handle: LockHandle, + *, + start_refresh: bool = True, + ): + self._manager = manager + self._handle: Optional[LockHandle] = handle + self._refresh_task: Optional[asyncio.Task] = None + if start_refresh and handle.locks: + self._start_refresh() + + @classmethod + def from_handle(cls, manager: LockManager, handle: LockHandle) -> "OwnedLockLease": + return cls(manager, handle) + + @classmethod + async def from_handoff( + cls, + ref: LockHandoffRef, + manager: Optional[LockManager] = None, + ) -> "OwnedLockLease": + manager = manager or get_lock_manager() + handle = manager.get_handle(ref.handle_id) + if handle is None: + handle = manager.adopt_handle(ref.handle_id, ref.lock_paths) + if handle is None: + raise LockAcquisitionError(f"Lock handle is no longer active: {ref.handle_id}") + return cls(manager, handle) + + @classmethod + async def acquire_tree( + cls, + manager: LockManager, + path: str, + *, + timeout: Optional[float] = None, + ) -> "OwnedLockLease": + handle = manager.create_handle() + if await manager.acquire_tree(handle, path, timeout=timeout): + return cls(manager, handle) + await manager.release(handle) + raise LockAcquisitionError(f"Failed to acquire tree lock for {path}") + + @classmethod + async def acquire_exact_paths( + cls, + manager: LockManager, + paths: Iterable[str], + *, + timeout: Optional[float] = None, + ) -> "OwnedLockLease": + handle = manager.create_handle() + path_list = list(paths) + if await manager.acquire_exact_path_batch(handle, path_list, timeout=timeout): + return cls(manager, handle) + await manager.release(handle) + raise LockAcquisitionError(f"Failed to acquire exact lock for {path_list}") + + @property + def handle(self) -> Optional[LockHandle]: + if self._handle is None: + return None + return self._manager.get_handle(self._handle.id) + + def as_borrowed(self) -> LockLease: + handle = self.handle + if handle is None: + return NO_LOCK + return BorrowedLockLease.from_handle(self._manager, handle) + + def to_handoff(self) -> Optional[LockHandoffRef]: + handle = self.handle + if handle is None: + return None + return LockHandoffRef(handle_id=handle.id, lock_paths=tuple(handle.locks)) + + async def close(self) -> None: + await self._stop_refresh() + handle = self.handle or self._handle + self._handle = None + if handle is not None: + await self._manager.release(handle) + + async def handoff(self) -> None: + """Stop managing this lease after another worker has received its handle.""" + await self._stop_refresh() + self._handle = None + + def _start_refresh(self) -> None: + if self._refresh_task is not None: + return + self._refresh_task = asyncio.create_task(self._refresh_loop()) + + async def _refresh_loop(self) -> None: + try: + expire = self._manager._path_lock._lock_expire + interval = max(0.1, expire / 2) + except Exception: + interval = 150.0 + + while True: + try: + await asyncio.sleep(interval) + handle = self.handle + if handle is None: + return + await self._manager.refresh_lock(handle) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.warning("Failed to refresh lock handle %s: %s", self.handle_id, exc) + + async def _stop_refresh(self) -> None: + if self._refresh_task is None: + return + self._refresh_task.cancel() + with suppress(asyncio.CancelledError): + await self._refresh_task + self._refresh_task = None diff --git a/openviking/storage/transaction/lock_manager.py b/openviking/storage/transaction/lock_manager.py index f8338bab6..8eccd14ab 100644 --- a/openviking/storage/transaction/lock_manager.py +++ b/openviking/storage/transaction/lock_manager.py @@ -278,6 +278,22 @@ def get_handle(self, handle_id: str) -> Optional[LockHandle]: return None return current + def adopt_handle(self, handle_id: str, lock_paths: List[str]) -> Optional[LockHandle]: + handle = self.get_handle(handle_id) + if handle is not None: + return handle + + adopted = LockHandle(id=handle_id) + for lock_path in dict.fromkeys(lock_paths): + if self._path_lock.is_lock_owned_by(lock_path, handle_id): + adopted.add_lock(lock_path) + if not adopted.locks: + return None + + self._handles[adopted.id] = adopted + self._mark_handle_active(adopted) + return adopted + def is_path_locked(self, path: str, ignore_stale: bool = True) -> bool: """Check whether *path* is currently locked. diff --git a/openviking/utils/resource_processor.py b/openviking/utils/resource_processor.py index d92431826..2023e2d2b 100644 --- a/openviking/utils/resource_processor.py +++ b/openviking/utils/resource_processor.py @@ -13,7 +13,9 @@ from openviking.parse.tree_builder import TreeBuilder from openviking.server.identity import RequestContext +from openviking.storage.errors import LockAcquisitionError from openviking.storage import VikingDBManager +from openviking.storage.transaction import NO_LOCK, LockLease, OwnedLockLease from openviking.storage.viking_fs import get_viking_fs from openviking.telemetry import get_current_telemetry from openviking.utils.embedding_utils import index_resource @@ -247,12 +249,12 @@ async def process_resource( except Exception: pass - # ============ Phase 3.5: Source commit + lifecycle lock ============ + # ============ Phase 3.5: Source commit + resource lock ============ root_uri = result.get("root_uri") temp_uri = result.get("temp_uri") # temp_doc_uri original_temp_uri = temp_uri # 保存原始 temp_uri 用于最终输出 candidate_uri = getattr(context_tree, "_candidate_uri", None) if context_tree else None - lifecycle_lock_handle_id = "" + resource_lock: LockLease = NO_LOCK if root_uri and temp_uri: from openviking.storage.transaction import get_lock_manager @@ -263,22 +265,16 @@ async def process_resource( lock_manager = get_lock_manager() try: if candidate_uri: - root_uri, lifecycle_lock_handle_id = await self._commit_unique_candidate( + root_uri, resource_lock = await self._commit_unique_candidate( candidate_uri=candidate_uri, ctx=ctx, ) result["root_uri"] = root_uri else: dst_path = viking_fs._uri_to_path(root_uri, ctx=ctx) - handle = lock_manager.create_handle() - try: - lifecycle_lock_handle_id = await self._acquire_lifecycle_lock( - lock_manager, dst_path, uri=root_uri, handle=handle - ) - except Exception: - await lock_manager.release(handle) - lifecycle_lock_handle_id = "" - raise + resource_lock = await self._acquire_resource_lock( + lock_manager, dst_path, uri=root_uri + ) except Exception: stage_status = "error" raise @@ -304,15 +300,22 @@ async def process_resource( stage_start = time.perf_counter() stage_status = "ok" with telemetry.measure("resource.summarize"): - await self._get_summarizer().summarize( + summary_result = await self._get_summarizer().summarize( resource_uris=[result["root_uri"]], ctx=ctx, skip_vectorization=skip_vec, - lifecycle_lock_handle_id=lifecycle_lock_handle_id, + lock=resource_lock, temp_uris=[temp_uri_for_summarize], is_code_repo=is_code_repo, **kwargs, ) + if ( + resource_lock.active + and summary_result.get("status") == "success" + and summary_result.get("enqueued_count", 0) > 0 + ): + await resource_lock.handoff() + resource_lock = NO_LOCK except Exception as e: logger.error(f"Summarization failed: {e}") result["warnings"] = result.get("warnings", []) + [f"Summarization failed: {e}"] @@ -327,28 +330,23 @@ async def process_resource( ) except Exception: pass - elif lifecycle_lock_handle_id: - # No downstream worker will sync temp content or release the lifecycle lock. - from openviking.storage.transaction import get_lock_manager - lock_manager = get_lock_manager() - handle = lock_manager.get_handle(lifecycle_lock_handle_id) - if handle: - if temp_uri: - from openviking.pyagfs.helpers import cp as agfs_cp - - viking_fs = get_viking_fs() - src_path = viking_fs._uri_to_path(temp_uri, ctx=ctx) - dst_path = viking_fs._uri_to_path(root_uri, ctx=ctx) - await asyncio.to_thread( - agfs_cp, - viking_fs.agfs, - src_path, - dst_path, - recursive=True, - ) - await viking_fs.delete_temp(parse_result.temp_dir_path, ctx=ctx) - await lock_manager.release(handle) + if resource_lock.active: + if not should_summarize and temp_uri: + from openviking.pyagfs.helpers import cp as agfs_cp + + viking_fs = get_viking_fs() + src_path = viking_fs._uri_to_path(temp_uri, ctx=ctx) + dst_path = viking_fs._uri_to_path(root_uri, ctx=ctx) + await asyncio.to_thread( + agfs_cp, + viking_fs.agfs, + src_path, + dst_path, + recursive=True, + ) + await viking_fs.delete_temp(parse_result.temp_dir_path, ctx=ctx) + await resource_lock.close() # 恢复原始 temp_uri 用于输出 if original_temp_uri is not None: @@ -362,8 +360,8 @@ async def _commit_unique_candidate( candidate_uri: str, ctx: RequestContext, max_attempts: int = 100, - ) -> tuple[str, str]: - """Pick the first free candidate URI and reserve it with a lifecycle TreeLock.""" + ) -> tuple[str, OwnedLockLease]: + """Pick the first free candidate URI and reserve it with a resource TreeLock.""" from openviking.storage.errors import ResourceBusyError from openviking.storage.transaction import get_lock_manager @@ -376,46 +374,36 @@ async def _commit_unique_candidate( continue dst_path = viking_fs._uri_to_path(root_uri, ctx=ctx) - handle = lock_manager.create_handle() try: - lifecycle_lock_handle_id = await self._acquire_lifecycle_lock( - lock_manager, dst_path, uri=root_uri, handle=handle, timeout=0.0 + resource_lock = await self._acquire_resource_lock( + lock_manager, dst_path, uri=root_uri, timeout=0.0 ) - return root_uri, lifecycle_lock_handle_id + return root_uri, resource_lock except ResourceBusyError: - await lock_manager.release(handle) continue - except Exception: - await lock_manager.release(handle) - raise raise FileExistsError( f"Cannot resolve unique name for {candidate_uri} after {max_attempts} attempts" ) @staticmethod - async def _acquire_lifecycle_lock( + async def _acquire_resource_lock( lock_manager, path: str, *, uri: str = "", - handle=None, timeout: Optional[float] = None, - ) -> str: - """Acquire per-resource TreeLock lifecycle lock or raise a structured conflict.""" + ) -> OwnedLockLease: + """Acquire the per-resource TreeLock or raise a structured conflict.""" from openviking.storage.errors import ResourceBusyError - owns_handle = handle is None - if handle is None: - handle = lock_manager.create_handle() - if await lock_manager.acquire_tree(handle, path, timeout=timeout): - return handle.id - if owns_handle: - await lock_manager.release(handle) - logger.warning(f"[ResourceProcessor] Failed to acquire lifecycle lock on {path}") - raise ResourceBusyError( - f"Resource is busy: {uri or path}", - uri=uri or path, - conflict_type="path_busy", - retryable=True, - ) + try: + return await OwnedLockLease.acquire_tree(lock_manager, path, timeout=timeout) + except LockAcquisitionError as exc: + logger.warning(f"[ResourceProcessor] Failed to acquire resource lock on {path}") + raise ResourceBusyError( + f"Resource is busy: {uri or path}", + uri=uri or path, + conflict_type="path_busy", + retryable=True, + ) from exc diff --git a/openviking/utils/summarizer.py b/openviking/utils/summarizer.py index 72b47787e..2a073588b 100644 --- a/openviking/utils/summarizer.py +++ b/openviking/utils/summarizer.py @@ -9,6 +9,7 @@ from openviking.core.namespace import context_type_for_uri from openviking.storage.queuefs import SemanticMsg, get_queue_manager +from openviking.storage.transaction import NO_LOCK, LockLease from openviking.storage.viking_fs import get_viking_fs from openviking.telemetry import get_current_telemetry from openviking.telemetry.request_wait_tracker import get_request_wait_tracker @@ -35,7 +36,7 @@ async def summarize( resource_uris: List[str], ctx: "RequestContext", skip_vectorization: bool = False, - lifecycle_lock_handle_id: str = "", + lock: LockLease = NO_LOCK, **kwargs, ) -> Dict[str, Any]: """ @@ -59,6 +60,7 @@ async def summarize( enqueued_count = 0 telemetry = get_current_telemetry() + lock_handoff = lock.to_handoff() def is_resources_root(uri: str) -> bool: return (uri or "").rstrip("/") == "viking://resources" @@ -80,7 +82,7 @@ async def list_top_children(temp_uri: str) -> List[Tuple[str, str]]: context_type = context_type_for_uri(uri) enqueue_units: List[Tuple[str, str]] = [] - if is_resources_root(uri) and uri != temp_uri: + if is_resources_root(uri) and uri != temp_uri and lock_handoff is None: children = await list_top_children(temp_uri) if not children: return { @@ -104,7 +106,7 @@ async def list_top_children(temp_uri: str) -> List[Tuple[str, str]]: skip_vectorization=skip_vectorization, telemetry_id=telemetry.telemetry_id, target_uri=target_uri if target_uri != source_uri else None, - lifecycle_lock_handle_id=lifecycle_lock_handle_id, + lock_handoff=lock_handoff, is_code_repo=kwargs.get("is_code_repo", False), ) if msg.telemetry_id: diff --git a/tests/client/test_file_operations.py b/tests/client/test_file_operations.py index 316f460ce..d292911ea 100644 --- a/tests/client/test_file_operations.py +++ b/tests/client/test_file_operations.py @@ -37,7 +37,7 @@ async def test_rm_directory_recursive(self, client: AsyncOpenViking, sample_dire for f in sample_directory.glob("**/*.txt"): await client.add_resource(path=str(f), reason="Test rm dir") - # Release lifecycle locks held by add_resource before rm + # Release resource locks held by add_resource before rm await release_all_locks() entries = await client.ls("viking://resources/") for data in entries: diff --git a/tests/server/test_admin_rebuild_api.py b/tests/server/test_admin_rebuild_api.py index 313830eb5..d1a46a911 100644 --- a/tests/server/test_admin_rebuild_api.py +++ b/tests/server/test_admin_rebuild_api.py @@ -21,6 +21,12 @@ } +def _make_reindex_run(ctx, counters): + from openviking.service.reindex_executor import _ReindexRunContext + + return _ReindexRunContext(ctx=ctx, counters=counters) + + async def test_reindex_requires_admin_role(admin_client: httpx.AsyncClient): resp = await admin_client.post( "/api/v1/content/reindex", @@ -167,7 +173,7 @@ async def test_reindex_memory_semantic_and_vectors_rebuilds_full_subtree(monkeyp seen = {"semantic": [], "vectors": []} - async def fake_run_semantic_processor(self, *, uri, context_type, ctx): + async def fake_run_semantic_processor(self, *, uri, context_type, ctx, lock=None): seen["semantic"].append((uri, context_type)) async def fake_reindex_memory_vectors(self, *, uri, counters, ctx): @@ -186,8 +192,7 @@ async def fake_reindex_memory_vectors(self, *, uri, counters, ctx): await service._reindex_memory( uri="viking://user/default/memories", mode="semantic_and_vectors", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["semantic"] == [("viking://user/default/memories", "memory")] @@ -291,10 +296,10 @@ async def tree( seen = {"memory_modes": [], "semantic_calls": [], "resource_calls": []} - async def fake_reindex_memory(self, *, uri, mode, counters, ctx): + async def fake_reindex_memory(self, *, uri, mode, run): seen["memory_modes"].append((uri, mode)) - async def fake_run_semantic_processor(self, *, uri, context_type, ctx): + async def fake_run_semantic_processor(self, *, uri, context_type, ctx, lock=None): seen["semantic_calls"].append((uri, context_type)) async def fake_reindex_resource_vectors_from_entries( @@ -321,8 +326,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_user_namespace( uri="viking://user/default", mode="semantic_and_vectors", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["memory_modes"] == [("viking://user/default/memories", "semantic_and_vectors")] @@ -354,10 +358,10 @@ async def tree( seen = {"memory_modes": [], "semantic_calls": []} - async def fake_reindex_memory(self, *, uri, mode, counters, ctx): + async def fake_reindex_memory(self, *, uri, mode, run): seen["memory_modes"].append((uri, mode)) - async def fake_run_semantic_processor(self, *, uri, context_type, ctx): + async def fake_run_semantic_processor(self, *, uri, context_type, ctx, lock=None): seen["semantic_calls"].append((uri, context_type)) async def fake_reindex_resource_vectors_from_entries( @@ -384,8 +388,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_user_namespace( uri="viking://user/default", mode="semantic_and_vectors", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["memory_modes"] == [("viking://user/default/memories", "semantic_and_vectors")] @@ -416,7 +419,7 @@ async def tree( seen = {"semantic_calls": [], "resource_files": []} - async def fake_run_semantic_processor(self, *, uri, context_type, ctx): + async def fake_run_semantic_processor(self, *, uri, context_type, ctx, lock=None): seen["semantic_calls"].append((uri, context_type)) async def fake_reindex_resource_vectors_from_entries( @@ -442,8 +445,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_user_namespace( uri="viking://user/default", mode="semantic_and_vectors", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["semantic_calls"] == [("viking://user/default/resources", "resource")] @@ -476,13 +478,13 @@ async def tree( seen = {"memory_modes": [], "skill_modes": [], "semantic_calls": [], "resource_calls": []} - async def fake_reindex_memory(self, *, uri, mode, counters, ctx): + async def fake_reindex_memory(self, *, uri, mode, run): seen["memory_modes"].append((uri, mode)) - async def fake_reindex_skill(self, *, uri, mode, counters, ctx): + async def fake_reindex_skill(self, *, uri, mode, run): seen["skill_modes"].append((uri, mode)) - async def fake_run_semantic_processor(self, *, uri, context_type, ctx): + async def fake_run_semantic_processor(self, *, uri, context_type, ctx, lock=None): seen["semantic_calls"].append((uri, context_type)) async def fake_reindex_resource_vectors_from_entries( @@ -510,8 +512,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_agent_namespace( uri="viking://agent/default", mode="semantic_and_vectors", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["memory_modes"] == [("viking://agent/default/memories", "semantic_and_vectors")] @@ -545,13 +546,13 @@ async def tree( seen = {"memory_modes": [], "skill_modes": [], "semantic_calls": []} - async def fake_reindex_memory(self, *, uri, mode, counters, ctx): + async def fake_reindex_memory(self, *, uri, mode, run): seen["memory_modes"].append((uri, mode)) - async def fake_reindex_skill(self, *, uri, mode, counters, ctx): + async def fake_reindex_skill(self, *, uri, mode, run): seen["skill_modes"].append((uri, mode)) - async def fake_run_semantic_processor(self, *, uri, context_type, ctx): + async def fake_run_semantic_processor(self, *, uri, context_type, ctx, lock=None): seen["semantic_calls"].append((uri, context_type)) async def fake_reindex_resource_vectors_from_entries( @@ -579,8 +580,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_agent_namespace( uri="viking://agent/default", mode="semantic_and_vectors", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["memory_modes"] == [("viking://agent/default/memories", "semantic_and_vectors")] @@ -612,7 +612,7 @@ async def tree( seen = [] - async def fake_reindex_skill(self, *, uri, mode, counters, ctx): + async def fake_reindex_skill(self, *, uri, mode, run): seen.append((uri, mode)) monkeypatch.setattr("openviking.service.reindex_executor.get_viking_fs", lambda: FakeVikingFS()) @@ -628,8 +628,7 @@ async def fake_reindex_skill(self, *, uri, mode, counters, ctx): await service._reindex_skill_namespace( uri="viking://agent/default/skills", mode="semantic_and_vectors", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen == [("viking://agent/default/skills/my_skill", "semantic_and_vectors")] @@ -659,13 +658,13 @@ async def tree( seen = {"user_modes": [], "agent_modes": [], "semantic_calls": [], "resource_calls": []} - async def fake_reindex_user_namespace(self, *, uri, mode, counters, ctx): + async def fake_reindex_user_namespace(self, *, uri, mode, run): seen["user_modes"].append((uri, mode)) - async def fake_reindex_agent_namespace(self, *, uri, mode, counters, ctx): + async def fake_reindex_agent_namespace(self, *, uri, mode, run): seen["agent_modes"].append((uri, mode)) - async def fake_run_semantic_processor(self, *, uri, context_type, ctx): + async def fake_run_semantic_processor(self, *, uri, context_type, ctx, lock=None): seen["semantic_calls"].append((uri, context_type)) async def fake_reindex_resource_vectors_from_entries( @@ -693,8 +692,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_global_namespace( uri="viking://", mode="semantic_and_vectors", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["user_modes"] == [("viking://user/default", "semantic_and_vectors")] @@ -826,7 +824,7 @@ async def test_reindex_semantic_processor_runs_with_skip_vectorization(monkeypat seen = {} class FakeSemanticProcessor: - async def on_dequeue(self, payload): + async def on_dequeue(self, payload, lock=None): seen["payload"] = payload monkeypatch.setattr( @@ -1325,7 +1323,7 @@ async def tree( seen = {"memory": [], "resource_dirs": [], "resource_files": []} - async def fake_reindex_memory(self, *, uri, mode, counters, ctx): + async def fake_reindex_memory(self, *, uri, mode, run): seen["memory"].append((uri, mode)) async def fake_reindex_resource_vectors_from_entries( @@ -1358,8 +1356,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_user_namespace( uri="viking://user/", mode="vectors_only", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["memory"] == [("viking://user/default/memories", "vectors_only")] @@ -1399,10 +1396,10 @@ async def tree( seen = {"memory": [], "skill": [], "resource_dirs": [], "resource_files": []} - async def fake_reindex_memory(self, *, uri, mode, counters, ctx): + async def fake_reindex_memory(self, *, uri, mode, run): seen["memory"].append((uri, mode)) - async def fake_reindex_skill(self, *, uri, mode, counters, ctx): + async def fake_reindex_skill(self, *, uri, mode, run): seen["skill"].append((uri, mode)) async def fake_reindex_resource_vectors_from_entries( @@ -1436,8 +1433,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_agent_namespace( uri="viking://agent/", mode="vectors_only", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["memory"] == [("viking://agent/default/memories", "vectors_only")] @@ -1482,10 +1478,10 @@ async def tree( seen = {"user": [], "agent": [], "resource_dirs": [], "resource_files": []} - async def fake_reindex_user_namespace(self, *, uri, mode, counters, ctx): + async def fake_reindex_user_namespace(self, *, uri, mode, run): seen["user"].append((uri, mode)) - async def fake_reindex_agent_namespace(self, *, uri, mode, counters, ctx): + async def fake_reindex_agent_namespace(self, *, uri, mode, run): seen["agent"].append((uri, mode)) async def fake_reindex_resource_vectors_from_entries( @@ -1519,8 +1515,7 @@ async def fake_reindex_resource_vectors_from_entries( await service._reindex_global_namespace( uri="viking://", mode="vectors_only", - counters=counters, - ctx=ctx, + run=_make_reindex_run(ctx, counters), ) assert seen["user"] == [("viking://user/default", "vectors_only")] diff --git a/tests/server/test_content_write_service.py b/tests/server/test_content_write_service.py index 97011faaa..bb70021cd 100644 --- a/tests/server/test_content_write_service.py +++ b/tests/server/test_content_write_service.py @@ -274,14 +274,13 @@ async def test_resource_write_semantic_refresh_uses_coalesce_key(monkeypatch): changed_uri=file_uri, context_type="resource", ctx=ctx, - lifecycle_lock_handle_id="lock-1", ) assert len(queue.messages) == 1 assert queue.messages[0].coalesce_key == ( "resource|default|default|default|viking://resources/demo" ) - assert queue.messages[0].lifecycle_lock_handle_id == "lock-1" + assert queue.messages[0].lock_handoff is None @pytest.mark.asyncio diff --git a/tests/storage/test_memory_semantic_stall.py b/tests/storage/test_memory_semantic_stall.py index 02949785a..502b086e2 100644 --- a/tests/storage/test_memory_semantic_stall.py +++ b/tests/storage/test_memory_semantic_stall.py @@ -28,7 +28,6 @@ def _make_msg(uri="viking://user/memories", context_type="memory", **kwargs): "agent_id": "test-agent", "telemetry_id": "", "target_uri": "", - "lifecycle_lock_handle_id": "", "changes": None, "is_code_repo": False, } diff --git a/tests/storage/test_semantic_dag_incremental_missing_summary.py b/tests/storage/test_semantic_dag_incremental_missing_summary.py index 88ec20a9f..beb989290 100644 --- a/tests/storage/test_semantic_dag_incremental_missing_summary.py +++ b/tests/storage/test_semantic_dag_incremental_missing_summary.py @@ -93,7 +93,7 @@ def _enforce_size_limits(self, overview, abstract): return overview, abstract async def _sync_topdown_recursive( - self, root_uri, target_uri, ctx=None, file_change_status=None, lifecycle_lock_handle_id="" + self, root_uri, target_uri, ctx=None, file_change_status=None, lock=None ): self.sync_calls.append((root_uri, target_uri)) root_uri = self._fs._norm(root_uri) diff --git a/tests/storage/test_semantic_processor_lifecycle_lock.py b/tests/storage/test_semantic_processor_lock_ownership.py similarity index 85% rename from tests/storage/test_semantic_processor_lifecycle_lock.py rename to tests/storage/test_semantic_processor_lock_ownership.py index b1cc09f79..95c2a652b 100644 --- a/tests/storage/test_semantic_processor_lifecycle_lock.py +++ b/tests/storage/test_semantic_processor_lock_ownership.py @@ -8,11 +8,13 @@ from openviking.storage.queuefs.semantic_dag import DagStats from openviking.storage.queuefs.semantic_msg import SemanticMsg from openviking.storage.queuefs.semantic_processor import SemanticProcessor +from openviking.storage.transaction import BorrowedLockLease class _FakeHandle: def __init__(self, handle_id: str): self.id = handle_id + self.locks = ["/fake/root/.path.ovlock"] class _FakeLockManager: @@ -48,17 +50,17 @@ def _uri_to_path(self, uri, ctx=None): @pytest.mark.asyncio -async def test_semantic_processor_does_not_release_lock_owned_by_dag(monkeypatch): +async def test_semantic_processor_borrows_caller_owned_lock(monkeypatch): processor = SemanticProcessor() lock_manager = _FakeLockManager() class _FakeDagExecutor: def __init__(self, **kwargs): - self.lifecycle_lock_handle_id = kwargs.get("lifecycle_lock_handle_id", "") + self.lock = kwargs["lock"] async def run(self, root_uri): assert root_uri == "viking://resources/demo" - assert self.lifecycle_lock_handle_id == "lock-1" + assert self.lock.handle_id == "lock-1" def get_stats(self): return DagStats() @@ -81,8 +83,8 @@ def get_stats(self): uri="viking://resources/demo", context_type="resource", recursive=False, - lifecycle_lock_handle_id="lock-1", - ).to_dict() + ).to_dict(), + lock=BorrowedLockLease.from_handle(lock_manager, lock_manager.get_handle("lock-1")), ) assert lock_manager.release_calls == []