From e9224e9fd77f586067005ed5badac07905929805 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 6 Oct 2021 13:14:53 +0100 Subject: [PATCH 1/6] Delete useless `null` type in Socrata JSONSchema #1mz7fnb --- splitgraph/ingestion/socrata/mount.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/splitgraph/ingestion/socrata/mount.py b/splitgraph/ingestion/socrata/mount.py index fdb709e7..b759961c 100644 --- a/splitgraph/ingestion/socrata/mount.py +++ b/splitgraph/ingestion/socrata/mount.py @@ -16,9 +16,7 @@ class SocrataDataSource(ForeignDataWrapperDataSource): credentials_schema = { "type": "object", - "properties": { - "app_token": {"type": ["string", "null"], "description": "Socrata app token, optional"} - }, + "properties": {"app_token": {"type": "string", "description": "Socrata app token"}}, } params_schema = { From 3545013ce4685d8cf0d90c06d510d0b25d95b9aa Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Wed, 6 Oct 2021 17:15:53 +0100 Subject: [PATCH 2/6] FDWs config: fix disjoint unions Push things like auth methods / CSV locations down into sub-objects and accept either set of fields with `oneOf`. Note this isn't complete yet, as we need to figure out how to migrate existing configuration or how to get the data sources to accept/store multiple versions of the config schema. 
CU-1mz7fdd --- splitgraph/ingestion/csv/__init__.py | 80 +++++++++++++++++++--- splitgraph/ingestion/snowflake/__init__.py | 30 ++++++-- 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/splitgraph/ingestion/csv/__init__.py b/splitgraph/ingestion/csv/__init__.py index a035fae8..a43ee92b 100644 --- a/splitgraph/ingestion/csv/__init__.py +++ b/splitgraph/ingestion/csv/__init__.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from psycopg2.sql import SQL, Identifier -from splitgraph.core.types import Credentials, MountError, TableInfo +from splitgraph.core.types import Credentials, MountError, TableInfo, Params from splitgraph.hooks.data_source.fdw import ( ForeignDataWrapperDataSource, import_foreign_schema, @@ -78,16 +78,50 @@ class CSVDataSource(ForeignDataWrapperDataSource): params_schema: Dict[str, Any] = { "type": "object", "properties": { - "url": {"type": "string", "description": "HTTP URL to the CSV file"}, - "s3_endpoint": { - "type": "string", - "description": "S3 endpoint (including port if required)", + "connection": { + "type": "object", + "oneOf": [ + { + "type": "object", + "required": ["connection_type", "url"], + "properties": { + "connection_type": {"type": "string", "const": "http"}, + "url": {"type": "string", "description": "HTTP URL to the CSV file"}, + }, + }, + { + "type": "object", + "required": ["connection_type", "s3_endpoint", "s3_bucket"], + "properties": { + "connection_type": {"type": "string", "const": "s3"}, + "s3_endpoint": { + "type": "string", + "description": "S3 endpoint (including port if required)", + }, + "s3_region": { + "type": "string", + "description": "Region of the S3 bucket", + }, + "s3_secure": { + "type": "boolean", + "description": "Whether to use HTTPS for S3 access", + }, + "s3_bucket": { + "type": "string", + "description": "Bucket the object is in", + }, + "s3_object": { + "type": "string", + "description": "Limit the import to a single object", + }, + 
"s3_object_prefix": { + "type": "string", + "description": "Prefix for object in S3 bucket", + }, + }, + }, + ], }, - "s3_region": {"type": "string", "description": "Region of the S3 bucket"}, - "s3_secure": {"type": "boolean", "description": "Whether to use HTTPS for S3 access"}, - "s3_bucket": {"type": "string", "description": "Bucket the object is in"}, - "s3_object": {"type": "string", "description": "Limit the import to a single object"}, - "s3_object_prefix": {"type": "string", "description": "Prefix for object in S3 bucket"}, "autodetect_header": { "type": "boolean", "description": "Detect whether the CSV file has a header automatically", @@ -123,7 +157,6 @@ class CSVDataSource(ForeignDataWrapperDataSource): }, "quotechar": {"type": "string", "description": "Character used to quote fields"}, }, - "oneOf": [{"required": ["url"]}, {"required": ["s3_endpoint", "s3_bucket"]}], } table_params_schema: Dict[str, Any] = { @@ -187,6 +220,31 @@ def from_commandline(cls, engine, commandline_kwargs) -> "CSVDataSource": credentials[k] = params[k] return cls(engine, credentials, params) + @classmethod + def migrate_params(cls, params: Params) -> Params: + # TODO figure out how/when/whether to apply this migration to the DB and how data sources + # should define their config migrations. 
+ + if "url" in params: + params["connection"] = {"connection_type": "http", "url": params["url"]} + else: + connection = {"connection_type": "s3"} + for key in [ + "s3_endpoint", + "s3_region", + "s3_secure", + "s3_bucket", + "s3_object", + "s3_object_prefix", + ]: + try: + connection[key] = params.pop(key) + except KeyError: + pass + + params["connection"] = connection + return params + def get_table_options( self, table_name: str, tables: Optional[TableInfo] = None ) -> Dict[str, str]: diff --git a/splitgraph/ingestion/snowflake/__init__.py b/splitgraph/ingestion/snowflake/__init__.py index 71354e66..3a5a0bd9 100644 --- a/splitgraph/ingestion/snowflake/__init__.py +++ b/splitgraph/ingestion/snowflake/__init__.py @@ -34,18 +34,36 @@ class SnowflakeDataSource(ForeignDataWrapperDataSource): "type": "object", "properties": { "username": {"type": "string", "description": "Username"}, - "password": {"type": "string", "description": "Password"}, + "secret": { + "type": "object", + "oneOf": [ + { + "type": "object", + "required": ["secret_type", "password"], + "properties": { + "secret_type": {"type": "string", "const": "password"}, + "password": {"type": "string", "description": "Password"}, + }, + }, + { + "type": "object", + "required": ["secret_type", "private_key"], + "properties": { + "secret_type": {"type": "string", "const": "private_key"}, + "private_key": { + "type": "string", + "description": "Private key in PEM format", + }, + }, + }, + ], + }, "account": { "type": "string", "description": "Account Locator, e.g. xy12345.us-east-2.aws. 
For more information, see https://docs.snowflake.com/en/user-guide/connecting.html", }, - "private_key": { - "type": "string", - "description": "Private key in PEM format", - }, }, "required": ["username", "account"], - "oneOf": [{"required": ["password"]}, {"required": ["private_key"]}], } params_schema = { From a1b81b97b74becc139535a510fb790405877d19b Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 8 Oct 2021 11:24:38 +0100 Subject: [PATCH 3/6] Use JSONSchema min/max in the Socrata data source. #1gh68kk --- splitgraph/ingestion/socrata/mount.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/splitgraph/ingestion/socrata/mount.py b/splitgraph/ingestion/socrata/mount.py index b759961c..c0754214 100644 --- a/splitgraph/ingestion/socrata/mount.py +++ b/splitgraph/ingestion/socrata/mount.py @@ -28,7 +28,10 @@ class SocrataDataSource(ForeignDataWrapperDataSource): }, "batch_size": { "type": "integer", - "description": "Amount of rows to fetch from Socrata per request (limit parameter). 
Maximum 50000.", + "description": "Amount of rows to fetch from Socrata per request (limit parameter)", + "minimum": 1, + "default": 1000, + "maximum": 50000, }, }, "required": ["domain"], From 4d9741753619b27a5be63878ed1d38dac0c6dcee Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 8 Oct 2021 15:35:00 +0100 Subject: [PATCH 4/6] Fix Socrata tests (don't pass app_token=None) --- splitgraph/ingestion/socrata/mount.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/splitgraph/ingestion/socrata/mount.py b/splitgraph/ingestion/socrata/mount.py index c0754214..f7477976 100644 --- a/splitgraph/ingestion/socrata/mount.py +++ b/splitgraph/ingestion/socrata/mount.py @@ -70,7 +70,7 @@ def from_commandline(cls, engine, commandline_kwargs) -> "SocrataDataSource": if isinstance(tables, dict) and isinstance(next(iter(tables.values())), str): tables = {k: ([], {"socrata_id": v}) for k, v in tables.items()} - credentials = Credentials({"app_token": params.pop("app_token", None)}) + credentials = Credentials({}) return cls(engine, credentials, params, tables) def get_server_options(self): From 60ec1369809a5422f95aee0cee4265ea99b36843 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 8 Oct 2021 16:10:30 +0100 Subject: [PATCH 5/6] Get the CSV plugin to accept old and new param schemas. Instead of writing a config migration interface for data sources, punt on it and get the CSV data source to attempt to migrate the parameters when it's initialized (see if they have `url` or `s3_object, ...` at toplevel and push them down into a separate `connection` object). We still flatten these back when initializing the FDW, as PostgreSQL only lets us pass a key-string value map (`OPTIONS ...`). 
--- splitgraph/ingestion/csv/__init__.py | 29 +++++++++++++++++++++------ test/splitgraph/ingestion/test_csv.py | 29 ++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/splitgraph/ingestion/csv/__init__.py b/splitgraph/ingestion/csv/__init__.py index a43ee92b..12e4958f 100644 --- a/splitgraph/ingestion/csv/__init__.py +++ b/splitgraph/ingestion/csv/__init__.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from psycopg2.sql import SQL, Identifier -from splitgraph.core.types import Credentials, MountError, TableInfo, Params +from splitgraph.core.types import Credentials, MountError, Params, TableInfo from splitgraph.hooks.data_source.fdw import ( ForeignDataWrapperDataSource, import_foreign_schema, @@ -200,6 +200,18 @@ class CSVDataSource(ForeignDataWrapperDataSource): build_commandline_help(credentials_schema) + "\n" + build_commandline_help(params_schema) ) + def __init__( + self, + engine: "PsycopgEngine", + credentials: Credentials, + params: Params, + tables: Optional[TableInfo] = None, + ): + # TODO this is a hack to automatically accept both old and new versions of CSV params. + # We might need a more robust data source config migration system. + params = CSVDataSource.migrate_params(params) + super().__init__(engine, credentials, params, tables) + def get_fdw_name(self): return "multicorn" @@ -222,11 +234,10 @@ def from_commandline(cls, engine, commandline_kwargs) -> "CSVDataSource": @classmethod def migrate_params(cls, params: Params) -> Params: - # TODO figure out how/when/whether to apply this migration to the DB and how data sources - # should define their config migrations. 
- + params = deepcopy(params) if "url" in params: params["connection"] = {"connection_type": "http", "url": params["url"]} + del params["url"] else: connection = {"connection_type": "s3"} for key in [ @@ -316,8 +327,13 @@ def get_server_options(self): "wrapper": "splitgraph.ingestion.csv.fdw.CSVForeignDataWrapper" } for k in self.params_schema["properties"].keys(): + # Flatten the options and extract connection parameters if k in self.params: - options[k] = str(self.params[k]) + if k != "connection": + options[k] = str(self.params[k]) + else: + options.update(self.params[k]) + for k in self.credentials_schema["properties"].keys(): if k in self.credentials: options[k] = str(self.credentials[k]) @@ -361,9 +377,10 @@ def get_raw_url( # Merge the table options to take care of overrides and use them to get URLs # for each table. + server_options = self.get_server_options() for table in tables: table_options = super().get_table_options(table, tables) - full_options = {**self.credentials, **self.params, **table_options} + full_options = {**server_options, **table_options} result[table] = self._get_url(full_options) return result diff --git a/test/splitgraph/ingestion/test_csv.py b/test/splitgraph/ingestion/test_csv.py index 3c2319ce..7f193908 100644 --- a/test/splitgraph/ingestion/test_csv.py +++ b/test/splitgraph/ingestion/test_csv.py @@ -5,7 +5,7 @@ from unittest import mock import pytest -from splitgraph.core.types import MountError, TableColumn, unwrap +from splitgraph.core.types import MountError, Params, TableColumn, unwrap from splitgraph.engine import ResultShape from splitgraph.hooks.s3_server import MINIO from splitgraph.ingestion.common import generate_column_names @@ -37,6 +37,33 @@ } +def test_csv_param_migration(): + assert CSVDataSource.migrate_params( + Params( + { + "s3_endpoint": "objectstorage:9000", + "s3_secure": False, + "s3_bucket": "test_csv", + "delimiter": ",", + } + ) + ) == Params( + { + "delimiter": ",", + "connection": { + "connection_type": 
"s3", + "s3_endpoint": "objectstorage:9000", + "s3_secure": False, + "s3_bucket": "test_csv", + }, + } + ) + + assert CSVDataSource.migrate_params(Params({"url": "some-url"})) == Params( + {"connection": {"connection_type": "http", "url": "some-url"}} + ) + + def test_csv_introspection_s3(): fdw_options = { "s3_endpoint": "objectstorage:9000", From 0b9a7faf9ac2d85e2ce3d48fc80466ada51d1b3a Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 8 Oct 2021 16:18:04 +0100 Subject: [PATCH 6/6] Get the Snowflake plugin to auto-migrate credentials. Use a similar hack to the CSV plugin (if the `secret` object doesn't exist, convert it before the JSONSchema validation and unpack it back when we're passing FDW params). #1mz7fdd --- splitgraph/ingestion/snowflake/__init__.py | 44 +++++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/splitgraph/ingestion/snowflake/__init__.py b/splitgraph/ingestion/snowflake/__init__.py index 3a5a0bd9..29b0540d 100644 --- a/splitgraph/ingestion/snowflake/__init__.py +++ b/splitgraph/ingestion/snowflake/__init__.py @@ -1,12 +1,16 @@ import base64 import json import urllib.parse -from typing import Any, Dict, Optional +from copy import deepcopy +from typing import TYPE_CHECKING, Any, Dict, Optional -from splitgraph.core.types import TableInfo +from splitgraph.core.types import Credentials, Params, TableInfo from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource from splitgraph.ingestion.common import build_commandline_help +if TYPE_CHECKING: + from splitgraph.engine.postgres.engine import PsycopgEngine + def _encode_private_key(privkey: str): from cryptography.hazmat.backends import default_backend @@ -146,6 +150,18 @@ class SnowflakeDataSource(ForeignDataWrapperDataSource): + "The schema parameter is required when subquery isn't used." 
) + def __init__( + self, + engine: "PsycopgEngine", + credentials: Credentials, + params: Params, + tables: Optional[TableInfo] = None, + ): + # TODO this is a hack to automatically accept both old and new versions of Snowflake credentials. + # We might need a more robust data source config migration system. + credentials = SnowflakeDataSource.migrate_credentials(credentials) + super().__init__(engine, credentials, params, tables) + def get_fdw_name(self): return "multicorn" @@ -183,19 +199,35 @@ def get_server_options(self): if "batch_size" in self.params: options["batch_size"] = str(self.params["batch_size"]) - if "private_key" in self.credentials: + if self.credentials["secret"]["secret_type"] == "private_key": options["connect_args"] = json.dumps( - {"private_key": _encode_private_key(self.credentials["private_key"])} + {"private_key": _encode_private_key(self.credentials["secret"]["private_key"])} ) return options + @classmethod + def migrate_credentials(cls, credentials: Credentials) -> Credentials: + credentials = deepcopy(credentials) + if "private_key" in credentials: + credentials["secret"] = { + "secret_type": "private_key", + "private_key": credentials.pop("private_key"), + } + elif "password" in credentials: + credentials["secret"] = { + "secret_type": "password", + "password": credentials.pop("password"), + } + + return credentials + def _build_db_url(self) -> str: """Construct the SQLAlchemy Snowflake db_url""" uname = self.credentials["username"] - if "password" in self.credentials: - uname += f":{self.credentials['password']}" + if self.credentials["secret"]["secret_type"] == "password": + uname += f":{self.credentials['secret']['password']}" db_url = f"snowflake://{uname}@{self.credentials['account']}" if "database" in self.params: