diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..6b01995 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,102 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +env: + PYTHON_VERSION: "3.13" + +jobs: + lint: + name: Lint & Format + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install just + uses: extractions/setup-just@v2 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + + - name: Set up Python + run: uv python install ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + run: uv sync --frozen + + - name: Check formatting + run: uv run ruff format --check . + + - name: Check linting + run: uv run ruff check . + + typecheck: + name: Type Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install just + uses: extractions/setup-just@v2 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + + - name: Set up Python + run: uv python install ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + run: uv sync --frozen + + - name: Run type checker + run: uv run ty check osa + + test: + name: Test + runs-on: ubuntu-latest + permissions: + contents: read + pull-requests: write + steps: + - uses: actions/checkout@v4 + + - name: Install just + uses: extractions/setup-just@v2 + + - name: Install uv + uses: astral-sh/setup-uv@v4 + with: + enable-cache: true + + - name: Set up Python + run: uv python install ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + run: uv sync --frozen + + - name: Run unit tests with coverage + run: uv run pytest tests/unit -v --tb=short --cov=osa --cov-report=xml --cov-report=term-missing + env: + TEST: "1" + + - name: Code Coverage Summary + uses: irongut/CodeCoverageSummary@v1.3.0 + with: + filename: coverage.xml + badge: true + format: markdown + output: both + + - name: Add coverage PR comment + uses: marocchino/sticky-pull-request-comment@v2 + if: github.event_name == 'pull_request' + with: + path: code-coverage-results.md diff --git a/.gitignore b/.gitignore index 64d49ae..42f20c5 100644 --- a/.gitignore +++ b/.gitignore @@ -195,9 +195,9 @@ cython_debug/ .abstra/ # Visual Studio Code -# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore +# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore # that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore -# and can be added to the global gitignore or merged into this file. However, if you prefer, +# and can be added to the global gitignore or merged into this file. However, if you prefer, # you could uncomment the following to ignore the entire vscode folder # .vscode/ @@ -213,4 +213,4 @@ marimo/_lsp/ __marimo__/ # Streamlit -.streamlit/secrets.toml \ No newline at end of file +.streamlit/secrets.toml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..3a0da0e --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,37 @@ +repos: + # Basic file hygiene + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v6.0.0 + hooks: + - id: check-yaml + - id: check-json + - id: check-toml + - id: check-merge-conflict + - id: end-of-file-fixer + - id: trailing-whitespace + + # Ruff - linting and formatting + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.14.10 + hooks: + - id: ruff-check + args: [--fix] + - id: ruff-format + + # Local hooks for ty and tests (no official pre-commit hooks yet) + - repo: local + hooks: + - id: ty-check + name: ty type check + entry: uv run ty check osa + language: system + types: [python] + pass_filenames: false + + - id: unit-tests + name: unit tests + entry: uv run pytest tests/unit -q --tb=short + language: system + types: [python] + pass_filenames: false + stages: [pre-commit] diff --git a/Justfile b/Justfile index eb9155e..1130ac0 100644 --- a/Justfile +++ b/Justfile @@ -75,6 +75,9 @@ up: db-up down: db-down # Testing commands +test kind="unit": + @TEST=1 uv run pytest "tests/{{kind}}" -v --tb=short + test-s kind="unit": @TEST=1 uv run pytest -s -o log_cli=True -o log_cli_level=DEBUG "tests/{{kind}}" @@ -96,7 +99,7 @@ fix thing="osa": lint thing="osa": uv run ruff check {{thing}} - uv run pyright {{thing}} + uv run ty check {{thing}} # Docker commands (standalone) docker-build: diff --git a/LICENSE b/LICENSE index 583f399..ec972a4 100644 --- a/LICENSE +++ b/LICENSE @@ -198,4 +198,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. \ No newline at end of file + limitations under the License. diff --git a/ingestors/geo_entrez/ingestor.py b/ingestors/geo_entrez/ingestor.py index e9dba31..6f5b8c0 100644 --- a/ingestors/geo_entrez/ingestor.py +++ b/ingestors/geo_entrez/ingestor.py @@ -120,9 +120,7 @@ async def _search_uids(self, query: str, limit: int | None) -> list[str]: resp.raise_for_status() tree = ElementTree.fromstring(resp.text) - return [ - id_elem.text for id_elem in tree.findall(".//Id") if id_elem.text - ] + return [id_elem.text for id_elem in tree.findall(".//Id") if id_elem.text] async def _fetch_batch(self, uids: list[str]) -> list[UpstreamRecord]: """Fetch metadata for a batch of UIDs via ESummary.""" diff --git a/migrations/README b/migrations/README index 98e4f9c..2500aa1 100644 --- a/migrations/README +++ b/migrations/README @@ -1 +1 @@ -Generic single-database configuration. \ No newline at end of file +Generic single-database configuration. diff --git a/migrations/env.py b/migrations/env.py index 1434187..9b9fc57 100644 --- a/migrations/env.py +++ b/migrations/env.py @@ -15,7 +15,8 @@ # add your model's MetaData object here # for 'autogenerate' support -from osa.infrastructure.persistence.tables import metadata +from osa.infrastructure.persistence.tables import metadata # noqa: E402 + target_metadata = metadata # other values from the config, defined by the needs of env.py, diff --git a/osa/cli/commands/search.py b/osa/cli/commands/search.py index 831770e..fc962a1 100644 --- a/osa/cli/commands/search.py +++ b/osa/cli/commands/search.py @@ -21,9 +21,7 @@ def extract_short_id(srn: str) -> str: SRN format: urn:osa:{domain}:{type}:{uuid}[@{version}] """ - match = re.search( - r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", srn - ) + match = re.search(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", srn) if match: return match.group(0)[:6] return srn[:6] diff --git a/osa/cli/commands/server.py b/osa/cli/commands/server.py index 2187458..b9ad445 100644 --- a/osa/cli/commands/server.py +++ b/osa/cli/commands/server.py @@ -41,6 +41,7 @@ def _resolve_config(paths: OSAPaths, config: Path | None = None) -> Path: ) sys.exit(1) + assert config is not None # Guaranteed by logic above if not config.exists(): console.error(f"Config file not found: {config}") sys.exit(1) diff --git a/osa/cli/commands/show.py b/osa/cli/commands/show.py index 016ada7..a956824 100644 --- a/osa/cli/commands/show.py +++ b/osa/cli/commands/show.py @@ -25,9 +25,10 @@ def show(ref: str, /) -> None: if cache is None: console.error( "No search results cached", - hint="Run a search first: osa search vector \"your query\"", + hint='Run a search first: osa search vector "your query"', ) sys.exit(1) + return # Unreachable, but helps type checker # Try to interpret ref as a number first result: SearchHit | None = None @@ -51,9 +52,7 @@ def show(ref: str, /) -> None: console.error(f"Ambiguous short ID '{ref}'") console.print("[dim]Matches:[/dim]") for m in matches: - console.print( - f" [cyan]{m.short_id}[/cyan] {m.metadata.title[:50]}" - ) + console.print(f" [cyan]{m.short_id}[/cyan] {m.metadata.title[:50]}") sys.exit(1) else: console.error( diff --git a/osa/cli/util/daemon.py b/osa/cli/util/daemon.py index 7ae175b..42509e9 100644 --- a/osa/cli/util/daemon.py +++ b/osa/cli/util/daemon.py @@ -172,9 +172,7 @@ def start( if process.poll() is not None: # Process exited immediately - error self._paths.remove_server_state() - raise RuntimeError( - f"Server failed to start. Check logs at {self._paths.server_log}" - ) + raise RuntimeError(f"Server failed to start. Check logs at {self._paths.server_log}") return ServerInfo( status=ServerStatus.RUNNING, diff --git a/osa/config.py b/osa/config.py index 82117a5..96b3375 100644 --- a/osa/config.py +++ b/osa/config.py @@ -77,9 +77,7 @@ def name(self) -> str: class YamlConfigSettingsSource(PydanticBaseSettingsSource): """Load settings from YAML file specified by OSA_CONFIG_FILE env var.""" - def get_field_value( - self, field: Any, field_name: str - ) -> tuple[Any, str, bool]: + def get_field_value(self, field: Any, field_name: str) -> tuple[Any, str, bool]: """Get the value for a field from the YAML config.""" yaml_data = self._load_yaml_config() field_value = yaml_data.get(field_name) diff --git a/osa/domain/curation/listener/auto_approve_curation_tool.py b/osa/domain/curation/listener/auto_approve_curation_tool.py index 9dc4703..1dc551e 100644 --- a/osa/domain/curation/listener/auto_approve_curation_tool.py +++ b/osa/domain/curation/listener/auto_approve_curation_tool.py @@ -21,17 +21,13 @@ async def handle(self, event: ValidationCompleted) -> None: """Emit DepositionApproved if validation passed and no curation required.""" # Only auto-approve if validation passed if event.status != RunStatus.COMPLETED: - logger.warning( - f"Validation failed for {event.deposition_srn}, skipping auto-approve" - ) + logger.warning(f"Validation failed for {event.deposition_srn}, skipping auto-approve") return # TODO: Load curation config to check if manual curation is required curation_required = False # False for v1 if curation_required: - logger.info( - f"Curation required for {event.deposition_srn}, not auto-approving" - ) + logger.info(f"Curation required for {event.deposition_srn}, not auto-approving") return logger.debug(f"Auto-approving deposition: {event.deposition_srn}") diff --git a/osa/domain/deposition/command/delete_files.py b/osa/domain/deposition/command/delete_files.py index c0f97d1..675c90f 100644 --- a/osa/domain/deposition/command/delete_files.py +++ b/osa/domain/deposition/command/delete_files.py @@ -13,9 +13,7 @@ class DepositionFilesDeleted(Result): pass -class DeleteDepositionFilesHandler( - CommandHandler[DeleteDepositionFiles, DepositionFilesDeleted] -): +class DeleteDepositionFilesHandler(CommandHandler[DeleteDepositionFiles, DepositionFilesDeleted]): repository: DepositionRepository storage: StoragePort diff --git a/osa/domain/index/listener/__init__.py b/osa/domain/index/listener/__init__.py index 08c3d79..bac627e 100644 --- a/osa/domain/index/listener/__init__.py +++ b/osa/domain/index/listener/__init__.py @@ -2,4 +2,4 @@ from osa.domain.index.listener.index_projector import ProjectNewRecordToIndexes -__all__ = ["IndexProjector"] +__all__ = ["ProjectNewRecordToIndexes"] diff --git a/osa/domain/index/listener/index_projector.py b/osa/domain/index/listener/index_projector.py index 46dd4b3..3cd495b 100644 --- a/osa/domain/index/listener/index_projector.py +++ b/osa/domain/index/listener/index_projector.py @@ -1,27 +1,21 @@ """IndexProjector - indexes published records into storage backends.""" -import logging - -from osa.domain.index.model.registry import IndexRegistry +from osa.domain.index.service import IndexService from osa.domain.record.event.record_published import RecordPublished from osa.domain.shared.event import EventListener -logger = logging.getLogger(__name__) - class ProjectNewRecordToIndexes(EventListener[RecordPublished]): - """Projects published records into index backends.""" + """Projects published records into index backends. - indexes: IndexRegistry + This listener delegates to IndexService for all business logic. + """ - async def handle(self, event: RecordPublished) -> None: - """Index record into all configured backends.""" - srn_str = str(event.record_srn) + service: IndexService - # Index into all configured backends - for name, backend in self.indexes.items(): - try: - await backend.ingest(srn_str, event.metadata) - logger.debug(f"Indexed {srn_str} into backend '{name}'") - except Exception as e: - logger.error(f"Failed to index {srn_str} into '{name}': {e}") + async def handle(self, event: RecordPublished) -> None: + """Delegate to IndexService to index the record.""" + await self.service.index_record( + record_srn=event.record_srn, + metadata=event.metadata, + ) diff --git a/osa/domain/index/service/__init__.py b/osa/domain/index/service/__init__.py new file mode 100644 index 0000000..852bec9 --- /dev/null +++ b/osa/domain/index/service/__init__.py @@ -0,0 +1,5 @@ +"""Index service module.""" + +from osa.domain.index.service.index import IndexService + +__all__ = ["IndexService"] diff --git a/osa/domain/index/service/index.py b/osa/domain/index/service/index.py new file mode 100644 index 0000000..3e5efdf --- /dev/null +++ b/osa/domain/index/service/index.py @@ -0,0 +1,45 @@ +"""IndexService - orchestrates indexing of records into storage backends.""" + +import logging +from typing import Any + +from osa.domain.index.model.registry import IndexRegistry +from osa.domain.shared.model.srn import RecordSRN +from osa.domain.shared.service import Service + +logger = logging.getLogger(__name__) + + +class IndexService(Service): + """Projects records into configured index backends. + + This service encapsulates the business logic for indexing that was previously + embedded in the ProjectNewRecordToIndexes listener. It can be called from + multiple entry points (event listeners, CLI commands, bulk operations). + """ + + indexes: IndexRegistry + + async def index_record( + self, + record_srn: RecordSRN, + metadata: dict[str, Any], + ) -> None: + """Index a record into all configured backends. + + Args: + record_srn: SRN of the record to index. + metadata: The record metadata to index. + + Note: + This method logs errors but does not raise exceptions for individual + backend failures, allowing indexing to continue for other backends. + """ + srn_str = str(record_srn) + + for name, backend in self.indexes.items(): + try: + await backend.ingest(srn_str, metadata) + logger.debug(f"Indexed {srn_str} into backend '{name}'") + except Exception as e: + logger.error(f"Failed to index {srn_str} into '{name}': {e}") diff --git a/osa/domain/ingest/listener/__init__.py b/osa/domain/ingest/listener/__init__.py index c448040..f0163c8 100644 --- a/osa/domain/ingest/listener/__init__.py +++ b/osa/domain/ingest/listener/__init__.py @@ -3,4 +3,4 @@ from osa.domain.ingest.listener.ingest_listener import IngestFromUpstream from osa.domain.ingest.listener.initial_ingest_listener import TriggerInitialIngestion -__all__ = ["IngestListener", "InitialIngestListener"] +__all__ = ["IngestFromUpstream", "TriggerInitialIngestion"] diff --git a/osa/domain/ingest/listener/ingest_listener.py b/osa/domain/ingest/listener/ingest_listener.py index 95ffe4f..2607f6b 100644 --- a/osa/domain/ingest/listener/ingest_listener.py +++ b/osa/domain/ingest/listener/ingest_listener.py @@ -1,78 +1,22 @@ """IngestListener - handles IngestRequested events.""" -import logging -from datetime import UTC, datetime -from uuid import uuid4 - -from osa.config import Config -from osa.domain.deposition.event.submitted import DepositionSubmittedEvent from osa.domain.ingest.event.ingest_requested import IngestRequested -from osa.domain.ingest.event.ingestion_run_completed import IngestionRunCompleted -from osa.domain.ingest.model.registry import IngestorRegistry -from osa.domain.shared.event import EventId, EventListener -from osa.domain.shared.model.srn import DepositionSRN, Domain, LocalId -from osa.domain.shared.outbox import Outbox - -logger = logging.getLogger(__name__) +from osa.domain.ingest.service import IngestService +from osa.domain.shared.event import EventListener class IngestFromUpstream(EventListener[IngestRequested]): - """Pulls from upstream source and creates depositions.""" - - ingestors: IngestorRegistry - outbox: Outbox - config: Config + """Pulls from upstream source and creates depositions. - async def handle(self, event: IngestRequested) -> None: - """Pull records from ingestor and emit DepositionSubmitted for each.""" - ingestor = self.ingestors.get(event.ingestor_name) - if not ingestor: - logger.error(f"Unknown ingestor: {event.ingestor_name}") - return - - started_at = datetime.now(UTC) - logger.info( - f"Starting ingest from {event.ingestor_name}, " - f"since={event.since}, limit={event.limit}" - ) + This listener delegates to IngestService for all business logic. + """ - domain = Domain(self.config.server.domain) - count = 0 + service: IngestService - async for record in ingestor.pull(since=event.since, limit=event.limit): - # Create a deposition SRN for this record - dep_srn = DepositionSRN( - domain=domain, - id=LocalId(str(uuid4())), - ) - - # Emit DepositionSubmitted with the record metadata - submitted_event = DepositionSubmittedEvent( - id=EventId(uuid4()), - deposition_id=dep_srn, - metadata=record.metadata, - ) - await self.outbox.append(submitted_event) - count += 1 - - # Log record metadata - title = record.metadata.get("title", "")[:60] - logger.info(f" [{record.source_id}] {title}...") - - completed_at = datetime.now(UTC) - logger.info(f"Ingest completed: {count} records from {event.ingestor_name}") - - # Emit completion event for tracking - await self.outbox.append( - IngestionRunCompleted( - id=EventId(uuid4()), - ingestor_name=event.ingestor_name, - source_type=ingestor.name, - started_at=started_at, - completed_at=completed_at, - record_count=count, - since=event.since, - limit=event.limit, - ) + async def handle(self, event: IngestRequested) -> None: + """Delegate to IngestService to pull records and emit deposition events.""" + await self.service.run_ingest( + ingestor_name=event.ingestor_name, + since=event.since, + limit=event.limit, ) - # Session commit handled by BackgroundWorker diff --git a/osa/domain/ingest/service/__init__.py b/osa/domain/ingest/service/__init__.py new file mode 100644 index 0000000..cd67af0 --- /dev/null +++ b/osa/domain/ingest/service/__init__.py @@ -0,0 +1,5 @@ +"""Ingest service module.""" + +from osa.domain.ingest.service.ingest import IngestResult, IngestService + +__all__ = ["IngestService", "IngestResult"] diff --git a/osa/domain/ingest/service/ingest.py b/osa/domain/ingest/service/ingest.py new file mode 100644 index 0000000..36c8197 --- /dev/null +++ b/osa/domain/ingest/service/ingest.py @@ -0,0 +1,110 @@ +"""IngestService - orchestrates ingestion from upstream sources.""" + +import logging +from dataclasses import dataclass +from datetime import UTC, datetime +from uuid import uuid4 + +from osa.domain.deposition.event.submitted import DepositionSubmittedEvent +from osa.domain.ingest.event.ingestion_run_completed import IngestionRunCompleted +from osa.domain.ingest.model.registry import IngestorRegistry +from osa.domain.shared.event import EventId +from osa.domain.shared.model.srn import DepositionSRN, Domain, LocalId +from osa.domain.shared.outbox import Outbox +from osa.domain.shared.service import Service + +logger = logging.getLogger(__name__) + + +@dataclass +class IngestResult: + """Result of an ingestion run.""" + + ingestor_name: str + record_count: int + started_at: datetime + completed_at: datetime + + +class IngestService(Service): + """Orchestrates pulling records from upstream ingestors and emitting deposition events. + + This service encapsulates the business logic for ingestion that was previously + embedded in the IngestFromUpstream listener. It can be called from multiple + entry points (event listeners, CLI commands, scheduled jobs). + """ + + ingestors: IngestorRegistry + outbox: Outbox + node_domain: Domain + + async def run_ingest( + self, + ingestor_name: str, + since: datetime | None = None, + limit: int | None = None, + ) -> IngestResult: + """Pull records from an ingestor and emit DepositionSubmitted events. + + Args: + ingestor_name: Name of the ingestor to use. + since: Only fetch records updated after this time. + limit: Maximum number of records to fetch. + + Returns: + IngestResult with ingestion statistics. + + Raises: + ValueError: If the ingestor is not found. + """ + ingestor = self.ingestors.get(ingestor_name) + if not ingestor: + raise ValueError(f"Unknown ingestor: {ingestor_name}") + + started_at = datetime.now(UTC) + logger.info(f"Starting ingest from {ingestor_name}, since={since}, limit={limit}") + + count = 0 + async for record in ingestor.pull(since=since, limit=limit): + # Create a deposition SRN for this record + dep_srn = DepositionSRN( + domain=self.node_domain, + id=LocalId(str(uuid4())), + ) + + # Emit DepositionSubmitted with the record metadata + submitted_event = DepositionSubmittedEvent( + id=EventId(uuid4()), + deposition_id=dep_srn, + metadata=record.metadata, + ) + await self.outbox.append(submitted_event) + count += 1 + + # Log record metadata + title = record.metadata.get("title", "")[:60] + logger.info(f" [{record.source_id}] {title}...") + + completed_at = datetime.now(UTC) + logger.info(f"Ingest completed: {count} records from {ingestor_name}") + + # Emit completion event for tracking + await self.outbox.append( + IngestionRunCompleted( + id=EventId(uuid4()), + ingestor_name=ingestor_name, + source_type=ingestor.name, + started_at=started_at, + completed_at=completed_at, + record_count=count, + since=since, + limit=limit, + ) + ) + + return IngestResult( + ingestor_name=ingestor_name, + record_count=count, + started_at=started_at, + completed_at=completed_at, + ) diff --git a/osa/domain/record/listener/__init__.py b/osa/domain/record/listener/__init__.py index 4890e35..fb025b3 100644 --- a/osa/domain/record/listener/__init__.py +++ b/osa/domain/record/listener/__init__.py @@ -1,5 +1,7 @@ """Record domain listeners.""" -from osa.domain.record.listener.record_creation_listener import ConvertDepositionToRecord +from osa.domain.record.listener.record_creation_listener import ( + ConvertDepositionToRecord, +) -__all__ = ["RecordCreationListener"] +__all__ = ["ConvertDepositionToRecord"] diff --git a/osa/domain/record/listener/record_creation_listener.py b/osa/domain/record/listener/record_creation_listener.py index 20a97af..8d4a283 100644 --- a/osa/domain/record/listener/record_creation_listener.py +++ b/osa/domain/record/listener/record_creation_listener.py @@ -1,61 +1,21 @@ """RecordCreationListener - creates records when depositions are approved.""" -import logging -from datetime import UTC, datetime -from uuid import uuid4 - -from osa.config import Config from osa.domain.curation.event.deposition_approved import DepositionApproved -from osa.domain.record.event.record_published import RecordPublished -from osa.domain.record.model.aggregate import Record -from osa.domain.record.port.repository import RecordRepository -from osa.domain.shared.event import EventId, EventListener -from osa.domain.shared.model.srn import Domain, LocalId, RecordSRN, RecordVersion -from osa.domain.shared.outbox import Outbox - -logger = logging.getLogger(__name__) +from osa.domain.record.service import RecordService +from osa.domain.shared.event import EventListener class ConvertDepositionToRecord(EventListener[DepositionApproved]): - """Creates and persists records when depositions are approved.""" + """Creates and persists records when depositions are approved. - record_repo: RecordRepository - outbox: Outbox - config: Config - - async def handle(self, event: DepositionApproved) -> None: - """Create a Record from an approved deposition and emit RecordPublished.""" - logger.debug(f"Creating record for approved deposition: {event.deposition_srn}") + This listener delegates to RecordService for all business logic. + """ - domain = Domain(self.config.server.domain) + service: RecordService - # Create record SRN (version 1 for new records) - record_srn = RecordSRN( - domain=domain, - id=LocalId(str(uuid4())), - version=RecordVersion(1), - ) - - # Create the Record aggregate - record = Record( - srn=record_srn, - deposition_srn=event.deposition_srn, - metadata=event.metadata, - published_at=datetime.now(UTC), - ) - - # Persist the record - await self.record_repo.save(record) - logger.debug(f"Record persisted: {record_srn}") - - # Emit RecordPublished for downstream consumers (indexing, etc.) - published = RecordPublished( - id=EventId(uuid4()), - record_srn=record_srn, + async def handle(self, event: DepositionApproved) -> None: + """Delegate to RecordService to create and publish the record.""" + await self.service.publish_record( deposition_srn=event.deposition_srn, metadata=event.metadata, ) - await self.outbox.append(published) - # Session commit handled by BackgroundWorker - - logger.debug(f"RecordPublished event emitted: {record_srn}") diff --git a/osa/domain/record/service/__init__.py b/osa/domain/record/service/__init__.py index e69de29..c503f8d 100644 --- a/osa/domain/record/service/__init__.py +++ b/osa/domain/record/service/__init__.py @@ -0,0 +1,5 @@ +"""Record service module.""" + +from osa.domain.record.service.record import RecordService + +__all__ = ["RecordService"] diff --git a/osa/domain/record/service/record.py b/osa/domain/record/service/record.py new file mode 100644 index 0000000..3f29cd5 --- /dev/null +++ b/osa/domain/record/service/record.py @@ -0,0 +1,83 @@ +"""RecordService - orchestrates record creation from approved depositions.""" + +import logging +from datetime import UTC, datetime +from typing import Any +from uuid import uuid4 + +from osa.domain.record.event.record_published import RecordPublished +from osa.domain.record.model.aggregate import Record +from osa.domain.record.port.repository import RecordRepository +from osa.domain.shared.event import EventId +from osa.domain.shared.model.srn import ( + DepositionSRN, + Domain, + LocalId, + RecordSRN, + RecordVersion, +) +from osa.domain.shared.outbox import Outbox +from osa.domain.shared.service import Service + +logger = logging.getLogger(__name__) + + +class RecordService(Service): + """Creates and persists Record aggregates from approved depositions. + + This service encapsulates the business logic for record creation that was + previously embedded in the ConvertDepositionToRecord listener. It can be + called from multiple entry points (event listeners, CLI commands, APIs). + """ + + record_repo: RecordRepository + outbox: Outbox + node_domain: Domain + + async def publish_record( + self, + deposition_srn: DepositionSRN, + metadata: dict[str, Any], + ) -> Record: + """Create and persist a Record from an approved deposition. + + Args: + deposition_srn: SRN of the approved deposition. + metadata: The record metadata. + + Returns: + The created Record aggregate. + """ + logger.debug(f"Creating record for approved deposition: {deposition_srn}") + + # Create record SRN (version 1 for new records) + record_srn = RecordSRN( + domain=self.node_domain, + id=LocalId(str(uuid4())), + version=RecordVersion(1), + ) + + # Create the Record aggregate + record = Record( + srn=record_srn, + deposition_srn=deposition_srn, + metadata=metadata, + published_at=datetime.now(UTC), + ) + + # Persist the record + await self.record_repo.save(record) + logger.debug(f"Record persisted: {record_srn}") + + # Emit RecordPublished for downstream consumers (indexing, etc.) + published = RecordPublished( + id=EventId(uuid4()), + record_srn=record_srn, + deposition_srn=deposition_srn, + metadata=metadata, + ) + await self.outbox.append(published) + + logger.debug(f"RecordPublished event emitted: {record_srn}") + + return record diff --git a/osa/domain/shared/event.py b/osa/domain/shared/event.py index f02f6f6..e3bd183 100644 --- a/osa/domain/shared/event.py +++ b/osa/domain/shared/event.py @@ -3,7 +3,16 @@ from abc import ABC, ABCMeta, abstractmethod from dataclasses import dataclass from datetime import UTC, datetime -from typing import Any, ClassVar, Generic, NewType, TypeVar, dataclass_transform, get_args, get_origin +from typing import ( + Any, + ClassVar, + Generic, + NewType, + TypeVar, + dataclass_transform, + get_args, + get_origin, +) from uuid import UUID from pydantic import Field diff --git a/osa/domain/shared/model/srn.py b/osa/domain/shared/model/srn.py index 1020c64..bccd59d 100644 --- a/osa/domain/shared/model/srn.py +++ b/osa/domain/shared/model/srn.py @@ -24,7 +24,9 @@ class Domain(RootModel[str]): Examples: osap.org, archive.university.edu, localhost """ - _re: ClassVar[re.Pattern] = re.compile(r"^[a-z0-9]([a-z0-9\-]*[a-z0-9])?(\.[a-z0-9]([a-z0-9\-]*[a-z0-9])?)*$") + _re: ClassVar[re.Pattern] = re.compile( + r"^[a-z0-9]([a-z0-9\-]*[a-z0-9])?(\.[a-z0-9]([a-z0-9\-]*[a-z0-9])?)*$" + ) @field_validator("root") @classmethod @@ -173,9 +175,7 @@ def _extract_parts(srn: str) -> tuple[str, str, str, Version | None]: raise ValueError("not an OSA SRN") parts = srn.split(":") if len(parts) != 5: - raise ValueError( - "malformed SRN (expected urn:osa:{domain}:{type}:{id}[...])" - ) + raise ValueError("malformed SRN (expected urn:osa:{domain}:{type}:{id}[...])") _, _, domain, typ, rest = ( parts[0], parts[1], diff --git a/osa/domain/shared/outbox.py b/osa/domain/shared/outbox.py index 476505a..c63189b 100644 --- a/osa/domain/shared/outbox.py +++ b/osa/domain/shared/outbox.py @@ -1,6 +1,5 @@ """Outbox - domain service for reliable event delivery.""" -from datetime import UTC, datetime from typing import TypeVar from osa.domain.shared.event import Event, EventId diff --git a/osa/domain/validation/listener/__init__.py b/osa/domain/validation/listener/__init__.py index 61f11e9..c2aeffb 100644 --- a/osa/domain/validation/listener/__init__.py +++ b/osa/domain/validation/listener/__init__.py @@ -2,4 +2,4 @@ from osa.domain.validation.listener.validation_listener import ValidateNewDeposition -__all__ = ["ValidationListener"] +__all__ = ["ValidateNewDeposition"] diff --git a/osa/infrastructure/event/di.py b/osa/infrastructure/event/di.py index 90f7105..dbe9298 100644 --- a/osa/infrastructure/event/di.py +++ b/osa/infrastructure/event/di.py @@ -27,14 +27,16 @@ # All event listeners - single source of truth -LISTENER_TYPES: Subscriptions = Subscriptions([ - TriggerInitialIngestion, - IngestFromUpstream, - ValidateNewDeposition, - AutoApproveCurationTool, - ConvertDepositionToRecord, - ProjectNewRecordToIndexes, -]) +LISTENER_TYPES: Subscriptions = Subscriptions( + [ + TriggerInitialIngestion, + IngestFromUpstream, + ValidateNewDeposition, + AutoApproveCurationTool, + ConvertDepositionToRecord, + ProjectNewRecordToIndexes, + ] +) class EventProvider(Provider): diff --git a/osa/infrastructure/index/di.py b/osa/infrastructure/index/di.py index 4f34bb2..b0b02ee 100644 --- a/osa/infrastructure/index/di.py +++ b/osa/infrastructure/index/di.py @@ -1,12 +1,13 @@ """Dependency injection provider for index backends.""" -from osa.util.di.scope import Scope from dishka import Provider, provide from osa.config import Config from osa.domain.index.model.registry import IndexRegistry +from osa.domain.index.service import IndexService from osa.infrastructure.index.vector.backend import VectorStorageBackend from osa.sdk.index.backend import StorageBackend +from osa.util.di.scope import Scope class IndexProvider(Provider): @@ -23,9 +24,7 @@ def get_backends(self, config: Config) -> IndexRegistry: for idx_config in config.indexes: if idx_config.backend == "vector": - backends[idx_config.name] = VectorStorageBackend( - idx_config.name, idx_config.config - ) + backends[idx_config.name] = VectorStorageBackend(idx_config.name, idx_config.config) # Add more backend types here as they're implemented # elif idx_config.backend == "keyword": # backends[idx_config.name] = KeywordStorageBackend( @@ -33,3 +32,12 @@ def get_backends(self, config: Config) -> IndexRegistry: # ) return IndexRegistry(backends) + + @provide(scope=Scope.UOW) + def get_index_service(self, indexes: IndexRegistry) -> IndexService: + """Provide IndexService for UOW scope. + + IndexService is UOW-scoped for consistency with other services, + though it doesn't require Outbox (no events emitted). + """ + return IndexService(indexes=indexes) diff --git a/osa/infrastructure/index/vector/backend.py b/osa/infrastructure/index/vector/backend.py index 6836881..46c95ed 100644 --- a/osa/infrastructure/index/vector/backend.py +++ b/osa/infrastructure/index/vector/backend.py @@ -40,21 +40,17 @@ async def ingest(self, srn: str, record: dict[str, Any]) -> None: """Store a record in the index.""" text = self._to_text(record) - # Run CPU-bound embedding in thread pool - embedding = await asyncio.to_thread(self._model.encode, text) + # Run CPU-bound embedding in thread pool and convert to list + embedding = await asyncio.to_thread(lambda: self._model.encode(text).tolist()) # Filter metadata to ChromaDB-compatible types - safe_meta = { - k: v - for k, v in record.items() - if isinstance(v, (str, int, float, bool)) - } + safe_meta = {k: v for k, v in record.items() if isinstance(v, (str, int, float, bool))} # Run ChromaDB I/O in thread pool await asyncio.to_thread( self._collection.upsert, ids=[srn], - embeddings=[embedding.tolist()], + embeddings=[embedding], metadatas=[safe_meta], documents=[text], ) @@ -65,13 +61,13 @@ async def delete(self, srn: str) -> None: async def query(self, q: str, limit: int = 20) -> QueryResult: """Execute a query and return structured results.""" - # Run CPU-bound embedding in thread pool - embedding = await asyncio.to_thread(self._model.encode, q) + # Run CPU-bound embedding in thread pool and convert to list + embedding = await asyncio.to_thread(lambda: self._model.encode(q).tolist()) # Run ChromaDB query in thread pool results = await asyncio.to_thread( self._collection.query, - query_embeddings=[embedding.tolist()], + query_embeddings=[embedding], n_results=limit, include=["metadatas", "distances"], ) @@ -110,6 +106,4 @@ def _to_text(self, record: dict[str, Any]) -> str: """Convert record to embeddable text.""" if self._config.embedding.template: return self._config.embedding.template.format(**record) - return " ".join( - str(record.get(f, "")) for f in self._config.embedding.fields - ) + return " ".join(str(record.get(f, "")) for f in self._config.embedding.fields) diff --git a/osa/infrastructure/ingest/di.py b/osa/infrastructure/ingest/di.py index d71cd7f..30b4de6 100644 --- a/osa/infrastructure/ingest/di.py +++ b/osa/infrastructure/ingest/di.py @@ -4,6 +4,9 @@ from osa.config import Config from osa.domain.ingest.model.registry import IngestorRegistry +from osa.domain.ingest.service import IngestService +from osa.domain.shared.model.srn import Domain +from osa.domain.shared.outbox import Outbox from osa.infrastructure.ingest.discovery import ( discover_ingestors, validate_all_ingestor_configs, @@ -37,3 +40,20 @@ def get_ingestors(self, config: Config) -> IngestorRegistry: ingestors[name] = ingestor_cls(validated_config) return IngestorRegistry(ingestors) + + @provide(scope=Scope.UOW) + def get_ingest_service( + self, + ingestors: IngestorRegistry, + outbox: Outbox, + config: Config, + ) -> IngestService: + """Provide IngestService for UOW scope. + + IngestService is UOW-scoped because it needs fresh Outbox per unit of work. + """ + return IngestService( + ingestors=ingestors, + outbox=outbox, + node_domain=Domain(config.server.domain), + ) diff --git a/osa/infrastructure/ingest/discovery.py b/osa/infrastructure/ingest/discovery.py index 2650b7e..202f64b 100644 --- a/osa/infrastructure/ingest/discovery.py +++ b/osa/infrastructure/ingest/discovery.py @@ -64,9 +64,7 @@ def _validate_ingestor_class(cls: Any, name: str) -> None: if not hasattr(cls, "config_class"): raise TypeError(f"Ingestor {name} missing 'config_class' class attribute") if not issubclass(cls.config_class, BaseModel): - raise TypeError( - f"Ingestor {name} config_class must be a Pydantic BaseModel" - ) + raise TypeError(f"Ingestor {name} config_class must be a Pydantic BaseModel") def validate_ingestor_config( @@ -154,15 +152,12 @@ def validate_all_ingestor_configs( # Check for duplicates if name in validated: raise ValueError( - f"Duplicate ingestor '{name}'. " - "Each ingestor type can only be configured once." + f"Duplicate ingestor '{name}'. Each ingestor type can only be configured once." ) if name not in available_ingestors: available = ", ".join(sorted(available_ingestors.keys())) or "(none)" - raise ValueError( - f"Unknown ingestor type '{name}'. Available: {available}" - ) + raise ValueError(f"Unknown ingestor type '{name}'. Available: {available}") ingestor_cls = available_ingestors[name] diff --git a/osa/infrastructure/oci/runner.py b/osa/infrastructure/oci/runner.py index abffa12..127fb09 100644 --- a/osa/infrastructure/oci/runner.py +++ b/osa/infrastructure/oci/runner.py @@ -96,9 +96,7 @@ async def run( try: return await asyncio.wait_for( - self._run_container( - self._docker, image_ref, osap_in, osap_out, resources - ), + self._run_container(self._docker, image_ref, osap_in, osap_out, resources), timeout=timeout, ) except asyncio.TimeoutError: diff --git a/osa/infrastructure/persistence/adapter/storage.py b/osa/infrastructure/persistence/adapter/storage.py index 1d5530e..db043e5 100644 --- a/osa/infrastructure/persistence/adapter/storage.py +++ b/osa/infrastructure/persistence/adapter/storage.py @@ -28,9 +28,9 @@ def save_file(self, deposition_id: DepositionSRN, filename: str, stream: Any) -> # Helper for UploadFile command target_dir = self._get_dep_path(deposition_id) target_dir.mkdir(parents=True, exist_ok=True) - + target_file = target_dir / filename - + # Assuming stream is file-like open in binary mode # If stream is from httpx/spooledtempfile with open(target_file, "wb") as f: diff --git a/osa/infrastructure/persistence/di.py b/osa/infrastructure/persistence/di.py index 54a2cb6..710c074 100644 --- a/osa/infrastructure/persistence/di.py +++ b/osa/infrastructure/persistence/di.py @@ -6,6 +6,9 @@ from osa.config import Config from osa.domain.deposition.port.repository import DepositionRepository from osa.domain.record.port.repository import RecordRepository +from osa.domain.record.service import RecordService +from osa.domain.shared.model.srn import Domain +from osa.domain.shared.outbox import Outbox from osa.domain.shared.port.event_repository import EventRepository from osa.domain.validation.port.repository import ValidationRunRepository from osa.infrastructure.persistence.database import ( @@ -35,9 +38,7 @@ def get_engine(self, config: Config) -> AsyncEngine: return create_db_engine(config) @provide(scope=Scope.APP) - def get_session_factory( - self, engine: AsyncEngine - ) -> async_sessionmaker[AsyncSession]: + def get_session_factory(self, engine: AsyncEngine) -> async_sessionmaker[AsyncSession]: return create_session_factory(engine) # UOW-scoped session (one per unit of work) @@ -49,15 +50,28 @@ async def get_session( yield session # UOW-scoped repositories - dep_repo = provide( - PostgresDepositionRepository, scope=Scope.UOW, provides=DepositionRepository - ) - record_repo = provide( - PostgresRecordRepository, scope=Scope.UOW, provides=RecordRepository - ) + dep_repo = provide(PostgresDepositionRepository, scope=Scope.UOW, provides=DepositionRepository) + record_repo = provide(PostgresRecordRepository, scope=Scope.UOW, provides=RecordRepository) validation_run_repo = provide( - PostgresValidationRunRepository, scope=Scope.UOW, provides=ValidationRunRepository - ) - event_repo = provide( - SQLAlchemyEventRepository, scope=Scope.UOW, provides=EventRepository + PostgresValidationRunRepository, + scope=Scope.UOW, + provides=ValidationRunRepository, ) + event_repo = provide(SQLAlchemyEventRepository, scope=Scope.UOW, provides=EventRepository) + + @provide(scope=Scope.UOW) + def get_record_service( + self, + record_repo: RecordRepository, + outbox: Outbox, + config: Config, + ) -> RecordService: + """Provide RecordService for UOW scope. + + RecordService is UOW-scoped because it needs fresh Outbox per unit of work. + """ + return RecordService( + record_repo=record_repo, + outbox=outbox, + node_domain=Domain(config.server.domain), + ) diff --git a/osa/infrastructure/persistence/mappers/deposition.py b/osa/infrastructure/persistence/mappers/deposition.py index 89bf19d..5d051de 100644 --- a/osa/infrastructure/persistence/mappers/deposition.py +++ b/osa/infrastructure/persistence/mappers/deposition.py @@ -31,9 +31,7 @@ def deposition_to_dict(dep: Deposition) -> dict[str, Any]: return { "srn": str(dep.srn), "status": dep.status, - "metadata": dep.metadata - if isinstance(dep.metadata, dict) - else dep.metadata.model_dump(), + "metadata": dep.metadata if isinstance(dep.metadata, dict) else dep.metadata.model_dump(), "files": [f.model_dump(mode="json") for f in dep.files], "provenance": dep.provenance, "record_id": str(dep.record_srn) if dep.record_srn else None, diff --git a/osa/infrastructure/persistence/mappers/record.py b/osa/infrastructure/persistence/mappers/record.py index 190efdb..237ec54 100644 --- a/osa/infrastructure/persistence/mappers/record.py +++ b/osa/infrastructure/persistence/mappers/record.py @@ -31,9 +31,7 @@ def row_to_record(row: dict[str, Any]) -> Record: def record_to_dict(record: Record) -> dict[str, Any]: """Convert Record aggregate to database dict.""" - indexes_dict = { - key: ref.model_dump(mode="json") for key, ref in record.indexes.items() - } + indexes_dict = {key: ref.model_dump(mode="json") for key, ref in record.indexes.items()} return { "srn": str(record.srn), diff --git a/osa/infrastructure/persistence/repository/deposition.py b/osa/infrastructure/persistence/repository/deposition.py index ccc54d7..40c5ef5 100644 --- a/osa/infrastructure/persistence/repository/deposition.py +++ b/osa/infrastructure/persistence/repository/deposition.py @@ -19,9 +19,9 @@ def __init__(self, session: AsyncSession) -> None: async def save(self, deposition: Deposition) -> None: dep_dict = deposition_to_dict(deposition) - + existing = await self.get(deposition.srn) - + if existing: stmt = ( update(depositions_table) @@ -30,7 +30,7 @@ async def save(self, deposition: Deposition) -> None: ) else: stmt = insert(depositions_table).values(**dep_dict) - + await self.session.execute(stmt) await self.session.flush() diff --git a/osa/infrastructure/persistence/repository/event.py b/osa/infrastructure/persistence/repository/event.py index 2db410e..219cb69 100644 --- a/osa/infrastructure/persistence/repository/event.py +++ b/osa/infrastructure/persistence/repository/event.py @@ -63,11 +63,7 @@ async def update_status( if error is not None: values["delivery_error"] = error - stmt = ( - update(events_table) - .where(events_table.c.id == str(event_id)) - .values(**values) - ) + stmt = update(events_table).where(events_table.c.id == str(event_id)).values(**values) await self._session.execute(stmt) async def find_pending(self, limit: int = 100) -> list[Event]: @@ -129,9 +125,7 @@ async def list_events( # Cursor: get events after the given ID if after is not None: - cursor_stmt = select(events_table.c.created_at).where( - events_table.c.id == str(after) - ) + cursor_stmt = select(events_table.c.created_at).where(events_table.c.id == str(after)) cursor_result = await self._session.execute(cursor_stmt) cursor_row = cursor_result.first() if cursor_row: diff --git a/osa/infrastructure/persistence/repository/record.py b/osa/infrastructure/persistence/repository/record.py index 7bdd9be..ddb6dc7 100644 --- a/osa/infrastructure/persistence/repository/record.py +++ b/osa/infrastructure/persistence/repository/record.py @@ -32,9 +32,7 @@ async def get(self, srn: RecordSRN) -> Record | None: async def find_by_deposition(self, deposition_srn: DepositionSRN) -> Record | None: """Find the record created from a deposition.""" - stmt = select(records_table).where( - records_table.c.deposition_srn == str(deposition_srn) - ) + stmt = select(records_table).where(records_table.c.deposition_srn == str(deposition_srn)) result = await self.session.execute(stmt) row = result.mappings().first() return row_to_record(dict(row)) if row else None diff --git a/osa/infrastructure/persistence/repository/validation.py b/osa/infrastructure/persistence/repository/validation.py index ae8f1b4..a15cb50 100644 --- a/osa/infrastructure/persistence/repository/validation.py +++ b/osa/infrastructure/persistence/repository/validation.py @@ -18,9 +18,7 @@ def __init__(self, session: AsyncSession) -> None: self._session = session async def get(self, srn: ValidationRunSRN) -> ValidationRun | None: - stmt = select(validation_runs_table).where( - validation_runs_table.c.srn == str(srn) - ) + stmt = select(validation_runs_table).where(validation_runs_table.c.srn == str(srn)) result = await self._session.execute(stmt) row = result.mappings().first() return row_to_validation_run(dict(row)) if row else None diff --git a/osa/infrastructure/persistence/tables.py b/osa/infrastructure/persistence/tables.py index 7bb4243..c0d2e40 100644 --- a/osa/infrastructure/persistence/tables.py +++ b/osa/infrastructure/persistence/tables.py @@ -82,5 +82,9 @@ Column("delivery_error", Text, nullable=True), ) -Index("idx_events_type_created", events_table.c.event_type, events_table.c.created_at.desc()) +Index( + "idx_events_type_created", + events_table.c.event_type, + events_table.c.created_at.desc(), +) Index("idx_events_delivery_status", events_table.c.delivery_status) diff --git a/pyproject.toml b/pyproject.toml index 091733b..9d9251e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,12 +42,15 @@ build-backend = "hatchling.build" dev = [ "coverage>=7.12.0", "pre-commit>=4.5.0", - "pyright>=1.1.407", "pytest>=9.0.1", "pytest-asyncio>=1.3.0", + "pytest-cov>=7.0.0", "ruff>=0.14.6", "ty>=0.0.5", ] [tool.pytest.ini_options] asyncio_mode = "auto" + +[tool.ruff] +line-length = 100 diff --git a/tests/test_ingestors/geo_entrez/test_ingestor.py b/tests/test_ingestors/geo_entrez/test_ingestor.py index 29cfb5b..816ff18 100644 --- a/tests/test_ingestors/geo_entrez/test_ingestor.py +++ b/tests/test_ingestors/geo_entrez/test_ingestor.py @@ -34,9 +34,7 @@ async def test_health_returns_true(self, geo_ingestor: GEOEntrezIngestor) -> Non result = await geo_ingestor.health() assert result is True - async def test_get_one_returns_upstream_record( - self, geo_ingestor: GEOEntrezIngestor - ) -> None: + async def test_get_one_returns_upstream_record(self, geo_ingestor: GEOEntrezIngestor) -> None: """Fetching a known GSE should return a valid UpstreamRecord.""" # GSE1 is one of the earliest GEO series, stable for testing record = await geo_ingestor.get_one("GSE1") @@ -54,16 +52,12 @@ async def test_get_one_returns_upstream_record( assert "title" in record.metadata assert record.metadata["title"] is not None - async def test_get_one_nonexistent_returns_none( - self, geo_ingestor: GEOEntrezIngestor - ) -> None: + async def test_get_one_nonexistent_returns_none(self, geo_ingestor: GEOEntrezIngestor) -> None: """Fetching a nonexistent GSE should return None.""" record = await geo_ingestor.get_one("GSE999999999999") assert record is None - async def test_pull_yields_upstream_records( - self, geo_ingestor: GEOEntrezIngestor - ) -> None: + async def test_pull_yields_upstream_records(self, geo_ingestor: GEOEntrezIngestor) -> None: """Pulling records should yield valid UpstreamRecords.""" records: list[UpstreamRecord] = [] diff --git a/tests/unit/domain/deposition/test_aggregate.py b/tests/unit/domain/deposition/test_aggregate.py deleted file mode 100644 index 16c7b31..0000000 --- a/tests/unit/domain/deposition/test_aggregate.py +++ /dev/null @@ -1,47 +0,0 @@ -from datetime import datetime -from osa.domain.deposition.model.value import DepositionStatus, DepositionFile -from osa.domain.deposition.model.aggregate import Deposition -from osa.domain.shared.model.srn import DepositionSRN, DepositionProfileSRN - -class TestDepositionAggregate: - def test_create_deposition(self): - srn = DepositionSRN.parse("urn:osa:mock-node:dep:mock-id") - profile_srn = DepositionProfileSRN.parse("urn:osa:osa-registry:profile:default@1.0.0") - - dep = Deposition( - srn=srn, - profile_srn=profile_srn, - status=DepositionStatus.DRAFT, - payload={"title": "Test Deposition"} - ) - - assert dep.srn == srn - assert dep.profile_srn == profile_srn - assert dep.status == DepositionStatus.DRAFT - assert dep.payload == {"title": "Test Deposition"} - assert dep.files == [] - - def test_add_and_remove_files(self): - srn = DepositionSRN.parse("urn:osa:mock-node:dep:mock-id") - profile_srn = DepositionProfileSRN.parse("urn:osa:osa-registry:profile:default@1.0.0") - - dep = Deposition( - srn=srn, - profile_srn=profile_srn, - status=DepositionStatus.DRAFT, - payload={} - ) - - file = DepositionFile( - name="data.csv", - size=1024, - checksum="sha256:abc", - uploaded_at=datetime.now() - ) - - dep.files.append(file) - assert len(dep.files) == 1 - assert dep.files[0].name == "data.csv" - - dep.remove_all_files() - assert len(dep.files) == 0 diff --git a/tests/unit/domain/index/__init__.py b/tests/unit/domain/index/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/domain/index/test_index_service.py b/tests/unit/domain/index/test_index_service.py new file mode 100644 index 0000000..cb67ab0 --- /dev/null +++ b/tests/unit/domain/index/test_index_service.py @@ -0,0 +1,127 @@ +"""Unit tests for IndexService.""" + +from typing import Any +from unittest.mock import AsyncMock +from uuid import uuid4 + +import pytest + +from osa.domain.index.model.registry import IndexRegistry +from osa.domain.index.service.index import IndexService +from osa.domain.shared.model.srn import Domain, LocalId, RecordSRN, RecordVersion + + +class FakeBackend: + """Fake storage backend for testing.""" + + def __init__(self): + self.ingested: list[tuple[str, dict]] = [] + self.ingest = AsyncMock(side_effect=self._ingest) + + async def _ingest(self, record_id: str, metadata: dict[str, Any]) -> None: + self.ingested.append((record_id, metadata)) + + +class FailingBackend: + """Backend that always fails for testing error handling.""" + + def __init__(self): + self.ingest = AsyncMock(side_effect=Exception("Backend failure")) + + +@pytest.fixture +def sample_record_srn() -> RecordSRN: + """Create a sample record SRN.""" + return RecordSRN( + domain=Domain("test.example.com"), + id=LocalId(str(uuid4())), + version=RecordVersion(1), + ) + + +@pytest.fixture +def sample_metadata() -> dict: + """Create sample metadata for testing.""" + return { + "title": "Test Record", + "organism": "human", + "platform": "GPL570", + } + + +class TestIndexService: + """Tests for IndexService.""" + + @pytest.mark.asyncio + async def test_index_record_indexes_to_all_backends( + self, + sample_record_srn: RecordSRN, + sample_metadata: dict, + ): + """Service should index record to all configured backends.""" + # Arrange + backend1 = FakeBackend() + backend2 = FakeBackend() + registry = IndexRegistry({"backend1": backend1, "backend2": backend2}) + + service = IndexService(indexes=registry) + + # Act + await service.index_record( + record_srn=sample_record_srn, + metadata=sample_metadata, + ) + + # Assert + assert len(backend1.ingested) == 1 + assert len(backend2.ingested) == 1 + assert backend1.ingested[0][0] == str(sample_record_srn) + assert backend1.ingested[0][1] == sample_metadata + + @pytest.mark.asyncio + async def test_index_record_handles_backend_failure( + self, + sample_record_srn: RecordSRN, + sample_metadata: dict, + ): + """Service should continue indexing to other backends if one fails.""" + # Arrange + failing_backend = FailingBackend() + working_backend = FakeBackend() + registry = IndexRegistry( + { + "failing": failing_backend, + "working": working_backend, + } + ) + + service = IndexService(indexes=registry) + + # Act - should not raise, just log error + await service.index_record( + record_srn=sample_record_srn, + metadata=sample_metadata, + ) + + # Assert - working backend should still have received the record + assert len(working_backend.ingested) == 1 + assert working_backend.ingested[0][0] == str(sample_record_srn) + + @pytest.mark.asyncio + async def test_index_record_empty_registry( + self, + sample_record_srn: RecordSRN, + sample_metadata: dict, + ): + """Service should handle empty registry gracefully.""" + # Arrange + registry = IndexRegistry({}) + service = IndexService(indexes=registry) + + # Act - should not raise + await service.index_record( + record_srn=sample_record_srn, + metadata=sample_metadata, + ) + + # Assert - no exception means success diff --git a/tests/unit/domain/ingest/__init__.py b/tests/unit/domain/ingest/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/domain/ingest/test_ingest_service.py b/tests/unit/domain/ingest/test_ingest_service.py new file mode 100644 index 0000000..ea6a2c9 --- /dev/null +++ b/tests/unit/domain/ingest/test_ingest_service.py @@ -0,0 +1,186 @@ +"""Unit tests for IngestService.""" + +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from osa.config import Config +from osa.domain.ingest.model.registry import IngestorRegistry +from osa.domain.ingest.service.ingest import IngestService +from osa.domain.shared.model.srn import Domain +from osa.domain.shared.outbox import Outbox +from osa.sdk.ingest.record import UpstreamRecord + + +class FakeIngestor: + """Fake ingestor for testing.""" + + name = "fake-ingestor" + + def __init__(self, records: list[UpstreamRecord]): + self._records = records + + async def pull(self, since: datetime | None = None, limit: int | None = None): + for record in self._records[: limit if limit else len(self._records)]: + yield record + + +@pytest.fixture +def mock_outbox() -> Outbox: + """Create a mock Outbox.""" + outbox = MagicMock(spec=Outbox) + outbox.append = AsyncMock() + return outbox + + +@pytest.fixture +def mock_config() -> Config: + """Create a mock Config with server domain.""" + config = MagicMock(spec=Config) + config.server = MagicMock() + config.server.domain = "test.example.com" + return config + + +@pytest.fixture +def sample_records() -> list[UpstreamRecord]: + """Create sample upstream records for testing.""" + now = datetime.now(timezone.utc) + return [ + UpstreamRecord( + source_id="GSE001", + source_type="geo", + metadata={"title": "Test Record 1", "organism": "human"}, + fetched_at=now, + ), + UpstreamRecord( + source_id="GSE002", + source_type="geo", + metadata={"title": "Test Record 2", "organism": "mouse"}, + fetched_at=now, + ), + ] + + +class TestIngestService: + """Tests for IngestService.""" + + @pytest.mark.asyncio + async def test_run_ingest_emits_deposition_events( + self, + mock_outbox: Outbox, + mock_config: Config, + sample_records: list[UpstreamRecord], + ): + """Service should emit DepositionSubmittedEvent for each ingested record.""" + # Arrange + fake_ingestor = FakeIngestor(sample_records) + registry = IngestorRegistry({"fake": fake_ingestor}) + + service = IngestService( + ingestors=registry, + outbox=mock_outbox, + node_domain=Domain(mock_config.server.domain), + ) + + # Act + result = await service.run_ingest( + ingestor_name="fake", + since=None, + limit=None, + ) + + # Assert + assert result.record_count == 2 + assert result.ingestor_name == "fake" + # Two DepositionSubmittedEvent + one IngestionRunCompleted + assert mock_outbox.append.call_count == 3 + + @pytest.mark.asyncio + async def test_run_ingest_with_limit( + self, + mock_outbox: Outbox, + mock_config: Config, + sample_records: list[UpstreamRecord], + ): + """Service should respect limit parameter.""" + # Arrange + fake_ingestor = FakeIngestor(sample_records) + registry = IngestorRegistry({"fake": fake_ingestor}) + + service = IngestService( + ingestors=registry, + outbox=mock_outbox, + node_domain=Domain(mock_config.server.domain), + ) + + # Act + result = await service.run_ingest( + ingestor_name="fake", + since=None, + limit=1, + ) + + # Assert + assert result.record_count == 1 + + @pytest.mark.asyncio + async def test_run_ingest_unknown_ingestor_raises( + self, + mock_outbox: Outbox, + mock_config: Config, + ): + """Service should raise error for unknown ingestor.""" + # Arrange + registry = IngestorRegistry({}) + + service = IngestService( + ingestors=registry, + outbox=mock_outbox, + node_domain=Domain(mock_config.server.domain), + ) + + # Act & Assert + with pytest.raises(ValueError, match="Unknown ingestor"): + await service.run_ingest( + ingestor_name="nonexistent", + since=None, + limit=None, + ) + + @pytest.mark.asyncio + async def test_run_ingest_emits_completion_event( + self, + mock_outbox: Outbox, + mock_config: Config, + sample_records: list[UpstreamRecord], + ): + """Service should emit IngestionRunCompleted event after ingestion.""" + # Arrange + fake_ingestor = FakeIngestor(sample_records) + registry = IngestorRegistry({"fake": fake_ingestor}) + + service = IngestService( + ingestors=registry, + outbox=mock_outbox, + node_domain=Domain(mock_config.server.domain), + ) + + # Act + await service.run_ingest( + ingestor_name="fake", + since=None, + limit=None, + ) + + # Assert - last call should be the completion event + from osa.domain.ingest.event.ingestion_run_completed import ( + IngestionRunCompleted, + ) + + last_call = mock_outbox.append.call_args_list[-1] + event = last_call[0][0] + assert isinstance(event, IngestionRunCompleted) + assert event.record_count == 2 + assert event.ingestor_name == "fake" diff --git a/tests/unit/domain/record/__init__.py b/tests/unit/domain/record/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/domain/record/test_record_service.py b/tests/unit/domain/record/test_record_service.py new file mode 100644 index 0000000..38c9924 --- /dev/null +++ b/tests/unit/domain/record/test_record_service.py @@ -0,0 +1,143 @@ +"""Unit tests for RecordService.""" + +from unittest.mock import AsyncMock, MagicMock +from uuid import uuid4 + +import pytest + +from osa.domain.record.event.record_published import RecordPublished +from osa.domain.record.port.repository import RecordRepository +from osa.domain.record.service.record import RecordService +from osa.domain.shared.model.srn import DepositionSRN, Domain, LocalId +from osa.domain.shared.outbox import Outbox + + +@pytest.fixture +def mock_record_repo() -> RecordRepository: + """Create a mock RecordRepository.""" + repo = MagicMock(spec=RecordRepository) + repo.save = AsyncMock() + return repo + + +@pytest.fixture +def mock_outbox() -> Outbox: + """Create a mock Outbox.""" + outbox = MagicMock(spec=Outbox) + outbox.append = AsyncMock() + return outbox + + +@pytest.fixture +def node_domain() -> Domain: + """Create test node domain.""" + return Domain("test.example.com") + + +@pytest.fixture +def sample_deposition_srn(node_domain: Domain) -> DepositionSRN: + """Create a sample deposition SRN.""" + return DepositionSRN( + domain=node_domain, + id=LocalId(str(uuid4())), + ) + + +@pytest.fixture +def sample_metadata() -> dict: + """Create sample metadata for testing.""" + return { + "title": "Test Record", + "organism": "human", + "platform": "GPL570", + } + + +class TestRecordService: + """Tests for RecordService.""" + + @pytest.mark.asyncio + async def test_publish_record_creates_record( + self, + mock_record_repo: RecordRepository, + mock_outbox: Outbox, + node_domain: Domain, + sample_deposition_srn: DepositionSRN, + sample_metadata: dict, + ): + """Service should create and persist a Record.""" + # Arrange + service = RecordService( + record_repo=mock_record_repo, + outbox=mock_outbox, + node_domain=node_domain, + ) + + # Act + record = await service.publish_record( + deposition_srn=sample_deposition_srn, + metadata=sample_metadata, + ) + + # Assert + assert record is not None + assert record.deposition_srn == sample_deposition_srn + assert record.metadata == sample_metadata + mock_record_repo.save.assert_called_once() + + @pytest.mark.asyncio + async def test_publish_record_emits_record_published_event( + self, + mock_record_repo: RecordRepository, + mock_outbox: Outbox, + node_domain: Domain, + sample_deposition_srn: DepositionSRN, + sample_metadata: dict, + ): + """Service should emit RecordPublished event.""" + # Arrange + service = RecordService( + record_repo=mock_record_repo, + outbox=mock_outbox, + node_domain=node_domain, + ) + + # Act + record = await service.publish_record( + deposition_srn=sample_deposition_srn, + metadata=sample_metadata, + ) + + # Assert + mock_outbox.append.assert_called_once() + event = mock_outbox.append.call_args[0][0] + assert isinstance(event, RecordPublished) + assert event.record_srn == record.srn + assert event.deposition_srn == sample_deposition_srn + assert event.metadata == sample_metadata + + @pytest.mark.asyncio + async def test_publish_record_creates_version_1( + self, + mock_record_repo: RecordRepository, + mock_outbox: Outbox, + node_domain: Domain, + sample_deposition_srn: DepositionSRN, + sample_metadata: dict, + ): + """New records should be version 1.""" + # Arrange + service = RecordService( + record_repo=mock_record_repo, + outbox=mock_outbox, + node_domain=node_domain, + ) + + # Act + record = await service.publish_record( + deposition_srn=sample_deposition_srn, + metadata=sample_metadata, + ) + + # Assert + assert record.srn.version.root == 1 diff --git a/tests/unit/domain/shadow/test_models.py b/tests/unit/domain/shadow/test_models.py deleted file mode 100644 index 4e0ecb2..0000000 --- a/tests/unit/domain/shadow/test_models.py +++ /dev/null @@ -1,32 +0,0 @@ -from datetime import datetime -from osa.domain.shadow.model.aggregate import ShadowRequest, ShadowId -from osa.domain.shadow.model.report import ShadowReport -from osa.domain.shadow.model.value import ShadowStatus -from osa.domain.shared.model.srn import DepositionProfileSRN - -class TestShadowModels: - def test_shadow_request_creation(self): - req = ShadowRequest( - id=ShadowId("shadow-123"), - status=ShadowStatus.PENDING, - source_url="http://example.com/data.zip", - profile_srn=DepositionProfileSRN.parse("urn:osa:osa-registry:profile:default@1.0.0") - ) - - assert req.id == "shadow-123" - assert req.status == ShadowStatus.PENDING - assert req.source_url == "http://example.com/data.zip" - assert req.deposition_id is None - - def test_shadow_report_creation(self): - report = ShadowReport( - shadow_id=ShadowId("shadow-123"), - source_domain="example.com", - validation_summary={"status": "pass"}, - score="5/5", - created_at=datetime.now() - ) - - assert report.shadow_id == "shadow-123" - assert report.source_domain == "example.com" - assert report.score == "5/5" diff --git a/tests/unit/domain/shared/test_srn.py b/tests/unit/domain/shared/test_srn.py index 49b90ce..669c5ed 100644 --- a/tests/unit/domain/shared/test_srn.py +++ b/tests/unit/domain/shared/test_srn.py @@ -1,22 +1,27 @@ import pytest -from osa.domain.shared.model.srn import SRN, RecordSRN, DepositionSRN, SchemaSRN, ResourceType +from osa.domain.shared.model.srn import ( + SRN, + RecordSRN, + DepositionSRN, + SchemaSRN, + ResourceType, +) + class TestSRN: def test_parse_record_srn(self): raw = "urn:osa:node-1:rec:123@1" - srn = SRN.parse(raw) - assert isinstance(srn, RecordSRN) + srn = RecordSRN.parse(raw) assert srn.type == ResourceType.rec - assert srn.local.root == "123" + assert srn.id.root == "123" assert srn.version is not None assert srn.version.root == 1 def test_parse_schema_srn(self): raw = "urn:osa:node-1:schema:my-schema@1.0.0" - srn = SRN.parse(raw) - assert isinstance(srn, SchemaSRN) + srn = SchemaSRN.parse(raw) assert srn.type == ResourceType.schema - assert srn.local.root == "my-schema" + assert srn.id.root == "my-schema" assert str(srn.version) == "1.0.0" def test_render_srn(self): diff --git a/tests/unit/domain/validation/test_command.py b/tests/unit/domain/validation/test_command.py deleted file mode 100644 index f1a5914..0000000 --- a/tests/unit/domain/validation/test_command.py +++ /dev/null @@ -1,125 +0,0 @@ -from datetime import datetime, timezone -from unittest.mock import AsyncMock - -import pytest - -from osa.domain.shared.model.srn import Domain, LocalId, Semver, TraitSRN -from osa.domain.validation.command import RegisterTrait, RegisterTraitHandler -from osa.domain.validation.model import ( - TraitStatus, - Validator, - ValidatorLimits, - ValidatorRef, -) - - -class TestRegisterTraitHandler: - @pytest.fixture - def mock_trait_repo(self): - return AsyncMock() - - @pytest.fixture - def handler(self, mock_trait_repo): - return RegisterTraitHandler(trait_repo=mock_trait_repo) - - @pytest.fixture - def trait_srn(self): - return TraitSRN( - domain=Domain("localhost"), - id=LocalId("test-trait"), - version=Semver("1.0.0"), - ) - - @pytest.fixture - def validator(self): - return Validator( - ref=ValidatorRef( - image="ghcr.io/test/validator", - digest="sha256:abc123", - ), - limits=ValidatorLimits( - timeout_seconds=60, - memory="256Mi", - cpu="0.5", - ), - ) - - async def test_register_trait_creates_trait( - self, handler, mock_trait_repo, trait_srn, validator - ): - """Test that RegisterTrait creates a new trait.""" - cmd = RegisterTrait( - srn=trait_srn, - slug="test-trait", - name="Test Trait", - description="A trait for testing", - validator=validator, - ) - - result = await handler.run(cmd) - - assert result.srn == trait_srn - mock_trait_repo.save.assert_called_once() - - # Check the saved trait - saved_trait = mock_trait_repo.save.call_args[0][0] - assert saved_trait.srn == trait_srn - assert saved_trait.slug == "test-trait" - assert saved_trait.name == "Test Trait" - assert saved_trait.description == "A trait for testing" - assert saved_trait.validator == validator - assert saved_trait.status == TraitStatus.DRAFT - - async def test_register_trait_sets_created_at( - self, handler, mock_trait_repo, trait_srn, validator - ): - """Test that RegisterTrait sets created_at timestamp.""" - before = datetime.now(timezone.utc) - - cmd = RegisterTrait( - srn=trait_srn, - slug="test-trait", - name="Test Trait", - description="A trait for testing", - validator=validator, - ) - - await handler.run(cmd) - - after = datetime.now(timezone.utc) - saved_trait = mock_trait_repo.save.call_args[0][0] - - assert before <= saved_trait.created_at <= after - - async def test_register_trait_preserves_validator_config( - self, handler, mock_trait_repo, trait_srn - ): - """Test that validator configuration is preserved.""" - custom_validator = Validator( - ref=ValidatorRef( - image="custom/image:latest", - digest="sha256:customdigest", - ), - limits=ValidatorLimits( - timeout_seconds=300, - memory="1Gi", - cpu="2.0", - ), - ) - - cmd = RegisterTrait( - srn=trait_srn, - slug="custom-trait", - name="Custom Trait", - description="Trait with custom validator config", - validator=custom_validator, - ) - - await handler.run(cmd) - - saved_trait = mock_trait_repo.save.call_args[0][0] - assert saved_trait.validator.ref.image == "custom/image:latest" - assert saved_trait.validator.ref.digest == "sha256:customdigest" - assert saved_trait.validator.limits.timeout_seconds == 300 - assert saved_trait.validator.limits.memory == "1Gi" - assert saved_trait.validator.limits.cpu == "2.0" diff --git a/tests/unit/domain/validation/test_models.py b/tests/unit/domain/validation/test_models.py deleted file mode 100644 index 22159c1..0000000 --- a/tests/unit/domain/validation/test_models.py +++ /dev/null @@ -1,197 +0,0 @@ -from datetime import datetime, timezone - -from osa.domain.shared.model.srn import Domain, LocalId, Semver, TraitSRN, ValidationRunSRN -from osa.domain.validation.model import ( - CheckResult, - CheckStatus, - RunStatus, - Trait, - TraitStatus, - ValidationRun, - Validator, - ValidatorLimits, - ValidatorRef, -) - - -class TestValidatorRef: - def test_create_validator_ref(self): - ref = ValidatorRef( - image="ghcr.io/osap/validators/si-units", - digest="sha256:abc123", - ) - assert ref.image == "ghcr.io/osap/validators/si-units" - assert ref.digest == "sha256:abc123" - - -class TestValidatorLimits: - def test_default_limits(self): - limits = ValidatorLimits() - assert limits.timeout_seconds == 60 - assert limits.memory == "256Mi" - assert limits.cpu == "0.5" - - def test_custom_limits(self): - limits = ValidatorLimits( - timeout_seconds=120, - memory="512Mi", - cpu="1.0", - ) - assert limits.timeout_seconds == 120 - assert limits.memory == "512Mi" - assert limits.cpu == "1.0" - - -class TestValidator: - def test_create_validator_with_defaults(self): - ref = ValidatorRef(image="test/image", digest="sha256:123") - validator = Validator(ref=ref) - - assert validator.ref == ref - assert validator.limits.timeout_seconds == 60 - assert validator.limits.memory == "256Mi" - - def test_create_validator_with_custom_limits(self): - ref = ValidatorRef(image="test/image", digest="sha256:123") - limits = ValidatorLimits(timeout_seconds=300, memory="1Gi", cpu="2.0") - validator = Validator(ref=ref, limits=limits) - - assert validator.ref == ref - assert validator.limits.timeout_seconds == 300 - assert validator.limits.memory == "1Gi" - - -class TestTrait: - def test_create_trait(self): - trait_srn = TraitSRN( - domain=Domain("osap.org"), - id=LocalId("si-units"), - version=Semver("1.0.0"), - ) - validator = Validator( - ref=ValidatorRef(image="ghcr.io/osap/si-units", digest="sha256:abc"), - ) - now = datetime.now(timezone.utc) - - trait = Trait( - srn=trait_srn, - slug="si-units", - name="SI Units Compliance", - description="Validates that all measurements use SI units", - validator=validator, - status=TraitStatus.DRAFT, - created_at=now, - ) - - assert trait.srn == trait_srn - assert trait.slug == "si-units" - assert trait.name == "SI Units Compliance" - assert trait.status == TraitStatus.DRAFT - assert trait.validator.ref.image == "ghcr.io/osap/si-units" - - def test_trait_status_values(self): - assert TraitStatus.DRAFT == "draft" - assert TraitStatus.ACTIVE == "active" - assert TraitStatus.DEPRECATED == "deprecated" - - -class TestCheckResult: - def test_create_check_result_passed(self): - result = CheckResult( - trait_srn="urn:osa:osap.org:trait:si-units@1.0.0", - validator_digest="sha256:abc123", - status=CheckStatus.PASSED, - ) - assert result.status == CheckStatus.PASSED - assert result.message is None - assert result.details is None - - def test_create_check_result_failed_with_message(self): - result = CheckResult( - trait_srn="urn:osa:osap.org:trait:si-units@1.0.0", - validator_digest="sha256:abc123", - status=CheckStatus.FAILED, - message="Temperature column uses Fahrenheit instead of Kelvin", - details={"column": "temperature", "found": "F", "expected": "K"}, - ) - assert result.status == CheckStatus.FAILED - assert "Fahrenheit" in result.message - assert result.details["column"] == "temperature" - - def test_check_status_values(self): - assert CheckStatus.PASSED == "passed" - assert CheckStatus.WARNINGS == "warnings" - assert CheckStatus.FAILED == "failed" - assert CheckStatus.ERROR == "error" - - -class TestValidationRun: - def test_create_validation_run(self): - run_srn = ValidationRunSRN( - domain=Domain("localhost"), - id=LocalId("run-123"), - version=None, - ) - trait_srn = TraitSRN( - domain=Domain("osap.org"), - id=LocalId("si-units"), - version=Semver("1.0.0"), - ) - - run = ValidationRun( - srn=run_srn, - trait_srns=[trait_srn], - status=RunStatus.PENDING, - ) - - assert run.srn == run_srn - assert run.trait_srns == [trait_srn] - assert run.status == RunStatus.PENDING - assert run.results == [] - assert run.started_at is None - assert run.completed_at is None - - def test_validation_run_with_results(self): - run_srn = ValidationRunSRN( - domain=Domain("localhost"), - id=LocalId("run-123"), - version=None, - ) - trait_srns = [ - TraitSRN(domain=Domain("osap.org"), id=LocalId("si-units"), version=Semver("1.0.0")), - TraitSRN(domain=Domain("osap.org"), id=LocalId("iso-dates"), version=Semver("1.0.0")), - ] - now = datetime.now(timezone.utc) - - results = [ - CheckResult( - trait_srn="urn:osa:osap.org:trait:si-units@1.0.0", - validator_digest="sha256:abc", - status=CheckStatus.PASSED, - ), - CheckResult( - trait_srn="urn:osa:osap.org:trait:iso-dates@1.0.0", - validator_digest="sha256:def", - status=CheckStatus.FAILED, - message="Invalid date format", - ), - ] - - run = ValidationRun( - srn=run_srn, - trait_srns=trait_srns, - status=RunStatus.COMPLETED, - results=results, - started_at=now, - completed_at=now, - ) - - assert len(run.results) == 2 - assert run.results[0].status == CheckStatus.PASSED - assert run.results[1].status == CheckStatus.FAILED - - def test_run_status_values(self): - assert RunStatus.PENDING == "pending" - assert RunStatus.RUNNING == "running" - assert RunStatus.COMPLETED == "completed" - assert RunStatus.FAILED == "failed" diff --git a/tests/unit/domain/validation/test_service.py b/tests/unit/domain/validation/test_service.py deleted file mode 100644 index eb8d96c..0000000 --- a/tests/unit/domain/validation/test_service.py +++ /dev/null @@ -1,191 +0,0 @@ -from datetime import datetime, timezone -from unittest.mock import AsyncMock - -import pytest - -from osa.domain.shared.model.srn import ( - Domain, - LocalId, - Semver, - TraitSRN, -) -from osa.domain.validation.model import ( - CheckStatus, - RunStatus, - Trait, - TraitStatus, - Validator, - ValidatorLimits, - ValidatorRef, -) -from osa.domain.validation.port.runner import ValidationInputs, ValidatorOutput -from osa.domain.validation.service import ValidationService - - -def make_trait(slug: str, image: str = "test/validator") -> Trait: - """Helper to create a test trait.""" - return Trait( - srn=TraitSRN( - domain=Domain("osap.org"), - id=LocalId(slug), - version=Semver("1.0.0"), - ), - slug=slug, - name=f"Test Trait: {slug}", - description=f"Test trait for {slug}", - validator=Validator( - ref=ValidatorRef(image=image, digest="sha256:abc123"), - limits=ValidatorLimits(timeout_seconds=30), - ), - status=TraitStatus.ACTIVE, - created_at=datetime.now(timezone.utc), - ) - - -class TestValidationService: - @pytest.fixture - def mock_trait_repo(self): - repo = AsyncMock() - return repo - - @pytest.fixture - def mock_run_repo(self): - repo = AsyncMock() - return repo - - @pytest.fixture - def mock_runner(self): - runner = AsyncMock() - return runner - - @pytest.fixture - def service(self, mock_trait_repo, mock_run_repo, mock_runner): - return ValidationService( - trait_repo=mock_trait_repo, - run_repo=mock_run_repo, - runner=mock_runner, - node_domain=Domain("localhost"), - ) - - @pytest.fixture - def validation_inputs(self): - return ValidationInputs( - record_json={"title": "Test Dataset", "temperature": 293.15}, - ) - - async def test_validate_all_pass( - self, service, mock_trait_repo, mock_runner, validation_inputs - ): - """Test validation where all traits pass.""" - trait = make_trait("si-units") - mock_trait_repo.get_or_fetch.return_value = trait - mock_runner.run.return_value = ValidatorOutput( - status=CheckStatus.PASSED, - checks=[{"id": "check-1", "status": "passed"}], - ) - - trait_srns = [trait.srn] - run = await service.validate(trait_srns, validation_inputs) - - assert run.status == RunStatus.COMPLETED - assert len(run.results) == 1 - assert run.results[0].status == CheckStatus.PASSED - assert run.started_at is not None - assert run.completed_at is not None - - async def test_validate_with_failure( - self, service, mock_trait_repo, mock_runner, validation_inputs - ): - """Test validation where a trait fails.""" - trait = make_trait("si-units") - mock_trait_repo.get_or_fetch.return_value = trait - mock_runner.run.return_value = ValidatorOutput( - status=CheckStatus.FAILED, - checks=[{"id": "check-1", "status": "failed", "message": "Invalid units"}], - ) - - trait_srns = [trait.srn] - run = await service.validate(trait_srns, validation_inputs) - - assert run.status == RunStatus.FAILED - assert len(run.results) == 1 - assert run.results[0].status == CheckStatus.FAILED - - async def test_validate_multiple_traits( - self, service, mock_trait_repo, mock_runner, validation_inputs - ): - """Test validation with multiple traits.""" - trait1 = make_trait("si-units") - trait2 = make_trait("iso-dates") - - mock_trait_repo.get_or_fetch.side_effect = [trait1, trait2] - mock_runner.run.side_effect = [ - ValidatorOutput(status=CheckStatus.PASSED, checks=[]), - ValidatorOutput(status=CheckStatus.PASSED, checks=[]), - ] - - trait_srns = [trait1.srn, trait2.srn] - run = await service.validate(trait_srns, validation_inputs) - - assert run.status == RunStatus.COMPLETED - assert len(run.results) == 2 - assert all(r.status == CheckStatus.PASSED for r in run.results) - - async def test_validate_runner_exception( - self, service, mock_trait_repo, mock_runner, validation_inputs - ): - """Test that runner exceptions are captured as ERROR status.""" - trait = make_trait("si-units") - mock_trait_repo.get_or_fetch.return_value = trait - mock_runner.run.side_effect = Exception("Container failed to start") - - trait_srns = [trait.srn] - run = await service.validate(trait_srns, validation_inputs) - - assert run.status == RunStatus.FAILED - assert len(run.results) == 1 - assert run.results[0].status == CheckStatus.ERROR - assert "Container failed to start" in run.results[0].message - - async def test_validate_saves_run_states( - self, service, mock_trait_repo, mock_run_repo, mock_runner, validation_inputs - ): - """Test that the service saves run state at each stage.""" - trait = make_trait("si-units") - mock_trait_repo.get_or_fetch.return_value = trait - mock_runner.run.return_value = ValidatorOutput( - status=CheckStatus.PASSED, checks=[] - ) - - # Track status at each save call - statuses_seen = [] - - def capture_status(run): - statuses_seen.append(run.status) - - mock_run_repo.save.side_effect = capture_status - - trait_srns = [trait.srn] - await service.validate(trait_srns, validation_inputs) - - # Should save: initial (pending), running, completed - assert mock_run_repo.save.call_count == 3 - assert statuses_seen == [RunStatus.PENDING, RunStatus.RUNNING, RunStatus.COMPLETED] - - async def test_validate_stores_trait_srns( - self, service, mock_trait_repo, mock_run_repo, mock_runner, validation_inputs - ): - """Test that the validation run stores the trait SRNs.""" - trait1 = make_trait("si-units") - trait2 = make_trait("iso-dates") - - mock_trait_repo.get_or_fetch.side_effect = [trait1, trait2] - mock_runner.run.side_effect = [ - ValidatorOutput(status=CheckStatus.PASSED, checks=[]), - ValidatorOutput(status=CheckStatus.PASSED, checks=[]), - ] - - trait_srns = [trait1.srn, trait2.srn] - run = await service.validate(trait_srns, validation_inputs) - - assert run.trait_srns == trait_srns diff --git a/tests/unit/infrastructure/persistence/test_mappers.py b/tests/unit/infrastructure/persistence/test_mappers.py deleted file mode 100644 index 4dfb413..0000000 --- a/tests/unit/infrastructure/persistence/test_mappers.py +++ /dev/null @@ -1,25 +0,0 @@ -from osa.domain.shadow.model.aggregate import ShadowId, ShadowRequest -from osa.domain.shadow.model.value import ShadowStatus -from osa.domain.shared.model.srn import DepositionProfileSRN -from osa.infrastructure.persistence.mappers.shadow import shadow_request_to_dict, row_to_shadow_request - -class TestShadowMappers: - def test_shadow_request_mapping(self): - profile_srn = DepositionProfileSRN.parse("urn:osa:osa-registry:profile:default@1.0.0") - req = ShadowRequest( - id=ShadowId("shadow-123"), - status=ShadowStatus.PENDING, - source_url="http://example.com", - profile_srn=profile_srn - ) - - data = shadow_request_to_dict(req) - assert data["id"] == "shadow-123" - assert data["status"] == "pending" - assert data["source_url"] == "http://example.com" - assert data["profile_srn"] == str(profile_srn) - - reconstructed = row_to_shadow_request(data) - assert reconstructed.id == req.id - assert reconstructed.status == req.status - assert str(reconstructed.profile_srn) == str(req.profile_srn) diff --git a/uv.lock b/uv.lock index ebdc9fa..5becc7e 100644 --- a/uv.lock +++ b/uv.lock @@ -1758,9 +1758,9 @@ dependencies = [ dev = [ { name = "coverage" }, { name = "pre-commit" }, - { name = "pyright" }, { name = "pytest" }, { name = "pytest-asyncio" }, + { name = "pytest-cov" }, { name = "ruff" }, { name = "ty" }, ] @@ -1791,9 +1791,9 @@ requires-dist = [ dev = [ { name = "coverage", specifier = ">=7.12.0" }, { name = "pre-commit", specifier = ">=4.5.0" }, - { name = "pyright", specifier = ">=1.1.407" }, { name = "pytest", specifier = ">=9.0.1" }, { name = "pytest-asyncio", specifier = ">=1.3.0" }, + { name = "pytest-cov", specifier = ">=7.0.0" }, { name = "ruff", specifier = ">=0.14.6" }, { name = "ty", specifier = ">=0.0.5" }, ] @@ -2185,19 +2185,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5a/dc/491b7661614ab97483abf2056be1deee4dc2490ecbf7bff9ab5cdbac86e1/pyreadline3-3.5.4-py3-none-any.whl", hash = "sha256:eaf8e6cc3c49bcccf145fc6067ba8643d1df34d604a1ec0eccbf7a18e6d3fae6", size = 83178, upload-time = "2024-09-19T02:40:08.598Z" }, ] -[[package]] -name = "pyright" -version = "1.1.407" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "nodeenv" }, - { name = "typing-extensions" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/a6/1b/0aa08ee42948b61745ac5b5b5ccaec4669e8884b53d31c8ec20b2fcd6b6f/pyright-1.1.407.tar.gz", hash = "sha256:099674dba5c10489832d4a4b2d302636152a9a42d317986c38474c76fe562262", size = 4122872, upload-time = "2025-10-24T23:17:15.145Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/dc/93/b69052907d032b00c40cb656d21438ec00b3a471733de137a3f65a49a0a0/pyright-1.1.407-py3-none-any.whl", hash = "sha256:6dd419f54fcc13f03b52285796d65e639786373f433e243f8b94cf93a7444d21", size = 5997008, upload-time = "2025-10-24T23:17:13.159Z" }, -] - [[package]] name = "pytest" version = "9.0.1" @@ -2226,6 +2213,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, ] +[[package]] +name = "pytest-cov" +version = "7.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "coverage" }, + { name = "pluggy" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5e/f7/c933acc76f5208b3b00089573cf6a2bc26dc80a8aece8f52bb7d6b1855ca/pytest_cov-7.0.0.tar.gz", hash = "sha256:33c97eda2e049a0c5298e91f519302a1334c26ac65c1a483d6206fd458361af1", size = 54328, upload-time = "2025-09-09T10:57:02.113Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0"