From 12453fcfcfa4e160cb1de950018e262497990ccf Mon Sep 17 00:00:00 2001 From: charlie83Gs Date: Sun, 19 Apr 2026 18:43:25 -0600 Subject: [PATCH 1/7] feat(ci): container-based integration tests + migration gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #3 and #5. Split unit tests (infra-free, key-free) from integration tests (real postgres/redis/qdrant/hatchet + API secrets). Removes integration tests from lefthook pre-push for faster local feedback; CI runs them per-package on PRs only when relevant code changes. CI changes (.github/workflows/test.yml) - backend-test job: drop service containers + DB env vars. Unit tests now run hermetically (1240 passing in under 60s). - backend-integration-test job (new): matrix per-package, boots postgres, postgres-write, redis, qdrant service containers; injects SERPER_KEY, OPENROUTER_API_KEY, OPENAI_API_KEY secrets. No BRAVE_KEY - Serper is primary provider; Brave tests skip cleanly via existing pytest.skip guards. - migrations-check job (new): applies both alembic migrations from scratch, asserts exactly one head per DB, sanity-tests downgrade/upgrade roundtrip. Runs on every PR regardless of path filter. - Path filter: add plugin-search-providers; dedupe worker-nodes block. Conftest split (tests/conftest.py → tests/integration/conftest.py) - libs/kt-{db,facts,graph,models,providers}, services/{api,worker-nodes, worker-bottomup,worker-sync,mcp}, plugins/backend-engine-search-providers - Top-level conftests keep only env setup (USE_HATCHET, SKIP_AUTH) and lightweight mock-friendly fixtures (_ambient_test_expense in models/ worker-nodes/worker-bottomup). - worker-{bottomup,sync,ingest} set a fake HATCHET_CLIENT_TOKEN to let modules that import the Hatchet client at load time collect tests. 
Test relocations (tests/ → tests/integration/) - libs/kt-db/tests/test_write_{dimensions,seeds}.py - use write_db_session - libs/kt-facts/tests/test_tiered_dedup_experiment.py - connects to write-db directly Alembic fix (libs/kt-db/alembic/versions/) - 212efc51d897 chained off e3eb15f51d7c instead of the shared initial_schema parent. Prior divergence would have failed the new migrations-check job. lefthook.yml / justfile - pre-push runs `just test-unit` (infra-free) instead of `just test-all`. - New `test-unit` and `test-integration` recipes; `test-all` remains as an alias that chains both. --- .github/workflows/test.yml | 202 +++++++++++++++--- justfile | 72 ++++--- lefthook.yml | 11 +- ...drop_convergence_reports_and_divergent_.py | 4 +- libs/kt-db/tests/conftest.py | 112 ---------- libs/kt-db/tests/integration/conftest.py | 112 ++++++++++ .../test_write_dimensions.py | 0 .../{ => integration}/test_write_seeds.py | 0 libs/kt-facts/tests/conftest.py | 78 ------- libs/kt-facts/tests/integration/conftest.py | 78 +++++++ .../test_tiered_dedup_experiment.py | 0 libs/kt-graph/tests/conftest.py | 114 ---------- libs/kt-graph/tests/integration/conftest.py | 114 ++++++++++ libs/kt-models/tests/conftest.py | 60 ------ libs/kt-models/tests/integration/conftest.py | 63 ++++++ libs/kt-providers/tests/conftest.py | 63 ------ .../tests/integration/conftest.py | 63 ++++++ .../tests/integration/conftest.py | 63 ++++++ services/api/tests/conftest.py | 78 ------- services/api/tests/integration/conftest.py | 78 +++++++ services/mcp/tests/conftest.py | 77 ------- services/worker-bottomup/tests/conftest.py | 70 +----- .../tests/integration/conftest.py | 62 ++++++ services/worker-ingest/tests/conftest.py | 15 ++ services/worker-nodes/tests/conftest.py | 60 ------ .../tests/integration/conftest.py | 63 ++++++ services/worker-sync/tests/conftest.py | 135 +----------- .../worker-sync/tests/integration/conftest.py | 128 +++++++++++ 28 files changed, 1082 insertions(+), 893 deletions(-) 
create mode 100644 libs/kt-db/tests/integration/conftest.py rename libs/kt-db/tests/{ => integration}/test_write_dimensions.py (100%) rename libs/kt-db/tests/{ => integration}/test_write_seeds.py (100%) create mode 100644 libs/kt-facts/tests/integration/conftest.py rename libs/kt-facts/tests/{ => integration}/test_tiered_dedup_experiment.py (100%) create mode 100644 libs/kt-graph/tests/integration/conftest.py create mode 100644 libs/kt-models/tests/integration/conftest.py create mode 100644 libs/kt-providers/tests/integration/conftest.py create mode 100644 plugins/backend-engine-search-providers/tests/integration/conftest.py create mode 100644 services/api/tests/integration/conftest.py create mode 100644 services/worker-bottomup/tests/integration/conftest.py create mode 100644 services/worker-ingest/tests/conftest.py create mode 100644 services/worker-nodes/tests/integration/conftest.py create mode 100644 services/worker-sync/tests/integration/conftest.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 768a8a92..af48bca7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,8 +17,10 @@ jobs: name: Detect Changes runs-on: ubuntu-latest outputs: - backend-matrix: ${{ steps.build-matrix.outputs.matrix }} + backend-matrix: ${{ steps.build-matrix.outputs.backend-matrix }} + integration-matrix: ${{ steps.build-matrix.outputs.integration-matrix }} has-backend: ${{ steps.build-matrix.outputs.has-backend }} + has-integration: ${{ steps.build-matrix.outputs.has-integration }} frontend: ${{ steps.filter.outputs.frontend }} steps: - uses: actions/checkout@v4 @@ -87,9 +89,6 @@ jobs: - 'libs/**' - 'pyproject.toml' - 'uv.lock' - - 'libs/**' - - 'pyproject.toml' - - 'uv.lock' worker-search: - 'services/worker-search/**' - 'libs/**' @@ -100,30 +99,64 @@ jobs: - 'libs/**' - 'pyproject.toml' - 'uv.lock' + plugin-search-providers: + - 'plugins/backend-engine-search-providers/**' + - 'libs/kt-providers/**' + - 
'libs/kt-core-engine-api/**' + - 'libs/kt-db/**' + - 'libs/kt-config/**' + - 'pyproject.toml' + - 'uv.lock' frontend: - 'frontend/**' - - name: Build test matrix from changed packages + - name: Build test matrices from changed packages id: build-matrix run: | - MATRIX="[]" - add() { MATRIX=$(echo "$MATRIX" | jq --arg n "$1" --arg p "$2" '. + [{"name": $n, "path": $p}]'); } + BACKEND="[]" + INTEGRATION="[]" + # Packages that have tests/integration/ directories in the tree. + # Keep this in sync with the physical layout. + declare -A HAS_INTEGRATION=( + [kt-db]=1 [kt-facts]=1 [kt-graph]=1 [kt-models]=1 [kt-providers]=1 + [api]=1 [worker-bottomup]=1 [worker-nodes]=1 [worker-sync]=1 + [plugin-search-providers]=1 + ) - [ "${{ steps.filter.outputs.kt-config }}" = "true" ] && add "kt-config" "libs/kt-config" - [ "${{ steps.filter.outputs.kt-db }}" = "true" ] && add "kt-db" "libs/kt-db" - [ "${{ steps.filter.outputs.kt-models }}" = "true" ] && add "kt-models" "libs/kt-models" - [ "${{ steps.filter.outputs.kt-providers }}" = "true" ] && add "kt-providers" "libs/kt-providers" - [ "${{ steps.filter.outputs.kt-graph }}" = "true" ] && add "kt-graph" "libs/kt-graph" - [ "${{ steps.filter.outputs.kt-facts }}" = "true" ] && add "kt-facts" "libs/kt-facts" - [ "${{ steps.filter.outputs.api }}" = "true" ] && add "api" "services/api" - [ "${{ steps.filter.outputs.mcp }}" = "true" ] && add "mcp" "services/mcp" - [ "${{ steps.filter.outputs.worker-bottomup }}" = "true" ] && add "worker-bottomup" "services/worker-bottomup" - [ "${{ steps.filter.outputs.worker-nodes }}" = "true" ] && add "worker-nodes" "services/worker-nodes" - [ "${{ steps.filter.outputs.worker-search }}" = "true" ] && add "worker-search" "services/worker-search" - [ "${{ steps.filter.outputs.worker-sync }}" = "true" ] && add "worker-sync" "services/worker-sync" + add_backend() { + BACKEND=$(echo "$BACKEND" | jq --arg n "$1" --arg p "$2" '. 
+ [{"name": $n, "path": $p}]') + } + add_integration() { + INTEGRATION=$(echo "$INTEGRATION" | jq --arg n "$1" --arg p "$2" '. + [{"name": $n, "path": $p}]') + } + maybe_add() { + local name="$1" path="$2" changed="$3" + [ "$changed" = "true" ] || return 0 + add_backend "$name" "$path" + if [ -n "${HAS_INTEGRATION[$name]:-}" ]; then + add_integration "$name" "$path" + fi + } - echo "matrix={\"suite\":$(echo "$MATRIX" | jq -c '.')}" >> "$GITHUB_OUTPUT" - echo "has-backend=$([ "$(echo "$MATRIX" | jq 'length')" -gt 0 ] && echo true || echo false)" >> "$GITHUB_OUTPUT" - echo "Changed suites: $(echo "$MATRIX" | jq -r '[.[].name] | join(", ")')" + maybe_add kt-config libs/kt-config "${{ steps.filter.outputs.kt-config }}" + maybe_add kt-db libs/kt-db "${{ steps.filter.outputs.kt-db }}" + maybe_add kt-models libs/kt-models "${{ steps.filter.outputs.kt-models }}" + maybe_add kt-providers libs/kt-providers "${{ steps.filter.outputs.kt-providers }}" + maybe_add kt-graph libs/kt-graph "${{ steps.filter.outputs.kt-graph }}" + maybe_add kt-facts libs/kt-facts "${{ steps.filter.outputs.kt-facts }}" + maybe_add api services/api "${{ steps.filter.outputs.api }}" + maybe_add mcp services/mcp "${{ steps.filter.outputs.mcp }}" + maybe_add worker-bottomup services/worker-bottomup "${{ steps.filter.outputs.worker-bottomup }}" + maybe_add worker-nodes services/worker-nodes "${{ steps.filter.outputs.worker-nodes }}" + maybe_add worker-search services/worker-search "${{ steps.filter.outputs.worker-search }}" + maybe_add worker-sync services/worker-sync "${{ steps.filter.outputs.worker-sync }}" + maybe_add plugin-search-providers plugins/backend-engine-search-providers "${{ steps.filter.outputs.plugin-search-providers }}" + + echo "backend-matrix={\"suite\":$(echo "$BACKEND" | jq -c '.')}" >> "$GITHUB_OUTPUT" + echo "integration-matrix={\"suite\":$(echo "$INTEGRATION" | jq -c '.')}" >> "$GITHUB_OUTPUT" + echo "has-backend=$([ "$(echo "$BACKEND" | jq 'length')" -gt 0 ] && echo true || echo 
false)" >> "$GITHUB_OUTPUT" + echo "has-integration=$([ "$(echo "$INTEGRATION" | jq 'length')" -gt 0 ] && echo true || echo false)" >> "$GITHUB_OUTPUT" + echo "Changed unit suites: $(echo "$BACKEND" | jq -r '[.[].name] | join(", ")')" + echo "Changed integration suites: $(echo "$INTEGRATION" | jq -r '[.[].name] | join(", ")')" backend-lint: name: Backend Lint @@ -140,14 +173,113 @@ jobs: - run: uv run --frozen ruff check . - run: uv run --frozen ruff format --check . + migrations-check: + name: Migrations + runs-on: ubuntu-latest + services: + postgres: + image: pgvector/pgvector:pg16 + env: + POSTGRES_DB: knowledge_tree + POSTGRES_USER: kt + POSTGRES_PASSWORD: localdev + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U kt -d knowledge_tree" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + postgres-write: + image: postgres:16-alpine + env: + POSTGRES_DB: knowledge_tree_write + POSTGRES_USER: kt + POSTGRES_PASSWORD: localdev + ports: + - 5435:5432 + options: >- + --health-cmd "pg_isready -U kt -d knowledge_tree_write" + --health-interval 5s + --health-timeout 5s + --health-retries 5 + env: + DATABASE_URL: postgresql+asyncpg://kt:localdev@localhost:5432/knowledge_tree + WRITE_DATABASE_URL: postgresql+asyncpg://kt:localdev@localhost:5435/knowledge_tree_write + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v4 + with: + version: "latest" + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: uv sync --all-packages --frozen + - name: Verify single graph-db head + run: | + cd libs/kt-db + heads=$(uv run --frozen alembic heads 2>/dev/null | grep -c '(head)' || true) + if [ "$heads" != "1" ]; then + echo "::error::Expected exactly 1 graph-db alembic head, found $heads" + uv run --frozen alembic heads + exit 1 + fi + - name: Verify single write-db head + run: | + cd libs/kt-db + heads=$(uv run --frozen alembic -c alembic_write.ini heads 2>/dev/null | grep -c '(head)' || true) + if [ "$heads" != "1" 
]; then + echo "::error::Expected exactly 1 write-db alembic head, found $heads" + uv run --frozen alembic -c alembic_write.ini heads + exit 1 + fi + - name: Apply graph-db migrations + run: cd libs/kt-db && uv run --frozen alembic upgrade head + - name: Apply write-db migrations + run: cd libs/kt-db && uv run --frozen alembic -c alembic_write.ini upgrade head + - name: Downgrade and re-upgrade graph-db (sanity) + run: | + cd libs/kt-db + uv run --frozen alembic downgrade -1 + uv run --frozen alembic upgrade head + - name: Downgrade and re-upgrade write-db (sanity) + run: | + cd libs/kt-db + uv run --frozen alembic -c alembic_write.ini downgrade -1 + uv run --frozen alembic -c alembic_write.ini upgrade head + backend-test: - name: Test ${{ matrix.suite.name }} + name: Unit ${{ matrix.suite.name }} needs: changes if: needs.changes.outputs.has-backend == 'true' runs-on: ubuntu-latest strategy: fail-fast: false matrix: ${{ fromJSON(needs.changes.outputs.backend-matrix) }} + env: + SKIP_AUTH: "true" + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v4 + with: + version: "latest" + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: uv sync --all-packages --frozen + - name: Run unit tests + run: >- + uv run --frozen pytest ${{ matrix.suite.path }}/tests/ -x -n auto + --ignore=${{ matrix.suite.path }}/tests/integration + + backend-integration-test: + name: Integration ${{ matrix.suite.name }} + needs: changes + if: needs.changes.outputs.has-integration == 'true' + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: ${{ fromJSON(needs.changes.outputs.integration-matrix) }} services: postgres: image: pgvector/pgvector:pg16 @@ -184,12 +316,26 @@ jobs: --health-interval 5s --health-timeout 5s --health-retries 5 + qdrant: + image: qdrant/qdrant:v1.17.2 + ports: + - 6333:6333 + - 6334:6334 env: DATABASE_URL: postgresql+asyncpg://kt:localdev@localhost:5432/knowledge_tree WRITE_DATABASE_URL: 
postgresql+asyncpg://kt:localdev@localhost:5435/knowledge_tree_write REDIS_URL: redis://localhost:6379 + QDRANT_URL: http://localhost:6333 SKIP_AUTH: "true" + # Fake JWT accepted by Hatchet-lite's ClientConfig validator. + # Payload: {server_url, grpc_broadcast_address, sub, iat}. HATCHET_CLIENT_TOKEN: "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ.ZmFrZXNpZw" + USE_HATCHET: "false" + # External-API secrets — BRAVE_KEY intentionally omitted (Serper is + # the primary provider; Brave tests skip cleanly via pytest.skip). + SERPER_KEY: ${{ secrets.SERPER_KEY }} + OPENROUTER_API_KEY: ${{ secrets.OPENROUTER_API_KEY }} + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} steps: - uses: actions/checkout@v4 - uses: astral-sh/setup-uv@v4 @@ -199,10 +345,12 @@ jobs: with: python-version: "3.12" - run: uv sync --all-packages --frozen - - name: Run tests - run: >- - uv run --frozen pytest ${{ matrix.suite.path }}/tests/ -x -n auto - --ignore=${{ matrix.suite.path }}/tests/integration + - name: Apply graph-db migrations + run: cd libs/kt-db && uv run --frozen alembic upgrade head + - name: Apply write-db migrations + run: cd libs/kt-db && uv run --frozen alembic -c alembic_write.ini upgrade head + - name: Run integration tests + run: uv run --frozen pytest ${{ matrix.suite.path }}/tests/integration -x -n auto frontend-test: name: Frontend Tests diff --git a/justfile b/justfile index 481b5b8e..22bc703b 100644 --- a/justfile +++ b/justfile @@ -103,34 +103,52 @@ migrate-write: cd libs/kt-db && uv run alembic -c alembic_write.ini upgrade head # ── Testing ─────────────────────────────────────────────────────── +# Unit tests are infra-free and key-free. Integration tests need infra +# (`just up`) and run only against `tests/integration/` directories. 
-# Run all lib tests -test-libs: - cd libs/kt-config && uv run pytest -x - cd libs/kt-db && uv run pytest -x - cd libs/kt-flags && uv run pytest -x - cd libs/kt-models && uv run pytest -x - cd libs/kt-providers && uv run pytest -x - cd libs/kt-graph && uv run pytest -x - cd libs/kt-facts && uv run pytest -x - -# Run API tests -test-api: - cd services/api && uv run pytest -x - -# Run MCP tests -test-mcp: - cd services/mcp && uv run pytest -x - -# Run worker tests -test-workers: - cd services/worker-bottomup && uv run pytest -x - cd services/worker-search && uv run pytest -x - cd services/worker-nodes && uv run pytest -x - cd services/worker-ingest && uv run pytest -x - -# Run all tests -test-all: test-libs test-api test-workers +# Run unit tests for every backend package (no infra required) +test-unit: + #!/usr/bin/env bash + set -euo pipefail + for pkg in \ + libs/kt-config libs/kt-db libs/kt-models libs/kt-providers \ + libs/kt-graph libs/kt-facts libs/kt-hatchet libs/kt-qdrant \ + libs/kt-agents-core libs/kt-auth libs/kt-core-engine-api \ + libs/kt-flags libs/kt-rbac \ + services/api services/mcp \ + services/worker-bottomup services/worker-nodes services/worker-sync \ + services/worker-synthesis services/worker-ingest \ + plugins/backend-engine-search-providers \ + plugins/backend-engine-concept-extractor; do + if [ -d "$pkg/tests" ]; then + echo "── $pkg ──" + uv run --frozen pytest "$pkg/tests/" --ignore="$pkg/tests/integration" -x -q + fi + done + +# Run integration tests for every package that has an integration/ dir +# Requires `just up` (postgres, postgres-write, redis, qdrant, hatchet) +test-integration: + #!/usr/bin/env bash + set -euo pipefail + for pkg in \ + libs/kt-db libs/kt-facts libs/kt-graph libs/kt-models libs/kt-providers \ + services/api services/worker-bottomup services/worker-nodes services/worker-sync \ + plugins/backend-engine-search-providers; do + if [ -d "$pkg/tests/integration" ]; then + echo "── $pkg (integration) ──" + uv run 
--frozen pytest "$pkg/tests/integration" -x -q + fi + done + +# Run all backend tests (unit + integration). Needs infra up. +test-all: test-unit test-integration + +# Legacy aliases — prefer test-unit / test-integration +test-libs: test-unit +test-api: test-unit +test-mcp: test-unit +test-workers: test-unit # Frontend tests test-frontend: diff --git a/lefthook.yml b/lefthook.yml index fa7436af..9319ed31 100644 --- a/lefthook.yml +++ b/lefthook.yml @@ -28,13 +28,6 @@ pre-commit: pre-push: parallel: true jobs: - - name: python-tests + - name: python-unit-tests glob: "**/*.py" - run: | - just test-all || { - echo "" - echo "Tests failed. Make sure infrastructure is running:" - echo " just up # starts postgres, postgres-write, redis, qdrant, hatchet" - echo "" - exit 1 - } + run: just test-unit diff --git a/libs/kt-db/alembic/versions/212efc51d897_drop_convergence_reports_and_divergent_.py b/libs/kt-db/alembic/versions/212efc51d897_drop_convergence_reports_and_divergent_.py index 2dbebe52..deac7a01 100644 --- a/libs/kt-db/alembic/versions/212efc51d897_drop_convergence_reports_and_divergent_.py +++ b/libs/kt-db/alembic/versions/212efc51d897_drop_convergence_reports_and_divergent_.py @@ -1,7 +1,7 @@ """drop convergence_reports and divergent_claims Revision ID: 212efc51d897 -Revises: 489643109ccd +Revises: e3eb15f51d7c Create Date: 2026-04-17 17:29:09.860226 """ @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. 
revision: str = "212efc51d897" -down_revision: Union[str, Sequence[str], None] = "489643109ccd" +down_revision: Union[str, Sequence[str], None] = "e3eb15f51d7c" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None diff --git a/libs/kt-db/tests/conftest.py b/libs/kt-db/tests/conftest.py index f68ec9af..160a1a0b 100644 --- a/libs/kt-db/tests/conftest.py +++ b/libs/kt-db/tests/conftest.py @@ -1,116 +1,4 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import event, text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - # Advisory lock prevents xdist workers from racing on CREATE EXTENSION - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as 
conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() - - -@pytest.fixture(scope="session") -def write_schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine(base_url, echo=False) - - # Set search_path on every new connection (write-db doesn't support startup params) - @event.listens_for(eng.sync_engine, "connect") - def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] - cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO {write_schema_name}, public") - cursor.close() - - async with eng.begin() as conn: - for table in WriteBase.metadata.sorted_tables: - table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in 
WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/libs/kt-db/tests/integration/conftest.py b/libs/kt-db/tests/integration/conftest.py new file mode 100644 index 00000000..67123d9c --- /dev/null +++ b/libs/kt-db/tests/integration/conftest.py @@ -0,0 +1,112 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import event, text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base +from kt_db.write_models import WriteBase + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + # Advisory lock prevents xdist workers from racing on CREATE EXTENSION + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await 
conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() + + +@pytest.fixture(scope="session") +def write_schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.write_database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine(base_url, echo=False) + + # Set search_path on every new connection (write-db doesn't support startup params) + @event.listens_for(eng.sync_engine, "connect") + def _set_search_path(dbapi_conn, connection_record): # type: 
ignore[no-untyped-def] + cursor = dbapi_conn.cursor() + cursor.execute(f"SET search_path TO {write_schema_name}, public") + cursor.close() + + async with eng.begin() as conn: + for table in WriteBase.metadata.sorted_tables: + table.schema = write_schema_name + await conn.run_sync(WriteBase.metadata.create_all) + for table in WriteBase.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/libs/kt-db/tests/test_write_dimensions.py b/libs/kt-db/tests/integration/test_write_dimensions.py similarity index 100% rename from libs/kt-db/tests/test_write_dimensions.py rename to libs/kt-db/tests/integration/test_write_dimensions.py diff --git a/libs/kt-db/tests/test_write_seeds.py b/libs/kt-db/tests/integration/test_write_seeds.py similarity index 100% rename from libs/kt-db/tests/test_write_seeds.py rename to libs/kt-db/tests/integration/test_write_seeds.py diff --git a/libs/kt-facts/tests/conftest.py b/libs/kt-facts/tests/conftest.py index ba4bd353..ca4bced3 100644 --- a/libs/kt-facts/tests/conftest.py +++ b/libs/kt-facts/tests/conftest.py @@ -1,7 +1,5 @@ import os import sys -import uuid -from collections.abc import AsyncGenerator from pathlib import Path # Ensure the tests directory is on sys.path so that seed_fixtures can be imported @@ -9,79 +7,3 @@ os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from 
sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - # Also create write-db tables so write_session tests work - for table in WriteBase.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - 
-@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() - - -@pytest_asyncio.fixture(loop_scope="session") -async def write_session(engine) -> AsyncGenerator[AsyncSession, None]: - """Separate session for write-db operations (pipeline calls commit()).""" - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - yield session diff --git a/libs/kt-facts/tests/integration/conftest.py b/libs/kt-facts/tests/integration/conftest.py new file mode 100644 index 00000000..ec1e7cfe --- /dev/null +++ b/libs/kt-facts/tests/integration/conftest.py @@ -0,0 +1,78 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base +from kt_db.write_models import WriteBase + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) 
+ await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + # Also create write-db tables so write_session tests work + for table in WriteBase.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(WriteBase.metadata.create_all) + for table in WriteBase.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() + + +@pytest_asyncio.fixture(loop_scope="session") +async def write_session(engine) -> AsyncGenerator[AsyncSession, None]: + """Separate session for write-db operations (pipeline calls commit()).""" + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + yield session diff --git a/libs/kt-facts/tests/test_tiered_dedup_experiment.py b/libs/kt-facts/tests/integration/test_tiered_dedup_experiment.py similarity index 100% rename from libs/kt-facts/tests/test_tiered_dedup_experiment.py rename to libs/kt-facts/tests/integration/test_tiered_dedup_experiment.py diff --git a/libs/kt-graph/tests/conftest.py 
b/libs/kt-graph/tests/conftest.py index e9531a50..160a1a0b 100644 --- a/libs/kt-graph/tests/conftest.py +++ b/libs/kt-graph/tests/conftest.py @@ -1,118 +1,4 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import event, text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF 
EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() - - -# ── Write-db fixtures (used by PublicGraphBridge integration tests) ── - - -@pytest.fixture(scope="session") -def write_schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) - await setup_eng.dispose() - - eng = create_async_engine(base_url, echo=False) - - # Write-db doesn't accept startup-time search_path (pgbouncer in - # transaction mode rejects it). Set it on every new connection - # instead so each session lands in the test schema. 
- @event.listens_for(eng.sync_engine, "connect") - def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] - cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO {write_schema_name}, public") - cursor.close() - - async with eng.begin() as conn: - for table in WriteBase.metadata.sorted_tables: - table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/libs/kt-graph/tests/integration/conftest.py b/libs/kt-graph/tests/integration/conftest.py new file mode 100644 index 00000000..3b6258f5 --- /dev/null +++ b/libs/kt-graph/tests/integration/conftest.py @@ -0,0 +1,114 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import event, text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base +from kt_db.write_models import WriteBase + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def 
engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() + + +# Write-db fixtures (used by PublicGraphBridge integration tests) + + +@pytest.fixture(scope="session") +def write_schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.write_database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) + await 
setup_eng.dispose() + + eng = create_async_engine(base_url, echo=False) + + # Write-db doesn't accept startup-time search_path (pgbouncer in + # transaction mode rejects it). Set it on every new connection + # instead so each session lands in the test schema. + @event.listens_for(eng.sync_engine, "connect") + def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] + cursor = dbapi_conn.cursor() + cursor.execute(f"SET search_path TO {write_schema_name}, public") + cursor.close() + + async with eng.begin() as conn: + for table in WriteBase.metadata.sorted_tables: + table.schema = write_schema_name + await conn.run_sync(WriteBase.metadata.create_all) + for table in WriteBase.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/libs/kt-models/tests/conftest.py b/libs/kt-models/tests/conftest.py index faa86030..14bb13b1 100644 --- a/libs/kt-models/tests/conftest.py +++ b/libs/kt-models/tests/conftest.py @@ -1,17 +1,10 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine -from kt_config.settings import get_settings -from kt_db.models import Base from kt_models.expense import 
ExpenseContext, reset_current_expense, set_current_expense @@ -27,56 +20,3 @@ def _ambient_test_expense(): yield finally: reset_current_expense(token) - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/libs/kt-models/tests/integration/conftest.py 
b/libs/kt-models/tests/integration/conftest.py new file mode 100644 index 00000000..b1018dbd --- /dev/null +++ b/libs/kt-models/tests/integration/conftest.py @@ -0,0 +1,63 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def 
db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/libs/kt-providers/tests/conftest.py b/libs/kt-providers/tests/conftest.py index a44bfaa3..160a1a0b 100644 --- a/libs/kt-providers/tests/conftest.py +++ b/libs/kt-providers/tests/conftest.py @@ -1,67 +1,4 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await 
conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/libs/kt-providers/tests/integration/conftest.py b/libs/kt-providers/tests/integration/conftest.py new file mode 100644 index 00000000..b1018dbd --- /dev/null +++ b/libs/kt-providers/tests/integration/conftest.py @@ -0,0 +1,63 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await 
conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/plugins/backend-engine-search-providers/tests/integration/conftest.py b/plugins/backend-engine-search-providers/tests/integration/conftest.py new file mode 100644 index 00000000..b1018dbd --- /dev/null +++ b/plugins/backend-engine-search-providers/tests/integration/conftest.py @@ -0,0 +1,63 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> 
AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/services/api/tests/conftest.py b/services/api/tests/conftest.py index e8805c00..160a1a0b 100644 --- a/services/api/tests/conftest.py +++ b/services/api/tests/conftest.py @@ -1,82 +1,4 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings 
-from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) -async def _ensure_qdrant_collections(): - """Ensure Qdrant collections exist before any tests run.""" - try: - from kt_qdrant.client import get_qdrant_client - from kt_qdrant.repositories.facts import QdrantFactRepository - from kt_qdrant.repositories.nodes import QdrantNodeRepository - - client = get_qdrant_client() - await QdrantFactRepository(client).ensure_collection() - await QdrantNodeRepository(client).ensure_collection() - except 
Exception: - pass # Qdrant may not be running in all test environments - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/services/api/tests/integration/conftest.py b/services/api/tests/integration/conftest.py new file mode 100644 index 00000000..6a797204 --- /dev/null +++ b/services/api/tests/integration/conftest.py @@ -0,0 +1,78 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + 
table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) +async def _ensure_qdrant_collections(): + """Ensure Qdrant collections exist before any integration tests run.""" + try: + from kt_qdrant.client import get_qdrant_client + from kt_qdrant.repositories.facts import QdrantFactRepository + from kt_qdrant.repositories.nodes import QdrantNodeRepository + + client = get_qdrant_client() + await QdrantFactRepository(client).ensure_collection() + await QdrantNodeRepository(client).ensure_collection() + except Exception: + pass # Qdrant may not be running in all test environments + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/services/mcp/tests/conftest.py b/services/mcp/tests/conftest.py index eba7f1ca..03714efd 100644 --- a/services/mcp/tests/conftest.py +++ b/services/mcp/tests/conftest.py @@ -1,80 +1,3 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("SKIP_AUTH", "true") - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def 
settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) -async def _ensure_qdrant_collections(): - try: - from kt_qdrant.client import get_qdrant_client - from kt_qdrant.repositories.facts import QdrantFactRepository - from kt_qdrant.repositories.nodes import QdrantNodeRepository - - client = get_qdrant_client() - await QdrantFactRepository(client).ensure_collection() - await QdrantNodeRepository(client).ensure_collection() - except Exception: - pass - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, 
expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/services/worker-bottomup/tests/conftest.py b/services/worker-bottomup/tests/conftest.py index 9e9a7acd..440284f2 100644 --- a/services/worker-bottomup/tests/conftest.py +++ b/services/worker-bottomup/tests/conftest.py @@ -1,17 +1,21 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") +# The bottom-up workflow module imports the Hatchet client at import time, +# which refuses to initialise without a token. Unit tests import from that +# module, so an opaque placeholder is sufficient to let collection succeed. +os.environ.setdefault( + "HATCHET_CLIENT_TOKEN", + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." + "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." 
+ "ZmFrZXNpZw", +) +os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") +os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine -from kt_config.settings import get_settings -from kt_db.models import Base from kt_models.expense import ExpenseContext, reset_current_expense, set_current_expense @@ -29,55 +33,3 @@ def _ambient_test_expense(): yield finally: reset_current_expense(token) - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def 
db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/services/worker-bottomup/tests/integration/conftest.py b/services/worker-bottomup/tests/integration/conftest.py new file mode 100644 index 00000000..a42774f6 --- /dev/null +++ b/services/worker-bottomup/tests/integration/conftest.py @@ -0,0 +1,62 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = 
create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/services/worker-ingest/tests/conftest.py b/services/worker-ingest/tests/conftest.py new file mode 100644 index 00000000..97ddb989 --- /dev/null +++ b/services/worker-ingest/tests/conftest.py @@ -0,0 +1,15 @@ +import os + +os.environ.setdefault("USE_HATCHET", "false") +os.environ.setdefault("SKIP_AUTH", "true") +# The ingest workflow modules import the Hatchet client at import time, +# which refuses to initialise without a token. Unit tests only touch pure +# helpers, so an opaque placeholder is sufficient. +os.environ.setdefault( + "HATCHET_CLIENT_TOKEN", + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." + "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." 
+ "ZmFrZXNpZw", +) +os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") +os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") diff --git a/services/worker-nodes/tests/conftest.py b/services/worker-nodes/tests/conftest.py index 7e629631..49251219 100644 --- a/services/worker-nodes/tests/conftest.py +++ b/services/worker-nodes/tests/conftest.py @@ -1,17 +1,10 @@ import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine -from kt_config.settings import get_settings -from kt_db.models import Base from kt_models.expense import ExpenseContext, reset_current_expense, set_current_expense @@ -27,56 +20,3 @@ def _ambient_test_expense(): yield finally: reset_current_expense(token) - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in 
Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/services/worker-nodes/tests/integration/conftest.py b/services/worker-nodes/tests/integration/conftest.py new file mode 100644 index 00000000..b1018dbd --- /dev/null +++ b/services/worker-nodes/tests/integration/conftest.py @@ -0,0 +1,63 @@ +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine + +from kt_config.settings import get_settings +from kt_db.models import Base + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await 
conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/services/worker-sync/tests/conftest.py b/services/worker-sync/tests/conftest.py index bce65015..217b1878 100644 --- a/services/worker-sync/tests/conftest.py +++ b/services/worker-sync/tests/conftest.py @@ -1,141 +1,22 @@ -"""Shared fixtures for worker-sync integration tests. +"""Shared setup for worker-sync tests. -Mirrors the dual-DB schema-per-worker pattern from libs/kt-db/tests/conftest.py -so we can stand up a real SyncEngine against isolated graph-db + write-db -schemas. +The dedup workflow module imports the Hatchet client at import time, +which refuses to initialise without a token. Unit tests only touch +pure helpers, so any opaque placeholder is sufficient — but it must +be set before any kt_worker_sync import happens. 
""" from __future__ import annotations import os -import uuid -from collections.abc import AsyncGenerator os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") -# The dedup workflow module imports the Hatchet client at import time, -# which refuses to initialise without a token. Unit-level tests only -# touch pure helpers, so any opaque placeholder is sufficient. os.environ.setdefault( "HATCHET_CLIENT_TOKEN", - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkdW1teSJ9.dummy", + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." + "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." + "ZmFrZXNpZw", ) os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") - -import pytest -import pytest_asyncio -from sqlalchemy import event, text -from sqlalchemy.ext.asyncio import ( - AsyncEngine, - AsyncSession, - async_sessionmaker, - create_async_engine, -) - -from kt_config.settings import get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def graph_schema_name(): - return _worker_schema() - - -@pytest.fixture(scope="session") -def write_schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def graph_engine(settings, graph_schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {graph_schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await 
conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{graph_schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = graph_schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {graph_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine(base_url, echo=False) - - @event.listens_for(eng.sync_engine, "connect") - def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] - cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO {write_schema_name}, public") - cursor.close() - - async with eng.begin() as conn: - for table in WriteBase.metadata.sorted_tables: - table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with 
cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest.fixture(scope="session") -def graph_session_factory(graph_engine) -> async_sessionmaker[AsyncSession]: - return async_sessionmaker(graph_engine, class_=AsyncSession, expire_on_commit=False) - - -@pytest.fixture(scope="session") -def write_session_factory(write_engine) -> async_sessionmaker[AsyncSession]: - return async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) - - -@pytest.fixture -def sync_engine(write_session_factory, graph_session_factory): - """Real SyncEngine wired to per-worker isolated schemas.""" - from kt_worker_sync.sync_engine import SyncEngine - - return SyncEngine( - write_session_factory=write_session_factory, - graph_session_factory=graph_session_factory, - batch_size=100, - ) diff --git a/services/worker-sync/tests/integration/conftest.py b/services/worker-sync/tests/integration/conftest.py new file mode 100644 index 00000000..c1a47ae4 --- /dev/null +++ b/services/worker-sync/tests/integration/conftest.py @@ -0,0 +1,128 @@ +"""Shared fixtures for worker-sync integration tests. + +Mirrors the dual-DB schema-per-worker pattern from libs/kt-db/tests/integration/conftest.py +so we can stand up a real SyncEngine against isolated graph-db + write-db +schemas. 
+""" + +from __future__ import annotations + +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import event, text +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +from kt_config.settings import get_settings +from kt_db.models import Base +from kt_db.write_models import WriteBase + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def graph_schema_name(): + return _worker_schema() + + +@pytest.fixture(scope="session") +def write_schema_name(): + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def graph_engine(settings, graph_schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {graph_schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{graph_schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = graph_schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {graph_schema_name} CASCADE")) + await cleanup_eng.dispose() + + 
+@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.write_database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine(base_url, echo=False) + + @event.listens_for(eng.sync_engine, "connect") + def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] + cursor = dbapi_conn.cursor() + cursor.execute(f"SET search_path TO {write_schema_name}, public") + cursor.close() + + async with eng.begin() as conn: + for table in WriteBase.metadata.sorted_tables: + table.schema = write_schema_name + await conn.run_sync(WriteBase.metadata.create_all) + for table in WriteBase.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest.fixture(scope="session") +def graph_session_factory(graph_engine) -> async_sessionmaker[AsyncSession]: + return async_sessionmaker(graph_engine, class_=AsyncSession, expire_on_commit=False) + + +@pytest.fixture(scope="session") +def write_session_factory(write_engine) -> async_sessionmaker[AsyncSession]: + return async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) + + +@pytest.fixture +def sync_engine(write_session_factory, graph_session_factory): + """Real SyncEngine wired to per-worker isolated schemas.""" + from kt_worker_sync.sync_engine import SyncEngine + + return SyncEngine( + 
write_session_factory=write_session_factory, + graph_session_factory=graph_session_factory, + batch_size=100, + ) From 532a1f43949c0a98312f0c47cf152aa85ed830d5 Mon Sep 17 00:00:00 2001 From: charlie83Gs Date: Sun, 19 Apr 2026 18:44:40 -0600 Subject: [PATCH 2/7] ci: drop unused OPENAI_API_KEY from integration job env Embeddings go through OpenRouter, so OPENAI_API_KEY is not configured as a repo secret. Keep SERPER_KEY + OPENROUTER_API_KEY only. --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index af48bca7..29579191 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -333,9 +333,9 @@ jobs: USE_HATCHET: "false" # External-API secrets — BRAVE_KEY intentionally omitted (Serper is # the primary provider; Brave tests skip cleanly via pytest.skip). + # OpenAI embeddings go through OpenRouter, so no OPENAI_API_KEY needed. SERPER_KEY: ${{ secrets.SERPER_KEY }} OPENROUTER_API_KEY: ${{ secrets.OPENROUTER_API_KEY }} - OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} steps: - uses: actions/checkout@v4 - uses: astral-sh/setup-uv@v4 From 301242d48c96ea33598519ce5b68eaf9d0f70d69 Mon Sep 17 00:00:00 2001 From: charlie83Gs Date: Sun, 19 Apr 2026 18:49:03 -0600 Subject: [PATCH 3/7] fix(ci): use valid qdrant tag and add Hatchet dummy token to worker-nodes - qdrant/qdrant:v1.17.2 does not exist; use v1.17.1. - services/worker-nodes/tests/conftest.py: set dummy HATCHET_CLIENT_TOKEN because auto_build imports the Hatchet client at module load, same as worker-bottomup, worker-ingest, and worker-sync. 
--- .github/workflows/test.yml | 2 +- services/worker-nodes/tests/conftest.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 29579191..03f57a09 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -317,7 +317,7 @@ jobs: --health-timeout 5s --health-retries 5 qdrant: - image: qdrant/qdrant:v1.17.2 + image: qdrant/qdrant:v1.17.1 ports: - 6333:6333 - 6334:6334 diff --git a/services/worker-nodes/tests/conftest.py b/services/worker-nodes/tests/conftest.py index 49251219..6fe036dd 100644 --- a/services/worker-nodes/tests/conftest.py +++ b/services/worker-nodes/tests/conftest.py @@ -2,6 +2,17 @@ os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") +# auto_build imports the Hatchet client at module load, which refuses +# to initialise without a token. Unit tests only touch pure helpers, +# so an opaque placeholder is sufficient. +os.environ.setdefault( + "HATCHET_CLIENT_TOKEN", + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." + "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." + "ZmFrZXNpZw", +) +os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") +os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") import pytest From 8b833cb7840ec38d1915e3b2322c7a8b61b751be Mon Sep 17 00:00:00 2001 From: charlie83Gs Date: Sun, 19 Apr 2026 18:53:53 -0600 Subject: [PATCH 4/7] fix(ci): only add packages with real integration tests to matrix worker-bottomup and worker-nodes have tests/integration/ dirs with just a conftest+__init__ but no test files. Previously they'd exit 5 (no tests collected) and fail the job. Now the matrix builder checks for actual test_*.py files before adding a package. 
--- .github/workflows/test.yml | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 03f57a09..62001fa3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -114,13 +114,6 @@ jobs: run: | BACKEND="[]" INTEGRATION="[]" - # Packages that have tests/integration/ directories in the tree. - # Keep this in sync with the physical layout. - declare -A HAS_INTEGRATION=( - [kt-db]=1 [kt-facts]=1 [kt-graph]=1 [kt-models]=1 [kt-providers]=1 - [api]=1 [worker-bottomup]=1 [worker-nodes]=1 [worker-sync]=1 - [plugin-search-providers]=1 - ) add_backend() { BACKEND=$(echo "$BACKEND" | jq --arg n "$1" --arg p "$2" '. + [{"name": $n, "path": $p}]') @@ -128,11 +121,20 @@ jobs: add_integration() { INTEGRATION=$(echo "$INTEGRATION" | jq --arg n "$1" --arg p "$2" '. + [{"name": $n, "path": $p}]') } + # Detect real integration tests (not just a conftest/__init__) so + # empty integration/ dirs don't produce "no tests ran" failures. + has_integration_tests() { + local path="$1" + [ -d "$path/tests/integration" ] || return 1 + local count + count=$(find "$path/tests/integration" -maxdepth 1 -name 'test_*.py' -type f | wc -l) + [ "$count" -gt 0 ] + } maybe_add() { local name="$1" path="$2" changed="$3" [ "$changed" = "true" ] || return 0 add_backend "$name" "$path" - if [ -n "${HAS_INTEGRATION[$name]:-}" ]; then + if has_integration_tests "$path"; then add_integration "$name" "$path" fi } From 82904b953aa219015af931c35ece771b0baa0d8e Mon Sep 17 00:00:00 2001 From: charlie83Gs Date: Mon, 20 Apr 2026 07:23:31 -0600 Subject: [PATCH 5/7] refactor(ci): extract shared DB + Hatchet test helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR #234 review feedback: - `kt_db.testing` pytest plugin exposes engine/db_session/write_engine/ write_db_session fixtures. 
8 integration conftests reduced from ~60 lines of duplication to a single `pytest_plugins = ["kt_db.testing"]` line. kt-facts (shared graph+write schema) and worker-sync (renamed fixtures + sync_engine) keep their own — they diverge meaningfully. - `kt_hatchet.testing.install_fake_hatchet_env()` replaces the inline JWT literal repeated in 4 worker conftests (bottomup, nodes, ingest, sync). Single source of the fake token and the Hatchet env triad. - Remove legacy justfile aliases (test-libs/test-api/test-mcp/ test-workers). All collapsed to test-unit, leaving the names misleading. - test.yml comment explains that integration tests use Base.metadata.create_all on per-worker schemas for xdist isolation, so model/migration drift is caught by `migrations-check`, not the integration matrix. --- .github/workflows/test.yml | 5 + justfile | 6 - libs/kt-db/src/kt_db/testing.py | 144 ++++++++++++++++++ libs/kt-db/tests/integration/conftest.py | 113 +------------- libs/kt-graph/tests/integration/conftest.py | 115 +------------- libs/kt-hatchet/src/kt_hatchet/testing.py | 35 +++++ libs/kt-models/tests/integration/conftest.py | 64 +------- .../tests/integration/conftest.py | 64 +------- .../tests/integration/conftest.py | 64 +------- services/api/tests/integration/conftest.py | 62 +------- services/worker-bottomup/tests/conftest.py | 17 +-- .../tests/integration/conftest.py | 63 +------- services/worker-ingest/tests/conftest.py | 17 +-- services/worker-nodes/tests/conftest.py | 17 +-- .../tests/integration/conftest.py | 64 +------- services/worker-sync/tests/conftest.py | 17 +-- 16 files changed, 212 insertions(+), 655 deletions(-) create mode 100644 libs/kt-db/src/kt_db/testing.py create mode 100644 libs/kt-hatchet/src/kt_hatchet/testing.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 62001fa3..1b535a26 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -347,6 +347,11 @@ jobs: with: python-version: "3.12" - run: uv sync 
--all-packages --frozen + # Note: migrations applied to public schema for connectivity sanity. + # Integration fixtures still use Base.metadata.create_all against + # per-worker schemas for isolation under xdist. Model/migration + # drift is therefore caught by the `migrations-check` job above, + # not here. - name: Apply graph-db migrations run: cd libs/kt-db && uv run --frozen alembic upgrade head - name: Apply write-db migrations diff --git a/justfile b/justfile index 22bc703b..93e4973b 100644 --- a/justfile +++ b/justfile @@ -144,12 +144,6 @@ test-integration: # Run all backend tests (unit + integration). Needs infra up. test-all: test-unit test-integration -# Legacy aliases — prefer test-unit / test-integration -test-libs: test-unit -test-api: test-unit -test-mcp: test-unit -test-workers: test-unit - # Frontend tests test-frontend: cd frontend && pnpm lint && pnpm type-check && pnpm test diff --git a/libs/kt-db/src/kt_db/testing.py b/libs/kt-db/src/kt_db/testing.py new file mode 100644 index 00000000..fa5cebad --- /dev/null +++ b/libs/kt-db/src/kt_db/testing.py @@ -0,0 +1,144 @@ +"""Pytest plugin providing schema-per-worker DB fixtures for integration tests. + +Activate by adding to an integration conftest: + + pytest_plugins = ["kt_db.testing"] + +Fixtures provided: + settings — cached Settings instance + schema_name — random graph-db schema name, session-scoped + engine — AsyncEngine bound to the isolated graph-db schema + (Base.metadata.create_all applied, pgvector + pg_trgm + extensions ensured) + db_session — per-test AsyncSession wrapped in a rolled-back txn + write_schema_name — random write-db schema name, session-scoped + write_engine — AsyncEngine bound to the isolated write-db schema + (WriteBase.metadata.create_all applied) + write_db_session — per-test write-db AsyncSession wrapped in a + rolled-back txn + +Packages that need a shared graph+write schema (e.g. kt-facts) or +renamed fixtures (e.g. 
worker-sync) should define those locally +instead of importing this plugin. +""" + +from __future__ import annotations + +import uuid +from collections.abc import AsyncGenerator + +import pytest +import pytest_asyncio +from sqlalchemy import event, text +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +from kt_config.settings import get_settings +from kt_db.models import Base +from kt_db.write_models import WriteBase + + +def _worker_schema() -> str: + return f"test_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def settings(): + return get_settings() + + +@pytest.fixture(scope="session") +def schema_name() -> str: + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + # Advisory lock prevents xdist workers from racing on CREATE EXTENSION. 
+ await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine( + base_url, + echo=False, + connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, + ) + async with eng.begin() as conn: + for table in Base.metadata.sorted_tables: + table.schema = schema_name + await conn.run_sync(Base.metadata.create_all) + for table in Base.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: + factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as session: + async with session.begin(): + yield session + await session.rollback() + + +@pytest.fixture(scope="session") +def write_schema_name() -> str: + return _worker_schema() + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: + base_url = settings.write_database_url + setup_eng = create_async_engine(base_url, echo=False) + async with setup_eng.begin() as conn: + await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await setup_eng.dispose() + + eng = create_async_engine(base_url, echo=False) + + # Write-db runs behind pgbouncer in transaction-pooling mode, which + # rejects startup-time search_path. 
Set it per connection instead. + @event.listens_for(eng.sync_engine, "connect") + def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] + cursor = dbapi_conn.cursor() + cursor.execute(f"SET search_path TO {write_schema_name}, public") + cursor.close() + + async with eng.begin() as conn: + for table in WriteBase.metadata.sorted_tables: + table.schema = write_schema_name + await conn.run_sync(WriteBase.metadata.create_all) + for table in WriteBase.metadata.sorted_tables: + table.schema = None + yield eng + await eng.dispose() + cleanup_eng = create_async_engine(base_url, echo=False) + async with cleanup_eng.begin() as conn: + await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) + await cleanup_eng.dispose() + + +@pytest_asyncio.fixture(loop_scope="session") +async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: + factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as session: + async with session.begin(): + yield session + await session.rollback() diff --git a/libs/kt-db/tests/integration/conftest.py b/libs/kt-db/tests/integration/conftest.py index 67123d9c..a680a760 100644 --- a/libs/kt-db/tests/integration/conftest.py +++ b/libs/kt-db/tests/integration/conftest.py @@ -1,112 +1 @@ -import uuid -from collections.abc import AsyncGenerator - -import pytest -import pytest_asyncio -from sqlalchemy import event, text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", 
loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - # Advisory lock prevents xdist workers from racing on CREATE EXTENSION - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() - - -@pytest.fixture(scope="session") -def write_schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT 
EXISTS {write_schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine(base_url, echo=False) - - # Set search_path on every new connection (write-db doesn't support startup params) - @event.listens_for(eng.sync_engine, "connect") - def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] - cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO {write_schema_name}, public") - cursor.close() - - async with eng.begin() as conn: - for table in WriteBase.metadata.sorted_tables: - table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() +pytest_plugins = ["kt_db.testing"] diff --git a/libs/kt-graph/tests/integration/conftest.py b/libs/kt-graph/tests/integration/conftest.py index 3b6258f5..a680a760 100644 --- a/libs/kt-graph/tests/integration/conftest.py +++ b/libs/kt-graph/tests/integration/conftest.py @@ -1,114 +1 @@ -import uuid -from collections.abc import AsyncGenerator - -import pytest -import pytest_asyncio -from sqlalchemy import event, text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import 
get_settings -from kt_db.models import Base -from kt_db.write_models import WriteBase - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() - - -# Write-db fixtures (used by PublicGraphBridge integration tests) - - -@pytest.fixture(scope="session") -def write_schema_name(): - return 
_worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) - await setup_eng.dispose() - - eng = create_async_engine(base_url, echo=False) - - # Write-db doesn't accept startup-time search_path (pgbouncer in - # transaction mode rejects it). Set it on every new connection - # instead so each session lands in the test schema. - @event.listens_for(eng.sync_engine, "connect") - def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] - cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO {write_schema_name}, public") - cursor.close() - - async with eng.begin() as conn: - for table in WriteBase.metadata.sorted_tables: - table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def write_db_session(write_engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() +pytest_plugins = ["kt_db.testing"] diff --git a/libs/kt-hatchet/src/kt_hatchet/testing.py b/libs/kt-hatchet/src/kt_hatchet/testing.py new file mode 100644 index 00000000..36cfb9c5 --- /dev/null +++ b/libs/kt-hatchet/src/kt_hatchet/testing.py @@ -0,0 
+1,35 @@ +"""Test helpers for packages that import the Hatchet client at module load. + +Some worker modules (worker-bottomup, worker-nodes, worker-ingest, +worker-sync) call ``get_hatchet()`` at import time. Unit tests that +only touch pure helpers still trigger that import, and the Hatchet +client refuses to initialise without a valid JWT. This module exports +a fake-but-parseable JWT and a one-call setup helper so test conftests +don't duplicate the literal. +""" + +from __future__ import annotations + +import os + +# JWT with header {"alg": "HS256", "typ": "JWT"} and payload +# {"server_url": "http://localhost:8080", "grpc_broadcast_address": +# "localhost:7070", "sub": "ci", "iat": 1516239022}. Signature is not +# verified by the Hatchet client — only the payload fields are read. +FAKE_HATCHET_JWT = ( + "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." + "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." + "ZmFrZXNpZw" +) + + +def install_fake_hatchet_env() -> None: + """Populate Hatchet env vars with opaque placeholders for test collection. + + Uses ``setdefault`` so a real token from ``.env`` or CI takes precedence. + Safe to call multiple times. 
+ """ + os.environ.setdefault("USE_HATCHET", "false") + os.environ.setdefault("HATCHET_CLIENT_TOKEN", FAKE_HATCHET_JWT) + os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") + os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") diff --git a/libs/kt-models/tests/integration/conftest.py b/libs/kt-models/tests/integration/conftest.py index b1018dbd..a680a760 100644 --- a/libs/kt-models/tests/integration/conftest.py +++ b/libs/kt-models/tests/integration/conftest.py @@ -1,63 +1 @@ -import uuid -from collections.abc import AsyncGenerator - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - 
table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() +pytest_plugins = ["kt_db.testing"] diff --git a/libs/kt-providers/tests/integration/conftest.py b/libs/kt-providers/tests/integration/conftest.py index b1018dbd..a680a760 100644 --- a/libs/kt-providers/tests/integration/conftest.py +++ b/libs/kt-providers/tests/integration/conftest.py @@ -1,63 +1 @@ -import uuid -from collections.abc import AsyncGenerator - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS 
pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() +pytest_plugins = ["kt_db.testing"] diff --git a/plugins/backend-engine-search-providers/tests/integration/conftest.py b/plugins/backend-engine-search-providers/tests/integration/conftest.py index b1018dbd..a680a760 100644 --- a/plugins/backend-engine-search-providers/tests/integration/conftest.py +++ b/plugins/backend-engine-search-providers/tests/integration/conftest.py @@ -1,63 +1 @@ -import uuid -from collections.abc import AsyncGenerator - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, 
schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() +pytest_plugins = ["kt_db.testing"] diff --git a/services/api/tests/integration/conftest.py b/services/api/tests/integration/conftest.py index 6a797204..68e92669 100644 --- a/services/api/tests/integration/conftest.py +++ b/services/api/tests/integration/conftest.py @@ -1,57 +1,6 @@ -import uuid -from collections.abc import AsyncGenerator - -import pytest import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings 
-from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() +pytest_plugins = ["kt_db.testing"] @pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) @@ -67,12 +16,3 @@ async def _ensure_qdrant_collections(): await QdrantNodeRepository(client).ensure_collection() except Exception: pass # Qdrant may not be running in all test environments - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with 
session_factory() as session: - async with session.begin(): - yield session - await session.rollback() diff --git a/services/worker-bottomup/tests/conftest.py b/services/worker-bottomup/tests/conftest.py index 440284f2..4e665815 100644 --- a/services/worker-bottomup/tests/conftest.py +++ b/services/worker-bottomup/tests/conftest.py @@ -1,18 +1,11 @@ import os -os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") -# The bottom-up workflow module imports the Hatchet client at import time, -# which refuses to initialise without a token. Unit tests import from that -# module, so an opaque placeholder is sufficient to let collection succeed. -os.environ.setdefault( - "HATCHET_CLIENT_TOKEN", - "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." - "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." - "ZmFrZXNpZw", -) -os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") -os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") + +from kt_hatchet.testing import install_fake_hatchet_env + +# bottom_up.workflow imports the Hatchet client at module load. 
+install_fake_hatchet_env() import pytest diff --git a/services/worker-bottomup/tests/integration/conftest.py b/services/worker-bottomup/tests/integration/conftest.py index a42774f6..a680a760 100644 --- a/services/worker-bottomup/tests/integration/conftest.py +++ b/services/worker-bottomup/tests/integration/conftest.py @@ -1,62 +1 @@ -import uuid -from collections.abc import AsyncGenerator - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - - eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - 
-@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() +pytest_plugins = ["kt_db.testing"] diff --git a/services/worker-ingest/tests/conftest.py b/services/worker-ingest/tests/conftest.py index 97ddb989..72b4ec87 100644 --- a/services/worker-ingest/tests/conftest.py +++ b/services/worker-ingest/tests/conftest.py @@ -1,15 +1,8 @@ import os -os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") -# The ingest workflow modules import the Hatchet client at import time, -# which refuses to initialise without a token. Unit tests only touch pure -# helpers, so an opaque placeholder is sufficient. -os.environ.setdefault( - "HATCHET_CLIENT_TOKEN", - "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." - "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." - "ZmFrZXNpZw", -) -os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") -os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") + +from kt_hatchet.testing import install_fake_hatchet_env + +# Ingest workflow modules import the Hatchet client at module load. +install_fake_hatchet_env() diff --git a/services/worker-nodes/tests/conftest.py b/services/worker-nodes/tests/conftest.py index 6fe036dd..cea82307 100644 --- a/services/worker-nodes/tests/conftest.py +++ b/services/worker-nodes/tests/conftest.py @@ -1,18 +1,11 @@ import os -os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") -# auto_build imports the Hatchet client at module load, which refuses -# to initialise without a token. Unit tests only touch pure helpers, -# so an opaque placeholder is sufficient. 
-os.environ.setdefault( - "HATCHET_CLIENT_TOKEN", - "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." - "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." - "ZmFrZXNpZw", -) -os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") -os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") + +from kt_hatchet.testing import install_fake_hatchet_env + +# auto_build imports the Hatchet client at module load. +install_fake_hatchet_env() import pytest diff --git a/services/worker-nodes/tests/integration/conftest.py b/services/worker-nodes/tests/integration/conftest.py index b1018dbd..a680a760 100644 --- a/services/worker-nodes/tests/integration/conftest.py +++ b/services/worker-nodes/tests/integration/conftest.py @@ -1,63 +1 @@ -import uuid -from collections.abc import AsyncGenerator - -import pytest -import pytest_asyncio -from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine - -from kt_config.settings import get_settings -from kt_db.models import Base - - -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - -@pytest.fixture(scope="session") -def settings(): - return get_settings() - - -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") -async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: - base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() - 
- eng = create_async_engine( - base_url, - echo=False, - connect_args={"server_settings": {"search_path": f"{schema_name},public"}}, - ) - async with eng.begin() as conn: - for table in Base.metadata.sorted_tables: - table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(loop_scope="session") -async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: - async with session.begin(): - yield session - await session.rollback() +pytest_plugins = ["kt_db.testing"] diff --git a/services/worker-sync/tests/conftest.py b/services/worker-sync/tests/conftest.py index 217b1878..51b8edd7 100644 --- a/services/worker-sync/tests/conftest.py +++ b/services/worker-sync/tests/conftest.py @@ -1,22 +1,15 @@ """Shared setup for worker-sync tests. The dedup workflow module imports the Hatchet client at import time, -which refuses to initialise without a token. Unit tests only touch -pure helpers, so any opaque placeholder is sufficient — but it must -be set before any kt_worker_sync import happens. +so a fake token must be set before any kt_worker_sync import happens. """ from __future__ import annotations import os -os.environ.setdefault("USE_HATCHET", "false") os.environ.setdefault("SKIP_AUTH", "true") -os.environ.setdefault( - "HATCHET_CLIENT_TOKEN", - "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9." - "eyJzZXJ2ZXJfdXJsIjogImh0dHA6Ly9sb2NhbGhvc3Q6ODA4MCIsICJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjogImxvY2FsaG9zdDo3MDcwIiwgInN1YiI6ICJjaSIsICJpYXQiOiAxNTE2MjM5MDIyfQ." 
- "ZmFrZXNpZw", -) -os.environ.setdefault("HATCHET_CLIENT_HOST_PORT", "localhost:7070") -os.environ.setdefault("HATCHET_CLIENT_TLS_STRATEGY", "none") + +from kt_hatchet.testing import install_fake_hatchet_env + +install_fake_hatchet_env() From d1f721324120a1114f181d969cbc00f95763fa8d Mon Sep 17 00:00:00 2001 From: charlie83Gs Date: Mon, 20 Apr 2026 07:33:40 -0600 Subject: [PATCH 6/7] ci: swap kt-models integration test model to gemma + serialise job OpenRouter returned 429 on kt-models integration run. Two mitigations: - Switch test model from openrouter/google/gemini-2.0-flash-001 to openrouter/google/gemma-4-26b-a4b-it:nitro (matches Settings.decomposition_model). - Add per-suite pytest -n override to the integration matrix so suites that hit rate-limited external APIs can run serially. kt-models pinned to -n 1; all others default to -n auto. --- .github/workflows/test.yml | 16 +++++++++++----- .../tests/integration/test_model_gateway.py | 6 +++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1b535a26..5d977d8f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -119,7 +119,13 @@ jobs: BACKEND=$(echo "$BACKEND" | jq --arg n "$1" --arg p "$2" '. + [{"name": $n, "path": $p}]') } add_integration() { - INTEGRATION=$(echo "$INTEGRATION" | jq --arg n "$1" --arg p "$2" '. + [{"name": $n, "path": $p}]') + # $3 = pytest -n workers (optional, default "auto"). Serialise + # suites that hit external rate-limited APIs (e.g. kt-models + # → OpenRouter) by passing workers="1". + local workers="${3:-auto}" + INTEGRATION=$(echo "$INTEGRATION" \ + | jq --arg n "$1" --arg p "$2" --arg w "$workers" \ + '. + [{"name": $n, "path": $p, "workers": $w}]') } # Detect real integration tests (not just a conftest/__init__) so # empty integration/ dirs don't produce "no tests ran" failures. 
@@ -131,17 +137,17 @@ jobs: [ "$count" -gt 0 ] } maybe_add() { - local name="$1" path="$2" changed="$3" + local name="$1" path="$2" changed="$3" workers="${4:-auto}" [ "$changed" = "true" ] || return 0 add_backend "$name" "$path" if has_integration_tests "$path"; then - add_integration "$name" "$path" + add_integration "$name" "$path" "$workers" fi } maybe_add kt-config libs/kt-config "${{ steps.filter.outputs.kt-config }}" maybe_add kt-db libs/kt-db "${{ steps.filter.outputs.kt-db }}" - maybe_add kt-models libs/kt-models "${{ steps.filter.outputs.kt-models }}" + maybe_add kt-models libs/kt-models "${{ steps.filter.outputs.kt-models }}" 1 maybe_add kt-providers libs/kt-providers "${{ steps.filter.outputs.kt-providers }}" maybe_add kt-graph libs/kt-graph "${{ steps.filter.outputs.kt-graph }}" maybe_add kt-facts libs/kt-facts "${{ steps.filter.outputs.kt-facts }}" @@ -357,7 +363,7 @@ jobs: - name: Apply write-db migrations run: cd libs/kt-db && uv run --frozen alembic -c alembic_write.ini upgrade head - name: Run integration tests - run: uv run --frozen pytest ${{ matrix.suite.path }}/tests/integration -x -n auto + run: uv run --frozen pytest ${{ matrix.suite.path }}/tests/integration -x -n ${{ matrix.suite.workers }} frontend-test: name: Frontend Tests diff --git a/libs/kt-models/tests/integration/test_model_gateway.py b/libs/kt-models/tests/integration/test_model_gateway.py index 97f49c5f..77a6e6d4 100644 --- a/libs/kt-models/tests/integration/test_model_gateway.py +++ b/libs/kt-models/tests/integration/test_model_gateway.py @@ -16,7 +16,7 @@ async def test_generate(): gateway = ModelGateway() with expense_scope(_TEST_EXPENSE): result = await gateway.generate( - "openrouter/google/gemini-2.0-flash-001", + "openrouter/google/gemma-4-26b-a4b-it:nitro", [{"role": "user", "content": "Say hello in one word."}], max_tokens=50, ) @@ -30,7 +30,7 @@ async def test_generate_parallel(): gateway = ModelGateway() with expense_scope(_TEST_EXPENSE): results = await 
gateway.generate_parallel( - model_ids=["openrouter/google/gemini-2.0-flash-001"], + model_ids=["openrouter/google/gemma-4-26b-a4b-it:nitro"], messages=[{"role": "user", "content": "Say hello in one word."}], max_tokens=50, ) @@ -47,7 +47,7 @@ async def test_generate_with_system_prompt(): gateway = ModelGateway() with expense_scope(_TEST_EXPENSE): result = await gateway.generate( - "openrouter/google/gemini-2.0-flash-001", + "openrouter/google/gemma-4-26b-a4b-it:nitro", [{"role": "user", "content": "What should I say?"}], system_prompt="Always respond with exactly the word 'banana'.", max_tokens=50, From 4b1780f3a1674946d053ae7dc372decddc8849b2 Mon Sep 17 00:00:00 2001 From: charlie83Gs Date: Mon, 20 Apr 2026 07:55:17 -0600 Subject: [PATCH 7/7] refactor(tests): per-module schema isolation for integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Flaky runs observed on kt-db integration came from session-scoped state leaking across test modules (e.g. write_seeds committing rows that test_trigram_dedup read). Move schema creation/teardown from session scope to module scope so every test file gets its own random schema that's dropped on module teardown. - kt_db.testing: engine/schema_name/write_engine/write_schema_name are now module-scoped; db_session/write_db_session remain function-scoped with per-test txn rollback. Extensions install once per session. - kt-facts and worker-sync conftests follow the same pattern for their divergent fixtures (shared schema / renamed graph_engine). - services/api/tests/integration/*.py: downgrade local session-scoped fixtures (api_app, wi_client, etc.) to module scope to match the new engine scope. Result: test_trigram_dedup.test_abbreviation_found no longer needs to be deselected — runs green alongside the full suite. 152 kt-db integration tests in 6.6s (session-scope was 3s; the cost of per-module teardown is modest). 
Each CI run uses randomly-named schemas so concurrent runs and leftover data never interfere. --- libs/kt-db/src/kt_db/testing.py | 153 +++++++++++------- libs/kt-facts/tests/integration/conftest.py | 79 +++++---- services/api/tests/integration/test_api.py | 6 +- .../api/tests/integration/test_permissions.py | 2 +- .../integration/test_waitlist_invites.py | 8 +- .../worker-sync/tests/integration/conftest.py | 107 ++++++------ 6 files changed, 199 insertions(+), 156 deletions(-) diff --git a/libs/kt-db/src/kt_db/testing.py b/libs/kt-db/src/kt_db/testing.py index fa5cebad..593654a0 100644 --- a/libs/kt-db/src/kt_db/testing.py +++ b/libs/kt-db/src/kt_db/testing.py @@ -1,25 +1,32 @@ -"""Pytest plugin providing schema-per-worker DB fixtures for integration tests. +"""Pytest plugin providing per-module schema isolation for integration tests. Activate by adding to an integration conftest: pytest_plugins = ["kt_db.testing"] Fixtures provided: - settings — cached Settings instance - schema_name — random graph-db schema name, session-scoped - engine — AsyncEngine bound to the isolated graph-db schema - (Base.metadata.create_all applied, pgvector + pg_trgm - extensions ensured) - db_session — per-test AsyncSession wrapped in a rolled-back txn - write_schema_name — random write-db schema name, session-scoped - write_engine — AsyncEngine bound to the isolated write-db schema - (WriteBase.metadata.create_all applied) - write_db_session — per-test write-db AsyncSession wrapped in a - rolled-back txn - -Packages that need a shared graph+write schema (e.g. kt-facts) or -renamed fixtures (e.g. worker-sync) should define those locally -instead of importing this plugin. 
+
+    settings — cached Settings instance (session scope)
+    schema_name — unique graph-db schema per test module
+    engine — AsyncEngine bound to that schema (module scope)
+    db_session — per-test AsyncSession wrapped in a rolled-back txn
+    write_schema_name — unique write-db schema per test module
+    write_engine — AsyncEngine bound to that schema (module scope)
+    write_db_session — per-test write-db AsyncSession in a rolled-back txn
+
+Rationale: one schema per test module gives cross-file isolation (the
+source of the previous flakiness — tests in ``test_write_seeds.py``
+committing state that leaked into ``test_trigram_dedup.py``) without
+paying the CREATE SCHEMA / create_all cost per-test. Within a module,
+``db_session`` wraps each test in a txn + rollback.
+
+Schemas are dropped on module teardown so the database doesn't
+accumulate ``test_*`` leftovers across runs. Extensions are installed
+idempotently per module engine setup (advisory-lock guarded, xdist-safe).
+
+Packages that diverge — kt-facts (graph+write in one schema) and
+worker-sync (renamed fixtures) — define their own fixtures in
+conftest using the helpers exported here.
 """
 
 from __future__ import annotations
@@ -42,8 +49,42 @@ from kt_db.write_models import WriteBase
 
 
-def _worker_schema() -> str:
-    return f"test_{uuid.uuid4().hex[:8]}"
+def unique_schema(prefix: str = "test") -> str:
+    """Return a short random schema name, e.g.
``test_a3f91b0c``.""" + return f"{prefix}_{uuid.uuid4().hex[:8]}" + + +async def install_graph_extensions(base_url: str) -> None: + """Install pgvector + pg_trgm on graph-db (idempotent, xdist-safe).""" + eng = create_async_engine(base_url, echo=False) + async with eng.begin() as conn: + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await eng.dispose() + + +async def install_write_extensions(base_url: str) -> None: + """Install pg_trgm on write-db (idempotent, xdist-safe).""" + eng = create_async_engine(base_url, echo=False) + async with eng.begin() as conn: + await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) + await eng.dispose() + + +async def create_schema(base_url: str, schema: str) -> None: + eng = create_async_engine(base_url, echo=False) + async with eng.begin() as conn: + await conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema}"')) + await eng.dispose() + + +async def drop_schema(base_url: str, schema: str) -> None: + eng = create_async_engine(base_url, echo=False) + async with eng.begin() as conn: + await conn.execute(text(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE')) + await eng.dispose() @pytest.fixture(scope="session") @@ -51,22 +92,19 @@ def settings(): return get_settings() -@pytest.fixture(scope="session") +# ── Graph-db fixtures (per-module schema) ──────────────────────────── + + +@pytest.fixture(scope="module") def schema_name() -> str: - return _worker_schema() + return unique_schema() -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: base_url = settings.database_url - setup_eng = 
create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - # Advisory lock prevents xdist workers from racing on CREATE EXTENSION. - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() + await install_graph_extensions(base_url) + await create_schema(base_url, schema_name) eng = create_async_engine( base_url, @@ -76,15 +114,16 @@ async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: async with eng.begin() as conn: for table in Base.metadata.sorted_tables: table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() + try: + await conn.run_sync(Base.metadata.create_all) + finally: + for table in Base.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, schema_name) @pytest_asyncio.fixture(loop_scope="session") @@ -96,20 +135,19 @@ async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: await session.rollback() -@pytest.fixture(scope="session") +# ── Write-db fixtures (per-module schema) ──────────────────────────── + + +@pytest.fixture(scope="module") def write_schema_name() -> str: - return _worker_schema() + return unique_schema() -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: 
base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() + await install_write_extensions(base_url) + await create_schema(base_url, write_schema_name) eng = create_async_engine(base_url, echo=False) @@ -118,21 +156,22 @@ async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngin @event.listens_for(eng.sync_engine, "connect") def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO {write_schema_name}, public") + cursor.execute(f'SET search_path TO "{write_schema_name}", public') cursor.close() async with eng.begin() as conn: for table in WriteBase.metadata.sorted_tables: table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() + try: + await conn.run_sync(WriteBase.metadata.create_all) + finally: + for table in WriteBase.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, write_schema_name) @pytest_asyncio.fixture(loop_scope="session") diff --git a/libs/kt-facts/tests/integration/conftest.py b/libs/kt-facts/tests/integration/conftest.py index ec1e7cfe..2d9cd804 100644 --- a/libs/kt-facts/tests/integration/conftest.py +++ b/libs/kt-facts/tests/integration/conftest.py @@ -1,40 +1,44 @@ -import uuid 
+"""kt-facts integration conftest. + +kt-facts writes both graph-db and write-db tables into a *single* schema +so the fact pipeline can exercise both sides within one session. Schema +is unique per test module — see kt_db.testing for the helpers. +""" + +from __future__ import annotations + from collections.abc import AsyncGenerator import pytest import pytest_asyncio -from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine from kt_config.settings import get_settings from kt_db.models import Base +from kt_db.testing import ( + create_schema, + drop_schema, + install_graph_extensions, + unique_schema, +) from kt_db.write_models import WriteBase -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - @pytest.fixture(scope="session") def settings(): return get_settings() -@pytest.fixture(scope="session") -def schema_name(): - return _worker_schema() +@pytest.fixture(scope="module") +def schema_name() -> str: + return unique_schema() -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() + await install_graph_extensions(base_url) + await create_schema(base_url, schema_name) eng = create_async_engine( base_url, @@ -44,27 +48,29 @@ async def engine(settings, schema_name) -> AsyncGenerator[AsyncEngine, None]: async with eng.begin() as conn: for table in Base.metadata.sorted_tables: 
table.schema = schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - # Also create write-db tables so write_session tests work + try: + await conn.run_sync(Base.metadata.create_all) + finally: + for table in Base.metadata.sorted_tables: + table.schema = None for table in WriteBase.metadata.sorted_tables: table.schema = schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")) - await cleanup_eng.dispose() + try: + await conn.run_sync(WriteBase.metadata.create_all) + finally: + for table in WriteBase.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, schema_name) @pytest_asyncio.fixture(loop_scope="session") async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: + factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as session: async with session.begin(): yield session await session.rollback() @@ -72,7 +78,10 @@ async def db_session(engine) -> AsyncGenerator[AsyncSession, None]: @pytest_asyncio.fixture(loop_scope="session") async def write_session(engine) -> AsyncGenerator[AsyncSession, None]: - """Separate session for write-db operations (pipeline calls commit()).""" - session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as session: + """Session that does NOT roll back (pipelines call commit() internally). 
+ + Safe because the surrounding module-scoped schema is dropped on teardown. + """ + factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with factory() as session: yield session diff --git a/services/api/tests/integration/test_api.py b/services/api/tests/integration/test_api.py index 84be0a1c..93233b41 100644 --- a/services/api/tests/integration/test_api.py +++ b/services/api/tests/integration/test_api.py @@ -21,13 +21,13 @@ ) -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def api_session_factory(engine): """Session factory for API integration tests, session-scoped.""" return async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def api_app(api_session_factory): """Create a test FastAPI app with DB session overridden to use the test DB.""" application = create_app() @@ -40,7 +40,7 @@ async def override_get_db_session() -> AsyncGenerator[AsyncSession, None]: return application -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def api_client(api_app): """Create an async HTTP client for testing.""" transport = ASGITransport(app=api_app) diff --git a/services/api/tests/integration/test_permissions.py b/services/api/tests/integration/test_permissions.py index f3ded9bf..b19e15a9 100644 --- a/services/api/tests/integration/test_permissions.py +++ b/services/api/tests/integration/test_permissions.py @@ -29,7 +29,7 @@ def _make_stub_user(user_id: uuid.UUID, *, is_superuser: bool) -> User: return user -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def perm_session_factory(engine): return async_sessionmaker(engine, class_=AsyncSession, 
expire_on_commit=False) diff --git a/services/api/tests/integration/test_waitlist_invites.py b/services/api/tests/integration/test_waitlist_invites.py index 47b74f21..d31eb883 100644 --- a/services/api/tests/integration/test_waitlist_invites.py +++ b/services/api/tests/integration/test_waitlist_invites.py @@ -18,12 +18,12 @@ STUB_USER_ID = uuid.UUID("00000000-0000-0000-0000-000000000001") -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def wi_session_factory(engine): return async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) -@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True) +@pytest_asyncio.fixture(scope="module", loop_scope="session", autouse=True) async def _ensure_stub_user(wi_session_factory): """Ensure the SKIP_AUTH stub user exists in the DB for FK constraints.""" async with wi_session_factory() as session: @@ -48,7 +48,7 @@ async def _ensure_stub_user(wi_session_factory): await session.commit() -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def wi_app(wi_session_factory, _ensure_stub_user): application = create_app() @@ -60,7 +60,7 @@ async def override_get_db_session() -> AsyncGenerator[AsyncSession, None]: return application -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def wi_client(wi_app): transport = ASGITransport(app=wi_app) async with AsyncClient(transport=transport, base_url="http://test") as c: diff --git a/services/worker-sync/tests/integration/conftest.py b/services/worker-sync/tests/integration/conftest.py index c1a47ae4..0f4656c6 100644 --- a/services/worker-sync/tests/integration/conftest.py +++ b/services/worker-sync/tests/integration/conftest.py @@ -1,18 +1,17 @@ """Shared fixtures for worker-sync integration tests. 
-Mirrors the dual-DB schema-per-worker pattern from libs/kt-db/tests/integration/conftest.py -so we can stand up a real SyncEngine against isolated graph-db + write-db -schemas. +Per-module graph-db + write-db schemas so SyncEngine runs are isolated +across files and across concurrent CI runs. Each test still gets a +fresh session rolled back at teardown. """ from __future__ import annotations -import uuid from collections.abc import AsyncGenerator import pytest import pytest_asyncio -from sqlalchemy import event, text +from sqlalchemy import event from sqlalchemy.ext.asyncio import ( AsyncEngine, AsyncSession, @@ -22,38 +21,36 @@ from kt_config.settings import get_settings from kt_db.models import Base +from kt_db.testing import ( + create_schema, + drop_schema, + install_graph_extensions, + install_write_extensions, + unique_schema, +) from kt_db.write_models import WriteBase -def _worker_schema() -> str: - return f"test_{uuid.uuid4().hex[:8]}" - - @pytest.fixture(scope="session") def settings(): return get_settings() -@pytest.fixture(scope="session") -def graph_schema_name(): - return _worker_schema() +@pytest.fixture(scope="module") +def graph_schema_name() -> str: + return unique_schema() -@pytest.fixture(scope="session") -def write_schema_name(): - return _worker_schema() +@pytest.fixture(scope="module") +def write_schema_name() -> str: + return unique_schema() -@pytest_asyncio.fixture(scope="session", loop_scope="session") +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def graph_engine(settings, graph_schema_name) -> AsyncGenerator[AsyncEngine, None]: base_url = settings.database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {graph_schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - await 
conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() + await install_graph_extensions(base_url) + await create_schema(base_url, graph_schema_name) eng = create_async_engine( base_url, @@ -63,62 +60,60 @@ async def graph_engine(settings, graph_schema_name) -> AsyncGenerator[AsyncEngin async with eng.begin() as conn: for table in Base.metadata.sorted_tables: table.schema = graph_schema_name - await conn.run_sync(Base.metadata.create_all) - for table in Base.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {graph_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest_asyncio.fixture(scope="session", loop_scope="session") + try: + await conn.run_sync(Base.metadata.create_all) + finally: + for table in Base.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, graph_schema_name) + + +@pytest_asyncio.fixture(scope="module", loop_scope="session") async def write_engine(settings, write_schema_name) -> AsyncGenerator[AsyncEngine, None]: base_url = settings.write_database_url - setup_eng = create_async_engine(base_url, echo=False) - async with setup_eng.begin() as conn: - await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {write_schema_name}")) - await conn.execute(text("SELECT pg_advisory_xact_lock(hashtext('create_extensions'))")) - await conn.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm")) - await setup_eng.dispose() + await install_write_extensions(base_url) + await create_schema(base_url, write_schema_name) eng = create_async_engine(base_url, echo=False) @event.listens_for(eng.sync_engine, "connect") def _set_search_path(dbapi_conn, connection_record): # type: ignore[no-untyped-def] cursor = dbapi_conn.cursor() - cursor.execute(f"SET search_path TO 
{write_schema_name}, public") + cursor.execute(f'SET search_path TO "{write_schema_name}", public') cursor.close() async with eng.begin() as conn: for table in WriteBase.metadata.sorted_tables: table.schema = write_schema_name - await conn.run_sync(WriteBase.metadata.create_all) - for table in WriteBase.metadata.sorted_tables: - table.schema = None - yield eng - await eng.dispose() - cleanup_eng = create_async_engine(base_url, echo=False) - async with cleanup_eng.begin() as conn: - await conn.execute(text(f"DROP SCHEMA IF EXISTS {write_schema_name} CASCADE")) - await cleanup_eng.dispose() - - -@pytest.fixture(scope="session") + try: + await conn.run_sync(WriteBase.metadata.create_all) + finally: + for table in WriteBase.metadata.sorted_tables: + table.schema = None + try: + yield eng + finally: + await eng.dispose() + await drop_schema(base_url, write_schema_name) + + +@pytest.fixture(scope="module") def graph_session_factory(graph_engine) -> async_sessionmaker[AsyncSession]: return async_sessionmaker(graph_engine, class_=AsyncSession, expire_on_commit=False) -@pytest.fixture(scope="session") +@pytest.fixture(scope="module") def write_session_factory(write_engine) -> async_sessionmaker[AsyncSession]: return async_sessionmaker(write_engine, class_=AsyncSession, expire_on_commit=False) @pytest.fixture def sync_engine(write_session_factory, graph_session_factory): - """Real SyncEngine wired to per-worker isolated schemas.""" + """Real SyncEngine wired to per-module isolated schemas.""" from kt_worker_sync.sync_engine import SyncEngine return SyncEngine(