Skip to content

Commit

Permalink
feat: Support state backend core setting extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Feb 6, 2024
1 parent b06f1ea commit 00a5a67
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 20 deletions.
51 changes: 50 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ croniter = "^2.0.1"
fasteners = "^0.19"
flatten-dict = "^0"
google-cloud-storage = {version = ">=1.31.0", optional = true}
importlib-metadata = { version = ">=5", python = "<3.12" }
importlib-resources = "^6.1.1"
jinja2 = "^3.1.3"
jsonschema = "^4.21"
Expand Down
54 changes: 54 additions & 0 deletions src/meltano/core/behavior/pluggable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Make a feature pluggable with packaging plugins."""

from __future__ import annotations

import sys
import typing as t
from functools import cached_property

if sys.version_info >= (3, 12):
from importlib.metadata import EntryPoints, entry_points
else:
from importlib_metadata import EntryPoints, entry_points

T = t.TypeVar("T")


class Pluggable(t.Generic[T]):
"""Make a feature pluggable with packaging plugins."""

def __init__(self, entry_point_group: str):
"""Create a new pluggable feature.
Args:
entry_point_group: The entry point group for the feature.
"""
self.entry_point_group = entry_point_group

@cached_property
def meltano_plugins(self) -> EntryPoints:
"""List available plugins.
Returns:
List of available plugins.
"""
return entry_points(group=self.entry_point_group)

def get_all_meltano_plugins(self) -> dict[str, T]:
"""Get all plugins.
Returns:
All plugins.
"""
return {ep.name: ep.load() for ep in self.meltano_plugins}

def get_meltano_plugin(self, name: str) -> T:
"""Get a plugin by name.
Args:
name: The name of the plugin.
Returns:
The plugged object.
"""
return self.meltano_plugins[name].load()
12 changes: 10 additions & 2 deletions src/meltano/core/config_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import yaml

from meltano.core import bundle
from meltano.core.behavior.pluggable import Pluggable
from meltano.core.setting_definition import SettingDefinition

if t.TYPE_CHECKING:
Expand All @@ -20,9 +21,11 @@
logger = logging.getLogger(__name__)


class ConfigService:
class ConfigService(Pluggable[SettingDefinition]): # noqa: WPS214
"""Service to manage meltano.yml."""

settings_plugins: Pluggable[SettingDefinition] = Pluggable("meltano.settings")

def __init__(self, project: Project):
"""Create a new project configuration service.
Expand All @@ -40,7 +43,12 @@ def settings(self) -> list[SettingDefinition]:
"""
with open(str(bundle.root / "settings.yml")) as settings_yaml:
settings_yaml_content = yaml.safe_load(settings_yaml)
return [SettingDefinition.parse(x) for x in settings_yaml_content["settings"]]

builtin = [
SettingDefinition.parse(x) for x in settings_yaml_content["settings"]
]
plugged = list(self.settings_plugins.get_all_meltano_plugins().values())
return builtin + plugged

@cached_property
def current_meltano_yml(self) -> MeltanoFile:
Expand Down
90 changes: 77 additions & 13 deletions src/meltano/core/state_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
from __future__ import annotations

import platform
import sys
import typing as t
from enum import Enum
from urllib.parse import urlparse

from structlog.stdlib import get_logger

from meltano.core.behavior.pluggable import Pluggable
from meltano.core.db import project_engine
from meltano.core.state_store.azure import AZStorageStateStoreManager
from meltano.core.state_store.base import StateStoreManager
from meltano.core.state_store.db import DBStateStoreManager
from meltano.core.state_store.filesystem import (
LocalFilesystemStateStoreManager,
Expand All @@ -20,10 +25,28 @@
from sqlalchemy.orm import Session

from meltano.core.project_settings_service import ProjectSettingsService
from meltano.core.state_store.base import StateStoreManager

if sys.version_info >= (3, 12):
from importlib.metadata import EntryPoints, entry_points
else:
from importlib_metadata import EntryPoints, entry_points

__all__ = [
"AZStorageStateStoreManager",
"BuiltinStateBackendEnum",
"DBStateStoreManager",
"GCSStateStoreManager",
"LocalFilesystemStateStoreManager",
"S3StateStoreManager",
"StateBackend",
"StateStoreManager",
"state_store_manager_from_project_settings",
]

logger = get_logger(__name__)


class StateBackend(str, Enum):
class BuiltinStateBackendEnum(str, Enum):
"""State backend."""

SYSTEMDB = "systemdb"
Expand All @@ -35,17 +58,35 @@ class StateBackend(str, Enum):
S3 = "s3"
GCS = "gs"


class StateBackend:
"""State backend."""

state_backend_plugins: Pluggable[type[StateStoreManager]] = Pluggable(
"meltano.state_backends",
)

def __init__(self, scheme: str) -> None:
"""Create a new StateBackend.
Args:
scheme: The scheme of the StateBackend.
"""
self.scheme = scheme

@classmethod
def backends(cls) -> list[StateBackend]:
def backends(cls) -> list[str]:
"""List available state backends.
Returns:
List of available state backends.
"""
return list(cls)
return list(BuiltinStateBackendEnum) + [
ep.name for ep in cls.state_backend_plugins.meltano_plugins
]

@property
def _managers(
def _builtin_managers(
self,
) -> t.Mapping[str, type[StateStoreManager]]:
"""Get mapping of StateBackend to associated StateStoreManager.
Expand All @@ -54,11 +95,11 @@ def _managers(
Mapping of StateBackend to associated StateStoreManager.
"""
return {
self.SYSTEMDB: DBStateStoreManager,
self.LOCAL_FILESYSTEM: LocalFilesystemStateStoreManager,
self.S3: S3StateStoreManager,
self.AZURE: AZStorageStateStoreManager,
self.GCS: GCSStateStoreManager,
BuiltinStateBackendEnum.SYSTEMDB: DBStateStoreManager,
BuiltinStateBackendEnum.LOCAL_FILESYSTEM: LocalFilesystemStateStoreManager,
BuiltinStateBackendEnum.S3: S3StateStoreManager,
BuiltinStateBackendEnum.AZURE: AZStorageStateStoreManager,
BuiltinStateBackendEnum.GCS: GCSStateStoreManager,
}

@property
Expand All @@ -69,8 +110,28 @@ def manager(
Returns:
The StateStoreManager associated with this StateBackend.
Raises:
ValueError: If no state backend is found for the scheme.
"""
return self._managers[self]
try:
return self._builtin_managers[self.scheme]
except KeyError:
logger.info(
"No builtin state backend found for scheme '%s'", # noqa: WPS323
self.scheme,
)

try:
return self.state_backend_plugins.get_meltano_plugin(self.scheme)
except KeyError:
logger.info(
"No state backend plugin found for scheme '%s'", # noqa: WPS323
self.scheme,
)

# TODO: This should be a Meltano exception
raise ValueError(f"No state backend found for scheme '{self.scheme}'")


def state_store_manager_from_project_settings( # noqa: WPS210
Expand All @@ -88,7 +149,7 @@ def state_store_manager_from_project_settings( # noqa: WPS210
"""
state_backend_uri: str = settings_service.get("state_backend.uri")
parsed = urlparse(state_backend_uri)
if state_backend_uri == StateBackend.SYSTEMDB:
if state_backend_uri == BuiltinStateBackendEnum.SYSTEMDB:
return DBStateStoreManager(
session=session or project_engine(settings_service.project)[1](),
)
Expand All @@ -107,7 +168,10 @@ def state_store_manager_from_project_settings( # noqa: WPS210
)
settings = (setting_def.name for setting_def in setting_defs)
backend = StateBackend(scheme).manager
if scheme == StateBackend.LOCAL_FILESYSTEM and "Windows" in platform.system():
if (
scheme == BuiltinStateBackendEnum.LOCAL_FILESYSTEM
and "Windows" in platform.system()
):
backend = WindowsFilesystemStateStoreManager
kwargs = {name.split(".")[-1]: settings_service.get(name) for name in settings}
return backend(**kwargs)
5 changes: 5 additions & 0 deletions tests/fixtures/custom_pluggable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations


class MyPlugin:
"""Dummy plugin."""
25 changes: 25 additions & 0 deletions tests/fixtures/state_backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

from meltano.core.state_store import StateStoreManager


class DummyStateStoreManager(StateStoreManager):
label: str = "Dummy state store manager"

def __init__(self, **kwargs):
... # noqa: WPS428

def set(self, state):
... # noqa: WPS428

def get(self, state_id):
... # noqa: WPS428

def clear(self, state_id):
... # noqa: WPS428

def get_state_ids(self, _pattern=None):
return () # pragma: no cover

def acquire_lock(self, state_id):
... # noqa: WPS428
43 changes: 43 additions & 0 deletions tests/meltano/core/behavior/test_pluggable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from __future__ import annotations

import sys
import typing as t

from fixtures.custom_pluggable import MyPlugin
from meltano.core.behavior.pluggable import Pluggable

if sys.version_info >= (3, 12):
from importlib.metadata import EntryPoint, EntryPoints
else:
from importlib_metadata import EntryPoint, EntryPoints

if t.TYPE_CHECKING:
import pytest


def test_entry_point_group():
"""Test that the entry point group is set correctly."""
pluggable = Pluggable("meltano.custom_feature")

assert pluggable.entry_point_group == "meltano.custom_feature"
assert not pluggable.meltano_plugins


def test_entry_points(monkeypatch: pytest.MonkeyPatch):
"""Test that the entry points are loaded correctly."""
pluggable: Pluggable[object] = Pluggable("meltano.custom_feature")

entry_points = EntryPoints(
(
EntryPoint(
value="fixtures.custom_pluggable:MyPlugin",
name="custom",
group="meltano.custom_feature",
),
),
)

with monkeypatch.context() as m:
m.setattr(pluggable, "meltano_plugins", entry_points)
plugin = pluggable.get_meltano_plugin("custom")
assert plugin == MyPlugin

0 comments on commit 00a5a67

Please sign in to comment.