Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[report]
# Tested inside of the actual engine
omit = splitgraph/core/fdw_checkout.py,splitgraph/core/server.py
omit = splitgraph/core/fdw_checkout.py,splitgraph/core/server.py,splitgraph/ingestion/csv/fdw.py

# Regexes for lines to exclude from consideration
exclude_lines =
Expand Down
4 changes: 4 additions & 0 deletions engine/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
pip install "elasticsearch>=7.7.0"
COPY ./engine/src/postgres-elasticsearch-fdw/pg_es_fdw /pg_es_fdw/pg_es_fdw

# Install the Snowflake SQLAlchemy connector
RUN --mount=type=cache,id=pip-cache,target=/root/.cache/pip \
pip install "snowflake-sqlalchemy>=1.2.4"

ENV PATH "${PATH}:/splitgraph/bin"
ENV PYTHONPATH "${PYTHONPATH}:/splitgraph:/pg_es_fdw"

Expand Down
2 changes: 1 addition & 1 deletion engine/src/Multicorn
1 change: 1 addition & 0 deletions splitgraph/config/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"socrata": "splitgraph.ingestion.socrata.mount.SocrataDataSource",
"elasticsearch": "splitgraph.hooks.data_source.ElasticSearchDataSource",
"csv": "splitgraph.ingestion.csv.CSVDataSource",
"snowflake": "splitgraph.ingestion.snowflake.SnowflakeDataSource",
},
}

Expand Down
2 changes: 1 addition & 1 deletion splitgraph/core/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def conn_string_to_dict(connection: Optional[str]) -> Dict[str, Any]:
password=match.group(3),
)
else:
return dict(server=None, port=None, username=None, password=None)
return {}


def parse_dt(string: str) -> datetime:
Expand Down
16 changes: 8 additions & 8 deletions splitgraph/hooks/data_source/fdw.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ def from_commandline(cls, engine, commandline_kwargs) -> "ForeignDataWrapperData
else:
tables = None

