Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: standardise per-stream config approach #1350

Open
kgpayne opened this issue Jan 25, 2023 · 12 comments
Open

feat: standardise per-stream config approach #1350

kgpayne opened this issue Jan 25, 2023 · 12 comments
Labels
kind/Feature New feature or request valuestream/SDK
Milestone

Comments

@kgpayne
Copy link
Contributor

kgpayne commented Jan 25, 2023

Feature scope

Configuration (settings parsing, validation, etc.)

Description

We have at least two open issues for features requiring per-stream configuration:

It strikes me that we may want to standardise our approach for specifying per-stream config, and leverage the same pattern-matching functions for all per-stream config. Something like:

# config in yaml for readability
config:
  start_date: "2010-01-01T00:00:00Z"
  streams:
    "*":
      ignore: true
    "my_stream_name":
      ignore: false
      max_records_limit: 1000
    "some_stream_prefix_*":
      ignore: false

The above would have the following effect:

  • ignore/exclude all streams
  • explicitly include stream "my_stream_name", setting its max_records_limit to 1000
  • explicitly include streams beginning with "some_stream_prefix_"
@kgpayne
Copy link
Contributor Author

kgpayne commented Jan 25, 2023

@aaronsteers @pnadolny13 FYI as you opened those two issues 🙂

@pnadolny13
Copy link
Contributor

pnadolny13 commented Jan 26, 2023

@kgpayne I like this idea!

One thing to consider though is how to cleanly override these settings. For me I like to set the streams in the base plugin configuration so theyre shared by all environments, then override a config settings like max_records_limit in the environment config i.e. the cicd environment. The way that it seems meltano works right now is that if I add select criteria to an environment it treated as "instead" vs "in addition to", so theres no way to say "keep my base selection criteria but also add this extra max_records_limit setting to each".

So the challenge I see with the approach you suggested is that if I have complex stream selection at the base level then I need to copy/paste it into my environment config to keep the same stream config but override the max_records_limit (see example below). For me I'm hestitant to do that because I risk having my stream selection in non-prod environments differ from prod. Does that make sense?

I'm not sure what the solution is though, maybe having a way to treat environment select criteria as additional criteria. In a lot of implementations we had ways to manipulate selection in the catalog.json and config.json, so the config could be used in meltano environments to achieve this goal.

Here's an example of what I'm trying to describe. To keep the same selection criteria and add max_records_limit I need to copy/paste it.

plugins:
  extractors:
  - name: tap-gitlab
    variant: meltanolabs
    pip_url: git+https://github.com/meltanolabs/tap-gitlab.git
    select:
      projects.*:
      merge_requests.*:
      issues.*:
      '!issues.description':
      '!issues.title':
      '!merge_requests.description':
      '!merge_requests.title':
environments:
- name: userdev
  config:
    plugins:
      extractors:
      - name: tap-gitlab
        config:
          groups: meltano
          start_date: '2020-01-01T00:00:00Z'
        select:
          projects.*:
            max_records_limit: 1000
          merge_requests.*:
            max_records_limit: 1000
          issues.*:
            max_records_limit: 1000
          '!issues.description':
            max_records_limit: 1000
          '!issues.title':
            max_records_limit: 1000
          '!merge_requests.description':
            max_records_limit: 1000
          '!merge_requests.title':
            max_records_limit: 1000

On the other hand something like this would end up ignoring all the selection rules defined and overriding them with *.

plugins:
  extractors:
  - name: tap-gitlab
    variant: meltanolabs
    pip_url: git+https://github.com/meltanolabs/tap-gitlab.git
    select:
      projects.*:
      merge_requests.*:
      issues.*:
      '!issues.description':
      '!issues.title':
      '!merge_requests.description':
      '!merge_requests.title':
environments:
- name: userdev
  config:
    plugins:
      extractors:
      - name: tap-gitlab
        config:
          groups: meltano
          start_date: '2020-01-01T00:00:00Z'
        select:
          '*':
            max_records_limit: 1000

@kgpayne
Copy link
Contributor Author

