diff --git a/docs/docs/concepts/plugins.mdx b/docs/docs/concepts/plugins.mdx index 7bd15f34f5..93e68c2c22 100644 --- a/docs/docs/concepts/plugins.mdx +++ b/docs/docs/concepts/plugins.mdx @@ -154,6 +154,7 @@ Extractors support the following [extras](/guide/configuration#plugin-extras): - [`select`](#select-extra) - [`select_filter`](#select-filter-extra) - [`state`](#state-extra) +- [`no_catalog_cache`](#no-catalog-cache-extra) #### `catalog` extra @@ -631,6 +632,52 @@ export TAP_GITLAB__STATE=extract/tap-gitlab.state.json +#### `no_catalog_cache` extra + +- Setting: `_no_catalog_cache` +- [Environment variable](/guide/configuration#configuring-settings): `<EXTRACTOR>__NO_CATALOG_CACHE`, e.g. `TAP_GITLAB__NO_CATALOG_CACHE` +- Default: `False` + +An extractor's `no_catalog_cache` [extra](/guide/configuration#plugin-extras) is a boolean flag that, when set to `True`, disables the use of a cached catalog file during the extractor's discovery process. By default, Meltano will cache the catalog file generated by an extractor to speed up subsequent runs. However, if the extractor's schema has changed in a way that would affect discovery output, you may want to bypass the cache to ensure the latest catalog is used. + +Setting this extra to `True` forces the extractor to perform discovery and generate a new catalog file every time it runs, which can be useful during development or when an extractor supports dynamic catalog discovery, such as in [`tap-salesforce`](https://github.com/MeltanoLabs/tap-salesforce). 
+ +##### How to use + +Manage this extra: + + + + +```yaml +extractors: +- name: tap-gitlab + no_catalog_cache: true +``` + + + + +```bash +meltano config <extractor> set _no_catalog_cache true + +# For example: +meltano config tap-gitlab set _no_catalog_cache true +``` + + + + +```bash +export <EXTRACTOR>__NO_CATALOG_CACHE=true + +# For example: +export TAP_GITLAB__NO_CATALOG_CACHE=true +``` + + + + ### Loaders Loaders are [pip packages](https://pip.pypa.io/en/stable/) used by [`meltano elt`](/reference/command-line-interface#elt) as part of [data integration](/guide/integration). diff --git a/docs/docs/reference/command-line-interface.md b/docs/docs/reference/command-line-interface.md index d291623f02..8fd4680a6c 100644 --- a/docs/docs/reference/command-line-interface.md +++ b/docs/docs/reference/command-line-interface.md @@ -971,6 +971,7 @@ meltano --environment= run tap-gitlab target-postgres meltano run tap-gitlab one-mapping another-mapping target-postgres meltano run tap-gitlab target-postgres simple-job meltano run --state-id-suffix= tap-gitlab target-postgres +meltano run --catalog-refresh tap-salesforce target-postgres ``` #### Parameters @@ -985,6 +986,7 @@ meltano run --state-id-suffix= tap-gitlab target-postgres - `--state-id-suffix` define a custom suffix to generate a state ID with for each EL pair. - `--merge-state` will merge state with that of previous runs. See the [example in the Meltano repository](https://github.com/meltano/meltano/blob/main/integration/example-library/meltano-run-merge-states/index.md). - `--run-id` will use the provided UUID for the current run. This is useful when your workflow is managed by an external system and you want to track the run in Meltano. +- `--catalog-refresh` will force a refresh of the catalog, ignoring any existing cached catalog from previous runs. 
Examples: diff --git a/src/meltano/cli/elt.py b/src/meltano/cli/elt.py index 06d7061af9..7dc474b4e3 100644 --- a/src/meltano/cli/elt.py +++ b/src/meltano/cli/elt.py @@ -57,6 +57,11 @@ class ELOptions: help="Perform a full refresh (ignore state left behind by any previous runs).", is_flag=True, ) + catalog_refresh = click.option( + "--catalog-refresh", + help="Invalidates catalog cache and forces running discovery before this run.", + is_flag=True, + ) select = click.option( "--select", "-s", @@ -108,6 +113,7 @@ class ELOptions: @ELOptions.loader @ELOptions.dry @ELOptions.full_refresh +@ELOptions.catalog_refresh @ELOptions.select @ELOptions.exclude @ELOptions.catalog @@ -126,6 +132,7 @@ async def el( # WPS408 loader: str, dry: bool, full_refresh: bool, + catalog_refresh: bool, select: list[str], exclude: list[str], catalog: str, @@ -153,6 +160,7 @@ async def el( # WPS408 None, dry, full_refresh, + catalog_refresh, select, exclude, catalog, @@ -174,6 +182,7 @@ async def el( # WPS408 @ELOptions.transform @ELOptions.dry @ELOptions.full_refresh +@ELOptions.catalog_refresh @ELOptions.select @ELOptions.exclude @ELOptions.catalog @@ -193,6 +202,7 @@ async def elt( # WPS408 transform: str, dry: bool, full_refresh: bool, + catalog_refresh: bool, select: list[str], exclude: list[str], catalog: str, @@ -221,6 +231,7 @@ async def elt( # WPS408 transform, dry, full_refresh, + catalog_refresh, select, exclude, catalog, @@ -240,6 +251,7 @@ async def _run_el_command( transform: str | None, dry: bool, full_refresh: bool, + catalog_refresh: bool, select: list[str], exclude: list[str], catalog: str, @@ -285,6 +297,7 @@ async def _run_el_command( transform, dry_run=dry, full_refresh=full_refresh, + catalog_refresh=catalog_refresh, select_filter=select_filter, catalog=catalog, state=state, @@ -313,6 +326,7 @@ def _elt_context_builder( transform, dry_run=False, full_refresh=False, + catalog_refresh=False, select_filter=None, catalog=None, state=None, @@ -333,6 +347,7 @@ def 
_elt_context_builder( .with_dry_run(dry_run) .with_only_transform(transform == "only") .with_full_refresh(full_refresh) + .with_catalog_refresh(catalog_refresh) .with_select_filter(select_filter) .with_catalog(catalog) .with_state(state) diff --git a/src/meltano/cli/run.py b/src/meltano/cli/run.py index 4bd3aba289..d8ce29011c 100644 --- a/src/meltano/cli/run.py +++ b/src/meltano/cli/run.py @@ -61,6 +61,11 @@ def convert(self, value, param, ctx): ), is_flag=True, ) +@click.option( + "--catalog-refresh", + help="Invalidates catalog cache and forces running discovery before this run.", + is_flag=True, +) @click.option( "--no-state-update", help="Run without state saving. Applies to all pipelines.", @@ -101,6 +106,7 @@ async def run( project: Project, dry_run: bool, full_refresh: bool, + catalog_refresh: bool, no_state_update: bool, force: bool, state_id_suffix: str, @@ -143,6 +149,7 @@ async def run( project, blocks, full_refresh=full_refresh, + catalog_refresh=catalog_refresh, no_state_update=no_state_update, force=force, state_id_suffix=state_id_suffix, diff --git a/src/meltano/cli/select.py b/src/meltano/cli/select.py index 426a0035ed..b6171fa0fc 100644 --- a/src/meltano/cli/select.py +++ b/src/meltano/cli/select.py @@ -57,6 +57,11 @@ def selection_mark(selection): is_flag=True, help="Show all the tap attributes with their selected status.", ) +@click.option( + "--catalog-refresh", + is_flag=True, + help="Invalidate the catalog cache and refresh the catalog.", +) @click.option( "--rm", "--remove", @@ -85,7 +90,12 @@ async def select( """ try: if flags["list"]: - await show(project, extractor, show_all=flags["all"]) + await show( + project, + extractor, + show_all=flags["all"], + refresh=flags["catalog_refresh"], + ) else: update( project, @@ -112,13 +122,18 @@ def update( select_service.update(entities_filter, attributes_filter, exclude, remove) -async def show(project, extractor, show_all=False): +async def show( + project: Project, + extractor: str, + show_all: 
bool = False, + refresh: bool = False, +) -> None: """Show selected.""" _, Session = project_engine(project) # noqa: N806 select_service = SelectService(project, extractor) with closing(Session()) as session: - list_all = await select_service.list_all(session) + list_all = await select_service.list_all(session, refresh) # legend click.secho("Legend:") diff --git a/src/meltano/core/block/extract_load.py b/src/meltano/core/block/extract_load.py index af552c85aa..171fdd689c 100644 --- a/src/meltano/core/block/extract_load.py +++ b/src/meltano/core/block/extract_load.py @@ -52,6 +52,7 @@ def __init__( session: Session | None = None, job: Job | None = None, full_refresh: bool | None = False, + catalog_refresh: bool | None = False, force: bool | None = False, update_state: bool | None = True, state_id_suffix: str | None = None, @@ -66,6 +67,7 @@ def __init__( session: The session to use. job: The job within this context should run. full_refresh: Whether this is a full refresh. + catalog_refresh: whether cached catalog should be ignored. force: Whether to force the execution of the job if it is stale. update_state: Whether to update the state of the job. state_id_suffix: The state ID suffix to use. @@ -77,6 +79,7 @@ def __init__( self.session = session self.job = job self.full_refresh = full_refresh + self.catalog_refresh = catalog_refresh self.force = force self.update_state = update_state self.state_id_suffix = state_id_suffix @@ -120,6 +123,7 @@ def __init__(self, project: Project): self._job = None self._full_refresh = False + self._catalog_refresh = False self._state_update = True self._force = False self._state_id_suffix = None @@ -167,6 +171,18 @@ def with_full_refresh(self, full_refresh: bool): self._full_refresh = full_refresh return self + def with_catalog_refresh(self, catalog_refresh: bool): + """Set whether cached catalog should be ignored. + + Args: + catalog_refresh : whether cached catalog should be ignored. 
+ + Returns: + self + """ + self._catalog_refresh = catalog_refresh + return self + def with_no_state_update(self, no_state_update: bool): """Set whether this run should not update state. @@ -309,6 +325,7 @@ def context(self) -> ELBContext: session=self.session, job=self._job, full_refresh=self._full_refresh, + catalog_refresh=self._catalog_refresh, force=self._force, update_state=self._state_update, state_id_suffix=self._state_id_suffix, diff --git a/src/meltano/core/block/parser.py b/src/meltano/core/block/parser.py index 716e43b1ed..de8afc9985 100644 --- a/src/meltano/core/block/parser.py +++ b/src/meltano/core/block/parser.py @@ -72,6 +72,7 @@ def __init__( project, blocks: list[str], full_refresh: bool | None = False, + catalog_refresh: bool | None = False, no_state_update: bool | None = False, force: bool | None = False, state_id_suffix: str | None = None, @@ -87,6 +88,7 @@ def __init__( blocks: List of block names to parse. full_refresh: Whether to perform a full refresh (applies to all found sets). + catalog_refresh: Whether to ignore cached catalog. no_state_update: Whether to run with or without state updates. force: Whether to force a run if a job is already running (applies to all found sets). 
@@ -101,6 +103,7 @@ def __init__( self.project = project self._full_refresh = full_refresh + self._catalog_refresh = catalog_refresh self._no_state_update = no_state_update self._force = force self._state_id_suffix = state_id_suffix @@ -246,6 +249,7 @@ def _find_next_elb_set( # noqa: WPS231, WPS213 ELBContextBuilder(self.project) .with_force(self._force) .with_full_refresh(self._full_refresh) + .with_catalog_refresh(self._catalog_refresh) .with_no_state_update(self._no_state_update) .with_state_id_suffix(self._state_id_suffix) .with_merge_state(self._merge_state) diff --git a/src/meltano/core/elt_context.py b/src/meltano/core/elt_context.py index cbdaea26ce..189f68a8f5 100644 --- a/src/meltano/core/elt_context.py +++ b/src/meltano/core/elt_context.py @@ -85,7 +85,7 @@ class ELTContext: # noqa: WPS230 def __init__( self, - project, + project: Project, job: Job | None = None, session=None, extractor: PluginContext | None = None, @@ -95,6 +95,7 @@ def __init__( only_transform: bool | None = False, dry_run: bool | None = False, full_refresh: bool | None = False, + catalog_refresh: bool | None = False, select_filter: list | None = None, catalog: str | None = None, state: str | None = None, @@ -114,6 +115,7 @@ def __init__( only_transform: Flag. Only run transform. dry_run: Flag. Don't actually run. full_refresh: Flag. Ignore previous captured state. + catalog_refresh: Flag. Ignore cached catalog. select_filter: Select filters to apply to extractor. catalog: Catalog to pass to extractor. state: State to pass to extractor. 
@@ -132,6 +134,7 @@ def __init__( self.only_transform = only_transform self.dry_run = dry_run self.full_refresh = full_refresh + self.catalog_refresh = catalog_refresh self.select_filter = select_filter or [] self.catalog = catalog self.state = state @@ -220,6 +223,7 @@ def __init__(self, project: Project): self._only_transform = False self._dry_run = False self._full_refresh = False + self._catalog_refresh = False self._select_filter = None self._catalog = None self._state = None @@ -340,6 +344,18 @@ def with_full_refresh(self, full_refresh: bool) -> ELTContextBuilder: self._full_refresh = full_refresh return self + def with_catalog_refresh(self, catalog_refresh: bool) -> ELTContextBuilder: + """Ignore cached catalog. + + Args: + catalog_refresh: Whether ignore cached catalog. + + Returns: + Updated ELTContextBuilder instance. + """ + self._catalog_refresh = catalog_refresh + return self + def with_merge_state(self, merge_state: bool): """Set whether the state is to be merged or overwritten. 
@@ -502,6 +518,7 @@ def context(self) -> ELTContext: only_transform=self._only_transform, dry_run=self._dry_run, full_refresh=self._full_refresh, + catalog_refresh=self._catalog_refresh, select_filter=self._select_filter, catalog=self._catalog, state=self._state, diff --git a/src/meltano/core/plugin/singer/tap.py b/src/meltano/core/plugin/singer/tap.py index fb0b2bdbeb..e08efeee4d 100644 --- a/src/meltano/core/plugin/singer/tap.py +++ b/src/meltano/core/plugin/singer/tap.py @@ -185,6 +185,9 @@ class SingerTap(SingerPlugin): # noqa: WPS214 value_processor="nest_object", ), SettingDefinition(name="_select_filter", kind=SettingKind.ARRAY, value=[]), + SettingDefinition( + name="_no_catalog_cache", kind=SettingKind.BOOLEAN, value=False + ), ] def exec_args(self, plugin_invoker): @@ -375,7 +378,17 @@ async def discover_catalog( # noqa: WPS231, WPS210, """ catalog_path = plugin_invoker.files["catalog"] catalog_cache_key_path = plugin_invoker.files["catalog_cache_key"] - if catalog_path.exists(): + elt_context = plugin_invoker.context + + use_catalog_cache = True + if ( + elt_context + and elt_context.catalog_refresh + or plugin_invoker.plugin_config_extras["_no_catalog_cache"] + ): + use_catalog_cache = False + + if catalog_path.exists() and use_catalog_cache: with suppress(FileNotFoundError): cached_key = catalog_cache_key_path.read_text() new_cache_key = self.catalog_cache_key(plugin_invoker) diff --git a/src/meltano/core/select_service.py b/src/meltano/core/select_service.py index 6b430c6cd6..991b4719fb 100644 --- a/src/meltano/core/select_service.py +++ b/src/meltano/core/select_service.py @@ -39,19 +39,22 @@ def current_select(self): plugin_settings_service = PluginSettingsService(self.project, self.extractor) return plugin_settings_service.get("_select") - async def load_catalog(self, session): + async def load_catalog(self, session, refresh=False): """Load the catalog.""" invoker = invoker_factory(self.project, self.extractor) + if refresh: + 
invoker.settings_service.config_override["_no_catalog_cache"] = True + async with invoker.prepared(session): catalog_json = await invoker.dump("catalog") return json.loads(catalog_json) - async def list_all(self, session) -> ListSelectedExecutor: + async def list_all(self, session, refresh=False) -> ListSelectedExecutor: """List all select.""" try: - catalog = await self.load_catalog(session) + catalog = await self.load_catalog(session, refresh) except FileNotFoundError as err: raise PluginExecutionError( "Could not find catalog. Verify that the tap supports discovery " # noqa: EM101 diff --git a/src/meltano/schemas/meltano.schema.json b/src/meltano/schemas/meltano.schema.json index e7da43799d..72d73d4f75 100644 --- a/src/meltano/schemas/meltano.schema.json +++ b/src/meltano/schemas/meltano.schema.json @@ -589,6 +589,11 @@ "type": "string" }, "default": [] + }, + "no_catalog_cache": { + "type": "boolean", + "description": "A boolean that determines if the catalog cache should be ignored.", + "default": false } } }, diff --git a/tests/fixtures/core.py b/tests/fixtures/core.py index 8d59c52b77..6268028d92 100644 --- a/tests/fixtures/core.py +++ b/tests/fixtures/core.py @@ -1067,7 +1067,12 @@ def discovery(): # noqa: WPS213 "kind": "object", "value": {"nested": "from_default"}, }, - {"name": "hidden", "kind": "hidden", "value": 42}, + { + "name": "hidden", + "kind": "integer", + "value": 42, + "hidden": True, + }, {"name": "boolean", "kind": "boolean"}, {"name": "auth.username"}, {"name": "auth.password", "sensitive": True}, diff --git a/tests/meltano/cli/test_select.py b/tests/meltano/cli/test_select.py index 92050ffce5..cc72b6f110 100644 --- a/tests/meltano/cli/test_select.py +++ b/tests/meltano/cli/test_select.py @@ -6,6 +6,12 @@ from asserts import assert_cli_runner from meltano.cli import cli +from meltano.core.plugin.singer.catalog import ( + ListSelectedExecutor, + SelectedNode, + SelectionType, +) +from meltano.core.select_service import SelectService class 
TestCliSelect: @@ -33,3 +39,40 @@ def test_update_select_pattern(self, cli_runner, tap): assert_cli_runner(result) json_config = json.loads(result.stdout) assert "mock.*" not in json_config["_select"] + + @pytest.mark.usefixtures("project") + def test_select_list(self, cli_runner, tap, monkeypatch: pytest.MonkeyPatch): + async def mock_list_all(*args, **kwargs): # noqa: ARG001 + result = ListSelectedExecutor() + result.streams = { + SelectedNode(key="users", selection=SelectionType.SELECTED) + } + result.properties = { + "users": { + SelectedNode(key="id", selection=SelectionType.SELECTED), + SelectedNode(key="name", selection=SelectionType.EXCLUDED), + } + } + return result + + monkeypatch.setattr( + SelectService, + "list_all", + mock_list_all, + ) + + # list selection + result = cli_runner.invoke( + cli, + [ + "--no-environment", + "select", + tap.name, + "--list", + "--all", + ], + ) + assert_cli_runner(result) + + assert "[selected ] users.id" in result.stdout + assert "[excluded ] users.name" in result.stdout diff --git a/tests/meltano/core/plugin/singer/test_tap.py b/tests/meltano/core/plugin/singer/test_tap.py index 8d2ee4aacd..4b408c92ef 100644 --- a/tests/meltano/core/plugin/singer/test_tap.py +++ b/tests/meltano/core/plugin/singer/test_tap.py @@ -216,6 +216,7 @@ async def test_discover_catalog( # noqa: WPS213 session, plugin_invoker_factory, subject, + monkeypatch, ): invoker = plugin_invoker_factory(subject) @@ -267,6 +268,33 @@ def mock_discovery(*args, **kwargs): # noqa: ARG001 assert json.loads(catalog_path.read_text()) == {"discovered": True} assert not catalog_cache_key_path.exists() + # Apply catalog rules to store the cache key again + subject.apply_catalog_rules(invoker) + assert catalog_cache_key_path.exists() + + monkeypatch.setitem( + invoker.settings_service.config_override, + "_no_catalog_cache", + True, + ) + + async with invoker.prepared(session): + with mock.patch.object( + SingerTap, + "run_discovery", + side_effect=mock_discovery, + ) 
as mocked_run_discovery: + assert catalog_cache_key_path.exists() + + # with _no_catalog_cache = true, discovery should be invoked + # again even with a stored cache key. + mocked_run_discovery.reset_mock() + await subject.discover_catalog(invoker) + + mocked_run_discovery.assert_called_once() + assert json.loads(catalog_path.read_text()) == {"discovered": True} + assert not catalog_cache_key_path.exists() + @pytest.mark.asyncio() async def test_discover_catalog_custom( self, diff --git a/tests/meltano/core/test_select_service.py b/tests/meltano/core/test_select_service.py new file mode 100644 index 0000000000..3462b0bfb3 --- /dev/null +++ b/tests/meltano/core/test_select_service.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import json +import typing as t +from collections import OrderedDict + +import pytest + +from meltano.core.plugin.singer.catalog import SelectedNode, SelectionType +from meltano.core.plugin.singer.tap import SingerTap +from meltano.core.select_service import SelectService + +if t.TYPE_CHECKING: + from sqlalchemy.orm.session import Session + + from meltano.core.project import Project + + +@pytest.mark.asyncio() +@pytest.mark.usefixtures("tap") +async def test_select_service_list_all( + project: Project, + session: Session, + monkeypatch: pytest.MonkeyPatch, +): + catalog = { + "streams": [ + { + "stream": "users", + "tap_stream_id": "users", + "metadata": [ + {"breadcrumb": [], "metadata": {"selected": True}}, + ], + "schema": { + "properties": { + "id": {"type": "integer"}, + } + }, + } + ] + } + extractor = "tap-mock" + service = SelectService(project, extractor) + + async def mock_run_discovery(tap, plugin_invoker, catalog_path): # noqa: ARG001 + with catalog_path.open("w") as catalog_file: + json.dump(catalog, catalog_file) + + # Mock tap's run_discovery method + monkeypatch.setattr( + SingerTap, + "run_discovery", + mock_run_discovery, + ) + + list_all = await service.list_all(session, refresh=False) + assert list_all.streams 
== { + SelectedNode( + key="users", + selection=SelectionType.SELECTED, + ) + } + assert list_all.properties == OrderedDict( + { + "users": { + SelectedNode( + key="id", + selection=SelectionType.AUTOMATIC, + ), + } + } + ) + + # Update the catalog to include a new property + catalog["streams"][0]["schema"]["properties"]["name"] = {"type": "string"} # noqa: WPS219 + + # Without refreshing the catalog, the new property should not be included + list_all = await service.list_all(session, refresh=False) + assert list_all.properties == OrderedDict( + { + "users": { + SelectedNode( + key="id", + selection=SelectionType.AUTOMATIC, + ), + } + } + ) + + # Refreshing the catalog should include the new property + list_all = await service.list_all(session, refresh=True) + assert list_all.properties == OrderedDict( + { + "users": { + SelectedNode( + key="id", + selection=SelectionType.AUTOMATIC, + ), + SelectedNode( + key="name", + selection=SelectionType.AUTOMATIC, + ), + } + } + )