feat: design pluggable Ingestor plugin system #11

@rorybyrne

Summary

The current ingestor configuration is coupled to specific implementations (GEOIngestorConfig). We want third-party developers to be able to create custom ingestors that plug into OSA without modifying core code.

This mirrors issue #10 (Index plugin system) to create a consistent plugin architecture across OSA.

Current State

# osa/config.py - coupled to specific implementations
from osa.infrastructure.ingest.geo.config import GEOIngestorConfig

AnyIngestorConfig = Annotated[
    Union[GEOIngestorConfig],  # Must modify this for each new ingestor
    Field(discriminator=None),
]

Adding a new ingestor (e.g., ENA, Zenodo, BioStudies) requires modifying config.py.

Goals

  1. Third-party ingestors can be installed as packages
  2. No modification to OSA core code required
  3. Config validation happens at startup (fail fast)
  4. Clear error messages for invalid configs
  5. Type-safe config within the ingestor implementation

Proposed Design

1. Ingestor Protocol with Config Class

The Ingestor protocol should declare its config class:

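# osa/sdk/ingest/ingestor.py
from collections.abc import AsyncIterator
from datetime import datetime
from typing import ClassVar, Protocol

from pydantic import BaseModel

# IngestRecord is OSA's record type; its import is not shown here.


class IngestorConfig(BaseModel):
    """Base class for ingestor configs; each ingestor subclasses this."""


class Ingestor(Protocol):
    """Protocol for pluggable data ingestors."""

    # The config class for this ingestor - used for validation at load time
    config_class: ClassVar[type[IngestorConfig]]

    @property
    def name(self) -> str: ...

    # Declared without `async` so that async-generator implementations
    # (`async def` + `yield`) type-check against the protocol.
    def fetch(
        self,
        since: datetime | None = None,
        limit: int | None = None,
    ) -> AsyncIterator[IngestRecord]: ...

    async def close(self) -> None: ...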

2. Ingestor Registration via Entry Points

Third-party packages register their ingestors:

# Third-party pyproject.toml
[project.entry-points."osa.ingestors"]
ena = "my_plugin.ena:ENAIngestor"
zenodo = "my_plugin.zenodo:ZenodoIngestor"
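
For readers unfamiliar with entry points, the short sketch below shows how a `"module:attribute"` value is split by `importlib.metadata`. The entry point is built by hand purely for illustration; in a real install it would be read from the plugin package's metadata, and `my_plugin.ena` is just the example module name from this issue.

```python
from importlib.metadata import EntryPoint

# Build the entry point by hand to show how the "module:attribute" value is split.
ep = EntryPoint(name="ena", value="my_plugin.ena:ENAIngestor", group="osa.ingestors")

print(ep.name)    # key a user would put in the OSA config
print(ep.module)  # module that ep.load() would import
print(ep.attr)    # attribute looked up on that module
```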

3. Discovery and Validation at Startup

# osa/infrastructure/ingest/registry.py
from importlib.metadata import entry_points
from typing import Any

from osa.sdk.ingest.ingestor import Ingestor, IngestorConfig

# ConfigError is OSA's configuration exception; its import is not shown here.


def discover_ingestors() -> dict[str, type[Ingestor]]:
    """Discover all ingestors registered in the "osa.ingestors" group."""
    # The group= keyword requires Python 3.10+.
    eps = entry_points(group="osa.ingestors")
    return {ep.name: ep.load() for ep in eps}


def validate_ingestor_config(ingestor_name: str, raw_config: dict[str, Any]) -> IngestorConfig:
    """Validate raw config against the ingestor's config class."""
    ingestors = discover_ingestors()
    ingestor_cls = ingestors.get(ingestor_name)
    if ingestor_cls is None:
        raise ConfigError(
            f"Unknown ingestor: {ingestor_name!r}. "
            f"Available: {sorted(ingestors)}"
        )

    # Raises pydantic.ValidationError if raw_config does not match.
    config_cls = ingestor_cls.config_class
    return config_cls.model_validate(raw_config)
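
One question the discovery step leaves open is what happens when an installed plugin fails to import. A possible approach, offered as an assumption rather than a settled design, is to record load failures per entry point so that one broken plugin does not hide the others. `load_entry_points` is a hypothetical helper, and the example points at standard-library objects so it runs without any plugins installed.

```python
from collections.abc import Iterable
from importlib.metadata import EntryPoint
from typing import Any


def load_entry_points(eps: Iterable[EntryPoint]) -> tuple[dict[str, Any], dict[str, str]]:
    """Load each entry point, separating successes from failures."""
    loaded: dict[str, Any] = {}
    failed: dict[str, str] = {}
    for ep in eps:
        try:
            loaded[ep.name] = ep.load()
        except (ImportError, AttributeError) as exc:
            # Record the failure instead of aborting discovery for every plugin.
            failed[ep.name] = f"{ep.value}: {exc.__class__.__name__}: {exc}"
    return loaded, failed


# Stand-in entry points: one resolvable, one pointing at a missing module.
eps = [
    EntryPoint(name="good", value="json:JSONDecoder", group="osa.ingestors"),
    EntryPoint(name="broken", value="no_such_osa_plugin.mod:Missing", group="osa.ingestors"),
]
loaded, failed = load_entry_points(eps)
print(sorted(loaded), sorted(failed))
```

Startup can still fail fast under this scheme: if the ingestor named in the config appears in `failed`, the stored message can be raised as the config error.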

4. Simplified Core Config

# osa/config.py - no longer coupled to implementations
from typing import Any

from pydantic import BaseModel

# IngestSchedule and InitialRun are not shown here.


class IngestConfig(BaseModel):
    """Configuration for a named ingestor."""
    ingestor: str  # "geo", "ena", "zenodo", etc.
    config: dict[str, Any]  # Validated against ingestor's config_class at load time
    schedule: IngestSchedule | None = None
    initial_run: InitialRun | None = None
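
Because `config` is an untyped dict in the core model, the typed validation step is where clear error messages (goal 4) have to come from. The sketch below shows one way to turn a pydantic `ValidationError` into a single readable message. It assumes pydantic v2; `ConfigError` here is a local stand-in for OSA's exception, and `DemoIngestorConfig` and `validate_config` are hypothetical names.

```python
from typing import Any

from pydantic import BaseModel, ValidationError


class ConfigError(Exception):
    """Stand-in for OSA's config error type (hypothetical here)."""


class DemoIngestorConfig(BaseModel):
    """Hypothetical plugin config with one required field."""
    base_url: str = "https://example.org/api"
    study_accessions: list[str]


def validate_config(ingestor: str, config_cls: type[BaseModel], raw: dict[str, Any]) -> BaseModel:
    """Validate `raw` and turn pydantic errors into one readable message."""
    try:
        return config_cls.model_validate(raw)
    except ValidationError as exc:
        # One line per failing field: "<dotted.path>: <pydantic message>".
        lines = [
            f"  {'.'.join(str(part) for part in err['loc'])}: {err['msg']}"
            for err in exc.errors()
        ]
        raise ConfigError(
            f"Invalid config for ingestor '{ingestor}':\n" + "\n".join(lines)
        ) from exc


ok = validate_config("demo", DemoIngestorConfig, {"study_accessions": ["PRJEB1"]})
print(ok.study_accessions)

try:
    # Missing the required study_accessions field.
    validate_config("demo", DemoIngestorConfig, {"base_url": "https://example.org"})
except ConfigError as exc:
    message = str(exc)
    print(message)
```

Naming the ingestor and the failing field path in the message lets a user locate the problem in their config file without reading a traceback.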

5. Example Third-Party Ingestor

# my_plugin/ena.py
from datetime import datetime

# ENAIngestor satisfies the Ingestor protocol structurally, so it only
# needs the config base class from the SDK.
from osa.sdk.ingest.ingestor import IngestorConfig


class ENAIngestorConfig(IngestorConfig):
    base_url: str = "https://www.ebi.ac.uk/ena/browser/api"
    study_accessions: list[str]
    include_samples: bool = True


class ENAIngestor:
    config_class = ENAIngestorConfig  # Used for validation

    def __init__(self, name: str, config: ENAIngestorConfig) -> None:
        self._name = name
        self._config = config
        # ...

    @property
    def name(self) -> str:
        return self._name

    async def fetch(self, since: datetime | None = None, limit: int | None = None):
        # Fetch from ENA API
        ...

    async def close(self) -> None:
        # Release any HTTP clients or other resources
        ...

Built-in Ingestors

OSA ships with:

  • geo - NCBI GEO DataSets (current)

Built-in ingestors are registered via entry points in OSA's own pyproject.toml, the same mechanism third-party packages use:

[project.entry-points."osa.ingestors"]
geo = "osa.infrastructure.ingest.geo:GEOIngestor"

Tasks

  • Update Ingestor protocol to include config_class
  • Add entry point registration for built-in GEO ingestor
  • Implement ingestor discovery in DI provider
  • Update config loading to validate against discovered config classes
  • Simplify IngestConfig to use dict[str, Any]
  • Document how to create third-party ingestors
  • Add integration test for plugin discovery
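
For the discovery test in the last task, a full integration test would install a sample plugin package. A lighter starting point, sketched below under that caveat, patches `entry_points` so no package needs to be installed. `DummyIngestor` and the test name are hypothetical; `discover_ingestors` mirrors the function proposed above but is redefined here so the example is self-contained.

```python
from importlib import metadata
from unittest import mock


def discover_ingestors() -> dict[str, type]:
    """Same shape as the discovery function proposed in this issue."""
    return {ep.name: ep.load() for ep in metadata.entry_points(group="osa.ingestors")}


class DummyIngestor:
    """Hypothetical plugin class used only by this test."""


def test_discovers_registered_ingestor() -> None:
    # Fake entry point whose load() returns the dummy class.
    fake_ep = mock.Mock()
    fake_ep.name = "dummy"
    fake_ep.load.return_value = DummyIngestor

    with mock.patch.object(metadata, "entry_points", return_value=[fake_ep]) as patched:
        found = discover_ingestors()

    # Discovery should query only the OSA ingestor group.
    patched.assert_called_once_with(group="osa.ingestors")
    assert found == {"dummy": DummyIngestor}


test_discovers_registered_ingestor()
```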

Relationship to Issue #10

This issue and #10 (Index plugin system) should be implemented together to ensure:

  • Consistent plugin discovery patterns
  • Shared base utilities for entry point loading
  • Unified documentation for plugin authors
  • Consistent CLI commands (e.g., osa plugins list)

Future Considerations

  • Consider a shared plugin base module in osa/sdk/plugin/
  • CLI command to list available ingestors: osa ingestors list
  • Config schema export for documentation
  • Plugin health checks and status reporting
