vdk-core: spawn multiple named ingesters #3222

Closed
DeltaMichael opened this issue Mar 18, 2024 · 0 comments

Overview

[Diagram: ingestion-multiple]

In order to support ingestion into multiple databases, we should be able to easily instantiate multiple ingestion objects and register them under a name. We should also be able to tell the ingestion API which named ingester to use.

To achieve this, we need:

  1. A way to instantiate ingesters from simple dict configs.
  2. A way to register ingester classes with vdk-core.
  3. A built-in plugin that initializes all the named ingesters based on the available configurations and stores them in memory. The actual configuration format is out of scope here.
  4. A way to pass an ingester name to job_input.send_object_for_ingestion(). Note that this might become a separate ticket, because ingestion is multi-threaded.

Note that this should work for non-database ingesters as well, e.g. ingesters that use an API, local files or other methods.
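As a rough illustration of items 1 to 3, here is a minimal, self-contained sketch of a registry that maps type names to ingester classes and builds named instances from dict configs. None of these names come from vdk-core: `Ingester`, `FileIngester`, `IngesterRegistry`, `from_dict`, `create_named` and the `path` config key are all hypothetical stand-ins, and a file-based ingester is used only to show that nothing here is database-specific.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Type


class Ingester(ABC):
    """Hypothetical stand-in for IIngesterPlugin (not the vdk-core class)."""

    @classmethod
    @abstractmethod
    def from_dict(cls, name: str, config: Dict[str, Any]) -> "Ingester":
        """Build an ingester instance from a plain dict config."""


class FileIngester(Ingester):
    """Hypothetical non-database ingester that would write to a local file."""

    def __init__(self, name: str, path: str):
        self.name = name
        self.path = path

    @classmethod
    def from_dict(cls, name: str, config: Dict[str, Any]) -> "FileIngester":
        return cls(name, path=config["path"])


class IngesterRegistry:
    """Keeps registered ingester classes and the named instances built from them."""

    def __init__(self) -> None:
        self._types: Dict[str, Type[Ingester]] = {}
        self._named: Dict[str, Ingester] = {}

    def add_ingester_class(self, ing_type: str, clazz: Type[Ingester]) -> None:
        # Type names are stored lower-cased so lookups are case-insensitive.
        self._types[ing_type.lower()] = clazz

    def create_named(self, name: str, ing_type: str, config: Dict[str, Any]) -> Ingester:
        clazz = self._types[ing_type.lower()]
        ingester = clazz.from_dict(name, config)
        self._named[name] = ingester
        return ingester

    def get(self, name: str) -> Ingester:
        return self._named[name]


registry = IngesterRegistry()
registry.add_ingester_class("file", FileIngester)
# Two independently configured instances of the same ingester type.
registry.create_named("raw", "file", {"path": "/tmp/raw.jsonl"})
registry.create_named("audit", "File", {"path": "/tmp/audit.jsonl"})
```

In this sketch the instances are created eagerly. The pseudocode below registers factory methods instead, which defers creation until an ingester is actually used.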

Pseudocode

ingester_router.py

    def add_ingester_class(
        self,
        dbtype: str,
        clazz: Type[IIngesterPlugin]
    ) -> None:
        self._supported_ingester_types[dbtype.lower()] = clazz

ingester_plugin.py

class IIngesterPlugin:
    @classmethod
    @abstractmethod
    def _from_dict(cls, **kwargs):
        """
        Override this if you want to support multiple connections and ingestion operations.
        """
        pass

ingest_to_oracle.py

class IngestToOracle(IIngesterPlugin):
    def __init__(self, name: str, connections: ManagedConnectionRouter):
        self.conn: PEP249Connection = connections.open_named_connection(name).connect()
        self.cursor: ManagedCursor = self.conn.cursor()

    @classmethod
    def _from_dict(cls, **kwargs):
        # configure and initialize from dict
        ...

oracle_plugin.py

    @hookimpl(trylast=True)
    def initialize_job(self, context: JobContext):
        conf = OracleConfiguration(context.core_context.configuration)
        context.ingester.add_ingester_class("oracle", IngestToOracle)

ingester_config_plugin.py

Creates all the named ingester methods based on all the classes that were previously registered. For now, it can just do nothing until we add a configuration API.

    def initialize_job(self, context: JobContext):
        for ing_type, name in self._config.named_ingesters:
            if ing_type in context.ingester.get_ingester_types():
                ingest_class = context.ingester.get_ingest_class(ing_type)
                context.ingester.add_named_ingester_factory_method(
                    name,
                    # bind the current loop values as defaults; a bare lambda
                    # would see only the last ingest_class and name
                    lambda cls=ingest_class, n=name: cls(n, context)
                )
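To make item 4 of the overview more concrete, the following sketch shows one way a named ingester could be selected at send time, with instances created lazily from registered factory methods. It is an assumption-laden illustration, not the vdk-core API: `NamedIngesterRouter`, `ListIngester`, the `ingester_name` parameter and the default-name fallback are all hypothetical, and threading concerns are ignored.

```python
from typing import Any, Callable, Dict, List, Optional


class ListIngester:
    """Hypothetical in-memory ingester that just records what it receives."""

    def __init__(self, name: str):
        self.name = name
        self.received: List[Dict[str, Any]] = []

    def ingest(self, payload: Dict[str, Any]) -> None:
        self.received.append(payload)


class NamedIngesterRouter:
    """Hypothetical router: maps ingester names to lazily created instances."""

    def __init__(self, default_name: str):
        self._default_name = default_name
        self._factories: Dict[str, Callable[[], ListIngester]] = {}
        self._instances: Dict[str, ListIngester] = {}

    def add_named_ingester_factory_method(
        self, name: str, factory: Callable[[], ListIngester]
    ) -> None:
        self._factories[name] = factory

    def send_object_for_ingestion(
        self, payload: Dict[str, Any], ingester_name: Optional[str] = None
    ) -> ListIngester:
        # Fall back to the default ingester when no name is given (an assumption).
        name = ingester_name or self._default_name
        if name not in self._instances:
            # Create the ingester on first use and cache it.
            self._instances[name] = self._factories[name]()
        ingester = self._instances[name]
        ingester.ingest(payload)
        return ingester


router = NamedIngesterRouter(default_name="warehouse")
for name in ("warehouse", "audit"):
    # `n=name` binds the current loop value; a bare `lambda: ListIngester(name)`
    # would make every factory use the last name in the loop.
    router.add_named_ingester_factory_method(name, lambda n=name: ListIngester(n))

audit = router.send_object_for_ingestion({"id": 1}, ingester_name="audit")
default = router.send_object_for_ingestion({"id": 2})
```

Returning the ingester from `send_object_for_ingestion` is only a convenience for inspecting the sketch. A real API would more likely return nothing and queue the payload.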

Acceptance criteria

  1. The above mechanism is implemented and tested