kgpayne commented Jan 27, 2023

@pnadolny13 thanks for your feedback 🙌 Just to clarify - this would be a new standard config format/feature in the sdk, not a change to the select feature in Meltano (though the two look and behave quite similarly). Therefore these options would fall under the config part of your Meltano examples, not select. I.e.

plugins:
  extractors:
  - name: tap-gitlab
    variant: meltanolabs
    pip_url: git+https://github.com/meltanolabs/tap-gitlab.git
    select:
      # rules as normal
    config:
      streams:
        "*":
          ignore: true
environments:
- name: userdev
  config:
    plugins:
      extractors:
      - name: tap-gitlab
        config:
          groups: meltano
          start_date: '2020-01-01T00:00:00Z'
          streams:
            "*":
              max_records_limit: 100
            "my_selected_stream_prefix_*":
              ignore: false

So I would expect Meltano to do a merge in the example above, with the resulting config for userdev being:

config:
  groups: meltano
  start_date: '2020-01-01T00:00:00Z'
  streams:
    "*":
      ignore: true
      max_records_limit: 100
    "my_selected_stream_prefix_*":
      ignore: false

This is maybe still somewhat confusing in that "my_selected_stream_prefix_*" streams would inherit max_records_limit: 100 from "*", but I think that is what most people would expect the behaviour to be 😅 I know this also raises questions about the use of ignore in config vs. selection in Meltano, which @aaronsteers covers well in the issue for the ignore feature #1240 🙂

It is also worth saying that implementing ignore (or include/exclude) is not a prerequisite for us to decide on a structure and selection syntax for configuring individual streams, which is the topic of this issue. I think it worth agreeing that having a standardised mechanism to configure individual streams (catering for select-style pattern matching, for the cases when stream names are not fixed) is important as an enabler for other features like ignore and max_records_limit down the line. Does that make sense?

@pnadolny13
Copy link
Contributor

pnadolny13 commented Jan 27, 2023

@kgpayne ahh ok I misunderstood how this proposal would work, I was thinking this was a change to select itself but should have known because this is the SDK repo 🙄 , thanks for explaining more! I do think this solves the use cases that I have then.

@aaronsteers
Copy link
Contributor

aaronsteers commented Feb 2, 2023

@kgpayne - A few things here...

First of all, I do think it's potentially valuable to allow users to override specific aspects of the tap's config at an individual stream level.

But I think the preferred implementation would be (1) that the stream-level config be a subset of the tap-level config, and (2) that we do not support wildcards in this implementation. Wildcards create a difficult-to-debug situation at best, and an impossible-to-debug situation at worse - due to the issues of precedence order and a domain of stream names that may not be known to the user at config-time. Whereas the ignore patterns approach described in #1240 would be confirmable in the produced catalog prior to sync - the config application needs to be exactly correct before the sync operation runs, and I think the best way to ensure predictable results would be to require explicit stream names.

Explicit stream names also allow the tap to print a hard error message if the stream name does not apply to any known streams in the catalog - and print a simple warning message if rules apply to streams that are simply deselected. Config applied to a deselected stream may be valid for the tap overall but not in this invocation, whereas config that applies to streams that aren't in the catalog is almost certainly a typo.

Possible implementation

I think one reasonably simple implementation path here is to simply add a new Tap.stream_level_config_jsonschema class property on Tap classes that is parallel to Tap.config_jsonschema. This new stream level config property could be a copy-pasted subset from Tap.config_jsonschema - but importantly would be limited to those config options that the tap can accept at the stream level.

