feat: Ignore Catalog Cache #8518

Closed
wants to merge 18 commits into from
Changes from 12 commits
47 changes: 47 additions & 0 deletions docs/docs/concepts/plugins.mdx
@@ -154,6 +154,7 @@ Extractors support the following [extras](/guide/configuration#plugin-extras):
- [`select`](#select-extra)
- [`select_filter`](#select-filter-extra)
- [`state`](#state-extra)
- [`no_catalog_cache`](#no-catalog-cache-extra)

#### `catalog` extra

@@ -631,6 +632,52 @@ export TAP_GITLAB__STATE=extract/tap-gitlab.state.json
</TabItem>
</Tabs>

#### <a name="no-catalog-cache-extra"></a>`no_catalog_cache` extra

- Setting: `_no_catalog_cache`
- [Environment variable](/guide/configuration#configuring-settings): `<EXTRACTOR>__NO_CATALOG_CACHE`, e.g. `TAP_GITLAB__NO_CATALOG_CACHE`
- Default: `False`

An extractor's `no_catalog_cache` [extra](/guide/configuration#plugin-extras) is a boolean flag that, when set to `True`, stops Meltano from reusing a cached catalog file for that extractor. By default, Meltano caches the catalog file generated by an extractor's discovery run to speed up subsequent runs. If the schema the extractor discovers has changed in a way that affects discovery output, bypass the cache to ensure the latest catalog is used.

Setting this extra to `True` forces the extractor to perform discovery and generate a new catalog file every time it runs. This is useful during development, or when an extractor supports dynamic catalog discovery, such as [`tap-salesforce`](https://github.com/MeltanoLabs/tap-salesforce).
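
The caching rule described above can be pictured roughly as follows. This is a minimal sketch, not Meltano's implementation: `should_run_discovery` and its parameters are hypothetical names, and the only logic it encodes is the rule stated in the text (reuse the cached catalog unless the extra, or an explicit refresh request, says otherwise).

```python
from pathlib import Path


def should_run_discovery(
    cached_catalog: Path,
    *,
    no_catalog_cache: bool = False,
    catalog_refresh: bool = False,
) -> bool:
    """Return True when discovery should run instead of reusing the cache.

    Hypothetical helper for illustration only.
    """
    # Either switch bypasses the cache unconditionally.
    if no_catalog_cache or catalog_refresh:
        return True
    # Otherwise discovery is only needed when nothing has been cached yet.
    return not cached_catalog.exists()
```

With both switches left at `False`, the function only asks for discovery when no cached file exists, which mirrors the default behaviour the paragraph describes.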

##### How to use

Manage this extra:

<Tabs className="meltano-tabs" queryString="meltano-tabs">
<TabItem className="meltano-tab-content" value="meltano.yml" label="meltano.yml" default>

```yaml
extractors:
- name: tap-gitlab
  no_catalog_cache: true
```

</TabItem>
<TabItem className="meltano-tab-content" value="terminal" label="terminal">

```bash
meltano config <extractor> set _no_catalog_cache true

# For example:
meltano config tap-gitlab set _no_catalog_cache true
```

</TabItem>
<TabItem className="meltano-tab-content" value="env" label="env">

```bash
export <EXTRACTOR>__NO_CATALOG_CACHE=true

# For example:
export TAP_GITLAB__NO_CATALOG_CACHE=true
```

</TabItem>
</Tabs>
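
The environment variable name follows the `<EXTRACTOR>__NO_CATALOG_CACHE` pattern shown in the env tab. As a small illustration of that naming rule only, the sketch below derives the variable name from a plugin name and an extra name. The helper `extra_env_var` is hypothetical and is not part of Meltano.

```python
def extra_env_var(plugin_name: str, extra_name: str) -> str:
    """Build the environment variable name for a plugin extra.

    Follows the `<EXTRACTOR>__<EXTRA>` pattern from the docs above;
    the helper itself is hypothetical.
    """
    # Upper-case the plugin name and swap dashes for underscores.
    prefix = plugin_name.replace("-", "_").upper()
    # Plugin name and extra name are joined by a double underscore.
    return f"{prefix}__{extra_name.upper()}"


print(extra_env_var("tap-gitlab", "no_catalog_cache"))
# TAP_GITLAB__NO_CATALOG_CACHE
```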

### Loaders

Loaders are [pip packages](https://pip.pypa.io/en/stable/) used by [`meltano elt`](/reference/command-line-interface#elt) as part of [data integration](/guide/integration).
2 changes: 2 additions & 0 deletions docs/docs/reference/command-line-interface.md
@@ -969,6 +969,7 @@ meltano --environment=<ENVIRONMENT> run tap-gitlab target-postgres
meltano run tap-gitlab one-mapping another-mapping target-postgres
meltano run tap-gitlab target-postgres simple-job
meltano run --state-id-suffix=<STATE_ID_SUFFIX> tap-gitlab target-postgres
meltano run --catalog-refresh tap-salesforce target-postgres
```

#### Parameters
@@ -983,6 +984,7 @@ meltano run --state-id-suffix=<STATE_ID_SUFFIX> tap-gitlab target-postgres
- `--state-id-suffix` define a custom suffix to generate a state ID with for each EL pair.
- `--merge-state` will merge state with that of previous runs. See the [example in the Meltano repository](https://github.com/meltano/meltano/blob/main/integration/example-library/meltano-run-merge-states/index.md).
- `--run-id` will use the provided UUID for the current run. This is useful when your workflow is managed by an external system and you want to track the run in Meltano.
- `--catalog-refresh` will force a refresh of the catalog, ignoring any existing cached catalog from previous runs.
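
For readers unfamiliar with boolean CLI flags, here is a minimal sketch of how a switch like `--catalog-refresh` parses. The diff defines the real option with `click.option(..., is_flag=True)`; this sketch swaps in the standard library's `argparse` so it runs without extra dependencies, and the parser name `run-sketch` is made up for the example.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a toy parser with flags shaped like those of `meltano run`."""
    parser = argparse.ArgumentParser(prog="run-sketch")
    # store_true is argparse's counterpart to click's is_flag=True:
    # the value is False unless the flag is present.
    parser.add_argument("--full-refresh", action="store_true")
    parser.add_argument("--catalog-refresh", action="store_true")
    # Remaining positional arguments are the blocks to run.
    parser.add_argument("blocks", nargs="+")
    return parser


args = build_parser().parse_args(
    ["--catalog-refresh", "tap-salesforce", "target-postgres"]
)
print(args.catalog_refresh, args.full_refresh, args.blocks)
# True False ['tap-salesforce', 'target-postgres']
```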

Examples:

15 changes: 15 additions & 0 deletions src/meltano/cli/elt.py
@@ -57,6 +57,11 @@ class ELOptions:
help="Perform a full refresh (ignore state left behind by any previous runs).",
is_flag=True,
)
catalog_refresh = click.option(
"--catalog-refresh",
help="Invalidates catalog cache and forces running discovery before this run.",
is_flag=True,
)
select = click.option(
"--select",
"-s",
@@ -108,6 +113,7 @@ class ELOptions:
@ELOptions.loader
@ELOptions.dry
@ELOptions.full_refresh
@ELOptions.catalog_refresh
@ELOptions.select
@ELOptions.exclude
@ELOptions.catalog
@@ -126,6 +132,7 @@ async def el( # WPS408
loader: str,
dry: bool,
full_refresh: bool,
catalog_refresh: bool,
select: list[str],
exclude: list[str],
catalog: str,
@@ -153,6 +160,7 @@ async def el( # WPS408
None,
dry,
full_refresh,
catalog_refresh,
select,
exclude,
catalog,
@@ -174,6 +182,7 @@ async def el( # WPS408
@ELOptions.transform
@ELOptions.dry
@ELOptions.full_refresh
@ELOptions.catalog_refresh
@ELOptions.select
@ELOptions.exclude
@ELOptions.catalog
@@ -193,6 +202,7 @@ async def elt( # WPS408
transform: str,
dry: bool,
full_refresh: bool,
catalog_refresh: bool,
select: list[str],
exclude: list[str],
catalog: str,
@@ -221,6 +231,7 @@ async def elt( # WPS408
transform,
dry,
full_refresh,
catalog_refresh,
select,
exclude,
catalog,
@@ -240,6 +251,7 @@ async def _run_el_command(
transform: str | None,
dry: bool,
full_refresh: bool,
catalog_refresh: bool,
select: list[str],
exclude: list[str],
catalog: str,
@@ -285,6 +297,7 @@ async def _run_el_command(
transform,
dry_run=dry,
full_refresh=full_refresh,
catalog_refresh=catalog_refresh,
select_filter=select_filter,
catalog=catalog,
state=state,
@@ -313,6 +326,7 @@ def _elt_context_builder(
transform,
dry_run=False,
full_refresh=False,
catalog_refresh=False,
select_filter=None,
catalog=None,
state=None,
@@ -333,6 +347,7 @@ def _elt_context_builder(
.with_dry_run(dry_run)
.with_only_transform(transform == "only")
.with_full_refresh(full_refresh)
.with_catalog_refresh(catalog_refresh)
.with_select_filter(select_filter)
.with_catalog(catalog)
.with_state(state)
7 changes: 7 additions & 0 deletions src/meltano/cli/run.py
@@ -61,6 +61,11 @@ def convert(self, value, param, ctx):
),
is_flag=True,
)
@click.option(
"--catalog-refresh",
help="Invalidates catalog cache and forces running discovery before this run.",
is_flag=True,
)
@click.option(
"--no-state-update",
help="Run without state saving. Applies to all pipelines.",
@@ -101,6 +106,7 @@ async def run(
project: Project,
dry_run: bool,
full_refresh: bool,
catalog_refresh: bool,
no_state_update: bool,
force: bool,
state_id_suffix: str,
@@ -143,6 +149,7 @@ async def run(
project,
blocks,
full_refresh=full_refresh,
catalog_refresh=catalog_refresh,
no_state_update=no_state_update,
force=force,
state_id_suffix=state_id_suffix,
21 changes: 18 additions & 3 deletions src/meltano/cli/select.py
@@ -57,6 +57,11 @@ def selection_mark(selection):
is_flag=True,
help="Show all the tap attributes with their selected status.",
)
@click.option(
"--catalog-refresh",
is_flag=True,
help="Invalidate the catalog cache and refresh the catalog.",
)
@click.option(
"--rm",
"--remove",
@@ -85,7 +90,12 @@ async def select(
"""
try:
if flags["list"]:
- await show(project, extractor, show_all=flags["all"])
+ await show(
+ project,
+ extractor,
+ show_all=flags["all"],
+ refresh=flags["catalog_refresh"],
+ )
else:
update(
project,
@@ -112,13 +122,18 @@ def update(
select_service.update(entities_filter, attributes_filter, exclude, remove)


- async def show(project, extractor, show_all=False):
+ async def show(
+ project: Project,
+ extractor: str,
+ show_all: bool = False,
+ refresh: bool = False,
+ ) -> None:
"""Show selected."""
_, Session = project_engine(project) # noqa: N806
select_service = SelectService(project, extractor)

with closing(Session()) as session:
- list_all = await select_service.list_all(session)
+ list_all = await select_service.list_all(session, refresh)

# legend
click.secho("Legend:")
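
The `refresh` argument threaded into `list_all` above can be illustrated with a toy service. This is a sketch under stated assumptions: `SketchSelectService`, its in-memory cache, and the hard-coded stream names are all hypothetical, and the `session` argument from the real call is omitted. It only shows the idea of an awaitable listing call that reruns discovery when asked to refresh.

```python
import asyncio
from typing import Optional


class SketchSelectService:
    """Hypothetical stand-in for a service that lists catalog streams."""

    def __init__(self) -> None:
        self._cached: Optional[list[str]] = None
        self.discovery_runs = 0

    async def _discover(self) -> list[str]:
        # Pretend to invoke the tap in discovery mode.
        self.discovery_runs += 1
        await asyncio.sleep(0)
        return ["projects", "commits"]

    async def list_all(self, refresh: bool = False) -> list[str]:
        # Rerun discovery when forced, or when nothing is cached yet.
        if refresh or self._cached is None:
            self._cached = await self._discover()
        return self._cached
```

Calling `asyncio.run(service.list_all())` twice runs discovery once; passing `refresh=True` runs it again regardless of the cached value.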
17 changes: 17 additions & 0 deletions src/meltano/core/block/extract_load.py
@@ -52,6 +52,7 @@ def __init__(
session: Session | None = None,
job: Job | None = None,
full_refresh: bool | None = False,
catalog_refresh: bool | None = False,
force: bool | None = False,
update_state: bool | None = True,
state_id_suffix: str | None = None,
@@ -66,6 +67,7 @@ def __init__(
session: The session to use.
job: The job within this context should run.
full_refresh: Whether this is a full refresh.
catalog_refresh: Whether the cached catalog should be ignored.
force: Whether to force the execution of the job if it is stale.
update_state: Whether to update the state of the job.
state_id_suffix: The state ID suffix to use.
@@ -77,6 +79,7 @@ def __init__(
self.session = session
self.job = job
self.full_refresh = full_refresh
self.catalog_refresh = catalog_refresh
self.force = force
self.update_state = update_state
self.state_id_suffix = state_id_suffix
@@ -120,6 +123,7 @@ def __init__(self, project: Project):

self._job = None
self._full_refresh = False
self._catalog_refresh = False
self._state_update = True
self._force = False
self._state_id_suffix = None
@@ -167,6 +171,18 @@ def with_full_refresh(self, full_refresh: bool):
self._full_refresh = full_refresh
return self

def with_catalog_refresh(self, catalog_refresh: bool):
"""Set whether cached catalog should be ignored.

Args:
catalog_refresh: Whether the cached catalog should be ignored.

Returns:
self
"""
self._catalog_refresh = catalog_refresh
return self

def with_no_state_update(self, no_state_update: bool):
"""Set whether this run should not update state.

@@ -309,6 +325,7 @@ def context(self) -> ELBContext:
session=self.session,
job=self._job,
full_refresh=self._full_refresh,
catalog_refresh=self._catalog_refresh,
force=self._force,
update_state=self._state_update,
state_id_suffix=self._state_id_suffix,
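
The `with_catalog_refresh` method added in this file follows the same fluent-builder shape as `with_full_refresh`: store the value, return `self`, and hand it to the context at the end. A stripped-down sketch of that pattern is below. `SketchContext` and `SketchContextBuilder` are hypothetical simplifications that keep only the two refresh flags, not the real `ELBContext` and `ELBContextBuilder`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchContext:
    """Simplified context carrying only the two refresh flags."""

    full_refresh: bool = False
    catalog_refresh: bool = False


class SketchContextBuilder:
    """Simplified fluent builder; each with_* method returns self."""

    def __init__(self) -> None:
        self._full_refresh = False
        self._catalog_refresh = False

    def with_full_refresh(self, full_refresh: bool) -> "SketchContextBuilder":
        self._full_refresh = full_refresh
        return self

    def with_catalog_refresh(self, catalog_refresh: bool) -> "SketchContextBuilder":
        self._catalog_refresh = catalog_refresh
        return self

    def context(self) -> SketchContext:
        # Copy the collected settings into an immutable context object.
        return SketchContext(
            full_refresh=self._full_refresh,
            catalog_refresh=self._catalog_refresh,
        )


ctx = SketchContextBuilder().with_catalog_refresh(True).context()
print(ctx)
# SketchContext(full_refresh=False, catalog_refresh=True)
```

Returning `self` from each setter is what allows the chained calls seen in `parser.py` and `elt.py`.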
4 changes: 4 additions & 0 deletions src/meltano/core/block/parser.py
@@ -72,6 +72,7 @@ def __init__(
project,
blocks: list[str],
full_refresh: bool | None = False,
catalog_refresh: bool | None = False,
no_state_update: bool | None = False,
force: bool | None = False,
state_id_suffix: str | None = None,
@@ -87,6 +88,7 @@ def __init__(
blocks: List of block names to parse.
full_refresh: Whether to perform a full refresh (applies to all
found sets).
catalog_refresh: Whether to ignore cached catalog.
no_state_update: Whether to run with or without state updates.
force: Whether to force a run if a job is already running (applies
to all found sets).
@@ -101,6 +103,7 @@ def __init__(
self.project = project

self._full_refresh = full_refresh
self._catalog_refresh = catalog_refresh
self._no_state_update = no_state_update
self._force = force
self._state_id_suffix = state_id_suffix
@@ -246,6 +249,7 @@ def _find_next_elb_set( # noqa: WPS231, WPS213
ELBContextBuilder(self.project)
.with_force(self._force)
.with_full_refresh(self._full_refresh)
.with_catalog_refresh(self._catalog_refresh)
.with_no_state_update(self._no_state_update)
.with_state_id_suffix(self._state_id_suffix)
.with_merge_state(self._merge_state)