diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 768a8a92..5d977d8f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,8 +17,10 @@ jobs: name: Detect Changes runs-on: ubuntu-latest outputs: - backend-matrix: ${{ steps.build-matrix.outputs.matrix }} + backend-matrix: ${{ steps.build-matrix.outputs.backend-matrix }} + integration-matrix: ${{ steps.build-matrix.outputs.integration-matrix }} has-backend: ${{ steps.build-matrix.outputs.has-backend }} + has-integration: ${{ steps.build-matrix.outputs.has-integration }} frontend: ${{ steps.filter.outputs.frontend }} steps: - uses: actions/checkout@v4 @@ -87,9 +89,6 @@ jobs: - 'libs/**' - 'pyproject.toml' - 'uv.lock' - - 'libs/**' - - 'pyproject.toml' - - 'uv.lock' worker-search: - 'services/worker-search/**' - 'libs/**' @@ -100,30 +99,72 @@ jobs: - 'libs/**' - 'pyproject.toml' - 'uv.lock' + plugin-search-providers: + - 'plugins/backend-engine-search-providers/**' + - 'libs/kt-providers/**' + - 'libs/kt-core-engine-api/**' + - 'libs/kt-db/**' + - 'libs/kt-config/**' + - 'pyproject.toml' + - 'uv.lock' frontend: - 'frontend/**' - - name: Build test matrix from changed packages + - name: Build test matrices from changed packages id: build-matrix run: | - MATRIX="[]" - add() { MATRIX=$(echo "$MATRIX" | jq --arg n "$1" --arg p "$2" '. + [{"name": $n, "path": $p}]'); } + BACKEND="[]" + INTEGRATION="[]" - [ "${{ steps.filter.outputs.kt-config }}" = "true" ] && add "kt-config" "libs/kt-config" - [ "${{ steps.filter.outputs.kt-db }}" = "true" ] && add "kt-db" "libs/kt-db" - [ "${{ steps.filter.outputs.kt-models }}" = "true" ] && add "kt-models" "libs/kt-models" - [ "${{ steps.filter.outputs.kt-providers }}" = "true" ] && add "kt-providers" "libs/kt-providers" - [ "${{ steps.filter.outputs.kt-graph }}" = "true" ] && add "kt-graph" "libs/kt-graph" - [ "${{ steps.filter.outputs.kt-facts }}" = "true" ] && add "kt-facts" "libs/kt-facts" - [ "${{ steps.filter.outputs.api }}" = "true" ] && add "api" "services/api" - [ "${{ steps.filter.outputs.mcp }}" = "true" ] && add "mcp" "services/mcp" - [ "${{ steps.filter.outputs.worker-bottomup }}" = "true" ] && add "worker-bottomup" "services/worker-bottomup" - [ "${{ steps.filter.outputs.worker-nodes }}" = "true" ] && add "worker-nodes" "services/worker-nodes" - [ "${{ steps.filter.outputs.worker-search }}" = "true" ] && add "worker-search" "services/worker-search" - [ "${{ steps.filter.outputs.worker-sync }}" = "true" ] && add "worker-sync" "services/worker-sync" + add_backend() { + BACKEND=$(echo "$BACKEND" | jq --arg n "$1" --arg p "$2" '. + [{"name": $n, "path": $p}]') + } + add_integration() { + # $3 = pytest -n workers (optional, default "auto"). Serialise + # suites that hit external rate-limited APIs (e.g. kt-models + # → OpenRouter) by passing workers="1". + local workers="${3:-auto}" + INTEGRATION=$(echo "$INTEGRATION" \ + | jq --arg n "$1" --arg p "$2" --arg w "$workers" \ + '. + [{"name": $n, "path": $p, "workers": $w}]') + } + # Detect real integration tests (not just a conftest/__init__) so + # empty integration/ dirs don't produce "no tests ran" failures. 
+ has_integration_tests() { + local path="$1" + [ -d "$path/tests/integration" ] || return 1 + local count + count=$(find "$path/tests/integration" -maxdepth 1 -name 'test_*.py' -type f | wc -l) + [ "$count" -gt 0 ] + } + maybe_add() { + local name="$1" path="$2" changed="$3" workers="${4:-auto}" + [ "$changed" = "true" ] || return 0 + add_backend "$name" "$path" + if has_integration_tests "$path"; then + add_integration "$name" "$path" "$workers" + fi + } - echo "matrix={\"suite\":$(echo "$MATRIX" | jq -c '.')}" >> "$GITHUB_OUTPUT" - echo "has-backend=$([ "$(echo "$MATRIX" | jq 'length')" -gt 0 ] && echo true || echo false)" >> "$GITHUB_OUTPUT" - echo "Changed suites: $(echo "$MATRIX" | jq -r '[.[].name] | join(", ")')" + maybe_add kt-config libs/kt-config "${{ steps.filter.outputs.kt-config }}" + maybe_add kt-db libs/kt-db "${{ steps.filter.outputs.kt-db }}" + maybe_add kt-models libs/kt-models "${{ steps.filter.outputs.kt-models }}" 1 + maybe_add kt-providers libs/kt-providers "${{ steps.filter.outputs.kt-providers }}" + maybe_add kt-graph libs/kt-graph "${{ steps.filter.outputs.kt-graph }}" + maybe_add kt-facts libs/kt-facts "${{ steps.filter.outputs.kt-facts }}" + maybe_add api services/api "${{ steps.filter.outputs.api }}" + maybe_add mcp services/mcp "${{ steps.filter.outputs.mcp }}" + maybe_add worker-bottomup services/worker-bottomup "${{ steps.filter.outputs.worker-bottomup }}" + maybe_add worker-nodes services/worker-nodes "${{ steps.filter.outputs.worker-nodes }}" + maybe_add worker-search services/worker-search "${{ steps.filter.outputs.worker-search }}" + maybe_add worker-sync services/worker-sync "${{ steps.filter.outputs.worker-sync }}" + maybe_add plugin-search-providers plugins/backend-engine-search-providers "${{ steps.filter.outputs.plugin-search-providers }}" + + echo "backend-matrix={\"suite\":$(echo "$BACKEND" | jq -c '.')}" >> "$GITHUB_OUTPUT" + echo "integration-matrix={\"suite\":$(echo "$INTEGRATION" | jq -c '.')}" >> "$GITHUB_OUTPUT" + echo "has-backend=$([ "$(echo "$BACKEND" | jq 'length')" -gt 0 ] && echo true || echo false)" >> "$GITHUB_OUTPUT" + echo "has-integration=$([ "$(echo "$INTEGRATION" | jq 'length')" -gt 0 ] && echo true || echo false)" >> "$GITHUB_OUTPUT" + echo "Changed unit suites: $(echo "$BACKEND" | jq -r '[.[].name] | join(", ")')" + echo "Changed integration suites: $(echo "$INTEGRATION" | jq -r '[.[].name] | join(", ")')" backend-lint: name: Backend Lint @@ -140,14 +181,113 @@ jobs: - run: uv run --frozen ruff check . - run: uv run --frozen ruff format --check . 
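
For reference, the two outputs written by the `build-matrix` step above are plain JSON strings later expanded by `fromJSON()` into the job `matrix`. A minimal Python sketch of the same assembly (the changed-flags dict and the `maybe_add` calls here are illustrative only; the real step does this in bash with jq):

```python
# Sketch only: mirrors the JSON shapes the build-matrix step emits.
import json

# Hypothetical changed-flags, as the paths-filter step would report them.
changed = {"kt-db": True, "kt-models": True, "api": False}

backend: list[dict] = []
integration: list[dict] = []


def maybe_add(name: str, path: str, workers: str = "auto") -> None:
    if not changed.get(name):
        return
    backend.append({"name": name, "path": path})
    # In CI this append is additionally guarded by has_integration_tests(),
    # which requires real test_*.py files under <path>/tests/integration.
    integration.append({"name": name, "path": path, "workers": workers})


maybe_add("kt-db", "libs/kt-db")
maybe_add("kt-models", "libs/kt-models", workers="1")  # serialise rate-limited suites
maybe_add("api", "services/api")

# These are the strings appended to $GITHUB_OUTPUT; fromJSON() turns them
# into matrix: {"suite": [{"name": ..., "path": ..., "workers": ...}, ...]}.
print("backend-matrix=" + json.dumps({"suite": backend}))
print("integration-matrix=" + json.dumps({"suite": integration}))
```
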
+ migrations-check: + name: Migrations + runs-on: ubuntu-latest + services: + postgres: + image: pgvector/pgvector:pg16 + env: + POSTGRES_DB: knowledge_tree + POSTGRES_USER: kt + POSTGRES_PASSWORD: localdev + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U kt -d knowledge_tree" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + postgres-write: + image: postgres:16-alpine + env: + POSTGRES_DB: knowledge_tree_write + POSTGRES_USER: kt + POSTGRES_PASSWORD: localdev + ports: + - 5435:5432 + options: >- + --health-cmd "pg_isready -U kt -d knowledge_tree_write" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + env: + DATABASE_URL: postgresql+asyncpg://kt:localdev@localhost:5432/knowledge_tree + WRITE_DATABASE_URL: postgresql+asyncpg://kt:localdev@localhost:5435/knowledge_tree_write + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v4 + with: + version: "latest" + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: uv sync --all-packages --frozen + - name: Verify single graph-db head + run: | + cd libs/kt-db + heads=$(uv run --frozen alembic heads 2>/dev/null | grep -c '(head)' || true) + if [ "$heads" != "1" ]; then + echo "::error::Expected exactly 1 graph-db alembic head, found $heads" + uv run --frozen alembic heads + exit 1 + fi + - name: Verify single write-db head + run: | + cd libs/kt-db + heads=$(uv run --frozen alembic -c alembic_write.ini heads 2>/dev/null | grep -c '(head)' || true) + if [ "$heads" != "1" ]; then + echo "::error::Expected exactly 1 write-db alembic head, found $heads" + uv run --frozen alembic -c alembic_write.ini heads + exit 1 + fi + - name: Apply graph-db migrations + run: cd libs/kt-db && uv run --frozen alembic upgrade head + - name: Apply write-db migrations + run: cd libs/kt-db && uv run --frozen alembic -c alembic_write.ini upgrade head + - name: Downgrade and re-upgrade graph-db (sanity) + run: | + cd libs/kt-db + uv run --frozen alembic downgrade -1 + uv run --frozen alembic upgrade head + - name: Downgrade and re-upgrade write-db (sanity) + run: | + cd libs/kt-db + uv run --frozen alembic -c alembic_write.ini downgrade -1 + uv run --frozen alembic -c alembic_write.ini upgrade head + backend-test: - name: Test ${{ matrix.suite.name }} + name: Unit ${{ matrix.suite.name }} needs: changes if: needs.changes.outputs.has-backend == 'true' runs-on: ubuntu-latest strategy: fail-fast: false matrix: ${{ fromJSON(needs.changes.outputs.backend-matrix) }} + env: + SKIP_AUTH: "true" + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v4 + with: + version: "latest" + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: uv sync --all-packages --frozen + - name: Run unit tests + run: >- + uv run --frozen pytest ${{ matrix.suite.path }}/tests/ -x -n auto + --ignore=${{ matrix.suite.path }}/tests/integration + + backend-integration-test: + name: Integration ${{ matrix.suite.name }} + needs: changes + if: needs.changes.outputs.has-integration == 'true' + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: ${{ fromJSON(needs.changes.outputs.integration-matrix) }} services: postgres: image: pgvector/pgvector:pg16 @@ -184,12 +324,26 @@ jobs: --health-interval 5s --health-timeout 5s --health-retries 5 + qdrant: + image: qdrant/qdrant:v1.17.1 + ports: + - 6333:6333 + - 6334:6334 env: DATABASE_URL: postgresql+asyncpg://kt:localdev@localhost:5432/knowledge_tree WRITE_DATABASE_URL: 
postgresql+asyncpg://kt:localdev@localhost:5435/knowledge_tree_write REDIS_URL: redis://localhost:6379 + QDRANT_URL: http://localhost:6333 SKIP_AUTH: "true" + # Fake JWT accepted by Hatchet-lite's ClientConfig validator. + # Payload: {server_url, grpc_broadcast_address, sub, iat}. HATCHET_CLIENT_TOKEN: "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ.ZmFrZXNpZw" + USE_HATCHET: "false" + # External-API secrets — BRAVE_KEY intentionally omitted (Serper is + # the primary provider; Brave tests skip cleanly via pytest.skip). + # OpenAI embeddings go through OpenRouter, so no OPENAI_API_KEY needed. + SERPER_KEY: ${{ secrets.SERPER_KEY }} + OPENROUTER_API_KEY: ${{ secrets.OPENROUTER_API_KEY }} steps: - uses: actions/checkout@v4 - uses: astral-sh/setup-uv@v4 @@ -199,10 +353,17 @@ jobs: with: python-version: "3.12" - run: uv sync --all-packages --frozen - - name: Run tests - run: >- - uv run --frozen pytest ${{ matrix.suite.path }}/tests/ -x -n auto - --ignore=${{ matrix.suite.path }}/tests/integration + # Note: migrations applied to public schema for connectivity sanity. + # Integration fixtures still use Base.metadata.create_all against + # per-worker schemas for isolation under xdist. Model/migration + # drift is therefore caught by the `migrations-check` job above, + # not here. + - name: Apply graph-db migrations + run: cd libs/kt-db && uv run --frozen alembic upgrade head + - name: Apply write-db migrations + run: cd libs/kt-db && uv run --frozen alembic -c alembic_write.ini upgrade head + - name: Run integration tests + run: uv run --frozen pytest ${{ matrix.suite.path }}/tests/integration -x -n ${{ matrix.suite.workers }} frontend-test: name: Frontend Tests diff --git a/justfile b/justfile index 481b5b8e..93e4973b 100644 --- a/justfile +++ b/justfile @@ -103,34 +103,46 @@ migrate-write: cd libs/kt-db && uv run alembic -c alembic_write.ini upgrade head # ── Testing ─────────────────────────────────────────────────────── +# Unit tests are infra-free and key-free. Integration tests need infra +# (`just up`) and run only against `tests/integration/` directories. 
-# Run all lib tests -test-libs: - cd libs/kt-config && uv run pytest -x - cd libs/kt-db && uv run pytest -x - cd libs/kt-flags && uv run pytest -x - cd libs/kt-models && uv run pytest -x - cd libs/kt-providers && uv run pytest -x - cd libs/kt-graph && uv run pytest -x - cd libs/kt-facts && uv run pytest -x - -# Run API tests -test-api: - cd services/api && uv run pytest -x - -# Run MCP tests -test-mcp: - cd services/mcp && uv run pytest -x - -# Run worker tests -test-workers: - cd services/worker-bottomup && uv run pytest -x - cd services/worker-search && uv run pytest -x - cd services/worker-nodes && uv run pytest -x - cd services/worker-ingest && uv run pytest -x - -# Run all tests -test-all: test-libs test-api test-workers +# Run unit tests for every backend package (no infra required) +test-unit: + #!/usr/bin/env bash + set -euo pipefail + for pkg in \ + libs/kt-config libs/kt-db libs/kt-models libs/kt-providers \ + libs/kt-graph libs/kt-facts libs/kt-hatchet libs/kt-qdrant \ + libs/kt-agents-core libs/kt-auth libs/kt-core-engine-api \ + libs/kt-flags libs/kt-rbac \ + services/api services/mcp \ + services/worker-bottomup services/worker-nodes services/worker-sync \ + services/worker-synthesis services/worker-ingest \ + plugins/backend-engine-search-providers \ + plugins/backend-engine-concept-extractor; do + if [ -d "$pkg/tests" ]; then + echo "── $pkg ──" + uv run --frozen pytest "$pkg/tests/" --ignore="$pkg/tests/integration" -x -q + fi + done + +# Run integration tests for every package that has an integration/ dir +# Requires `just up` (postgres, postgres-write, redis, qdrant, hatchet) +test-integration: + #!/usr/bin/env bash + set -euo pipefail + for pkg in \ + libs/kt-db libs/kt-facts libs/kt-graph libs/kt-models libs/kt-providers \ + services/api services/worker-bottomup services/worker-nodes services/worker-sync \ + plugins/backend-engine-search-providers; do + if [ -d "$pkg/tests/integration" ]; then + echo "── $pkg (integration) ──" + uv run --frozen pytest "$pkg/tests/integration" -x -q + fi + done + +# Run all backend tests (unit + integration). Needs infra up. +test-all: test-unit test-integration # Frontend tests test-frontend: diff --git a/lefthook.yml b/lefthook.yml index fa7436af..9319ed31 100644 --- a/lefthook.yml +++ b/lefthook.yml @@ -28,13 +28,6 @@ pre-commit: pre-push: parallel: true jobs: - - name: python-tests + - name: python-unit-tests glob: "**/*.py" - run: | - just test-all || { - echo "" - echo "Tests failed. Make sure infrastructure is running:" - echo " just up # starts postgres, postgres-write, redis, qdrant, hatchet" - echo "" - exit 1 - } + run: just test-unit diff --git a/libs/kt-db/alembic/versions/212efc51d897_drop_convergence_reports_and_divergent_.py b/libs/kt-db/alembic/versions/212efc51d897_drop_convergence_reports_and_divergent_.py index 2dbebe52..deac7a01 100644 --- a/libs/kt-db/alembic/versions/212efc51d897_drop_convergence_reports_and_divergent_.py +++ b/libs/kt-db/alembic/versions/212efc51d897_drop_convergence_reports_and_divergent_.py @@ -1,7 +1,7 @@ """drop convergence_reports and divergent_claims Revision ID: 212efc51d897 -Revises: 489643109ccd +Revises: e3eb15f51d7c Create Date: 2026-04-17 17:29:09.860226 """ @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. 
revision: str = "212efc51d897" -down_revision: Union[str, Sequence[str], None] = "489643109ccd" +down_revision: Union[str, Sequence[str], None] = "e3eb15f51d7c" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None diff --git a/libs/kt-db/src/kt_db/testing.py b/libs/kt-db/src/kt_db/testing.py new file mode 100644 index 00000000..593654a0 --- /dev/null +++ b/libs/kt-db/src/kt_db/testing.py @@ -0,0 +1,183 @@ +"""Pytest plugin providing per-module schema isolation for integration tests. + +Activate by adding to an integration conftest: + + pytest_plugins = ["kt_db.testing"] + +Fixtures provided: + + settings — cached Settings instance (session scope) + schema_name — unique graph-db schema per test module + engine — AsyncEngine bound to that schema (module scope) + db_session — per-test AsyncSession wrapped in a rolled-back txn + write_schema_name — unique write-db schema per test module + write_engine — AsyncEngine bound to that schema (module scope) + write_db_session — per-test write-db AsyncSession in a rolled-back txn + +Rationale: one schema per test module gives cross-file isolation (the +source of the previous flakiness — tests in ``test_trigram_dedup.py`` +committing state that leaked into ``test_write_seeds.py``) without +paying the CREATE SCHEMA / create_all cost per-test. Within a module, +``db_session`` wraps each test in a txn + rollback. + +Schemas are dropped on module teardown so the database doesn't +accumulate ``test_*`` leftovers across runs. Extensions are installed +once per pytest worker (session scope, xdist-safe via advisory lock). + +Packages that diverge — kt-facts (graph+write in one schema) and +worker-sync (renamed fixtures) — define their own fixtures in +conftest using the helpers exported here. +""" + +from __future__ import annotations + +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import event, text +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +from kt_config.settings import get_settings +from kt_db.models import Base +from kt_db.write_models import WriteBase + + +def unique_schema(prefix: str = "test") -> str: + """Return a short random schema name, e.g. 
``test_a3f91b0c``.""" + return f"{prefix}_{uuid.uuid4().hex[:8]}" + + +async def install_graph_extensions(base_url: str) -> None: + """Install pgvector + pg_trgm on graph-db (idempotent, xdist-safe).""" + eng = create_async_engine(base_url, echo=False) + async with eng.begin() as conn: + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await eng.dispose() + + +async def install_write_extensions(base_url: str) -> None: + """Install pg_trgm on write-db (idempotent, xdist-safe).""" + eng = create_async_engine(base_url, echo=False) + async with eng.begin() as conn: + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await eng.dispose() + + +async def create_schema(base_url: str, schema: str) -> None: + eng = create_async_engine(base_url, echo=False) + async with eng.begin() as conn: + await conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema}"')) + await eng.dispose() + + +async def drop_schema(base_url: str, schema: str) -> None: + eng = create_async_engine(base_url, echo=False) + async with eng.begin() as conn: + await conn.execute(text(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE')) + await eng.dispose() + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +# ── Graph-db fixtures (per-module schema) ──────────────────────────── + + +@pytest.fixture(scope="module") +def schema_name() -> str: + return unique_schema() + + +@pytest_asyncio.fixture(scope="module", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + await install_graph_extensions(base_url) + await create_schema(base_url, schema_name) + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + try: + await conn.run_sync(Base.metadata.create_all) + finally: + for table in Base.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, schema_name) + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as session: + async with session.begin(): + yield session + await session.rollback() + + +# ── Write-db fixtures (per-module schema) ──────────────────────────── + + +@pytest.fixture(scope="module") +def write_schema_name() -> str: + return unique_schema() + + +@pytest_asyncio.fixture(scope="module", loop_scope="session") +async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.write_database_url + await install_write_extensions(base_url) + await create_schema(base_url, write_schema_name) + + eng = create_async_engine(base_url, echo=False) + + # Write-db runs behind pgbouncer in transaction-pooling mode, which + # rejects startup-time search_path. Set it per connection instead. 
+ @event.listens_for(eng.sync_engine, "connect") + def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] + cursor = dbapi_conn.cursor() + cursor.execute(f'SET search_path TO "{write_schema_name}", public') + cursor.close() + + async with eng.begin() as conn: + for table in WriteBase.metadata.sorted_tables: + table.schema = write_schema_name + try: + await conn.run_sync(WriteBase.metadata.create_all) + finally: + for table in WriteBase.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, write_schema_name) + + +@pytest_asyncio.fixture(loop_scope="session") +async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: + factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/libs/kt-db/tests/conftest.py b/libs/kt-db/tests/conftest.py index f68ec9af..160a1a0b 100644 --- a/libs/kt-db/tests/conftest.py +++ b/libs/kt-db/tests/conftest.py @@ -1,116 +1,4 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import event, text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - # Advisory lock prevents xdist workers from racing on CREATE EXTENSION - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() - - -@pytest.fixture(scope="session") -def write_schema_name(): - return _worker_schema() - - 
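
As a consumer-side sketch of the new plugin (file name and test body are hypothetical; the fixture names come from kt_db.testing above, and pytest-asyncio ≥ 0.24 with the session-scoped loop the fixtures declare is assumed), a package opts in with the one-line conftest shown elsewhere in this diff and then uses the fixtures directly:

```python
# tests/integration/test_example.py — hypothetical consumer; the package's
# tests/integration/conftest.py contains only:
#     pytest_plugins = ["kt_db.testing"]
import pytest
from sqlalchemy import text


@pytest.mark.asyncio(loop_scope="session")
async def test_runs_in_throwaway_schema(db_session):
    # db_session is provided by kt_db.testing: bound to this module's
    # unique schema, wrapped in a transaction that is rolled back after
    # the test; the schema itself is dropped on module teardown.
    current = (await db_session.execute(text("SELECT current_schema()"))).scalar()
    assert current.startswith("test_")
```
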
-@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine(base_url, echo=False) - - # Set search_path on every new connection (write-db doesn't support startup params) - @event.listens_for(eng.sync_engine, "connect") - def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] - cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO {write_schema_name}, public") - cursor.close() - - async with eng.begin() as conn: - for table in WriteBase.metadata.sorted_tables: - table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/libs/kt-db/tests/integration/conftest.py b/libs/kt-db/tests/integration/conftest.py new file mode 100644 index 00000000..a680a760 --- /dev/null +++ b/libs/kt-db/tests/integration/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["kt_db.testing"] diff --git a/libs/kt-db/tests/test_write_dimensions.py b/libs/kt-db/tests/integration/test_write_dimensions.py similarity index 100% rename from libs/kt-db/tests/test_write_dimensions.py rename to libs/kt-db/tests/integration/test_write_dimensions.py diff --git a/libs/kt-db/tests/test_write_seeds.py b/libs/kt-db/tests/integration/test_write_seeds.py similarity index 100% rename from libs/kt-db/tests/test_write_seeds.py rename to libs/kt-db/tests/integration/test_write_seeds.py diff --git a/libs/kt-facts/tests/conftest.py b/libs/kt-facts/tests/conftest.py index ba4bd353..ca4bced3 100644 --- a/libs/kt-facts/tests/conftest.py +++ b/libs/kt-facts/tests/conftest.py @@ -1,7 +1,5 @@ import os import sys -import uuid -from collections.abc import AsyncGenerator from pathlib import Path # Ensure the tests directory is on sys.path so that seed_fixtures can be imported @@ -9,79 +7,3 @@ os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - 
-@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - # Also create write-db tables so write_session tests work - for table in WriteBase.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() - - -@pytest_asyncio.fixture(loop_scope="session") -async def write_session(engine) -> AsyncGenerator[AsyncSession, None]: - """Separate session for write-db operations (pipeline calls commit()).""" - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - yield session diff --git a/libs/kt-facts/tests/integration/conftest.py b/libs/kt-facts/tests/integration/conftest.py new file mode 100644 index 00000000..2d9cd804 --- /dev/null +++ b/libs/kt-facts/tests/integration/conftest.py @@ -0,0 +1,87 @@ +"""kt-facts integration conftest. + +kt-facts writes both graph-db and write-db tables into a *single* schema +so the fact pipeline can exercise both sides within one session. Schema +is unique per test module — see kt_db.testing for the helpers. 
+""" + +from __future__ import annotations + +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base +from kt_db.testing import ( + create_schema, + drop_schema, + install_graph_extensions, + unique_schema, +) +from kt_db.write_models import WriteBase + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="module") +def schema_name() -> str: + return unique_schema() + + +@pytest_asyncio.fixture(scope="module", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + await install_graph_extensions(base_url) + await create_schema(base_url, schema_name) + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + try: + await conn.run_sync(Base.metadata.create_all) + finally: + for table in Base.metadata.sorted_tables: + table.schema = None + for table in WriteBase.metadata.sorted_tables: + table.schema = schema_name + try: + await conn.run_sync(WriteBase.metadata.create_all) + finally: + for table in WriteBase.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, schema_name) + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as session: + async with session.begin(): + yield session + await session.rollback() + + +@pytest_asyncio.fixture(loop_scope="session") +async def write_session(engine) -> AsyncGenerator[AsyncSession, None]: + """Session that does NOT roll back (pipelines call commit() internally). + + Safe because the surrounding module-scoped schema is dropped on teardown. 
+ """ + factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as session: + yield session diff --git a/libs/kt-facts/tests/test_tiered_dedup_experiment.py b/libs/kt-facts/tests/integration/test_tiered_dedup_experiment.py similarity index 100% rename from libs/kt-facts/tests/test_tiered_dedup_experiment.py rename to libs/kt-facts/tests/integration/test_tiered_dedup_experiment.py diff --git a/libs/kt-graph/tests/conftest.py b/libs/kt-graph/tests/conftest.py index e9531a50..160a1a0b 100644 --- a/libs/kt-graph/tests/conftest.py +++ b/libs/kt-graph/tests/conftest.py @@ -1,118 +1,4 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import event, text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() - - -# ── Write-db fixtures (used by PublicGraphBridge integration tests) ── - - -@pytest.fixture(scope="session") -def write_schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) - await setup_eng.dispose() - - eng = create_async_engine(base_url, echo=False) - - # Write-db doesn't accept startup-time search_path (pgbouncer in - 
# transaction mode rejects it). Set it on every new connection - # instead so each session lands in the test schema. - @event.listens_for(eng.sync_engine, "connect") - def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] - cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO {write_schema_name}, public") - cursor.close() - - async with eng.begin() as conn: - for table in WriteBase.metadata.sorted_tables: - table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/libs/kt-graph/tests/integration/conftest.py b/libs/kt-graph/tests/integration/conftest.py new file mode 100644 index 00000000..a680a760 --- /dev/null +++ b/libs/kt-graph/tests/integration/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["kt_db.testing"] diff --git a/libs/kt-hatchet/src/kt_hatchet/testing.py b/libs/kt-hatchet/src/kt_hatchet/testing.py new file mode 100644 index 00000000..36cfb9c5 --- /dev/null +++ b/libs/kt-hatchet/src/kt_hatchet/testing.py @@ -0,0 +1,35 @@ +"""Test helpers for packages that import the Hatchet client at module load. + +Some worker modules (worker-bottomup, worker-nodes, worker-ingest, +worker-sync) call ``get_hatchet()`` at import time. Unit tests that +only touch pure helpers still trigger that import, and the Hatchet +client refuses to initialise without a valid JWT. This module exports +a fake-but-parseable JWT and a one-call setup helper so test conftests +don't duplicate the literal. +""" + +from __future__ import annotations + +import os + +# JWT with header {"alg": "HS256", "typ": "JWT"} and payload +# {"server_url": "http://localhost:8080", "grpc_broadcast_address": +# "localhost:7070", "sub": "ci", "iat": 1516239022}. Signature is not +# verified by the Hatchet client — only the payload fields are read. +FAKE_HATCHET_JWT = ( + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." + "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." + "ZmFrZXNpZw" +) + + +def install_fake_hatchet_env() -> None: + """Populate Hatchet env vars with opaque placeholders for test collection. + + Uses ``setdefault`` so a real token from ``.env`` or CI takes precedence. + Safe to call multiple times. 
+ """ + os.environ.setdefault("USE_HATCHET", "false") + os.environ.setdefault("HATCHET_CLIENT_TOKEN", FAKE_HATCHET_JWT) + os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") + os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") diff --git a/libs/kt-models/tests/conftest.py b/libs/kt-models/tests/conftest.py index faa86030..14bb13b1 100644 --- a/libs/kt-models/tests/conftest.py +++ b/libs/kt-models/tests/conftest.py @@ -1,17 +1,10 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine -from kt_config.settings import get_settings -from kt_db.models import Base from kt_models.expense import ExpenseContext, reset_current_expense, set_current_expense @@ -27,56 +20,3 @@ def _ambient_test_expense(): yield finally: reset_current_expense(token) - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/libs/kt-models/tests/integration/conftest.py b/libs/kt-models/tests/integration/conftest.py new file mode 100644 index 00000000..a680a760 --- /dev/null +++ b/libs/kt-models/tests/integration/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["kt_db.testing"] diff --git a/libs/kt-models/tests/integration/test_model_gateway.py b/libs/kt-models/tests/integration/test_model_gateway.py index 97f49c5f..77a6e6d4 100644 --- a/libs/kt-models/tests/integration/test_model_gateway.py +++ b/libs/kt-models/tests/integration/test_model_gateway.py @@ -16,7 +16,7 @@ async def test_generate(): gateway = ModelGateway() with expense_scope(_TEST_EXPENSE): result = await gateway.generate( - "openrouter/google/gemini-2.0-flash-001", + 
"openrouter/google/gemma-4-26b-a4b-it:nitro", [{"role": "user", "content": "Say hello in one word."}], max_tokens=50, ) @@ -30,7 +30,7 @@ async def test_generate_parallel(): gateway = ModelGateway() with expense_scope(_TEST_EXPENSE): results = await gateway.generate_parallel( - model_ids=["openrouter/google/gemini-2.0-flash-001"], + model_ids=["openrouter/google/gemma-4-26b-a4b-it:nitro"], messages=[{"role": "user", "content": "Say hello in one word."}], max_tokens=50, ) @@ -47,7 +47,7 @@ async def test_generate_with_system_prompt(): gateway = ModelGateway() with expense_scope(_TEST_EXPENSE): result = await gateway.generate( - "openrouter/google/gemini-2.0-flash-001", + "openrouter/google/gemma-4-26b-a4b-it:nitro", [{"role": "user", "content": "What should I say?"}], system_prompt="Always respond with exactly the word 'banana'.", max_tokens=50, diff --git a/libs/kt-providers/tests/conftest.py b/libs/kt-providers/tests/conftest.py index a44bfaa3..160a1a0b 100644 --- a/libs/kt-providers/tests/conftest.py +++ b/libs/kt-providers/tests/conftest.py @@ -1,67 +1,4 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/libs/kt-providers/tests/integration/conftest.py b/libs/kt-providers/tests/integration/conftest.py new file mode 100644 index 00000000..a680a760 --- /dev/null +++ b/libs/kt-providers/tests/integration/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["kt_db.testing"] diff --git 
a/plugins/backend-engine-search-providers/tests/integration/conftest.py b/plugins/backend-engine-search-providers/tests/integration/conftest.py new file mode 100644 index 00000000..a680a760 --- /dev/null +++ b/plugins/backend-engine-search-providers/tests/integration/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["kt_db.testing"] diff --git a/services/api/tests/conftest.py b/services/api/tests/conftest.py index e8805c00..160a1a0b 100644 --- a/services/api/tests/conftest.py +++ b/services/api/tests/conftest.py @@ -1,82 +1,4 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) -async def _ensure_qdrant_collections(): - """Ensure Qdrant collections exist before any tests run.""" - try: - from kt_qdrant.client import get_qdrant_client - from kt_qdrant.repositories.facts import QdrantFactRepository - from kt_qdrant.repositories.nodes import QdrantNodeRepository - - client = get_qdrant_client() - await QdrantFactRepository(client).ensure_collection() - await QdrantNodeRepository(client).ensure_collection() - except Exception: - pass # Qdrant may not be running in all test environments - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/services/api/tests/integration/conftest.py b/services/api/tests/integration/conftest.py new file mode 100644 index 00000000..68e92669 --- /dev/null +++ b/services/api/tests/integration/conftest.py @@ -0,0 +1,18 @@ +import pytest_asyncio + 
+pytest_plugins = ["kt_db.testing"] + + +@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) +async def _ensure_qdrant_collections(): + """Ensure Qdrant collections exist before any integration tests run.""" + try: + from kt_qdrant.client import get_qdrant_client + from kt_qdrant.repositories.facts import QdrantFactRepository + from kt_qdrant.repositories.nodes import QdrantNodeRepository + + client = get_qdrant_client() + await QdrantFactRepository(client).ensure_collection() + await QdrantNodeRepository(client).ensure_collection() + except Exception: + pass # Qdrant may not be running in all test environments diff --git a/services/api/tests/integration/test_api.py b/services/api/tests/integration/test_api.py index 84be0a1c..93233b41 100644 --- a/services/api/tests/integration/test_api.py +++ b/services/api/tests/integration/test_api.py @@ -21,13 +21,13 @@ ) -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def api_session_factory(engine): """Session factory for API integration tests, session-scoped.""" return async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def api_app(api_session_factory): """Create a test FastAPI app with DB session overridden to use the test DB.""" application = create_app() @@ -40,7 +40,7 @@ async def override_get_db_session() -> AsyncGenerator[AsyncSession, None]: return application -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def api_client(api_app): """Create an async HTTP client for testing.""" transport = ASGITransport(app=api_app) diff --git a/services/api/tests/integration/test_permissions.py b/services/api/tests/integration/test_permissions.py index f3ded9bf..b19e15a9 100644 --- a/services/api/tests/integration/test_permissions.py +++ b/services/api/tests/integration/test_permissions.py @@ -29,7 +29,7 @@ def _make_stub_user(user_id: uuid.UUID, *, is_superuser: bool) -> User: return user -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def perm_session_factory(engine): return async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) diff --git a/services/api/tests/integration/test_waitlist_invites.py b/services/api/tests/integration/test_waitlist_invites.py index 47b74f21..d31eb883 100644 --- a/services/api/tests/integration/test_waitlist_invites.py +++ b/services/api/tests/integration/test_waitlist_invites.py @@ -18,12 +18,12 @@ STUB_USER_ID = uuid.UUID("00000000-0000-0000-0000-000000000001") -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def wi_session_factory(engine): return async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) -@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) +@pytest_asyncio.fixture(scope="module", loop_scope="session", autouse=True) async def _ensure_stub_user(wi_session_factory): """Ensure the SKIP_AUTH stub user exists in the DB for FK constraints.""" async with wi_session_factory() as session: @@ -48,7 +48,7 @@ async def _ensure_stub_user(wi_session_factory): await session.commit() -@pytest_asyncio.fixture(scope="session", loop_scope="session") 
+@pytest_asyncio.fixture(scope="module", loop_scope="session") async def wi_app(wi_session_factory, _ensure_stub_user): application = create_app() @@ -60,7 +60,7 @@ async def override_get_db_session() -> AsyncGenerator[AsyncSession, None]: return application -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def wi_client(wi_app): transport = ASGITransport(app=wi_app) async with AsyncClient(transport=transport, base_url="http://test") as c: diff --git a/services/mcp/tests/conftest.py b/services/mcp/tests/conftest.py index eba7f1ca..03714efd 100644 --- a/services/mcp/tests/conftest.py +++ b/services/mcp/tests/conftest.py @@ -1,80 +1,3 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) -async def _ensure_qdrant_collections(): - try: - from kt_qdrant.client import get_qdrant_client - from kt_qdrant.repositories.facts import QdrantFactRepository - from kt_qdrant.repositories.nodes import QdrantNodeRepository - - client = get_qdrant_client() - await QdrantFactRepository(client).ensure_collection() - await QdrantNodeRepository(client).ensure_collection() - except Exception: - pass - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/services/worker-bottomup/tests/conftest.py b/services/worker-bottomup/tests/conftest.py index 9e9a7acd..4e665815 100644 --- a/services/worker-bottomup/tests/conftest.py +++ 
b/services/worker-bottomup/tests/conftest.py @@ -1,17 +1,14 @@ import os -import uuid -from collections.abc import AsyncGenerator -os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") +from kt_hatchet.testing import install_fake_hatchet_env + +# bottom_up.workflow imports the Hatchet client at module load. +install_fake_hatchet_env() + import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine -from kt_config.settings import get_settings -from kt_db.models import Base from kt_models.expense import ExpenseContext, reset_current_expense, set_current_expense @@ -29,55 +26,3 @@ def _ambient_test_expense(): yield finally: reset_current_expense(token) - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/services/worker-bottomup/tests/integration/conftest.py b/services/worker-bottomup/tests/integration/conftest.py new file mode 100644 index 00000000..a680a760 --- /dev/null +++ b/services/worker-bottomup/tests/integration/conftest.py @@ -0,0 +1 @@ +pytest_plugins = ["kt_db.testing"] diff --git a/services/worker-ingest/tests/conftest.py b/services/worker-ingest/tests/conftest.py new file mode 100644 index 00000000..72b4ec87 --- /dev/null +++ b/services/worker-ingest/tests/conftest.py @@ -0,0 +1,8 @@ +import os + +os.environ.setdefault("SKIP_AUTH", "true") + +from kt_hatchet.testing import install_fake_hatchet_env + +# Ingest workflow modules import the Hatchet client at module load. 
+install_fake_hatchet_env()
diff --git a/services/worker-nodes/tests/conftest.py b/services/worker-nodes/tests/conftest.py
index 7e629631..cea82307 100644
--- a/services/worker-nodes/tests/conftest.py
+++ b/services/worker-nodes/tests/conftest.py
@@ -1,17 +1,14 @@
 import os
-import uuid
-from collections.abc import AsyncGenerator
 
-os.environ.setdefault("USE_HATCHET", "false")
 os.environ.setdefault("SKIP_AUTH", "true")
 
+from kt_hatchet.testing import install_fake_hatchet_env
+
+# auto_build imports the Hatchet client at module load.
+install_fake_hatchet_env()
+
 import pytest
-import pytest_asyncio
-from sqlalchemy import text
-from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
 
-from kt_config.settings import get_settings
-from kt_db.models import Base
 from kt_models.expense import ExpenseContext, reset_current_expense, set_current_expense
@@ -27,56 +24,3 @@ def _ambient_test_expense():
         yield
     finally:
         reset_current_expense(token)
-
-
-def _worker_schema() -> str:
-    return f"test_{uuid.uuid4().hex[:8]}"
-
-
-@pytest.fixture(scope="session")
-def settings():
-    return get_settings()
-
-
-@pytest.fixture(scope="session")
-def schema_name():
-    return _worker_schema()
-
-
-@pytest_asyncio.fixture(scope="session", loop_scope="session")
-async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]:
-    base_url = settings.database_url
-    setup_eng = create_async_engine(base_url, echo=False)
-    async with setup_eng.begin() as conn:
-        await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
-        await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))"))
-        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
-        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm"))
-    await setup_eng.dispose()
-
-    eng = create_async_engine(
-        base_url,
-        echo=False,
-        connect_args={"server_settings": {"search_path": f"{schema_name},public"}},
-    )
-    async with eng.begin() as conn:
-        for table in Base.metadata.sorted_tables:
-            table.schema = schema_name
-        await conn.run_sync(Base.metadata.create_all)
-        for table in Base.metadata.sorted_tables:
-            table.schema = None
-    yield eng
-    await eng.dispose()
-    cleanup_eng = create_async_engine(base_url, echo=False)
-    async with cleanup_eng.begin() as conn:
-        await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE"))
-    await cleanup_eng.dispose()
-
-
-@pytest_asyncio.fixture(loop_scope="session")
-async def db_session(engine) -> AsyncGenerator[AsyncSession, None]:
-    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
-    async with session_factory() as session:
-        async with session.begin():
-            yield session
-        await session.rollback()
diff --git a/services/worker-nodes/tests/integration/conftest.py b/services/worker-nodes/tests/integration/conftest.py
new file mode 100644
index 00000000..a680a760
--- /dev/null
+++ b/services/worker-nodes/tests/integration/conftest.py
@@ -0,0 +1 @@
+pytest_plugins = ["kt_db.testing"]
diff --git a/services/worker-sync/tests/conftest.py b/services/worker-sync/tests/conftest.py
index bce65015..51b8edd7 100644
--- a/services/worker-sync/tests/conftest.py
+++ b/services/worker-sync/tests/conftest.py
@@ -1,141 +1,15 @@
-"""Shared fixtures for worker-sync integration tests.
+"""Shared setup for worker-sync tests.
 
-Mirrors the dual-DB schema-per-worker pattern from libs/kt-db/tests/conftest.py
-so we can stand up a real SyncEngine against isolated graph-db + write-db
-schemas.
+The dedup workflow module imports the Hatchet client at import time,
+so a fake token must be set before any kt_worker_sync import happens.
 """
 
 from __future__ import annotations
 
 import os
-import uuid
-from collections.abc import AsyncGenerator
 
-os.environ.setdefault("USE_HATCHET", "false")
 os.environ.setdefault("SKIP_AUTH", "true")
-# The dedup workflow module imports the Hatchet client at import time,
-# which refuses to initialise without a token. Unit-level tests only
-# touch pure helpers, so any opaque placeholder is sufficient.
-os.environ.setdefault(
-    "HATCHET_CLIENT_TOKEN",
-    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkdW1teSJ9.dummy",
-)
-os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070")
-os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none")
 
-import pytest
-import pytest_asyncio
-from sqlalchemy import event, text
-from sqlalchemy.ext.asyncio import (
-    AsyncEngine,
-    AsyncSession,
-    async_sessionmaker,
-    create_async_engine,
-)
+from kt_hatchet.testing import install_fake_hatchet_env
 
-from kt_config.settings import get_settings
-from kt_db.models import Base
-from kt_db.write_models import WriteBase
-
-
-def _worker_schema() -> str:
-    return f"test_{uuid.uuid4().hex[:8]}"
-
-
-@pytest.fixture(scope="session")
-def settings():
-    return get_settings()
-
-
-@pytest.fixture(scope="session")
-def graph_schema_name():
-    return _worker_schema()
-
-
-@pytest.fixture(scope="session")
-def write_schema_name():
-    return _worker_schema()
-
-
-@pytest_asyncio.fixture(scope="session", loop_scope="session")
-async def graph_engine(settings, graph_schema_name) -> AsyncGenerator[AsyncEngine, None]:
-    base_url = settings.database_url
-    setup_eng = create_async_engine(base_url, echo=False)
-    async with setup_eng.begin() as conn:
-        await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {graph_schema_name}"))
-        await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))"))
-        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
-        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm"))
-    await setup_eng.dispose()
-
-    eng = create_async_engine(
-        base_url,
-        echo=False,
-        connect_args={"server_settings": {"search_path": f"{graph_schema_name},public"}},
-    )
-    async with eng.begin() as conn:
-        for table in Base.metadata.sorted_tables:
-            table.schema = graph_schema_name
-        await conn.run_sync(Base.metadata.create_all)
-        for table in Base.metadata.sorted_tables:
-            table.schema = None
-    yield eng
-    await eng.dispose()
-    cleanup_eng = create_async_engine(base_url, echo=False)
-    async with cleanup_eng.begin() as conn:
-        await conn.execute(text(f"DROP SCHEMA IF EXISTS {graph_schema_name} CASCADE"))
-    await cleanup_eng.dispose()
-
-
-@pytest_asyncio.fixture(scope="session", loop_scope="session")
-async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]:
-    base_url = settings.write_database_url
-    setup_eng = create_async_engine(base_url, echo=False)
-    async with setup_eng.begin() as conn:
-        await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}"))
-        await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))"))
-        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm"))
-    await setup_eng.dispose()
-
-    eng = create_async_engine(base_url, echo=False)
-
-    @event.listens_for(eng.sync_engine, "connect")
-    def _set_search_path(dbapi_conn, connection_record):  # type: ignore[no-untyped-def]
-        cursor = dbapi_conn.cursor()
-        cursor.execute(f"SET search_path TO {write_schema_name}, public")
-        cursor.close()
-
-    async with eng.begin() as conn:
-        for table in WriteBase.metadata.sorted_tables:
-            table.schema = write_schema_name
-        await conn.run_sync(WriteBase.metadata.create_all)
-        for table in WriteBase.metadata.sorted_tables:
-            table.schema = None
-    yield eng
-    await eng.dispose()
-    cleanup_eng = create_async_engine(base_url, echo=False)
-    async with cleanup_eng.begin() as conn:
-        await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE"))
-    await cleanup_eng.dispose()
-
-
-@pytest.fixture(scope="session")
-def graph_session_factory(graph_engine) -> async_sessionmaker[AsyncSession]:
-    return async_sessionmaker(graph_engine, class_=AsyncSession, expire_on_commit=False)
-
-
-@pytest.fixture(scope="session")
-def write_session_factory(write_engine) -> async_sessionmaker[AsyncSession]:
-    return async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False)
-
-
-@pytest.fixture
-def sync_engine(write_session_factory, graph_session_factory):
-    """Real SyncEngine wired to per-worker isolated schemas."""
-    from kt_worker_sync.sync_engine import SyncEngine
-
-    return SyncEngine(
-        write_session_factory=write_session_factory,
-        graph_session_factory=graph_session_factory,
-        batch_size=100,
-    )
+install_fake_hatchet_env()
diff --git a/services/worker-sync/tests/integration/conftest.py b/services/worker-sync/tests/integration/conftest.py
new file mode 100644
index 00000000..0f4656c6
--- /dev/null
+++ b/services/worker-sync/tests/integration/conftest.py
@@ -0,0 +1,123 @@
+"""Shared fixtures for worker-sync integration tests.
+
+Per-module graph-db + write-db schemas so SyncEngine runs are isolated
+across files and across concurrent CI runs. Each test still gets a
+fresh session rolled back at teardown.
+""" + +from __future__ import annotations + +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import event +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +from kt_config.settings import get_settings +from kt_db.models import Base +from kt_db.testing import ( + create_schema, + drop_schema, + install_graph_extensions, + install_write_extensions, + unique_schema, +) +from kt_db.write_models import WriteBase + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="module") +def graph_schema_name() -> str: + return unique_schema() + + +@pytest.fixture(scope="module") +def write_schema_name() -> str: + return unique_schema() + + +@pytest_asyncio.fixture(scope="module", loop_scope="session") +async def graph_engine(settings, graph_schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + await install_graph_extensions(base_url) + await create_schema(base_url, graph_schema_name) + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{graph_schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = graph_schema_name + try: + await conn.run_sync(Base.metadata.create_all) + finally: + for table in Base.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, graph_schema_name) + + +@pytest_asyncio.fixture(scope="module", loop_scope="session") +async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.write_database_url + await install_write_extensions(base_url) + await create_schema(base_url, write_schema_name) + + eng = create_async_engine(base_url, echo=False) + + @event.listens_for(eng.sync_engine, "connect") + def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] + cursor = dbapi_conn.cursor() + cursor.execute(f'SET search_path TO "{write_schema_name}", public') + cursor.close() + + async with eng.begin() as conn: + for table in WriteBase.metadata.sorted_tables: + table.schema = write_schema_name + try: + await conn.run_sync(WriteBase.metadata.create_all) + finally: + for table in WriteBase.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, write_schema_name) + + +@pytest.fixture(scope="module") +def graph_session_factory(graph_engine) -> async_sessionmaker[AsyncSession]: + return async_sessionmaker(graph_engine, class_=AsyncSession, expire_on_commit=False) + + +@pytest.fixture(scope="module") +def write_session_factory(write_engine) -> async_sessionmaker[AsyncSession]: + return async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) + + +@pytest.fixture +def sync_engine(write_session_factory, graph_session_factory): + """Real SyncEngine wired to per-module isolated schemas.""" + from kt_worker_sync.sync_engine import SyncEngine + + return SyncEngine( + write_session_factory=write_session_factory, + graph_session_factory=graph_session_factory, + batch_size=100, + )