Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 181 additions & 85 deletions openviking/service/reindex_executor.py

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions openviking/storage/content_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ async def _write_direct_with_refresh(
changed_uri=uri,
context_type=context_type,
ctx=ctx,
lifecycle_lock_handle_id="",
change_type="added" if mode == "create" else "modified",
)
semantic_enqueued = True
Expand Down Expand Up @@ -386,7 +385,6 @@ async def _enqueue_semantic_refresh(
changed_uri: str,
context_type: str,
ctx: RequestContext,
lifecycle_lock_handle_id: str,
change_type: str = "modified",
target_uri: str = "",
) -> None:
Expand All @@ -403,7 +401,6 @@ async def _enqueue_semantic_refresh(
role=ctx.role.value,
skip_vectorization=False,
telemetry_id=telemetry.telemetry_id,
lifecycle_lock_handle_id=lifecycle_lock_handle_id,
coalesce_key=(
build_semantic_coalesce_key(
context_type=context_type,
Expand Down Expand Up @@ -432,7 +429,6 @@ async def _enqueue_memory_refresh(
root_uri: str,
modified_uri: str,
ctx: RequestContext,
lifecycle_lock_handle_id: str,
) -> None:
queue_manager = get_queue_manager()
semantic_queue = queue_manager.get_queue(queue_manager.SEMANTIC, allow_create=True)
Expand All @@ -446,7 +442,6 @@ async def _enqueue_memory_refresh(
role=ctx.role.value,
skip_vectorization=False,
telemetry_id=telemetry.telemetry_id,
lifecycle_lock_handle_id=lifecycle_lock_handle_id,
coalesce_key=build_semantic_coalesce_key(
context_type="memory",
uri=root_uri,
Expand Down Expand Up @@ -563,7 +558,6 @@ async def _write_memory_with_refresh(
root_uri=root_uri,
modified_uri=uri,
ctx=ctx,
lifecycle_lock_handle_id="",
)
await lock_manager.release(handle)
released = True
Expand Down
62 changes: 9 additions & 53 deletions openviking/storage/queuefs/semantic_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from openviking.server.identity import RequestContext
from openviking.storage.queuefs.semantic_sidecar import write_semantic_sidecars
from openviking.storage.transaction import NO_LOCK, LockLease
from openviking.storage.viking_fs import get_viking_fs
from openviking.telemetry.request_wait_tracker import get_request_wait_tracker
from openviking_cli.utils import VikingURI
Expand Down Expand Up @@ -79,7 +80,7 @@ def __init__(
semantic_msg_id: Optional[str] = None,
telemetry_id: str = "",
recursive: bool = True,
lifecycle_lock_handle_id: str = "",
lock: LockLease = NO_LOCK,
is_code_repo: bool = False,
changes: Optional[Dict[str, List[str]]] = None,
skip_vectorization: bool = False,
Expand All @@ -95,7 +96,7 @@ def __init__(
self._semantic_msg_id = semantic_msg_id
self._telemetry_id = telemetry_id
self._recursive = recursive
self._lifecycle_lock_handle_id = lifecycle_lock_handle_id
self._lock = lock
self._is_code_repo = is_code_repo
self._changes = changes or {}
self._skip_vectorization = skip_vectorization
Expand All @@ -119,7 +120,6 @@ def __init__(
self._dir_change_status: Dict[str, bool] = {}
self._overview_cache: Dict[str, Dict[str, str]] = {}
self._overview_cache_lock = asyncio.Lock()
self._refresh_task: Optional[asyncio.Task] = None

def _create_on_complete_callback(self) -> Callable[[], Awaitable[None]]:
"""Create on_complete callback for incremental update or full update."""
Expand All @@ -144,7 +144,7 @@ async def sync_diff_callback() -> None:
self._target_uri,
ctx=self._ctx,
file_change_status=self._file_change_status,
lifecycle_lock_handle_id=self._lifecycle_lock_handle_id,
lock=self._lock,
)
logger.info(
f"[SyncDiff] Diff computed: "
Expand All @@ -169,20 +169,16 @@ async def run(self, root_uri: str) -> None:
self._root_uri = root_uri
self._root_done = asyncio.Event()

# Start lifecycle lock refresh loop if we hold a lock
if self._lifecycle_lock_handle_id:
self._refresh_task = asyncio.create_task(self._lock_refresh_loop())

try:
await self._dispatch_dir(root_uri, parent_uri=None)
await self._root_done.wait()
except Exception:
await self._release_lifecycle_lock()
await self._lock.close()
raise

original_on_complete = self._create_on_complete_callback()

# Wrap on_complete to release lifecycle lock after all processing
# Release owned semantic locks after downstream vectorization finishes.
async def wrapped_on_complete() -> None:
try:
if original_on_complete:
Expand All @@ -192,7 +188,7 @@ async def wrapped_on_complete() -> None:
self._telemetry_id, self._semantic_msg_id
)
finally:
await self._release_lifecycle_lock()
await self._lock.close()

async with self._vectorize_lock:
task_count = self._vectorize_task_count
Expand Down Expand Up @@ -611,7 +607,7 @@ async def _write_directory_semantics(
abstract=abstract,
ctx=self._ctx,
is_stale=self._is_stale,
lifecycle_lock_handle_id=self._lifecycle_lock_handle_id,
lock=self._lock,
log_prefix="[SemanticDag]",
)
if not wrote:
Expand Down Expand Up @@ -647,7 +643,7 @@ async def _overview_task(self, dir_uri: str) -> None:
abstract = self._processor._extract_abstract_from_overview(overview)
overview, abstract = self._processor._enforce_size_limits(overview, abstract)

# Write directlyprotected by the outer lifecycle tree lock
# Write directly, protected by the outer semantic lock.
try:
wrote = await self._write_directory_semantics(dir_uri, overview, abstract)
if not wrote:
Expand Down Expand Up @@ -701,46 +697,6 @@ async def _add_vectorize_task(self, task: VectorizeTask) -> None:
else: # directory
self._vectorize_task_count += 2

async def _lock_refresh_loop(self) -> None:
"""Periodically refresh lifecycle lock to prevent stale expiry."""
from openviking.storage.transaction import get_lock_manager

try:
interval = get_lock_manager()._path_lock._lock_expire / 2
except Exception:
interval = 150.0

while True:
try:
await asyncio.sleep(interval)
handle = get_lock_manager().get_handle(self._lifecycle_lock_handle_id)
if handle:
await get_lock_manager().refresh_lock(handle)
else:
break
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"[SemanticDag] Lock refresh failed: {e}")

async def _release_lifecycle_lock(self) -> None:
"""Stop refresh loop and release lifecycle lock."""
if self._refresh_task and not self._refresh_task.done():
self._refresh_task.cancel()
self._refresh_task = None
if not self._lifecycle_lock_handle_id:
return
handle_id = self._lifecycle_lock_handle_id
self._lifecycle_lock_handle_id = ""
try:
from openviking.storage.transaction import get_lock_manager

handle = get_lock_manager().get_handle(handle_id)
if handle:
await get_lock_manager().release(handle)
except Exception as e:
logger.warning(f"[SemanticDag] Failed to release lifecycle lock {handle_id}: {e}")

def get_stats(self) -> DagStats:
return DagStats(
total_nodes=self._stats.total_nodes,
Expand Down
37 changes: 37 additions & 0 deletions openviking/storage/queuefs/semantic_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0
"""Semantic queue lock resolution."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

from openviking.storage.transaction import NO_LOCK, LockHandoffRef, LockLease, OwnedLockLease


@dataclass
class SemanticLockScope:
"""Resolved lock scope for one semantic message."""

lock: LockLease

@classmethod
async def resolve(
cls,
lock_handoff: Optional[LockHandoffRef],
*,
caller_lock: LockLease = NO_LOCK,
) -> "SemanticLockScope":
if lock_handoff and caller_lock.active:
raise ValueError("semantic lock must come from either message or caller, not both")
if caller_lock is not NO_LOCK and not caller_lock.active:
raise ValueError("caller semantic lock is inactive")
if caller_lock.active:
return cls(caller_lock.as_borrowed())
if lock_handoff:
return cls(await OwnedLockLease.from_handoff(lock_handoff))
return cls(NO_LOCK)

async def close(self) -> None:
await self.lock.close()
10 changes: 6 additions & 4 deletions openviking/storage/queuefs/semantic_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from typing import Any, Dict, List, Optional
from uuid import uuid4

from openviking.storage.transaction import LockHandoffRef


def build_semantic_coalesce_key(
*,
Expand Down Expand Up @@ -50,7 +52,7 @@ class SemanticMsg:
skip_vectorization: bool = False
telemetry_id: str = ""
target_uri: str = ""
lifecycle_lock_handle_id: str = ""
lock_handoff: Optional[LockHandoffRef] = None
is_code_repo: bool = False
coalesce_key: str = ""
coalesce_version: int = 0
Expand All @@ -70,7 +72,7 @@ def __init__(
skip_vectorization: bool = False,
telemetry_id: str = "",
target_uri: str = "",
lifecycle_lock_handle_id: str = "",
lock_handoff: Optional[LockHandoffRef] = None,
is_code_repo: bool = False,
coalesce_key: str = "",
coalesce_version: int = 0,
Expand All @@ -87,7 +89,7 @@ def __init__(
self.skip_vectorization = skip_vectorization
self.telemetry_id = telemetry_id
self.target_uri = target_uri
self.lifecycle_lock_handle_id = lifecycle_lock_handle_id
self.lock_handoff = lock_handoff
self.is_code_repo = is_code_repo
self.coalesce_key = coalesce_key
self.coalesce_version = coalesce_version
Expand Down Expand Up @@ -129,7 +131,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "SemanticMsg":
skip_vectorization=data.get("skip_vectorization", False),
telemetry_id=data.get("telemetry_id", ""),
target_uri=data.get("target_uri", ""),
lifecycle_lock_handle_id=data.get("lifecycle_lock_handle_id", ""),
lock_handoff=LockHandoffRef.from_value(data.get("lock_handoff")),
is_code_repo=data.get("is_code_repo", False),
coalesce_key=data.get("coalesce_key", ""),
coalesce_version=data.get("coalesce_version", 0),
Expand Down
Loading
Loading