
For SQL-based targets, add built-in handling for schema_mapping #1086

Open
aaronsteers opened this issue Oct 19, 2022 · 14 comments
Labels
Accepting Pull Requests · SQL (Support for SQL taps and targets)

Comments

@aaronsteers
Contributor

As discussed in:

@aaronsteers aaronsteers changed the title For SQL-based taps, add built-in handling for "default load schema" and "schema mappings" For SQL-based targets, add built-in handling for "default load schema" and "schema mappings" Oct 19, 2022
@BuzzCutNorman
Contributor

@aaronsteers what do you think about adding this one to the milestone SQL Target Out-of-Box Stability?

@aaronsteers
Contributor Author

aaronsteers commented Oct 28, 2022

@BuzzCutNorman Good idea! Otherwise (at least without default_load_schema), certain expected behaviors do not work.

@aaronsteers
Contributor Author

aaronsteers commented Oct 28, 2022

In my comment here:

#1084 (reply in thread)

I lean towards keeping with the pipelinewise precedent:

default_target_schema
schema_mappings

@aaronsteers aaronsteers changed the title For SQL-based targets, add built-in handling for "default load schema" and "schema mappings" For SQL-based targets, add built-in handling for default_target_schema and schema_mapping Oct 28, 2022
@BuzzCutNorman
Contributor

@aaronsteers @kgpayne I just ran into an issue when I bumped my target-mssql to SDK 0.13.0: tap-postgres is sending a stream with public- as a prefix, so the target tries to create the schema public. That is when I get the following error.

sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42S01', "[42S01] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]There is already an object named 'public' in the database. (2714) (SQLExecDirectW); [42S01] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]CREATE SCHEMA failed due to previous errors. (2759)")

I was able to override the @property def schema_name(self) and set it to return a string of dbo, and it worked. I would now need to add the default_target_schema to the config and set the property to return its value if it is present.

WDYT? Good idea, OK idea but not the best, or there be monsters!!!

@aaronsteers
Contributor Author

aaronsteers commented Nov 4, 2022

@BuzzCutNorman

I was able to override the @property def schema_name(self) and set it to return a string of dbo, and it worked. I would now need to add the default_target_schema to the config and set the property to return its value if it is present.

WDYT? Good idea, OK idea but not the best, or there be monsters!!!

This is a great idea, and I think very similar to the approach we likely would have taken.

We discussed in a prior community call (sorry, I forget the date) that supporting default_target_schema is a much higher priority than schema_mapping - so we'd welcome a PR that leverages the logic you describe above to override any detected schema names in the stream IDs when default_target_schema is provided.

Then the logic would be:

  1. Use the schema name from default_target_schema if available.
  2. Otherwise, use the detected schema name from streams named like <schema_name>-<table_name>.
  3. Otherwise, attempt without a schema name (or use a reasonable default like dbo for SQL Server or the connection default for Snowflake).

The third bullet above is a bit more nebulous, and I don't think we necessarily have to solve that here.
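The precedence described above could be sketched as a standalone helper. This is a minimal illustration, not SDK code; the function name and parameters (resolve_target_schema, dialect_default) are hypothetical.

```python
from typing import Optional

def resolve_target_schema(
    stream_name: str,
    default_target_schema: Optional[str] = None,
    dialect_default: Optional[str] = None,
) -> Optional[str]:
    # 1. An explicitly configured schema always wins.
    if default_target_schema:
        return default_target_schema
    # 2. Detect a schema part in stream IDs shaped like
    #    "<schema_name>-<table_name>" (two- or three-part identifiers).
    parts = stream_name.split("-")
    if len(parts) in {2, 3}:
        return parts[-2]
    # 3. No schema detected: fall back to a dialect default
    #    (e.g. "dbo" for SQL Server), which may be None.
    return dialect_default
```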

@BuzzCutNorman
Contributor

@aaronsteers I have placed an override of the schema_name property in my target's Sink class and it is working well. This seems to accomplish all three items. The only thing it doesn't do is fix the error that happens if public is sent to a mssql target. This error occurs when the target tries to create the schema. You can get around the error by adding a default target schema. Should I try adding in code to avoid this issue?

Please take a look and let me know what you think of it so far.

    @property
    def schema_name(self) -> Optional[str]:
        """Return the schema name or `None` if using names with no schema part.

        Returns:
            The target schema name.
        """
        default_target_schema = self.config.get("default_target_schema", None)
        parts = self.stream_name.split("-")

        if default_target_schema:
            if not self.connector.schema_exists(default_target_schema):
                self.connector.create_schema(default_target_schema)
            return default_target_schema

        if len(parts) in {2, 3}:
            # Stream name is a two-part or three-part identifier.
            # Use the second-to-last part as the schema name.
            return self.conform_name(parts[-2], "schema")

        # Schema name not detected.
        return None

cc @kgpayne @edgarrmondragon

@edgarrmondragon
Collaborator

edgarrmondragon commented Nov 7, 2022

@BuzzCutNorman is the error caused by conflicting with the default public role in mssql?

If that's the case, and this public object is reasonably assumed to be present in all or most mssql deployments, then I'd agree about hardcoding a check for the public part in the target-mssql implementation. Otherwise, the above looks great as a default for the SDK.

@BuzzCutNorman
Contributor

BuzzCutNorman commented Nov 7, 2022

@BuzzCutNorman is the error caused by conflicting with the default public role in mssql?

@edgarrmondragon Yes, I believe that is exactly why the error occurs. This is the error: [SQL Server]There is already an object named 'public' in the database. (2714)

@BuzzCutNorman
Contributor

@edgarrmondragon I was able to get the public mssql correction in place. I also noticed I had the function doing more work than it needed to do. While testing I ran into a scenario where two streams were sent to the same table.

Below are the updated function and the tap select: entry that causes two streams to go into the same table when default_target_schema is set.

    @property
    def schema_name(self) -> Optional[str]:
        """Return the schema name or `None` if using names with no schema part.

        Returns:
            The target schema name.
        """
        target_sqla_dialect = self.connection.engine.dialect.name
        default_target_schema = self.config.get("default_target_schema", None)
        parts = self.stream_name.split("-")

        if default_target_schema:
            return default_target_schema

        if len(parts) in {2, 3}:
            # Stream name is a two-part or three-part identifier.
            # Use the second-to-last part as the schema name.
            stream_schema = self.conform_name(parts[-2], "schema")

            # SQL Server has a built-in `public` role, so a `public` schema
            # cannot be created; map it to the default `dbo` schema instead.
            if target_sqla_dialect == "mssql" and stream_schema == "public":
                return "dbo"
            return stream_schema

        # Schema name not detected.
        return None

tap select example:

    select:
    - public-badges.*
    - stuff-badges.*
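To make the collision above concrete, here is a tiny illustrative sketch (the target_table helper is hypothetical, not SDK API): with default_target_schema set, the detected schema part of each stream name is discarded, so both streams resolve to the same table.

```python
def target_table(stream_name: str, default_target_schema: str) -> str:
    # With default_target_schema set, only the table part of the
    # "<schema>-<table>" stream name is kept.
    table = stream_name.split("-")[-1]
    return f"{default_target_schema}.{table}"

print(target_table("public-badges", "dbo"))  # dbo.badges
print(target_table("stuff-badges", "dbo"))   # dbo.badges -- same table!
```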

@BuzzCutNorman
Contributor

BuzzCutNorman commented Nov 7, 2022

@edgarrmondragon I was looking to see what I needed to do to convert this to work in the SQLSink class. During my research I came across the target-schema capability listed in the SDK documentation. Does this mean there is already something in place I should add my code to, or is this a flag or marker to let Meltano know what a tap can do?

Would I need to add the below code to the capabilities.py file?

TARGET_SCHEMA_CONFIG = PropertiesList(
    Property(
        "default_target_schema",
        ObjectType(),
        description="The default schema to place all streams",
    ),
).to_dict()

or, looking at "BATCH", there are @dataclasses for the configs. Please let me know what the preferred method of adding new default configuration options to the SDK is.

@edgarrmondragon
Collaborator

edgarrmondragon commented Nov 8, 2022

During my research I came across the target-schema capability listed in the SDK documentation. Does this mean there is already something in place I should add my code to, or is this a flag or marker to let Meltano know what a tap can do?

@BuzzCutNorman Short answer is, unfortunately, no 😕. The capabilities are not tied to implementations of any kind, so taps or targets could report supporting them while they really don't, and vice versa. We've been trying to figure out the best way to tie advertised capabilities to behavior.

I also noticed I had the function doing more work than it needed to do. While testing I ran into a scenario where two streams were sent to the same table.

Yeah, that's something we should try to prevent. I wonder whether the pipelinewise targets handle this scenario or have the same issue. It might make sense as a default behavior in case users want to merge streams coming from different source schemas. Either way, it's probably a good test case for all SDK-based targets 🙂

cc @visch @kgpayne

@aaronsteers
Contributor Author

aaronsteers commented Nov 8, 2022

Just to add a couple notes here...


Re: declaration of the capability

During my research I came across the target-schema capability listed in the SDK documentation. Does this mean there is already something in place I should add my code to, or is this a flag or marker to let Meltano know what a tap can do?

@BuzzCutNorman Short answer is unfortunately no 😕.

That is correct. The SDK doesn't actually support it yet - but (to confirm) that is what this feature request would be delivering. Currently the feature is defined as:

Allow setting the target schema.

And post-completion, we may want to expand upon the definition a bit more:

Allow users to configure the target schema name when loading to a database target. The SDK's default implementation uses default_target_schema and schema_mapping config settings, as described in <docs link here>.


Re: overriding capabilities definitions for the SQLTarget class

I think you can declare an override for the SQLTarget.capabilities property, inheriting from the base plugin class and adding TARGET_SCHEMA to the emitted list. (This will automatically ripple into --about --format=json output.)

For an example of how to declare this, the base class has the following:

    @classproperty
    def capabilities(self) -> List[CapabilitiesEnum]:
        """Get capabilities.

        Developers may override this property in order to add or remove
        advertised capabilities for this plugin.

        Returns:
            A list of plugin capabilities.
        """
        return [
            PluginCapabilities.STREAM_MAPS,
            PluginCapabilities.FLATTENING,
        ]


Re: declaration of the config values:

Similarly, I think you can override the SQLTarget.append_builtin_config implementation to also append definitions for the new config settings, using a similar pattern. The advantage of this pattern is that if someone forcibly overrides their SQLTarget.capabilities list to exclude the declared capability, or if they declare the settings themselves, then the corresponding settings entries wouldn't be auto-appended.

    @classmethod
    def append_builtin_config(cls: Type["PluginBase"], config_jsonschema: dict) -> None:
        """Appends built-in config to `config_jsonschema` if not already set.

        To customize or disable this behavior, developers may either override
        this class method or override the `capabilities` property to disable
        any unwanted built-in capabilities.

        For all except very advanced use cases, we recommend leaving these
        implementations "as-is", since this provides the most choice to users
        and is the most "future proof" in terms of taking advantage of
        built-in capabilities which may be added in the future.

        Args:
            config_jsonschema: [description]
        """

        def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
            # Append any missing properties in the target with those from source.
            for k, v in source_jsonschema["properties"].items():
                if k not in target_jsonschema["properties"]:
                    target_jsonschema["properties"][k] = v

        capabilities = cls.capabilities
        if PluginCapabilities.STREAM_MAPS in capabilities:
            _merge_missing(STREAM_MAPS_CONFIG, config_jsonschema)
        if PluginCapabilities.FLATTENING in capabilities:
            _merge_missing(FLATTENING_CONFIG, config_jsonschema)
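The merge pattern above can be exercised in isolation. This is a self-contained sketch: the TARGET_SCHEMA_CONFIG dict below is a hypothetical stand-in for the JSON schema the SDK's PropertiesList would generate, not the real SDK object.

```python
# Hypothetical JSON schema fragment for the new setting.
TARGET_SCHEMA_CONFIG = {
    "properties": {
        "default_target_schema": {
            "type": ["string", "null"],
            "description": "The default schema to place all streams.",
        }
    }
}

def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
    # Append any missing properties in the target with those from source.
    for k, v in source_jsonschema["properties"].items():
        if k not in target_jsonschema["properties"]:
            target_jsonschema["properties"][k] = v

# A plugin with no declared settings picks up the built-in one...
empty_config = {"properties": {}}
_merge_missing(TARGET_SCHEMA_CONFIG, empty_config)

# ...while a plugin that declared its own version keeps it untouched.
user_config = {"properties": {"default_target_schema": {"type": "string"}}}
_merge_missing(TARGET_SCHEMA_CONFIG, user_config)
```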


Re: dealing with multiple streams loading to the same target table

While testing I ran into a scenario where two streams were sent to the same table.

I've run into this as well on other taps, in cases where multiple databases can be extracted from at once and two databases both have a table with the same schema and table name. This is incredibly hard to detect and debug if the records from both streams are allowed to insert into the same table. The only way I know of to resolve this is to keep a dictionary of all of the "seen" pairings of target table name and source stream_id, and throw an error if a different source stream ID results in the same calculated destination table. Presumably when this occurs we'd want to print an error to the user and fail the load:

ERROR. Multiple streams attempted to load to the same target table dbo.user: dbo-user and public-user.

Please deselect one of the source streams or override the target database schema using the default_target_schema and schema_mappings settings.

Note: While I do think the above would be a good default behavior, as of now I don't know where this would best live, since the name resolution operation exists within the individual SQLSink objects, which by definition are only sent data for a single stream. (Might be good for us to log this as a follow-on issue: out of scope for this feature, but worthwhile as a follow-up?)
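The "seen pairings" check described above could look something like the sketch below. The names (seen_tables, register_stream) are illustrative only, and where this registry would live in the SDK is exactly the open question noted above.

```python
from typing import Dict

# Registry of target table -> first stream ID seen loading into it.
seen_tables: Dict[str, str] = {}

def register_stream(stream_id: str, target_table: str) -> None:
    """Fail the load if two different streams resolve to the same table."""
    prior = seen_tables.get(target_table)
    if prior is not None and prior != stream_id:
        raise ValueError(
            f"Multiple streams attempted to load to the same target table "
            f"{target_table}: {prior} and {stream_id}. Please deselect one "
            f"of the source streams or override the target database schema "
            f"using the default_target_schema and schema_mapping settings."
        )
    seen_tables[target_table] = stream_id
```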


Hope this helps! Thanks!

@BuzzCutNorman
Contributor

BuzzCutNorman commented Nov 8, 2022

That is correct. The SDK doesn't actually support it yet - but (to confirm) that is what this feature request would be delivering. Currently the feature is defined as:

Allow setting the target schema.

Thanks for that, I was starting to worry I had missed something and all I needed to do was find the TODO and follow the directions. 😮‍💨 So, as a super simplistic summary, TARGET_SCHEMA is like a feature flag in Meltano?

Thank you for the explanation of ways you can add a built-in capability config. 🙏😊

@BuzzCutNorman
Contributor

@aaronsteers, @edgarrmondragon, @kgpayne I just put in a draft PR for this feature. When you get a chance, please give it a look: PR #1157

@aaronsteers aaronsteers changed the title For SQL-based targets, add built-in handling for default_target_schema and schema_mapping For SQL-based targets, add built-in handling for schema_mapping Jan 24, 2023
@edgarrmondragon edgarrmondragon added the SQL Support for SQL taps and targets label Jul 21, 2024