From f64b2e07a12de6cbe308cd68e337ff8c64d63e03 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 17:12:52 +0000 Subject: [PATCH 01/10] When the user doesn't pass a connection string to `sgr mount` (used for PG/MySQL connectors), don't set credentials to None (omit them instead). --- splitgraph/core/output.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitgraph/core/output.py b/splitgraph/core/output.py index a6a0f980..2d9064a2 100644 --- a/splitgraph/core/output.py +++ b/splitgraph/core/output.py @@ -79,7 +79,7 @@ def conn_string_to_dict(connection: Optional[str]) -> Dict[str, Any]: password=match.group(3), ) else: - return dict(server=None, port=None, username=None, password=None) + return {} def parse_dt(string: str) -> datetime: From c21a56460a30f603f0525455a4440e5b87e01f0e Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 18:00:42 +0000 Subject: [PATCH 02/10] Use the credentials specified in the data source's JSON schema when extracting them from the JSON params in sgr mount (rather than just username/pw). 
--- splitgraph/hooks/data_source/fdw.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/splitgraph/hooks/data_source/fdw.py b/splitgraph/hooks/data_source/fdw.py index ff992e44..9c63694a 100644 --- a/splitgraph/hooks/data_source/fdw.py +++ b/splitgraph/hooks/data_source/fdw.py @@ -80,10 +80,10 @@ def from_commandline(cls, engine, commandline_kwargs) -> "ForeignDataWrapperData else: tables = None - credentials = { - "username": params.pop("username"), - "password": params.pop("password"), - } + credentials: Dict[str, Any] = {} + for k in cast(Dict[str, Any], cls.credentials_schema["properties"]).keys(): + if k in params: + credentials[k] = params[k] result = cls(engine, credentials, params) result.tables = tables return result From ab042c334729a55ced7f846c9158a0c4752c43a0 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 18:00:58 +0000 Subject: [PATCH 03/10] Bump Multicorn to pick up the sqlalchemy_fdw fixes --- engine/src/Multicorn | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/src/Multicorn b/engine/src/Multicorn index 5a7b00c8..0a39ac3a 160000 --- a/engine/src/Multicorn +++ b/engine/src/Multicorn @@ -1 +1 @@ -Subproject commit 5a7b00c823adcdeebcc8ef21589940b58c1e0243 +Subproject commit 0a39ac3a84fbcb8a8ca460f0a6f0070b8eb731ae From b0184888a7381462847587df5a7f3b1da75877c1 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 18:01:47 +0000 Subject: [PATCH 04/10] Move `dedupe_sg_schema` out from Socrata into common ingestion routines (isn't specific to it) --- splitgraph/ingestion/common.py | 55 +++++++++++++++++++++-- splitgraph/ingestion/socrata/querying.py | 50 +-------------------- test/splitgraph/ingestion/test_socrata.py | 2 +- 3 files changed, 54 insertions(+), 53 deletions(-) diff --git a/splitgraph/ingestion/common.py b/splitgraph/ingestion/common.py index e559c9c8..ba8d683c 100644 --- a/splitgraph/ingestion/common.py +++ b/splitgraph/ingestion/common.py @@ -1,11 +1,12 @@ 
from abc import abstractmethod -from typing import Optional, Union +from typing import Optional, Union, Dict, List, Tuple from psycopg2.sql import SQL, Identifier from splitgraph.core.image import Image from splitgraph.core.repository import Repository -from splitgraph.core.types import TableSchema +from splitgraph.core.sql import POSTGRES_MAX_IDENTIFIER +from splitgraph.core.types import TableSchema, TableColumn from splitgraph.engine.postgres.engine import PsycopgEngine from splitgraph.exceptions import CheckoutError @@ -99,7 +100,7 @@ def to_table( if_exists: str = "patch", schema_check: bool = True, no_header: bool = False, - **kwargs + **kwargs, ): tmp_schema = repository.to_schema() @@ -170,7 +171,7 @@ def to_data( image: Optional[Union[Image, str]] = None, repository: Optional[Repository] = None, use_lq: bool = False, - **kwargs + **kwargs, ): if image is None: if repository is None: @@ -195,3 +196,49 @@ def to_data( # (won't download objects unless needed). with image.query_schema() as tmp_schema: return self.query_to_data(image.engine, query, tmp_schema, **kwargs) + + +def dedupe_sg_schema(schema_spec: TableSchema, prefix_len: int = 59) -> TableSchema: + """ + Some foreign schemas have columns that are longer than 63 characters + where the first 63 characters are the same between several columns + (e.g. odn.data.socrata.com). This routine renames columns in a schema + to make sure this can't happen (by giving duplicates a number suffix). 
+ """ + + # We truncate the column name to 59 to leave space for the underscore + # and 3 digits (max PG identifier is 63 chars) + prefix_counts: Dict[str, int] = {} + columns_nums: List[Tuple[str, int]] = [] + + for column in schema_spec: + column_short = column.name[:prefix_len] + count = prefix_counts.get(column_short, 0) + columns_nums.append((column_short, count)) + prefix_counts[column_short] = count + 1 + + result = [] + for (_, position), column in zip(columns_nums, schema_spec): + column_short = column.name[:prefix_len] + count = prefix_counts[column_short] + if count > 1: + result.append( + TableColumn( + column.ordinal, + f"{column_short}_{position:03d}", + column.pg_type, + column.is_pk, + column.comment, + ) + ) + else: + result.append( + TableColumn( + column.ordinal, + column.name[:POSTGRES_MAX_IDENTIFIER], + column.pg_type, + column.is_pk, + column.comment, + ) + ) + return result diff --git a/splitgraph/ingestion/socrata/querying.py b/splitgraph/ingestion/socrata/querying.py index 3fdab99b..42ede2f5 100644 --- a/splitgraph/ingestion/socrata/querying.py +++ b/splitgraph/ingestion/socrata/querying.py @@ -1,7 +1,7 @@ -from typing import Dict, Any, List, Tuple, Optional +from typing import Dict, Any, Tuple, Optional -from splitgraph.core.sql import POSTGRES_MAX_IDENTIFIER from splitgraph.core.types import TableSchema, TableColumn +from splitgraph.ingestion.common import dedupe_sg_schema try: from multicorn import ANY @@ -39,52 +39,6 @@ def _socrata_to_pg_type(socrata_type): return "text" -def dedupe_sg_schema(schema_spec: TableSchema, prefix_len: int = 59) -> TableSchema: - """ - Some Socrata schemas have columns that are longer than 63 characters - where the first 63 characters are the same between several columns - (e.g. odn.data.socrata.com). This routine renames columns in a schema - to make sure this can't happen (by giving duplicates a number suffix). 
- """ - - # We truncate the column name to 59 to leave space for the underscore - # and 3 digits (max PG identifier is 63 chars) - prefix_counts: Dict[str, int] = {} - columns_nums: List[Tuple[str, int]] = [] - - for column in schema_spec: - column_short = column.name[:prefix_len] - count = prefix_counts.get(column_short, 0) - columns_nums.append((column_short, count)) - prefix_counts[column_short] = count + 1 - - result = [] - for (_, position), column in zip(columns_nums, schema_spec): - column_short = column.name[:prefix_len] - count = prefix_counts[column_short] - if count > 1: - result.append( - TableColumn( - column.ordinal, - f"{column_short}_{position:03d}", - column.pg_type, - column.is_pk, - column.comment, - ) - ) - else: - result.append( - TableColumn( - column.ordinal, - column.name[:POSTGRES_MAX_IDENTIFIER], - column.pg_type, - column.is_pk, - column.comment, - ) - ) - return result - - def socrata_to_sg_schema(metadata: Dict[str, Any]) -> Tuple[TableSchema, Dict[str, str]]: try: col_names = metadata["resource"]["columns_field_name"] diff --git a/test/splitgraph/ingestion/test_socrata.py b/test/splitgraph/ingestion/test_socrata.py index e4c11f96..3888d099 100644 --- a/test/splitgraph/ingestion/test_socrata.py +++ b/test/splitgraph/ingestion/test_socrata.py @@ -18,8 +18,8 @@ cols_to_socrata, sortkeys_to_socrata, _socrata_to_pg_type, - dedupe_sg_schema, ) +from splitgraph.ingestion.common import dedupe_sg_schema class Q: From 2adaac14923fad3782736bf8893f5330d497cfcb Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 18:36:11 +0000 Subject: [PATCH 05/10] Get commandline help to partially autogenerate from the JSONSchema --- splitgraph/ingestion/common.py | 24 ++++++++++++++++++++++++ splitgraph/ingestion/csv/__init__.py | 23 +++++------------------ 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/splitgraph/ingestion/common.py b/splitgraph/ingestion/common.py index ba8d683c..6dd50741 100644 --- 
a/splitgraph/ingestion/common.py +++ b/splitgraph/ingestion/common.py @@ -242,3 +242,27 @@ def dedupe_sg_schema(schema_spec: TableSchema, prefix_len: int = 59) -> TableSch ) ) return result + + +def _format_jsonschema(prop, schema, required): + if prop == "tables": + return """tables: Tables to mount (default all). If a list, will import only these tables. +If a dictionary, must have the format + {"table_name": {"schema": {"col_1": "type_1", ...}, + "options": {[get passed to CREATE FOREIGN TABLE]}}}.""" + parts = [f"{prop}:"] + if "description" in schema: + parts.append(schema["description"]) + if parts[-1][-1] != ".": + parts[-1] += "." + + if prop in required: + parts.append("Required.") + return " ".join(parts) + + +def build_commandline_help(json_schema): + required = json_schema.get("required", []) + return "\n".join( + _format_jsonschema(p, pd, required) for p, pd in json_schema["properties"].items() + ) diff --git a/splitgraph/ingestion/csv/__init__.py b/splitgraph/ingestion/csv/__init__.py index f3b4abbd..0ea5e62f 100644 --- a/splitgraph/ingestion/csv/__init__.py +++ b/splitgraph/ingestion/csv/__init__.py @@ -4,7 +4,7 @@ from psycopg2.sql import SQL, Identifier from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource -from splitgraph.ingestion.common import IngestionAdapter +from splitgraph.ingestion.common import IngestionAdapter, build_commandline_help if TYPE_CHECKING: from splitgraph.engine.postgres.engine import PsycopgEngine @@ -119,6 +119,7 @@ class CSVDataSource(ForeignDataWrapperDataSource): For example: +\b ``` sgr mount csv target_schema -o@- < Date: Thu, 11 Mar 2021 18:40:46 +0000 Subject: [PATCH 06/10] Add descriptions to Postgres data source's JSONSchema --- splitgraph/hooks/data_source/fdw.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/splitgraph/hooks/data_source/fdw.py b/splitgraph/hooks/data_source/fdw.py index 9c63694a..29139143 100644 --- a/splitgraph/hooks/data_source/fdw.py +++ 
b/splitgraph/hooks/data_source/fdw.py @@ -345,10 +345,10 @@ def get_description(cls) -> str: params_schema = { "type": "object", "properties": { - "host": {"type": "string"}, - "port": {"type": "integer"}, - "dbname": {"type": "string"}, - "remote_schema": {"type": "string"}, + "host": {"type": "string", "description": "Remote hostname"}, + "port": {"type": "integer", "description": "Port"}, + "dbname": {"type": "string", "description": "Database name"}, + "remote_schema": {"type": "string", "description": "Remote schema name"}, "tables": _table_options_schema, }, "required": ["host", "port", "dbname", "remote_schema"], From 444ab06ca0f2bd561ddcfa0c53670154ae7eff6f Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 18:44:24 +0000 Subject: [PATCH 07/10] Add a Snowflake data source, backed by a modified SQLAlchemy FDW. * Had to install snowflake-sqlalchemy in the engine outside of the standard Python machinery (not required in sgr) * WHERE clauses are pushed down but joins aren't because of Multicorn. Add an escape hatch in the form of a custom `subquery` parameter that turns a remote query to a mounted table. 
--- engine/Dockerfile | 4 + splitgraph/ingestion/snowflake/__init__.py | 131 +++++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 splitgraph/ingestion/snowflake/__init__.py diff --git a/engine/Dockerfile b/engine/Dockerfile index e02d135b..3241a469 100644 --- a/engine/Dockerfile +++ b/engine/Dockerfile @@ -185,6 +185,10 @@ RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \ pip install "elasticsearch>=7.7.0" COPY ./engine/src/postgres-elasticsearch-fdw/pg_es_fdw /pg_es_fdw/pg_es_fdw +# Install the Snowflake SQLAlchemy connector +RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \ + pip install "snowflake-sqlalchemy>=1.2.4" + ENV PATH "${PATH}:/splitgraph/bin" ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw" diff --git a/splitgraph/ingestion/snowflake/__init__.py b/splitgraph/ingestion/snowflake/__init__.py new file mode 100644 index 00000000..24833684 --- /dev/null +++ b/splitgraph/ingestion/snowflake/__init__.py @@ -0,0 +1,131 @@ +import urllib.parse +from typing import Dict, Optional, cast, Mapping + +from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource +from splitgraph.ingestion.common import build_commandline_help + + +class SnowflakeDataSource(ForeignDataWrapperDataSource): + credentials_schema = { + "type": "object", + "properties": { + "username": {"type": "string", "description": "Username"}, + "password": {"type": "string", "description": "Password"}, + "account": { + "type": "string", + "description": "Account Locator, e.g. xy12345.us-east-2.aws. 
For more information, see https://docs.snowflake.com/en/user-guide/connecting.html", + }, + }, + "required": ["username", "password", "account"], + } + + params_schema = { + "type": "object", + "properties": { + "tables": { + "type": "object", + "additionalProperties": { + "options": {"type": "object", "additionalProperties": {"type": "string"}}, + }, + }, + "database": {"type": "string", "description": "Snowflake database name"}, + "schema": {"type": "string", "description": "Snowflake schema"}, + "warehouse": {"type": "string", "description": "Warehouse name"}, + "role": {"type": "string", "description": "Role"}, + }, + "required": ["database", "schema"], + } + + supports_mount = True + supports_load = True + supports_sync = False + + commandline_help = """Mount a Snowflake database. + +This will mount a remote Snowflake schema or a table. You can also get a mounted table to point to the result of a subquery that will be executed on the Snowflake instance. For example: + +\b +``` +$ sgr mount snowflake test_snowflake -o@- < str: + return "Snowflake" + + @classmethod + def get_description(cls) -> str: + return "Schema, table or a subquery from a Snowflake database" + + def get_table_options(self, table_name: str) -> Mapping[str, str]: + result = cast(Dict[str, str], super().get_table_options(table_name)) + result["tablename"] = result.get("tablename", table_name) + return result + + def get_server_options(self): + options: Dict[str, Optional[str]] = { + "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw", + } + + # Construct the SQLAlchemy db_url + + db_url = f"snowflake://{self.credentials['username']}:{self.credentials['password']}@{self.credentials['account']}" + + if "database" in self.params: + db_url += f"/{self.params['database']}" + if "schema" in self.params: + db_url += f"/{self.params['schema']}" + + extra_params = {} + if "warehouse" in self.params: + extra_params["warehouse"] = self.params["warehouse"] + if "role" in self.params: + extra_params["role"] 
= self.params["role"] + + db_url += urllib.parse.urlencode(extra_params) + + options["db_url"] = db_url + + return options + + def get_remote_schema_name(self) -> str: + if "schema" not in self.params: + raise ValueError("Cannot IMPORT FOREIGN SCHEMA without a schema!") + return str(self.params["schema"]) From 7e4f2daae73150fbcbc2ec1d1cd90d325d7095c8 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 18:45:03 +0000 Subject: [PATCH 08/10] Add the Snowflake data source to the config by default. --- splitgraph/config/keys.py | 1 + 1 file changed, 1 insertion(+) diff --git a/splitgraph/config/keys.py b/splitgraph/config/keys.py index 7709fe69..74d775e0 100644 --- a/splitgraph/config/keys.py +++ b/splitgraph/config/keys.py @@ -64,6 +64,7 @@ "socrata": "splitgraph.ingestion.socrata.mount.SocrataDataSource", "elasticsearch": "splitgraph.hooks.data_source.ElasticSearchDataSource", "csv": "splitgraph.ingestion.csv.CSVDataSource", + "snowflake": "splitgraph.ingestion.snowflake.SnowflakeDataSource", }, } From 41db1816b160efcda806d21db360f4c4852b9d98 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 19:19:34 +0000 Subject: [PATCH 09/10] Remove schema from the list of required parameters in Snowflake (if the subquery is set, it can have the schema inside of it). 
--- splitgraph/ingestion/snowflake/__init__.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/splitgraph/ingestion/snowflake/__init__.py b/splitgraph/ingestion/snowflake/__init__.py index 24833684..5fd36ae1 100644 --- a/splitgraph/ingestion/snowflake/__init__.py +++ b/splitgraph/ingestion/snowflake/__init__.py @@ -33,7 +33,7 @@ class SnowflakeDataSource(ForeignDataWrapperDataSource): "warehouse": {"type": "string", "description": "Warehouse name"}, "role": {"type": "string", "description": "Role"}, }, - "required": ["database", "schema"], + "required": ["database"], } supports_mount = True @@ -68,7 +68,7 @@ class SnowflakeDataSource(ForeignDataWrapperDataSource): "n_nation": "varchar", "segment": "varchar", "avg_balance": "numeric" - } + }, "options": { "subquery": "SELECT n_nation AS nation, c_mktsegment AS segment, AVG(c_acctbal) AS avg_balance FROM TPCH_SF100.customer c JOIN TPCH_SF100.nation n ON c_nationkey = n_nationkey" } @@ -80,7 +80,11 @@ class SnowflakeDataSource(ForeignDataWrapperDataSource): """ commandline_kwargs_help: str = ( - build_commandline_help(credentials_schema) + "\n" + build_commandline_help(params_schema) + build_commandline_help(credentials_schema) + + "\n" + + build_commandline_help(params_schema) + + "\n" + + "The schema parameter is required when subquery isn't used." 
) def get_fdw_name(self): From 8435e5e7384a6719bf4c2365228fe66e4efb6b41 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Thu, 11 Mar 2021 20:10:40 +0000 Subject: [PATCH 10/10] Do some minor coverage pumping (add a test for the SQLAlchemy dburl conversion), remove the CSV FDW from coverage (is tested inside of the engine) --- .coveragerc | 2 +- test/splitgraph/ingestion/test_snowflake.py | 48 +++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 test/splitgraph/ingestion/test_snowflake.py diff --git a/.coveragerc b/.coveragerc index 9f054259..724893d1 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,6 +1,6 @@ [report] # Tested inside of the actual engine -omit = splitgraph/core/fdw_checkout.py,splitgraph/core/server.py +omit = splitgraph/core/fdw_checkout.py,splitgraph/core/server.py,splitgraph/ingestion/csv/fdw.py # Regexes for lines to exclude from consideration exclude_lines = diff --git a/test/splitgraph/ingestion/test_snowflake.py b/test/splitgraph/ingestion/test_snowflake.py new file mode 100644 index 00000000..f1deb365 --- /dev/null +++ b/test/splitgraph/ingestion/test_snowflake.py @@ -0,0 +1,48 @@ +from unittest.mock import Mock + +from splitgraph.ingestion.snowflake import SnowflakeDataSource + + +def test_snowflake_data_source_dburl_conversion(): + source = SnowflakeDataSource( + Mock(), + credentials={ + "username": "username", + "password": "password", + "account": "abcdef.eu-west-1.aws", + }, + params={ + "database": "SOME_DB", + "schema": "TPCH_SF100", + "warehouse": "my_warehouse", + "role": "role", + }, + ) + + assert source.get_server_options() == { + "db_url": "snowflake://username:password@abcdef.eu-west-1.aws/SOME_DB/TPCH_SF100warehouse=my_warehouse&role=role", + "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw", + } + + source = SnowflakeDataSource( + Mock(), + credentials={ + "username": "username", + "password": "password", + "account": "abcdef.eu-west-1.aws", + }, + params={ + "database": "SOME_DB", + 
"tables": { + "test_table": { + "schema": {"col_1": "int", "col_2": "varchar"}, + "options": {"subquery": "SELECT col_1, col_2 FROM other_table"}, + } + }, + }, + ) + + assert source.get_table_options("test_table") == { + "subquery": "SELECT col_1, col_2 FROM other_table", + "tablename": "test_table", + }