Skip to content

Commit

Permalink
fix(taps): Avoid setting up mapper in discovery mode (#1835)
Browse files Browse the repository at this point in the history
* fix(taps): Avoid setting up mapper during discovery mode

* Explicitly avoid setting up mapper in discovery mode

* Implement in base plugin class

* Test mapper initialization
  • Loading branch information
edgarrmondragon authored Jul 13, 2023
1 parent d372080 commit 2634329
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 20 deletions.
44 changes: 40 additions & 4 deletions singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@
CapabilitiesEnum,
PluginCapabilities,
)
from singer_sdk.mapper import PluginMapper
from singer_sdk.typing import extend_validator_with_defaults

if t.TYPE_CHECKING:
from singer_sdk.mapper import PluginMapper

SDK_PACKAGE_NAME = "singer_sdk"
CHECK_SUPPORTED_PYTHON_VERSIONS = (
# unsupported versions
Expand Down Expand Up @@ -65,6 +63,14 @@
JSONSchemaValidator = extend_validator_with_defaults(Draft7Validator)


class MapperNotInitialized(Exception):
"""Raised when the mapper is not initialized."""

def __init__(self) -> None:
"""Initialize the exception."""
super().__init__("Mapper not initialized. Please call setup_mapper() first.")


class PluginBase(metaclass=abc.ABCMeta):
"""Abstract base class for taps."""

Expand Down Expand Up @@ -144,11 +150,41 @@ def __init__(
config_dict[k] = SecretString(v)
self._config = config_dict
self._validate_config(raise_errors=validate_config)
self.mapper: PluginMapper
self._mapper: PluginMapper | None = None

metrics._setup_logging(self.config)
self.metrics_logger = metrics.get_metrics_logger()

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
self._mapper = PluginMapper(
plugin_config=dict(self.config),
logger=self.logger,
)

@property
def mapper(self) -> PluginMapper:
"""Plugin mapper for this tap.
Returns:
A PluginMapper object.
Raises:
MapperNotInitialized: If the mapper has not been initialized.
"""
if self._mapper is None:
raise MapperNotInitialized
return self._mapper

@mapper.setter
def mapper(self, mapper: PluginMapper) -> None:
"""Set the plugin mapper for this plugin.
Args:
mapper: A PluginMapper object.
"""
self._mapper = mapper

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get capabilities.
Expand Down
20 changes: 12 additions & 8 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
PluginCapabilities,
TapCapabilities,
)
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase

if t.TYPE_CHECKING:
from pathlib import PurePath

from singer_sdk.mapper import PluginMapper
from singer_sdk.streams import SQLStream, Stream

STREAM_MAPS_CONFIG = "stream_maps"
Expand Down Expand Up @@ -61,6 +61,7 @@ def __init__(
state: PurePath | str | dict | None = None,
parse_env_config: bool = False,
validate_config: bool = True,
setup_mapper: bool = True,
) -> None:
"""Initialize the tap.
Expand All @@ -73,6 +74,7 @@ def __init__(
parse_env_config: Whether to look for configuration values in environment
variables.
validate_config: True to require validation of config settings.
setup_mapper: True to initialize the plugin mapper.
"""
super().__init__(
config=config,
Expand All @@ -94,14 +96,10 @@ def __init__(
elif catalog is not None:
self._input_catalog = Catalog.from_dict(read_json_file(catalog))

# Initialize mapper
self.mapper: PluginMapper
self.mapper = PluginMapper(
plugin_config=dict(self.config),
logger=self.logger,
)
self._mapper: PluginMapper | None = None

self.mapper.register_raw_streams_from_catalog(self.catalog)
if setup_mapper:
self.setup_mapper()

# Process state
state_dict: dict = {}
Expand Down Expand Up @@ -168,6 +166,11 @@ def catalog(self) -> Catalog:

return self._catalog

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
super().setup_mapper()
self.mapper.register_raw_streams_from_catalog(self.catalog)

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get tap capabilities.
Expand Down Expand Up @@ -520,6 +523,7 @@ def cb_discover(
config=config_files, # type: ignore[arg-type]
parse_env_config=parse_env_config,
validate_config=False,
setup_mapper=False,
)
tap.run_discovery()
ctx.exit()
Expand Down
14 changes: 7 additions & 7 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
TargetCapabilities,
)
from singer_sdk.io_base import SingerMessageType, SingerReader
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase

if t.TYPE_CHECKING:
from pathlib import PurePath

from singer_sdk.mapper import PluginMapper
from singer_sdk.sinks import Sink

_MAX_PARALLELISM = 8
Expand Down Expand Up @@ -56,6 +56,7 @@ def __init__(
config: dict | PurePath | str | list[PurePath | str] | None = None,
parse_env_config: bool = False,
validate_config: bool = True,
setup_mapper: bool = True,
) -> None:
"""Initialize the target.
Expand All @@ -66,6 +67,7 @@ def __init__(
parse_env_config: Whether to look for configuration values in environment
variables.
validate_config: True to require validation of config settings.
setup_mapper: True to setup the mapper. Set to False if you want to
"""
super().__init__(
config=config,
Expand All @@ -82,12 +84,10 @@ def __init__(
# Approximated for max record age enforcement
self._last_full_drain_at: float = time.time()

# Initialize mapper
self.mapper: PluginMapper
self.mapper = PluginMapper(
plugin_config=dict(self.config),
logger=self.logger,
)
self._mapper: PluginMapper | None = None

if setup_mapper:
self.setup_mapper()

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
Expand Down
14 changes: 13 additions & 1 deletion tests/core/test_plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import os
from unittest import mock

from singer_sdk.plugin_base import PluginBase
import pytest

from singer_sdk.plugin_base import MapperNotInitialized, PluginBase
from singer_sdk.typing import IntegerType, PropertiesList, Property, StringType


Expand Down Expand Up @@ -41,3 +43,13 @@ def test_get_env_var_config():
assert "PROP2" not in env_config
assert "prop3" not in no_env_config
assert "PROP3" not in env_config


def test_mapper_not_initialized():
"""Test that the mapper is not initialized before the plugin is started."""
plugin = PluginTest(
parse_env_config=False,
validate_config=False,
)
with pytest.raises(MapperNotInitialized):
_ = plugin.mapper

0 comments on commit 2634329

Please sign in to comment.