Examples of properties which could work at the stream level:

  • Flattening level - allowing custom flattening logic per stream
  • Start date - allowing different streams to start in different places
  • Base API - if the tap can support it, allow some streams to use a beta REST API endpoint, for instance
  • Dry run (per Consider exposing config like dry_run_record_limit  #1366)
  • User agent - Not hugely valuable, but in theory, individual streams could pass a different user agent string.

Examples of properties which likely would not work at the stream level:

  • Database name (since discovery happens only at the tap level)
  • Schema name (since discovery happens only at the tap level)
  • Stream maps - just because the stream map is already stream-level, and another stream-level means of declaring maps would be redundant and (more importantly) would create conflicts if declared in both places.

Caveats / Implementation Challenges

Other points:

  1. At least for a first implementation, I'd like to keep wildcards and ignore patterns out of scope.
    1. Wildcards create difficult or impossible-to-debug scenarios where the same config values would be overridden by multiple matching entries. The intersection of these wildcards would not be debuggable by the user, since they are provided at runtime and immediately applied to the invoked sync operation.
    2. Ignore patterns already have a spec in Follow-up from SQLStreams: 'only_schemas' and/or 'ignore_schemas' config option #290 and Feature: Accept ignore glob patterns as standard tap config #1240 stream. And presumably, these could not be ignored in the way described in Feature: Accept ignore glob patterns as standard tap config #1240, since stream config is applied after discovery. We could do something like disabled: True at the stream config level - but then we have three very-similar concepts: ignoring (prevents discovery and also prevents sync), deselecting (prevents sync, applied to catalog), disabling (same as deselection but applied to config). For the relative complexity involved, I'd be hesitant to introduce a synonym to deselection in this way - especially given that the ignored spec in Feature: Accept ignore glob patterns as standard tap config #1240 does already cover disabling while also handling discovery optimization and wildcards.
  2. I don't think max_records_limit should be a config option as I note in Consider exposing config like dry_run_record_limit  #1366 and bug: default implementations of RESTStream (and others) do not adhere to _MAX_RECORDS_LIMIT #1349. As noted in those issues, the record limit is non-viable for production use cases due primarily to the reality of non-sorted streams, which go into an infinite loop if we apply a record limit. An alternative would be to call it dry_run_max_record_limit or simply dry_run as a boolean and/or int value. This could be handled by Consider exposing config like dry_run_record_limit  #1366 first, and then extended to be allowed in the stream-level config in a future version.

@kgpayne
Copy link
Contributor Author

kgpayne commented Feb 6, 2023

@aaronsteers thanks for writing this up 🙏 Your examples in particular include ones I wasn't aware of!

I am on board with not supporting patterns/wild cards, ignore patters or max_records_limit in the first iteration, for simplicity and in the interest of getting stream-level config out. Adding pattern support in a future iteration would not be a breaking change, provided stream names don't include pattern chars 😅 However I want to raise 2 points for future reference:

  1. We can readily solve the "Wildcards create difficult or impossible-to-debug scenarios..." by providing a means to write out the compiled/dynamic config, just as we support exporting the 'dynamic' discovered catalog using --discover. Supporting wildcards doesn't exclude users from directly naming every stream, to save having to think and reason about how the rules they are formulating will apply, but it does give them the extra power for circumstances that require broad pattern-based config (especially SQL-based Taps). We expect Meltano users to think about their select rules, and provide them the same mechanism for debugging (dumping the catalog) and it works well.
  2. @pnadolny13 may have strong feelings on this too, but in my view not supporting wildcards/patterns effectively excludes SQL-based Taps from the benefits of per-stream config, i) because of the volume of manual work in maintaining the list of configured streams as tables and schemas are added/removed (especially given that stream_name != table_name) and ii) because of the sheer size of the config dictionary produced if a user does choose to individually configure every stream for a medium-sized database.

Re: max_records_limit, I responded directly in the issue here.

@robby-rob-slalom
Copy link

@aaronsteers

I have been running with an idea that you tossed out in Slack for a per-stream approach. These are the components I have incorporated into a few taps to get this working:

  • Added a stream_config property to the config_jsonschema
#tap.py

class TapName(Tap):
    config_jsonschema = th.PropertiesList( 
        th.Property(
            "stream_config",
            th.ArrayType(
                th.PropertiesList(
                    th.Property(
                        "stream",
                        th.StringType,
                        required=True,
                        description="Name of stream to apply a custom configuration.",
                    ),
                    th.Property(
                        "parameters",
                        th.StringType,
                        description="URL formatted parameters string to be used for stream.",
                    ),
                )
            ),
            description="Custom configuration for streams.",
        )
   )
  • Added a function to get the stream config to the client stream