credentials = {
"username": params.pop("username"),
"password": params.pop("password"),
}
credentials: Dict[str, Any] = {}
for k in cast(Dict[str, Any], cls.credentials_schema["properties"]).keys():
if k in params:
credentials[k] = params[k]
result = cls(engine, credentials, params)
result.tables = tables
return result
Expand Down Expand Up @@ -345,10 +345,10 @@ def get_description(cls) -> str:
params_schema = {
"type": "object",
"properties": {
"host": {"type": "string"},
"port": {"type": "integer"},
"dbname": {"type": "string"},
"remote_schema": {"type": "string"},
"host": {"type": "string", "description": "Remote hostname"},
"port": {"type": "integer", "description": "Port"},
"dbname": {"type": "string", "description": "Database name"},
"remote_schema": {"type": "string", "description": "Remote schema name"},
"tables": _table_options_schema,
},
"required": ["host", "port", "dbname", "remote_schema"],
Expand Down
79 changes: 75 additions & 4 deletions splitgraph/ingestion/common.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from abc import abstractmethod
from typing import Optional, Union
from typing import Optional, Union, Dict, List, Tuple

from psycopg2.sql import SQL, Identifier

from splitgraph.core.image import Image
from splitgraph.core.repository import Repository
from splitgraph.core.types import TableSchema
from splitgraph.core.sql import POSTGRES_MAX_IDENTIFIER
from splitgraph.core.types import TableSchema, TableColumn
from splitgraph.engine.postgres.engine import PsycopgEngine
from splitgraph.exceptions import CheckoutError

Expand Down Expand Up @@ -99,7 +100,7 @@ def to_table(
if_exists: str = "patch",
schema_check: bool = True,
no_header: bool = False,
**kwargs
**kwargs,
):
tmp_schema = repository.to_schema()

Expand Down Expand Up @@ -170,7 +171,7 @@ def to_data(
image: Optional[Union[Image, str]] = None,
repository: Optional[Repository] = None,
use_lq: bool = False,
**kwargs
**kwargs,
):
if image is None:
if repository is None:
Expand All @@ -195,3 +196,73 @@ def to_data(
# (won't download objects unless needed).
with image.query_schema() as tmp_schema:
return self.query_to_data(image.engine, query, tmp_schema, **kwargs)


def dedupe_sg_schema(schema_spec: TableSchema, prefix_len: int = 59) -> TableSchema:
    """
    Rename columns in a schema so that no two of them collide within PostgreSQL's
    63-character identifier limit. Some foreign schemas (e.g. odn.data.socrata.com)
    have columns longer than 63 characters that share the same prefix; duplicates
    get a zero-padded numeric suffix instead.

    :param schema_spec: Schema to deduplicate.
    :param prefix_len: Length to truncate names to before suffixing (59 leaves
        room for an underscore and a 3-digit suffix within 63 characters).
    :return: New schema with collision-free column names.
    """

    # Pass 1: count how many columns share each truncated prefix and remember
    # each column's position within its prefix group.
    occurrences: Dict[str, int] = {}
    positions: List[Tuple[str, int]] = []
    for col in schema_spec:
        prefix = col.name[:prefix_len]
        seen_so_far = occurrences.get(prefix, 0)
        positions.append((prefix, seen_so_far))
        occurrences[prefix] = seen_so_far + 1

    # Pass 2: suffix members of ambiguous prefix groups; otherwise just
    # truncate to the maximum PG identifier length.
    deduped: TableSchema = []
    for (prefix, position), col in zip(positions, schema_spec):
        if occurrences[prefix] > 1:
            new_name = f"{prefix}_{position:03d}"
        else:
            new_name = col.name[:POSTGRES_MAX_IDENTIFIER]
        deduped.append(
            TableColumn(col.ordinal, new_name, col.pg_type, col.is_pk, col.comment)
        )
    return deduped


def _format_jsonschema(prop, schema, required):
if prop == "tables":
return """tables: Tables to mount (default all). If a list, will import only these tables.
If a dictionary, must have the format
{"table_name": {"schema": {"col_1": "type_1", ...},
"options": {[get passed to CREATE FOREIGN TABLE]}}}."""
parts = [f"{prop}:"]
if "description" in schema:
parts.append(schema["description"])
if parts[-1][-1] != ".":
parts[-1] += "."

if prop in required:
parts.append("Required.")
return " ".join(parts)


def build_commandline_help(json_schema):
    """Build the `sgr mount` help text for a data source from its JSONSchema.

    Produces one line per property in the schema's "properties" map,
    formatted by :func:`_format_jsonschema`.

    :param json_schema: JSONSchema dict with "properties" and optionally "required".
    :return: Newline-joined help text.
    """
    required = json_schema.get("required", [])
    lines = []
    for prop_name, prop_schema in json_schema["properties"].items():
        lines.append(_format_jsonschema(prop_name, prop_schema, required))
    return "\n".join(lines)
23 changes: 5 additions & 18 deletions splitgraph/ingestion/csv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from psycopg2.sql import SQL, Identifier

from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource
from splitgraph.ingestion.common import IngestionAdapter
from splitgraph.ingestion.common import IngestionAdapter, build_commandline_help

if TYPE_CHECKING:
from splitgraph.engine.postgres.engine import PsycopgEngine
Expand Down Expand Up @@ -119,6 +119,7 @@ class CSVDataSource(ForeignDataWrapperDataSource):

For example:

\b
```
sgr mount csv target_schema -o@- <<EOF
{
Expand All @@ -134,23 +135,9 @@ class CSVDataSource(ForeignDataWrapperDataSource):
```
"""

commandline_kwargs_help: str = """url: HTTP URL (either the URL or S3 parameters are required)
s3_endpoint: S3 host and port (required)
s3_secure: Use SSL (default true)
s3_access_key: S3 access key (optional)
s3_secret_key: S3 secret key (optional)
s3_bucket: S3 bucket name (required)
s3_object_prefix: Prefix for object IDs to mount (optional)
autodetect_header: Detect whether the CSV file has a header automatically
autodetect_dialect: Detect the file's separator, quoting characters etc. automatically
header: Treats the first line as a header
separator: Override the character used as a CSV separator
quotechar: Override the character used as a CSV quoting character
tables: Objects to mount (default all). If a list, will import only these objects.
If a dictionary, must have the format
{"table_name": {"schema": {"col_1": "type_1", ...},
"options": {[get passed to CREATE FOREIGN TABLE]}}}.
"""
commandline_kwargs_help: str = (
build_commandline_help(credentials_schema) + "\n" + build_commandline_help(params_schema)
)

def get_fdw_name(self):
    """Return the name of the FDW extension backing this data source (Multicorn)."""
    return "multicorn"
Expand Down
135 changes: 135 additions & 0 deletions splitgraph/ingestion/snowflake/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import urllib.parse
from typing import Dict, Optional, cast, Mapping

from splitgraph.hooks.data_source.fdw import ForeignDataWrapperDataSource
from splitgraph.ingestion.common import build_commandline_help


class SnowflakeDataSource(ForeignDataWrapperDataSource):
    """Data source that mounts a Snowflake schema, table or subquery through
    the Multicorn SQLAlchemy foreign data wrapper."""

    # Connection credentials, kept separate from params so they can be
    # stored/handled as secrets.
    credentials_schema = {
        "type": "object",
        "properties": {
            "username": {"type": "string", "description": "Username"},
            "password": {"type": "string", "description": "Password"},
            "account": {
                "type": "string",
                "description": "Account Locator, e.g. xy12345.us-east-2.aws. For more information, see https://docs.snowflake.com/en/user-guide/connecting.html",
            },
        },
        "required": ["username", "password", "account"],
    }

    # Non-secret mount parameters.
    params_schema = {
        "type": "object",
        "properties": {
            "tables": {
                "type": "object",
                "additionalProperties": {
                    "options": {"type": "object", "additionalProperties": {"type": "string"}},
                },
            },
            "database": {"type": "string", "description": "Snowflake database name"},
            "schema": {"type": "string", "description": "Snowflake schema"},
            "warehouse": {"type": "string", "description": "Warehouse name"},
            "role": {"type": "string", "description": "Role"},
        },
        "required": ["database"],
    }

    supports_mount = True
    supports_load = True
    supports_sync = False

    commandline_help = """Mount a Snowflake database.

This will mount a remote Snowflake schema or a table. You can also get a mounted table to point to the result of a subquery that will be executed on the Snowflake instance. For example:

\b
```
$ sgr mount snowflake test_snowflake -o@- <<EOF
{
    "username": "username",
    "password": "password",
    "account": "acc-id.west-europe.azure",
    "database": "SNOWFLAKE_SAMPLE_DATA",
    "schema": "TPCH_SF100"
}
EOF

$ sgr mount snowflake test_snowflake_subquery -o@- <<EOF
{
    "username": "username",
    "password": "password",
    "account": "acc-id.west-europe.azure",
    "database": "SNOWFLAKE_SAMPLE_DATA",
    "tables": {
        "balances": {
            "schema": {
                "n_nation": "varchar",
                "segment": "varchar",
                "avg_balance": "numeric"
            },
            "options": {
                "subquery": "SELECT n_nation AS nation, c_mktsegment AS segment, AVG(c_acctbal) AS avg_balance FROM TPCH_SF100.customer c JOIN TPCH_SF100.nation n ON c_nationkey = n_nationkey"
            }
        }
    }
}
EOF
```
"""

    commandline_kwargs_help: str = (
        build_commandline_help(credentials_schema)
        + "\n"
        + build_commandline_help(params_schema)
        + "\n"
        + "The schema parameter is required when subquery isn't used."
    )

    def get_fdw_name(self):
        """Return the name of the FDW extension backing this data source (Multicorn)."""
        return "multicorn"

    @classmethod
    def get_name(cls) -> str:
        """Human-readable data source name."""
        return "Snowflake"

    @classmethod
    def get_description(cls) -> str:
        """One-line description shown in data source listings."""
        return "Schema, table or a subquery from a Snowflake database"

    def get_table_options(self, table_name: str) -> Mapping[str, str]:
        """Per-table FDW options; defaults "tablename" to the mounted table's name."""
        result = cast(Dict[str, str], super().get_table_options(table_name))
        result["tablename"] = result.get("tablename", table_name)
        return result

    def get_server_options(self):
        """Build the Multicorn server options, including a SQLAlchemy URL of the
        form snowflake://user:password@account/database/schema?warehouse=...&role=...
        """
        options: Dict[str, Optional[str]] = {
            "wrapper": "multicorn.sqlalchemyfdw.SqlAlchemyFdw",
        }

        # Percent-encode the credentials: passwords routinely contain
        # characters like "@", "/" or ":" that would otherwise corrupt
        # the URL when interpolated verbatim.
        db_url = "snowflake://{}:{}@{}".format(
            urllib.parse.quote(self.credentials["username"], safe=""),
            urllib.parse.quote(self.credentials["password"], safe=""),
            self.credentials["account"],
        )

        if "database" in self.params:
            db_url += f"/{self.params['database']}"
        if "schema" in self.params:
            db_url += f"/{self.params['schema']}"

        extra_params = {k: self.params[k] for k in ("warehouse", "role") if k in self.params}
        if extra_params:
            # Bug fix: the query string must be separated from the path with
            # "?"; previously urlencode()'s output was appended directly to
            # the path, producing a malformed URL.
            db_url += "?" + urllib.parse.urlencode(extra_params)

        options["db_url"] = db_url
        return options

    def get_remote_schema_name(self) -> str:
        """Schema used by IMPORT FOREIGN SCHEMA; required unless using subqueries."""
        if "schema" not in self.params:
            raise ValueError("Cannot IMPORT FOREIGN SCHEMA without a schema!")
        return str(self.params["schema"])
Loading