Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,13 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
root_files, root_dirs = await list_children(root_dir)
target_files, target_dirs = await list_children(target_dir)

try:
await viking_fs._mv_vector_store_l0_l1(root_dir, target_dir, ctx=ctx)
except Exception as e:
logger.error(
f"[SyncDiff] Failed to move L0/L1 index: {root_dir} -> {target_dir}, error={e}"
)

file_names = set(root_files.keys()) | set(target_files.keys())
for name in sorted(file_names):
root_file = root_files.get(name)
Expand Down
61 changes: 61 additions & 0 deletions openviking/storage/viking_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ async def _update_vector_store_uris(
old_base: str,
new_base: str,
ctx: Optional[RequestContext] = None,
levels: Optional[List[int]] = None,
) -> None:
"""Update URIs in vector store (when moving files).
Expand All @@ -1232,11 +1233,71 @@ async def _update_vector_store_uris(
uri=uri,
new_uri=new_uri,
new_parent_uri=new_parent_uri,
levels=levels,
)
logger.debug(f"[VikingFS] Updated URI: {uri} -> {new_uri}")
except Exception as e:
logger.warning(f"[VikingFS] Failed to update {uri} in vector store: {e}")

async def _mv_vector_store_l0_l1(
    self,
    old_uri: str,
    new_uri: str,
    ctx: Optional[RequestContext] = None,
) -> None:
    """Re-point the level-0/level-1 vector-store records of one directory to another.

    Both URIs are access-checked, normalized and stripped of trailing ``/``.
    If they resolve to the same directory the call is a no-op. Otherwise both
    must exist and be directories; the source path is then locked in ``"mv"``
    mode and ``_update_vector_store_uris`` is called for the source directory
    URI only, restricted to ``levels=[0, 1]``.

    Args:
        old_uri: Directory URI the records currently point at.
        new_uri: Directory URI the records should point at afterwards.
        ctx: Request context; ``self._ctx_or_default`` supplies one when omitted.

    Raises:
        ValueError: If either URI ends in ``/.abstract.md`` or ``/.overview.md``,
            or if ``stat`` does not report it as a directory.
        FileNotFoundError: If ``stat`` fails for either URI.
        ResourceBusyError: If the move lock cannot be acquired.
    """
    # Resolved at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle - confirm.
    from openviking.storage.errors import LockAcquisitionError, ResourceBusyError
    from openviking.storage.transaction import LockContext, get_lock_manager

    self._ensure_access(old_uri, ctx)
    self._ensure_access(new_uri, ctx)

    real_ctx = self._ctx_or_default(ctx)
    old_dir = VikingURI.normalize(old_uri).rstrip("/")
    new_dir = VikingURI.normalize(new_uri).rstrip("/")
    if old_dir == new_dir:
        return

    # The sidecar files themselves are not valid arguments; callers pass the
    # directory that owns them.
    for uri in (old_dir, new_dir):
        if uri.endswith(("/.abstract.md", "/.overview.md")):
            raise ValueError(f"mv_vector_store expects directory URIs, got: {uri}")

    endpoints = (("old_uri", old_dir), ("new_uri", new_dir))

    # Stat both endpoints first, then validate both, so a missing endpoint is
    # always reported before a "not a directory" error.
    stats = {}
    for label, directory in endpoints:
        try:
            stats[label] = await self.stat(directory, ctx=real_ctx)
        except Exception as exc:
            raise FileNotFoundError(
                f"mv_vector_store {label} not found: {directory}"
            ) from exc

    for label, directory in endpoints:
        info = stats[label]
        if not (isinstance(info, dict) and info.get("isDir", False)):
            raise ValueError(
                f"mv_vector_store expects {label} to be a directory: {directory}"
            )

    old_path = self._uri_to_path(old_dir, ctx=real_ctx)
    new_path = self._uri_to_path(new_dir, ctx=real_ctx)
    dst_parent = new_path.rsplit("/", 1)[0] if "/" in new_path else new_path

    try:
        async with LockContext(
            get_lock_manager(),
            [old_path],
            lock_mode="mv",
            mv_dst_parent_path=dst_parent,
            src_is_dir=True,
        ):
            await self._update_vector_store_uris(
                uris=[old_dir],
                old_base=old_dir,
                new_base=new_dir,
                ctx=real_ctx,
                levels=[0, 1],
            )
    except LockAcquisitionError as exc:
        # Chain explicitly so the underlying lock failure is kept as the cause.
        raise ResourceBusyError(f"Resource is being processed: {old_dir}") from exc

def _get_vector_store(self) -> Optional["VikingVectorIndexBackend"]:
    """Return whatever is stored in ``self.vector_store``."""
    store = self.vector_store
    return store
Expand Down
18 changes: 13 additions & 5 deletions openviking/storage/viking_vector_index_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,14 +829,22 @@ async def update_uri_mapping(
uri: str,
new_uri: str,
new_parent_uri: str,
levels: Optional[List[int]] = None,
) -> bool:
import hashlib

records = await self.filter(
filter=And([Eq("uri", uri), Eq("account_id", ctx.account_id)]),
limit=100,
ctx=ctx,
)
conds: List[FilterExpr] = [Eq("uri", uri), Eq("account_id", ctx.account_id)]
if levels:
conds.append(In("level", levels))
if ctx.role == Role.USER and uri.startswith(("viking://user/", "viking://agent/")):
owner_space = (
ctx.user.user_space_name()
if uri.startswith("viking://user/")
else ctx.user.agent_space_name()
)
conds.append(Eq("owner_space", owner_space))

records = await self.filter(filter=And(conds), limit=100, ctx=ctx)
if not records:
return False

Expand Down
217 changes: 217 additions & 0 deletions tests/storage/test_semantic_processor_mv_vector_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
from __future__ import annotations

import hashlib
from typing import Any, Dict, List, Optional

import pytest

from openviking.server.identity import RequestContext, Role
from openviking.storage.expr import And, Eq, In
from openviking_cli.session.user_id import UserIdentifier


class _FakeVectorStore:
    """In-memory stand-in for the vector index backend.

    Records are plain dicts held in ``records``; every id handed to
    ``delete`` is also logged in ``deleted_ids`` so tests can inspect it.
    """

    def __init__(self, records: List[Dict[str, Any]]):
        self.records = [*records]
        self.deleted_ids: List[str] = []

    async def update_uri_mapping(
        self,
        *,
        ctx: RequestContext,
        uri: str,
        new_uri: str,
        new_parent_uri: str,
        levels: Optional[List[int]] = None,
    ) -> bool:
        """Copy matching records to ``new_uri`` and delete the originals.

        A record matches when its ``account_id`` equals ``ctx.account_id``,
        its ``uri`` equals ``uri`` and, if ``levels`` is given, its level
        (default 2, also used when the value is not int-convertible) is in
        ``levels``. Returns True when at least one record matched.
        """
        suffix_by_level = {0: "/.abstract.md", 1: "/.overview.md"}

        def id_seed(level: int) -> str:
            # Levels 0 and 1 hash the sidecar-file URI; other levels hash the URI as is.
            suffix = suffix_by_level.get(level)
            if suffix is None or new_uri.endswith(suffix):
                return new_uri
            return f"{new_uri}{suffix}"

        moved_any = False
        stale_ids: List[str] = []
        # Iterate over a snapshot because matching records are appended below.
        for current in tuple(self.records):
            if current.get("account_id") != ctx.account_id or current.get("uri") != uri:
                continue

            try:
                level = int(current.get("level", 2))
            except (TypeError, ValueError):
                level = 2
            if levels is not None and level not in set(levels):
                continue

            digest_input = f"{ctx.account_id}:{id_seed(level)}".encode("utf-8")
            new_id = hashlib.md5(digest_input).hexdigest()
            self.records.append(
                {**current, "id": new_id, "uri": new_uri, "parent_uri": new_parent_uri}
            )
            moved_any = True

            previous_id = current.get("id")
            if previous_id and previous_id != new_id:
                stale_ids.append(previous_id)

        if stale_ids:
            await self.delete(list(set(stale_ids)), ctx=ctx)

        return moved_any

    async def filter(self, *, filter=None, limit: int = 100, ctx: RequestContext):
        """Return up to ``limit`` records satisfying the supported conditions.

        Understands ``Eq`` on ``uri`` / ``account_id`` / ``owner_space`` and
        ``In`` on ``level``, either alone or wrapped in ``And``; any other
        condition is ignored.
        """
        if filter is None:
            conds = []
        elif isinstance(filter, And):
            conds = list(filter.conds)
        else:
            conds = [filter]

        # Insertion order here is the order in which fields are compared.
        wanted: Dict[str, Optional[str]] = {
            "uri": None,
            "account_id": None,
            "owner_space": None,
        }
        levels: Optional[List[int]] = None

        for cond in conds:
            if isinstance(cond, Eq) and cond.field in wanted:
                wanted[cond.field] = cond.value
            elif isinstance(cond, In) and cond.field == "level":
                levels = [int(v) for v in cond.values]

        def matches(rec: Dict[str, Any]) -> bool:
            for field, value in wanted.items():
                if value is not None and rec.get(field) != value:
                    return False
            return levels is None or int(rec.get("level", 2)) in levels

        return [rec for rec in self.records if matches(rec)][:limit]

    async def delete(self, ids: List[str], *, ctx: RequestContext) -> int:
        """Drop records whose ``id`` is in ``ids``; return ``len(ids)``."""
        doomed = set(ids)
        self.deleted_ids.extend(ids)
        self.records = [rec for rec in self.records if rec.get("id") not in doomed]
        return len(ids)


class _NoopLockContext:
    """Async context manager that accepts any arguments and does nothing."""

    def __init__(self, *_args, **_kwargs):
        # Arguments are accepted only so the class can be called like the lock it replaces.
        pass

    async def __aenter__(self):
        # Nothing is acquired, so ``async with ... as x`` binds None.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        # A falsy result lets exceptions raised inside the block propagate.
        return False


@pytest.mark.asyncio
async def test_mv_vector_store_moves_records(monkeypatch):
    """Only the directory's own level-0/1 records move; level 2 and child records stay."""
    from openviking.storage.viking_fs import VikingFS

    ctx = RequestContext(user=UserIdentifier("acc", "user", "agent"), role=Role.ROOT)
    old_uri = "viking://resources/a"
    new_uri = "viking://resources/b"
    child_uri = f"{old_uri}/x"

    def make_record(record_id: str, uri: str, level: int) -> Dict[str, Any]:
        return {
            "id": record_id,
            "uri": uri,
            "level": level,
            "account_id": ctx.account_id,
            "owner_space": "",
        }

    store = _FakeVectorStore(
        [
            make_record("l0", old_uri, 0),
            make_record("l1", old_uri, 1),
            make_record("l2", old_uri, 2),
            make_record("child-l0", child_uri, 0),
        ]
    )

    class _FakeAGFS:
        def rm(self, _path, recursive: bool = False):
            return None

    class _FakeVikingFS(VikingFS):
        def __init__(self):
            super().__init__(agfs=_FakeAGFS(), vector_store=store)

        def _uri_to_path(self, uri, ctx=None):
            return f"/mock/{uri.replace('viking://', '')}"

        async def stat(self, uri, ctx=None):
            # Every URI looks like an existing directory.
            return {"isDir": True}

        def _ensure_access(self, uri, ctx):
            return None

    # Replace the global accessor and the locking machinery with inert fakes.
    replacements = (
        ("openviking.storage.viking_fs.get_viking_fs", lambda: _FakeVikingFS()),
        ("openviking.storage.transaction.get_lock_manager", lambda: None),
        ("openviking.storage.transaction.LockContext", _NoopLockContext),
    )
    for target, replacement in replacements:
        monkeypatch.setattr(target, replacement)

    fs = _FakeVikingFS()
    await fs._mv_vector_store_l0_l1(old_uri, new_uri, ctx=ctx)

    def ids_at(uri: str) -> set:
        return {rec["id"] for rec in store.records if rec.get("uri") == uri}

    assert ids_at(old_uri) == {"l2"}
    assert ids_at(child_uri) == {"child-l0"}
    levels_at_new = {int(rec["level"]) for rec in store.records if rec.get("uri") == new_uri}
    assert levels_at_new == {0, 1}
    assert set(store.deleted_ids) == {"l0", "l1"}


@pytest.mark.asyncio
async def test_mv_vector_store_requires_directories(monkeypatch):
    """A destination that ``stat`` reports as a non-directory is rejected with ValueError."""
    from openviking.storage.viking_fs import VikingFS

    ctx = RequestContext(user=UserIdentifier("acc", "user", "agent"), role=Role.ROOT)
    old_uri = "viking://resources/a"
    new_uri = "viking://resources/b"

    store = _FakeVectorStore([])

    class _FakeAGFS:
        def rm(self, _path, recursive: bool = False):
            return None

    class _FakeVikingFS(VikingFS):
        def __init__(self):
            super().__init__(agfs=_FakeAGFS(), vector_store=store)

        def _uri_to_path(self, uri, ctx=None):
            return f"/mock/{uri.replace('viking://', '')}"

        async def stat(self, uri, ctx=None):
            # Only the source is a directory, so the destination check is the one that fails.
            return {"isDir": uri == old_uri}

        def _ensure_access(self, uri, ctx):
            return None

    monkeypatch.setattr(
        "openviking.storage.viking_fs.get_viking_fs",
        lambda: _FakeVikingFS(),
    )
    monkeypatch.setattr("openviking.storage.transaction.get_lock_manager", lambda: None)
    monkeypatch.setattr("openviking.storage.transaction.LockContext", _NoopLockContext)

    fs = _FakeVikingFS()
    # Pin the message: a bare ValueError check would also pass if the old_uri or
    # sidecar-file validation fired instead of the new_uri directory check.
    with pytest.raises(ValueError, match="expects new_uri to be a directory"):
        await fs._mv_vector_store_l0_l1(old_uri, new_uri, ctx=ctx)
Loading