Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(taps): Avoid setting up mapper in discovery mode #1835

Merged
merged 4 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@
CapabilitiesEnum,
PluginCapabilities,
)
from singer_sdk.mapper import PluginMapper
from singer_sdk.typing import extend_validator_with_defaults

if t.TYPE_CHECKING:
from singer_sdk.mapper import PluginMapper

SDK_PACKAGE_NAME = "singer_sdk"
CHECK_SUPPORTED_PYTHON_VERSIONS = (
# unsupported versions
Expand Down Expand Up @@ -65,6 +63,14 @@
JSONSchemaValidator = extend_validator_with_defaults(Draft7Validator)


class MapperNotInitialized(Exception):
"""Raised when the mapper is not initialized."""

def __init__(self) -> None:
"""Initialize the exception."""
super().__init__("Mapper not initialized. Please call setup_mapper() first.")


class PluginBase(metaclass=abc.ABCMeta):
"""Abstract base class for taps."""

Expand Down Expand Up @@ -144,11 +150,41 @@ def __init__(
config_dict[k] = SecretString(v)
self._config = config_dict
self._validate_config(raise_errors=validate_config)
self.mapper: PluginMapper
self._mapper: PluginMapper | None = None

metrics._setup_logging(self.config)
self.metrics_logger = metrics.get_metrics_logger()

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
self._mapper = PluginMapper(
plugin_config=dict(self.config),
logger=self.logger,
)

@property
def mapper(self) -> PluginMapper:
"""Plugin mapper for this tap.

Returns:
A PluginMapper object.

Raises:
MapperNotInitialized: If the mapper has not been initialized.
"""
if self._mapper is None:
raise MapperNotInitialized
return self._mapper

@mapper.setter
def mapper(self, mapper: PluginMapper) -> None:
"""Set the plugin mapper for this plugin.

Args:
mapper: A PluginMapper object.
"""
self._mapper = mapper

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get capabilities.
Expand Down
20 changes: 12 additions & 8 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
PluginCapabilities,
TapCapabilities,
)
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase

if t.TYPE_CHECKING:
from pathlib import PurePath

from singer_sdk.mapper import PluginMapper
from singer_sdk.streams import SQLStream, Stream

STREAM_MAPS_CONFIG = "stream_maps"
Expand Down Expand Up @@ -61,6 +61,7 @@ def __init__(
state: PurePath | str | dict | None = None,
parse_env_config: bool = False,
validate_config: bool = True,
setup_mapper: bool = True,
) -> None:
"""Initialize the tap.

Expand All @@ -73,6 +74,7 @@ def __init__(
parse_env_config: Whether to look for configuration values in environment
variables.
validate_config: True to require validation of config settings.
setup_mapper: True to initialize the plugin mapper.
"""
super().__init__(
config=config,
Expand All @@ -94,14 +96,10 @@ def __init__(
elif catalog is not None:
self._input_catalog = Catalog.from_dict(read_json_file(catalog))

# Initialize mapper
self.mapper: PluginMapper
self.mapper = PluginMapper(
plugin_config=dict(self.config),
logger=self.logger,
)
self._mapper: PluginMapper | None = None

self.mapper.register_raw_streams_from_catalog(self.catalog)
if setup_mapper:
self.setup_mapper()

# Process state
state_dict: dict = {}
Expand Down Expand Up @@ -168,6 +166,11 @@ def catalog(self) -> Catalog:

return self._catalog

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
super().setup_mapper()
self.mapper.register_raw_streams_from_catalog(self.catalog)

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
"""Get tap capabilities.
Expand Down Expand Up @@ -520,6 +523,7 @@ def cb_discover(
config=config_files, # type: ignore[arg-type]
parse_env_config=parse_env_config,
validate_config=False,
setup_mapper=False,
)
tap.run_discovery()
ctx.exit()
Expand Down
14 changes: 7 additions & 7 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
TargetCapabilities,
)
from singer_sdk.io_base import SingerMessageType, SingerReader
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase

if t.TYPE_CHECKING:
from pathlib import PurePath

from singer_sdk.mapper import PluginMapper
from singer_sdk.sinks import Sink

_MAX_PARALLELISM = 8
Expand Down Expand Up @@ -56,6 +56,7 @@ def __init__(
config: dict | PurePath | str | list[PurePath | str] | None = None,
parse_env_config: bool = False,
validate_config: bool = True,
setup_mapper: bool = True,
) -> None:
"""Initialize the target.

Expand All @@ -66,6 +67,7 @@ def __init__(
parse_env_config: Whether to look for configuration values in environment
variables.
validate_config: True to require validation of config settings.
setup_mapper: True to setup the mapper. Set to False if you want to
"""
super().__init__(
config=config,
Expand All @@ -82,12 +84,10 @@ def __init__(
# Approximated for max record age enforcement
self._last_full_drain_at: float = time.time()

# Initialize mapper
self.mapper: PluginMapper
self.mapper = PluginMapper(
plugin_config=dict(self.config),
logger=self.logger,
)
self._mapper: PluginMapper | None = None

if setup_mapper:
self.setup_mapper()

@classproperty
def capabilities(self) -> list[CapabilitiesEnum]:
Expand Down
14 changes: 13 additions & 1 deletion tests/core/test_plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import os
from unittest import mock

from singer_sdk.plugin_base import PluginBase
import pytest

from singer_sdk.plugin_base import MapperNotInitialized, PluginBase
from singer_sdk.typing import IntegerType, PropertiesList, Property, StringType


Expand Down Expand Up @@ -41,3 +43,13 @@ def test_get_env_var_config():
assert "PROP2" not in env_config
assert "prop3" not in no_env_config
assert "PROP3" not in env_config


def test_mapper_not_initialized():
"""Test that the mapper is not initialized before the plugin is started."""
plugin = PluginTest(
parse_env_config=False,
validate_config=False,
)
with pytest.raises(MapperNotInitialized):
_ = plugin.mapper