# client.py
class TapNameStream(RESTStream):
    def get_stream_config(self) -> dict:
        """Get parameters set in config."""
        config = {}

        stream_configs = self.config.get("stream_config", [])
        if not stream_configs:
            return config

        config_list = [
            conf for conf in stream_configs if conf.get("stream", "") == self.name
        ] or [None]
        config_dict = config_list[-1] or {}
        stream_config = {k: v for k, v in config_dict.items() if k != "stream"}
        return stream_config

Now I can use the config on a per-stream basis like in this example:

# client.py
from urllib.parse import parse_qsl

class TapNameStream(RESTStream):
    def get_stream_params(self) -> dict:
        stream_params = self.get_stream_config().get("parameters", "")
        return {qry[0]: qry[1] for qry in parse_qsl(stream_params.lstrip("?"))}


    def get_url_params(
        self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> Dict[str, Any]:
        return self.get_stream_params()

This can most likely be cleaned up but otherwise works great, especially for testing during development.

@aaronsteers
Copy link
Contributor

aaronsteers commented May 12, 2023

@robby-rob-slalom - Fantastic news. Thanks for sharing this! 🚀

Any reason you chose stream configs as a list/array, rather than as a map with the stream name as map key?

@aaronsteers
Copy link
Contributor

aaronsteers commented May 12, 2023

If we were going to make this part of the default built-in SDK handling, we could add an overridable Stream.get_stream_config() to perform those overrides in the stream's constructor, when config for the stream is initialized.

Instead of what we have currently in Stream.__init__():

        self._config: dict = dict(tap.config)

That might be replaced with something like this:

        self._config: dict = self.get_stream_config(tap_config=tap.config)

And your get_stream_config() would almost work as-is, except accept a tap's config dict as input and return a cleaned/validated dict with those replacements already performed.

If the exact form of the stream-level overrides were still up in the air, or to be left up to the developer, we could keep status quo behavior by simply returning tap_config directly from get_stream_config() with no changes.

What's nice about making this built-in, is that everywhere that the Stream class accesses it's config, it would always get the version that contains overrides. From the perspective of the Stream class operations, it wouldn't know or care (post-initialization) whether the setting was from the tap or from custom overrides.

cc @edgarrmondragon, @kgpayne for their thoughts as well.

@robby-rob-slalom
Copy link

@aaronsteers

IIRC I made it a list to get around the schema validation and having to define each stream as a key with a "parameters" property. I'm open to a better way to go about it.

@edgarrmondragon
Copy link
Collaborator

IIRC I made it a list to get around the schema validation and having to define each stream as a key with a "parameters" property. I'm open to a better way to go about it.

@robby-rob-slalom maybe additional_properties:

h.PropertiesList(
    th.Property(
        "stream_config",
        th.ObjectType(
            additional_properties=th.ObjectType(
                th.Property(
                    "parameters",
                    th.StringType,
                    description="URL formatted parameters string to be used for stream.",
                ),
            ),
        ),
        description="Custom configuration for streams.",
    ),
)

@aaronsteers

And your get_stream_config() would almost work as-is, except accept a tap's config dict as input and return a cleaned/validated dict with those replacements already performed.

I'm still not sure how developers would handle custom stream-level settings that extend built-in ones (e.g. flattening, etc.)

@robby-rob-slalom
Copy link

@robby-rob-slalom maybe additional_properties:

Thanks @edgarrmondragon! That reduced the get_stream_config down quite a bit:

# client.py
# class TapNameStream(RESTStream):
    def get_stream_config(self) -> dict:
        """Get config for stream."""
        stream_configs = self.config.get("stream_config", {})
        return stream_configs.get(self.name, {})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/Feature New feature or request valuestream/SDK
Projects
None yet
Development

No branches or pull requests

5 participants